Coverage for apio/managers/packages.py: 69%

206 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-06 10:20 +0000

1# -*- coding: utf-8 -*- 

2# -- This file is part of the Apio project 

3# -- (C) 2016-2021 FPGAwars 

4# -- Author Jesús Arroyo 

5# -- License GPLv2 

6"""Package install/uninstall functionality. 

7Used by the 'apio packages' command. 

8""" 

9 

10import sys 

11import os 

12from dataclasses import dataclass 

13from typing import Dict, List 

14from pathlib import Path 

15import shutil 

16from apio.common.apio_console import cout, cerror, cstyle 

17from apio.common.apio_styles import WARNING, ERROR, SUCCESS, EMPH3 

18from apio.managers.downloader import FileDownloader 

19from apio.managers.unpacker import FileUnpacker 

20from apio.utils import util 

21from apio.profile import Profile, PackageRemoteConfig 

22 

23 

@dataclass(frozen=True)
class PackagesContext:
    """Context for package management operations.

    Carries the subset of the ApioContext information that the package
    management functions need. It is passed instead of the ApioContext
    itself because some package operations (e.g. updating packages) must
    run before the ApioContext object is fully initialized.
    """

    # -- Same as ApioContext.profile
    profile: Profile
    # -- Same as ApioContext.required_packages
    required_packages: Dict
    # -- Same as ApioContext.platform_id
    platform_id: str
    # -- Same as ApioContext.packages_dir. This is a Path, the functions
    # -- in this module apply the '/' operator to it and call its mkdir()
    # -- and glob() methods.
    packages_dir: Path

    def __post_init__(self):
        """Assert that all fields initialized to actual values."""
        assert self.profile
        assert self.required_packages
        assert self.platform_id
        assert self.packages_dir

49 

50 

def _construct_package_download_url(
    packages_ctx: PackagesContext,
    target_version: str,
    package_remote_config: PackageRemoteConfig,
) -> str:
    """Build the GitHub release download URL of a package.

    'packages_ctx' supplies the platform id.
    'target_version' is a dotted version with exactly three numeric tokens.
    'package_remote_config' supplies the repo organization, repo name,
    release tag and release file. The placeholders ${PLATFORM},
    ${YYYY-MM-DD} and ${YYYYMMDD} found anywhere in the combined URL are
    replaced with their values.

    Returns the resolved URL.
    """

    # -- The version must split into exactly three tokens.
    tokens = target_version.split(".")
    assert len(tokens) == 3, tokens

    # -- Render the version as a zero padded "YYYY-MM-DD" string.
    # -- TODO: Move to a function in util.py.
    year, month, day = (int(token) for token in tokens)
    dashed_date = f"{year:04d}-{month:02d}-{day:02d}"

    # -- Placeholder name -> replacement value.
    url_vars = {
        "${PLATFORM}": packages_ctx.platform_id,
        "${YYYY-MM-DD}": dashed_date,
        "${YYYYMMDD}": dashed_date.replace("-", ""),
    }
    if util.is_debug(1):
        cout(f"Package URL vars: {url_vars}")

    # -- The URL pieces, in order.
    url_parts = [
        "https://github.com/",
        package_remote_config.repo_organization,
        "/",
        package_remote_config.repo_name,
        "/releases/download/",
        package_remote_config.release_tag,
        "/",
        package_remote_config.release_file,
    ]
    if util.is_debug(1):
        cout(f"package url parts = {url_parts}")

    # -- Glue the pieces into a single string.
    resolved = "".join(url_parts)
    if util.is_debug(1):
        cout(f"Combined package url: {resolved}")

    # -- Substitute each placeholder with its value.
    for placeholder, value in url_vars.items():
        resolved = resolved.replace(placeholder, value)
    if util.is_debug(1):
        cout(f"Resolved package url: {resolved}")

    return resolved

109 

110 

