Coverage for apio / managers / packages.py: 69%
203 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-24 01:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-24 01:53 +0000
1# -*- coding: utf-8 -*-
2# -- This file is part of the Apio project
3# -- (C) 2016-2021 FPGAwars
4# -- Author Jesús Arroyo
5# -- License GPLv2
6"""Package install/uninstall functionality.
7Used by the 'apio packages' command.
8"""
10import sys
11import os
12from dataclasses import dataclass
13from typing import Dict, List
14from pathlib import Path
15import shutil
16from apio.common.apio_console import cout, cerror, cstyle
17from apio.common.apio_styles import WARNING, ERROR, SUCCESS, EMPH3
18from apio.managers.downloader import FileDownloader
19from apio.managers.unpacker import FileUnpacker
20from apio.utils import util
21from apio.profile import Profile, PackageRemoteConfig
24@dataclass(frozen=True)
25class PackagesContext:
26 """Context for package managements operations.
27 This class provides the information needed for package management
28 operations. This is a subset of the information contained by ApioContext
29 and we use it, instead of passing the ApioContext, because we need to
30 perform package management operations (e.g. updating packages) before
31 the ApioContext object is fully initialized.
32 """
34 # -- Same as ApioContext.profile
35 profile: Profile
36 # -- Same as ApioContext.required_packages
37 required_packages: Dict
38 # -- Same as ApioContext.platform_id
39 platform_id: str
40 # -- Same as ApioContext.packages_dir
41 packages_dir: str
43 def __post_init__(self):
44 """Assert that all fields initialized to actual values."""
45 assert self.profile
46 assert self.required_packages
47 assert self.platform_id
48 assert self.packages_dir
51def _construct_package_download_url(
52 packages_ctx: PackagesContext,
53 package_remote_config: PackageRemoteConfig,
54) -> str:
55 """Construct the download URL for the given package name and version."""
57 # -- Create vars mapping.
58 url_vars = {
59 "${PLATFORM}": packages_ctx.platform_id,
60 "${YYYYMMDD}": package_remote_config.release_tag.replace("-", ""),
61 }
62 if util.is_debug(1): 62 ↛ 63line 62 didn't jump to line 63 because the condition on line 62 was never true
63 cout(f"Package URL vars: {url_vars}")
65 # -- Define the url parts.
66 url_parts = [
67 "https://github.com/",
68 package_remote_config.repo_organization,
69 "/",
70 package_remote_config.repo_name,
71 "/releases/download/",
72 package_remote_config.release_tag,
73 "/",
74 package_remote_config.release_file,
75 ]
77 if util.is_debug(1): 77 ↛ 78line 77 didn't jump to line 78 because the condition on line 77 was never true
78 cout(f"package url parts = {url_parts}")
80 # -- Concatenate the URL parts.
81 url = "".join(url_parts)
83 if util.is_debug(1): 83 ↛ 84line 83 didn't jump to line 84 because the condition on line 83 was never true
84 cout(f"Combined package url: {url}")
86 # -- Replace placeholders with values.
87 for name, val in url_vars.items():
88 url = url.replace(name, val)
90 if util.is_debug(1): 90 ↛ 91line 90 didn't jump to line 91 because the condition on line 90 was never true
91 cout(f"Resolved package url: {url}")
93 # -- All done.
94 return url
97def _download_package_file(url: str, dir_path: Path) -> str:
98 """Download the given file (url). Return the path of local destination
99 file. Exits with a user message and error code if any error.
101 * INPUTS:
102 * url: File to download
103 * OUTPUTS:
104 * The path of the destination file
105 """
107 filepath = None
109 try:
110 # -- Object for downloading the file
111 downloader = FileDownloader(url, dir_path)
113 # -- Get the destination path
114 filepath = downloader.destination
116 downloader.start()
118 # -- If the user press Ctrl-C (Abort)
119 except KeyboardInterrupt:
121 # -- Remove the file
122 if filepath and filepath.is_file():
123 filepath.unlink()
125 # -- Inform the user
126 cout("User aborted download", style=ERROR)
127 sys.exit(1)
129 except IOError as exc:
130 cout("I/O error while downloading", style=ERROR)
131 cout(str(exc), style=ERROR)
132 sys.exit(1)
134 except util.ApioException:
135 cerror("Package not found")
136 sys.exit(1)
138 # -- Return the destination path
139 return filepath
142def _unpack_package_file(package_file: Path, package_dir: Path) -> None:
143 """Unpack the package_file in the package_dir directory.
144 Exit with an error message and error status if any error."""
146 # -- Create the unpacker.
147 operation = FileUnpacker(package_file, package_dir)
149 # -- Perform the operation.
150 ok = operation.start()
152 # -- Exit if error.
153 if not ok: 153 ↛ 154line 153 didn't jump to line 154 because the condition on line 153 was never true
154 cerror(f"Failed to unpack package file {package_file}")
155 sys.exit(1)
158def _delete_package_dir(
159 packages_ctx: PackagesContext, package_name: str, verbose: bool
160) -> bool:
161 """Delete the directory of the package with given name. Returns
162 True if the packages existed. Exits with an error message on error."""
163 package_dir = packages_ctx.packages_dir / package_name
165 dir_found = package_dir.is_dir()
166 if dir_found:
167 if verbose: 167 ↛ 168line 167 didn't jump to line 168 because the condition on line 167 was never true
168 cout(f"Deleting {str(package_dir)}")
170 # -- Sanity check the path and delete.
171 assert "packages" in str(package_dir).lower(), package_dir
172 shutil.rmtree(package_dir)
174 if package_dir.exists(): 174 ↛ 175line 174 didn't jump to line 175 because the condition on line 174 was never true
175 cerror(f"Directory deletion failed: {str(package_dir.absolute())}")
176 sys.exit(1)
178 return dir_found
181def scan_and_fix_packages(packages_ctx: PackagesContext) -> bool:
182 """Scan the packages and fix if there are errors. Returns true
183 if the packages are installed ok."""
185 # -- Scan the packages.
186 scan = scan_packages(packages_ctx)
188 # -- If there are fixable errors, fix them.
189 if scan.num_errors_to_fix() > 0:
190 _fix_packages(packages_ctx, scan)
192 # -- Return a flag that indicates if all packages are installed ok. We
193 # -- use a scan from before the fixing but the fixing does not touch
194 # -- installed ok packages.
195 return scan.packages_installed_ok()
198def install_missing_packages_on_the_fly(
199 packages_ctx: PackagesContext, verbose=False
200) -> None:
201 """Install on the fly any missing packages. Does not print a thing if
202 all packages are already ok. This function is intended for on demand
203 package fetching by commands such as apio build, and thus is allowed
204 to use fetched remote config instead of fetching a fresh one."""
206 # -- Scan and fix broken package.
207 # -- Since this is a on-the-fly operation, we don't require a fresh
208 # -- remote config file for required packages versions.
209 installed_ok = scan_and_fix_packages(packages_ctx)
211 # -- If all the packages are installed, we are done.
212 if installed_ok: 212 ↛ 220line 212 didn't jump to line 220 because the condition on line 212 was always true
213 return
215 # -- Here when we need to install some packages. Since we just fixed
216 # -- we can't have broken or packages with version mismatch, just
217 # -- installed ok, and not installed.
218 # --
219 # -- Get lists of installed and required packages.
220 installed_packages = packages_ctx.profile.installed_packages
221 required_packages_names = packages_ctx.required_packages.keys()
223 # -- Install any required package that is not installed.
224 for package_name in required_packages_names:
225 if package_name not in installed_packages:
226 install_package(
227 packages_ctx,
228 package_name=package_name,
229 force_reinstall=False,
230 verbose=verbose,
231 )
233 # -- Here all packages should be ok but we check again just in case.
234 scan_results = scan_packages(packages_ctx)
235 if not scan_results.is_all_ok():
236 cout(
237 "Warning: packages issues detected. Use "
238 "'apio packages list' to investigate.",
239 style=WARNING,
240 )
243def install_package(
244 packages_ctx: PackagesContext,
245 *,
246 package_name: str,
247 force_reinstall: bool,
248 verbose: bool,
249) -> None:
250 """Install a given package.
252 'packages_ctx' is the context object of this apio invocation.
253 'package_name' is the package name, e.g. 'examples' or 'oss-cad-suite'.
254 'force' indicates if to perform the installation even if a matching
255 package is already installed.
256 'explicit' indicates that the user specified the package name(s) explicitly
257 and thus expect more feedback in case of a 'no change'
258 'verbose' indicates if to print extra information.
260 Returns normally if no error, exits the program with an error status
261 and a user message if an error is detected.
262 """
264 # -- Caller is responsible to check check that package name is valid
265 # -- on this platform.
266 assert package_name in packages_ctx.required_packages, package_name
268 # -- Set up installation announcement
269 pending_announcement = cstyle(
270 f"Installing apio package '{package_name}'", style=EMPH3
271 )
273 # -- If in chatty mode, announce now and clear. Otherwise we will
274 # -- announce later only if actually installing.
275 if verbose: 275 ↛ 276line 275 didn't jump to line 276 because the condition on line 275 was never true
276 cout(pending_announcement)
277 pending_announcement = None
279 # -- Get package remote config from the cache. Caller can refresh the
280 # -- cache with the latest remote config if desired.
281 package_config: PackageRemoteConfig = (
282 packages_ctx.profile.get_package_config(package_name)
283 )
285 # -- Get the version we should have.
286 target_version = package_config.release_version
288 # -- If not forcing and the target version already installed then
289 # -- nothing to do and we leave quietly.
290 if not force_reinstall:
291 # -- Get the version of the installed package, None if not installed.
292 installed_version, package_platform_id = (
293 packages_ctx.profile.get_installed_package_info(package_name)
294 )
296 if verbose: 296 ↛ 297line 296 didn't jump to line 297 because the condition on line 296 was never true
297 cout(
298 f"Installed version: {installed_version} "
299 f"({package_platform_id})"
300 )
302 # -- If the installed and the target versions are the same then
303 # -- nothing to do.
304 if (
305 target_version == installed_version
306 and package_platform_id == packages_ctx.platform_id
307 ):
308 if verbose: 308 ↛ 309line 308 didn't jump to line 309 because the condition on line 308 was never true
309 cout(
310 f"Version {target_version} ({package_platform_id}) "
311 "already installed",
312 style=SUCCESS,
313 )
314 return
316 # -- Here we need to fetch and install so can be more chatty.
318 # -- Here we actually do the work. Announce if we haven't done it yet.
319 if pending_announcement: 319 ↛ 323line 319 didn't jump to line 323 because the condition on line 319 was always true
320 cout(pending_announcement)
321 pending_announcement = True
323 cout(f"Fetching version {target_version} ({packages_ctx.platform_id})")
325 # -- Construct the download URL.
326 download_url = _construct_package_download_url(
327 packages_ctx, package_config
328 )
329 if verbose: 329 ↛ 330line 329 didn't jump to line 330 because the condition on line 329 was never true
330 cout(f"Download URL: {download_url}")
332 # -- Prepare the packages directory.
333 packages_ctx.packages_dir.mkdir(exist_ok=True)
335 # -- Prepare the package directory.
336 # package_dir = packages_ctx.get_package_dir(package_name)
337 package_dir = packages_ctx.packages_dir / package_name
338 cout(f"Package dir: {package_dir}")
340 # -- Download the package file from the remote server.
341 local_package_file = _download_package_file(
342 download_url, packages_ctx.packages_dir
343 )
344 if verbose: 344 ↛ 345line 344 didn't jump to line 345 because the condition on line 344 was never true
345 cout(f"Local package file: {local_package_file}")
347 # -- Delete the old package dir, if exists, to avoid name conflicts and
348 # -- left over files.
349 _delete_package_dir(packages_ctx, package_name, verbose)
351 # -- Unpack the package. This creates a new package dir.
352 _unpack_package_file(local_package_file, package_dir)
354 # -- Remove the package file. We don't need it anymore.
355 if verbose: 355 ↛ 356line 355 didn't jump to line 356 because the condition on line 355 was never true
356 cout(f"Deleting package file {local_package_file}")
357 local_package_file.unlink()
359 # -- Add package to profile and save.
360 packages_ctx.profile.add_package(
361 package_name, target_version, packages_ctx.platform_id, download_url
362 )
363 # packages_ctx.profile.save()
365 # -- Inform the user!
366 cout(f"Package '{package_name}' installed successfully", style=SUCCESS)
369def _fix_packages(
370 packages_ctx: PackagesContext, scan: "PackageScanResults"
371) -> None:
372 """If the package scan result contains errors, fix them."""
374 for package_name in scan.bad_version_package_names: 374 ↛ 375line 374 didn't jump to line 375 because the loop on line 374 never started
375 cout(f"Uninstalling incompatible version of '{package_name}'")
376 _delete_package_dir(packages_ctx, package_name, verbose=False)
377 packages_ctx.profile.remove_package(package_name)
379 for package_name in scan.broken_package_names:
380 cout(f"Uninstalling broken package '{package_name}'")
381 _delete_package_dir(packages_ctx, package_name, verbose=False)
382 packages_ctx.profile.remove_package(package_name)
384 for package_name in scan.orphan_package_names: 384 ↛ 385line 384 didn't jump to line 385 because the loop on line 384 never started
385 cout(f"Uninstalling unknown package '{package_name}'")
386 packages_ctx.profile.remove_package(package_name)
388 for dir_name in scan.orphan_dir_names:
389 cout(f"Deleting unknown package dir '{dir_name}'")
390 # -- Sanity check. Since packages_ctx.packages_dir is guaranteed to
391 # -- include the word packages, this can fail only due to programming
392 # -- error.
393 dir_path = packages_ctx.packages_dir / dir_name
394 assert "packages" in str(dir_path).lower(), dir_path
395 # -- Delete.
396 shutil.rmtree(dir_path)
398 for file_name in scan.orphan_file_names: 398 ↛ 399line 398 didn't jump to line 399 because the loop on line 398 never started
399 cout(f"Deleting unknown package file '{file_name}'")
400 # -- Sanity check. Since packages_ctx.packages_dir is guaranteed to
401 # -- include the word packages, this can fail only due to programming
402 # -- error.
403 file_path = packages_ctx.packages_dir / file_name
404 assert "packages" in str(file_path).lower(), dir_path
405 # -- Delete.
406 file_path.unlink()
409@dataclass
410class PackageScanResults:
411 """Represents results of packages scan."""
413 # -- Normal and Error. Packages in required_packages that are installed
414 # -- regardless if the version matches or not.
415 installed_ok_package_names: List[str]
416 # -- Error. Packages in required_packages that are installed but with
417 # -- version mismatch.
418 bad_version_package_names: List[str]
419 # -- Normal. Packages in required_packages that are uninstalled properly.
420 uninstalled_package_names: List[str]
421 # -- Error. Packages in required_packages with broken installation. E.g,
422 # -- registered in profile but package directory is missing.
423 broken_package_names: List[str]
424 # -- Error. Packages that are marked in profile as registered but are not
425 # -- in required_packages.
426 orphan_package_names: List[str]
427 # -- Error. Basenames of directories in packages dir that don't match
428 # -- folder_name of packages in required_packages.
429 orphan_dir_names: List[str]
430 # -- Error. Basenames of all files in packages directory. That directory is
431 # -- expected to contain only directories for packages.a
432 orphan_file_names: List[str]
434 def packages_installed_ok(self) -> bool:
435 """Returns true if all packages are installed ok, regardless of
436 other fixable errors."""
437 return (
438 len(self.bad_version_package_names) == 0
439 and len(self.uninstalled_package_names) == 0
440 and len(self.broken_package_names) == 0
441 )
443 def num_errors_to_fix(self) -> bool:
444 """Returns the number of errors that required , having a non installed
445 packages is not considered an error that need to be fix."""
446 return (
447 len(self.bad_version_package_names)
448 + len(self.broken_package_names)
449 + len(self.orphan_package_names)
450 + len(self.orphan_dir_names)
451 + len(self.orphan_file_names)
452 )
454 def is_all_ok(self) -> bool:
455 """Return True if all packages are installed properly with no
456 issues."""
457 return (
458 not self.num_errors_to_fix() and not self.uninstalled_package_names
459 )
461 def dump(self):
462 """Dump the content of this object. For debugging."""
463 cout()
464 cout("Package scan results:")
465 cout(f" Installed {self.installed_ok_package_names}")
466 cout(f" bad version {self.bad_version_package_names}")
467 cout(f" Uninstalled {self.uninstalled_package_names}")
468 cout(f" Broken {self.broken_package_names}")
469 cout(f" Orphan ids {self.orphan_package_names}")
470 cout(f" Orphan dirs {self.orphan_dir_names}")
471 cout(f" Orphan files {self.orphan_file_names}")
474def package_version_ok(
475 packages_ctx: PackagesContext,
476 package_name: str,
477) -> bool:
478 """Return true if the package is both in profile and required packages
479 and its version in the profile meet the requirements in the
480 config.jsonc file. Otherwise return false."""
482 # If this package is not applicable to this platform, return False.
483 if package_name not in packages_ctx.required_packages: 483 ↛ 484line 483 didn't jump to line 484 because the condition on line 483 was never true
484 return False
486 # -- If the current version is not available, the package is not installed.
487 current_ver, package_platform_id = (
488 packages_ctx.profile.get_installed_package_info(package_name)
489 )
490 if not current_ver or package_platform_id != packages_ctx.platform_id:
491 return False
493 # -- Get the package remote config.
494 package_config: PackageRemoteConfig = (
495 packages_ctx.profile.get_package_config(package_name)
496 )
498 # -- Compare to the required version. We expect the two version to be
499 # -- normalized and ths a string comparison is sufficient.
500 return current_ver == package_config.release_version
503def scan_packages(packages_ctx: PackagesContext) -> PackageScanResults:
504 """Scans the available and installed packages and returns
505 the findings as a PackageScanResults object."""
507 # pylint: disable=too-many-branches
509 assert isinstance(packages_ctx, PackagesContext)
511 # Initialize the result with empty data.
512 result = PackageScanResults([], [], [], [], [], [], [])
514 # -- A helper set that we populate with the 'folder_name' values of the
515 # -- all the packages for this platform.
516 platform_folder_names = set()
518 # -- Scan packages ids in required_packages and populate
519 # -- the installed/uninstall/broken packages lists.
520 for package_name in packages_ctx.required_packages.keys():
521 # -- Collect package's folder names in a set. For a later use.
522 platform_folder_names.add(package_name)
524 # -- Classify the package as one of four cases.
525 in_profile = package_name in packages_ctx.profile.installed_packages
526 # has_dir = packages_ctx.get_package_dir(package_name).is_dir()
527 package_dir = packages_ctx.packages_dir / package_name
528 has_dir = package_dir.is_dir()
529 version_ok = package_version_ok(packages_ctx, package_name)
530 if in_profile and has_dir:
531 if version_ok: 531 ↛ 536line 531 didn't jump to line 536 because the condition on line 531 was always true
532 # Case 1: Package installed ok.
533 result.installed_ok_package_names.append(package_name)
534 else:
535 # -- Case 2: Package installed but version mismatch.
536 result.bad_version_package_names.append(package_name)
537 elif not in_profile and not has_dir:
538 # -- Case 3: Package not installed.
539 result.uninstalled_package_names.append(package_name)
540 else:
541 # -- Case 4: Package is broken.
542 result.broken_package_names.append(package_name)
544 # -- Scan the packages ids that are registered in profile as installed
545 # -- the ones that are not required_packages as orphans.
546 for package_name in packages_ctx.profile.installed_packages:
547 if package_name not in packages_ctx.required_packages: 547 ↛ 548line 547 didn't jump to line 548 because the condition on line 547 was never true
548 result.orphan_package_names.append(package_name)
550 # -- Scan the packages directory and identify orphan dirs and files.
551 for path in packages_ctx.packages_dir.glob("*"):
552 base_name = os.path.basename(path)
553 if path.is_dir():
554 if base_name not in platform_folder_names:
555 result.orphan_dir_names.append(base_name)
556 else:
557 # -- Skip the packages installed file, so we don't consider it as
558 # -- an orphan file.
559 # TODO Make this a const.
560 if base_name == "installed_packages.json": 560 ↛ 562line 560 didn't jump to line 562 because the condition on line 560 was always true
561 continue
562 result.orphan_file_names.append(base_name)
564 # -- Return results
565 if util.is_debug(1): 565 ↛ 566line 565 didn't jump to line 566 because the condition on line 565 was never true
566 result.dump()
568 return result