Coverage for apio/managers/packages.py: 69%
206 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-06 10:20 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-06 10:20 +0000
1# -*- coding: utf-8 -*-
2# -- This file is part of the Apio project
3# -- (C) 2016-2021 FPGAwars
4# -- Author Jesús Arroyo
5# -- License GPLv2
6"""Package install/uninstall functionality.
7Used by the 'apio packages' command.
8"""
10import sys
11import os
12from dataclasses import dataclass
13from typing import Dict, List
14from pathlib import Path
15import shutil
16from apio.common.apio_console import cout, cerror, cstyle
17from apio.common.apio_styles import WARNING, ERROR, SUCCESS, EMPH3
18from apio.managers.downloader import FileDownloader
19from apio.managers.unpacker import FileUnpacker
20from apio.utils import util
21from apio.profile import Profile, PackageRemoteConfig
@dataclass(frozen=True)
class PackagesContext:
    """Context for package management operations.

    This class provides the information needed for package management
    operations. It is a subset of the information contained by ApioContext
    and is used instead of passing the ApioContext itself, because package
    management operations (e.g. updating packages) need to run before the
    ApioContext object is fully initialized.
    """

    # -- Same as ApioContext.profile
    profile: Profile
    # -- Same as ApioContext.required_packages. The functions in this module
    # -- use its keys as the package names.
    required_packages: Dict
    # -- Same as ApioContext.platform_id
    platform_id: str
    # -- Same as ApioContext.packages_dir. This is a Path (not a str); the
    # -- functions in this module join it with '/' and call mkdir() and
    # -- glob() on it.
    packages_dir: Path

    def __post_init__(self):
        """Assert that all fields are initialized to actual (truthy) values."""
        assert self.profile
        assert self.required_packages
        assert self.platform_id
        assert self.packages_dir
def _construct_package_download_url(
    packages_ctx: PackagesContext,
    target_version: str,
    package_remote_config: PackageRemoteConfig,
) -> str:
    """Return the GitHub release download URL of a package.

    The URL has the form
    https://github.com/<organization>/<repo>/releases/download/<tag>/<file>
    with the four variable parts taken from 'package_remote_config'. The
    placeholders ${PLATFORM}, ${YYYY-MM-DD} and ${YYYYMMDD} that appear in
    the combined URL are then replaced with the platform id of
    'packages_ctx' and with the date encoded in 'target_version'.

    'target_version' must have exactly three dot separated integer fields
    (year, month, day).
    """

    # -- Convert the version to "YYYY-MM-DD".
    # -- TODO: Move this conversion to a function in util.py.
    fields = target_version.split(".")
    assert len(fields) == 3, fields
    year, month, day = (int(field) for field in fields)
    date_with_dashes = f"{year:04d}-{month:02d}-{day:02d}"

    # -- Map each placeholder to its replacement value.
    url_vars = {
        "${PLATFORM}": packages_ctx.platform_id,
        "${YYYY-MM-DD}": date_with_dashes,
        "${YYYYMMDD}": date_with_dashes.replace("-", ""),
    }
    if util.is_debug(1):
        cout(f"Package URL vars: {url_vars}")

    # -- List the url parts, in order.
    url_parts = [
        "https://github.com/",
        package_remote_config.repo_organization,
        "/",
        package_remote_config.repo_name,
        "/releases/download/",
        package_remote_config.release_tag,
        "/",
        package_remote_config.release_file,
    ]
    if util.is_debug(1):
        cout(f"package url parts = {url_parts}")

    # -- Glue the parts into a single string.
    url = "".join(url_parts)
    if util.is_debug(1):
        cout(f"Combined package url: {url}")

    # -- Substitute the placeholders, one at a time.
    for placeholder, replacement in url_vars.items():
        url = url.replace(placeholder, replacement)
    if util.is_debug(1):
        cout(f"Resolved package url: {url}")

    return url
