Coverage for britney2/__init__.py: 94%
201 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-06-17 09:00 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-06-17 09:00 +0000
1import logging
2from collections.abc import Iterable, Iterator
3from dataclasses import dataclass, field
4from enum import Enum, unique
5from typing import TYPE_CHECKING, Any
7if TYPE_CHECKING: 7 ↛ 8line 7 didn't jump to line 8 because the condition on line 7 was never true
8 from .installability.tester import InstallabilityTester
11@dataclass(slots=True, frozen=True, order=True)
12class PackageId:
13 package_name: str
14 version: str
15 architecture: str
16 """Represent a source or binary package"""
18 def __post_init__(self) -> None:
19 assert self.architecture != "all", "all not allowed for PackageId (%s)" % (
20 self.name
21 )
23 def __repr__(self) -> str:
24 return f"PID({self.name})"
26 @property
27 def name(self) -> str:
28 if self.architecture == "source": 28 ↛ 29line 28 didn't jump to line 29 because the condition on line 28 was never true
29 return f"{self.package_name}/{self.version}"
30 else:
31 return f"{self.package_name}/{self.version}/{self.architecture}"
33 @property
34 def uvname(self) -> str:
35 if self.architecture == "source": 35 ↛ 38line 35 didn't jump to line 38 because the condition on line 35 was always true
36 return self.package_name
37 else:
38 return f"{self.package_name}/{self.architecture}"
class BinaryPackageId(PackageId):
    """Represent a binary package

    Same as PackageId, except that the architecture "source" is refused.
    """

    # PackageId already uses slots; an empty __slots__ here keeps instances
    # of this subclass from growing a per-instance __dict__.
    __slots__ = ()

    def __init__(self, package_name: str, version: str, architecture: str) -> None:
        super().__init__(package_name, version, architecture)
        is_binary = self.architecture != "source"
        assert is_binary, "Source not allowed for BinaryPackageId (%s)" % (
            package_name
        )

    def __repr__(self) -> str:
        return f"BPID({self.name})"
57class MultiArch(Enum):
58 SAME = 0
59 FOREIGN = 1
60 ALLOWED = 2
61 NO = 3
63 @staticmethod
64 def from_str(value: str | None) -> "MultiArch":
65 if value == "same": 65 ↛ 66line 65 didn't jump to line 66 because the condition on line 65 was never true
66 return MultiArch.SAME
67 elif value == "foreign": 67 ↛ 68line 67 didn't jump to line 68 because the condition on line 67 was never true
68 return MultiArch.FOREIGN
69 elif value == "allowed":
70 return MultiArch.ALLOWED
71 elif value == "no" or value is None: 71 ↛ 74line 71 didn't jump to line 74 because the condition on line 71 was always true
72 return MultiArch.NO
73 else:
74 raise ValueError(f"invalid Multi-Arch value {value}")
77@dataclass(slots=True)
78class BinaryPackage:
79 section: str
80 source: str
81 source_version: str
82 architecture: str
83 multi_arch: MultiArch
84 depends: str | None
85 conflicts: str | None
86 provides: list[tuple[str, str, str]] | None
87 is_essential: bool
88 pkg_id: BinaryPackageId
89 builtusing: list[tuple[str, str]] | None
91 @property
92 def version(self) -> str:
93 return self.pkg_id.version
96@dataclass(slots=True)
97class SourcePackage:
98 source: str
99 version: str
100 section: str
101 binaries: set[BinaryPackageId]
102 maintainer: str | None
103 is_fakesrc: bool
104 build_deps_arch: str | None
105 build_deps_indep: str | None
106 testsuite: list[str]
107 testsuite_triggers: list[str]
class DependencyType(Enum):
    """Kinds of dependency

    Each value is a 3-tuple: the text returned by str(), the text returned
    by get_reason() and the text returned by get_description().
    """

    DEPENDS = ("Depends", "depends", "dependency")
    # BUILD_DEPENDS includes BUILD_DEPENDS_ARCH
    BUILD_DEPENDS = ("Build-Depends(-Arch)", "build-depends", "build-dependency")
    BUILD_DEPENDS_INDEP = (
        "Build-Depends-Indep",
        "build-depends-indep",
        "build-dependency (indep)",
    )
    BUILT_USING = ("Built-Using", "built-using", "built-using")
    # Pseudo dependency where Breaks/Conflicts effectively become an inverted
    # dependency.  E.g. p Depends on q plus q/2 breaks p/1 implies that p/2
    # must migrate before q/2 can migrate (or they go at the same time).
    #  - can also happen with version ranges
    IMPLICIT_DEPENDENCY = (
        "Implicit dependency",
        "implicit-dependency",
        "implicit-dependency",
    )

    def __str__(self) -> str:
        label, _reason, _description = self.value
        return label

    def get_reason(self) -> str:
        """Return the second element of the member's value"""
        _label, reason, _description = self.value
        return reason

    def get_description(self) -> str:
        """Return the third element of the member's value"""
        _label, _reason, description = self.value
        return description
