Coverage for apio/utils/util.py: 86%

222 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-24 03:51 +0000

1# -*- coding: utf-8 -*- 

2# -- This file is part of the Apio project 

3# -- (C) 2016-2018 FPGAwars 

4# -- Author Jesús Arroyo 

5# -- License GPLv2 

6# -- Derived from: 

7# ---- Platformio project 

8# ---- (C) 2014-2016 Ivan Kravets <me@ikravets.com> 

9# ---- License Apache v2 

10"""Misc utility functions and classes.""" 

11 

12import sys 

13import os 

14import platform 

15from contextlib import contextmanager 

16from enum import Enum 

17from dataclasses import dataclass 

18from typing import Optional, Any, Tuple, List 

19import subprocess 

20from threading import Thread 

21from pathlib import Path 

22import apio 

23from apio.utils import env_options 

24from apio.common.apio_console import cout, cerror 

25from apio.common.apio_styles import INFO 

26 

27 

28# ---------------------------------------- 

29# -- Constants 

30# ---------------------------------------- 

31 

32 

33class ApioException(Exception): 

34 """Apio error""" 

35 

36 

37class AsyncPipe(Thread): 

38 """A class that implements a pipe that calls back on each incoming line 

39 from an internal thread. Used to process in real time scons output to 

40 show its progress.""" 

41 

42 def __init__(self, line_callback=None): 

43 """If line_callback is not None, it is called for each line as 

44 line_callback(line:str, terminator:str) where line is the line content 

45 and terminator is one of: 

46 "\r" (CR) 

47 "\n" (LF) 

48 "" (EOF) 

49 

50 The callback is done from a private Python thread of this pipe so make 

51 sure to have the proper locking and synchronization as needed. 

52 """ 

53 

54 Thread.__init__(self) 

55 self.outcallback = line_callback 

56 

57 self._fd_read, self._fd_write = os.pipe() 

58 

59 # -- A list of lines received so far. 

60 self._lines_buffer = [] 

61 

62 self.start() 

63 

64 def get_buffer(self): 

65 """DOC: TODO""" 

66 

67 return self._lines_buffer 

68 

69 def fileno(self): 

70 """DOC: TODO""" 

71 

72 return self._fd_write 

73 

74 def _handle_incoming_line(self, bfr: bytearray, terminator: str): 

75 """Handle a new incoming line. 

76 Bfr is a bytes with the line's content, possibly empty. 

77 See __init__ for the description of terminator. 

78 """ 

79 # -- Convert the line's bytes to a string. Replace invalid utf-8 

80 # -- chars with "�" 

81 line = bfr.decode("utf-8", errors="replace") 

82 

83 # -- Append to the lines log buffer. 

84 self._lines_buffer.append(line) 

85 

86 # -- Report back if caller passed a callback. 

87 if self.outcallback: 87 ↛ exitline 87 didn't return from function '_handle_incoming_line' because the condition on line 87 was always true

88 self.outcallback(line, terminator) 

89 

90 def run(self): 

91 """DOC: TODO""" 

92 

93 # -- Prepare a buffer for collecting the line chars, excluding 

94 # -- its line terminator. 

95 bfr = bytearray() 

96 

97 # -- We open in binary mode so we have access to the line terminators. 

98 # -- This is important with progress bars which don't advance to the 

99 # -- next line but redraw on the same line. 

100 with os.fdopen(self._fd_read, "rb") as f: 

101 while True: 

102 b: bytes = f.read(1) 

103 assert len(b) <= 1 

104 

105 # -- Handle end of file 

106 if not b: 

107 if bfr: 107 ↛ 108line 107 didn't jump to line 108 because the condition on line 107 was never true

108 self._handle_incoming_line(bfr, "") 

109 return 

110 

111 # -- Handle \r terminator 

112 if b == b"\r": 112 ↛ 113line 112 didn't jump to line 113 because the condition on line 112 was never true

113 self._handle_incoming_line(bfr, "\r") 

114 bfr.clear() 

115 continue 

116 

117 # -- Handle \n terminator 

118 if b == b"\n": 

119 self._handle_incoming_line(bfr, "\n") 

120 bfr.clear() 

121 continue 

122 

123 # -- Handle a regular character 

124 bfr.append(b[0]) 

125 

126 def close(self): 

127 """DOC: TODO""" 

128 

129 os.close(self._fd_write) 

130 self.join() 

131 

132 

133class TerminalMode(Enum): 

134 """Represents to two modes of stdout/err.""" 

135 

136 # Output is sent to a terminal. Terminal width is available, and text 

