Coverage for apio/utils/util.py: 85%

208 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-06 10:20 +0000

1# -*- coding: utf-8 -*- 

2# -- This file is part of the Apio project 

3# -- (C) 2016-2018 FPGAwars 

4# -- Author Jesús Arroyo 

5# -- License GPLv2 

6# -- Derived from: 

7# ---- Platformio project 

8# ---- (C) 2014-2016 Ivan Kravets <me@ikravets.com> 

9# ---- License Apache v2 

10"""Misc utility functions and classes.""" 

11 

12import sys 

13import os 

14from contextlib import contextmanager 

15from enum import Enum 

16from dataclasses import dataclass 

17from typing import Optional, Any, Tuple, List 

18import subprocess 

19from threading import Thread 

20from pathlib import Path 

21import apio 

22from apio.utils import env_options 

23from apio.common.apio_console import cout, cerror 

24from apio.common.apio_styles import INFO 

25 

26 

27# ---------------------------------------- 

28# -- Constants 

29# ---------------------------------------- 

30 

31 

class ApioException(Exception):
    """An apio specific error."""

34 

35 

class AsyncPipe(Thread):
    """An OS pipe whose read end is drained by a private thread.

    The object exposes the write end through fileno(), so it can be passed
    as the stdout/stderr argument of subprocess.Popen. Every line that
    arrives on the pipe is appended to an internal buffer and, optionally,
    reported to a callback in real time. It is used to process scons
    output while it is produced, in order to show its progress.
    """

    def __init__(self, line_callback=None):
        """Create the pipe and start its reader thread.

        If line_callback is not None, it is called for each line as
        line_callback(line:str, terminator:str) where line is the line
        content and terminator is one of:
          "\\r"  (CR)
          "\\n"  (LF)
          ""    (EOF)

        The callback is done from a private Python thread of this pipe so
        make sure to have the proper locking and synchronization as needed.
        """

        super().__init__()
        self.outcallback = line_callback

        # -- The read end is consumed by run(), the write end is handed
        # -- out via fileno().
        self._fd_read, self._fd_write = os.pipe()

        # -- A list of lines received so far.
        self._lines_buffer = []

        # -- Set once close() released the write end, so that the raw file
        # -- descriptor is never closed twice.
        self._closed = False

        self.start()

    def get_buffer(self):
        """Return the list of lines received so far, without terminators.

        This is the internal list itself, not a copy. The reader thread
        keeps appending to it until close() returns.
        """

        return self._lines_buffer

    def fileno(self):
        """Return the file descriptor of the write end of the pipe."""

        return self._fd_write

    def _handle_incoming_line(self, bfr: bytearray, terminator: str):
        """Handle a new incoming line.

        bfr holds the bytes of the line's content, possibly empty.
        See __init__ for the description of terminator.
        """
        # -- Convert the line's bytes to a string. Replace invalid utf-8
        # -- chars with the unicode replacement character.
        line = bfr.decode("utf-8", errors="replace")

        # -- Append to the lines log buffer.
        self._lines_buffer.append(line)

        # -- Report back if caller passed a callback.
        if self.outcallback:
            self.outcallback(line, terminator)

    def run(self):
        """Thread body. Read the pipe until EOF, splitting it into lines."""

        # -- Prepare a buffer for collecting the line chars, excluding
        # -- its line terminator.
        bfr = bytearray()

        # -- We open in binary mode so we have access to the line terminators.
        # -- This is important with progress bars which don't advance to the
        # -- next line but redraw on the same line.
        with os.fdopen(self._fd_read, "rb") as f:
            while True:
                # -- Reading one byte at a time lets each line be reported
                # -- as soon as its terminator arrives.
                b: bytes = f.read(1)
                assert len(b) <= 1

                # -- Handle end of file. A trailing line that has no
                # -- terminator is reported with the "" terminator.
                if not b:
                    if bfr:
                        self._handle_incoming_line(bfr, "")
                    return

                # -- Handle the \r and \n terminators. Each one ends a line,
                # -- so "\r\n" yields a line followed by an empty line.
                if b in (b"\r", b"\n"):
                    self._handle_incoming_line(bfr, b.decode("ascii"))
                    bfr.clear()
                    continue

                # -- Handle a regular character
                bfr.append(b[0])

    def close(self):
        """Close the write end of the pipe and wait for the reader thread.

        Closing the write end is what makes the reader thread see EOF and
        exit. Calling close() more than once is harmless, the file
        descriptor is released only on the first call.
        """

        if not self._closed:
            self._closed = True
            os.close(self._fd_write)
        self.join()

