Coverage for apio / managers / packages.py: 69%

203 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-24 01:53 +0000

1# -*- coding: utf-8 -*- 

2# -- This file is part of the Apio project 

3# -- (C) 2016-2021 FPGAwars 

4# -- Author Jesús Arroyo 

5# -- License GPLv2 

6"""Package install/uninstall functionality. 

7Used by the 'apio packages' command. 

8""" 

9 

10import sys 

11import os 

12from dataclasses import dataclass 

13from typing import Dict, List 

14from pathlib import Path 

15import shutil 

16from apio.common.apio_console import cout, cerror, cstyle 

17from apio.common.apio_styles import WARNING, ERROR, SUCCESS, EMPH3 

18from apio.managers.downloader import FileDownloader 

19from apio.managers.unpacker import FileUnpacker 

20from apio.utils import util 

21from apio.profile import Profile, PackageRemoteConfig 

22 

23 

@dataclass(frozen=True)
class PackagesContext:
    """Context for package management operations.

    This class provides the information needed for package management
    operations. It is a subset of the information contained by ApioContext
    and we use it, instead of passing the ApioContext, because we need to
    perform package management operations (e.g. updating packages) before
    the ApioContext object is fully initialized.
    """

    # -- Same as ApioContext.profile
    profile: Profile
    # -- Same as ApioContext.required_packages
    required_packages: Dict
    # -- Same as ApioContext.platform_id
    platform_id: str
    # -- Same as ApioContext.packages_dir. This is a Path and not a str; the
    # -- functions in this module join it with '/' and call mkdir() and
    # -- glob() on it.
    packages_dir: Path

    def __post_init__(self):
        """Assert that all fields were initialized to actual values."""
        # -- Each field must be truthy, so None and empty values are
        # -- rejected.
        assert self.profile
        assert self.required_packages
        assert self.platform_id
        assert self.packages_dir

49 

50 

def _construct_package_download_url(
    packages_ctx: PackagesContext,
    package_remote_config: PackageRemoteConfig,
) -> str:
    """Build the GitHub release download URL of a package.

    The URL is assembled from the repo organization, repo name, release tag
    and release file of 'package_remote_config'. Any '${PLATFORM}' and
    '${YYYYMMDD}' placeholders in the assembled URL are then replaced with
    the platform id of 'packages_ctx' and with the release tag stripped of
    its dashes, respectively. Returns the resolved URL.
    """

    # -- Placeholders and the values that replace them.
    placeholders = {
        "${PLATFORM}": packages_ctx.platform_id,
        "${YYYYMMDD}": package_remote_config.release_tag.replace("-", ""),
    }
    if util.is_debug(1):
        cout(f"Package URL vars: {placeholders}")

    # -- The URL segments, in order.
    segments = [
        "https://github.com/",
        package_remote_config.repo_organization,
        "/",
        package_remote_config.repo_name,
        "/releases/download/",
        package_remote_config.release_tag,
        "/",
        package_remote_config.release_file,
    ]
    if util.is_debug(1):
        cout(f"package url parts = {segments}")

    # -- Glue the segments into a single string, placeholders still in.
    template = "".join(segments)
    if util.is_debug(1):
        cout(f"Combined package url: {template}")

    # -- Substitute each placeholder with its value.
    resolved = template
    for placeholder, value in placeholders.items():
        resolved = resolved.replace(placeholder, value)
    if util.is_debug(1):
        cout(f"Resolved package url: {resolved}")

    return resolved

95 

96 

