Coverage for britney2/inputs/suiteloader.py: 92%
290 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-06-17 09:00 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-06-17 09:00 +0000
1import logging
2import optparse
3import os
4import sys
5from abc import abstractmethod
6from collections.abc import Callable, Iterable
7from itertools import chain
8from typing import Any, Literal, TypeVar, overload
10import apt_pkg
12from britney2 import (
13 BinaryPackage,
14 BinaryPackageId,
15 MultiArch,
16 SourcePackage,
17 Suite,
18 SuiteClass,
19 Suites,
20 TargetSuite,
21)
22from britney2.utils import (
23 create_provides_map,
24 parse_builtusing,
25 parse_provides,
26 possibly_compressed,
27 read_release_file,
28 read_sources_file,
29)
32class MissingRequiredConfigurationError(RuntimeError):
33 pass
36_T = TypeVar("_T")
39class SuiteContentLoader:
40 def __init__(self, base_config: optparse.Values) -> None:
41 self._base_config = base_config
42 self._architectures: list[str] = SuiteContentLoader.config_str_as_list(
43 base_config.architectures
44 )
45 self._nobreakall_arches: list[str] = SuiteContentLoader.config_str_as_list(
46 base_config.nobreakall_arches, []
47 )
48 self._outofsync_arches: list[str] = SuiteContentLoader.config_str_as_list(
49 base_config.outofsync_arches, []
50 )
51 self._break_arches: list[str] = SuiteContentLoader.config_str_as_list(
52 base_config.break_arches, []
53 )
54 self._new_arches: list[str] = SuiteContentLoader.config_str_as_list(
55 base_config.new_arches, []
56 )
57 self._components: list[str] = []
58 self._all_binaries: dict[BinaryPackageId, BinaryPackage] = {}
59 logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
60 self.logger = logging.getLogger(logger_name)
62 @overload
63 @staticmethod
64 def config_str_as_list(value: Literal[None], default_value: _T) -> _T: ... 64 ↛ exitline 64 didn't return from function 'config_str_as_list' because
66 @overload
67 @staticmethod
68 def config_str_as_list(value: str, default_value: Any) -> list[str]: ... 68 ↛ exitline 68 didn't return from function 'config_str_as_list' because
70 @overload
71 @staticmethod
72 def config_str_as_list(value: Any, default_value: Any | None = None) -> Any: ... 72 ↛ exitline 72 didn't return from function 'config_str_as_list' because
74 @staticmethod
75 def config_str_as_list(value: Any, default_value: Any | None = None) -> Any:
76 if value is None:
77 return default_value
78 if isinstance(value, str): 78 ↛ 80line 78 didn't jump to line 80 because the condition on line 78 was always true
79 return value.split()
80 return value
82 @property
83 def architectures(self) -> list[str]:
84 return self._architectures
86 @property
87 def nobreakall_arches(self) -> list[str]:
88 return self._nobreakall_arches
90 @property
91 def outofsync_arches(self) -> list[str]:
92 return self._outofsync_arches
94 @property
95 def break_arches(self) -> list[str]:
96 return self._break_arches
98 @property
99 def new_arches(self) -> list[str]:
100 return self._new_arches
102 @property
103 def components(self) -> list[str]:
104 return self._components
106 def all_binaries(self) -> dict[BinaryPackageId, BinaryPackage]:
107 return self._all_binaries
109 @abstractmethod
110 def load_suites(self) -> Suites: # pragma: no cover
111 pass
114class DebMirrorLikeSuiteContentLoader(SuiteContentLoader):
115 CHECK_FIELDS = (
116 "source",
117 "source_version",
118 "architecture",
119 "multi_arch",
120 "depends",
121 "conflicts",
122 "provides",
123 )
125 def load_suites(self) -> Suites:
126 suites = []
127 missing_config_msg = (
128 "Configuration %s is not set in the config (and cannot be auto-detected)"
129 )
130 target_suite = None
131 for suitename in ("testing", "unstable", "pu", "tpu"):
132 suffix = suitename if suitename in ("pu", "tpu") else ""
133 if hasattr(self._base_config, suitename):
134 suite_path = getattr(self._base_config, suitename)
135 suite_class = SuiteClass.TARGET_SUITE
136 if suitename != "testing":
137 suite_class = (
138 SuiteClass.ADDITIONAL_SOURCE_SUITE
139 if suffix
140 else SuiteClass.PRIMARY_SOURCE_SUITE
141 )
142 suites.append(
143 Suite(
144 suite_class, suitename, suite_path, suite_short_name=suffix
145 )
146 )
147 else:
148 target_suite = TargetSuite(
149 suite_class, suitename, suite_path, suite_short_name=suffix
150 )
151 else:
152 if suitename in ("testing", "unstable"): # pragma: no cover
153 self.logger.error(missing_config_msg, suitename.upper())
154 raise MissingRequiredConfigurationError(
155 missing_config_msg % suitename.upper()
156 )
157 self.logger.info(
158 "Optional suite %s is not defined (config option: %s) ",
159 suitename,
160 suitename.upper(),
161 )
163 assert target_suite is not None, "Logic regression, this should be impossible."
165 self._check_release_file(target_suite, missing_config_msg)
166 self._setup_architectures()
168 # read the source and binary packages for the involved distributions. Notes:
169 # - Load testing last as some live-data tests have more complete information in
170 # unstable
171 # - Load all sources before any of the binaries.
172 for suite in chain((target_suite,), suites):
173 sources = self._read_sources(
174 suite.path, None if suite is target_suite else target_suite.sources
175 )
176 self._update_suite_name(suite)
177 suite.sources = sources
178 (suite.binaries, suite.provides_table) = self._read_binaries(
179 suite, self._architectures
180 )
181 self._fixup_faux_arch_all_binaries(suite)
182 if self._base_config.be_strict_with_build_deps: 182 ↛ 172line 182 didn't jump to line 172 because the condition on line 182 was always true
183 self._add_build_dep_faux_binaries(suite)
185 return Suites(target_suite, suites)
187 def _fixup_faux_arch_all_binaries(self, suite: Suite) -> None:
188 """remove faux arch:all binary if a real arch:all binary is available
190 We don't know for which architectures bin/$something must be available
191 except for arch:all, which should be available in each arch. The
192 information that a source builds an arch:all binary is available during
193 the loading of the sources, but we have to pick an order in which to
194 load the files and the Sources is loaded before the Packages are
195 read. Hence we fake an arch:all binary during source loading, but it
196 shouldn't be there in the final list if real arch:all binaries are
197 present in the Packages file.
199 Also, if we keep the fake binary, it should be added to the lists of
200 known binaries in the suite, otherwise britney2 trips later on.
202 """
204 all_binaries = self._all_binaries
205 binaries = suite.binaries
206 faux_arches = (
207 set(self.architectures)
208 - set(self.break_arches)
209 - set(self.outofsync_arches)
210 - set(self.new_arches)
211 )
213 for srcpkg in suite.sources.values():
214 faux = {x for x in srcpkg.binaries if x.architecture == "faux"}
215 if faux and any(
216 x
217 for x in (srcpkg.binaries - faux)
218 if all_binaries[x].architecture == "all"
219 ):
220 srcpkg.binaries -= faux
222 # Calculate again because we may have changed the set
223 faux = {x for x in srcpkg.binaries if x.architecture == "faux"}
224 for binpkg_id in faux:
225 bin_data = BinaryPackage(
226 sys.intern("faux"),
227 srcpkg.source,
228 srcpkg.version,
229 "all",
230 MultiArch.NO,
231 None,
232 None,
233 None,
234 False,
235 binpkg_id,
236 None,
237 )
238 for arch_all in faux_arches:
239 binaries[arch_all][binpkg_id.package_name] = bin_data
240 all_binaries[binpkg_id] = bin_data
241 suite.binaries = binaries
243 def _add_build_dep_faux_binaries(self, suite: Suite) -> None:
244 """Add faux packages that keep track of build depends
246 To ensure that Build-Depends are fully protected against inappropriate
247 removal or upgrade, we add faux packages to source packages containing
248 the Build-Depends as Depends.
249 """
251 all_binaries = self._all_binaries
252 for src_name, src_pkg in suite.sources.items():
253 # TODO: something with arch, which one?
254 archall = self._nobreakall_arches[0]
255 bd_pid = BinaryPackageId(
256 f"{src_name}-faux-build-depends", sys.intern(src_pkg.version), archall
257 )
258 deps = ""
259 for bds in (src_pkg.build_deps_arch, src_pkg.build_deps_indep):
260 if bds is not None:
261 for block in apt_pkg.parse_src_depends(bds, architecture=archall):
262 # Like the buildds, we don't care about alternatives
263 deps += "," + block[0][0]
264 if block[0][1] != "":
265 # The extra space in the middle is a workaround for
266 # an apt_pkg bug in bookworm
267 deps += "(" + block[0][2] + " " + block[0][1] + ")"
268 deps = deps.strip(",")
269 if deps == "":
270 continue
271 dpkg = BinaryPackage(
272 sys.intern("faux"),
273 src_name,
274 src_pkg.version,
275 archall,
276 MultiArch.NO,
277 sys.intern(deps),
278 None,
279 None,
280 False,
281 bd_pid,
282 None,
283 )
284 suite.binaries.setdefault(archall, {})[bd_pid.package_name] = dpkg
285 src_pkg.binaries.add(bd_pid)
286 all_binaries[bd_pid] = dpkg
288 def _setup_architectures(self) -> None:
289 allarches = self._architectures
290 # Re-order the architectures such as that the most important architectures are listed first
291 # (this is to make the log easier to read as most important architectures will be listed
292 # first)
293 arches = [x for x in allarches if x in self._nobreakall_arches]
294 arches += [
295 x for x in allarches if x not in arches and x not in self._outofsync_arches
296 ]
297 arches += [
298 x for x in allarches if x not in arches and x not in self._break_arches
299 ]
300 arches += [
301 x for x in allarches if x not in arches and x not in self._new_arches
302 ]
303 arches += [x for x in allarches if x not in arches]
305 # Intern architectures for efficiency; items in this list will be used for lookups and
306 # building items/keys - by intern strings we reduce memory (considerably).
307 self._architectures = [sys.intern(arch) for arch in allarches]
308 assert "all" not in self._architectures, "all not allowed in architectures"
310 def _get_suite_name(
311 self, suite: Suite, release_file: "apt_pkg.TagSection[str]"
312 ) -> tuple[str, str]:
313 name = None
314 codename = None
315 if "Suite" in release_file: 315 ↛ 317line 315 didn't jump to line 317 because the condition on line 315 was always true
316 name = release_file["Suite"]
317 if "Codename" in release_file:
318 codename = release_file["Codename"]
320 if name is None: 320 ↛ 321line 320 didn't jump to line 321 because the condition on line 320 was never true
321 name = codename
322 elif codename is None:
323 codename = name
325 if name is None: 325 ↛ 326line 325 didn't jump to line 326 because the condition on line 325 was never true
326 self.logger.warning(
327 'Either of the fields "Suite" or "Codename" '
328 + "should be present in a release file."
329 )
330 self.logger.error(
331 "Release file for suite %s is missing both the "
332 + '"Suite" and the "Codename" fields.',
333 suite.name,
334 )
335 raise KeyError("Suite")
337 assert codename is not None # required for type checking
338 return (name, codename)
340 def _update_suite_name(self, suite: Suite) -> None:
341 try:
342 release_file = read_release_file(suite.path)
343 except FileNotFoundError:
344 self.logger.info(
345 "The %s suite does not have a Release file, unable to update the name",
346 suite.name,
347 )
348 release_file = None
350 if release_file is not None:
351 (suite.name, suite.codename) = self._get_suite_name(suite, release_file)
352 self.logger.info("Using suite name from Release file: %s", suite.name)
353 self.logger.debug(
354 "Using suite codename from Release file: %s", suite.codename
355 )
357 def _check_release_file(self, target_suite: Suite, missing_config_msg: str) -> None:
358 try:
359 release_file = read_release_file(target_suite.path)
360 self.logger.info(
361 "Found a Release file in %s - using that for defaults",
362 target_suite.name,
363 )
364 except FileNotFoundError:
365 self.logger.info(
366 "The %s suite does not have a Release file.", target_suite.name
367 )
368 release_file = None
370 if release_file is not None:
371 self._components = release_file["Components"].split()
372 self.logger.info(
373 "Using components listed in Release file: %s",
374 " ".join(self._components),
375 )
377 if self._architectures is None:
378 if release_file is None: # pragma: no cover
379 self.logger.error(
380 "No configured architectures and there is no release file in the %s suite.",
381 target_suite.name,
382 )
383 self.logger.error(
384 'Please check if there is a "Release" file in %s', target_suite.path
385 )
386 self.logger.error(
387 'or if the config file contains a non-empty "ARCHITECTURES" field'
388 )
389 raise MissingRequiredConfigurationError(
390 missing_config_msg % "ARCHITECTURES"
391 )
392 self._architectures = sorted(
393 x for x in release_file["Architectures"].split() if x != "all"
394 )
395 self.logger.info(
396 "Using architectures listed in Release file: %s",
397 " ".join(self._architectures),
398 )
400 def _read_sources(
401 self, basedir: str, sources_target_suite: dict[str, SourcePackage] | None = None
402 ) -> dict[str, SourcePackage]:
403 """Read the list of source packages from the specified directory
405 The source packages are read from the `Sources' file within the
406 directory specified as `basedir' parameter. Considering the
407 large amount of memory needed, not all the fields are loaded
408 in memory. The available fields are Version, Maintainer and Section.
410 The method returns a list where every item represents a source
411 package as a dictionary.
412 """
414 if self._components:
415 sources: dict[str, SourcePackage] = {}
416 for component in self._components:
417 filename = os.path.join(basedir, component, "source", "Sources")
418 try:
419 filename = possibly_compressed(filename)
420 except FileNotFoundError:
421 if component == "non-free-firmware":
422 self.logger.info("Skipping %s as it doesn't exist", filename)
423 continue
424 raise
425 self.logger.info("Loading source packages from %s", filename)
426 read_sources_file(
427 filename,
428 sources,
429 not self._base_config.archall_inconsistency_allowed,
430 sources_target_suite,
431 )
432 else:
433 filename = os.path.join(basedir, "Sources")
434 self.logger.info("Loading source packages from %s", filename)
435 sources = read_sources_file(
436 filename,
437 None,
438 not self._base_config.archall_inconsistency_allowed,
439 sources_target_suite,
440 )
442 return sources
444 @staticmethod
445 def merge_fields(
446 get_field: Callable[[str], str | None],
447 *field_names: str,
448 separator: str = ", ",
449 ) -> str | None:
450 """Merge two or more fields (filtering out empty fields; returning None if all are empty)"""
451 return separator.join(filter(None, (get_field(x) for x in field_names))) or None
453 def _read_packages_file(
454 self,
455 filename: str,
456 arch: str,
457 srcdist: dict[str, SourcePackage],
458 packages: dict[str, BinaryPackage] | None = None,
459 intern: Callable[[str], str] = sys.intern,
460 ) -> dict[str, BinaryPackage]:
461 self.logger.info("Loading binary packages from %s", filename)
463 if packages is None:
464 packages = {}
466 all_binaries = self._all_binaries
468 tag_file = apt_pkg.TagFile(filename)
469 get_field = tag_file.section.get
470 step = tag_file.step
472 while step():
473 pkg = get_field("Package")
474 version = get_field("Version")
476 # There may be multiple versions of any arch:all packages
477 # (in unstable) if some architectures have out-of-date
478 # binaries. We only ever consider the package with the
479 # largest version for migration.
480 pkg = intern(pkg)
481 version = intern(version)
482 pkg_id = BinaryPackageId(pkg, version, arch)
484 if pkg in packages:
485 old_pkg_data = packages[pkg]
486 if apt_pkg.version_compare(old_pkg_data.version, version) > 0:
487 continue
488 old_pkg_id = old_pkg_data.pkg_id
489 old_src_binaries = srcdist[old_pkg_data.source].binaries
490 old_src_binaries.remove(old_pkg_id)
491 # This may seem weird at first glance, but the current code rely
492 # on this behaviour to avoid issues like #709460. Admittedly it
493 # is a special case, but Britney will attempt to remove the
494 # arch:all packages without this. Even then, this particular
495 # stop-gap relies on the packages files being sorted by name
496 # and the version, so it is not particularly resilient.
497 if pkg_id not in old_src_binaries: 497 ↛ 503line 497 didn't jump to line 503 because the condition on line 497 was always true
498 old_src_binaries.add(pkg_id)
500 # Merge Pre-Depends with Depends and Conflicts with
501 # Breaks. Britney is not interested in the "finer
502 # semantic differences" of these fields anyway.
503 deps = DebMirrorLikeSuiteContentLoader.merge_fields(
504 get_field, "Pre-Depends", "Depends"
505 )
506 conflicts = DebMirrorLikeSuiteContentLoader.merge_fields(
507 get_field, "Conflicts", "Breaks"
508 )
510 ess = get_field("Essential", "no") == "yes"
512 source = pkg
513 source_version = version
514 # retrieve the name and the version of the source package
515 source_raw = get_field("Source")
516 if source_raw:
517 source = intern(source_raw.split(" ")[0])
518 if "(" in source_raw:
519 source_version = intern(
520 source_raw[source_raw.find("(") + 1 : source_raw.find(")")]
521 )
523 provides_raw = get_field("Provides")
524 if provides_raw:
525 provides = parse_provides(
526 provides_raw, pkg_id=pkg_id, logger=self.logger
527 )
528 else:
529 provides = None
531 raw_arch = intern(get_field("Architecture"))
532 if raw_arch not in {"all", arch}: # pragma: no cover
533 raise AssertionError(
534 "%s has wrong architecture (%s) - should be either %s or all"
535 % (str(pkg_id), raw_arch, arch)
536 )
538 builtusing_raw = get_field("Built-Using")
539 if builtusing_raw:
540 builtusing = parse_builtusing(
541 builtusing_raw, pkg_id=pkg_id, logger=self.logger
542 )
543 else:
544 builtusing = None
546 dpkg = BinaryPackage(
547 intern(get_field("Section")),
548 source,
549 source_version,
550 raw_arch,
551 MultiArch.from_str(get_field("Multi-Arch")),
552 deps,
553 conflicts,
554 provides,
555 ess,
556 pkg_id,
557 builtusing,
558 )
560 # if the source package is available in the distribution, then register this binary package
561 if source in srcdist:
562 # There may be multiple versions of any arch:all packages
563 # (in unstable) if some architectures have out-of-date
564 # binaries. We only want to include the package in the
565 # source -> binary mapping once. It doesn't matter which
566 # of the versions we include as only the package name and
567 # architecture are recorded.
568 srcdist[source].binaries.add(pkg_id)
569 # if the source package doesn't exist, create a fake one
570 else:
571 srcdist[source] = SourcePackage(
572 source,
573 source_version,
574 "faux",
575 {pkg_id},
576 None,
577 True,
578 None,
579 None,
580 [],
581 [],
582 )
584 # add the resulting dictionary to the package list
585 if pkg_id in all_binaries:
586 # If the binary package is the same in across suites, we reuse
587 # existing BinaryPackage instances to reduce memory usage.
588 self._merge_pkg_entries(pkg, arch, all_binaries[pkg_id], dpkg)
589 packages[pkg] = all_binaries[pkg_id]
590 else:
591 packages[pkg] = dpkg
592 all_binaries[pkg_id] = dpkg
594 return packages
596 def _read_binaries(
597 self, suite: Suite, architectures: Iterable[str]
598 ) -> tuple[
599 dict[str, dict[str, BinaryPackage]], dict[str, dict[str, set[tuple[str, str]]]]
600 ]:
601 """Read the list of binary packages from the specified directory
603 This method reads all the binary packages for a given suite.
605 If the "components" config parameter is set, the directory should
606 be the "suite" directory of a local mirror (i.e. the one containing
607 the "Release" file). Otherwise, Britney will read the packages
608 information from all the "Packages_${arch}" files referenced by
609 the "architectures" parameter.
611 Considering the
612 large amount of memory needed, not all the fields are loaded
613 in memory. The available fields are Version, Source, Multi-Arch,
614 Depends, Conflicts, Provides and Architecture.
616 The `Provides' field is used to populate the virtual packages list.
618 The method returns a tuple of two dicts with architecture as key and
619 another dict as value. The value dicts of the first dict map
620 from binary package name to "BinaryPackage" objects; the other second
621 value dicts map a package name to the packages providing them.
622 """
623 binaries: dict[str, dict[str, BinaryPackage]] = {}
624 provides_table: dict[str, dict[str, set[tuple[str, str]]]] = {}
625 basedir = suite.path
627 if self._components:
628 release_file = read_release_file(basedir)
629 listed_archs = set(release_file["Architectures"].split())
630 for arch in architectures:
631 packages: dict[str, BinaryPackage] = {}
632 if arch not in listed_archs: 632 ↛ 633line 632 didn't jump to line 633 because the condition on line 632 was never true
633 self.logger.info(
634 "Skipping arch %s for %s: It is not listed in the Release file",
635 arch,
636 suite.name,
637 )
638 binaries[arch] = {}
639 provides_table[arch] = {}
640 continue
641 for component in self._components:
642 binary_dir = "binary-%s" % arch
643 filename = os.path.join(basedir, component, binary_dir, "Packages")
644 try:
645 filename = possibly_compressed(filename)
646 except FileNotFoundError:
647 if component == "non-free-firmware":
648 self.logger.info(
649 "Skipping %s as it doesn't exist", filename
650 )
651 continue
652 raise
653 udeb_filename = os.path.join(
654 basedir, component, "debian-installer", binary_dir, "Packages"
655 )
656 # We assume the udeb Packages file is present if the
657 # regular one is present
658 udeb_filename = possibly_compressed(udeb_filename)
659 self._read_packages_file(filename, arch, suite.sources, packages)
660 self._read_packages_file(
661 udeb_filename, arch, suite.sources, packages
662 )
663 # create provides
664 provides = create_provides_map(packages)
665 binaries[arch] = packages
666 provides_table[arch] = provides
667 else:
668 for arch in architectures:
669 filename = os.path.join(basedir, f"Packages_{arch}")
670 packages = self._read_packages_file(filename, arch, suite.sources)
671 provides = create_provides_map(packages)
672 binaries[arch] = packages
673 provides_table[arch] = provides
675 return (binaries, provides_table)
677 def _merge_pkg_entries(
678 self,
679 package: str,
680 parch: str,
681 pkg_entry1: BinaryPackage,
682 pkg_entry2: BinaryPackage,
683 ) -> None:
684 bad = []
685 for f in self.CHECK_FIELDS:
686 v1 = getattr(pkg_entry1, f)
687 v2 = getattr(pkg_entry2, f)
688 if v1 != v2: # pragma: no cover
689 bad.append((f, v1, v2))
691 if bad: # pragma: no cover
692 self.logger.error(
693 "Mismatch found %s %s %s differs", package, pkg_entry1.version, parch
694 )
695 for f, v1, v2 in bad:
696 self.logger.info(" ... %s %s != %s", f, v1, v2)
697 raise ValueError("Inconsistent / Unsupported data set")
699 # Merge ESSENTIAL if necessary
700 assert pkg_entry1.is_essential or not pkg_entry2.is_essential