Coverage for britney2/inputs/suiteloader.py: 92%
271 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-03-23 07:34 +0000
1import logging
2import optparse
3import os
4import sys
5from abc import abstractmethod
6from collections.abc import Callable
7from typing import Any, Literal, Optional, TypeVar, overload
8from collections.abc import Iterable
10import apt_pkg
12from britney2 import (
13 BinaryPackage,
14 BinaryPackageId,
15 PackageId,
16 SourcePackage,
17 Suite,
18 SuiteClass,
19 Suites,
20 TargetSuite,
21)
22from britney2.utils import (
23 create_provides_map,
24 parse_builtusing,
25 parse_provides,
26 possibly_compressed,
27 read_release_file,
28 read_sources_file,
29)
class MissingRequiredConfigurationError(RuntimeError):
    """Raised when a mandatory configuration option is missing and cannot be auto-detected."""

    pass
36_T = TypeVar("_T")
class SuiteContentLoader(object):
    def __init__(self, base_config: optparse.Values) -> None:
        """Capture the architecture/component configuration from *base_config*."""
        as_list = SuiteContentLoader.config_str_as_list
        self._base_config = base_config
        # The main architecture list is mandatory (may still be auto-detected
        # later from a Release file); the category lists default to empty.
        self._architectures: list[str] = as_list(base_config.architectures)
        self._nobreakall_arches: list[str] = as_list(base_config.nobreakall_arches, [])
        self._outofsync_arches: list[str] = as_list(base_config.outofsync_arches, [])
        self._break_arches: list[str] = as_list(base_config.break_arches, [])
        self._new_arches: list[str] = as_list(base_config.new_arches, [])
        self._components: list[str] = []
        # Shared lookup table of every binary package seen, keyed by package id.
        self._all_binaries: dict[BinaryPackageId, BinaryPackage] = {}
        logger_name = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        self.logger = logging.getLogger(logger_name)
62 @overload
63 @staticmethod
64 def config_str_as_list(value: Literal[None], default_value: _T) -> _T: ... 64 ↛ exitline 64 didn't return from function 'config_str_as_list'
66 @overload
67 @staticmethod
68 def config_str_as_list(value: str, default_value: Any) -> list[str]: ... 68 ↛ exitline 68 didn't return from function 'config_str_as_list'
70 @overload
71 @staticmethod
72 def config_str_as_list(value: Any, default_value: Optional[Any] = None) -> Any: ... 72 ↛ exitline 72 didn't return from function 'config_str_as_list'
74 @staticmethod
75 def config_str_as_list(value: Any, default_value: Optional[Any] = None) -> Any:
76 if value is None:
77 return default_value
78 if isinstance(value, str): 78 ↛ 80line 78 didn't jump to line 80, because the condition on line 78 was never false
79 return value.split()
80 return value
    @property
    def architectures(self) -> list[str]:
        """Architectures to process (configured or detected from a Release file)."""
        return self._architectures

    @property
    def nobreakall_arches(self) -> list[str]:
        """Architectures on which nothing may be broken."""
        return self._nobreakall_arches

    @property
    def outofsync_arches(self) -> list[str]:
        """Architectures allowed to be out of sync."""
        return self._outofsync_arches

    @property
    def break_arches(self) -> list[str]:
        """Architectures on which breakage is tolerated."""
        return self._break_arches

    @property
    def new_arches(self) -> list[str]:
        """Architectures that are new and not yet fully tracked."""
        return self._new_arches

    @property
    def components(self) -> list[str]:
        """Archive components (populated from the target suite's Release file)."""
        return self._components

    def all_binaries(self) -> dict[BinaryPackageId, BinaryPackage]:
        """Return the mapping of every binary package id seen while loading suites."""
        return self._all_binaries
    @abstractmethod
    def load_suites(self) -> Suites:  # pragma: no cover
        """Load all configured suites; implemented by concrete subclasses."""
        pass
class DebMirrorLikeSuiteContentLoader(SuiteContentLoader):
    """SuiteContentLoader reading suites laid out like a Debian mirror."""

    # Fields that must agree when the same (package, version, arch) binary
    # is encountered more than once — see _merge_pkg_entries.
    CHECK_FIELDS = [
        "source",
        "source_version",
        "architecture",
        "multi_arch",
        "depends",
        "conflicts",
        "provides",
    ]