130 

131 

class TerminalMode(Enum):
    """Represents the two modes of stdout/stderr."""

    # -- The output goes to a terminal. The terminal width is available
    # -- and the text can have ansi colors.
    TERMINAL = 1
    # -- The output goes to a filter or a file. There is no width, and ansi
    # -- colors should be avoided.
    PIPE = 2

141 

142 

def get_path_in_apio_package(subpath: str) -> Path:
    """Return the path of a file or folder inside the apio package.

    Inputs:
      * subpath: String with a relative path within the apio package.
        Use "" for the package's root directory. Must not be None.

    Returns:
      * A Path object with the package root joined with subpath.

    Example: subpath="commands"
      Output: Path('/home/obijuan/.../apio/commands')
    """

    # -- The apio package root is two levels above this python file,
    # -- that is, the parent of the folder that contains this file.
    package_root = Path(__file__).parent.parent

    # -- Joining with "" leaves the root unchanged. Joining with None fails.
    return package_root / subpath

170 

171 

@dataclass(frozen=True)
class CommandResult:
    """An immutable record with the results of a command (subprocess)
    execution. All the fields default to None."""

    # -- The stdout multi-line text.
    out_text: Optional[str] = None
    # -- The stderr multi-line text.
    err_text: Optional[str] = None
    # -- The exit code, 0 = OK.
    exit_code: Optional[int] = None

179 

180 

def exec_command(
    cmd: List[str], stdout: AsyncPipe, stderr: AsyncPipe
) -> CommandResult:
    """Execute the given command using async stdout/stderr.

    NOTE: When running on windows, this function does not support
    privilege elevation, to achieve that, use os.system() instead, as
    done in drivers.py

    INPUTS:
      cmd: list of command token (strings)
      stdout: the AsyncPipe to use for stdout
      stderr: the AsyncPipe to use for stderr.

    Both pipes are closed by this function, except when the process is
    aborted with Ctrl-C, in which case the process exits immediately.

    OUTPUT:
      A CommandResult with the command results. The out and err texts
      are the lines collected by the respective pipes, joined with "\\n".

    Exits the process with status 1 if the command is not found or if the
    user aborts it.
    """

    # -- Sanity check.
    assert isinstance(cmd, list)
    assert isinstance(cmd[0], str)
    assert isinstance(stdout, AsyncPipe)
    assert isinstance(stderr, AsyncPipe)

    # -- Execute the command
    try:
        with subprocess.Popen(
            cmd, stdout=stdout, stderr=stderr, shell=False
        ) as proc:

            # -- Wait for completion. The output goes to the async pipes
            # -- so communicate() has no text to return here.
            proc.communicate()

            # -- Get status code.
            exit_code = proc.returncode

        # -- Close the async pipes. This also waits for their reader
        # -- threads to consume all the pending output.
        stdout.close()
        stderr.close()

    # -- User has pressed the Ctrl-C for aborting the command
    except KeyboardInterrupt:
        cerror("Aborted by user")
        # -- NOTE: If using here sys.exit(1), apio requires pressing ctl-c
        # -- twice when running 'apio sim'. This form of exit is more direct
        # -- and harder.
        os._exit(1)

    # -- The command does not exist!
    except FileNotFoundError:
        # -- The process was never started so the pipes are still open.
        # -- Close them to release their reader threads, otherwise these
        # -- non daemon threads stay blocked on the pipes and can prevent
        # -- the interpreter from exiting.
        stdout.close()
        stderr.close()
        cerror("Command not found:", cmd)
        sys.exit(1)

    # -- Extract stdout text
    out_text = "\n".join(stdout.get_buffer())

    # -- Extract stderr text
    err_text = "\n".join(stderr.get_buffer())

    # -- All done.
    return CommandResult(out_text, err_text, exit_code)

245 

246 