def _download_package_file(url: str, dir_path: Path) -> Path:
    """Download the given file (url). Return the path of the local
    destination file. Exits with a user message and error code if any error.

    * INPUTS:
      * url: File to download
      * dir_path: Directory passed to the FileDownloader together with
        the url.
    * OUTPUTS:
      * The path of the destination file, as reported by the downloader.
        This is a path object (the caller calls unlink() on it), not a str.
    """

    # -- Stays None until the downloader is created, so the Ctrl-C handler
    # -- below knows if there may be a file to clean up.
    filepath = None

    try:
        # -- Object for downloading the file
        downloader = FileDownloader(url, dir_path)

        # -- Get the destination path
        filepath = downloader.destination

        # -- Perform the download.
        downloader.start()

    # -- If the user press Ctrl-C (Abort)
    except KeyboardInterrupt:

        # -- Remove the (possibly partial) file
        if filepath and filepath.is_file():
            filepath.unlink()

        # -- Inform the user
        cout("User aborted download", style=ERROR)
        sys.exit(1)

    except IOError as exc:
        cout("I/O error while downloading", style=ERROR)
        cout(str(exc), style=ERROR)
        sys.exit(1)

    except util.ApioException:
        cerror("Package not found")
        sys.exit(1)

    # -- Return the destination path
    return filepath
def _unpack_package_file(package_file: Path, package_dir: Path) -> None:
    """Extract 'package_file' into the directory 'package_dir'.

    Prints an error message and exits with status 1 if the unpacker
    reports a failure."""

    # -- Run the unpacker; a truthy result means success and we are done.
    if FileUnpacker(package_file, package_dir).start():
        return

    # -- Here when unpacking failed.
    cerror(f"Failed to unpack package file {package_file}")
    sys.exit(1)
def _delete_package_dir(
    packages_ctx: PackagesContext, package_name: str, verbose: bool
) -> bool:
    """Remove the directory of the named package from the packages dir.

    Returns True if the package directory existed before the call and
    False otherwise. Prints an error message and exits with status 1 if
    the path still exists afterwards."""
    target = packages_ctx.packages_dir / package_name
    existed = target.is_dir()

    if existed:
        if verbose:
            cout(f"Deleting {target}")

        # -- Sanity check the path before the recursive delete.
        assert "packages" in str(target).lower(), target
        shutil.rmtree(target)

    # -- Normal case, nothing is left at the package path.
    if not target.exists():
        return existed

    # -- Here when something is still there (a failed delete or a
    # -- non directory entry with the package name).
    cerror(f"Directory deletion failed: {target.absolute()}")
    sys.exit(1)
def scan_and_fix_packages(packages_ctx: PackagesContext) -> bool:
    """Scan the packages and repair any fixable errors that are found.

    Returns True if, according to the scan, all the packages are
    installed ok."""

    scan_results = scan_packages(packages_ctx)

    # -- Invoke the fixer only if the scan found something to fix.
    if scan_results.num_errors_to_fix():
        _fix_packages(packages_ctx, scan_results)

    # -- The flag is computed from the scan that was taken before the
    # -- fixing. This is fine since the fixing does not touch packages
    # -- that are installed ok.
    return scan_results.packages_installed_ok()
def install_missing_packages_on_the_fly(
    packages_ctx: PackagesContext, verbose=False
) -> None:
    """Install, on demand, the required packages that are missing.

    Intended for commands such as 'apio build' that fetch packages on the
    fly, and thus it is allowed to use the already fetched remote config
    instead of fetching a fresh one. Prints nothing if all the packages
    are already ok."""

    # -- Scan, fix what is broken, and return quietly if nothing is
    # -- missing. No fresh remote config is required for this.
    if scan_and_fix_packages(packages_ctx):
        return

    # -- Here some packages need to be installed. Since we just fixed,
    # -- each required package is either installed ok or not installed.
    already_installed = packages_ctx.profile.installed_packages

    for name in packages_ctx.required_packages:
        # -- Skip packages that are already registered as installed.
        if name in already_installed:
            continue
        install_package(
            packages_ctx,
            package_name=name,
            force_reinstall=False,
            verbose=verbose,
        )

    # -- Everything should be ok by now, but verify just in case.
    if scan_packages(packages_ctx).is_all_ok():
        return

    cout(
        "Warning: packages issues detected. Use "
        "'apio packages list' to investigate.",
        style=WARNING,
    )
