Coverage for apio / utils / util.py: 86%

229 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-08 02:47 +0000

1# -*- coding: utf-8 -*- 

2# -- This file is part of the Apio project 

3# -- (C) 2016-2018 FPGAwars 

4# -- Author Jesús Arroyo 

5# -- License GPLv2 

6# -- Derived from: 

7# ---- Platformio project 

8# ---- (C) 2014-2016 Ivan Kravets <me@ikravets.com> 

9# ---- License Apache v2 

10"""Misc utility functions and classes.""" 

11 

12import sys 

13import os 

14import platform 

15from contextlib import contextmanager 

16from enum import Enum 

17from dataclasses import dataclass 

18from typing import Optional, Any, Tuple, List 

19import subprocess 

20from threading import Thread 

21from pathlib import Path 

22import apio 

23from apio.utils import env_options 

24from apio.common.apio_console import cout, cerror 

25from apio.common.apio_styles import INFO 

26 

27 

28# ---------------------------------------- 

29# -- Constants 

30# ---------------------------------------- 

31 

32 

class ApioException(Exception):
    """An apio specific error. A plain subclass of Exception that adds no
    behavior of its own."""

35 

36 

class AsyncPipe(Thread):
    """A pipe whose read end is drained by an internal thread which calls
    back on each incoming line. Used to process scons output in real time
    to show its progress."""

    def __init__(self, line_callback=None):
        """Create the underlying os pipe and start the reader thread.

        If line_callback is not None, it is called for each line as
        line_callback(line:str, terminator:str) where line is the line
        content and terminator is one of:
          "\r"   (CR)
          "\n"   (LF)
          ""     (EOF)

        The callback is invoked from the private thread of this pipe so
        make sure to have the proper locking and synchronization as needed.
        """
        super().__init__()
        self.outcallback = line_callback

        # -- The two ends of the os level pipe.
        self._fd_read, self._fd_write = os.pipe()

        # -- A list of lines received so far, without their terminators.
        self._lines_buffer = []

        # -- The reader thread starts running as soon as the pipe is
        # -- constructed.
        self.start()

    def get_buffer(self):
        """Return the list of lines received so far. This is the internal
        list itself, not a copy."""
        return self._lines_buffer

    def fileno(self):
        """Return the file descriptor of the write end of the pipe. This
        is what allows passing this object as a stdout/stderr argument of
        subprocess.Popen()."""
        return self._fd_write

    def _handle_incoming_line(self, bfr: bytearray, terminator: str):
        """Handle a new incoming line.
        Bfr holds the bytes of the line's content, possibly empty, without
        the terminator. See __init__ for the description of terminator.
        """
        # -- Decode the bytes as utf-8. Invalid sequences are replaced
        # -- with the unicode replacement character rather than raising.
        text = bfr.decode("utf-8", errors="replace")

        # -- Log the line.
        self._lines_buffer.append(text)

        # -- Notify the caller, if a callback was provided.
        if self.outcallback:
            self.outcallback(text, terminator)

    def run(self):
        """The body of the reader thread. Reads the pipe one byte at a
        time, splits the stream on CR and LF, and reports each line via
        _handle_incoming_line(). Returns when the pipe reaches EOF."""

        # -- Maps a terminator byte to the terminator string reported
        # -- for it.
        terminators = {b"\r": "\r", b"\n": "\n"}

        # -- Collects the bytes of the current line, excluding its
        # -- terminator.
        pending = bytearray()

        # -- We open in binary mode so we have access to the line
        # -- terminators. This is important with progress bars which don't
        # -- advance to the next line but redraw on the same line.
        with os.fdopen(self._fd_read, "rb") as stream:
            while True:
                chunk = stream.read(1)
                assert len(chunk) <= 1

                # -- End of file.
                if not chunk:
                    break

                terminator = terminators.get(chunk)

                # -- A regular character, add to the current line.
                if terminator is None:
                    pending.append(chunk[0])
                    continue

                # -- A CR or LF, report the line and start a new one.
                self._handle_incoming_line(pending, terminator)
                pending.clear()

            # -- At EOF, report a last line that had no terminator, if any.
            if pending:
                self._handle_incoming_line(pending, "")

    def close(self):
        """Close the write end of the pipe and wait for the reader thread
        to terminate."""
        os.close(self._fd_write)
        self.join()

131 

132 