def user_directory_or_cwd(
    dir_arg: Optional[Path],
    *,
    description: str,
    must_exist: bool = False,
    create_if_missing=False,
) -> Path:
    """Condition a directory arg, with the current directory as default.

    If dir_arg is specified, it is returned after validation, otherwise
    the relative path "." is returned.

    'description' is the role of the directory, to include in error
    messages, e.g. "Project" or "Destination".

    If must_exist is true, a missing directory is a fatal error. If
    create_if_missing is true, a missing directory is created, including
    its parents. The two flags are mutually exclusive.

    Validation errors are reported and terminate the process with exit
    status 1.
    """

    assert not (create_if_missing and must_exist), "Conflicting flags."

    # -- Case 1: Using current directory.
    # -- We prefer the relative path "." over the absolute path Path.cwd().
    if not dir_arg:
        return Path(".")

    # -- Case 2: User provided dir path.
    target = dir_arg
    found = target.exists()

    # -- If exists, it must be a dir.
    if found and not target.is_dir():
        cerror(f"{description} directory is a file: {target}")
        sys.exit(1)

    # -- If required, it must exist.
    if must_exist and not found:
        cerror(f"{description} directory is missing: {str(target)}")
        sys.exit(1)

    # -- If requested, create
    if create_if_missing and not found:
        cout(f"Creating folder: {target}")
        target.mkdir(parents=True)

    # -- All done.
    return target

287 

288 

def get_python_version() -> str:
    """Return the python version as a "major.minor" string, e.g. "3.12"."""

    major, minor = sys.version_info[:2]
    return f"{major}.{minor}"

293 

294 

def get_python_ver_tuple() -> Tuple[int, int, int]:
    """Return the python version as a (major, minor, micro) tuple,
    e.g. (3, 12, 1)."""
    info = sys.version_info
    return (info.major, info.minor, info.micro)

298 

299 

def plurality(
    obj: Any,
    singular: str,
    plural: Optional[str] = None,
    include_num: bool = True,
) -> str:
    """Returns singular or plural based on the size of the object.

    obj is either an int, which is used as the count, or an object that
    supports len(). A count of exactly 1 selects the singular form, any
    other count selects the plural form. If plural is None, the plural
    form is the singular form followed by "s".

    If include_num is true the count is prepended, e.g. "3 files",
    otherwise only the word is returned, e.g. "files".
    """
    # -- Figure out the size of the object
    count = obj if isinstance(obj, int) else len(obj)

    # -- Select the word. Only the value 1 uses the singular form.
    if count == 1:
        word = singular
    elif plural is None:
        word = singular + "s"
    else:
        word = plural

    # -- Add the count if requested.
    return f"{count} {word}" if include_num else word

323 

324 

def list_plurality(str_list: List[str], conjunction: str) -> str:
    """Format a non empty list of strings as a human friendly string.

    Examples, with conjunction "and":
      ["a"]            -> "a"
      ["a", "b"]       -> "a and b"
      ["a", "b", "c"]  -> "a, b, and c"
    """
    # -- This is a programming error. Not a user error.
    assert str_list, "list_plurality expect len() >= 1."

    # -- Separate the last item from the ones before it.
    *leading, last = str_list

    # -- A single item is returned as is.
    if not leading:
        return last

    # -- Two items are joined by the conjunction only.
    if len(leading) == 1:
        return f"{leading[0]} {conjunction} {last}"

    # -- Three or more items are comma separated, with the conjunction
    # -- before the last one.
    return f"{', '.join(leading)}, {conjunction} {last}"

340 

341 

def debug_level() -> int:
    """Returns the current debug level, with 0 as 'off'.

    The level is read from the APIO_DEBUG env option and defaults to "0"
    when not set. A value that is not an int is reported as an error and
    terminates the process with exit status 1.
    """

    # -- We get a fresh value so it can be adjusted dynamically when needed.
    raw_value = env_options.get(env_options.APIO_DEBUG, "0")

    # -- We don't validate the range of the value, assuming the user knows
    # -- how to use it.
    try:
        return int(raw_value)
    except ValueError:
        cerror(f"APIO_DEBUG value '{raw_value}' is not an int.")
        sys.exit(1)

356 

357 

