Coverage for britney2/inputs/suiteloader.py: 92%
289 statements
coverage.py v7.6.0, created at 2026-01-08 19:15 +0000
1import logging
2import optparse
3import os
4import sys
5from abc import abstractmethod
6from collections.abc import Callable, Iterable
7from typing import Any, Literal, Optional, TypeVar, overload
9import apt_pkg
11from britney2 import (
12 BinaryPackage,
13 BinaryPackageId,
14 PackageId,
15 SourcePackage,
16 Suite,
17 SuiteClass,
18 Suites,
19 TargetSuite,
20)
21from britney2.utils import (
22 create_provides_map,
23 parse_builtusing,
24 parse_provides,
25 possibly_compressed,
26 read_release_file,
27 read_sources_file,
28)
31class MissingRequiredConfigurationError(RuntimeError):
32 pass
35_T = TypeVar("_T")
38class SuiteContentLoader:
39 def __init__(self, base_config: optparse.Values) -> None:
40 self._base_config = base_config
41 self._architectures: list[str] = SuiteContentLoader.config_str_as_list(
42 base_config.architectures
43 )
44 self._nobreakall_arches: list[str] = SuiteContentLoader.config_str_as_list(
45 base_config.nobreakall_arches, []
46 )
47 self._outofsync_arches: list[str] = SuiteContentLoader.config_str_as_list(
48 base_config.outofsync_arches, []
49 )
50 self._break_arches: list[str] = SuiteContentLoader.config_str_as_list(
51 base_config.break_arches, []
52 )
53 self._new_arches: list[str] = SuiteContentLoader.config_str_as_list(
54 base_config.new_arches, []
55 )
56 self._components: list[str] = []
57 self._all_binaries: dict[BinaryPackageId, BinaryPackage] = {}
58 logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
59 self.logger = logging.getLogger(logger_name)
61 @overload
62 @staticmethod
63 def config_str_as_list(value: Literal[None], default_value: _T) -> _T: ...  [63 ↛ exit: line 63 didn't return from function 'config_str_as_list']
65 @overload
66 @staticmethod
67 def config_str_as_list(value: str, default_value: Any) -> list[str]: ...  [67 ↛ exit: line 67 didn't return from function 'config_str_as_list']
69 @overload
70 @staticmethod
71 def config_str_as_list(value: Any, default_value: Any | None = None) -> Any: ...  [71 ↛ exit: line 71 didn't return from function 'config_str_as_list']
73 @staticmethod
74 def config_str_as_list(value: Any, default_value: Any | None = None) -> Any:
75 if value is None:
76 return default_value
77 if isinstance(value, str):  [77 ↛ 79: line 77 didn't jump to line 79 because the condition on line 77 was always true]
78 return value.split()
79 return value
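For illustration only (this snippet is not part of suiteloader.py), the implementation above normalises the three accepted value shapes like so:

# Illustrative calls; the values are hypothetical configuration entries.
SuiteContentLoader.config_str_as_list("amd64 arm64 i386")    # -> ["amd64", "arm64", "i386"]
SuiteContentLoader.config_str_as_list(None, [])              # -> [] (the supplied default)
SuiteContentLoader.config_str_as_list(["amd64", "arm64"])    # -> ["amd64", "arm64"] (returned unchanged)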
81 @property
82 def architectures(self) -> list[str]:
83 return self._architectures
85 @property
86 def nobreakall_arches(self) -> list[str]:
87 return self._nobreakall_arches
89 @property
90 def outofsync_arches(self) -> list[str]:
91 return self._outofsync_arches
93 @property
94 def break_arches(self) -> list[str]:
95 return self._break_arches
97 @property
98 def new_arches(self) -> list[str]:
99 return self._new_arches
101 @property
102 def components(self) -> list[str]:
103 return self._components
105 def all_binaries(self) -> dict[BinaryPackageId, BinaryPackage]:
106 return self._all_binaries
108 @abstractmethod
109 def load_suites(self) -> Suites: # pragma: no cover
110 pass
113class DebMirrorLikeSuiteContentLoader(SuiteContentLoader):
114 CHECK_FIELDS = [
115 "source",
116 "source_version",
117 "architecture",
118 "multi_arch",
119 "depends",
120 "conflicts",
121 "provides",
122 ]
124 def load_suites(self) -> Suites:
125 suites = []
126 missing_config_msg = (
127 "Configuration %s is not set in the config (and cannot be auto-detected)"
128 )
129 for suitename in ("testing", "unstable", "pu", "tpu"):
130 suffix = suitename if suitename in {"pu", "tpu"} else ""
131 if hasattr(self._base_config, suitename):
132 suite_path = getattr(self._base_config, suitename)
133 suite_class = SuiteClass.TARGET_SUITE
134 if suitename != "testing":
135 suite_class = (
136 SuiteClass.ADDITIONAL_SOURCE_SUITE
137 if suffix
138 else SuiteClass.PRIMARY_SOURCE_SUITE
139 )
140 suites.append(
141 Suite(
142 suite_class, suitename, suite_path, suite_short_name=suffix
143 )
144 )
145 else:
146 target_suite = TargetSuite(
147 suite_class, suitename, suite_path, suite_short_name=suffix
148 )
149 else:
150 if suitename in {"testing", "unstable"}: # pragma: no cover
151 self.logger.error(missing_config_msg, suitename.upper())
152 raise MissingRequiredConfigurationError(
153 missing_config_msg % suitename.upper()
154 )
155 self.logger.info(
156 "Optional suite %s is not defined (config option: %s) ",
157 suitename,
158 suitename.upper(),
159 )
161 assert target_suite is not None, "Logic regression, this should be impossible."
163 self._check_release_file(target_suite, missing_config_msg)
164 self._setup_architectures()
166 # read the source and binary packages for the involved distributions. Notes:
167 # - Load testing last as some live-data tests have more complete information in
168 # unstable
169 # - Load all sources before any of the binaries.
170 for suite in [target_suite, *suites]:
171 sources = self._read_sources(suite.path)
172 self._update_suite_name(suite)
173 suite.sources = sources
174 (suite.binaries, suite.provides_table) = self._read_binaries(
175 suite, self._architectures
176 )
177 self._fixup_faux_arch_all_binaries(suite)
178 if self._base_config.be_strict_with_build_deps:  [178 ↛ 170: line 178 didn't jump to line 170 because the condition on line 178 was always true]
179 self._add_build_dep_faux_binaries(suite)
181 return Suites(target_suite, suites)
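As a rough usage sketch (hypothetical paths and values; a real run takes these from the britney configuration and needs Release/Sources/Packages files under the given directories), the loader is driven roughly like this:

# Hypothetical driver; attribute names mirror the options read by __init__() and load_suites().
import optparse

cfg = optparse.Values()
cfg.testing = "/srv/mirror/testing"        # hypothetical mirror paths
cfg.unstable = "/srv/mirror/unstable"
cfg.architectures = "amd64 arm64"
cfg.nobreakall_arches = "amd64"
cfg.outofsync_arches = None
cfg.break_arches = None
cfg.new_arches = None
cfg.archall_inconsistency_allowed = False
cfg.be_strict_with_build_deps = True

loader = DebMirrorLikeSuiteContentLoader(cfg)
suites = loader.load_suites()              # Suites(target_suite, [unstable, ...])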
183 def _fixup_faux_arch_all_binaries(self, suite: Suite) -> None:
184 """remove faux arch:all binary if a real arch:all binary is available
186 We don't know for which architectures bin/$something must be available
187 except for arch:all, which should be available in each arch. The
188 information that a source builds an arch:all binary is available during
189 the loading of the sources, but we have to pick an order in which to
190 load the files and the Sources is loaded before the Packages are
191 read. Hence we fake an arch:all binary during source loading, but it
192 shouldn't be there in the final list if real arch:all binaries are
193 present in the Packages file.
195 Also, if we keep the fake binary, it should be added to the lists of
196 known binaries in the suite, otherwise britney2 trips later on.
198 """
200 all_binaries = self._all_binaries
201 binaries = suite.binaries
202 faux_arches = (
203 set(self.architectures)
204 - set(self.break_arches)
205 - set(self.outofsync_arches)
206 - set(self.new_arches)
207 )
209 for srcpkg in suite.sources.values():
210 faux = {x for x in srcpkg.binaries if x[2] == "faux"}
211 if faux and [
212 x
213 for x in (srcpkg.binaries - faux)
214 if all_binaries[x].architecture == "all"
215 ]:
216 srcpkg.binaries -= faux
218 # Calculate again because we may have changed the set
219 faux = {x for x in srcpkg.binaries if x[2] == "faux"}
220 for binpkg_id in faux:
221 bin_data = BinaryPackage(
222 binpkg_id[1],
223 sys.intern("faux"),
224 srcpkg.source,
225 srcpkg.version,
226 "all",
227 "no",
228 None,
229 None,
230 [],
231 False,
232 binpkg_id,
233 [],
234 )
235 for arch_all in faux_arches:
236 binaries[arch_all][binpkg_id[0]] = bin_data
237 all_binaries[binpkg_id] = bin_data
238 suite.binaries = binaries
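To make the faux/real distinction concrete (illustrative values only):

# A faux binary recorded while loading Sources carries "faux" as the architecture
# component of its id, e.g. BinaryPackageId("foo", "1.0-1", "faux").  If a real
# arch:all "foo" shows up in any Packages file, the faux id is dropped from
# srcpkg.binaries; otherwise the synthesised BinaryPackage is registered for every
# architecture outside break/outofsync/new so later checks can still resolve it.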
240 def _add_build_dep_faux_binaries(self, suite: Suite) -> None:
241 """Add faux packages that keep track of build depends
243 To ensure that Build-Depends are fully protected against inappropriate
244 removal or upgrade, we add faux packages to source packages containing
245 the Build-Depends as Depends.
246 """
248 all_binaries = self._all_binaries
249 for src_name, src_pkg in suite.sources.items():
250 # TODO: something with arch, which one?
251 archall = self._nobreakall_arches[0]
252 bd_pid = BinaryPackageId(
253 src_name + "-faux-build-depends", sys.intern(src_pkg.version), archall
254 )
255 deps = ""
256 for bds in (src_pkg.build_deps_arch, src_pkg.build_deps_indep):
257 if bds is not None:
258 for block in apt_pkg.parse_src_depends(bds, architecture=archall):
259 # Like the buildds, we don't care about alternatives
260 deps += "," + block[0][0]
261 if block[0][1] != "":
262 # The extra space in the middle is a workaround for
263 # an apt_pkg bug in bookworm
264 deps += "(" + block[0][2] + " " + block[0][1] + ")"
265 deps = deps.strip(",")
266 if deps == "":
267 continue
268 dpkg = BinaryPackage(
269 bd_pid[1],
270 sys.intern("faux"),
271 src_name,
272 src_pkg.version,
273 archall,
274 None,
275 sys.intern(deps),
276 None,
277 [],
278 False,
279 bd_pid,
280 [],
281 )
282 suite.binaries.setdefault(archall, {})[bd_pid[0]] = dpkg
283 src_pkg.binaries.add(bd_pid)
284 all_binaries[bd_pid] = dpkg
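A concrete, hypothetical example of the faux package synthesised by the loop above:

# For a source "bar" at version 1.2-3 with
#   Build-Depends: debhelper-compat (= 13), libfoo-dev (>= 2.0)
# the loop yields a binary roughly equivalent to
#   Package: bar-faux-build-depends
#   Version: 1.2-3
#   Architecture: <first NOBREAKALL architecture>
#   Depends: debhelper-compat(= 13),libfoo-dev(>= 2.0)
# so that removing or breaking either build dependency in the target suite is
# treated like breaking a real reverse dependency of bar.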
286 def _setup_architectures(self) -> None:
287 allarches = self._architectures
288 # Re-order the architectures such that the most important architectures are listed first
289 # (this is to make the log easier to read as most important architectures will be listed
290 # first)
291 arches = [x for x in allarches if x in self._nobreakall_arches]
292 arches += [
293 x for x in allarches if x not in arches and x not in self._outofsync_arches
294 ]
295 arches += [
296 x for x in allarches if x not in arches and x not in self._break_arches
297 ]
298 arches += [
299 x for x in allarches if x not in arches and x not in self._new_arches
300 ]
301 arches += [x for x in allarches if x not in arches]
303 # Intern architectures for efficiency; items in this list will be used for lookups and
304 # building items/keys - by interning strings we reduce memory (considerably).
305 self._architectures = [sys.intern(arch) for arch in allarches]
306 assert "all" not in self._architectures, "all not allowed in architectures"
308 def _get_suite_name(
309 self, suite: Suite, release_file: "apt_pkg.TagSection[str]"
310 ) -> tuple[str, str]:
311 name = None
312 codename = None
313 if "Suite" in release_file: 313 ↛ 315line 313 didn't jump to line 315 because the condition on line 313 was always true
314 name = release_file["Suite"]
315 if "Codename" in release_file:
316 codename = release_file["Codename"]
318 if name is None:  [318 ↛ 319: line 318 didn't jump to line 319 because the condition on line 318 was never true]
319 name = codename
320 elif codename is None:
321 codename = name
323 if name is None:  [323 ↛ 324: line 323 didn't jump to line 324 because the condition on line 323 was never true]
324 self.logger.warning(
325 'Either of the fields "Suite" or "Codename" '
326 + "should be present in a release file."
327 )
328 self.logger.error(
329 "Release file for suite %s is missing both the "
330 + '"Suite" and the "Codename" fields.',
331 suite.name,
332 )
333 raise KeyError("Suite")
335 assert codename is not None # required for type checking
336 return (name, codename)
338 def _update_suite_name(self, suite: Suite) -> None:
339 try:
340 release_file = read_release_file(suite.path)
341 except FileNotFoundError:
342 self.logger.info(
343 "The %s suite does not have a Release file, unable to update the name",
344 suite.name,
345 )
346 release_file = None
348 if release_file is not None:
349 (suite.name, suite.codename) = self._get_suite_name(suite, release_file)
350 self.logger.info("Using suite name from Release file: %s", suite.name)
351 self.logger.debug(
352 "Using suite codename from Release file: %s", suite.codename
353 )
355 def _check_release_file(self, target_suite: Suite, missing_config_msg: str) -> None:
356 try:
357 release_file = read_release_file(target_suite.path)
358 self.logger.info(
359 "Found a Release file in %s - using that for defaults",
360 target_suite.name,
361 )
362 except FileNotFoundError:
363 self.logger.info(
364 "The %s suite does not have a Release file.", target_suite.name
365 )
366 release_file = None
368 if release_file is not None:
369 self._components = release_file["Components"].split()
370 self.logger.info(
371 "Using components listed in Release file: %s",
372 " ".join(self._components),
373 )
375 if self._architectures is None:
376 if release_file is None: # pragma: no cover
377 self.logger.error(
378 "No configured architectures and there is no release file in the %s suite.",
379 target_suite.name,
380 )
381 self.logger.error(
382 'Please check if there is a "Release" file in %s', target_suite.path
383 )
384 self.logger.error(
385 'or if the config file contains a non-empty "ARCHITECTURES" field'
386 )
387 raise MissingRequiredConfigurationError(
388 missing_config_msg % "ARCHITECTURES"
389 )
390 self._architectures = sorted(
391 x for x in release_file["Architectures"].split() if x != "all"
392 )
393 self.logger.info(
394 "Using architectures listed in Release file: %s",
395 " ".join(self._architectures),
396 )
398 def _read_sources(self, basedir: str) -> dict[str, SourcePackage]:
399 """Read the list of source packages from the specified directory
401 The source packages are read from the `Sources' file within the
402 directory specified by the `basedir' parameter. Considering the
403 large amount of memory needed, not all the fields are loaded
404 in memory. The available fields are Version, Maintainer and Section.
406 The method returns a dictionary that maps source package names
407 to SourcePackage objects.
408 """
410 if self._components:
411 sources: dict[str, SourcePackage] = {}
412 for component in self._components:
413 filename = os.path.join(basedir, component, "source", "Sources")
414 try:
415 filename = possibly_compressed(filename)
416 except FileNotFoundError:
417 if component == "non-free-firmware":
418 self.logger.info("Skipping %s as it doesn't exist", filename)
419 continue
420 raise
421 self.logger.info("Loading source packages from %s", filename)
422 read_sources_file(
423 filename,
424 sources,
425 not self._base_config.archall_inconsistency_allowed,
426 )
427 else:
428 filename = os.path.join(basedir, "Sources")
429 self.logger.info("Loading source packages from %s", filename)
430 sources = read_sources_file(
431 filename, None, not self._base_config.archall_inconsistency_allowed
432 )
434 return sources
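The on-disk layouts read above, sketched with hypothetical paths:

# With components configured (deb-mirror-like layout):
#   <suite.path>/main/source/Sources        (possibly compressed)
#   <suite.path>/contrib/source/Sources     (possibly compressed)
# Without components (flat layout):
#   <suite.path>/Sources
# A missing per-component file is only tolerated for non-free-firmware.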
436 @staticmethod
437 def merge_fields(
438 get_field: Callable[[str], str | None],
439 *field_names: str,
440 separator: str = ", ",
441 ) -> str | None:
442 """Merge two or more fields (filtering out empty fields; returning None if all are empty)"""
443 return separator.join(filter(None, (get_field(x) for x in field_names))) or None
445 def _read_packages_file(
446 self,
447 filename: str,
448 arch: str,
449 srcdist: dict[str, SourcePackage],
450 packages: dict[str, BinaryPackage] | None = None,
451 intern: Callable[[str], str] = sys.intern,
452 ) -> dict[str, BinaryPackage]:
453 self.logger.info("Loading binary packages from %s", filename)
455 if packages is None:
456 packages = {}
458 all_binaries = self._all_binaries
460 tag_file = apt_pkg.TagFile(filename)
461 get_field = tag_file.section.get
462 step = tag_file.step
464 while step():
465 pkg = get_field("Package")
466 version = get_field("Version")
468 # There may be multiple versions of any arch:all packages
469 # (in unstable) if some architectures have out-of-date
470 # binaries. We only ever consider the package with the
471 # largest version for migration.
472 pkg = intern(pkg)
473 version = intern(version)
474 pkg_id = BinaryPackageId(pkg, version, arch)
476 if pkg in packages:
477 old_pkg_data = packages[pkg]
478 if apt_pkg.version_compare(old_pkg_data.version, version) > 0:
479 continue
480 old_pkg_id = old_pkg_data.pkg_id
481 old_src_binaries = srcdist[old_pkg_data.source].binaries
482 old_src_binaries.remove(old_pkg_id)
483 # This may seem weird at first glance, but the current code relies
484 # on this behaviour to avoid issues like #709460. Admittedly it
485 # is a special case, but Britney will attempt to remove the
486 # arch:all packages without this. Even then, this particular
487 # stop-gap relies on the packages files being sorted by name
488 # and the version, so it is not particularly resilient.
489 if pkg_id not in old_src_binaries:  [489 ↛ 495: line 489 didn't jump to line 495 because the condition on line 489 was always true]
490 old_src_binaries.add(pkg_id)
492 # Merge Pre-Depends with Depends and Conflicts with
493 # Breaks. Britney is not interested in the "finer
494 # semantic differences" of these fields anyway.
495 deps = DebMirrorLikeSuiteContentLoader.merge_fields(
496 get_field, "Pre-Depends", "Depends"
497 )
498 conflicts = DebMirrorLikeSuiteContentLoader.merge_fields(
499 get_field, "Conflicts", "Breaks"
500 )
502 ess = False
503 if get_field("Essential", "no") == "yes":
504 ess = True
506 source = pkg
507 source_version = version
508 # retrieve the name and the version of the source package
509 source_raw = get_field("Source")
510 if source_raw:
511 source = intern(source_raw.split(" ")[0])
512 if "(" in source_raw:
513 source_version = intern(
514 source_raw[source_raw.find("(") + 1 : source_raw.find(")")]
515 )
517 provides_raw = get_field("Provides")
518 if provides_raw:
519 provides = parse_provides(
520 provides_raw, pkg_id=pkg_id, logger=self.logger
521 )
522 else:
523 provides = []
525 raw_arch = intern(get_field("Architecture"))
526 if raw_arch not in {"all", arch}: # pragma: no cover
527 raise AssertionError(
528 "%s has wrong architecture (%s) - should be either %s or all"
529 % (str(pkg_id), raw_arch, arch)
530 )
532 builtusing_raw = get_field("Built-Using")
533 if builtusing_raw:
534 builtusing = parse_builtusing(
535 builtusing_raw, pkg_id=pkg_id, logger=self.logger
536 )
537 else:
538 builtusing = []
540 dpkg = BinaryPackage(
541 version,
542 intern(get_field("Section")),
543 source,
544 source_version,
545 raw_arch,
546 get_field("Multi-Arch"),
547 deps,
548 conflicts,
549 provides,
550 ess,
551 pkg_id,
552 builtusing,
553 )
555 # if the source package is available in the distribution, then register this binary package
556 if source in srcdist:
557 # There may be multiple versions of any arch:all packages
558 # (in unstable) if some architectures have out-of-date
559 # binaries. We only want to include the package in the
560 # source -> binary mapping once. It doesn't matter which
561 # of the versions we include as only the package name and
562 # architecture are recorded.
563 srcdist[source].binaries.add(pkg_id)
564 # if the source package doesn't exist, create a fake one
565 else:
566 srcdist[source] = SourcePackage(
567 source,
568 source_version,
569 "faux",
570 {pkg_id},
571 None,
572 True,
573 None,
574 None,
575 [],
576 [],
577 )
579 # add the resulting package to the package list
580 packages[pkg] = dpkg
581 if pkg_id in all_binaries:
582 self._merge_pkg_entries(pkg, arch, all_binaries[pkg_id], dpkg)
583 else:
584 all_binaries[pkg_id] = dpkg
586 return packages
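How the Source field is decomposed above (hypothetical stanza values):

# Source: glibc (2.37-12)  -> source = "glibc", source_version = "2.37-12"
# Source: glibc            -> source = "glibc", source_version = <binary Version>
# (no Source field)        -> source = <Package>, source_version = <binary Version>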
588 def _read_binaries(
589 self, suite: Suite, architectures: Iterable[str]
590 ) -> tuple[
591 dict[str, dict[str, BinaryPackage]], dict[str, dict[str, set[tuple[str, str]]]]
592 ]:
593 """Read the list of binary packages from the specified directory
595 This method reads all the binary packages for a given suite.
597 If the "components" config parameter is set, the directory should
598 be the "suite" directory of a local mirror (i.e. the one containing
599 the "Release" file). Otherwise, Britney will read the packages
600 information from all the "Packages_${arch}" files referenced by
601 the "architectures" parameter.
603 Considering the
604 large amount of memory needed, not all the fields are loaded
605 in memory. The available fields are Version, Source, Multi-Arch,
606 Depends, Conflicts, Provides and Architecture.
608 The `Provides' field is used to populate the virtual packages list.
610 The method returns a tuple of two dicts with architecture as key and
611 another dict as value. The value dicts of the first dict map
612 from binary package name to "BinaryPackage" objects; the value dicts
613 of the second map a package name to the packages providing it.
614 """
615 binaries: dict[str, dict[str, BinaryPackage]] = {}
616 provides_table: dict[str, dict[str, set[tuple[str, str]]]] = {}
617 basedir = suite.path
619 if self._components:
620 release_file = read_release_file(basedir)
621 listed_archs = set(release_file["Architectures"].split())
622 for arch in architectures:
623 packages: dict[str, BinaryPackage] = {}
624 if arch not in listed_archs:  [624 ↛ 625: line 624 didn't jump to line 625 because the condition on line 624 was never true]
625 self.logger.info(
626 "Skipping arch %s for %s: It is not listed in the Release file",
627 arch,
628 suite.name,
629 )
630 binaries[arch] = {}
631 provides_table[arch] = {}
632 continue
633 for component in self._components:
634 binary_dir = "binary-%s" % arch
635 filename = os.path.join(basedir, component, binary_dir, "Packages")
636 try:
637 filename = possibly_compressed(filename)
638 except FileNotFoundError:
639 if component == "non-free-firmware":
640 self.logger.info(
641 "Skipping %s as it doesn't exist", filename
642 )
643 continue
644 raise
645 udeb_filename = os.path.join(
646 basedir, component, "debian-installer", binary_dir, "Packages"
647 )
648 # We assume the udeb Packages file is present if the
649 # regular one is present
650 udeb_filename = possibly_compressed(udeb_filename)
651 self._read_packages_file(filename, arch, suite.sources, packages)
652 self._read_packages_file(
653 udeb_filename, arch, suite.sources, packages
654 )
655 # create provides
656 provides = create_provides_map(packages)
657 binaries[arch] = packages
658 provides_table[arch] = provides
659 else:
660 for arch in architectures:
661 filename = os.path.join(basedir, "Packages_%s" % arch)
662 packages = self._read_packages_file(filename, arch, suite.sources)
663 provides = create_provides_map(packages)
664 binaries[arch] = packages
665 provides_table[arch] = provides
667 return (binaries, provides_table)
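The shape of the two returned tables, with hypothetical keys:

# binaries["amd64"]["python3"]     -> BinaryPackage(...)
# provides_table["amd64"]["httpd"] -> set of (name, version) tuples describing
#                                     the packages that Provide "httpd" on amd64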
669 def _merge_pkg_entries(
670 self,
671 package: str,
672 parch: str,
673 pkg_entry1: BinaryPackage,
674 pkg_entry2: BinaryPackage,
675 ) -> None:
676 bad = []
677 for f in self.CHECK_FIELDS:
678 v1 = getattr(pkg_entry1, f)
679 v2 = getattr(pkg_entry2, f)
680 if v1 != v2: # pragma: no cover
681 bad.append((f, v1, v2))
683 if bad: # pragma: no cover
684 self.logger.error(
685 "Mismatch found %s %s %s differs", package, pkg_entry1.version, parch
686 )
687 for f, v1, v2 in bad:
688 self.logger.info(" ... %s %s != %s", f, v1, v2)
689 raise ValueError("Inconsistent / Unsupported data set")
691 # Merge ESSENTIAL if necessary
692 assert pkg_entry1.is_essential or not pkg_entry2.is_essential