def _download_package_file(url: str, dir_path: Path) -> Path:
    """Download the given file (url) into the directory 'dir_path'.

    * INPUTS:
      * url: URL of the file to download.
      * dir_path: Directory passed to the FileDownloader as the download
        location.
    * OUTPUTS:
      * The path of the local destination file, as reported by the
        downloader. It is a path object (callers call unlink() on it).

    Exits the program with a user message and error status 1 if the user
    aborts with Ctrl-C, on an I/O error, or on an ApioException raised by
    the downloader.
    """

    # -- Stays None until the downloader tells us the destination, so the
    # -- Ctrl-C handler knows if there is anything to clean up.
    filepath = None

    try:
        # -- Object for downloading the file
        downloader = FileDownloader(url, dir_path)

        # -- Get the destination path
        filepath = downloader.destination

        downloader.start()

    # -- If the user presses Ctrl-C (Abort)
    except KeyboardInterrupt:

        # -- Remove the partially downloaded file, if any.
        if filepath and filepath.is_file():
            filepath.unlink()

        # -- Inform the user
        cout("User aborted download", style=ERROR)
        sys.exit(1)

    except IOError as exc:
        cout("I/O error while downloading", style=ERROR)
        cout(str(exc), style=ERROR)
        sys.exit(1)

    except util.ApioException:
        cerror("Package not found")
        sys.exit(1)

    # -- Return the destination path
    return filepath

140 

141 

def _unpack_package_file(package_file: Path, package_dir: Path) -> None:
    """Unpack 'package_file' into the directory 'package_dir'.

    Returns normally if the unpacker reports success. Otherwise prints an
    error message and exits the program with error status 1.
    """

    # -- Run the unpacker; a truthy result means success.
    unpacker = FileUnpacker(package_file, package_dir)
    if unpacker.start():
        return

    # -- Here when unpacking failed.
    cerror(f"Failed to unpack package file {package_file}")
    sys.exit(1)

156 

157 

def _delete_package_dir(
    packages_ctx: PackagesContext, package_name: str, verbose: bool
) -> bool:
    """Remove the directory of the named package from the packages dir.

    Returns True if the package directory existed before the call and
    False otherwise. If something still exists at that path afterwards,
    prints an error message and exits the program with error status 1.
    """
    target = packages_ctx.packages_dir / package_name

    # -- Remember if there was a directory to delete; this is the result.
    existed = target.is_dir()

    if existed:
        if verbose:
            cout(f"Deleting {target}")

        # -- Sanity check the path before the recursive delete.
        assert "packages" in str(target).lower(), target
        shutil.rmtree(target)

    # -- Nothing may be left at the path, whether or not we deleted.
    if target.exists():
        cerror(f"Directory deletion failed: {target.absolute()}")
        sys.exit(1)

    return existed

179 

180 

def scan_and_fix_packages(packages_ctx: PackagesContext) -> bool:
    """Scan the packages and repair any fixable errors that are found.

    Returns True if, per the scan, all the packages are installed ok.
    """

    # -- Take a snapshot of the packages state.
    findings = scan_packages(packages_ctx)

    # -- Repair only if the scan found something fixable.
    needs_fixing = findings.num_errors_to_fix() > 0
    if needs_fixing:
        _fix_packages(packages_ctx, findings)

    # -- The result is computed from the scan taken before the fixing.
    # -- This is valid because the fixing does not touch packages that
    # -- are installed ok.
    return findings.packages_installed_ok()

196 

197 

def install_missing_packages_on_the_fly(
    packages_ctx: PackagesContext, verbose=False
) -> None:
    """Install, on demand, any required package that is missing.

    Prints nothing if all the packages are already ok. Intended for
    commands such as 'apio build' that fetch packages on demand, and thus
    it is allowed to use the already fetched remote config rather than
    fetching a fresh one.
    """

    # -- Scan and fix broken packages. If everything is installed ok after
    # -- that, there is nothing more to do. Being an on-the-fly operation,
    # -- we don't require a fresh remote config for the required versions.
    if scan_and_fix_packages(packages_ctx):
        return

    # -- Since we just fixed, there can't be broken packages or packages
    # -- with a version mismatch; each package is either installed ok or
    # -- not installed at all.
    already_installed = packages_ctx.profile.installed_packages

    # -- Install every required package that is not registered as installed.
    for name in packages_ctx.required_packages:
        if name in already_installed:
            continue
        install_package(
            packages_ctx,
            package_name=name,
            force_reinstall=False,
            verbose=verbose,
        )

    # -- All packages should be ok by now, but verify just in case.
    if scan_packages(packages_ctx).is_all_ok():
        return

    cout(
        "Warning: packages issues detected. Use "
        "'apio packages list' to investigate.",
        style=WARNING,
    )

