Coverage for apio / utils / util.py: 85%

220 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-24 01:53 +0000

1# -*- coding: utf-8 -*- 

2# -- This file is part of the Apio project 

3# -- (C) 2016-2018 FPGAwars 

4# -- Author Jesús Arroyo 

5# -- License GPLv2 

6# -- Derived from: 

7# ---- Platformio project 

8# ---- (C) 2014-2016 Ivan Kravets <me@ikravets.com> 

9# ---- License Apache v2 

10"""Misc utility functions and classes.""" 

11 

12import sys 

13import os 

14from contextlib import contextmanager 

15from enum import Enum 

16from dataclasses import dataclass 

17from typing import Optional, Any, Tuple, List 

18import subprocess 

19from threading import Thread 

20from pathlib import Path 

21import apio 

22from apio.utils import env_options 

23from apio.common.apio_console import cout, cerror 

24from apio.common.apio_styles import INFO 

25 

26 

27# ---------------------------------------- 

28# -- Constants 

29# ---------------------------------------- 

30 

31 

class ApioException(Exception):
    """An apio specific error."""

34 

35 

class AsyncPipe(Thread):
    """A pipe whose read end is drained, line by line, by a private thread.

    Each incoming line is recorded in an internal buffer and, optionally,
    reported to a callback. Used to process scons output in real time to
    show its progress.
    """

    def __init__(self, line_callback=None):
        """Create the pipe and start its reader thread.

        If line_callback is not None, it is called for each line as
        line_callback(line: str, terminator: str) where line is the line
        content, without its terminator, and terminator is one of:
            "\\r"  (CR)
            "\\n"  (LF)
            ""    (EOF, for a last line that has no terminator)

        The callback is done from a private Python thread of this pipe so
        make sure to have the proper locking and synchronization as needed.
        """

        super().__init__()
        self.outcallback = line_callback

        # -- The writer (e.g. a subprocess) uses the write end, the thread
        # -- of this object reads from the read end.
        self._fd_read, self._fd_write = os.pipe()

        # -- A list of lines received so far.
        self._lines_buffer = []

        # -- Set once close() released the write end, so that a repeated
        # -- close() doesn't call os.close() on a descriptor number that
        # -- is stale or was reused for something else.
        self._write_end_closed = False

        self.start()

    def get_buffer(self):
        """Return the list of lines received so far, without terminators.

        This is the live internal list which the reader thread appends to,
        so it's complete only after close() returned.
        """

        return self._lines_buffer

    def fileno(self):
        """Return the file descriptor of the write end of the pipe.

        This allows to pass this object where a file like object with a
        fileno() method is accepted, e.g. as the stdout or stderr argument
        of subprocess.Popen(), as done in exec_command().
        """

        return self._fd_write

    def _handle_incoming_line(self, bfr: bytearray, terminator: str):
        """Handle a new incoming line.
        Bfr is a bytearray with the line's content, possibly empty.
        See __init__ for the description of terminator.
        """
        # -- Convert the line's bytes to a string. Replace invalid utf-8
        # -- chars with the unicode replacement character. This creates a
        # -- new str so the caller is free to reuse bfr.
        line = bfr.decode("utf-8", errors="replace")

        # -- Append to the lines log buffer.
        self._lines_buffer.append(line)

        # -- Report back if caller passed a callback.
        if self.outcallback:
            self.outcallback(line, terminator)

    def run(self):
        """The body of the reader thread. Reads the pipe until EOF and
        dispatches each line, using CR or LF as line terminators."""

        # -- Prepare a buffer for collecting the line chars, excluding
        # -- its line terminator.
        bfr = bytearray()

        # -- We open in binary mode so we have access to the line terminators.
        # -- This is important with progress bars which don't advance to the
        # -- next line but redraw on the same line.
        with os.fdopen(self._fd_read, "rb") as f:
            while True:
                b: bytes = f.read(1)
                assert len(b) <= 1

                # -- Handle end of file. A pending partial line is reported
                # -- with an empty terminator.
                if not b:
                    if bfr:
                        self._handle_incoming_line(bfr, "")
                    return

                # -- Handle \r terminator
                if b == b"\r":
                    self._handle_incoming_line(bfr, "\r")
                    bfr.clear()
                    continue

                # -- Handle \n terminator
                if b == b"\n":
                    self._handle_incoming_line(bfr, "\n")
                    bfr.clear()
                    continue

                # -- Handle a regular character
                bfr.append(b[0])

    def close(self):
        """Close the write end of the pipe and wait for the reader thread
        to exit. Calling it again is a no-op."""

        if self._write_end_closed:
            return
        self._write_end_closed = True

        # -- Closing our write end lets the reader thread see EOF once no
        # -- other process holds the write end.
        os.close(self._fd_write)
        self.join()