def is_debug(level: int) -> bool:
    """Returns True if apio is in debug mode level 'level' or higher.

    Use it to enable printing of debug information but not to modify the
    behavior of the code. Also, all apio tests should be performed with
    debug disabled. Important debug information should be at level 1
    while less important or spammy should be at higher levels.

    'level' must be an int in the range [1, 10].
    """
    # -- Sanity check. A failure here indicates a programming error.
    assert isinstance(level, int), type(level)
    assert 1 <= level <= 10, level

    current_level = debug_level()
    return current_level >= level

369 

370 

def get_apio_version() -> str:
    """Returns the version of the apio package as a string such
    as "0.9.83"."""
    # -- Apio's version is defined in the __init__.py file of the apio package.
    # -- Using the version from a file in the apio package rather than from
    # -- the pip metadata makes apio more self contained, for example when
    # -- installing with pyinstaller rather than with pip.
    version_tuple = apio.VERSION
    assert len(version_tuple) == 3, version_tuple

    # -- Format the three elements of the tuple, separated by dots.
    major, minor, patch = version_tuple
    return f"{major}.{minor}.{patch}"

381 

382 

383def _check_apio_dir(apio_dir: Path, desc: str, env_var: str): 

384 """Checks the apio home dir or packages dir path for the apio 

385 requirements.""" 

386 

387 # Sanity check. If this fails, it's a programming error. 

388 assert isinstance( 

389 apio_dir, Path 

390 ), f"Error: {desc} is no a Path: {type(apio_dir)}, {apio_dir}" 

391 

392 # -- The path should be absolute, see discussion here: 

393 # -- https://github.com/FPGAwars/apio/issues/522 

394 if not apio_dir.is_absolute(): 

395 cerror( 

396 f"Apio {desc} should be an absolute path " f"[{str(apio_dir)}].", 

397 ) 

398 cout( 

399 f"You can use the system env var {env_var} to set " 

400 f"a different apio {desc}.", 

401 style=INFO, 

402 ) 

403 sys.exit(1) 

404 

405 # -- We have problem with spaces and non ascii character above value 

406 # -- 127, so we allow only ascii characters in the range [33, 127]. 

407 # -- See here https://github.com/FPGAwars/apio/issues/515 

408 for ch in str(apio_dir): 

409 if ord(ch) < 33 or ord(ch) > 127: 

410 cerror( 

411 f"Unsupported character [{ch}] in apio {desc}: " 

412 f"[{str(apio_dir)}].", 

413 ) 

414 cout( 

415 "Only the ASCII characters in the range 33 to 127 are " 

416 "allowed. You can use the\n" 

417 f"system env var '{env_var}' to set a different apio" 

418 f"{desc}.", 

419 style=INFO, 

420 ) 

421 sys.exit(1) 

422 

423 

def resolve_home_dir() -> Path:
    """Get the absolute apio home dir. This is the apio folder where the
    profile is located and the packages are installed.

    The apio home dir can be overridden using the APIO_HOME environment
    variable. If not set, the user_home/.apio folder is used by default:
    E.g. Linux:  /home/obijuan/.apio

    If the folder does not exist, it is created, including its parents.
    If the path does not meet apio's requirements, or the folder cannot
    be created, an error is printed and the process terminates with exit
    status 1.
    """

    # -- Get the optional apio home env.
    apio_home_dir_env = env_options.get(env_options.APIO_HOME, default=None)

    # -- If the env var specifies a home dir then use it.
    if apio_home_dir_env:
        # -- Expand user home '~' marker, if exists.
        apio_home_dir_env = os.path.expanduser(apio_home_dir_env)
        # -- Expand vars such as $HOME or %HOME% on windows.
        apio_home_dir_env = os.path.expandvars(apio_home_dir_env)
        # -- Convert string to path.
        home_dir = Path(apio_home_dir_env)
    else:
        # -- Else, use the default home dir ~/.apio.
        home_dir = Path.home() / ".apio"

    # -- Verify that the home dir meets apio's requirements.
    _check_apio_dir(home_dir, "home dir", "APIO_HOME")

    # -- Create the folder if it does not exist. We catch OSError rather
    # -- than just its PermissionError subclass so that other failures,
    # -- e.g. the path is an existing file or is on a read only file
    # -- system, are also reported as a user error rather than a crash.
    try:
        home_dir.mkdir(parents=True, exist_ok=True)
    except OSError:
        cerror(f"No usable home directory {home_dir}")
        sys.exit(1)

    # Return the home_dir as a Path
    return home_dir