@unique
class SuiteClass(Enum):
    """Role of a suite

    The first element of each value is the "is a source suite" flag.  The
    second element is not read by this class; it keeps the two source
    suite values distinct, which @unique requires.
    """

    TARGET_SUITE = (False, False)
    PRIMARY_SOURCE_SUITE = (True, True)
    ADDITIONAL_SOURCE_SUITE = (True, False)

    @property
    def is_source(self) -> bool:
        source_flag, _ = self.value
        return source_flag

    @property
    def is_target(self) -> bool:
        # A suite class is a target exactly when it is not a source.
        if self.is_source:
            return False
        return True

    @property
    def is_primary_source(self) -> bool:
        return SuiteClass.PRIMARY_SOURCE_SUITE is self

    @property
    def is_additional_source(self) -> bool:
        return SuiteClass.ADDITIONAL_SOURCE_SUITE is self
class Suite:
    """The sources and binaries of one suite

    Binaries are kept per architecture and then per package name.  A flat
    index of all binaries keyed by package id is built on first use and
    cached until it is invalidated.
    """

    def __init__(
        self,
        suite_class: SuiteClass,
        name: str,
        path: str,
        suite_short_name: str | None = None,
    ) -> None:
        self.suite_class = suite_class
        self.name = name
        # The codename starts out identical to the name.
        self.codename = name
        self.path = path
        # A missing (or empty) short name is stored as the empty string.
        self.suite_short_name = suite_short_name or ""
        self.sources: dict[str, SourcePackage] = {}
        self._binaries: dict[str, dict[str, BinaryPackage]] = {}
        self.provides_table: dict[str, dict[str, set[tuple[str, str]]]] = {}
        # Lazily built by all_binaries_in_suite; None means "not built".
        self._all_binaries_in_suite: dict[BinaryPackageId, BinaryPackage] | None = None

    @property
    def excuses_suffix(self) -> str:
        return self.suite_short_name

    @property
    def binaries(self) -> dict[str, dict[str, BinaryPackage]]:
        # TODO some callers modify this structure, which doesn't invalidate
        # the self._all_binaries_in_suite cache
        return self._binaries

    @binaries.setter
    def binaries(self, binaries: dict[str, dict[str, BinaryPackage]]) -> None:
        # Replacing the whole structure drops the cached flat index.
        self._binaries = binaries
        self._all_binaries_in_suite = None

    @property
    def all_binaries_in_suite(self) -> dict[BinaryPackageId, BinaryPackage]:
        """All binaries of the suite, keyed by their package id (cached)"""
        index = self._all_binaries_in_suite
        if index is None:
            index = {}
            for arch_binaries in self._binaries.values():
                for binary in arch_binaries.values():
                    index[binary.pkg_id] = binary
            self._all_binaries_in_suite = index
        return index

    def any_of_these_are_in_the_suite(self, pkgs: Iterable[BinaryPackageId]) -> bool:
        """Test if at least one package of a given set is in the suite

        :return: True if any of the packages in pkgs are currently in the suite
        """
        known_ids = self.all_binaries_in_suite.keys()
        return not known_ids.isdisjoint(pkgs)

    def is_pkg_in_the_suite(self, pkg_id: BinaryPackageId) -> bool:
        """Test if the package is in the suite

        :return: True if the pkg is currently in the suite
        """
        return pkg_id in self.all_binaries_in_suite

    def which_of_these_are_in_the_suite(
        self, pkgs: Iterable[BinaryPackageId]
    ) -> Iterator[BinaryPackageId]:
        """Iterate over all packages that are in the suite

        :return: An iterable of package ids that are in the suite
        """
        for candidate in pkgs:
            if candidate in self.all_binaries_in_suite:
                yield candidate

    def is_cruft(self, pkg: BinaryPackage) -> bool:
        """Check if the package is cruft in the suite

        A binary counts as cruft when its source version differs from the
        version of that source recorded in the suite.

        :param pkg: which BinaryPackage to check
          Note that this package is assumed to be in the suite
        """
        suite_source = self.sources[pkg.source]
        return pkg.source_version != suite_source.version