137 # can have ansi colors. 

138 TERMINAL = 1 

139 # Output is sent to a filter or a file. No width and ansi colors should 

140 # be avoided. 

141 PIPE = 2 

142 

143 

144def get_path_in_apio_package(subpath: str) -> Path: 

145 """Get the full path to the given folder in the apio package. 

146 Inputs: 

147 * subdir: String with a relative path within the apio package. 

148 Use "" for root directory. 

149 

150 Returns: 

151 * The absolute path as a PosixPath() object 

152 

153 Example: folder="commands" 

154 Output: PosixPath('/home/obijuan/.../apio/commands') 

155 """ 

156 

157 # -- Get the full path of this file (util.py) 

158 # -- Ex: /home/obijuan/.../site-packages/apio/util.py 

159 current_python_file = Path(__file__) 

160 

161 # -- The parent folder is the apio root folder 

162 # -- Ex: /home/obijuan/.../site-packages/apio 

163 path = current_python_file.parent.parent 

164 

165 # -- Add the given folder to the path. If subpath = "" this 

166 # -- does nothing, but fails if subpath is None. 

167 path = path / subpath 

168 

169 # -- Return the path 

170 return path 

171 

172 

173@dataclass(frozen=True) 

174class CommandResult: 

175 """Contains the results of a command (subprocess) execution.""" 

176 

177 out_text: Optional[str] = None # stdout multi-line text. 

178 err_text: Optional[str] = None # stderr multi-line text. 

179 exit_code: Optional[int] = None # Exit code, 0 = OK. 

180 

181 

182def exec_command( 

183 cmd: List[str], stdout: AsyncPipe, stderr: AsyncPipe 

184) -> CommandResult: 

185 """Execute the given command using async stdout/stderr.. 

186 

187 NOTE: When running on windows, this function does not support 

188 privilege elevation, to achieve that, use os.system() instead, as 

189 done in drivers.py 

190 

191 INPUTS: 

192 cmd: list of command token (strings) 

193 stdout: the AsyncPipe to use for stdout 

194 stderr: the AsyncPipe to use for stderr. 

195 

196 OUTPUT: 

197 A CommandResult with the command results. 

198 """ 

199 

200 # -- Sanity check. 

201 assert isinstance(cmd, list) 

202 assert isinstance(cmd[0], str) 

203 assert isinstance(stdout, AsyncPipe) 

204 assert isinstance(stderr, AsyncPipe) 

205 

206 # -- Execute the command 

207 try: 

208 with subprocess.Popen( 

209 cmd, stdout=stdout.fileno(), stderr=stderr.fileno(), shell=False 

210 ) as proc: 

211 

212 # -- Wait for completion. 

213 out_text, err_text = proc.communicate() 

214 

215 # -- Get status code. 

216 exit_code = proc.returncode 

217 

218 # -- Close the async pipes. 

219 stdout.close() 

220 stderr.close() 

221 

222 # -- User has pressed the Ctrl-C for aborting the command 

223 except KeyboardInterrupt: 

224 cerror("Aborted by user") 

225 # -- NOTE: If using here sys.exit(1), apio requires pressing ctl-c 

226 # -- twice when running 'apio sim'. This form of exit is more direct 

227 # -- and harder. 

228 os._exit(1) 

229 

230 # -- The command does not exist! 

231 except FileNotFoundError: 

232 cerror("Command not found:", str(cmd)) 

233 sys.exit(1) 

234 

235 # -- Extract stdout text 

236 lines = stdout.get_buffer() 

237 out_text = "\n".join(lines) 

238 

239 # -- Extract stderr text 

240 lines = stderr.get_buffer() 

241 err_text = "\n".join(lines) 

242 

243 # -- All done. 

244 result = CommandResult(out_text, err_text, exit_code) 

245 return result 

246 

247 

248def user_directory_or_cwd( 

249 dir_arg: Optional[Path], 

250 *, 

251 description: str, 

252 must_exist: bool = False, 

253 create_if_missing=False, 

254) -> Path: 

255 """Condition a directory arg with current directory as default. If dir_arg 

256 is specified, it is return after validation, else cwd "." is returned. 

257 Description is directory function to include in error messages, e.g. 

258 "Project" or "Destination". 

259 """ 

260 

261 assert not (create_if_missing and must_exist), "Conflicting flags." 

262 

263 # -- Case 1: User provided dir path. 

264 if dir_arg: 

265 project_dir = dir_arg 

266 

267 # -- If exists, it must be a dir. 