460 

461 

def resolve_packages_dir(apio_home_dir: Path) -> Path:
    """Get the absolute apio packages dir. This is the apio folder where
    the packages are installed.

    The default apio packages dir can be overridden using the
    APIO_PACKAGES environment variable. If not set, the
    <apio-home>/packages folder is used by default:
    E.g. Linux:  /home/obijuan/.apio/packages

    NOTE: This function only resolves and validates the path, it does not
    create the folder. Validation errors are reported and terminate the
    process with exit status 1.
    """

    # -- Get the optional apio packages env.
    env_value = env_options.get(env_options.APIO_PACKAGES, default=None)

    if not env_value:
        # -- No override, use the default <home_dir>/packages.
        packages_dir = apio_home_dir / "packages"
    else:
        # -- Verify that the env variable contains 'packages' to make sure we
        # -- don't clobber system directories. The check is done on the raw
        # -- value, before any expansion.
        if "packages" not in env_value:
            cerror(
                "Apio packages dir APIO_PACKAGES should include the "
                "string 'packages'."
            )
            sys.exit(1)

        # -- Expand the user home '~' marker, if exists, and then vars
        # -- such as $HOME or %HOME% on windows.
        expanded = os.path.expandvars(os.path.expanduser(env_value))

        # -- Convert string to path.
        packages_dir = Path(expanded)

    # -- Verify that the packages dir meets apio's requirements.
    _check_apio_dir(packages_dir, "packages dir", "APIO_PACKAGES")

    # -- Return the packages dir as a Path.
    return packages_dir

508 

509 

def split(
    s: str,
    separator: str,
    strip: bool = False,
    keep_empty: bool = True,
) -> List[str]:
    """Split a string into a list of parts.

    s is the string to split and separator is the non empty string that
    delimits the parts. An empty s results in an empty list.

    If strip is true, leading and trailing whitespace is removed from
    each part. If keep_empty is false, empty parts are dropped. This is
    done after the stripping, so whitespace only parts are dropped as
    well when both options are used.
    """
    # -- A workaround for python's "".split(",") returning [''].
    parts = s.split(separator) if s else []

    # -- Strip the elements if requested.
    if strip:
        parts = [part.strip() for part in parts]

    # -- Remove empty elements if requested.
    if not keep_empty:
        parts = [part for part in parts if part]

    # -- All done.
    return parts

530 

531 

def fpga_arch_sort_key(fpga_arch: str) -> Any:
    """Given an fpga arch name such as 'ice40', return a sort key
    that forces our preferred order of sorting by architecture. Used in
    reports such as examples, fpgas, and boards."""

    # -- The preferred order of architectures. Add more if adding new
    # -- architectures.
    preferred = ["ice40", "ecp5", "gowin"]

    # -- The rank of a preferred architecture is its position in the list.
    # -- All the other architectures share the rank that follows.
    try:
        rank = preferred.index(fpga_arch)
    except ValueError:
        rank = len(preferred)

    # -- The name is the secondary key, so that unknown architectures are
    # -- listed at the end, by lexicographic order.
    return (rank, fpga_arch)

548 

549 

def subprocess_call(
    cmd: List[str],
) -> int:
    """A helper for running subprocess.call, without a shell.

    Returns the exit code of the command, which is always 0, since a non
    zero exit code is reported as an error and terminates the process
    with exit status 1.
    """

    if is_debug(1):
        cout(f"subprocess_call: {cmd}")

    # -- Invoke the command and wait for its completion.
    status = subprocess.call(cmd, shell=False)

    if is_debug(1):
        cout(f"subprocess_call: exit code is {status}")

    # -- Here when error
    if status != 0:
        cerror(f"Command failed: {cmd}")
        sys.exit(1)

    # -- Here when ok.
    return status

571 

572 

@contextmanager
def pushd(target_dir: Path):
    """A context manager for temporary execution in a given directory.

    Changes the current working directory to target_dir on entry and
    restores the original one on exit, also when the body raises.
    """
    # -- Remember where we are, before leaving.
    original_dir = os.getcwd()

    os.chdir(target_dir)
    try:
        yield
    finally:
        # -- Always go back, even if the body raised an exception.
        os.chdir(original_dir)