Coverage for britney2/inputs/suiteloader.py: 92%
289 statements
« prev ^ index » next — coverage.py v7.6.0, created at 2026-01-29 17:21 +0000
1import logging
2import optparse
3import os
4import sys
5from abc import abstractmethod
6from collections.abc import Callable, Iterable
7from typing import Any, Literal, TypeVar, overload
9import apt_pkg
11from britney2 import (
12 BinaryPackage,
13 BinaryPackageId,
14 SourcePackage,
15 Suite,
16 SuiteClass,
17 Suites,
18 TargetSuite,
19)
20from britney2.utils import (
21 create_provides_map,
22 parse_builtusing,
23 parse_provides,
24 possibly_compressed,
25 read_release_file,
26 read_sources_file,
27)
class MissingRequiredConfigurationError(RuntimeError):
    """Raised when a mandatory config option is missing and cannot be auto-detected."""

    pass
34_T = TypeVar("_T")
class SuiteContentLoader:
    """Base class for loading suite content (sources and binaries).

    Parses the architecture-related options out of the britney configuration
    and keeps a registry of every binary package seen across all suites.
    Concrete subclasses implement :meth:`load_suites`.
    """

    def __init__(self, base_config: optparse.Values) -> None:
        self._base_config = base_config
        # ARCHITECTURES has no default (may be auto-detected later from a
        # Release file); the remaining *_ARCHES options default to empty lists.
        self._architectures: list[str] = SuiteContentLoader.config_str_as_list(
            base_config.architectures
        )
        self._nobreakall_arches: list[str] = SuiteContentLoader.config_str_as_list(
            base_config.nobreakall_arches, []
        )
        self._outofsync_arches: list[str] = SuiteContentLoader.config_str_as_list(
            base_config.outofsync_arches, []
        )
        self._break_arches: list[str] = SuiteContentLoader.config_str_as_list(
            base_config.break_arches, []
        )
        self._new_arches: list[str] = SuiteContentLoader.config_str_as_list(
            base_config.new_arches, []
        )
        # Components are discovered later (e.g. from a Release file).
        self._components: list[str] = []
        # Registry of every binary package loaded, shared across all suites.
        self._all_binaries: dict[BinaryPackageId, BinaryPackage] = {}
        logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
        self.logger = logging.getLogger(logger_name)

    @overload
    @staticmethod
    def config_str_as_list(value: Literal[None], default_value: _T) -> _T: ...

    @overload
    @staticmethod
    def config_str_as_list(value: str, default_value: Any) -> list[str]: ...

    @overload
    @staticmethod
    def config_str_as_list(value: Any, default_value: Any | None = None) -> Any: ...

    @staticmethod
    def config_str_as_list(value: Any, default_value: Any | None = None) -> Any:
        """Normalize a config value: None -> default_value, str -> whitespace-split list."""
        if value is None:
            return default_value
        if isinstance(value, str):
            return value.split()
        # Any other value (e.g. already a list) is passed through unchanged.
        return value

    @property
    def architectures(self) -> list[str]:
        return self._architectures

    @property
    def nobreakall_arches(self) -> list[str]:
        return self._nobreakall_arches

    @property
    def outofsync_arches(self) -> list[str]:
        return self._outofsync_arches

    @property
    def break_arches(self) -> list[str]:
        return self._break_arches

    @property
    def new_arches(self) -> list[str]:
        return self._new_arches

    @property
    def components(self) -> list[str]:
        return self._components

    def all_binaries(self) -> dict[BinaryPackageId, BinaryPackage]:
        # NOTE: unlike the accessors above this is a plain method (callers use
        # ``loader.all_binaries()``); keep it that way for API compatibility.
        return self._all_binaries

    @abstractmethod
    def load_suites(self) -> Suites:  # pragma: no cover
        pass