268 if project_dir.exists() and not project_dir.is_dir(): 268 ↛ 269line 268 didn't jump to line 269 because the condition on line 268 was never true

269 cerror(f"{description} directory is a file: {project_dir}") 

270 sys.exit(1) 

271 

272 # -- If required, it must exist. 

273 if must_exist and not project_dir.exists(): 273 ↛ 274line 273 didn't jump to line 274 because the condition on line 273 was never true

274 cerror(f"{description} directory is missing: {str(project_dir)}") 

275 sys.exit(1) 

276 

277 # -- If requested, create 

278 if create_if_missing and not project_dir.exists(): 

279 cout(f"Creating folder: {project_dir}") 

280 project_dir.mkdir(parents=True) 

281 

282 # -- All done. 

283 return project_dir 

284 

285 # -- Case 2: Using current directory. 

286 # -- We prefer the relative path "." over the absolute path Path.cwd(). 

287 return Path(".") 

288 

289 

290def get_python_version() -> str: 

291 """Return a string with the python version""" 

292 

293 return f"{sys.version_info[0]}.{sys.version_info[1]}" 

294 

295 

296def get_python_ver_tuple() -> Tuple[int, int, int]: 

297 """Return a tuple with the python version. e.g. (3, 12, 1).""" 

298 return sys.version_info[:3] 

299 

300 

301def plurality( 

302 obj: Any, 

303 singular: str, 

304 plural: str | None = None, 

305 include_num: bool = True, 

306) -> str: 

307 """Returns singular or plural based on the size of the object.""" 

308 # -- Figure out the size of the object 

309 if isinstance(obj, int): 

310 n = obj 

311 else: 

312 n = len(obj) 

313 

314 # -- For value of 1 return the singular form. 

315 if n == 1: 

316 if include_num: 316 ↛ 318line 316 didn't jump to line 318 because the condition on line 316 was always true

317 return f"{n} {singular}" 

318 return singular 

319 

320 # -- For all other values, return the plural form. 

321 if plural is None: 321 ↛ 324line 321 didn't jump to line 324 because the condition on line 321 was always true

322 plural = singular + "s" 

323 

324 if include_num: 

325 return f"{n} {plural}" 

326 return plural 

327 

328 

329def list_plurality(str_list: List[str], conjunction: str) -> str: 

330 """Format a list as a human friendly string.""" 

331 # -- This is a programming error. Not a user error. 

332 assert str_list, "list_plurality expect len() >= 1." 

333 

334 # -- Handle the case of a single item. 

335 if len(str_list) == 1: 

336 return str_list[0] 

337 

338 # -- Handle the case of 2 items. 

339 if len(str_list) == 2: 

340 return f"{str_list[0]} {conjunction} {str_list[1]}" 

341 

342 # -- Handle the case of three or more items. 

343 return ", ".join(str_list[:-1]) + f", {conjunction} {str_list[-1]}" 

344 

345 

346def debug_level() -> int: 

347 """Returns the current debug level, with 0 as 'off'.""" 

348 

349 # -- We get a fresh value so it can be adjusted dynamically when needed. 

350 level_str = env_options.get(env_options.APIO_DEBUG, "0") 

351 try: 

352 level_int = int(level_str) # pyright: ignore[reportArgumentType] 

353 except ValueError: 

354 cerror(f"APIO_DEBUG value '{level_str}' is not an int.") 

355 sys.exit(1) 

356 

357 # -- All done. We don't validate the value, assuming the user knows how 

358 # -- to use it. 

359 return level_int 

360 

361 

362def is_debug(level: int) -> bool: 

363 """Returns True if apio is in debug mode level 'level' or higher. Use 

364 it to enable printing of debug information but not to modify the behavior 

365 of the code. Also, all apio tests should be performed with debug 

366 disabled. Important debug information should be at level 1 while 

367 less important or spammy should be at higher levels.""" 

368 # -- Sanity check. A failure is indicates a programming error. 

369 assert isinstance(level, int), type(level) 

370 assert 1 <= level <= 10, level 

371 

372 return debug_level() >= level 

373 

374 

375def get_apio_version_tuple() -> Tuple[int, int, int]: 

376 """Returns the version of the apio package as tuple of 3 ints.""" 

377 # -- Apio's version is defined in the __init__.py file of the apio package. 

378 # -- Using the version from a file in the apio package rather than from 

379 # -- the pip metadata makes apio more self contained, for example when 

380 # -- installing with pyinstaller rather than with pip. 

381 ver: Tuple[int, int, int] = apio.APIO_VERSION 

