Coverage for britney2/inputs/suiteloader.py: 90%

269 statements  

coverage.py v7.6.0, created at 2025-10-17 17:32 +0000

import logging
import optparse
import os
import sys
from abc import abstractmethod
from collections.abc import Callable, Iterable
from typing import Any, Literal, Optional, TypeVar, overload

import apt_pkg

from britney2 import (
    BinaryPackage,
    BinaryPackageId,
    PackageId,
    SourcePackage,
    Suite,
    SuiteClass,
    Suites,
    TargetSuite,
)
from britney2.utils import (
    create_provides_map,
    parse_builtusing,
    parse_provides,
    possibly_compressed,
    read_release_file,
    read_sources_file,
)


class MissingRequiredConfigurationError(RuntimeError):
    pass


_T = TypeVar("_T")


class SuiteContentLoader:
    def __init__(self, base_config: optparse.Values) -> None:
        self._base_config = base_config
        self._architectures: list[str] = SuiteContentLoader.config_str_as_list(
            base_config.architectures
        )
        self._nobreakall_arches: list[str] = SuiteContentLoader.config_str_as_list(
            base_config.nobreakall_arches, []
        )
        self._outofsync_arches: list[str] = SuiteContentLoader.config_str_as_list(
            base_config.outofsync_arches, []
        )
        self._break_arches: list[str] = SuiteContentLoader.config_str_as_list(
            base_config.break_arches, []
        )
        self._new_arches: list[str] = SuiteContentLoader.config_str_as_list(
            base_config.new_arches, []
        )
        self._components: list[str] = []
        self._all_binaries: dict[BinaryPackageId, BinaryPackage] = {}
        logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
        self.logger = logging.getLogger(logger_name)

    @overload
    @staticmethod
    def config_str_as_list(value: Literal[None], default_value: _T) -> _T: ...

    @overload
    @staticmethod
    def config_str_as_list(value: str, default_value: Any) -> list[str]: ...

    @overload
    @staticmethod
    def config_str_as_list(value: Any, default_value: Any | None = None) -> Any: ...

    @staticmethod
    def config_str_as_list(value: Any, default_value: Any | None = None) -> Any:
        if value is None:
            return default_value
        if isinstance(value, str):  # coverage: condition was always true
            return value.split()
        return value
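
    # Usage sketch for config_str_as_list (hypothetical values): a space-separated
    # config string becomes a list, None falls back to the default, and any other
    # value is passed through unchanged:
    #   config_str_as_list("amd64 i386")   -> ["amd64", "i386"]
    #   config_str_as_list(None, [])       -> []
    #   config_str_as_list(["amd64"], [])  -> ["amd64"]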


    @property
    def architectures(self) -> list[str]:
        return self._architectures

    @property
    def nobreakall_arches(self) -> list[str]:
        return self._nobreakall_arches

    @property
    def outofsync_arches(self) -> list[str]:
        return self._outofsync_arches

    @property
    def break_arches(self) -> list[str]:
        return self._break_arches

    @property
    def new_arches(self) -> list[str]:
        return self._new_arches

    @property
    def components(self) -> list[str]:
        return self._components

    def all_binaries(self) -> dict[BinaryPackageId, BinaryPackage]:
        return self._all_binaries

    @abstractmethod
    def load_suites(self) -> Suites:  # pragma: no cover
        pass



class DebMirrorLikeSuiteContentLoader(SuiteContentLoader):
    CHECK_FIELDS = [
        "source",
        "source_version",
        "architecture",
        "multi_arch",
        "depends",
        "conflicts",
        "provides",
    ]

    def load_suites(self) -> Suites:
        suites = []
        missing_config_msg = (
            "Configuration %s is not set in the config (and cannot be auto-detected)"
        )
        for suitename in ("testing", "unstable", "pu", "tpu"):
            suffix = suitename if suitename in {"pu", "tpu"} else ""
            if hasattr(self._base_config, suitename):
                suite_path = getattr(self._base_config, suitename)
                suite_class = SuiteClass.TARGET_SUITE
                if suitename != "testing":
                    suite_class = (
                        SuiteClass.ADDITIONAL_SOURCE_SUITE
                        if suffix
                        else SuiteClass.PRIMARY_SOURCE_SUITE
                    )
                    suites.append(
                        Suite(
                            suite_class, suitename, suite_path, suite_short_name=suffix
                        )
                    )
                else:
                    target_suite = TargetSuite(
                        suite_class, suitename, suite_path, suite_short_name=suffix
                    )
            else:
                if suitename in {"testing", "unstable"}:  # pragma: no cover
                    self.logger.error(missing_config_msg, suitename.upper())
                    raise MissingRequiredConfigurationError(
                        missing_config_msg % suitename.upper()
                    )
                self.logger.info(
                    "Optional suite %s is not defined (config option: %s)",
                    suitename,
                    suitename.upper(),
                )

        assert target_suite is not None, "Logic regression, this should be impossible."

        self._check_release_file(target_suite, missing_config_msg)
        self._setup_architectures()

        # Read the source and binary packages for the involved distributions. Notes:
        # - Load testing last, as some live-data tests have more complete information
        #   in unstable.
        # - Load all sources before any of the binaries.
        for suite in [target_suite, *suites]:
            sources = self._read_sources(suite.path)
            self._update_suite_name(suite)
            suite.sources = sources
            (suite.binaries, suite.provides_table) = self._read_binaries(
                suite, self._architectures
            )
            self._fixup_faux_arch_all_binaries(suite)

        return Suites(target_suite, suites)
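
    # Config-naming sketch for the loop above: each suite is looked up as a
    # lowercase attribute on base_config (base_config.testing, .unstable, .pu,
    # .tpu), while the log messages report the corresponding uppercase option
    # names (TESTING, UNSTABLE, PU, TPU); only "testing" and "unstable" are
    # mandatory.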


    def _fixup_faux_arch_all_binaries(self, suite: Suite) -> None:
        """Remove the faux arch:all binary if a real arch:all binary is available.

        We don't know for which architectures bin/$something must be available,
        except for arch:all, which should be available on every architecture.
        The information that a source builds an arch:all binary is available
        while the sources are loaded, but we have to pick an order in which to
        load the files, and the Sources are loaded before the Packages are
        read. Hence we fake an arch:all binary during source loading, but it
        shouldn't be in the final list if real arch:all binaries are present
        in the Packages file.

        Also, if we keep the fake binary, it should be added to the lists of
        known binaries in the suite, otherwise britney2 trips later on.
        """
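
        # Faux-id sketch (hypothetical values): a faux binary is recognised by
        # its architecture slot, e.g. BinaryPackageId("foo", "1.0-1", "faux");
        # the x[2] checks below test exactly that field.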


        all_binaries = self._all_binaries
        binaries = suite.binaries
        faux_arches = (
            set(self.architectures)
            - set(self.break_arches)
            - set(self.outofsync_arches)
            - set(self.new_arches)
        )

        for srcpkg in suite.sources.values():
            faux = {x for x in srcpkg.binaries if x[2] == "faux"}
            if faux and [
                x
                for x in (srcpkg.binaries - faux)
                if all_binaries[x].architecture == "all"
            ]:
                srcpkg.binaries -= faux

            # Calculate again because we may have changed the set
            faux = {x for x in srcpkg.binaries if x[2] == "faux"}
            for binpkg_id in faux:  # coverage: loop body never started
                bin_data = BinaryPackage(
                    binpkg_id[1],
                    sys.intern("faux"),
                    srcpkg.source,
                    srcpkg.version,
                    "all",
                    "no",
                    None,
                    None,
                    [],
                    False,
                    binpkg_id,
                    [],
                )
                for arch_all in faux_arches:
                    binaries[arch_all][binpkg_id[0]] = bin_data
                all_binaries[binpkg_id] = bin_data
        suite.binaries = binaries


    def _setup_architectures(self) -> None:
        allarches = self._architectures
        # Re-order the architectures so that the most important architectures
        # are listed first (this makes the log easier to read, as the most
        # important architectures appear at the top).
        arches = [x for x in allarches if x in self._nobreakall_arches]
        arches += [
            x for x in allarches if x not in arches and x not in self._outofsync_arches
        ]
        arches += [
            x for x in allarches if x not in arches and x not in self._break_arches
        ]
        arches += [
            x for x in allarches if x not in arches and x not in self._new_arches
        ]
        arches += [x for x in allarches if x not in arches]
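
        # Ordering sketch (hypothetical config): with
        #   allarches = ["s390x", "amd64", "hurd-i386"]
        #   nobreakall_arches = ["amd64"]
        #   outofsync_arches = break_arches = ["hurd-i386"]
        # the passes above yield arches == ["amd64", "s390x", "hurd-i386"]:
        # nobreakall architectures first, fragile ones last.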


        # Intern architectures for efficiency; items in this list will be used
        # for lookups and for building items/keys - by interning the strings we
        # reduce memory usage (considerably).
        self._architectures = [sys.intern(arch) for arch in arches]
        assert "all" not in self._architectures, "all not allowed in architectures"


    def _get_suite_name(
        self, suite: Suite, release_file: "apt_pkg.TagSection[str]"
    ) -> tuple[str, str]:
        name = None
        codename = None
        if "Suite" in release_file:  # coverage: condition was always true
            name = release_file["Suite"]
        if "Codename" in release_file:
            codename = release_file["Codename"]

        if name is None:  # coverage: condition was never true
            name = codename
        elif codename is None:
            codename = name

        if name is None:  # coverage: condition was never true
            self.logger.warning(
                'Either of the fields "Suite" or "Codename" '
                + "should be present in a release file."
            )
            self.logger.error(
                "Release file for suite %s is missing both the "
                + '"Suite" and the "Codename" fields.',
                suite.name,
            )
            raise KeyError("Suite")

        assert codename is not None  # required for type checking
        return (name, codename)
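
    # Release-field sketch (hypothetical contents): with "Suite: testing" and
    # "Codename: trixie", _get_suite_name() returns ("testing", "trixie"); if
    # only one of the two fields is present, its value is used for both.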


    def _update_suite_name(self, suite: Suite) -> None:
        try:
            release_file = read_release_file(suite.path)
        except FileNotFoundError:
            self.logger.info(
                "The %s suite does not have a Release file, unable to update the name",
                suite.name,
            )
            release_file = None

        if release_file is not None:
            (suite.name, suite.codename) = self._get_suite_name(suite, release_file)
            self.logger.info("Using suite name from Release file: %s", suite.name)
            self.logger.debug(
                "Using suite codename from Release file: %s", suite.codename
            )


    def _check_release_file(self, target_suite: Suite, missing_config_msg: str) -> None:
        try:
            release_file = read_release_file(target_suite.path)
            self.logger.info(
                "Found a Release file in %s - using that for defaults",
                target_suite.name,
            )
        except FileNotFoundError:
            self.logger.info(
                "The %s suite does not have a Release file.", target_suite.name
            )
            release_file = None

        if release_file is not None:
            self._components = release_file["Components"].split()
            self.logger.info(
                "Using components listed in Release file: %s",
                " ".join(self._components),
            )

        if self._architectures is None:
            if release_file is None:  # pragma: no cover
                self.logger.error(
                    "No configured architectures and there is no release file in the %s suite.",
                    target_suite.name,
                )
                self.logger.error(
                    'Please check if there is a "Release" file in %s', target_suite.path
                )
                self.logger.error(
                    'or if the config file contains a non-empty "ARCHITECTURES" field'
                )
                raise MissingRequiredConfigurationError(
                    missing_config_msg % "ARCHITECTURES"
                )
            self._architectures = sorted(
                x for x in release_file["Architectures"].split() if x != "all"
            )
            self.logger.info(
                "Using architectures listed in Release file: %s",
                " ".join(self._architectures),
            )


    def _read_sources(self, basedir: str) -> dict[str, SourcePackage]:
        """Read the list of source packages from the specified directory.

        The source packages are read from the `Sources' file within the
        directory specified by the `basedir' parameter. Considering the
        large amount of memory needed, not all the fields are loaded
        into memory. The available fields are Version, Maintainer and Section.

        The method returns a dictionary mapping each source package name to a
        SourcePackage object.
        """
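
        # Layout sketch (assuming a deb-mirror-like tree): with components set,
        # sources are read from <basedir>/<component>/source/Sources (or a
        # compressed variant resolved by possibly_compressed()); without
        # components, a single flat <basedir>/Sources file is expected.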


        if self._components:
            sources: dict[str, SourcePackage] = {}
            for component in self._components:
                filename = os.path.join(basedir, component, "source", "Sources")
                try:
                    filename = possibly_compressed(filename)
                except FileNotFoundError:
                    if component == "non-free-firmware":
                        self.logger.info("Skipping %s as it doesn't exist", filename)
                        continue
                    raise
                self.logger.info("Loading source packages from %s", filename)
                read_sources_file(
                    filename,
                    sources,
                    not self._base_config.archall_inconsistency_allowed,
                )
        else:
            filename = os.path.join(basedir, "Sources")
            self.logger.info("Loading source packages from %s", filename)
            sources = read_sources_file(
                filename, None, not self._base_config.archall_inconsistency_allowed
            )

        return sources


    @staticmethod
    def merge_fields(
        get_field: Callable[[str], str | None],
        *field_names: str,
        separator: str = ", ",
    ) -> str | None:
        """Merge two or more fields (filtering out empty fields; returning None if all are empty)"""
        return separator.join(filter(None, (get_field(x) for x in field_names))) or None
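
    # Usage sketch (hypothetical field values): with Pre-Depends "libc6" and
    # Depends "python3", merge_fields(get_field, "Pre-Depends", "Depends")
    # returns "libc6, python3"; if both fields are missing or empty, it
    # returns None.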


    def _read_packages_file(
        self,
        filename: str,
        arch: str,
        srcdist: dict[str, SourcePackage],
        packages: dict[str, BinaryPackage] | None = None,
        intern: Callable[[str], str] = sys.intern,
    ) -> dict[str, BinaryPackage]:
        self.logger.info("Loading binary packages from %s", filename)

        if packages is None:
            packages = {}

        all_binaries = self._all_binaries

        tag_file = apt_pkg.TagFile(filename)
        get_field = tag_file.section.get
        step = tag_file.step

        while step():
            pkg = get_field("Package")
            version = get_field("Version")

            # There may be multiple versions of any arch:all packages
            # (in unstable) if some architectures have out-of-date
            # binaries. We only ever consider the package with the
            # largest version for migration.
            pkg = intern(pkg)
            version = intern(version)
            pkg_id = BinaryPackageId(pkg, version, arch)

            if pkg in packages:
                old_pkg_data = packages[pkg]
                if apt_pkg.version_compare(old_pkg_data.version, version) > 0:
                    continue
                old_pkg_id = old_pkg_data.pkg_id
                old_src_binaries = srcdist[old_pkg_data.source].binaries
                old_src_binaries.remove(old_pkg_id)
                # This may seem weird at first glance, but the current code
                # relies on this behaviour to avoid issues like #709460.
                # Admittedly it is a special case, but Britney will attempt to
                # remove the arch:all packages without this. Even then, this
                # particular stop-gap relies on the packages files being sorted
                # by name and version, so it is not particularly resilient.
                if pkg_id not in old_src_binaries:  # coverage: always true
                    old_src_binaries.add(pkg_id)

            # Merge Pre-Depends with Depends and Conflicts with
            # Breaks. Britney is not interested in the "finer
            # semantic differences" of these fields anyway.
            deps = DebMirrorLikeSuiteContentLoader.merge_fields(
                get_field, "Pre-Depends", "Depends"
            )
            conflicts = DebMirrorLikeSuiteContentLoader.merge_fields(
                get_field, "Conflicts", "Breaks"
            )

            ess = False
            if get_field("Essential", "no") == "yes":
                ess = True

            source = pkg
            source_version = version
            # retrieve the name and the version of the source package
            source_raw = get_field("Source")
            if source_raw:
                source = intern(source_raw.split(" ")[0])
                if "(" in source_raw:
                    source_version = intern(
                        source_raw[source_raw.find("(") + 1 : source_raw.find(")")]
                    )
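
            # Source-field sketch (hypothetical stanza): "Source: glibc (2.36-9)"
            # yields source == "glibc" and source_version == "2.36-9"; a bare
            # "Source: glibc" keeps the binary's own version as source_version.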


            provides_raw = get_field("Provides")
            if provides_raw:
                provides = parse_provides(
                    provides_raw, pkg_id=pkg_id, logger=self.logger
                )
            else:
                provides = []

            raw_arch = intern(get_field("Architecture"))
            if raw_arch not in {"all", arch}:  # pragma: no cover
                raise AssertionError(
                    "%s has wrong architecture (%s) - should be either %s or all"
                    % (str(pkg_id), raw_arch, arch)
                )

            builtusing_raw = get_field("Built-Using")
            if builtusing_raw:
                builtusing = parse_builtusing(
                    builtusing_raw, pkg_id=pkg_id, logger=self.logger
                )
            else:
                builtusing = []

            dpkg = BinaryPackage(
                version,
                intern(get_field("Section")),
                source,
                source_version,
                raw_arch,
                get_field("Multi-Arch"),
                deps,
                conflicts,
                provides,
                ess,
                pkg_id,
                builtusing,
            )

            # if the source package is available in the distribution, then
            # register this binary package
            if source in srcdist:
                # There may be multiple versions of any arch:all packages
                # (in unstable) if some architectures have out-of-date
                # binaries. We only want to include the package in the
                # source -> binary mapping once. It doesn't matter which
                # of the versions we include, as only the package name and
                # architecture are recorded.
                srcdist[source].binaries.add(pkg_id)
            # if the source package doesn't exist, create a fake one
            else:
                srcdist[source] = SourcePackage(
                    source,
                    source_version,
                    "faux",
                    {pkg_id},
                    None,
                    True,
                    None,
                    None,
                    [],
                    [],
                )

            # add the resulting package to the package list
            packages[pkg] = dpkg
            if pkg_id in all_binaries:
                self._merge_pkg_entries(pkg, arch, all_binaries[pkg_id], dpkg)
            else:
                all_binaries[pkg_id] = dpkg



        return packages


    def _read_binaries(
        self, suite: Suite, architectures: Iterable[str]
    ) -> tuple[
        dict[str, dict[str, BinaryPackage]], dict[str, dict[str, set[tuple[str, str]]]]
    ]:
        """Read the list of binary packages from the specified directory.

        This method reads all the binary packages for a given suite.

        If the "components" config parameter is set, the directory should
        be the "suite" directory of a local mirror (i.e. the one containing
        the "Release" file). Otherwise, Britney will read the packages
        information from all the "Packages_${arch}" files referenced by
        the "architectures" parameter.

        Considering the large amount of memory needed, not all the fields
        are loaded into memory. The available fields are Version, Source,
        Multi-Arch, Depends, Conflicts, Provides and Architecture.

        The `Provides' field is used to populate the virtual packages list.

        The method returns a tuple of two dicts, each with architecture as
        the key and another dict as the value. The inner dicts of the first
        map binary package names to "BinaryPackage" objects; the inner dicts
        of the second map a package name to the set of packages providing it.
        """
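
        # Shape sketch (hypothetical data; the meaning of the string pairs is
        # an assumption based on the type annotations above):
        #   binaries == {"amd64": {"postfix": <BinaryPackage>, ...}, ...}
        #   provides_table == {"amd64": {"mail-transport-agent":
        #       {("postfix", "3.7.4-1")}, ...}, ...}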

        binaries: dict[str, dict[str, BinaryPackage]] = {}
        provides_table: dict[str, dict[str, set[tuple[str, str]]]] = {}
        basedir = suite.path

        if self._components:
            release_file = read_release_file(basedir)
            listed_archs = set(release_file["Architectures"].split())
            for arch in architectures:
                packages: dict[str, BinaryPackage] = {}
                if arch not in listed_archs:  # coverage: condition was never true
                    self.logger.info(
                        "Skipping arch %s for %s: It is not listed in the Release file",
                        arch,
                        suite.name,
                    )
                    binaries[arch] = {}
                    provides_table[arch] = {}
                    continue
                for component in self._components:
                    binary_dir = "binary-%s" % arch
                    filename = os.path.join(basedir, component, binary_dir, "Packages")
                    try:
                        filename = possibly_compressed(filename)
                    except FileNotFoundError:
                        if component == "non-free-firmware":
                            self.logger.info(
                                "Skipping %s as it doesn't exist", filename
                            )
                            continue
                        raise
                    udeb_filename = os.path.join(
                        basedir, component, "debian-installer", binary_dir, "Packages"
                    )
                    # We assume the udeb Packages file is present if the
                    # regular one is present
                    udeb_filename = possibly_compressed(udeb_filename)
                    self._read_packages_file(filename, arch, suite.sources, packages)
                    self._read_packages_file(
                        udeb_filename, arch, suite.sources, packages
                    )
                # create provides
                provides = create_provides_map(packages)
                binaries[arch] = packages
                provides_table[arch] = provides
        else:
            for arch in architectures:
                filename = os.path.join(basedir, "Packages_%s" % arch)
                packages = self._read_packages_file(filename, arch, suite.sources)
                provides = create_provides_map(packages)
                binaries[arch] = packages
                provides_table[arch] = provides

        return (binaries, provides_table)


    def _merge_pkg_entries(
        self,
        package: str,
        parch: str,
        pkg_entry1: BinaryPackage,
        pkg_entry2: BinaryPackage,
    ) -> None:
        bad = []
        for f in self.CHECK_FIELDS:
            v1 = getattr(pkg_entry1, f)
            v2 = getattr(pkg_entry2, f)
            if v1 != v2:  # pragma: no cover
                bad.append((f, v1, v2))

        if bad:  # pragma: no cover
            self.logger.error(
                "Mismatch found %s %s %s differs", package, pkg_entry1.version, parch
            )
            for f, v1, v2 in bad:
                self.logger.info(" ... %s %s != %s", f, v1, v2)
            raise ValueError("Inconsistent / Unsupported data set")


        # Merge ESSENTIAL if necessary
        assert pkg_entry1.is_essential or not pkg_entry2.is_essential

647 assert pkg_entry1.is_essential or not pkg_entry2.is_essential