def install_package(
    packages_ctx: PackagesContext,
    *,
    package_name: str,
    force_reinstall: bool,
    verbose: bool,
) -> None:
    """Install a given package.

    'packages_ctx' is the context object of this apio invocation.
    'package_name' is the package name, e.g. 'examples' or 'oss-cad-suite'.
      It must be one of the keys of packages_ctx.required_packages.
    'force_reinstall' indicates if to perform the installation even if a
      matching version of the package is already installed for this
      platform.
    'verbose' indicates if to print extra information.

    The target version and the download location are taken from the
    package remote config that is cached in the profile. The caller can
    refresh that cache beforehand if the latest remote config is desired.

    Returns normally if no error, exits the program with an error status
    and a user message if an error is detected.
    """

    # -- Caller is responsible to check that the package name is valid
    # -- on this platform.
    assert package_name in packages_ctx.required_packages, package_name

    # -- Set up installation announcement
    pending_announcement = cstyle(
        f"Installing apio package '{package_name}'", style=EMPH3
    )

    # -- If in chatty mode, announce now and clear. Otherwise we will
    # -- announce later only if actually installing.
    if verbose:
        cout(pending_announcement)
        pending_announcement = None

    # -- Get package remote config from the cache.
    package_config: PackageRemoteConfig = (
        packages_ctx.profile.get_package_config(package_name)
    )

    # -- Get the version we should have.
    target_version = package_config.release_version

    # -- If not forcing and the target version already installed then
    # -- nothing to do and we leave quietly.
    if not force_reinstall:
        # -- Get the version and platform id of the installed package.
        installed_version, package_platform_id = (
            packages_ctx.profile.get_installed_package_info(package_name)
        )

        if verbose:
            cout(
                f"Installed version: {installed_version} "
                f"({package_platform_id})"
            )

        # -- If the installed and the target versions are the same, and
        # -- the package was installed for this platform, nothing to do.
        if (
            target_version == installed_version
            and package_platform_id == packages_ctx.platform_id
        ):
            if verbose:
                cout(
                    f"Version {target_version} ({package_platform_id}) "
                    "already installed",
                    style=SUCCESS,
                )
            return

    # -- Here we actually do the work. Announce if we haven't done it yet,
    # -- and clear the pending announcement, same as in the verbose case.
    if pending_announcement:
        cout(pending_announcement)
        pending_announcement = None

    cout(f"Fetching version {target_version} ({packages_ctx.platform_id})")

    # -- Construct the download URL.
    download_url = _construct_package_download_url(
        packages_ctx, target_version, package_config
    )
    if verbose:
        cout(f"Download URL: {download_url}")

    # -- Prepare the packages directory.
    packages_ctx.packages_dir.mkdir(exist_ok=True)

    # -- Determine the package directory.
    package_dir = packages_ctx.packages_dir / package_name
    cout(f"Package dir: {package_dir}")

    # -- Download the package file from the remote server. We do it before
    # -- deleting the old package dir, so a failed download (which exits
    # -- the program) leaves the existing installation in place.
    local_package_file = _download_package_file(
        download_url, packages_ctx.packages_dir
    )
    if verbose:
        cout(f"Local package file: {local_package_file}")

    # -- Delete the old package dir, if exists, to avoid name conflicts and
    # -- left over files.
    _delete_package_dir(packages_ctx, package_name, verbose)

    # -- Unpack the package. This creates a new package dir.
    _unpack_package_file(local_package_file, package_dir)

    # -- Remove the package file. We don't need it anymore.
    if verbose:
        cout(f"Deleting package file {local_package_file}")
    local_package_file.unlink()

    # -- Register the installed package in the profile.
    packages_ctx.profile.add_package(
        package_name, target_version, packages_ctx.platform_id, download_url
    )

    # -- Inform the user!
    cout(f"Package '{package_name}' installed successfully", style=SUCCESS)