382 assert len(ver) == 3, ver 

383 assert isinstance(ver[0], int) 

384 assert isinstance(ver[1], int) 

385 assert isinstance(ver[2], int) 

386 return ver 

387 

388 

389def get_apio_version_str() -> str: 

390 """Returns the version of the apio package as a string like "1.22.3".""" 

391 ver: Tuple[int, int, int] = get_apio_version_tuple() 

392 return f"{ver[0]}.{ver[1]}.{ver[2]}" 

393 

394 

395def get_apio_release_info() -> str: 

396 """Returns the release info string.""" 

397 return apio.RELEASE_INFO 

398 

399 

400def get_apio_version_message() -> str: 

401 """Returns the string to show on `apio --version`.""" 

402 ver_str = get_apio_version_str() 

403 release_str = get_apio_release_info() or "no release info" 

404 return f"Apio CLI version {ver_str} ({release_str})" 

405 

406 

407def _check_apio_dir(apio_dir: Path, desc: str, env_var: str): 

408 """Checks the apio home dir or packages dir path for the apio 

409 requirements.""" 

410 

411 # Sanity check. If this fails, it's a programming error. 

412 assert isinstance( 

413 apio_dir, Path 

414 ), f"Error: {desc} is no a Path: {type(apio_dir)}, {apio_dir}" 

415 

416 # -- The path should be absolute, see discussion here: 

417 # -- https://github.com/FPGAwars/apio/issues/522 

418 if not apio_dir.is_absolute(): 

419 cerror( 

420 f"Apio {desc} should be an absolute path " f"[{str(apio_dir)}].", 

421 ) 

422 cout( 

423 f"You can use the system env var {env_var} to set " 

424 f"a different apio {desc}.", 

425 style=INFO, 

426 ) 

427 sys.exit(1) 

428 

429 # -- We have problem with spaces and non ascii character above value 

430 # -- 127, so we allow only ascii characters in the range [33, 127]. 

431 # -- See here https://github.com/FPGAwars/apio/issues/515 

432 for ch in str(apio_dir): 

433 if ord(ch) < 33 or ord(ch) > 127: 

434 cerror( 

435 f"Unsupported character [{ch}] in apio {desc}: " 

436 f"[{str(apio_dir)}].", 

437 ) 

438 cout( 

439 "Only the ASCII characters in the range 33 to 127 are " 

440 "allowed. You can use the\n" 

441 f"system env var '{env_var}' to set a different apio" 

442 f"{desc}.", 

443 style=INFO, 

444 ) 

445 sys.exit(1) 

446 

447 

448def resolve_home_dir() -> Path: 

449 """Get the absolute apio home dir. This is the apio folder where the 

450 profile is located and the packages are installed. 

451 The apio home dir can be overridden using the APIO_HOME environment 

452 variable. If not set, the user_home/.apio folder is used by default: 

453 Ej. Linux: /home/obijuan/.apio 

454 If the folders does not exist, they are created 

455 """ 

456 

457 # -- Get the optional apio home env. 

458 apio_home_dir_env = env_options.get(env_options.APIO_HOME) 

459 

460 # -- If the env vars specified an home dir then use it. 

461 if apio_home_dir_env: 

462 # -- Expand user home '~' marker, if exists. 

463 apio_home_dir_env = os.path.expanduser(apio_home_dir_env) 

464 # -- Expand varas such as $HOME or %HOME% on windows. 

465 apio_home_dir_env = os.path.expandvars(apio_home_dir_env) 

466 # -- Convert string to path. 

467 home_dir = Path(apio_home_dir_env) 

468 else: 

469 # -- Else, use the default home dir ~/.apio. 

470 home_dir = Path.home() / ".apio" 

471 

472 # -- Verify that the home dir meets apio's requirements. 

473 _check_apio_dir(home_dir, "home dir", "APIO_HOME") 

474 

475 # -- Create the folder if it does not exist 

476 try: 

477 home_dir.mkdir(parents=True, exist_ok=True) 

478 except PermissionError: 

479 cerror(f"No usable home directory {home_dir}") 

480 sys.exit(1) 

481 

482 # Return the home_dir as a Path 

483 return home_dir 

484 

485 

486def resolve_packages_dir(apio_home_dir: Path) -> Path: 

487 """Get the absolute apio packages dir. This is the apio folder where the 

488 packages are installed. The default apio packages dir can be overridden 

489 using the APIO_PACKAGES environment variable. If not set, 

490 the <apio-home>/packages folder is used by default: 

491 Ej. Linux: /home/obijuan/.apio/packages 

492 If the folders does not exist, they are created 

493 """ 