130 

131 

class TerminalMode(Enum):
    """The two kinds of destinations of stdout/stderr."""

    # -- The output goes to a terminal. The terminal width is available
    # -- and the text can have ansi colors.
    TERMINAL = 1

    # -- The output goes to a filter or a file. There is no width and
    # -- ansi colors should be avoided.
    PIPE = 2

141 

142 

def get_path_in_apio_package(subpath: str) -> Path:
    """Get the full path of a file or a folder in the apio package.

    Inputs:
      * subpath: A relative path within the apio package. Use "" for
        the package's root directory. Must not be None.

    Returns:
      * A Path object with subpath appended to the package's directory.

    Example: subpath="commands"
      Output: Path('/home/obijuan/.../apio/commands')
    """

    # -- The package directory is taken as two levels above this python
    # -- file, that is, this file is expected to be in a direct sub folder
    # -- of the apio package, e.g. .../site-packages/apio/utils/util.py
    apio_package_dir = Path(__file__).parent.parent

    # -- With subpath = "" this returns the package directory itself.
    return apio_package_dir / subpath

170 

171 

@dataclass(frozen=True)
class CommandResult:
    """An immutable record with the outcome of running a command as a
    subprocess."""

    # -- The stdout text, possibly with multiple lines.
    out_text: Optional[str] = None
    # -- The stderr text, possibly with multiple lines.
    err_text: Optional[str] = None
    # -- The exit code of the command, 0 means OK.
    exit_code: Optional[int] = None

179 

180 

def exec_command(
    cmd: List[str], stdout: AsyncPipe, stderr: AsyncPipe
) -> CommandResult:
    """Execute the given command using async stdout/stderr.

    NOTE: When running on windows, this function does not support
    privilege elevation, to achieve that, use os.system() instead, as
    done in drivers.py

    INPUTS:
      cmd: list of command token (strings)
      stdout: the AsyncPipe to use for stdout
      stderr: the AsyncPipe to use for stderr.

    OUTPUT:
      A CommandResult with the command results. Its texts are the lines
      collected by the two pipes, joined with a newline.

    Both pipes are closed by this function. If the command is not found,
    or the user aborts with Ctrl-C, the process exits with code 1.
    """

    # -- Sanity check.
    assert isinstance(cmd, list)
    assert isinstance(cmd[0], str)
    assert isinstance(stdout, AsyncPipe)
    assert isinstance(stderr, AsyncPipe)

    # -- Execute the command
    try:
        with subprocess.Popen(
            cmd, stdout=stdout, stderr=stderr, shell=False
        ) as proc:

            # -- Wait for completion. The output is collected by the async
            # -- pipes, so communicate() has no text to return here.
            proc.communicate()

            # -- Get status code.
            exit_code = proc.returncode

            # -- Close the async pipes. This also waits for their reader
            # -- threads to consume all the output.
            stdout.close()
            stderr.close()

    # -- User has pressed the Ctrl-C for aborting the command
    except KeyboardInterrupt:
        cerror("Aborted by user")
        # -- NOTE: If using here sys.exit(1), apio requires pressing ctl-c
        # -- twice when running 'apio sim'. This form of exit is more direct
        # -- and harder.
        os._exit(1)

    # -- The command does not exist!
    except FileNotFoundError:
        # -- The process was not started so the pipes were not closed
        # -- above. We close them here, otherwise their reader threads,
        # -- which are not daemon threads, keep waiting for input and
        # -- block the exit of the python interpreter.
        stdout.close()
        stderr.close()
        cerror("Command not found:", cmd)
        sys.exit(1)

    # -- Extract stdout text
    out_text = "\n".join(stdout.get_buffer())

    # -- Extract stderr text
    err_text = "\n".join(stderr.get_buffer())

    # -- All done.
    return CommandResult(out_text, err_text, exit_code)

245 

246 