class TerminalMode(Enum):
    """The two modes of stdout/stderr.

    TERMINAL: output goes to a terminal. The terminal width is available
      and text can have ansi colors.
    PIPE: output goes to a filter or a file. There is no width and ansi
      colors should be avoided.
    """

    TERMINAL = 1
    PIPE = 2

142 

143 

def get_path_in_apio_package(subpath: str) -> Path:
    """Return the path of a file or folder inside the apio package.

    Inputs:
      * subpath: String with a relative path within the apio package.
        Use "" for the package's root directory. Must not be None.

    Returns:
      * A Path object with the package root joined with subpath.

    Example: subpath="commands"
      Output: Path('/home/obijuan/.../apio/commands')
    """
    # -- The package root is the grandparent of this file, that is, two
    # -- levels above it. Joining with "" leaves the path unchanged
    # -- while joining with None raises.
    return Path(__file__).parent.parent / subpath

171 

172 

@dataclass(frozen=True)
class CommandResult:
    """An immutable record with the results of executing a command as a
    subprocess. All fields default to None."""

    # -- The stdout text, possibly with multiple lines.
    out_text: Optional[str] = None
    # -- The stderr text, possibly with multiple lines.
    err_text: Optional[str] = None
    # -- The exit code of the command, 0 = OK.
    exit_code: Optional[int] = None

180 

181 

def exec_command(
    cmd: List[str], stdout: AsyncPipe, stderr: AsyncPipe
) -> CommandResult:
    """Execute the given command using async stdout/stderr.

    NOTE: When running on windows, this function does not support
    privilege elevation, to achieve that, use os.system() instead, as
    done in drivers.py

    INPUTS:
      cmd: list of command tokens (strings)
      stdout: the AsyncPipe to use for stdout
      stderr: the AsyncPipe to use for stderr.

    Both pipes are closed by this function, including when the command
    fails to start, so they can't be reused by the caller.

    OUTPUT:
      A CommandResult with the command results. The text of each stream
      is the lines collected by its pipe, joined with "\n".

    The function exits the process with code 1 if the command is not
    found or if the user aborts it with Ctrl-C.
    """

    # -- Sanity check. A failure indicates a programming error.
    assert isinstance(cmd, list)
    assert isinstance(cmd[0], str)
    assert isinstance(stdout, AsyncPipe)
    assert isinstance(stderr, AsyncPipe)

    # -- Execute the command
    try:
        with subprocess.Popen(
            cmd, stdout=stdout, stderr=stderr, shell=False
        ) as proc:

            # -- Wait for completion. The output is collected by the async
            # -- pipes, communicate() itself has nothing to return here.
            proc.communicate()

            # -- Get status code.
            exit_code = proc.returncode

    # -- User has pressed the Ctrl-C for aborting the command
    except KeyboardInterrupt:
        cerror("Aborted by user")
        # -- NOTE: If using here sys.exit(1), apio requires pressing ctl-c
        # -- twice when running 'apio sim'. This form of exit is more direct
        # -- and harder. It also skips the 'finally' clause below.
        os._exit(1)

    # -- The command does not exist!
    except FileNotFoundError:
        cerror("Command not found:", cmd)
        sys.exit(1)

    # -- Close the async pipes on all paths, not only on success. Otherwise
    # -- the write ends stay open, the reader threads never see EOF, and
    # -- since they are non daemon threads, they can prevent the python
    # -- interpreter from exiting.
    finally:
        stdout.close()
        stderr.close()

    # -- Extract stdout text
    out_text = "\n".join(stdout.get_buffer())

    # -- Extract stderr text
    err_text = "\n".join(stderr.get_buffer())

    # -- All done.
    return CommandResult(out_text, err_text, exit_code)

246 

247 

def user_directory_or_cwd(
    dir_arg: Optional[Path],
    *,
    description: str,
    must_exist: bool = False,
    create_if_missing=False,
) -> Path:
    """Condition an optional directory arg, with the current directory as
    the default.

    If dir_arg is not specified, the relative path "." is returned.
    Otherwise dir_arg is validated and returned: it must not be an existing
    file, it must exist if must_exist is set, and it is created, including
    missing parents, if create_if_missing is set and it doesn't exist. A
    validation failure prints an error and exits the process.

    Description is the directory's function to include in the error
    messages, e.g. "Project" or "Destination". The flags must_exist and
    create_if_missing are mutually exclusive.
    """

    assert not (create_if_missing and must_exist), "Conflicting flags."

    # -- Case 1: Using the current directory.
    # -- We prefer the relative path "." over the absolute path Path.cwd().
    if not dir_arg:
        return Path(".")

    # -- Case 2: User provided a dir path.

    # -- It must not be an existing non directory.
    if dir_arg.exists() and not dir_arg.is_dir():
        cerror(f"{description} directory is a file: {dir_arg}")
        sys.exit(1)

    # -- If required, it must exist.
    if must_exist and not dir_arg.exists():
        cerror(f"{description} directory is missing: {str(dir_arg)}")
        sys.exit(1)

    # -- If requested, create it.
    if create_if_missing and not dir_arg.exists():
        cout(f"Creating folder: {dir_arg}")
        dir_arg.mkdir(parents=True)

    # -- All done.
    return dir_arg