class TargetSuite(Suite):
    """A Suite with an installability tester and a consistency check

    `inst_tester` is only declared here; it is not assigned by __init__
    and has to be set on the instance before is_installable, add_binary
    or remove_binary are used.
    """

    inst_tester: "InstallabilityTester"

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        # Logger named "<module>.<class name>" of the concrete class.
        logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
        self._logger = logging.getLogger(logger_name)

    def is_installable(self, pkg_id: "BinaryPackageId") -> bool:
        """Determine whether the given package can be installed in the suite

        :param pkg_id: A BinaryPackageId
        :return: True if the pkg is currently installable in the suite
        """
        return self.inst_tester.is_installable(pkg_id)

    def add_binary(self, pkg_id: "BinaryPackageId") -> None:
        """Add a binary package to the suite

        :param pkg_id: The id of the package
        :raises KeyError: if the package is not known
        """

        # TODO The calling code currently manually updates the contents of
        # target_suite.binaries when this is called. It would probably make
        # more sense to do that here instead
        self.inst_tester.add_binary(pkg_id)
        # Drop the cached flat index so it is rebuilt on next use.
        self._all_binaries_in_suite = None

    def remove_binary(self, pkg_id: BinaryPackageId) -> None:
        """Remove a binary from the suite

        :param pkg_id: The id of the package
        :raises KeyError: if the package is not known
        """

        # TODO The calling code currently manually updates the contents of
        # target_suite.binaries when this is called. It would probably make
        # more sense to do that here instead
        self.inst_tester.remove_binary(pkg_id)
        # Drop the cached flat index so it is rebuilt on next use.
        self._all_binaries_in_suite = None

    def check_suite_source_pkg_consistency(self, comment: str) -> None:
        """Verify that sources and binaries of the suite reference each other

        Every binary must name a source that is present in the suite, and
        every binary listed by a source must be present in the binaries of
        its architecture.  Each problem is logged as an error.

        :param comment: free text included in the log messages
        :raises AssertionError: if at least one inconsistency was found
        :raises KeyError: if a source lists a binary for an architecture
          that has no entry in the suite's binaries at all
        """
        sources_t = self.sources
        binaries_t = self.binaries
        logger = self._logger
        issues_found = False

        logger.info("check_target_suite_source_pkg_consistency %s", comment)

        # Pass 1: every binary must belong to a source known to the suite.
        for binaries in binaries_t.values():
            for pkg_name, pkg in binaries.items():
                src = pkg.source

                if src not in sources_t:  # pragma: no cover
                    issues_found = True
                    # Arguments are passed separately so the message is
                    # only formatted when the record is actually emitted.
                    logger.error(
                        "inconsistency found (%s): src %s not in target, target has pkg %s with source %s",
                        comment,
                        src,
                        pkg_name,
                        src,
                    )

        # Pass 2: every binary listed by a source must exist for its architecture.
        for src, source_data in sources_t.items():
            for pkg_id in source_data.binaries:
                if (
                    pkg_id.package_name not in binaries_t[pkg_id.architecture]
                ):  # pragma: no cover
                    issues_found = True
                    logger.error(
                        "inconsistency found (%s): binary %s from source %s not in binaries_t[%s]",
                        comment,
                        pkg_id.package_name,
                        src,
                        pkg_id.architecture,
                    )

        if issues_found:  # pragma: no cover
            raise AssertionError("inconsistencies found in target suite")
313@dataclass(slots=True)
314class Suites:
315 target_suite: TargetSuite
316 source_suites: list[Suite]
317 _suites: dict[str, Suite] = field(init=False, default_factory=dict)
318 _by_name_or_alias: dict[str, Suite] = field(init=False, default_factory=dict)
320 def __post_init__(self) -> None:
321 self._suites[self.target_suite.name] = self.target_suite
322 self._by_name_or_alias[self.target_suite.name] = self.target_suite
323 if self.target_suite.suite_short_name: 323 ↛ 324line 323 didn't jump to line 324
324 self._by_name_or_alias[self.target_suite.suite_short_name] = (
325 self.target_suite
326 )
327 for suite in self.source_suites:
328 self._suites[suite.name] = suite
329 self._by_name_or_alias[suite.name] = suite
330 if suite.suite_short_name:
331 self._by_name_or_alias[suite.suite_short_name] = suite
333 @property
334 def primary_source_suite(self) -> Suite:
335 return self.source_suites[0]
337 @property
338 def by_name_or_alias(self) -> dict[str, Suite]:
339 return self._by_name_or_alias
341 @property
342 def additional_source_suites(self) -> list[Suite]:
343 return self.source_suites[1:]
345 def __getitem__(self, item: str) -> Suite:
346 return self._suites[item]
348 def __len__(self) -> int:
349 return len(self.source_suites) + 1
351 def __contains__(self, item: str) -> bool:
352 return item in self._suites
354 def __iter__(self) -> Iterator[Suite]:
355 # Sources first (as we will rely on this for loading data in the old live-data tests)
356 yield from self.source_suites
357 yield self.target_suite