def _fix_packages(
    packages_ctx: PackagesContext, scan: "PackageScanResults"
) -> None:
    """If the package scan result contains errors, fix them.

    'packages_ctx' is the context object of this apio invocation.
    'scan' is the scan result whose error lists drive the fixing.

    Packages with a bad version and broken packages get their directory
    deleted and are removed from the profile. Orphan packages are removed
    from the profile only. Orphan directories and files are deleted from
    the packages dir.
    """

    for package_name in scan.bad_version_package_names:
        cout(f"Uninstalling incompatible version of '{package_name}'")
        _delete_package_dir(packages_ctx, package_name, verbose=False)
        packages_ctx.profile.remove_package(package_name)

    for package_name in scan.broken_package_names:
        cout(f"Uninstalling broken package '{package_name}'")
        _delete_package_dir(packages_ctx, package_name, verbose=False)
        packages_ctx.profile.remove_package(package_name)

    for package_name in scan.orphan_package_names:
        cout(f"Uninstalling unknown package '{package_name}'")
        packages_ctx.profile.remove_package(package_name)

    for dir_name in scan.orphan_dir_names:
        cout(f"Deleting unknown package dir '{dir_name}'")
        # -- Sanity check. Since packages_ctx.packages_dir is expected to
        # -- include the word packages, this can fail only due to
        # -- programming error.
        dir_path = packages_ctx.packages_dir / dir_name
        assert "packages" in str(dir_path).lower(), dir_path
        # -- Delete.
        shutil.rmtree(dir_path)

    for file_name in scan.orphan_file_names:
        cout(f"Deleting unknown package file '{file_name}'")
        # -- Sanity check, same as for the orphan dirs above. The assertion
        # -- message reports the file path that is being checked.
        file_path = packages_ctx.packages_dir / file_name
        assert "packages" in str(file_path).lower(), file_path
        # -- Delete.
        file_path.unlink()
@dataclass
class PackageScanResults:
    """Represents results of packages scan."""

    # -- Normal. Packages in required_packages that are registered in the
    # -- profile, have a package directory, and pass the version check.
    installed_ok_package_names: List[str]
    # -- Error. Packages in required_packages that are installed but with
    # -- version mismatch.
    bad_version_package_names: List[str]
    # -- Normal. Packages in required_packages that are uninstalled properly.
    uninstalled_package_names: List[str]
    # -- Error. Packages in required_packages with broken installation. E.g,
    # -- registered in profile but package directory is missing.
    broken_package_names: List[str]
    # -- Error. Packages that are marked in profile as registered but are not
    # -- in required_packages.
    orphan_package_names: List[str]
    # -- Error. Basenames of directories in packages dir that don't match
    # -- the names of packages in required_packages.
    orphan_dir_names: List[str]
    # -- Error. Basenames of unexpected files in the packages directory.
    # -- Other than the installed_packages.json file, that directory is
    # -- expected to contain only directories of packages.
    orphan_file_names: List[str]

    def packages_installed_ok(self) -> bool:
        """Returns true if all packages are installed ok, regardless of
        other fixable errors."""
        return not (
            self.bad_version_package_names
            or self.uninstalled_package_names
            or self.broken_package_names
        )

    def num_errors_to_fix(self) -> int:
        """Returns the number of errors that require fixing. Having
        packages that are not installed is not considered an error that
        needs to be fixed."""
        return (
            len(self.bad_version_package_names)
            + len(self.broken_package_names)
            + len(self.orphan_package_names)
            + len(self.orphan_dir_names)
            + len(self.orphan_file_names)
        )

    def is_all_ok(self) -> bool:
        """Return True if all packages are installed properly with no
        issues."""
        return (
            not self.num_errors_to_fix() and not self.uninstalled_package_names
        )

    def dump(self):
        """Dump the content of this object. For debugging."""
        cout()
        cout("Package scan results:")
        cout(f" Installed {self.installed_ok_package_names}")
        cout(f" bad version {self.bad_version_package_names}")
        cout(f" Uninstalled {self.uninstalled_package_names}")
        cout(f" Broken {self.broken_package_names}")
        cout(f" Orphan ids {self.orphan_package_names}")
        cout(f" Orphan dirs {self.orphan_dir_names}")
        cout(f" Orphan files {self.orphan_file_names}")