class DebMirrorLikeSuiteContentLoader(SuiteContentLoader):
    """Load suites from a Debian-mirror-like directory layout.

    Reads Sources and Packages files — either one file per component (as
    listed in the suite's Release file) or flat ``Sources`` /
    ``Packages_${arch}`` files when no components are configured.
    """

    # Fields that must agree when the same (name, version, arch) binary
    # appears in more than one suite; checked by _merge_pkg_entries.
    CHECK_FIELDS = [
        "source",
        "source_version",
        "architecture",
        "multi_arch",
        "depends",
        "conflicts",
        "provides",
    ]
    def load_suites(self) -> Suites:
        """Load the target suite and all configured source suites.

        Reads the configured suite paths ("testing", "unstable", "pu",
        "tpu"), applies Release-file defaults, then loads sources and
        binaries for each suite.

        :raises MissingRequiredConfigurationError: if "testing" or "unstable"
            is not configured.
        """
        suites = []
        missing_config_msg = (
            "Configuration %s is not set in the config (and cannot be auto-detected)"
        )
        for suitename in ("testing", "unstable", "pu", "tpu"):
            # "pu"/"tpu" keep a short-name suffix; primary suites use "".
            suffix = suitename if suitename in {"pu", "tpu"} else ""
            if hasattr(self._base_config, suitename):
                suite_path = getattr(self._base_config, suitename)
                suite_class = SuiteClass.TARGET_SUITE
                if suitename != "testing":
                    suite_class = (
                        SuiteClass.ADDITIONAL_SOURCE_SUITE
                        if suffix
                        else SuiteClass.PRIMARY_SOURCE_SUITE
                    )
                    suites.append(
                        Suite(
                            suite_class, suitename, suite_path, suite_short_name=suffix
                        )
                    )
                else:
                    # "testing" is the migration target suite.
                    target_suite = TargetSuite(
                        suite_class, suitename, suite_path, suite_short_name=suffix
                    )
            else:
                if suitename in {"testing", "unstable"}:  # pragma: no cover
                    self.logger.error(missing_config_msg, suitename.upper())
                    raise MissingRequiredConfigurationError(
                        missing_config_msg % suitename.upper()
                    )
                self.logger.info(
                    "Optional suite %s is not defined (config option: %s) ",
                    suitename,
                    suitename.upper(),
                )

        # "testing" is either configured (binding target_suite) or the loop
        # raised above, so target_suite is always bound here.
        assert target_suite is not None, "Logic regression, this should be impossible."

        self._check_release_file(target_suite, missing_config_msg)
        self._setup_architectures()

        # read the source and binary packages for the involved distributions. Notes:
        # - Load testing last as some live-data tests have more complete information in
        #   unstable
        # - Load all sources before any of the binaries.
        for suite in [target_suite, *suites]:
            sources = self._read_sources(suite.path)
            self._update_suite_name(suite)
            suite.sources = sources
            (suite.binaries, suite.provides_table) = self._read_binaries(
                suite, self._architectures
            )
            self._fixup_faux_arch_all_binaries(suite)
            if self._base_config.be_strict_with_build_deps:
                self._add_build_dep_faux_binaries(suite)

        return Suites(target_suite, suites)
    def _fixup_faux_arch_all_binaries(self, suite: Suite) -> None:
        """remove faux arch:all binary if a real arch:all binary is available

        We don't know for which architectures bin/$something must be available
        except for arch:all, which should be available in each arch. The
        information that a source builds an arch:all binary is available during
        the loading of the sources, but we have to pick an order in which to
        load the files and the Sources is loaded before the Packages are
        read. Hence we fake an arch:all binary during source loading, but it
        shouldn't be there in the final list if real arch:all binaries are
        present in the Packages file.

        Also, if we keep the fake binary, it should be added to the lists of
        known binaries in the suite, otherwise britney2 trips later on.
        """

        all_binaries = self._all_binaries
        binaries = suite.binaries
        # Architectures in which a kept faux binary must be registered: all
        # configured arches minus break/outofsync/new ones.
        faux_arches = (
            set(self.architectures)
            - set(self.break_arches)
            - set(self.outofsync_arches)
            - set(self.new_arches)
        )

        for srcpkg in suite.sources.values():
            # Faux binary package ids carry "faux" in their third slot.
            faux = {x for x in srcpkg.binaries if x[2] == "faux"}
            if faux and [
                x
                for x in (srcpkg.binaries - faux)
                if all_binaries[x].architecture == "all"
            ]:
                # A real arch:all binary exists; drop the stand-ins.
                srcpkg.binaries -= faux

            # Calculate again because we may have changed the set
            faux = {x for x in srcpkg.binaries if x[2] == "faux"}
            for binpkg_id in faux:
                # Synthesize a minimal arch:all BinaryPackage for the faux id
                # (positional fields mirror the real loader's construction).
                bin_data = BinaryPackage(
                    binpkg_id[1],
                    sys.intern("faux"),
                    srcpkg.source,
                    srcpkg.version,
                    "all",
                    "no",
                    None,
                    None,
                    [],
                    False,
                    binpkg_id,
                    [],
                )
                # Register it for every fully-participating architecture.
                for arch_all in faux_arches:
                    binaries[arch_all][binpkg_id[0]] = bin_data
                all_binaries[binpkg_id] = bin_data
        suite.binaries = binaries
    def _add_build_dep_faux_binaries(self, suite: Suite) -> None:
        """Add faux packages that keep track of build depends

        To ensure that Build-Depends are fully protected against inappropriate
        removal or upgrade, we add faux packages to source packages containing
        the Build-Depends as Depends.
        """

        all_binaries = self._all_binaries
        for src_name, src_pkg in suite.sources.items():
            # TODO: something with arch, which one?
            archall = self._nobreakall_arches[0]
            bd_pid = BinaryPackageId(
                src_name + "-faux-build-depends", sys.intern(src_pkg.version), archall
            )
            deps = ""
            # Flatten arch and indep build-dependencies into one Depends string.
            for bds in (src_pkg.build_deps_arch, src_pkg.build_deps_indep):
                if bds is not None:
                    for block in apt_pkg.parse_src_depends(bds, architecture=archall):
                        # Like the buildds, we don't care about alternatives
                        deps += "," + block[0][0]
                        if block[0][1] != "":
                            # The extra space in the middle is a workaround for
                            # an apt_pkg bug in bookworm
                            deps += "(" + block[0][2] + " " + block[0][1] + ")"
            deps = deps.strip(",")
            if deps == "":
                # Source has no build-dependencies; nothing to protect.
                continue
            dpkg = BinaryPackage(
                bd_pid[1],
                sys.intern("faux"),
                src_name,
                src_pkg.version,
                archall,
                None,
                sys.intern(deps),
                None,
                [],
                False,
                bd_pid,
                [],
            )
            # Register the faux binary with the suite, the source, and the
            # global registry so it is tracked like a real package.
            suite.binaries.setdefault(archall, {})[bd_pid[0]] = dpkg
            src_pkg.binaries.add(bd_pid)
            all_binaries[bd_pid] = dpkg