def _download_package_file(url: str, dir_path: Path) -> Path:
    """Download the file at 'url' into the directory 'dir_path'.

    * INPUTS:
      * url: URL of the file to download.
      * dir_path: Directory passed to the downloader as the destination.
    * OUTPUTS:
      * The path of the destination file, as reported by the downloader.

    Exits the program with a user message and status 1 if the user aborts
    with Ctrl-C, on an I/O error, or if util.ApioException is raised.
    """

    # -- Stays None until the downloader tells us where the file goes, so
    # -- that the Ctrl-C handler knows if there is anything to clean up.
    filepath = None

    try:
        # -- Object for downloading the file
        downloader = FileDownloader(url, dir_path)

        # -- Get the destination path
        filepath = downloader.destination

        downloader.start()

    # -- If the user press Ctrl-C (Abort)
    except KeyboardInterrupt:

        # -- Remove the partially downloaded file, if any.
        if filepath and filepath.is_file():
            filepath.unlink()

        # -- Inform the user
        cout("User aborted download", style=ERROR)
        sys.exit(1)

    except IOError as exc:
        cout("I/O error while downloading", style=ERROR)
        cout(str(exc), style=ERROR)
        sys.exit(1)

    except util.ApioException:
        cerror("Package not found")
        sys.exit(1)

    # -- Return the destination path
    return filepath

154 

155 

def _unpack_package_file(package_file: Path, package_dir: Path) -> None:
    """Extract 'package_file' into the 'package_dir' directory.

    Prints an error message and exits with status 1 if the unpacker
    reports a failure.
    """

    # -- Run the unpacker, we are done if it reports success.
    if FileUnpacker(package_file, package_dir).start():
        return

    # -- Here when the unpacking failed.
    cerror(f"Failed to unpack package file {package_file}")
    sys.exit(1)

170 

171 

def _delete_package_dir(
    packages_ctx: PackagesContext, package_name: str, verbose: bool
) -> bool:
    """Remove the directory of the package 'package_name', if it exists.

    Returns True if the package directory existed before the call. Exits
    with an error message and status 1 if the path still exists after the
    deletion attempt.
    """
    target_dir = packages_ctx.packages_dir / package_name

    # -- Remember if the directory was there before we touch it.
    existed = target_dir.is_dir()

    if existed:
        if verbose:
            cout(f"Deleting {str(target_dir)}")

        # -- Refuse to delete anything outside of a 'packages' path.
        assert "packages" in str(target_dir).lower(), target_dir
        shutil.rmtree(target_dir)

    # -- Verify that nothing is left at that path.
    if target_dir.exists():
        cerror(f"Directory deletion failed: {str(target_dir.absolute())}")
        sys.exit(1)

    return existed

193 

194 

def scan_and_fix_packages(packages_ctx: PackagesContext) -> bool:
    """Scan the packages and repair any fixable errors that are found.

    Returns True if all the packages are installed ok.
    """

    scan_results = scan_packages(packages_ctx)

    # -- Repair only when the scan found something fixable.
    needs_fixing = scan_results.num_errors_to_fix() > 0
    if needs_fixing:
        _fix_packages(packages_ctx, scan_results)

    # -- The flag is derived from the scan taken before the fixing. This
    # -- is valid because the fixing does not touch packages that are
    # -- installed ok.
    return scan_results.packages_installed_ok()

210 

211 

def install_missing_packages_on_the_fly(
    packages_ctx: PackagesContext, verbose=False
) -> None:
    """Install any required package that is missing.

    Prints nothing if all the packages are already ok. Intended for on
    demand package fetching by commands such as apio build, and thus it is
    allowed to use the already fetched remote config rather than fetching
    a fresh one.
    """

    # -- Scan and fix broken packages. Nothing else to do if everything
    # -- is installed ok.
    if scan_and_fix_packages(packages_ctx):
        return

    # -- Since we just fixed, each required package is now either
    # -- installed ok or not installed at all. Install the missing ones.
    already_installed = packages_ctx.profile.installed_packages

    for name in packages_ctx.required_packages:
        if name in already_installed:
            continue
        install_package(
            packages_ctx,
            package_name=name,
            force_reinstall=False,
            verbose=verbose,
        )

    # -- Everything should be ok by now, scan again just in case.
    if scan_packages(packages_ctx).is_all_ok():
        return

    cout(
        "Warning: packages issues detected. Use "
        "'apio packages list' to investigate.",
        style=WARNING,
    )