241 

242 

def install_package(
    packages_ctx: PackagesContext,
    *,
    package_name: str,
    force_reinstall: bool,
    verbose: bool,
) -> None:
    """Install a given package.

    'packages_ctx' is the context object of this apio invocation.
    'package_name' is the package name, e.g. 'examples' or 'oss-cad-suite'.
        It must be one of the required packages of 'packages_ctx'.
    'force_reinstall' indicates if to perform the installation even if a
        matching package is already installed.
    'verbose' indicates if to print extra information.

    Returns normally if no error, exits the program with an error status
    and a user message if an error is detected.
    """

    # -- Caller is responsible to check that the package name is valid
    # -- on this platform.
    assert package_name in packages_ctx.required_packages, package_name

    # -- Set up installation announcement
    pending_announcement = cstyle(
        f"Installing apio package '{package_name}'", style=EMPH3
    )

    # -- If in chatty mode, announce now and clear. Otherwise we will
    # -- announce later only if actually installing.
    if verbose:
        cout(pending_announcement)
        pending_announcement = None

    # -- Get package remote config from the cache. Caller can refresh the
    # -- cache with the latest remote config if desired.
    package_config: PackageRemoteConfig = (
        packages_ctx.profile.get_package_config(package_name)
    )

    # -- Get the version we should have.
    target_version = package_config.release_version

    # -- If not forcing and the target version already installed then
    # -- nothing to do and we leave quietly.
    if not force_reinstall:
        # -- Get the version and platform id of the installed package.
        installed_version, package_platform_id = (
            packages_ctx.profile.get_installed_package_info(package_name)
        )

        if verbose:
            cout(
                f"Installed version: {installed_version} "
                f"({package_platform_id})"
            )

        # -- If the installed and the target versions are the same, and
        # -- the package was installed for this platform, then nothing
        # -- to do.
        if (
            target_version == installed_version
            and package_platform_id == packages_ctx.platform_id
        ):
            if verbose:
                cout(
                    f"Version {target_version} ({package_platform_id}) "
                    "already installed",
                    style=SUCCESS,
                )
            return

    # -- Here we actually do the work so we can be more chatty. Announce
    # -- if we haven't done it yet, and clear the pending announcement.
    if pending_announcement:
        cout(pending_announcement)
        pending_announcement = None

    cout(f"Fetching version {target_version} ({packages_ctx.platform_id})")

    # -- Construct the download URL.
    download_url = _construct_package_download_url(
        packages_ctx, package_config
    )
    if verbose:
        cout(f"Download URL: {download_url}")

    # -- Prepare the packages directory.
    packages_ctx.packages_dir.mkdir(exist_ok=True)

    # -- Prepare the package directory.
    package_dir = packages_ctx.packages_dir / package_name
    cout(f"Package dir: {package_dir}")

    # -- Download the package file from the remote server.
    local_package_file = _download_package_file(
        download_url, packages_ctx.packages_dir
    )
    if verbose:
        cout(f"Local package file: {local_package_file}")

    # -- Delete the old package dir, if exists, to avoid name conflicts and
    # -- left over files.
    _delete_package_dir(packages_ctx, package_name, verbose)

    # -- Unpack the package. This creates a new package dir.
    _unpack_package_file(local_package_file, package_dir)

    # -- Remove the package file. We don't need it anymore.
    if verbose:
        cout(f"Deleting package file {local_package_file}")
    local_package_file.unlink()

    # -- Add package to profile.
    # -- NOTE(review): there is no explicit profile.save() call here;
    # -- presumably add_package() persists the profile - confirm.
    packages_ctx.profile.add_package(
        package_name, target_version, packages_ctx.platform_id, download_url
    )

    # -- Inform the user!
    cout(f"Package '{package_name}' installed successfully", style=SUCCESS)

367 

368 