def user_directory_or_cwd(
    dir_arg: Optional[Path],
    *,
    description: str,
    must_exist: bool = False,
    create_if_missing=False,
) -> Path:
    """Resolve an optional directory argument, with the current directory
    as the default.

    If dir_arg is given, it is validated and returned, otherwise the
    relative path "." is returned. 'description' is the role of the
    directory, e.g. "Project" or "Destination", and is used in the error
    messages. With must_exist, a missing directory is an error, and with
    create_if_missing, a missing directory is created. The two flags are
    mutually exclusive. On a validation error, the process exits with
    code 1.
    """

    assert not (create_if_missing and must_exist), "Conflicting flags."

    # -- No directory given, use the current directory. We prefer the
    # -- relative path "." over the absolute path Path.cwd().
    if not dir_arg:
        return Path(".")

    # -- Here when the user provided a directory path.
    exists = dir_arg.exists()

    # -- An existing path must be a directory, not a file.
    if exists and not dir_arg.is_dir():
        cerror(f"{description} directory is a file: {dir_arg}")
        sys.exit(1)

    if not exists:
        # -- The caller required an existing directory.
        if must_exist:
            cerror(f"{description} directory is missing: {str(dir_arg)}")
            sys.exit(1)

        # -- The caller asked to create a missing directory.
        if create_if_missing:
            cout(f"Creating folder: {dir_arg}")
            dir_arg.mkdir(parents=True)

    # -- All done.
    return dir_arg

287 

288 

def get_python_version() -> str:
    """Return the version of the running python interpreter as a
    "major.minor" string, e.g. "3.12"."""

    major, minor = sys.version_info[:2]
    return f"{major}.{minor}"

293 

294 

def get_python_ver_tuple() -> Tuple[int, int, int]:
    """Return the (major, minor, micro) version of the running python
    interpreter as a tuple, e.g. (3, 12, 1)."""
    info = sys.version_info
    return (info.major, info.minor, info.micro)

298 

299 

def plurality(
    obj: Any,
    singular: str,
    plural: Optional[str] = None,
    include_num: bool = True,
) -> str:
    """Returns singular or plural based on the size of the object.

    Inputs:
      * obj: Either an int with the count, or an object that supports
        len(), in which case its length is the count.
      * singular: The form to use for a count of 1, e.g. "file".
      * plural: The form to use for any other count, including 0. If None,
        it's derived by appending "s" to the singular form.
      * include_num: If true, the result is prefixed with the count,
        e.g. "3 files" rather than "files".
    """
    # -- Figure out the size of the object
    n = obj if isinstance(obj, int) else len(obj)

    # -- Select the form. Only a value of 1 gets the singular form.
    if n == 1:
        form = singular
    elif plural is None:
        form = singular + "s"
    else:
        form = plural

    return f"{n} {form}" if include_num else form

323 

324 

def list_plurality(str_list: List[str], conjunction: str) -> str:
    """Join the items of a non empty list into a human friendly string,
    e.g. "a", "a and b", or "a, b, and c" for the conjunction "and"."""
    # -- This is a programming error. Not a user error.
    assert str_list, "list_plurality expect len() >= 1."

    # -- Separate the last item from the items before it, if any.
    *leading, last = str_list

    # -- A single item is returned as is.
    if not leading:
        return last

    # -- Two items are joined by the conjunction only.
    if len(leading) == 1:
        return f"{leading[0]} {conjunction} {last}"

    # -- Three or more items are comma separated, with the conjunction
    # -- before the last one.
    return ", ".join(leading) + f", {conjunction} {last}"

340 

341 

def debug_level() -> int:
    """Return the debug level from the APIO_DEBUG env option, with "0"
    as the default. Exits with code 1 if the value is not an int."""

    # -- The option is read on each call so it can be adjusted dynamically
    # -- when needed.
    raw_value = env_options.get(env_options.APIO_DEBUG, "0")

    # -- The value itself is not validated, assuming the user knows how
    # -- to use it.
    try:
        return int(raw_value)
    except ValueError:
        cerror(f"APIO_DEBUG value '{raw_value}' is not an int.")
        sys.exit(1)

356 

357 

def is_debug(level: int) -> bool:
    """Tell if the current debug level is at least 'level'.

    Use it to enable printing of debug information but not to modify
    the behavior of the code. Also, all apio tests should be performed
    with debug disabled. Important debug information should be at level 1
    while less important or spammy should be at higher levels.
    """
    # -- Sanity check, level should be an int in the range [1, 10]. A
    # -- failure here indicates a programming error.
    assert isinstance(level, int), type(level)
    assert 1 <= level <= 10, level

    current_level = debug_level()
    return current_level >= level

369 

370 