255 

256 

def install_package(
    packages_ctx: PackagesContext,
    *,
    package_name: str,
    force_reinstall: bool,
    verbose: bool,
) -> None:
    """Install a given package.

    'packages_ctx' is the context object of this apio invocation.
    'package_name' is the package name, e.g. 'examples' or 'oss-cad-suite'.
    'force_reinstall' indicates if to perform the installation even if a
        matching package is already installed.
    'verbose' indicates if to print extra information.

    The package file is downloaded into the packages directory, the old
    package directory (if any) is deleted, the file is unpacked into a
    new package directory, the downloaded file is removed and the package
    is registered in the profile.

    Returns normally if no error, exits the program with an error status
    and a user message if an error is detected.
    """

    # -- Caller is responsible to check that package name is valid
    # -- on this platform.
    assert package_name in packages_ctx.required_packages, package_name

    # -- Set up installation announcement
    pending_announcement = cstyle(
        f"Installing apio package '{package_name}'", style=EMPH3
    )

    # -- If in chatty mode, announce now and clear. Otherwise we will
    # -- announce later only if actually installing.
    if verbose:
        cout(pending_announcement)
        pending_announcement = None

    # -- Get package remote config from the cache. Caller can refresh the
    # -- cache with the latest remote config if desired.
    package_config: PackageRemoteConfig = (
        packages_ctx.profile.get_package_config(package_name)
    )

    # -- Get the version we should have.
    target_version = package_config.release_version

    # -- If not forcing and the target version already installed then
    # -- nothing to do and we leave quietly.
    if not force_reinstall:
        # -- Get the installed version and platform id of the package.
        installed_version, package_platform_id = (
            packages_ctx.profile.get_installed_package_info(package_name)
        )

        if verbose:
            cout(
                f"Installed version: {installed_version} "
                f"({package_platform_id})"
            )

        # -- If the installed and the target versions are the same, and
        # -- for this platform, then nothing to do.
        if (
            target_version == installed_version
            and package_platform_id == packages_ctx.platform_id
        ):
            if verbose:
                cout(
                    f"Version {target_version} ({package_platform_id}) "
                    "already installed",
                    style=SUCCESS,
                )
            return

    # -- Here we actually do the work. Announce if we haven't done it yet
    # -- and clear the pending announcement, same as in the chatty case.
    if pending_announcement:
        cout(pending_announcement)
        pending_announcement = None

    cout(f"Fetching version {target_version} ({packages_ctx.platform_id})")

    # -- Construct the download URL.
    download_url = _construct_package_download_url(
        packages_ctx, target_version, package_config
    )
    if verbose:
        cout(f"Download URL: {download_url}")

    # -- Prepare the packages directory. We create also missing parent
    # -- directories so a first install doesn't fail on a missing parent.
    packages_ctx.packages_dir.mkdir(parents=True, exist_ok=True)

    # -- Prepare the package directory.
    package_dir = packages_ctx.packages_dir / package_name
    cout(f"Package dir: {package_dir}")

    # -- Download the package file from the remote server.
    local_package_file = _download_package_file(
        download_url, packages_ctx.packages_dir
    )
    if verbose:
        cout(f"Local package file: {local_package_file}")

    # -- Delete the old package dir, if exists, to avoid name conflicts and
    # -- left over files.
    _delete_package_dir(packages_ctx, package_name, verbose)

    # -- Unpack the package. This creates a new package dir.
    _unpack_package_file(local_package_file, package_dir)

    # -- Remove the package file. We don't need it anymore.
    if verbose:
        cout(f"Deleting package file {local_package_file}")
    local_package_file.unlink()

    # -- Add package to profile.
    packages_ctx.profile.add_package(
        package_name, target_version, packages_ctx.platform_id, download_url
    )

    # -- Inform the user!
    cout(f"Package '{package_name}' installed successfully", style=SUCCESS)

