Coverage for britney2/inputs/suiteloader.py: 90%
269 statements
coverage.py v7.6.0, created at 2025-10-17 17:32 +0000
1 import logging
2 import optparse
3 import os
4 import sys
5 from abc import abstractmethod
6 from collections.abc import Callable, Iterable
7 from typing import Any, Literal, Optional, TypeVar, overload
9 import apt_pkg
11 from britney2 import (
12 BinaryPackage,
13 BinaryPackageId,
14 PackageId,
15 SourcePackage,
16 Suite,
17 SuiteClass,
18 Suites,
19 TargetSuite,
20)
21 from britney2.utils import (
22 create_provides_map,
23 parse_builtusing,
24 parse_provides,
25 possibly_compressed,
26 read_release_file,
27 read_sources_file,
28)
31 class MissingRequiredConfigurationError(RuntimeError):
32 pass
35 _T = TypeVar("_T")
38 class SuiteContentLoader:
39 def __init__(self, base_config: optparse.Values) -> None:
40 self._base_config = base_config
41 self._architectures: list[str] = SuiteContentLoader.config_str_as_list(
42 base_config.architectures
43 )
44 self._nobreakall_arches: list[str] = SuiteContentLoader.config_str_as_list(
45 base_config.nobreakall_arches, []
46 )
47 self._outofsync_arches: list[str] = SuiteContentLoader.config_str_as_list(
48 base_config.outofsync_arches, []
49 )
50 self._break_arches: list[str] = SuiteContentLoader.config_str_as_list(
51 base_config.break_arches, []
52 )
53 self._new_arches: list[str] = SuiteContentLoader.config_str_as_list(
54 base_config.new_arches, []
55 )
56 self._components: list[str] = []
57 self._all_binaries: dict[BinaryPackageId, BinaryPackage] = {}
58 logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
59 self.logger = logging.getLogger(logger_name)
61 @overload
62 @staticmethod
63 def config_str_as_list(value: Literal[None], default_value: _T) -> _T: ...   63 ↛ exit   line 63 didn't return from function 'config_str_as_list'
65 @overload
66 @staticmethod
67 def config_str_as_list(value: str, default_value: Any) -> list[str]: ...   67 ↛ exit   line 67 didn't return from function 'config_str_as_list'
69 @overload
70 @staticmethod
71 def config_str_as_list(value: Any, default_value: Any | None = None) -> Any: ...   71 ↛ exit   line 71 didn't return from function 'config_str_as_list'
73 @staticmethod
74 def config_str_as_list(value: Any, default_value: Any | None = None) -> Any:
75 if value is None:
76 return default_value
77 if isinstance(value, str):   77 ↛ 79   line 77 didn't jump to line 79 because the condition on line 77 was always true
78 return value.split()
79 return value
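# Illustrative usage of config_str_as_list (not part of the module source):
# whitespace-separated config strings become lists, None falls back to the
# supplied default, and any other value is passed through unchanged.
#     >>> SuiteContentLoader.config_str_as_list("amd64 arm64 riscv64")
#     ['amd64', 'arm64', 'riscv64']
#     >>> SuiteContentLoader.config_str_as_list(None, [])
#     []
#     >>> SuiteContentLoader.config_str_as_list(["amd64"])
#     ['amd64']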
81 @property
82 def architectures(self) -> list[str]:
83 return self._architectures
85 @property
86 def nobreakall_arches(self) -> list[str]:
87 return self._nobreakall_arches
89 @property
90 def outofsync_arches(self) -> list[str]:
91 return self._outofsync_arches
93 @property
94 def break_arches(self) -> list[str]:
95 return self._break_arches
97 @property
98 def new_arches(self) -> list[str]:
99 return self._new_arches
101 @property
102 def components(self) -> list[str]:
103 return self._components
105 def all_binaries(self) -> dict[BinaryPackageId, BinaryPackage]:
106 return self._all_binaries
108 @abstractmethod
109 def load_suites(self) -> Suites: # pragma: no cover
110 pass
113 class DebMirrorLikeSuiteContentLoader(SuiteContentLoader):
114 CHECK_FIELDS = [
115 "source",
116 "source_version",
117 "architecture",
118 "multi_arch",
119 "depends",
120 "conflicts",
121 "provides",
122 ]
124 def load_suites(self) -> Suites:
125 suites = []
126 missing_config_msg = (
127 "Configuration %s is not set in the config (and cannot be auto-detected)"
128 )
129 for suitename in ("testing", "unstable", "pu", "tpu"):
130 suffix = suitename if suitename in {"pu", "tpu"} else ""
131 if hasattr(self._base_config, suitename):
132 suite_path = getattr(self._base_config, suitename)
133 suite_class = SuiteClass.TARGET_SUITE
134 if suitename != "testing":
135 suite_class = (
136 SuiteClass.ADDITIONAL_SOURCE_SUITE
137 if suffix
138 else SuiteClass.PRIMARY_SOURCE_SUITE
139 )
140 suites.append(
141 Suite(
142 suite_class, suitename, suite_path, suite_short_name=suffix
143 )
144 )
145 else:
146 target_suite = TargetSuite(
147 suite_class, suitename, suite_path, suite_short_name=suffix
148 )
149 else:
150 if suitename in {"testing", "unstable"}: # pragma: no cover
151 self.logger.error(missing_config_msg, suitename.upper())
152 raise MissingRequiredConfigurationError(
153 missing_config_msg % suitename.upper()
154 )
155 self.logger.info(
156 "Optional suite %s is not defined (config option: %s) ",
157 suitename,
158 suitename.upper(),
159 )
161 assert target_suite is not None, "Logic regression, this should be impossible."
163 self._check_release_file(target_suite, missing_config_msg)
164 self._setup_architectures()
166 # read the source and binary packages for the involved distributions. Notes:
167 # - Load testing last as some live-data tests have more complete information in
168 # unstable
169 # - Load all sources before any of the binaries.
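# -   (Loading all sources first matters because reading a Sources file fakes an
#     arch:all binary for sources that build one; those placeholders can only be
#     reconciled once the Packages files have been read, see
#     _fixup_faux_arch_all_binaries below.)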
170 for suite in [target_suite, *suites]:
171 sources = self._read_sources(suite.path)
172 self._update_suite_name(suite)
173 suite.sources = sources
174 (suite.binaries, suite.provides_table) = self._read_binaries(
175 suite, self._architectures
176 )
177 self._fixup_faux_arch_all_binaries(suite)
179 return Suites(target_suite, suites)
181 def _fixup_faux_arch_all_binaries(self, suite: Suite) -> None:
182 """remove faux arch:all binary if a real arch:all binary is available
184 We don't know for which architectures bin/$something must be available
185 except for arch:all, which should be available in each arch. The
186 information that a source builds an arch:all binary is available during
187 the loading of the sources, but we have to pick an order in which to
188 load the files, and the Sources files are loaded before the Packages files are
189 read. Hence we fake an arch:all binary during source loading, but it
190 shouldn't be there in the final list if real arch:all binaries are
191 present in the Packages file.
193 Also, if we keep the fake binary, it should be added to the lists of
194 known binaries in the suite; otherwise britney2 trips up later on.
196 """
198 all_binaries = self._all_binaries
199 binaries = suite.binaries
200 faux_arches = (
201 set(self.architectures)
202 - set(self.break_arches)
203 - set(self.outofsync_arches)
204 - set(self.new_arches)
205 )
207 for srcpkg in suite.sources.values():
208 faux = {x for x in srcpkg.binaries if x[2] == "faux"}
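# (A BinaryPackageId is a (name, version, architecture) tuple, cf. the
# BinaryPackageId(pkg, version, arch) construction in _read_packages_file below,
# so x[2] == "faux" selects the placeholder arch:all binaries synthesized while
# the Sources were read.)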
209 if faux and [
210 x
211 for x in (srcpkg.binaries - faux)
212 if all_binaries[x].architecture == "all"
213 ]:
214 srcpkg.binaries -= faux
216 # Calculate again because we may have changed the set
217 faux = {x for x in srcpkg.binaries if x[2] == "faux"}
218 for binpkg_id in faux:   218 ↛ 219   line 218 didn't jump to line 219 because the loop on line 218 never started
219 bin_data = BinaryPackage(
220 binpkg_id[1],
221 sys.intern("faux"),
222 srcpkg.source,
223 srcpkg.version,
224 "all",
225 "no",
226 None,
227 None,
228 [],
229 False,
230 binpkg_id,
231 [],
232 )
233 for arch_all in faux_arches:
234 binaries[arch_all][binpkg_id[0]] = bin_data
235 all_binaries[binpkg_id] = bin_data
236 suite.binaries = binaries
238 def _setup_architectures(self) -> None:
239 allarches = self._architectures
240 # Re-order the architectures so that the most important architectures are listed first
241 # (this is to make the log easier to read, as the most important architectures will be listed
242 # first)
243 arches = [x for x in allarches if x in self._nobreakall_arches]
244 arches += [
245 x for x in allarches if x not in arches and x not in self._outofsync_arches
246 ]
247 arches += [
248 x for x in allarches if x not in arches and x not in self._break_arches
249 ]
250 arches += [
251 x for x in allarches if x not in arches and x not in self._new_arches
252 ]
253 arches += [x for x in allarches if x not in arches]
255 # Intern architectures for efficiency; items in this list will be used for lookups and
256 # building items/keys - by interning strings we reduce memory (considerably).
257 self._architectures = [sys.intern(arch) for arch in allarches]
258 assert "all" not in self._architectures, "all not allowed in architectures"
260 def _get_suite_name(
261 self, suite: Suite, release_file: "apt_pkg.TagSection[str]"
262 ) -> tuple[str, str]:
263 name = None
264 codename = None
265 if "Suite" in release_file: 265 ↛ 267line 265 didn't jump to line 267 because the condition on line 265 was always true
266 name = release_file["Suite"]
267 if "Codename" in release_file:
268 codename = release_file["Codename"]
270 if name is None:   270 ↛ 271   line 270 didn't jump to line 271 because the condition on line 270 was never true
271 name = codename
272 elif codename is None:
273 codename = name
275 if name is None:   275 ↛ 276   line 275 didn't jump to line 276 because the condition on line 275 was never true
276 self.logger.warning(
277 'Either of the fields "Suite" or "Codename" '
278 + "should be present in a release file."
279 )
280 self.logger.error(
281 "Release file for suite %s is missing both the "
282 + '"Suite" and the "Codename" fields.',
283 suite.name,
284 )
285 raise KeyError("Suite")
287 assert codename is not None # required for type checking
288 return (name, codename)
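# Illustrative example (not from the module source): a Release file containing
# "Suite: unstable" and "Codename: sid" yields ("unstable", "sid"); if only one
# of the two fields is present, its value is used for both name and codename.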
290 def _update_suite_name(self, suite: Suite) -> None:
291 try:
292 release_file = read_release_file(suite.path)
293 except FileNotFoundError:
294 self.logger.info(
295 "The %s suite does not have a Release file, unable to update the name",
296 suite.name,
297 )
298 release_file = None
300 if release_file is not None:
301 (suite.name, suite.codename) = self._get_suite_name(suite, release_file)
302 self.logger.info("Using suite name from Release file: %s", suite.name)
303 self.logger.debug(
304 "Using suite codename from Release file: %s", suite.codename
305 )
307 def _check_release_file(self, target_suite: Suite, missing_config_msg: str) -> None:
308 try:
309 release_file = read_release_file(target_suite.path)
310 self.logger.info(
311 "Found a Release file in %s - using that for defaults",
312 target_suite.name,
313 )
314 except FileNotFoundError:
315 self.logger.info(
316 "The %s suite does not have a Release file.", target_suite.name
317 )
318 release_file = None
320 if release_file is not None:
321 self._components = release_file["Components"].split()
322 self.logger.info(
323 "Using components listed in Release file: %s",
324 " ".join(self._components),
325 )
327 if self._architectures is None:
328 if release_file is None: # pragma: no cover
329 self.logger.error(
330 "No configured architectures and there is no release file in the %s suite.",
331 target_suite.name,
332 )
333 self.logger.error(
334 'Please check if there is a "Release" file in %s', target_suite.path
335 )
336 self.logger.error(
337 'or if the config file contains a non-empty "ARCHITECTURES" field'
338 )
339 raise MissingRequiredConfigurationError(
340 missing_config_msg % "ARCHITECTURES"
341 )
342 self._architectures = sorted(
343 x for x in release_file["Architectures"].split() if x != "all"
344 )
345 self.logger.info(
346 "Using architectures listed in Release file: %s",
347 " ".join(self._architectures),
348 )
350 def _read_sources(self, basedir: str) -> dict[str, SourcePackage]:
351 """Read the list of source packages from the specified directory
353 The source packages are read from the `Sources' file within the
354 directory specified as the `basedir' parameter. Considering the
355 large amount of memory needed, not all the fields are loaded
356 in memory. The available fields are Version, Maintainer and Section.
358 The method returns a dictionary that maps source package names to
359 SourcePackage objects.
360 """
362 if self._components:
363 sources: dict[str, SourcePackage] = {}
364 for component in self._components:
365 filename = os.path.join(basedir, component, "source", "Sources")
366 try:
367 filename = possibly_compressed(filename)
368 except FileNotFoundError:
369 if component == "non-free-firmware":
370 self.logger.info("Skipping %s as it doesn't exist", filename)
371 continue
372 raise
373 self.logger.info("Loading source packages from %s", filename)
374 read_sources_file(
375 filename,
376 sources,
377 not self._base_config.archall_inconsistency_allowed,
378 )
379 else:
380 filename = os.path.join(basedir, "Sources")
381 self.logger.info("Loading source packages from %s", filename)
382 sources = read_sources_file(
383 filename, None, not self._base_config.archall_inconsistency_allowed
384 )
386 return sources
388 @staticmethod
389 def merge_fields(
390 get_field: Callable[[str], str | None],
391 *field_names: str,
392 separator: str = ", ",
393 ) -> str | None:
394 """Merge two or more fields (filtering out empty fields; returning None if all are empty)"""
395 return separator.join(filter(None, (get_field(x) for x in field_names))) or None
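# Illustrative sketch of merge_fields (not part of the module source), using a
# plain dict's get in place of the TagFile accessor:
#     >>> get = {"Pre-Depends": "libc6 (>= 2.36)", "Depends": None}.get
#     >>> DebMirrorLikeSuiteContentLoader.merge_fields(get, "Pre-Depends", "Depends")
#     'libc6 (>= 2.36)'
# Empty fields are filtered out; when every requested field is empty or missing,
# the result is None.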
397 def _read_packages_file(
398 self,
399 filename: str,
400 arch: str,
401 srcdist: dict[str, SourcePackage],
402 packages: dict[str, BinaryPackage] | None = None,
403 intern: Callable[[str], str] = sys.intern,
404 ) -> dict[str, BinaryPackage]:
405 self.logger.info("Loading binary packages from %s", filename)
407 if packages is None:
408 packages = {}
410 all_binaries = self._all_binaries
412 tag_file = apt_pkg.TagFile(filename)
413 get_field = tag_file.section.get
414 step = tag_file.step
416 while step():
417 pkg = get_field("Package")
418 version = get_field("Version")
420 # There may be multiple versions of any arch:all packages
421 # (in unstable) if some architectures have out-of-date
422 # binaries. We only ever consider the package with the
423 # largest version for migration.
424 pkg = intern(pkg)
425 version = intern(version)
426 pkg_id = BinaryPackageId(pkg, version, arch)
428 if pkg in packages:
429 old_pkg_data = packages[pkg]
430 if apt_pkg.version_compare(old_pkg_data.version, version) > 0:
431 continue
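# (apt_pkg.version_compare(a, b) is positive when a is the newer Debian
# version, so the entry already recorded wins and the older stanza is skipped.)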
432 old_pkg_id = old_pkg_data.pkg_id
433 old_src_binaries = srcdist[old_pkg_data.source].binaries
434 old_src_binaries.remove(old_pkg_id)
435 # This may seem weird at first glance, but the current code relies
436 # on this behaviour to avoid issues like #709460. Admittedly it
437 # is a special case, but Britney will attempt to remove the
438 # arch:all packages without this. Even then, this particular
439 # stop-gap relies on the Packages files being sorted by name
440 # and version, so it is not particularly resilient.
441 if pkg_id not in old_src_binaries:   441 ↛ 447   line 441 didn't jump to line 447 because the condition on line 441 was always true
442 old_src_binaries.add(pkg_id)
444 # Merge Pre-Depends with Depends and Conflicts with
445 # Breaks. Britney is not interested in the "finer
446 # semantic differences" of these fields anyway.
447 deps = DebMirrorLikeSuiteContentLoader.merge_fields(
448 get_field, "Pre-Depends", "Depends"
449 )
450 conflicts = DebMirrorLikeSuiteContentLoader.merge_fields(
451 get_field, "Conflicts", "Breaks"
452 )
454 ess = False
455 if get_field("Essential", "no") == "yes":
456 ess = True
458 source = pkg
459 source_version = version
460 # retrieve the name and the version of the source package
461 source_raw = get_field("Source")
462 if source_raw:
463 source = intern(source_raw.split(" ")[0])
464 if "(" in source_raw:
465 source_version = intern(
466 source_raw[source_raw.find("(") + 1 : source_raw.find(")")]
467 )
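# (For example, "Source: glibc (2.36-9)" yields source "glibc" and
# source_version "2.36-9"; without the parenthesised part the binary's own
# version is kept as the source version.)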
469 provides_raw = get_field("Provides")
470 if provides_raw:
471 provides = parse_provides(
472 provides_raw, pkg_id=pkg_id, logger=self.logger
473 )
474 else:
475 provides = []
477 raw_arch = intern(get_field("Architecture"))
478 if raw_arch not in {"all", arch}: # pragma: no cover
479 raise AssertionError(
480 "%s has wrong architecture (%s) - should be either %s or all"
481 % (str(pkg_id), raw_arch, arch)
482 )
484 builtusing_raw = get_field("Built-Using")
485 if builtusing_raw:
486 builtusing = parse_builtusing(
487 builtusing_raw, pkg_id=pkg_id, logger=self.logger
488 )
489 else:
490 builtusing = []
492 dpkg = BinaryPackage(
493 version,
494 intern(get_field("Section")),
495 source,
496 source_version,
497 raw_arch,
498 get_field("Multi-Arch"),
499 deps,
500 conflicts,
501 provides,
502 ess,
503 pkg_id,
504 builtusing,
505 )
507 # if the source package is available in the distribution, then register this binary package
508 if source in srcdist:
509 # There may be multiple versions of any arch:all packages
510 # (in unstable) if some architectures have out-of-date
511 # binaries. We only want to include the package in the
512 # source -> binary mapping once. It doesn't matter which
513 # of the versions we include as only the package name and
514 # architecture are recorded.
515 srcdist[source].binaries.add(pkg_id)
516 # if the source package doesn't exist, create a fake one
517 else:
518 srcdist[source] = SourcePackage(
519 source,
520 source_version,
521 "faux",
522 {pkg_id},
523 None,
524 True,
525 None,
526 None,
527 [],
528 [],
529 )
531 # add the resulting dictionary to the package list
532 packages[pkg] = dpkg
533 if pkg_id in all_binaries:
534 self._merge_pkg_entries(pkg, arch, all_binaries[pkg_id], dpkg)
535 else:
536 all_binaries[pkg_id] = dpkg
538 # add the resulting dictionary to the package list
539 packages[pkg] = dpkg
541 return packages
543 def _read_binaries(
544 self, suite: Suite, architectures: Iterable[str]
545 ) -> tuple[
546 dict[str, dict[str, BinaryPackage]], dict[str, dict[str, set[tuple[str, str]]]]
547 ]:
548 """Read the list of binary packages from the specified directory
550 This method reads all the binary packages for a given suite.
552 If the "components" config parameter is set, the directory should
553 be the "suite" directory of a local mirror (i.e. the one containing
554 the "Release" file). Otherwise, Britney will read the packages
555 information from all the "Packages_${arch}" files referenced by
556 the "architectures" parameter.
558 Considering the
559 large amount of memory needed, not all the fields are loaded
560 in memory. The available fields are Version, Source, Multi-Arch,
561 Depends, Conflicts, Provides and Architecture.
563 The `Provides' field is used to populate the virtual packages list.
565 The method returns a tuple of two dicts, each keyed by architecture and
566 holding another dict as value. The value dicts of the first dict map
567 a binary package name to its "BinaryPackage" object; the value dicts of
568 the second map a package name to the set of packages providing it.
569 """
570 binaries: dict[str, dict[str, BinaryPackage]] = {}
571 provides_table: dict[str, dict[str, set[tuple[str, str]]]] = {}
572 basedir = suite.path
574 if self._components:
575 release_file = read_release_file(basedir)
576 listed_archs = set(release_file["Architectures"].split())
577 for arch in architectures:
578 packages: dict[str, BinaryPackage] = {}
579 if arch not in listed_archs:   579 ↛ 580   line 579 didn't jump to line 580 because the condition on line 579 was never true
580 self.logger.info(
581 "Skipping arch %s for %s: It is not listed in the Release file",
582 arch,
583 suite.name,
584 )
585 binaries[arch] = {}
586 provides_table[arch] = {}
587 continue
588 for component in self._components:
589 binary_dir = "binary-%s" % arch
590 filename = os.path.join(basedir, component, binary_dir, "Packages")
591 try:
592 filename = possibly_compressed(filename)
593 except FileNotFoundError:
594 if component == "non-free-firmware":
595 self.logger.info(
596 "Skipping %s as it doesn't exist", filename
597 )
598 continue
599 raise
600 udeb_filename = os.path.join(
601 basedir, component, "debian-installer", binary_dir, "Packages"
602 )
603 # We assume the udeb Packages file is present if the
604 # regular one is present
605 udeb_filename = possibly_compressed(udeb_filename)
606 self._read_packages_file(filename, arch, suite.sources, packages)
607 self._read_packages_file(
608 udeb_filename, arch, suite.sources, packages
609 )
610 # create provides
611 provides = create_provides_map(packages)
612 binaries[arch] = packages
613 provides_table[arch] = provides
614 else:
615 for arch in architectures:
616 filename = os.path.join(basedir, "Packages_%s" % arch)
617 packages = self._read_packages_file(filename, arch, suite.sources)
618 provides = create_provides_map(packages)
619 binaries[arch] = packages
620 provides_table[arch] = provides
622 return (binaries, provides_table)
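# Illustrative note (not from the module source): with components configured,
# the files read above follow a deb-mirror layout, e.g.
#     <suite>/main/binary-amd64/Packages                       (possibly compressed)
#     <suite>/main/debian-installer/binary-amd64/Packages      (possibly compressed)
# whereas without components a flat <suite>/Packages_amd64 file per
# architecture is expected instead.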
624 def _merge_pkg_entries(
625 self,
626 package: str,
627 parch: str,
628 pkg_entry1: BinaryPackage,
629 pkg_entry2: BinaryPackage,
630 ) -> None:
631 bad = []
632 for f in self.CHECK_FIELDS:
633 v1 = getattr(pkg_entry1, f)
634 v2 = getattr(pkg_entry2, f)
635 if v1 != v2: # pragma: no cover
636 bad.append((f, v1, v2))
638 if bad: # pragma: no cover
639 self.logger.error(
640 "Mismatch found %s %s %s differs", package, pkg_entry1.version, parch
641 )
642 for f, v1, v2 in bad:
643 self.logger.info(" ... %s %s != %s", f, v1, v2)
644 raise ValueError("Inconsistent / Unsupported data set")
646 # Merge ESSENTIAL if necessary
647 assert pkg_entry1.is_essential or not pkg_entry2.is_essential
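# (The assertion above tolerates the already-recorded entry being Essential
# while the newly parsed one is not, but rejects the reverse: a package must not
# gain Essential status only in a later Packages file.)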