def package_version_ok(
    packages_ctx: PackagesContext,
    package_name: str,
) -> bool:
    """Tell if the installed version of a package is the required one.

    Returns True only if the package is in the required packages, the
    profile has it registered with a version and with the platform id of
    'packages_ctx', and that version equals the release version in the
    package's remote config. Returns False in all other cases."""

    # -- A package that is not applicable to this platform is never ok.
    if package_name not in packages_ctx.required_packages:
        return False

    profile = packages_ctx.profile
    installed_version, installed_platform_id = (
        profile.get_installed_package_info(package_name)
    )

    # -- With a known version that was installed for this platform, the
    # -- answer is determined by the required version. We expect the two
    # -- versions to be normalized and thus a string comparison is
    # -- sufficient.
    if installed_version and installed_platform_id == packages_ctx.platform_id:
        remote_config: PackageRemoteConfig = profile.get_package_config(
            package_name
        )
        return installed_version == remote_config.release_version

    # -- Here when the package is not installed or was installed for a
    # -- different platform.
    return False
def scan_packages(packages_ctx: PackagesContext) -> PackageScanResults:
    """Scan the required and the installed packages and report the
    findings as a PackageScanResults object."""

    # pylint: disable=too-many-branches

    assert isinstance(packages_ctx, PackagesContext)

    # -- Start with empty findings.
    findings = PackageScanResults([], [], [], [], [], [], [])

    # -- The names of the required packages. Used later to tell which of
    # -- the directories in the packages dir are expected.
    expected_dir_names = set()

    # -- Classify each of the required packages.
    for name in packages_ctx.required_packages:
        expected_dir_names.add(name)

        registered = name in packages_ctx.profile.installed_packages
        dir_exists = (packages_ctx.packages_dir / name).is_dir()
        version_matches = package_version_ok(packages_ctx, name)

        if registered and dir_exists:
            # -- Installed, with a good or a bad version.
            if version_matches:
                bucket = findings.installed_ok_package_names
            else:
                bucket = findings.bad_version_package_names
        elif registered or dir_exists:
            # -- Only one of the two exists, the package is broken.
            bucket = findings.broken_package_names
        else:
            # -- Neither exists, the package is not installed.
            bucket = findings.uninstalled_package_names

        bucket.append(name)

    # -- Packages that are registered in the profile as installed but are
    # -- not in required_packages are orphans.
    findings.orphan_package_names.extend(
        name
        for name in packages_ctx.profile.installed_packages
        if name not in packages_ctx.required_packages
    )

    # -- Look for orphan dirs and files in the packages directory.
    for entry in packages_ctx.packages_dir.glob("*"):
        entry_name = entry.name
        if entry.is_dir():
            if entry_name not in expected_dir_names:
                findings.orphan_dir_names.append(entry_name)
        elif entry_name != "installed_packages.json":
            # -- Any file other than the installed packages file is an
            # -- orphan.
            # TODO Make the file name a const.
            findings.orphan_file_names.append(entry_name)

    # -- Dump the findings if debugging.
    if util.is_debug(1):
        findings.dump()

    return findings