381 

382 

def _fix_packages(
    packages_ctx: PackagesContext, scan: "PackageScanResults"
) -> None:
    """Fix the errors listed in the package scan results 'scan'.

    * Packages with a bad version and broken packages have their package
      directory deleted and are removed from the profile.
    * Orphan packages are removed from the profile.
    * Orphan directories and orphan files in the packages directory are
      deleted.
    """

    for package_name in scan.bad_version_package_names:
        cout(f"Uninstalling incompatible version of '{package_name}'")
        _delete_package_dir(packages_ctx, package_name, verbose=False)
        packages_ctx.profile.remove_package(package_name)

    for package_name in scan.broken_package_names:
        cout(f"Uninstalling broken package '{package_name}'")
        _delete_package_dir(packages_ctx, package_name, verbose=False)
        packages_ctx.profile.remove_package(package_name)

    # -- Only the profile entry is removed here, no directory is deleted
    # -- in this loop.
    for package_name in scan.orphan_package_names:
        cout(f"Uninstalling unknown package '{package_name}'")
        packages_ctx.profile.remove_package(package_name)

    for dir_name in scan.orphan_dir_names:
        cout(f"Deleting unknown package dir '{dir_name}'")
        # -- Sanity check. Since packages_ctx.packages_dir is guaranteed to
        # -- include the word packages, this can fail only due to programming
        # -- error.
        dir_path = packages_ctx.packages_dir / dir_name
        assert "packages" in str(dir_path).lower(), dir_path
        # -- Delete.
        shutil.rmtree(dir_path)

    for file_name in scan.orphan_file_names:
        cout(f"Deleting unknown package file '{file_name}'")
        # -- Sanity check. Since packages_ctx.packages_dir is guaranteed to
        # -- include the word packages, this can fail only due to programming
        # -- error. The assertion message reports the file path that is
        # -- being checked.
        file_path = packages_ctx.packages_dir / file_name
        assert "packages" in str(file_path).lower(), file_path
        # -- Delete.
        file_path.unlink()

421 

422 

@dataclass
class PackageScanResults:
    """Represents results of packages scan."""

    # -- Normal. Packages in required_packages that are registered in the
    # -- profile, have a package directory, and whose version is ok.
    installed_ok_package_names: List[str]
    # -- Error. Packages in required_packages that are installed but with
    # -- version mismatch.
    bad_version_package_names: List[str]
    # -- Normal. Packages in required_packages that are uninstalled properly.
    uninstalled_package_names: List[str]
    # -- Error. Packages in required_packages with broken installation. E.g,
    # -- registered in profile but package directory is missing.
    broken_package_names: List[str]
    # -- Error. Packages that are marked in profile as registered but are not
    # -- in required_packages.
    orphan_package_names: List[str]
    # -- Error. Basenames of directories in packages dir that don't match
    # -- the name of a package in required_packages.
    orphan_dir_names: List[str]
    # -- Error. Basenames of the files in packages directory, other than
    # -- the installed_packages.json file. That directory is otherwise
    # -- expected to contain only directories for packages.
    orphan_file_names: List[str]

    def packages_installed_ok(self) -> bool:
        """Returns true if all packages are installed ok, regardless of
        other fixable errors."""
        return (
            len(self.bad_version_package_names) == 0
            and len(self.uninstalled_package_names) == 0
            and len(self.broken_package_names) == 0
        )

    def num_errors_to_fix(self) -> int:
        """Returns the number of errors that need to be fixed. Having
        packages that are not installed is not considered an error that
        needs to be fixed."""
        return (
            len(self.bad_version_package_names)
            + len(self.broken_package_names)
            + len(self.orphan_package_names)
            + len(self.orphan_dir_names)
            + len(self.orphan_file_names)
        )

    def is_all_ok(self) -> bool:
        """Return True if all packages are installed properly with no
        issues."""
        return (
            not self.num_errors_to_fix() and not self.uninstalled_package_names
        )

    def dump(self):
        """Dump the content of this object. For debugging."""
        cout()
        cout("Package scan results:")
        cout(f" Installed {self.installed_ok_package_names}")
        cout(f" bad version {self.bad_version_package_names}")
        cout(f" Uninstalled {self.uninstalled_package_names}")
        cout(f" Broken {self.broken_package_names}")
        cout(f" Orphan ids {self.orphan_package_names}")
        cout(f" Orphan dirs {self.orphan_dir_names}")
        cout(f" Orphan files {self.orphan_file_names}")