288 

289 

def get_python_version() -> str:
    """Return the major and minor version of the running python
    interpreter as a string, e.g. "3.12"."""
    major, minor = sys.version_info[:2]
    return f"{major}.{minor}"

294 

295 

def get_python_ver_tuple() -> Tuple[int, int, int]:
    """Return the (major, minor, micro) version of the running python
    interpreter as a tuple, e.g. (3, 12, 1)."""
    major, minor, micro = sys.version_info[:3]
    return (major, minor, micro)

299 

300 

def plurality(
    obj: Any,
    singular: str,
    plural: Optional[str] = None,
    include_num: bool = True,
) -> str:
    """Return the singular or the plural form based on the size of obj.

    Inputs:
      obj: an int with the count, or any object that supports len().
      singular: the form to use for a count of exactly 1.
      plural: the form to use for all other counts, zero included. If
        None, it is derived by appending "s" to singular.
      include_num: if true, the result is prefixed with the count and a
        space, e.g. "3 files" rather than "files".
    """
    # -- Figure out the size of the object.
    n = obj if isinstance(obj, int) else len(obj)

    # -- Only a value of 1 selects the singular form.
    if n == 1:
        word = singular
    elif plural is None:
        word = singular + "s"
    else:
        word = plural

    return f"{n} {word}" if include_num else word

324 

325 

def list_plurality(str_list: List[str], conjunction: str) -> str:
    """Format a non empty list of strings as a human friendly string,
    e.g. "a", "a and b", or "a, b, and c" for the conjunction "and"."""
    # -- This is a programming error. Not a user error.
    assert str_list, "list_plurality expect len() >= 1."

    # -- Separate the last item from the items that precede it.
    *leading, last = str_list

    # -- A single item is returned as is.
    if not leading:
        return last

    # -- Two items are joined by the conjunction only.
    if len(leading) == 1:
        return f"{leading[0]} {conjunction} {last}"

    # -- Three or more items are comma separated, with the conjunction
    # -- before the last one.
    return ", ".join(leading) + f", {conjunction} {last}"

341 

342 

def debug_level() -> int:
    """Return the current debug level as an int, with 0 as 'off'. The
    level is read from the APIO_DEBUG env option and defaults to 0. If the
    value is not an int, prints an error and exits the process."""

    # -- We fetch a fresh value on each call so it can be adjusted
    # -- dynamically when needed.
    raw_value = env_options.get(env_options.APIO_DEBUG, "0")

    # -- We don't validate the range of the value, assuming the user knows
    # -- how to use it.
    try:
        return int(raw_value)
    except ValueError:
        cerror(f"APIO_DEBUG value '{raw_value}' is not an int.")
        sys.exit(1)

357 

358 

def is_debug(level: int) -> bool:
    """Return True if apio is in debug mode at level 'level' or higher.

    Use it to enable printing of debug information but not to modify the
    behavior of the code. Also, all apio tests should be performed with
    debug disabled. Important debug information should be at level 1
    while less important or spammy information should be at higher levels.

    Level must be an int in the range [1, 10].
    """
    # -- Sanity check. A failure indicates a programming error.
    assert isinstance(level, int), type(level)
    assert 1 <= level <= 10, level

    current_level = debug_level()
    return current_level >= level

370 

371 

def get_apio_version_tuple() -> Tuple[int, int, int]:
    """Return the version of the apio package as a tuple of 3 ints."""
    # -- Apio's version is defined in the __init__.py file of the apio package.
    # -- Using the version from a file in the apio package rather than from
    # -- the pip metadata makes apio more self contained, for example when
    # -- installing with pyinstaller rather than with pip.
    ver: Tuple[int, int, int] = apio.APIO_VERSION

    # -- Sanity check. A failure indicates a programming error.
    assert len(ver) == 3, ver
    assert all(isinstance(part, int) for part in ver), ver
    return ver

384 

385 