494 

495 # -- Get the optional apio packages env. 

496 apio_packages_dir_env = env_options.get(env_options.APIO_PACKAGES) 

497 

498 # -- If the env vars specified an packages dir then use it. 

499 if apio_packages_dir_env: 499 ↛ 516line 499 didn't jump to line 516 because the condition on line 499 was always true

500 # -- Verify that the env variable contains 'packages' to make sure we 

501 # -- don't clobber system directories. 

502 if "packages" not in apio_packages_dir_env: 502 ↛ 503line 502 didn't jump to line 503 because the condition on line 502 was never true

503 cerror( 

504 "Apio packages dir APIO_PACKAGES should include the " 

505 "string 'packages'." 

506 ) 

507 sys.exit(1) 

508 # -- Expand user home '~' marker, if exists. 

509 apio_packages_dir_env = os.path.expanduser(apio_packages_dir_env) 

510 # -- Expand varas such as $HOME or %HOME% on windows. 

511 apio_packages_dir_env = os.path.expandvars(apio_packages_dir_env) 

512 # -- Convert string to path. 

513 packages_dir = Path(apio_packages_dir_env) 

514 else: 

515 # -- Else, use the default <home_dir>/packages. 

516 packages_dir = apio_home_dir / "packages" 

517 

518 # -- Verify that the home dir meets apio's requirements. 

519 _check_apio_dir(packages_dir, "packages dir", "APIO_PACKAGES") 

520 

521 # -- Create the folder if it does not exist 

522 # try: 

523 # packages_dir.mkdir(parents=True, exist_ok=True) 

524 # except PermissionError: 

525 # cerror(f"No usable packages directory {packages_dir}") 

526 # sys.exit(1) 

527 

528 # Return the packages as a Path 

529 return packages_dir 

530 

531 

532def fpga_arch_sort_key(fpga_arch: str) -> Any: 

533 """Given an fpga arch name such as 'ice40', return a sort key 

534 got force our preferred order of sorting by architecture. Used in 

535 reports such as examples, fpgas, and boards.""" 

536 

537 # -- The preferred order of architectures, Add more if adding new 

538 # -- architectures. 

539 archs = ["ice40", "ecp5", "gowin", "xilinx"] 

540 

541 # -- Primary key with preferred architecture first and in the 

542 # -- preferred order. 

543 primary_key = archs.index(fpga_arch) if fpga_arch in archs else len(archs) 

544 

545 # -- Construct the key, unknown architectures list at the end by 

546 # -- lexicographic order. 

547 return (primary_key, fpga_arch) 

548 

549 

550def subprocess_call( 

551 cmd: List[str], 

552) -> int: 

553 """A helper for running subprocess.call. Exit if an error.""" 

554 

555 if is_debug(1): 555 ↛ 556line 555 didn't jump to line 556 because the condition on line 555 was never true

556 cout(f"subprocess_call: {cmd}") 

557 

558 # -- Invoke the command. 

559 exit_code = subprocess.call(cmd, shell=False) 

560 

561 if is_debug(1): 561 ↛ 562line 561 didn't jump to line 562 because the condition on line 561 was never true

562 cout(f"subprocess_call: exit code is {exit_code}") 

563 

564 # -- If ok, return. 

565 if exit_code == 0: 

566 return exit_code 

567 

568 # -- Here when error 

569 cerror(f"Command failed: {cmd}") 

570 sys.exit(1) 

571 

572 

573@contextmanager 

574def pushd(target_dir: Path): 

575 """A context manager for temporary execution in a given directory.""" 

576 prev_dir = os.getcwd() 

577 os.chdir(target_dir) 

578 try: 

579 yield 

580 finally: 

581 os.chdir(prev_dir) 

582 

583 

584def is_pyinstaller_app() -> bool: 

585 """Return true if this is a pyinstaller packaged app. 

586 Base on https://pyinstaller.org/en/stable/runtime-information.html 

587 """ 

588 return getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS") 

589 

590 

591def is_under_vscode_debugger() -> bool: 

592 """Returns true if running under VSCode debugger.""" 

593 # if os.environ.get("TERM_PROGRAM") == "vscode": 

594 # return True 

595 if os.environ.get("DEBUGPY_RUNNING"): 595 ↛ 596line 595 didn't jump to line 596 because the condition on line 595 was never true

596 return True 

597 return False 

598 

599 

600def get_platform_info() -> str: 

601 """Return a short string with additional platform info such as version.""" 

602 return platform.platform()