486 

487 

def package_version_ok(
    packages_ctx: PackagesContext,
    package_name: str,
) -> bool:
    """Tell if the installed version of a package is the required one.

    Returns True only if the package is in the required packages, the
    profile has an installed version of it for this platform, and that
    version equals the release version in the package's remote config.
    Returns False otherwise.
    """

    # -- A package that is not applicable to this platform is never ok.
    if package_name not in packages_ctx.required_packages:
        return False

    # -- Get what the profile knows about the installed package.
    installed_ver, installed_platform_id = (
        packages_ctx.profile.get_installed_package_info(package_name)
    )

    # -- We need an installed version, and it must be for this platform.
    if installed_ver and installed_platform_id == packages_ctx.platform_id:
        # -- Compare to the required version. The two versions are
        # -- expected to be normalized and thus a string comparison is
        # -- sufficient.
        remote_config: PackageRemoteConfig = (
            packages_ctx.profile.get_package_config(package_name)
        )
        return installed_ver == remote_config.release_version

    return False

515 

516 

def scan_packages(packages_ctx: PackagesContext) -> PackageScanResults:
    """Examine the required packages, the profile and the packages
    directory, and return the findings as a PackageScanResults object."""

    # pylint: disable=too-many-branches

    assert isinstance(packages_ctx, PackagesContext)

    # -- Start with all the lists empty.
    result = PackageScanResults(
        installed_ok_package_names=[],
        bad_version_package_names=[],
        uninstalled_package_names=[],
        broken_package_names=[],
        orphan_package_names=[],
        orphan_dir_names=[],
        orphan_file_names=[],
    )

    # -- Names of the directories we expect to find in the packages dir,
    # -- one per required package. Used later to detect orphan dirs.
    known_dir_names = set()

    # -- Classify each of the required packages.
    for name in packages_ctx.required_packages:
        known_dir_names.add(name)

        registered = name in packages_ctx.profile.installed_packages
        dir_exists = (packages_ctx.packages_dir / name).is_dir()
        version_matches = package_version_ok(packages_ctx, name)

        if registered != dir_exists:
            # -- Profile and directory disagree: package is broken.
            result.broken_package_names.append(name)
        elif not registered:
            # -- Neither in profile nor on disk: package not installed.
            result.uninstalled_package_names.append(name)
        elif version_matches:
            # -- In profile, on disk, good version: installed ok.
            result.installed_ok_package_names.append(name)
        else:
            # -- In profile and on disk but with a version mismatch.
            result.bad_version_package_names.append(name)

    # -- Packages that the profile lists as installed but that are not
    # -- required packages are orphans.
    result.orphan_package_names.extend(
        name
        for name in packages_ctx.profile.installed_packages
        if name not in packages_ctx.required_packages
    )

    # -- Look for orphan dirs and files in the packages directory.
    for entry in packages_ctx.packages_dir.glob("*"):
        entry_name = entry.name
        if entry.is_dir():
            if entry_name not in known_dir_names:
                result.orphan_dir_names.append(entry_name)
        elif entry_name != "installed_packages.json":
            # -- Any file other than the installed packages file is an
            # -- orphan. TODO Make the file name a const.
            result.orphan_file_names.append(entry_name)

    # -- Return results
    if util.is_debug(1):
        result.dump()

    return result