def get_apio_version_tuple() -> Tuple[int, int, int]:
    """Returns the version of the apio package as tuple of 3 ints."""
    # -- Apio's version is defined in the __init__.py file of the apio package.
    # -- Using the version from a file in the apio package rather than from
    # -- the pip metadata makes apio more self contained, for example when
    # -- installing with pyinstaller rather than with pip.
    ver: Tuple[int, int, int] = apio.VERSION

    # -- Sanity check. A failure here indicates a programming error in the
    # -- definition of the version.
    assert len(ver) == 3, ver
    assert all(isinstance(part, int) for part in ver), ver

    return ver

383 

384 

def get_apio_version_str() -> str:
    """Return the apio package version as a dot separated string of its
    first three components, e.g. "1.22.3"."""
    return "{0}.{1}.{2}".format(*get_apio_version_tuple())

389 

390 

def _check_apio_dir(apio_dir: Path, desc: str, env_var: str):
    """Checks the apio home dir or packages dir path for the apio
    requirements.

    Inputs:
      * apio_dir: The path to check.
      * desc: A short description of the directory for the messages,
        e.g. "home dir".
      * env_var: The name of the env var that can override the directory,
        e.g. "APIO_HOME". Used in the hints to the user.

    Returns normally if the path is absolute and contains only supported
    characters, otherwise prints an error message and exits with code 1.
    """

    # -- Sanity check. If this fails, it's a programming error.
    assert isinstance(
        apio_dir, Path
    ), f"Error: {desc} is not a Path: {type(apio_dir)}, {apio_dir}"

    # -- The path should be absolute, see discussion here:
    # -- https://github.com/FPGAwars/apio/issues/522
    if not apio_dir.is_absolute():
        cerror(
            f"Apio {desc} should be an absolute path [{str(apio_dir)}].",
        )
        cout(
            f"You can use the system env var {env_var} to set "
            f"a different apio {desc}.",
            style=INFO,
        )
        sys.exit(1)

    # -- We have problem with spaces and non ascii character above value
    # -- 127, so we allow only ascii characters in the range [33, 127].
    # -- See here https://github.com/FPGAwars/apio/issues/515
    for ch in str(apio_dir):
        if ord(ch) < 33 or ord(ch) > 127:
            cerror(
                f"Unsupported character [{ch}] in apio {desc}: "
                f"[{str(apio_dir)}].",
            )
            # -- NOTE: The space after 'apio' separates it from the
            # -- description that follows.
            cout(
                "Only the ASCII characters in the range 33 to 127 are "
                "allowed. You can use the\n"
                f"system env var '{env_var}' to set a different apio "
                f"{desc}.",
                style=INFO,
            )
            sys.exit(1)

430 

431 

def resolve_home_dir() -> Path:
    """Get the absolute apio home dir. This is the apio folder where the
    profile is located and the packages are installed.
    The apio home dir can be overridden using the APIO_HOME environment
    variable. If not set, the user_home/.apio folder is used by default:
    Ej. Linux:  /home/obijuan/.apio
    If the folder does not exist, it is created, including missing parent
    folders. If the path doesn't meet apio's requirements, or the folder
    can't be created, the process exits with code 1.
    """

    # -- Get the optional apio home env.
    apio_home_dir_env = env_options.get(env_options.APIO_HOME, default=None)

    # -- If the env vars specified an home dir then use it.
    if apio_home_dir_env:
        # -- Expand user home '~' marker, if exists.
        apio_home_dir_env = os.path.expanduser(apio_home_dir_env)
        # -- Expand vars such as $HOME or %HOME% on windows.
        apio_home_dir_env = os.path.expandvars(apio_home_dir_env)
        # -- Convert string to path.
        home_dir = Path(apio_home_dir_env)
    else:
        # -- Else, use the default home dir ~/.apio.
        home_dir = Path.home() / ".apio"

    # -- Verify that the home dir meets apio's requirements.
    _check_apio_dir(home_dir, "home dir", "APIO_HOME")

    # -- Create the folder if it does not exist. We catch OSError, which
    # -- includes PermissionError, since the creation can also fail for
    # -- other reasons, e.g. FileExistsError if the path is of an existing
    # -- file, and we prefer an error message over an exception traceback.
    try:
        home_dir.mkdir(parents=True, exist_ok=True)
    except OSError:
        cerror(f"No usable home directory {home_dir}")
        sys.exit(1)

    # Return the home_dir as a Path
    return home_dir

468 

469 