def get_apio_version_str() -> str:
    """Return the version of the apio package as a dot separated string
    such as "1.22.3"."""
    # -- The version tuple is verified by its getter to have 3 elements.
    major, minor, patch = get_apio_version_tuple()
    return f"{major}.{minor}.{patch}"

390 

391 

def get_apio_release_info() -> str:
    """Return the release info string, as defined by RELEASE_INFO in the
    apio package."""
    release_info = apio.RELEASE_INFO
    return release_info

395 

396 

def get_apio_version_message() -> str:
    """Return the string to show on `apio --version`. It contains the apio
    version and, in parentheses, the release info or a placeholder if the
    release info is empty."""
    version = get_apio_version_str()

    # -- Fall back to a placeholder if there is no release info.
    release = get_apio_release_info()
    if not release:
        release = "no release info"

    return f"Apio CLI version {version} ({release})"

402 

403 

def _check_apio_dir(apio_dir: Path, desc: str, env_var: str):
    """Check that the apio home dir or packages dir path meets the apio
    requirements. If it doesn't, prints an error and exits the process.

    Inputs:
      apio_dir: the path to check.
      desc: a short description of the dir for the messages, e.g.
        "home dir".
      env_var: the name of the env var that overrides that dir, e.g.
        "APIO_HOME". Used in the hint that is printed on an error.
    """

    # -- Sanity check. If this fails, it's a programming error.
    assert isinstance(
        apio_dir, Path
    ), f"Error: {desc} is not a Path: {type(apio_dir)}, {apio_dir}"

    # -- The path should be absolute, see discussion here:
    # -- https://github.com/FPGAwars/apio/issues/522
    if not apio_dir.is_absolute():
        cerror(
            f"Apio {desc} should be an absolute path " f"[{str(apio_dir)}].",
        )
        cout(
            f"You can use the system env var {env_var} to set "
            f"a different apio {desc}.",
            style=INFO,
        )
        sys.exit(1)

    # -- We have problem with spaces and non ascii character above value
    # -- 127, so we allow only ascii characters in the range [33, 127].
    # -- See here https://github.com/FPGAwars/apio/issues/515
    for ch in str(apio_dir):
        if not 33 <= ord(ch) <= 127:
            cerror(
                f"Unsupported character [{ch}] in apio {desc}: "
                f"[{str(apio_dir)}].",
            )
            # -- NOTE: The space after 'apio' separates it from the
            # -- description that follows.
            cout(
                "Only the ASCII characters in the range 33 to 127 are "
                "allowed. You can use the\n"
                f"system env var '{env_var}' to set a different apio "
                f"{desc}.",
                style=INFO,
            )
            sys.exit(1)

443 

444 

def resolve_home_dir() -> Path:
    """Resolve the apio home dir and create it if it doesn't exist.

    The apio home dir can be overridden using the APIO_HOME environment
    variable. If not set, the user_home/.apio folder is used by default:
      E.g. Linux:  /home/obijuan/.apio

    The resolved path is checked against apio's requirements for its dirs,
    then the folder is created, including missing parents. If that fails
    due to a permission error, prints an error and exits the process.
    """

    # -- Get the optional override from the env.
    env_value = env_options.get(env_options.APIO_HOME, default=None)

    if env_value:
        # -- First expand a user home '~' marker, if any, and then vars
        # -- such as $HOME or %HOME% on windows.
        expanded = os.path.expandvars(os.path.expanduser(env_value))
        home_dir = Path(expanded)
    else:
        # -- No override, use the default home dir ~/.apio.
        home_dir = Path.home() / ".apio"

    # -- Verify that the home dir meets apio's requirements.
    _check_apio_dir(home_dir, "home dir", "APIO_HOME")

    # -- Create the folder if it does not exist.
    try:
        home_dir.mkdir(parents=True, exist_ok=True)
    except PermissionError:
        cerror(f"No usable home directory {home_dir}")
        sys.exit(1)

    return home_dir

481 

482 