125 def load_suites(self) -> Suites:
126 suites = []
127 missing_config_msg = (
128 "Configuration %s is not set in the config (and cannot be auto-detected)"
129 )
130 for suitename in ("testing", "unstable", "pu", "tpu"):
131 suffix = suitename if suitename in {"pu", "tpu"} else ""
132 if hasattr(self._base_config, suitename):
133 suite_path = getattr(self._base_config, suitename)
134 suite_class = SuiteClass.TARGET_SUITE
135 if suitename != "testing":
136 suite_class = (
137 SuiteClass.ADDITIONAL_SOURCE_SUITE
138 if suffix
139 else SuiteClass.PRIMARY_SOURCE_SUITE
140 )
141 suites.append(
142 Suite(
143 suite_class, suitename, suite_path, suite_short_name=suffix
144 )
145 )
146 else:
147 target_suite = TargetSuite(
148 suite_class, suitename, suite_path, suite_short_name=suffix
149 )
150 else:
151 if suitename in {"testing", "unstable"}: # pragma: no cover
152 self.logger.error(missing_config_msg, suitename.upper())
153 raise MissingRequiredConfigurationError(
154 missing_config_msg % suitename.upper()
155 )
156 self.logger.info(
157 "Optional suite %s is not defined (config option: %s) ",
158 suitename,
159 suitename.upper(),
160 )
162 assert target_suite is not None, "Logic regression, this should be impossible."
164 self._check_release_file(target_suite, missing_config_msg)
165 self._setup_architectures()
167 # read the source and binary packages for the involved distributions. Notes:
168 # - Load testing last as some live-data tests have more complete information in
169 # unstable
170 # - Load all sources before any of the binaries.
171 for suite in [target_suite, *suites]:
172 sources = self._read_sources(suite.path)
173 self._update_suite_name(suite)
174 suite.sources = sources
175 (suite.binaries, suite.provides_table) = self._read_binaries(
176 suite, self._architectures
177 )
178 self._fixup_faux_arch_all_binaries(suite)
180 return Suites(target_suite, suites)
    def _fixup_faux_arch_all_binaries(self, suite: Suite) -> None:
        """remove faux arch:all binary if a real arch:all binary is available

        We don't know for which architectures bin/$something must be available
        except for arch:all, which should be available in each arch. The
        information that a source builds an arch:all binary is available during
        the loading of the sources, but we have to pick an order in which to
        load the files and the Sources is loaded before the Packages are
        read. Hence we fake an arch:all binary during source loading, but it
        shouldn't be there in the final list if real arch:all binaries are
        present in the Packages file.

        Also, if we keep the fake binary, it should be added to the lists of
        known binaries in the suite, otherwise britney2 trips later on.
        """
        all_binaries = self._all_binaries
        binaries = suite.binaries
        # Architectures expected to be in sync; faux binaries are registered
        # on these only.
        faux_arches = (
            set(self.architectures)
            - set(self.break_arches)
            - set(self.outofsync_arches)
            - set(self.new_arches)
        )

        for srcpkg in suite.sources.values():
            # Faux ids carry "faux" in the third (architecture) slot of the
            # (name, version, arch) package id.
            faux = {x for x in srcpkg.binaries if x[2] == "faux"}
            if faux and [
                x
                for x in (srcpkg.binaries - faux)
                if all_binaries[x].architecture == "all"
            ]:
                # A real arch:all binary exists, so drop the placeholder(s).
                srcpkg.binaries -= faux

            # Calculate again because we may have changed the set
            faux = {x for x in srcpkg.binaries if x[2] == "faux"}
            for binpkg_id in faux:
                # Keep the placeholder: materialise a minimal arch:all
                # BinaryPackage so later stages find it in the lookup tables.
                bin_data = BinaryPackage(
                    binpkg_id[1],
                    sys.intern("faux"),
                    srcpkg.source,
                    srcpkg.version,
                    "all",
                    "no",
                    None,
                    None,
                    [],
                    False,
                    binpkg_id,
                    [],
                )
                for arch_all in faux_arches:
                    binaries[arch_all][binpkg_id[0]] = bin_data
                all_binaries[binpkg_id] = bin_data
        suite.binaries = binaries