def resolve_packages_dir(apio_home_dir: Path) -> Path:
    """Get the absolute apio packages dir. This is the apio folder where
    the packages are installed. The APIO_PACKAGES environment variable, if
    set, overrides the default which is <apio-home>/packages:
    Ej. Linux:  /home/obijuan/.apio/packages
    This function validates the path but doesn't create the folder. If the
    path doesn't meet apio's requirements, the process exits with code 1.
    """

    # -- Get the optional apio packages env.
    env_value = env_options.get(env_options.APIO_PACKAGES, default=None)

    if not env_value:
        # -- No override, use the default <home_dir>/packages.
        packages_dir = apio_home_dir / "packages"
    else:
        # -- Verify that the env variable contains 'packages' to make sure
        # -- we don't clobber system directories. This is checked on the
        # -- value as given, before its expansion below.
        if "packages" not in env_value:
            cerror(
                "Apio packages dir APIO_PACKAGES should include the "
                "string 'packages'."
            )
            sys.exit(1)

        # -- Expand first the user home '~' marker, if exists, and then
        # -- vars such as $HOME or %HOME% on windows.
        expanded = os.path.expandvars(os.path.expanduser(env_value))
        packages_dir = Path(expanded)

    # -- Verify that the packages dir meets apio's requirements.
    _check_apio_dir(packages_dir, "packages dir", "APIO_PACKAGES")

    # -- Return the packages dir as a Path
    return packages_dir

516 

517 

def split(
    s: str,
    separator: str,
    strip: bool = False,
    keep_empty: bool = True,
) -> List[str]:
    """Split a string into a list of parts.

    Inputs:
      * s: The string to split. An empty string results in an empty list.
      * separator: The separator string between the parts.
      * strip: If true, leading and trailing white space is removed from
        each part.
      * keep_empty: If false, empty parts are dropped. This is done after
        the optional strip so white space only parts are dropped as well.
    """
    # -- A workaround for python's "".split(",") returning [''].
    parts = s.split(separator) if s else []

    # -- Strip the elements if requested.
    if strip:
        parts = [part.strip() for part in parts]

    # -- Remove empty elements if requested.
    if not keep_empty:
        parts = [part for part in parts if part]

    # -- All done.
    return parts

538 

539 

def fpga_arch_sort_key(fpga_arch: str) -> Any:
    """Return a sort key for an fpga arch name such as 'ice40', which
    forces our preferred order of architectures. Used in reports such as
    examples, fpgas, and boards."""

    # -- The architectures in their preferred order. Add more if adding
    # -- new architectures.
    preferred = ["ice40", "ecp5", "gowin"]

    # -- The rank of a preferred architecture is its position in the list.
    # -- All the other architectures share the rank that follows.
    try:
        rank = preferred.index(fpga_arch)
    except ValueError:
        rank = len(preferred)

    # -- The name is a secondary key so that architectures of the same
    # -- rank, that is, the unknown ones, are sorted in lexicographic
    # -- order.
    return (rank, fpga_arch)

556 

557 

def subprocess_call(
    cmd: List[str],
) -> int:
    """Run the command using subprocess.call(), without a shell. Returns
    the exit code, which is 0, if the command succeeded, otherwise prints
    an error message and exits with code 1."""

    # -- Trace the command when debugging.
    if is_debug(1):
        cout(f"subprocess_call: {cmd}")

    # -- Run the command and wait for its completion.
    status = subprocess.call(cmd, shell=False)

    # -- Trace the outcome when debugging.
    if is_debug(1):
        cout(f"subprocess_call: exit code is {status}")

    # -- A non zero exit code is a fatal error.
    if status != 0:
        cerror(f"Command failed: {cmd}")
        sys.exit(1)

    # -- Here when ok.
    return status

579 

580 

@contextmanager
def pushd(target_dir: Path):
    """A context manager that changes the current directory to target_dir
    for the duration of the 'with' body and restores the previous current
    directory on exit."""
    # -- Remember where we started, before changing the directory.
    original_dir = os.getcwd()
    os.chdir(target_dir)
    try:
        yield
    finally:
        # -- Restore also if the body raised an exception.
        os.chdir(original_dir)

590 

591 

def is_pyinstaller_app() -> bool:
    """Tell if this program runs as a pyinstaller packaged app.
    Based on https://pyinstaller.org/en/stable/runtime-information.html
    """
    # -- Both markers are expected, a truthy sys.frozen and the existence
    # -- of sys._MEIPASS.
    frozen = getattr(sys, "frozen", False)
    return frozen and hasattr(sys, "_MEIPASS")

597 

598 

def is_under_vscode_debugger() -> bool:
    """Tell if running under the VSCode debugger, based on the env var
    DEBUGPY_RUNNING having a non empty value."""
    return bool(os.environ.get("DEBUGPY_RUNNING"))

605 return False