Coverage for britney2/inputs/suiteloader.py: 92%

271 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-03-23 07:34 +0000

1import logging 

2import optparse 

3import os 

4import sys 

5from abc import abstractmethod 

6from collections.abc import Callable 

7from typing import Any, Literal, Optional, TypeVar, overload 

8from collections.abc import Iterable 

9 

10import apt_pkg 

11 

12from britney2 import ( 

13 BinaryPackage, 

14 BinaryPackageId, 

15 PackageId, 

16 SourcePackage, 

17 Suite, 

18 SuiteClass, 

19 Suites, 

20 TargetSuite, 

21) 

22from britney2.utils import ( 

23 create_provides_map, 

24 parse_builtusing, 

25 parse_provides, 

26 possibly_compressed, 

27 read_release_file, 

28 read_sources_file, 

29) 

30 

31 

class MissingRequiredConfigurationError(RuntimeError):
    """Raised when a mandatory configuration entry is absent and cannot be auto-detected."""

34 

35 

# Type variable used by the config_str_as_list() overloads below so that the
# default_value's type flows through to the return type when value is None.
_T = TypeVar("_T")

37 

38 

class SuiteContentLoader(object):
    """Base class for objects that load suite content for britney2.

    Parses the architecture-related configuration values once and exposes
    them (together with the components list and the pool of all binaries
    seen so far) via read-only accessors.  Concrete subclasses implement
    :meth:`load_suites`.
    """

    def __init__(self, base_config: optparse.Values) -> None:
        self._base_config = base_config
        as_list = SuiteContentLoader.config_str_as_list
        # ARCHITECTURES has no default here: it may legitimately be None at
        # this point and be auto-detected from a Release file later on.
        self._architectures: list[str] = as_list(base_config.architectures)
        self._nobreakall_arches: list[str] = as_list(base_config.nobreakall_arches, [])
        self._outofsync_arches: list[str] = as_list(base_config.outofsync_arches, [])
        self._break_arches: list[str] = as_list(base_config.break_arches, [])
        self._new_arches: list[str] = as_list(base_config.new_arches, [])
        self._components: list[str] = []
        # Pool of every binary package seen across all loaded suites.
        self._all_binaries: dict[BinaryPackageId, BinaryPackage] = {}
        self.logger = logging.getLogger(
            "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        )

    @overload
    @staticmethod
    def config_str_as_list(value: Literal[None], default_value: _T) -> _T: ...

    @overload
    @staticmethod
    def config_str_as_list(value: str, default_value: Any) -> list[str]: ...

    @overload
    @staticmethod
    def config_str_as_list(value: Any, default_value: Optional[Any] = None) -> Any: ...

    @staticmethod
    def config_str_as_list(value: Any, default_value: Optional[Any] = None) -> Any:
        """Normalize a configuration value.

        None becomes ``default_value``, a string is split on whitespace into
        a list, and any other value is returned unchanged.
        """
        if value is None:
            return default_value
        return value.split() if isinstance(value, str) else value

    @property
    def architectures(self) -> list[str]:
        """The parsed ARCHITECTURES configuration value."""
        return self._architectures

    @property
    def nobreakall_arches(self) -> list[str]:
        """The parsed NOBREAKALL_ARCHES configuration value."""
        return self._nobreakall_arches

    @property
    def outofsync_arches(self) -> list[str]:
        """The parsed OUTOFSYNC_ARCHES configuration value."""
        return self._outofsync_arches

    @property
    def break_arches(self) -> list[str]:
        """The parsed BREAK_ARCHES configuration value."""
        return self._break_arches

    @property
    def new_arches(self) -> list[str]:
        """The parsed NEW_ARCHES configuration value."""
        return self._new_arches

    @property
    def components(self) -> list[str]:
        """Archive components in use (populated by subclasses)."""
        return self._components

    def all_binaries(self) -> dict[BinaryPackageId, BinaryPackage]:
        """Return the pool of all binary packages seen so far, keyed by package id."""
        return self._all_binaries

    @abstractmethod
    def load_suites(self) -> Suites:  # pragma: no cover
        pass

112 

113 

class DebMirrorLikeSuiteContentLoader(SuiteContentLoader):
    """Load suite content from a deb-mirror-like directory layout.

    Reads Sources/Packages data either per component (mirror layout with a
    Release file) or from flat ``Sources`` / ``Packages_${arch}`` files.
    """

    # BinaryPackage attributes that must agree when the same package id is
    # encountered more than once (verified by _merge_pkg_entries).
    CHECK_FIELDS = [
        "source",
        "source_version",
        "architecture",
        "multi_arch",
        "depends",
        "conflicts",
        "provides",
    ]

124 

125 def load_suites(self) -> Suites: 

126 suites = [] 

127 missing_config_msg = ( 

128 "Configuration %s is not set in the config (and cannot be auto-detected)" 

129 ) 

130 for suitename in ("testing", "unstable", "pu", "tpu"): 

131 suffix = suitename if suitename in {"pu", "tpu"} else "" 

132 if hasattr(self._base_config, suitename): 

133 suite_path = getattr(self._base_config, suitename) 

134 suite_class = SuiteClass.TARGET_SUITE 

135 if suitename != "testing": 

136 suite_class = ( 

137 SuiteClass.ADDITIONAL_SOURCE_SUITE 

138 if suffix 

139 else SuiteClass.PRIMARY_SOURCE_SUITE 

140 ) 

141 suites.append( 

142 Suite( 

143 suite_class, suitename, suite_path, suite_short_name=suffix 

144 ) 

145 ) 

146 else: 

147 target_suite = TargetSuite( 

148 suite_class, suitename, suite_path, suite_short_name=suffix 

149 ) 

150 else: 

151 if suitename in {"testing", "unstable"}: # pragma: no cover 

152 self.logger.error(missing_config_msg, suitename.upper()) 

153 raise MissingRequiredConfigurationError( 

154 missing_config_msg % suitename.upper() 

155 ) 

156 self.logger.info( 

157 "Optional suite %s is not defined (config option: %s) ", 

158 suitename, 

159 suitename.upper(), 

160 ) 

161 

162 assert target_suite is not None, "Logic regression, this should be impossible." 

163 

164 self._check_release_file(target_suite, missing_config_msg) 

165 self._setup_architectures() 

166 

167 # read the source and binary packages for the involved distributions. Notes: 

168 # - Load testing last as some live-data tests have more complete information in 

169 # unstable 

170 # - Load all sources before any of the binaries. 

171 for suite in [target_suite, *suites]: 

172 sources = self._read_sources(suite.path) 

173 self._update_suite_name(suite) 

174 suite.sources = sources 

175 (suite.binaries, suite.provides_table) = self._read_binaries( 

176 suite, self._architectures 

177 ) 

178 self._fixup_faux_arch_all_binaries(suite) 

179 

180 return Suites(target_suite, suites) 

181 

    def _fixup_faux_arch_all_binaries(self, suite: Suite) -> None:
        """remove faux arch:all binary if a real arch:all binary is available

        We don't know for which architectures bin/$something must be available
        except for arch:all, which should be available in each arch. The
        information that a source builds an arch:all binary is available during
        the loading of the sources, but we have to pick an order in which to
        load the files and the Sources is loaded before the Packages are
        read. Hence we fake an arch:all binary during source loading, but it
        shouldn't be there in the final list if real arch:all binaries are
        present in the Packages file.

        Also, if we keep the fake binary, it should be added to the lists of
        known binaries in the suite, otherwise britney2 trips later on.

        """

        all_binaries = self._all_binaries
        binaries = suite.binaries
        # Architectures on which a faux arch:all binary must be registered:
        # everything except the break/outofsync/new architectures.
        faux_arches = (
            set(self.architectures)
            - set(self.break_arches)
            - set(self.outofsync_arches)
            - set(self.new_arches)
        )

        for srcpkg in suite.sources.values():
            # Faux binary ids are marked with "faux" in the third tuple slot.
            faux = {x for x in srcpkg.binaries if x[2] == "faux"}
            # Drop the faux entries when a real arch:all binary exists.
            if faux and [
                x
                for x in (srcpkg.binaries - faux)
                if all_binaries[x].architecture == "all"
            ]:
                srcpkg.binaries -= faux

            # Calculate again because we may have changed the set
            faux = {x for x in srcpkg.binaries if x[2] == "faux"}
            for binpkg_id in faux:
                # Register a placeholder BinaryPackage for each surviving faux id.
                bin_data = BinaryPackage(
                    binpkg_id[1],
                    sys.intern("faux"),
                    srcpkg.source,
                    srcpkg.version,
                    "all",
                    "no",
                    None,
                    None,
                    [],
                    False,
                    binpkg_id,
                    [],
                )
                for arch_all in faux_arches:
                    binaries[arch_all][binpkg_id[0]] = bin_data
                all_binaries[binpkg_id] = bin_data
        suite.binaries = binaries

238 

239 def _setup_architectures(self) -> None: 

240 allarches = self._architectures 

241 # Re-order the architectures such as that the most important architectures are listed first 

242 # (this is to make the log easier to read as most important architectures will be listed 

243 # first) 

244 arches = [x for x in allarches if x in self._nobreakall_arches] 

245 arches += [ 

246 x for x in allarches if x not in arches and x not in self._outofsync_arches 

247 ] 

248 arches += [ 

249 x for x in allarches if x not in arches and x not in self._break_arches 

250 ] 

251 arches += [ 

252 x for x in allarches if x not in arches and x not in self._new_arches 

253 ] 

254 arches += [x for x in allarches if x not in arches] 

255 

256 # Intern architectures for efficiency; items in this list will be used for lookups and 

257 # building items/keys - by intern strings we reduce memory (considerably). 

258 self._architectures = [sys.intern(arch) for arch in allarches] 

259 assert "all" not in self._architectures, "all not allowed in architectures" 

260 

261 def _get_suite_name( 

262 self, suite: Suite, release_file: "apt_pkg.TagSection[str]" 

263 ) -> tuple[str, str]: 

264 name = None 

265 codename = None 

266 if "Suite" in release_file: 266 ↛ 268line 266 didn't jump to line 268, because the condition on line 266 was never false

267 name = release_file["Suite"] 

268 if "Codename" in release_file: 

269 codename = release_file["Codename"] 

270 

271 if name is None: 271 ↛ 272line 271 didn't jump to line 272, because the condition on line 271 was never true

272 name = codename 

273 elif codename is None: 

274 codename = name 

275 

276 if name is None: 276 ↛ 277line 276 didn't jump to line 277, because the condition on line 276 was never true

277 self.logger.warning( 

278 'Either of the fields "Suite" or "Codename" ' 

279 + "should be present in a release file." 

280 ) 

281 self.logger.error( 

282 "Release file for suite %s is missing both the " 

283 + '"Suite" and the "Codename" fields.', 

284 suite.name, 

285 ) 

286 raise KeyError("Suite") 

287 

288 assert codename is not None # required for type checking 

289 return (name, codename) 

290 

291 def _update_suite_name(self, suite: Suite) -> None: 

292 try: 

293 release_file = read_release_file(suite.path) 

294 except FileNotFoundError: 

295 self.logger.info( 

296 "The %s suite does not have a Release file, unable to update the name", 

297 suite.name, 

298 ) 

299 release_file = None 

300 

301 if release_file is not None: 

302 (suite.name, suite.codename) = self._get_suite_name(suite, release_file) 

303 self.logger.info("Using suite name from Release file: %s", suite.name) 

304 self.logger.debug( 

305 "Using suite codename from Release file: %s", suite.codename 

306 ) 

307 

308 def _check_release_file(self, target_suite: Suite, missing_config_msg: str) -> None: 

309 try: 

310 release_file = read_release_file(target_suite.path) 

311 self.logger.info( 

312 "Found a Release file in %s - using that for defaults", 

313 target_suite.name, 

314 ) 

315 except FileNotFoundError: 

316 self.logger.info( 

317 "The %s suite does not have a Release file.", target_suite.name 

318 ) 

319 release_file = None 

320 

321 if release_file is not None: 

322 self._components = release_file["Components"].split() 

323 self.logger.info( 

324 "Using components listed in Release file: %s", 

325 " ".join(self._components), 

326 ) 

327 

328 if self._architectures is None: 

329 if release_file is None: # pragma: no cover 

330 self.logger.error( 

331 "No configured architectures and there is no release file in the %s suite.", 

332 target_suite.name, 

333 ) 

334 self.logger.error( 

335 'Please check if there is a "Release" file in %s', target_suite.path 

336 ) 

337 self.logger.error( 

338 'or if the config file contains a non-empty "ARCHITECTURES" field' 

339 ) 

340 raise MissingRequiredConfigurationError( 

341 missing_config_msg % "ARCHITECTURES" 

342 ) 

343 self._architectures = sorted( 

344 x for x in release_file["Architectures"].split() if x != "all" 

345 ) 

346 self.logger.info( 

347 "Using architectures listed in Release file: %s", 

348 " ".join(self._architectures), 

349 ) 

350 

    def _read_sources(self, basedir: str) -> dict[str, SourcePackage]:
        """Read the list of source packages from the specified directory

        The source packages are read from the `Sources' file within the
        directory specified as `basedir' parameter (one file per configured
        component when components are in use).  Considering the
        large amount of memory needed, not all the fields are loaded
        in memory. The available fields are Version, Maintainer and Section.

        The method returns a dict mapping each source package name to its
        SourcePackage object.
        """

        if self._components:
            sources: dict[str, SourcePackage] = {}
            for component in self._components:
                filename = os.path.join(basedir, component, "source", "Sources")
                try:
                    filename = possibly_compressed(filename)
                except FileNotFoundError:
                    # A missing non-free-firmware component is tolerated
                    # (it does not exist in all suites); anything else is fatal.
                    if component == "non-free-firmware":
                        self.logger.info("Skipping %s as it doesn't exist", filename)
                        continue
                    raise
                self.logger.info("Loading source packages from %s", filename)
                # Accumulate all components into the same dict.
                read_sources_file(
                    filename,
                    sources,
                    not self._base_config.archall_inconsistency_allowed,
                )
        else:
            # Flat layout: a single Sources file in basedir.
            filename = os.path.join(basedir, "Sources")
            self.logger.info("Loading source packages from %s", filename)
            sources = read_sources_file(
                filename, None, not self._base_config.archall_inconsistency_allowed
            )

        return sources

388 

389 @staticmethod 

390 def merge_fields( 

391 get_field: Callable[[str], Optional[str]], 

392 *field_names: str, 

393 separator: str = ", ", 

394 ) -> Optional[str]: 

395 """Merge two or more fields (filtering out empty fields; returning None if all are empty)""" 

396 return separator.join(filter(None, (get_field(x) for x in field_names))) or None 

397 

    def _read_packages_file(
        self,
        filename: str,
        arch: str,
        srcdist: dict[str, SourcePackage],
        packages: Optional[dict[str, BinaryPackage]] = None,
        intern: Callable[[str], str] = sys.intern,
    ) -> dict[str, BinaryPackage]:
        """Read binary packages from a single Packages file.

        Parses `filename` with apt_pkg.TagFile and merges every entry into
        `packages` (a fresh dict is created when None is given).  For each
        binary the matching source package in `srcdist` is updated in place
        (a faux source entry is created when the source is unknown), and the
        loader-wide pool ``self._all_binaries`` is updated as well.

        :param filename: path of the Packages file to parse
        :param arch: architecture this file is being read for
        :param srcdist: source packages of the suite, modified in place
        :param packages: optional dict to merge into; also the return value
        :param intern: string interning function (sys.intern by default)
        :return: dict mapping binary package name to its BinaryPackage
        """
        self.logger.info("Loading binary packages from %s", filename)

        if packages is None:
            packages = {}

        all_binaries = self._all_binaries

        tag_file = apt_pkg.TagFile(filename)
        get_field = tag_file.section.get
        step = tag_file.step

        while step():
            pkg = get_field("Package")
            version = get_field("Version")

            # There may be multiple versions of any arch:all packages
            # (in unstable) if some architectures have out-of-date
            # binaries. We only ever consider the package with the
            # largest version for migration.
            pkg = intern(pkg)
            version = intern(version)
            pkg_id = BinaryPackageId(pkg, version, arch)

            if pkg in packages:
                old_pkg_data = packages[pkg]
                # Skip this entry if a newer version was already recorded.
                if apt_pkg.version_compare(old_pkg_data.version, version) > 0:
                    continue
                old_pkg_id = old_pkg_data.pkg_id
                old_src_binaries = srcdist[old_pkg_data.source].binaries
                old_src_binaries.remove(old_pkg_id)
                # This may seem weird at first glance, but the current code rely
                # on this behaviour to avoid issues like #709460. Admittedly it
                # is a special case, but Britney will attempt to remove the
                # arch:all packages without this. Even then, this particular
                # stop-gap relies on the packages files being sorted by name
                # and the version, so it is not particularly resilient.
                if pkg_id not in old_src_binaries:
                    old_src_binaries.add(pkg_id)

            # Merge Pre-Depends with Depends and Conflicts with
            # Breaks. Britney is not interested in the "finer
            # semantic differences" of these fields anyway.
            deps = DebMirrorLikeSuiteContentLoader.merge_fields(
                get_field, "Pre-Depends", "Depends"
            )
            conflicts = DebMirrorLikeSuiteContentLoader.merge_fields(
                get_field, "Conflicts", "Breaks"
            )

            ess = False
            if get_field("Essential", "no") == "yes":
                ess = True

            source = pkg
            source_version = version
            # retrieve the name and the version of the source package
            source_raw = get_field("Source")
            if source_raw:
                source = intern(source_raw.split(" ")[0])
                # "Source: name (version)" — extract the parenthesised version.
                if "(" in source_raw:
                    source_version = intern(
                        source_raw[source_raw.find("(") + 1 : source_raw.find(")")]
                    )

            provides_raw = get_field("Provides")
            if provides_raw:
                provides = parse_provides(
                    provides_raw, pkg_id=pkg_id, logger=self.logger
                )
            else:
                provides = []

            raw_arch = intern(get_field("Architecture"))
            if raw_arch not in {"all", arch}:  # pragma: no cover
                raise AssertionError(
                    "%s has wrong architecture (%s) - should be either %s or all"
                    % (str(pkg_id), raw_arch, arch)
                )

            builtusing_raw = get_field("Built-Using")
            if builtusing_raw:
                builtusing = parse_builtusing(
                    builtusing_raw, pkg_id=pkg_id, logger=self.logger
                )
            else:
                builtusing = []

            dpkg = BinaryPackage(
                version,
                intern(get_field("Section")),
                source,
                source_version,
                raw_arch,
                get_field("Multi-Arch"),
                deps,
                conflicts,
                provides,
                ess,
                pkg_id,
                builtusing,
            )

            # if the source package is available in the distribution, then register this binary package
            if source in srcdist:
                # There may be multiple versions of any arch:all packages
                # (in unstable) if some architectures have out-of-date
                # binaries. We only want to include the package in the
                # source -> binary mapping once. It doesn't matter which
                # of the versions we include as only the package name and
                # architecture are recorded.
                srcdist[source].binaries.add(pkg_id)
            # if the source package doesn't exist, create a fake one
            else:
                srcdist[source] = SourcePackage(
                    source,
                    source_version,
                    "faux",
                    {pkg_id},
                    None,
                    True,
                    None,
                    None,
                    [],
                    [],
                )

            # add the resulting dictionary to the package list
            packages[pkg] = dpkg
            if pkg_id in all_binaries:
                self._merge_pkg_entries(pkg, arch, all_binaries[pkg_id], dpkg)
            else:
                all_binaries[pkg_id] = dpkg

            # add the resulting dictionary to the package list
            # NOTE(review): redundant — identical to the assignment above.
            packages[pkg] = dpkg

        return packages

543 

    def _read_binaries(
        self, suite: Suite, architectures: Iterable[str]
    ) -> tuple[
        dict[str, dict[str, BinaryPackage]], dict[str, dict[str, set[tuple[str, str]]]]
    ]:
        """Read the list of binary packages from the specified directory

        This method reads all the binary packages for a given suite.

        If the "components" config parameter is set, the directory should
        be the "suite" directory of a local mirror (i.e. the one containing
        the "Release" file). Otherwise, Britney will read the packages
        information from all the "Packages_${arch}" files referenced by
        the "architectures" parameter.

        Considering the
        large amount of memory needed, not all the fields are loaded
        in memory. The available fields are Version, Source, Multi-Arch,
        Depends, Conflicts, Provides and Architecture.

        The `Provides' field is used to populate the virtual packages list.

        The method returns a tuple of two dicts with architecture as key and
        another dict as value. The value dicts of the first dict map
        from binary package name to "BinaryPackage" objects; the value dicts
        of the second map a package name to the packages providing them.
        """
        binaries: dict[str, dict[str, BinaryPackage]] = {}
        provides_table: dict[str, dict[str, set[tuple[str, str]]]] = {}
        basedir = suite.path

        if self._components:
            # Mirror layout: iterate component/binary-$arch/Packages files,
            # restricted to the architectures the Release file declares.
            release_file = read_release_file(basedir)
            listed_archs = set(release_file["Architectures"].split())
            for arch in architectures:
                packages: dict[str, BinaryPackage] = {}
                if arch not in listed_archs:
                    self.logger.info(
                        "Skipping arch %s for %s: It is not listed in the Release file",
                        arch,
                        suite.name,
                    )
                    binaries[arch] = {}
                    provides_table[arch] = {}
                    continue
                for component in self._components:
                    binary_dir = "binary-%s" % arch
                    filename = os.path.join(basedir, component, binary_dir, "Packages")
                    try:
                        filename = possibly_compressed(filename)
                    except FileNotFoundError:
                        # non-free-firmware may be absent; other missing
                        # components are an error.
                        if component == "non-free-firmware":
                            self.logger.info(
                                "Skipping %s as it doesn't exist", filename
                            )
                            continue
                        raise
                    udeb_filename = os.path.join(
                        basedir, component, "debian-installer", binary_dir, "Packages"
                    )
                    # We assume the udeb Packages file is present if the
                    # regular one is present
                    udeb_filename = possibly_compressed(udeb_filename)
                    self._read_packages_file(filename, arch, suite.sources, packages)
                    self._read_packages_file(
                        udeb_filename, arch, suite.sources, packages
                    )
                # create provides
                provides = create_provides_map(packages)
                binaries[arch] = packages
                provides_table[arch] = provides
        else:
            # Flat layout: one Packages_$arch file per architecture.
            for arch in architectures:
                filename = os.path.join(basedir, "Packages_%s" % arch)
                packages = self._read_packages_file(filename, arch, suite.sources)
                provides = create_provides_map(packages)
                binaries[arch] = packages
                provides_table[arch] = provides

        return (binaries, provides_table)

624 

625 def _merge_pkg_entries( 

626 self, 

627 package: str, 

628 parch: str, 

629 pkg_entry1: BinaryPackage, 

630 pkg_entry2: BinaryPackage, 

631 ) -> None: 

632 bad = [] 

633 for f in self.CHECK_FIELDS: 

634 v1 = getattr(pkg_entry1, f) 

635 v2 = getattr(pkg_entry2, f) 

636 if v1 != v2: # pragma: no cover 

637 bad.append((f, v1, v2)) 

638 

639 if bad: # pragma: no cover 

640 self.logger.error( 

641 "Mismatch found %s %s %s differs", package, pkg_entry1.version, parch 

642 ) 

643 for f, v1, v2 in bad: 

644 self.logger.info(" ... %s %s != %s", f, v1, v2) 

645 raise ValueError("Inconsistent / Unsupported data set") 

646 

647 # Merge ESSENTIAL if necessary 

648 assert pkg_entry1.is_essential or not pkg_entry2.is_essential