239 def _setup_architectures(self) -> None:
240 allarches = self._architectures
241 # Re-order the architectures such as that the most important architectures are listed first
242 # (this is to make the log easier to read as most important architectures will be listed
243 # first)
244 arches = [x for x in allarches if x in self._nobreakall_arches]
245 arches += [
246 x for x in allarches if x not in arches and x not in self._outofsync_arches
247 ]
248 arches += [
249 x for x in allarches if x not in arches and x not in self._break_arches
250 ]
251 arches += [
252 x for x in allarches if x not in arches and x not in self._new_arches
253 ]
254 arches += [x for x in allarches if x not in arches]
256 # Intern architectures for efficiency; items in this list will be used for lookups and
257 # building items/keys - by intern strings we reduce memory (considerably).
258 self._architectures = [sys.intern(arch) for arch in allarches]
259 assert "all" not in self._architectures, "all not allowed in architectures"
261 def _get_suite_name(
262 self, suite: Suite, release_file: "apt_pkg.TagSection[str]"
263 ) -> tuple[str, str]:
264 name = None
265 codename = None
266 if "Suite" in release_file: 266 ↛ 268line 266 didn't jump to line 268, because the condition on line 266 was never false
267 name = release_file["Suite"]
268 if "Codename" in release_file:
269 codename = release_file["Codename"]
271 if name is None: 271 ↛ 272line 271 didn't jump to line 272, because the condition on line 271 was never true
272 name = codename
273 elif codename is None:
274 codename = name
276 if name is None: 276 ↛ 277line 276 didn't jump to line 277, because the condition on line 276 was never true
277 self.logger.warning(
278 'Either of the fields "Suite" or "Codename" '
279 + "should be present in a release file."
280 )
281 self.logger.error(
282 "Release file for suite %s is missing both the "
283 + '"Suite" and the "Codename" fields.',
284 suite.name,
285 )
286 raise KeyError("Suite")
288 assert codename is not None # required for type checking
289 return (name, codename)
291 def _update_suite_name(self, suite: Suite) -> None:
292 try:
293 release_file = read_release_file(suite.path)
294 except FileNotFoundError:
295 self.logger.info(
296 "The %s suite does not have a Release file, unable to update the name",
297 suite.name,
298 )
299 release_file = None
301 if release_file is not None:
302 (suite.name, suite.codename) = self._get_suite_name(suite, release_file)
303 self.logger.info("Using suite name from Release file: %s", suite.name)
304 self.logger.debug(
305 "Using suite codename from Release file: %s", suite.codename
306 )
308 def _check_release_file(self, target_suite: Suite, missing_config_msg: str) -> None:
309 try:
310 release_file = read_release_file(target_suite.path)
311 self.logger.info(
312 "Found a Release file in %s - using that for defaults",
313 target_suite.name,
314 )
315 except FileNotFoundError:
316 self.logger.info(
317 "The %s suite does not have a Release file.", target_suite.name
318 )
319 release_file = None
321 if release_file is not None:
322 self._components = release_file["Components"].split()
323 self.logger.info(
324 "Using components listed in Release file: %s",
325 " ".join(self._components),
326 )
328 if self._architectures is None:
329 if release_file is None: # pragma: no cover
330 self.logger.error(
331 "No configured architectures and there is no release file in the %s suite.",
332 target_suite.name,
333 )
334 self.logger.error(
335 'Please check if there is a "Release" file in %s', target_suite.path
336 )
337 self.logger.error(
338 'or if the config file contains a non-empty "ARCHITECTURES" field'
339 )
340 raise MissingRequiredConfigurationError(
341 missing_config_msg % "ARCHITECTURES"
342 )
343 self._architectures = sorted(
344 x for x in release_file["Architectures"].split() if x != "all"
345 )
346 self.logger.info(
347 "Using architectures listed in Release file: %s",
348 " ".join(self._architectures),
349 )
351 def _read_sources(self, basedir: str) -> dict[str, SourcePackage]:
352 """Read the list of source packages from the specified directory
354 The source packages are read from the `Sources' file within the
355 directory specified as `basedir' parameter. Considering the
356 large amount of memory needed, not all the fields are loaded
357 in memory. The available fields are Version, Maintainer and Section.
359 The method returns a list where every item represents a source
360 package as a dictionary.
361 """
363 if self._components:
364 sources: dict[str, SourcePackage] = {}
365 for component in self._components:
366 filename = os.path.join(basedir, component, "source", "Sources")
367 try:
368 filename = possibly_compressed(filename)
369 except FileNotFoundError:
370 if component == "non-free-firmware":
371 self.logger.info("Skipping %s as it doesn't exist", filename)
372 continue
373 raise
374 self.logger.info("Loading source packages from %s", filename)
375 read_sources_file(
376 filename,
377 sources,
378 not self._base_config.archall_inconsistency_allowed,
379 )
380 else:
381 filename = os.path.join(basedir, "Sources")
382 self.logger.info("Loading source packages from %s", filename)
383 sources = read_sources_file(
384 filename, None, not self._base_config.archall_inconsistency_allowed
385 )
387 return sources
389 @staticmethod
390 def merge_fields(
391 get_field: Callable[[str], Optional[str]],
392 *field_names: str,
393 separator: str = ", ",
394 ) -> Optional[str]:
395 """Merge two or more fields (filtering out empty fields; returning None if all are empty)"""
396 return separator.join(filter(None, (get_field(x) for x in field_names))) or None
398 def _read_packages_file(
399 self,
400 filename: str,
401 arch: str,
402 srcdist: dict[str, SourcePackage],
403 packages: Optional[dict[str, BinaryPackage]] = None,
404 intern: Callable[[str], str] = sys.intern,
405 ) -> dict[str, BinaryPackage]:
406 self.logger.info("Loading binary packages from %s", filename)
408 if packages is None:
409 packages = {}
411 all_binaries = self._all_binaries
413 tag_file = apt_pkg.TagFile(filename)
414 get_field = tag_file.section.get
415 step = tag_file.step
417 while step():
418 pkg = get_field("Package")
419 version = get_field("Version")
421 # There may be multiple versions of any arch:all packages
422 # (in unstable) if some architectures have out-of-date
423 # binaries. We only ever consider the package with the
424 # largest version for migration.
425 pkg = intern(pkg)
426 version = intern(version)
427 pkg_id = BinaryPackageId(pkg, version, arch)
429 if pkg in packages:
430 old_pkg_data = packages[pkg]
431 if apt_pkg.version_compare(old_pkg_data.version, version) > 0:
432 continue
433 old_pkg_id = old_pkg_data.pkg_id
434 old_src_binaries = srcdist[old_pkg_data.source].binaries
435 old_src_binaries.remove(old_pkg_id)
436 # This may seem weird at first glance, but the current code rely
437 # on this behaviour to avoid issues like #709460. Admittedly it
438 # is a special case, but Britney will attempt to remove the
439 # arch:all packages without this. Even then, this particular
440 # stop-gap relies on the packages files being sorted by name
441 # and the version, so it is not particularly resilient.
442 if pkg_id not in old_src_binaries: 442 ↛ 448line 442 didn't jump to line 448, because the condition on line 442 was never false
443 old_src_binaries.add(pkg_id)
445 # Merge Pre-Depends with Depends and Conflicts with
446 # Breaks. Britney is not interested in the "finer
447 # semantic differences" of these fields anyway.
448 deps = DebMirrorLikeSuiteContentLoader.merge_fields(
449 get_field, "Pre-Depends", "Depends"
450 )
451 conflicts = DebMirrorLikeSuiteContentLoader.merge_fields(
452 get_field, "Conflicts", "Breaks"
453 )
455 ess = False
456 if get_field("Essential", "no") == "yes":
457 ess = True
459 source = pkg
460 source_version = version
461 # retrieve the name and the version of the source package
462 source_raw = get_field("Source")
463 if source_raw:
464 source = intern(source_raw.split(" ")[0])
465 if "(" in source_raw:
466 source_version = intern(
467 source_raw[source_raw.find("(") + 1 : source_raw.find(")")]
468 )
470 provides_raw = get_field("Provides")
471 if provides_raw:
472 provides = parse_provides(
473 provides_raw, pkg_id=pkg_id, logger=self.logger
474 )
475 else:
476 provides = []
478 raw_arch = intern(get_field("Architecture"))
479 if raw_arch not in {"all", arch}: # pragma: no cover
480 raise AssertionError(
481 "%s has wrong architecture (%s) - should be either %s or all"
482 % (str(pkg_id), raw_arch, arch)
483 )
485 builtusing_raw = get_field("Built-Using")
486 if builtusing_raw:
487 builtusing = parse_builtusing(
488 builtusing_raw, pkg_id=pkg_id, logger=self.logger
489 )
490 else:
491 builtusing = []
493 dpkg = BinaryPackage(
494 version,
495 intern(get_field("Section")),
496 source,
497 source_version,
498 raw_arch,
499 get_field("Multi-Arch"),
500 deps,
501 conflicts,
502 provides,
503 ess,
504 pkg_id,
505 builtusing,
506 )
508 # if the source package is available in the distribution, then register this binary package
509 if source in srcdist:
510 # There may be multiple versions of any arch:all packages
511 # (in unstable) if some architectures have out-of-date
512 # binaries. We only want to include the package in the
513 # source -> binary mapping once. It doesn't matter which
514 # of the versions we include as only the package name and
515 # architecture are recorded.
516 srcdist[source].binaries.add(pkg_id)
517 # if the source package doesn't exist, create a fake one
518 else:
519 srcdist[source] = SourcePackage(
520 source,
521 source_version,
522 "faux",
523 {pkg_id},
524 None,
525 True,
526 None,
527 None,
528 [],
529 [],
530 )
532 # add the resulting dictionary to the package list
533 packages[pkg] = dpkg
534 if pkg_id in all_binaries:
535 self._merge_pkg_entries(pkg, arch, all_binaries[pkg_id], dpkg)
536 else:
537 all_binaries[pkg_id] = dpkg
539 # add the resulting dictionary to the package list
540 packages[pkg] = dpkg
542 return packages
    def _read_binaries(
        self, suite: Suite, architectures: Iterable[str]
    ) -> tuple[
        dict[str, dict[str, BinaryPackage]], dict[str, dict[str, set[tuple[str, str]]]]
    ]:
        """Read the list of binary packages from the specified directory

        This method reads all the binary packages for a given suite.

        If the "components" config parameter is set, the directory should
        be the "suite" directory of a local mirror (i.e. the one containing
        the "Release" file). Otherwise, Britney will read the packages
        information from all the "Packages_${arch}" files referenced by
        the "architectures" parameter.

        Considering the
        large amount of memory needed, not all the fields are loaded
        in memory. The available fields are Version, Source, Multi-Arch,
        Depends, Conflicts, Provides and Architecture.

        The `Provides' field is used to populate the virtual packages list.

        The method returns a tuple of two dicts with architecture as key and
        another dict as value. The value dicts of the first dict map
        from binary package name to "BinaryPackage" objects; the other second
        value dicts map a package name to the packages providing them.
        """
        binaries: dict[str, dict[str, BinaryPackage]] = {}
        provides_table: dict[str, dict[str, set[tuple[str, str]]]] = {}
        basedir = suite.path

        if self._components:
            # Mirror layout: per-component binary-$arch/Packages files,
            # restricted to the architectures listed in the Release file.
            release_file = read_release_file(basedir)
            listed_archs = set(release_file["Architectures"].split())
            for arch in architectures:
                packages: dict[str, BinaryPackage] = {}
                if arch not in listed_archs:
                    self.logger.info(
                        "Skipping arch %s for %s: It is not listed in the Release file",
                        arch,
                        suite.name,
                    )
                    binaries[arch] = {}
                    provides_table[arch] = {}
                    continue
                for component in self._components:
                    binary_dir = "binary-%s" % arch
                    filename = os.path.join(basedir, component, binary_dir, "Packages")
                    try:
                        filename = possibly_compressed(filename)
                    except FileNotFoundError:
                        # non-free-firmware may legitimately be absent.
                        if component == "non-free-firmware":
                            self.logger.info(
                                "Skipping %s as it doesn't exist", filename
                            )
                            continue
                        raise
                    udeb_filename = os.path.join(
                        basedir, component, "debian-installer", binary_dir, "Packages"
                    )
                    # We assume the udeb Packages file is present if the
                    # regular one is present
                    udeb_filename = possibly_compressed(udeb_filename)
                    self._read_packages_file(filename, arch, suite.sources, packages)
                    self._read_packages_file(
                        udeb_filename, arch, suite.sources, packages
                    )
                # create provides
                provides = create_provides_map(packages)
                binaries[arch] = packages
                provides_table[arch] = provides
        else:
            # Flat layout: one Packages_$arch file per architecture.
            for arch in architectures:
                filename = os.path.join(basedir, "Packages_%s" % arch)
                packages = self._read_packages_file(filename, arch, suite.sources)
                provides = create_provides_map(packages)
                binaries[arch] = packages
                provides_table[arch] = provides

        return (binaries, provides_table)
625 def _merge_pkg_entries(
626 self,
627 package: str,
628 parch: str,
629 pkg_entry1: BinaryPackage,
630 pkg_entry2: BinaryPackage,
631 ) -> None:
632 bad = []
633 for f in self.CHECK_FIELDS:
634 v1 = getattr(pkg_entry1, f)
635 v2 = getattr(pkg_entry2, f)
636 if v1 != v2: # pragma: no cover
637 bad.append((f, v1, v2))
639 if bad: # pragma: no cover
640 self.logger.error(
641 "Mismatch found %s %s %s differs", package, pkg_entry1.version, parch
642 )
643 for f, v1, v2 in bad:
644 self.logger.info(" ... %s %s != %s", f, v1, v2)
645 raise ValueError("Inconsistent / Unsupported data set")
647 # Merge ESSENTIAL if necessary
648 assert pkg_entry1.is_essential or not pkg_entry2.is_essential