def _fix_packages(
    packages_ctx: PackagesContext, scan: "PackageScanResults"
) -> None:
    """If the package scan result contains errors, fix them.

    'packages_ctx' is the context object of this apio invocation.
    'scan' is the scan result whose errors are to be fixed.

    Packages with a bad version and broken packages are uninstalled, that
    is, their directory is deleted and they are removed from the profile.
    Orphan packages are removed from the profile, and orphan directories
    and files are deleted from the packages directory.
    """

    for package_name in scan.bad_version_package_names:
        cout(f"Uninstalling incompatible version of '{package_name}'")
        _delete_package_dir(packages_ctx, package_name, verbose=False)
        packages_ctx.profile.remove_package(package_name)

    for package_name in scan.broken_package_names:
        cout(f"Uninstalling broken package '{package_name}'")
        _delete_package_dir(packages_ctx, package_name, verbose=False)
        packages_ctx.profile.remove_package(package_name)

    # -- Orphan packages are only unregistered from the profile; no
    # -- directory is deleted for them here.
    for package_name in scan.orphan_package_names:
        cout(f"Uninstalling unknown package '{package_name}'")
        packages_ctx.profile.remove_package(package_name)

    for dir_name in scan.orphan_dir_names:
        cout(f"Deleting unknown package dir '{dir_name}'")
        # -- Sanity check. Since packages_ctx.packages_dir is guaranteed to
        # -- include the word packages, this can fail only due to programming
        # -- error.
        dir_path = packages_ctx.packages_dir / dir_name
        assert "packages" in str(dir_path).lower(), dir_path
        # -- Delete.
        shutil.rmtree(dir_path)

    for file_name in scan.orphan_file_names:
        cout(f"Deleting unknown package file '{file_name}'")
        # -- Sanity check. Since packages_ctx.packages_dir is guaranteed to
        # -- include the word packages, this can fail only due to programming
        # -- error.
        file_path = packages_ctx.packages_dir / file_name
        # -- The assert message must name file_path. 'dir_path' belongs to
        # -- the loop above and is unbound if there were no orphan dirs.
        assert "packages" in str(file_path).lower(), file_path
        # -- Delete.
        file_path.unlink()

407 

408 

@dataclass
class PackageScanResults:
    """Represents results of packages scan."""

    # -- Normal. Packages in required_packages that are installed with a
    # -- matching version.
    installed_ok_package_names: List[str]
    # -- Error. Packages in required_packages that are installed but with
    # -- version mismatch.
    bad_version_package_names: List[str]
    # -- Normal. Packages in required_packages that are uninstalled properly.
    uninstalled_package_names: List[str]
    # -- Error. Packages in required_packages with broken installation. E.g,
    # -- registered in profile but package directory is missing.
    broken_package_names: List[str]
    # -- Error. Packages that are marked in profile as registered but are not
    # -- in required_packages.
    orphan_package_names: List[str]
    # -- Error. Basenames of directories in packages dir that don't match
    # -- the name of any package in required_packages.
    orphan_dir_names: List[str]
    # -- Error. Basenames of unexpected files in the packages directory. That
    # -- directory is expected to contain only directories for packages,
    # -- besides the installed packages file which the scan skips.
    orphan_file_names: List[str]

    def packages_installed_ok(self) -> bool:
        """Returns true if all packages are installed ok, regardless of
        other fixable errors."""
        return (
            len(self.bad_version_package_names) == 0
            and len(self.uninstalled_package_names) == 0
            and len(self.broken_package_names) == 0
        )

    def num_errors_to_fix(self) -> int:
        """Returns the number of errors that need to be fixed. Having
        packages that are not installed is not considered an error that
        needs to be fixed."""
        return (
            len(self.bad_version_package_names)
            + len(self.broken_package_names)
            + len(self.orphan_package_names)
            + len(self.orphan_dir_names)
            + len(self.orphan_file_names)
        )

    def is_all_ok(self) -> bool:
        """Return True if all packages are installed properly with no
        issues, that is, no errors to fix and no uninstalled packages."""
        return (
            not self.num_errors_to_fix() and not self.uninstalled_package_names
        )

    def dump(self) -> None:
        """Dump the content of this object. For debugging."""
        cout()
        cout("Package scan results:")
        cout(f" Installed {self.installed_ok_package_names}")
        cout(f" bad version {self.bad_version_package_names}")
        cout(f" Uninstalled {self.uninstalled_package_names}")
        cout(f" Broken {self.broken_package_names}")
        cout(f" Orphan ids {self.orphan_package_names}")
        cout(f" Orphan dirs {self.orphan_dir_names}")
        cout(f" Orphan files {self.orphan_file_names}")