285 def _setup_architectures(self) -> None:
286 allarches = self._architectures
287 # Re-order the architectures such as that the most important architectures are listed first
288 # (this is to make the log easier to read as most important architectures will be listed
289 # first)
290 arches = [x for x in allarches if x in self._nobreakall_arches]
291 arches += [
292 x for x in allarches if x not in arches and x not in self._outofsync_arches
293 ]
294 arches += [
295 x for x in allarches if x not in arches and x not in self._break_arches
296 ]
297 arches += [
298 x for x in allarches if x not in arches and x not in self._new_arches
299 ]
300 arches += [x for x in allarches if x not in arches]
302 # Intern architectures for efficiency; items in this list will be used for lookups and
303 # building items/keys - by intern strings we reduce memory (considerably).
304 self._architectures = [sys.intern(arch) for arch in allarches]
305 assert "all" not in self._architectures, "all not allowed in architectures"
307 def _get_suite_name(
308 self, suite: Suite, release_file: "apt_pkg.TagSection[str]"
309 ) -> tuple[str, str]:
310 name = None
311 codename = None
312 if "Suite" in release_file: 312 ↛ 314line 312 didn't jump to line 314 because the condition on line 312 was always true
313 name = release_file["Suite"]
314 if "Codename" in release_file:
315 codename = release_file["Codename"]
317 if name is None: 317 ↛ 318line 317 didn't jump to line 318 because the condition on line 317 was never true
318 name = codename
319 elif codename is None:
320 codename = name
322 if name is None: 322 ↛ 323line 322 didn't jump to line 323 because the condition on line 322 was never true
323 self.logger.warning(
324 'Either of the fields "Suite" or "Codename" '
325 + "should be present in a release file."
326 )
327 self.logger.error(
328 "Release file for suite %s is missing both the "
329 + '"Suite" and the "Codename" fields.',
330 suite.name,
331 )
332 raise KeyError("Suite")
334 assert codename is not None # required for type checking
335 return (name, codename)
337 def _update_suite_name(self, suite: Suite) -> None:
338 try:
339 release_file = read_release_file(suite.path)
340 except FileNotFoundError:
341 self.logger.info(
342 "The %s suite does not have a Release file, unable to update the name",
343 suite.name,
344 )
345 release_file = None
347 if release_file is not None:
348 (suite.name, suite.codename) = self._get_suite_name(suite, release_file)
349 self.logger.info("Using suite name from Release file: %s", suite.name)
350 self.logger.debug(
351 "Using suite codename from Release file: %s", suite.codename
352 )
354 def _check_release_file(self, target_suite: Suite, missing_config_msg: str) -> None:
355 try:
356 release_file = read_release_file(target_suite.path)
357 self.logger.info(
358 "Found a Release file in %s - using that for defaults",
359 target_suite.name,
360 )
361 except FileNotFoundError:
362 self.logger.info(
363 "The %s suite does not have a Release file.", target_suite.name
364 )
365 release_file = None
367 if release_file is not None:
368 self._components = release_file["Components"].split()
369 self.logger.info(
370 "Using components listed in Release file: %s",
371 " ".join(self._components),
372 )
374 if self._architectures is None:
375 if release_file is None: # pragma: no cover
376 self.logger.error(
377 "No configured architectures and there is no release file in the %s suite.",
378 target_suite.name,
379 )
380 self.logger.error(
381 'Please check if there is a "Release" file in %s', target_suite.path
382 )
383 self.logger.error(
384 'or if the config file contains a non-empty "ARCHITECTURES" field'
385 )
386 raise MissingRequiredConfigurationError(
387 missing_config_msg % "ARCHITECTURES"
388 )
389 self._architectures = sorted(
390 x for x in release_file["Architectures"].split() if x != "all"
391 )
392 self.logger.info(
393 "Using architectures listed in Release file: %s",
394 " ".join(self._architectures),
395 )
397 def _read_sources(self, basedir: str) -> dict[str, SourcePackage]:
398 """Read the list of source packages from the specified directory
400 The source packages are read from the `Sources' file within the
401 directory specified as `basedir' parameter. Considering the
402 large amount of memory needed, not all the fields are loaded
403 in memory. The available fields are Version, Maintainer and Section.
405 The method returns a list where every item represents a source
406 package as a dictionary.
407 """
409 if self._components:
410 sources: dict[str, SourcePackage] = {}
411 for component in self._components:
412 filename = os.path.join(basedir, component, "source", "Sources")
413 try:
414 filename = possibly_compressed(filename)
415 except FileNotFoundError:
416 if component == "non-free-firmware":
417 self.logger.info("Skipping %s as it doesn't exist", filename)
418 continue
419 raise
420 self.logger.info("Loading source packages from %s", filename)
421 read_sources_file(
422 filename,
423 sources,
424 not self._base_config.archall_inconsistency_allowed,
425 )
426 else:
427 filename = os.path.join(basedir, "Sources")
428 self.logger.info("Loading source packages from %s", filename)
429 sources = read_sources_file(
430 filename, None, not self._base_config.archall_inconsistency_allowed
431 )
433 return sources
435 @staticmethod
436 def merge_fields(
437 get_field: Callable[[str], str | None],
438 *field_names: str,
439 separator: str = ", ",
440 ) -> str | None:
441 """Merge two or more fields (filtering out empty fields; returning None if all are empty)"""
442 return separator.join(filter(None, (get_field(x) for x in field_names))) or None
    def _read_packages_file(
        self,
        filename: str,
        arch: str,
        srcdist: dict[str, SourcePackage],
        packages: dict[str, BinaryPackage] | None = None,
        intern: Callable[[str], str] = sys.intern,
    ) -> dict[str, BinaryPackage]:
        """Parse one Packages file for *arch* into *packages*.

        :param filename: path to the Packages file to parse
        :param arch: architecture this Packages file belongs to
        :param srcdist: source packages of the same suite; updated with the
            binaries found (faux source entries are created for binaries
            whose source is missing)
        :param packages: dict updated in place (a fresh one is created when
            None); maps package name to BinaryPackage
        :param intern: string-interning function, bound as a default for a
            fast local lookup
        :return: the updated *packages* dict
        """
        self.logger.info("Loading binary packages from %s", filename)

        if packages is None:
            packages = {}

        all_binaries = self._all_binaries

        tag_file = apt_pkg.TagFile(filename)
        get_field = tag_file.section.get
        step = tag_file.step

        while step():
            pkg = get_field("Package")
            version = get_field("Version")

            # There may be multiple versions of any arch:all packages
            # (in unstable) if some architectures have out-of-date
            # binaries. We only ever consider the package with the
            # largest version for migration.
            pkg = intern(pkg)
            version = intern(version)
            pkg_id = BinaryPackageId(pkg, version, arch)

            if pkg in packages:
                old_pkg_data = packages[pkg]
                if apt_pkg.version_compare(old_pkg_data.version, version) > 0:
                    # The entry we already have is newer; skip this stanza.
                    continue
                old_pkg_id = old_pkg_data.pkg_id
                old_src_binaries = srcdist[old_pkg_data.source].binaries
                old_src_binaries.remove(old_pkg_id)
                # This may seem weird at first glance, but the current code rely
                # on this behaviour to avoid issues like #709460. Admittedly it
                # is a special case, but Britney will attempt to remove the
                # arch:all packages without this. Even then, this particular
                # stop-gap relies on the packages files being sorted by name
                # and the version, so it is not particularly resilient.
                if pkg_id not in old_src_binaries:
                    old_src_binaries.add(pkg_id)

            # Merge Pre-Depends with Depends and Conflicts with
            # Breaks. Britney is not interested in the "finer
            # semantic differences" of these fields anyway.
            deps = DebMirrorLikeSuiteContentLoader.merge_fields(
                get_field, "Pre-Depends", "Depends"
            )
            conflicts = DebMirrorLikeSuiteContentLoader.merge_fields(
                get_field, "Conflicts", "Breaks"
            )

            ess = False
            if get_field("Essential", "no") == "yes":
                ess = True

            source = pkg
            source_version = version
            # retrieve the name and the version of the source package
            source_raw = get_field("Source")
            if source_raw:
                source = intern(source_raw.split(" ")[0])
                if "(" in source_raw:
                    # "Source: name (version)" — extract the parenthesized version.
                    source_version = intern(
                        source_raw[source_raw.find("(") + 1 : source_raw.find(")")]
                    )

            provides_raw = get_field("Provides")
            if provides_raw:
                provides = parse_provides(
                    provides_raw, pkg_id=pkg_id, logger=self.logger
                )
            else:
                provides = []

            raw_arch = intern(get_field("Architecture"))
            if raw_arch not in {"all", arch}:  # pragma: no cover
                raise AssertionError(
                    "%s has wrong architecture (%s) - should be either %s or all"
                    % (str(pkg_id), raw_arch, arch)
                )

            builtusing_raw = get_field("Built-Using")
            if builtusing_raw:
                builtusing = parse_builtusing(
                    builtusing_raw, pkg_id=pkg_id, logger=self.logger
                )
            else:
                builtusing = []

            dpkg = BinaryPackage(
                version,
                intern(get_field("Section")),
                source,
                source_version,
                raw_arch,
                get_field("Multi-Arch"),
                deps,
                conflicts,
                provides,
                ess,
                pkg_id,
                builtusing,
            )

            # if the source package is available in the distribution, then register this binary package
            if source in srcdist:
                # There may be multiple versions of any arch:all packages
                # (in unstable) if some architectures have out-of-date
                # binaries. We only want to include the package in the
                # source -> binary mapping once. It doesn't matter which
                # of the versions we include as only the package name and
                # architecture are recorded.
                srcdist[source].binaries.add(pkg_id)
            # if the source package doesn't exist, create a fake one
            else:
                srcdist[source] = SourcePackage(
                    source,
                    source_version,
                    "faux",
                    {pkg_id},
                    None,
                    True,
                    None,
                    None,
                    [],
                    [],
                )

            # add the resulting dictionary to the package list
            packages[pkg] = dpkg
            if pkg_id in all_binaries:
                # Same (name, version, arch) already seen in another suite:
                # verify the important fields agree (raises on mismatch).
                self._merge_pkg_entries(pkg, arch, all_binaries[pkg_id], dpkg)
            else:
                all_binaries[pkg_id] = dpkg

        return packages
    def _read_binaries(
        self, suite: Suite, architectures: Iterable[str]
    ) -> tuple[
        dict[str, dict[str, BinaryPackage]], dict[str, dict[str, set[tuple[str, str]]]]
    ]:
        """Read the list of binary packages from the specified directory

        This method reads all the binary packages for a given suite.

        If the "components" config parameter is set, the directory should
        be the "suite" directory of a local mirror (i.e. the one containing
        the "Release" file). Otherwise, Britney will read the packages
        information from all the "Packages_${arch}" files referenced by
        the "architectures" parameter.

        Considering the large amount of memory needed, not all the fields
        are loaded in memory. The available fields are Version, Source,
        Multi-Arch, Depends, Conflicts, Provides and Architecture.

        The `Provides' field is used to populate the virtual packages list.

        The method returns a tuple of two dicts with architecture as key and
        another dict as value. The value dicts of the first dict map
        from binary package name to "BinaryPackage" objects; the value dicts
        of the second map a package name to the packages providing them.
        """
        binaries: dict[str, dict[str, BinaryPackage]] = {}
        provides_table: dict[str, dict[str, set[tuple[str, str]]]] = {}
        basedir = suite.path

        if self._components:
            # Component layout: Packages files under <component>/binary-<arch>/
            # (plus debian-installer/ for udebs).
            release_file = read_release_file(basedir)
            listed_archs = set(release_file["Architectures"].split())
            for arch in architectures:
                packages: dict[str, BinaryPackage] = {}
                if arch not in listed_archs:
                    self.logger.info(
                        "Skipping arch %s for %s: It is not listed in the Release file",
                        arch,
                        suite.name,
                    )
                    binaries[arch] = {}
                    provides_table[arch] = {}
                    continue
                for component in self._components:
                    binary_dir = "binary-%s" % arch
                    filename = os.path.join(basedir, component, binary_dir, "Packages")
                    try:
                        filename = possibly_compressed(filename)
                    except FileNotFoundError:
                        # non-free-firmware may legitimately be absent.
                        if component == "non-free-firmware":
                            self.logger.info(
                                "Skipping %s as it doesn't exist", filename
                            )
                            continue
                        raise
                    udeb_filename = os.path.join(
                        basedir, component, "debian-installer", binary_dir, "Packages"
                    )
                    # We assume the udeb Packages file is present if the
                    # regular one is present
                    udeb_filename = possibly_compressed(udeb_filename)
                    self._read_packages_file(filename, arch, suite.sources, packages)
                    self._read_packages_file(
                        udeb_filename, arch, suite.sources, packages
                    )
                # create provides
                provides = create_provides_map(packages)
                binaries[arch] = packages
                provides_table[arch] = provides
        else:
            # Flat layout: one Packages_<arch> file per architecture.
            for arch in architectures:
                filename = os.path.join(basedir, "Packages_%s" % arch)
                packages = self._read_packages_file(filename, arch, suite.sources)
                provides = create_provides_map(packages)
                binaries[arch] = packages
                provides_table[arch] = provides

        return (binaries, provides_table)
    def _merge_pkg_entries(
        self,
        package: str,
        parch: str,
        pkg_entry1: BinaryPackage,
        pkg_entry2: BinaryPackage,
    ) -> None:
        """Verify two entries for the same binary pkg_id are consistent.

        Compares the fields listed in CHECK_FIELDS.

        :raises ValueError: if any checked field differs between the entries.
        """
        bad = []
        for f in self.CHECK_FIELDS:
            v1 = getattr(pkg_entry1, f)
            v2 = getattr(pkg_entry2, f)
            if v1 != v2:  # pragma: no cover
                bad.append((f, v1, v2))

        if bad:  # pragma: no cover
            self.logger.error(
                "Mismatch found %s %s %s differs", package, pkg_entry1.version, parch
            )
            for f, v1, v2 in bad:
                self.logger.info(" ... %s %s != %s", f, v1, v2)
            raise ValueError("Inconsistent / Unsupported data set")

        # Merge ESSENTIAL if necessary
        assert pkg_entry1.is_essential or not pkg_entry2.is_essential