def resolve_packages_dir(apio_home_dir: Path) -> Path:
    """Resolve the apio packages dir. This is the apio folder where the
    packages are installed.

    The default apio packages dir can be overridden using the APIO_PACKAGES
    environment variable. If not set, the <apio-home>/packages folder is
    used by default:
      E.g. Linux:  /home/obijuan/.apio/packages

    The resolved path is checked against apio's requirements for its dirs.
    This function doesn't create the folder.
    """

    # -- Get the optional override from the env.
    env_value = env_options.get(env_options.APIO_PACKAGES, default=None)

    if not env_value:
        # -- No override, use the default <home_dir>/packages.
        packages_dir = apio_home_dir / "packages"
    else:
        # -- Verify that the raw env value contains 'packages' to make
        # -- sure we don't clobber system directories.
        if "packages" not in env_value:
            cerror(
                "Apio packages dir APIO_PACKAGES should include the "
                "string 'packages'."
            )
            sys.exit(1)

        # -- First expand a user home '~' marker, if any, and then vars
        # -- such as $HOME or %HOME% on windows.
        expanded = os.path.expandvars(os.path.expanduser(env_value))
        packages_dir = Path(expanded)

    # -- Verify that the packages dir meets apio's requirements.
    _check_apio_dir(packages_dir, "packages dir", "APIO_PACKAGES")

    return packages_dir

529 

530 

def split(
    s: str,
    separator: str,
    strip: bool = False,
    keep_empty: bool = True,
) -> List[str]:
    """Split a string into a list of parts.

    Inputs:
      s: the string to split. An empty string results in an empty list.
      separator: the separator between the parts.
      strip: if true, leading and trailing whitespace is removed from
        each part.
      keep_empty: if false, empty parts are dropped. This is applied after
        the stripping, so whitespace only parts are dropped as well when
        strip is true.
    """
    # -- A workaround for python's "".split(",") returning [''].
    parts = s.split(separator) if s else []

    # -- Strip the elements if requested.
    if strip:
        parts = [part.strip() for part in parts]

    # -- Remove empty elements if requested.
    if not keep_empty:
        parts = [part for part in parts if part]

    # -- All done.
    return parts

551 

552 

def fpga_arch_sort_key(fpga_arch: str) -> Any:
    """Given an fpga arch name such as 'ice40', return a sort key that
    forces our preferred order of sorting by architecture. Used in
    reports such as examples, fpgas, and boards."""

    # -- The architectures in their preferred order. Add more if adding
    # -- new architectures.
    preferred = ("ice40", "ecp5", "gowin")

    # -- The primary key is the position in the preferred order. Unknown
    # -- architectures all share the position past the last one.
    try:
        rank = preferred.index(fpga_arch)
    except ValueError:
        rank = len(preferred)

    # -- The name is the secondary key, so unknown architectures are
    # -- listed at the end in a lexicographic order.
    return (rank, fpga_arch)

569 

570 

def subprocess_call(
    cmd: List[str],
) -> int:
    """A helper for running a command with subprocess.call(), without a
    shell. Returns the exit code, which is always 0, since on a non zero
    exit code the function prints an error and exits the process."""

    if is_debug(1):
        cout(f"subprocess_call: {cmd}")

    # -- Run the command and wait for its completion.
    status = subprocess.call(cmd, shell=False)

    if is_debug(1):
        cout(f"subprocess_call: exit code is {status}")

    # -- Bail out on an error.
    if status != 0:
        cerror(f"Command failed: {cmd}")
        sys.exit(1)

    # -- Here when ok.
    return status

592 

593 

@contextmanager
def pushd(target_dir: Path):
    """A context manager that changes the current working directory to
    target_dir for the duration of the 'with' block. The previous working
    directory is restored on exit, also when the block raises."""
    # -- Remember where we are now so we can get back.
    original_cwd = os.getcwd()

    os.chdir(target_dir)
    try:
        yield
    finally:
        os.chdir(original_cwd)

603 

604 

def is_pyinstaller_app() -> bool:
    """Return True if this is a pyinstaller packaged app, that is, if sys
    has a true 'frozen' attribute and also a '_MEIPASS' attribute.
    Based on https://pyinstaller.org/en/stable/runtime-information.html
    """
    # -- The value of sys.frozen is converted to a bool such that a falsy
    # -- non bool value is not returned as is.
    return bool(getattr(sys, "frozen", False)) and hasattr(sys, "_MEIPASS")

610 

611 

def is_under_vscode_debugger() -> bool:
    """Return True if running under the VSCode debugger, as indicated by
    a non empty DEBUGPY_RUNNING env var."""
    # -- NOTE: An alternative test of the env var TERM_PROGRAM having the
    # -- value "vscode" is not used here.
    return bool(os.environ.get("DEBUGPY_RUNNING"))

619 

620 

def get_platform_info() -> str:
    """Return a short string with additional platform info such as its
    version. This is the string returned by platform.platform()."""
    platform_info = platform.platform()
    return platform_info

623 return platform.platform()