472 

473 

def package_version_ok(
    packages_ctx: PackagesContext,
    package_name: str,
) -> bool:
    """Tell if the named package is installed with the required version.

    Returns true only if the package is one of the required packages, the
    profile has it registered as installed for the platform of
    'packages_ctx', and the installed version equals the release version
    in the package's remote config. Otherwise returns false.
    """

    # -- A package that is not applicable to this platform is never ok.
    if package_name not in packages_ctx.required_packages:
        return False

    # -- Fetch what the profile knows about the installed package.
    installed_version, installed_platform_id = (
        packages_ctx.profile.get_installed_package_info(package_name)
    )

    # -- With no installed version, or with a package that was installed
    # -- for another platform, the package is considered not installed.
    installed_here = (
        installed_version and installed_platform_id == packages_ctx.platform_id
    )
    if not installed_here:
        return False

    # -- Get the version that the remote config requires.
    remote_config: PackageRemoteConfig = (
        packages_ctx.profile.get_package_config(package_name)
    )
    required_version = remote_config.release_version

    # -- The two versions are expected to be normalized and thus a plain
    # -- string comparison is sufficient.
    return installed_version == required_version

501 

502 

def scan_packages(packages_ctx: PackagesContext) -> PackageScanResults:
    """Examine the required packages, the profile and the packages dir.

    Returns a PackageScanResults object that classifies each required
    package and lists the orphan packages, directories and files that
    were found.
    """

    # pylint: disable=too-many-branches

    assert isinstance(packages_ctx, PackagesContext)

    # -- Start with all the result lists empty.
    findings = PackageScanResults([], [], [], [], [], [], [])

    # -- Names of the directories that are expected in the packages dir,
    # -- one per required package. Populated by the loop below.
    expected_dir_names = set()

    # -- Classify each of the required packages.
    for name in packages_ctx.required_packages:
        expected_dir_names.add(name)

        registered = name in packages_ctx.profile.installed_packages
        on_disk = (packages_ctx.packages_dir / name).is_dir()
        version_matches = package_version_ok(packages_ctx, name)

        if registered != on_disk:
            # -- Registered without a dir, or a dir without registration:
            # -- the package is broken.
            bucket = findings.broken_package_names
        elif not registered:
            # -- Neither registered nor on disk: not installed.
            bucket = findings.uninstalled_package_names
        elif version_matches:
            # -- Registered, on disk, and with the right version.
            bucket = findings.installed_ok_package_names
        else:
            # -- Registered and on disk but with a version mismatch.
            bucket = findings.bad_version_package_names
        bucket.append(name)

    # -- Packages that the profile has as installed but that are not
    # -- required are orphans.
    findings.orphan_package_names.extend(
        name
        for name in packages_ctx.profile.installed_packages
        if name not in packages_ctx.required_packages
    )

    # -- Look for unexpected dirs and files in the packages directory.
    for entry in packages_ctx.packages_dir.glob("*"):
        entry_name = entry.name
        if entry.is_dir():
            if entry_name not in expected_dir_names:
                findings.orphan_dir_names.append(entry_name)
            continue

        # -- The installed packages file is expected there and thus is
        # -- not an orphan.
        # TODO Make this a const.
        if entry_name != "installed_packages.json":
            findings.orphan_file_names.append(entry_name)

    # -- In debug mode, show the findings.
    if util.is_debug(1):
        findings.dump()

    return findings