Coverage for apio/utils/util.py: 85%
208 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-06 10:20 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-06 10:20 +0000
1# -*- coding: utf-8 -*-
2# -- This file is part of the Apio project
3# -- (C) 2016-2018 FPGAwars
4# -- Author Jesús Arroyo
5# -- License GPLv2
6# -- Derived from:
7# ---- Platformio project
8# ---- (C) 2014-2016 Ivan Kravets <me@ikravets.com>
9# ---- License Apache v2
10"""Misc utility functions and classes."""
12import sys
13import os
14from contextlib import contextmanager
15from enum import Enum
16from dataclasses import dataclass
17from typing import Optional, Any, Tuple, List
18import subprocess
19from threading import Thread
20from pathlib import Path
21import apio
22from apio.utils import env_options
23from apio.common.apio_console import cout, cerror
24from apio.common.apio_styles import INFO
27# ----------------------------------------
28# -- Constants
29# ----------------------------------------
32class ApioException(Exception):
33 """Apio error"""
36class AsyncPipe(Thread):
37 """A class that implements a pipe that calls back on each incoming line
38 from an internal thread. Used to process in real time scons output to
39 show its progress."""
41 def __init__(self, line_callback=None):
42 """If line_callback is not None, it is called for each line as
43 line_callback(line:str, terminator:str) where line is the line content
44 and terminator is one of:
45 "\r" (CR)
46 "\n" (LF)
47 "" (EOF)
49 The callback is done from a private Python thread of this pipe so make
50 sure to have the proper locking and synchronization as needed.
51 """
53 Thread.__init__(self)
54 self.outcallback = line_callback
56 self._fd_read, self._fd_write = os.pipe()
58 # -- A list of lines received so far.
59 self._lines_buffer = []
61 self.start()
63 def get_buffer(self):
64 """DOC: TODO"""
66 return self._lines_buffer
68 def fileno(self):
69 """DOC: TODO"""
71 return self._fd_write
73 def _handle_incoming_line(self, bfr: bytearray, terminator: str):
74 """Handle a new incoming line.
75 Bfr is a bytes with the line's content, possibly empty.
76 See __init__ for the description of terminator.
77 """
78 # -- Convert the line's bytes to a string. Replace invalid utf-8
79 # -- chars with "�"
80 line = bfr.decode("utf-8", errors="replace")
82 # -- Append to the lines log buffer.
83 self._lines_buffer.append(line)
85 # -- Report back if caller passed a callback.
86 if self.outcallback: 86 ↛ exitline 86 didn't return from function '_handle_incoming_line' because the condition on line 86 was always true
87 self.outcallback(line, terminator)
89 def run(self):
90 """DOC: TODO"""
92 # -- Prepare a buffer for collecting the line chars, excluding
93 # -- its line terminator.
94 bfr = bytearray()
96 # -- We open in binary mode so we have access to the line terminators.
97 # -- This is important with progress bars which don't advance to the
98 # -- next line but redraw on the same line.
99 with os.fdopen(self._fd_read, "rb") as f:
100 while True:
101 b: Optional[bytearray] = f.read(1)
102 assert len(b) <= 1
104 # -- Handle end of file
105 if not b:
106 if bfr: 106 ↛ 107line 106 didn't jump to line 107 because the condition on line 106 was never true
107 self._handle_incoming_line(bfr, "")
108 return
110 # -- Handle \r terminator
111 if b == b"\r": 111 ↛ 112line 111 didn't jump to line 112 because the condition on line 111 was never true
112 self._handle_incoming_line(bfr, "\r")
113 bfr.clear()
114 continue
116 # -- Handle \n terminator
117 if b == b"\n":
118 self._handle_incoming_line(bfr, "\n")
119 bfr.clear()
120 continue
122 # -- Handle a regular character
123 bfr.append(b[0])
125 def close(self):
126 """DOC: TODO"""
128 os.close(self._fd_write)
129 self.join()
132class TerminalMode(Enum):
133 """Represents to two modes of stdout/err."""
135 # Output is sent to a terminal. Terminal width is available, and text
136 # can have ansi colors.
137 TERMINAL = 1
138 # Output is sent to a filter or a file. No width and ansi colors should
139 # be avoided.
140 PIPE = 2
143def get_path_in_apio_package(subpath: str) -> Path:
144 """Get the full path to the given folder in the apio package.
145 Inputs:
146 * subdir: String with a relative path within the apio package.
147 Use "" for root directory.
149 Returns:
150 * The absolute path as a PosixPath() object
152 Example: folder="commands"
153 Output: PosixPath('/home/obijuan/.../apio/commands')
154 """
156 # -- Get the full path of this file (util.py)
157 # -- Ex: /home/obijuan/.../site-packages/apio/util.py
158 current_python_file = Path(__file__)
160 # -- The parent folder is the apio root folder
161 # -- Ex: /home/obijuan/.../site-packages/apio
162 path = current_python_file.parent.parent
164 # -- Add the given folder to the path. If subpath = "" this
165 # -- does nothing, but fails if subpath is None.
166 path = path / subpath
168 # -- Return the path
169 return path
172@dataclass(frozen=True)
173class CommandResult:
174 """Contains the results of a command (subprocess) execution."""
176 out_text: Optional[str] = None # stdout multi-line text.
177 err_text: Optional[str] = None # stderr multi-line text.
178 exit_code: Optional[int] = None # Exit code, 0 = OK.
181def exec_command(
182 cmd: List[str], stdout: AsyncPipe, stderr: AsyncPipe
183) -> CommandResult:
184 """Execute the given command using async stdout/stderr..
186 NOTE: When running on windows, this function does not support
187 privilege elevation, to achieve that, use os.system() instead, as
188 done in drivers.py
190 INPUTS:
191 cmd: list of command token (strings)
192 stdout: the AsyncPipe to use for stdout
193 stderr: the AsyncPipe to use for stderr.
195 OUTPUT:
196 A CommandResult with the command results.
197 """
199 # -- Sanity check.
200 assert isinstance(cmd, list)
201 assert isinstance(cmd[0], str)
202 assert isinstance(stdout, AsyncPipe)
203 assert isinstance(stderr, AsyncPipe)
205 # -- Execute the command
206 try:
207 with subprocess.Popen(
208 cmd, stdout=stdout, stderr=stderr, shell=False
209 ) as proc:
211 # -- Wait for completion.
212 out_text, err_text = proc.communicate()
214 # -- Get status code.
215 exit_code = proc.returncode
217 # -- Close the async pipes.
218 stdout.close()
219 stderr.close()
221 # -- User has pressed the Ctrl-C for aborting the command
222 except KeyboardInterrupt:
223 cerror("Aborted by user")
224 # -- NOTE: If using here sys.exit(1), apio requires pressing ctl-c
225 # -- twice when running 'apio sim'. This form of exit is more direct
226 # -- and harder.
227 os._exit(1)
229 # -- The command does not exist!
230 except FileNotFoundError:
231 cerror("Command not found:", cmd)
232 sys.exit(1)
234 # -- Extract stdout text
235 lines = stdout.get_buffer()
236 out_text = "\n".join(lines)
238 # -- Extract stderr text
239 lines = stderr.get_buffer()
240 err_text = "\n".join(lines)
242 # -- All done.
243 result = CommandResult(out_text, err_text, exit_code)
244 return result
247def user_directory_or_cwd(
248 dir_arg: Optional[Path],
249 *,
250 description: str,
251 must_exist: bool = False,
252 create_if_missing=False,
253) -> Path:
254 """Condition a directory arg with current directory as default. If dir_arg
255 is specified, it is return after validation, else cwd "." is returned.
256 Description is directory function to include in error messages, e.g.
257 "Project" or "Destination".
258 """
260 assert not (create_if_missing and must_exist), "Conflicting flags."
262 # -- Case 1: User provided dir path.
263 if dir_arg:
264 project_dir = dir_arg
266 # -- If exists, it must be a dir.
267 if project_dir.exists() and not project_dir.is_dir(): 267 ↛ 268line 267 didn't jump to line 268 because the condition on line 267 was never true
268 cerror(f"{description} directory is a file: {project_dir}")
269 sys.exit(1)
271 # -- If required, it must exist.
272 if must_exist and not project_dir.exists(): 272 ↛ 273line 272 didn't jump to line 273 because the condition on line 272 was never true
273 cerror(f"{description} directory is missing: {str(project_dir)}")
274 sys.exit(1)
276 # -- If requested, create
277 if create_if_missing and not project_dir.exists():
278 cout(f"Creating folder: {project_dir}")
279 project_dir.mkdir(parents=True)
281 # -- All done.
282 return project_dir
284 # -- Case 2: Using current directory.
285 # -- We prefer the relative path "." over the absolute path Path.cwd().
286 return Path(".")
289def get_python_version() -> str:
290 """Return a string with the python version"""
292 return f"{sys.version_info[0]}.{sys.version_info[1]}"
295def get_python_ver_tuple() -> Tuple[int, int, int]:
296 """Return a tuple with the python version. e.g. (3, 12, 1)."""
297 return sys.version_info[:3]
300def plurality(
301 obj: Any, singular: str, plural: str = None, include_num: bool = True
302) -> str:
303 """Returns singular or plural based on the size of the object."""
304 # -- Figure out the size of the object
305 if isinstance(obj, int):
306 n = obj
307 else:
308 n = len(obj)
310 # -- For value of 1 return the singular form.
311 if n == 1:
312 if include_num: 312 ↛ 314line 312 didn't jump to line 314 because the condition on line 312 was always true
313 return f"{n} {singular}"
314 return singular
316 # -- For all other values, return the plural form.
317 if plural is None: 317 ↛ 320line 317 didn't jump to line 320 because the condition on line 317 was always true
318 plural = singular + "s"
320 if include_num:
321 return f"{n} {plural}"
322 return plural
325def list_plurality(str_list: List[str], conjunction: str) -> str:
326 """Format a list as a human friendly string."""
327 # -- This is a programming error. Not a user error.
328 assert str_list, "list_plurality expect len() >= 1."
330 # -- Handle the case of a single item.
331 if len(str_list) == 1:
332 return str_list[0]
334 # -- Handle the case of 2 items.
335 if len(str_list) == 2:
336 return f"{str_list[0]} {conjunction} {str_list[1]}"
338 # -- Handle the case of three or more items.
339 return ", ".join(str_list[:-1]) + f", {conjunction} {str_list[-1]}"
342def debug_level() -> int:
343 """Returns the current debug level, with 0 as 'off'."""
345 # -- We get a fresh value so it can be adjusted dynamically when needed.
346 level_str = env_options.get(env_options.APIO_DEBUG, "0")
347 try:
348 level_int = int(level_str)
349 except ValueError:
350 cerror(f"APIO_DEBUG value '{level_str}' is not an int.")
351 sys.exit(1)
353 # -- All done. We don't validate the value, assuming the user knows how
354 # -- to use it.
355 return level_int
358def is_debug(level: int) -> bool:
359 """Returns True if apio is in debug mode level 'level' or higher. Use
360 it to enable printing of debug information but not to modify the behavior
361 of the code. Also, all apio tests should be performed with debug
362 disabled. Important debug information should be at level 1 while
363 less important or spammy should be at higher levels."""
364 # -- Sanity check. A failure is indicates a programming error.
365 assert isinstance(level, int), type(level)
366 assert 1 <= level <= 10, level
368 return debug_level() >= level
371def get_apio_version() -> str:
372 """Returns the version of the apio package."""
373 # -- Apio's version is defined in the __init__.py file of the apio package.
374 # -- Using the version from a file in the apio package rather than from
375 # -- the pip metadata makes apio more self contained, for example when
376 # -- installing with pyinstaller rather than with pip.
377 ver: Tuple[int] = apio.VERSION
378 assert len(ver) == 3, ver
379 # -- Format the tuple of three ints as a string such as "0.9.83"
380 return f"{ver[0]}.{ver[1]}.{ver[2]}"
383def _check_apio_dir(apio_dir: Path, desc: str, env_var: str):
384 """Checks the apio home dir or packages dir path for the apio
385 requirements."""
387 # Sanity check. If this fails, it's a programming error.
388 assert isinstance(
389 apio_dir, Path
390 ), f"Error: {desc} is no a Path: {type(apio_dir)}, {apio_dir}"
392 # -- The path should be absolute, see discussion here:
393 # -- https://github.com/FPGAwars/apio/issues/522
394 if not apio_dir.is_absolute():
395 cerror(
396 f"Apio {desc} should be an absolute path " f"[{str(apio_dir)}].",
397 )
398 cout(
399 f"You can use the system env var {env_var} to set "
400 f"a different apio {desc}.",
401 style=INFO,
402 )
403 sys.exit(1)
405 # -- We have problem with spaces and non ascii character above value
406 # -- 127, so we allow only ascii characters in the range [33, 127].
407 # -- See here https://github.com/FPGAwars/apio/issues/515
408 for ch in str(apio_dir):
409 if ord(ch) < 33 or ord(ch) > 127:
410 cerror(
411 f"Unsupported character [{ch}] in apio {desc}: "
412 f"[{str(apio_dir)}].",
413 )
414 cout(
415 "Only the ASCII characters in the range 33 to 127 are "
416 "allowed. You can use the\n"
417 f"system env var '{env_var}' to set a different apio"
418 f"{desc}.",
419 style=INFO,
420 )
421 sys.exit(1)
424def resolve_home_dir() -> Path:
425 """Get the absolute apio home dir. This is the apio folder where the
426 profile is located and the packages are installed.
427 The apio home dir can be overridden using the APIO_HOME environment
428 variable. If not set, the user_home/.apio folder is used by default:
429 Ej. Linux: /home/obijuan/.apio
430 If the folders does not exist, they are created
431 """
433 # -- Get the optional apio home env.
434 apio_home_dir_env = env_options.get(env_options.APIO_HOME, default=None)
436 # -- If the env vars specified an home dir then use it.
437 if apio_home_dir_env:
438 # -- Expand user home '~' marker, if exists.
439 apio_home_dir_env = os.path.expanduser(apio_home_dir_env)
440 # -- Expand varas such as $HOME or %HOME% on windows.
441 apio_home_dir_env = os.path.expandvars(apio_home_dir_env)
442 # -- Convert string to path.
443 home_dir = Path(apio_home_dir_env)
444 else:
445 # -- Else, use the default home dir ~/.apio.
446 home_dir = Path.home() / ".apio"
448 # -- Verify that the home dir meets apio's requirements.
449 _check_apio_dir(home_dir, "home dir", "APIO_HOME")
451 # -- Create the folder if it does not exist
452 try:
453 home_dir.mkdir(parents=True, exist_ok=True)
454 except PermissionError:
455 cerror(f"No usable home directory {home_dir}")
456 sys.exit(1)
458 # Return the home_dir as a Path
459 return home_dir
462def resolve_packages_dir(apio_home_dir: Path) -> Path:
463 """Get the absolute apio packages dir. This is the apio folder where the
464 packages are installed. The default apio packages dir can be overridden
465 using the APIO_PACKAGES environment variable. If not set,
466 the <apio-home>/packages folder is used by default:
467 Ej. Linux: /home/obijuan/.apio/packages
468 If the folders does not exist, they are created
469 """
471 # -- Get the optional apio packages env.
472 apio_packages_dir_env = env_options.get(
473 env_options.APIO_PACKAGES, default=None
474 )
476 # -- If the env vars specified an packages dir then use it.
477 if apio_packages_dir_env: 477 ↛ 494line 477 didn't jump to line 494 because the condition on line 477 was always true
478 # -- Verify that the env variable contains 'packages' to make sure we
479 # -- don't clobber system directories.
480 if "packages" not in apio_packages_dir_env: 480 ↛ 481line 480 didn't jump to line 481 because the condition on line 480 was never true
481 cerror(
482 "Apio packages dir APIO_PACKAGES should include the "
483 "string 'packages'."
484 )
485 sys.exit(1)
486 # -- Expand user home '~' marker, if exists.
487 apio_packages_dir_env = os.path.expanduser(apio_packages_dir_env)
488 # -- Expand varas such as $HOME or %HOME% on windows.
489 apio_packages_dir_env = os.path.expandvars(apio_packages_dir_env)
490 # -- Convert string to path.
491 packages_dir = Path(apio_packages_dir_env)
492 else:
493 # -- Else, use the default <home_dir>/packages.
494 packages_dir = apio_home_dir / "packages"
496 # -- Verify that the home dir meets apio's requirements.
497 _check_apio_dir(packages_dir, "packages dir", "APIO_PACKAGES")
499 # -- Create the folder if it does not exist
500 # try:
501 # packages_dir.mkdir(parents=True, exist_ok=True)
502 # except PermissionError:
503 # cerror(f"No usable packages directory {packages_dir}")
504 # sys.exit(1)
506 # Return the packages as a Path
507 return packages_dir
510def split(
511 s: str,
512 separator: str,
513 strip: bool = False,
514 keep_empty: bool = True,
515) -> str:
516 """Split a string into parts."""
517 # -- A workaround for python's "".split(",") returning [''].
518 s = s.split(separator) if s else []
520 # -- Strip the elements if requested.
521 if strip: 521 ↛ 525line 521 didn't jump to line 525 because the condition on line 521 was always true
522 s = [x.strip() for x in s]
524 # -- Remove empty elements if requested.
525 if not keep_empty: 525 ↛ 529line 525 didn't jump to line 529 because the condition on line 525 was always true
526 s = [x for x in s if x]
528 # --All done.
529 return s
532def fpga_arch_sort_key(fpga_arch: str) -> Any:
533 """Given an fpga arch name such as 'ice40', return a sort key
534 got force our preferred order of sorting by architecture. Used in
535 reports such as examples, fpgas, and boards."""
537 # -- The preferred order of architectures, Add more if adding new
538 # -- architectures.
539 archs = ["ice40", "ecp5", "gowin"]
541 # -- Primary key with preferred architecture first and in the
542 # -- preferred order.
543 primary_key = archs.index(fpga_arch) if fpga_arch in archs else len(archs)
545 # -- Construct the key, unknown architectures list at the end by
546 # -- lexicographic order.
547 return (primary_key, fpga_arch)
550def subprocess_call(
551 cmd: List[str],
552) -> int:
553 """A helper for running subprocess.call. Exit if an error."""
555 if is_debug(1): 555 ↛ 556line 555 didn't jump to line 556 because the condition on line 555 was never true
556 cout(f"subprocess_call: {cmd}")
558 # -- Invoke the command.
559 exit_code = subprocess.call(cmd, shell=False)
561 if is_debug(1): 561 ↛ 562line 561 didn't jump to line 562 because the condition on line 561 was never true
562 cout(f"subprocess_call: exit code is {exit_code}")
564 # -- If ok, return.
565 if exit_code == 0:
566 return exit_code
568 # -- Here when error
569 cerror(f"Command failed: {cmd}")
570 sys.exit(1)
573@contextmanager
574def pushd(target_dir: Path):
575 """A context manager for temporary execution in a given directory."""
576 prev_dir = os.getcwd()
577 os.chdir(target_dir)
578 try:
579 yield
580 finally:
581 os.chdir(prev_dir)