Coverage for britney2/utils.py: 91%
464 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-29 17:21 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-29 17:21 +0000
1# Refactored parts from britney.py, which is/was:
2# Copyright (C) 2001-2008 Anthony Towns <ajt@debian.org>
3# Andreas Barth <aba@debian.org>
4# Fabio Tranchitella <kobold@debian.org>
5# Copyright (C) 2010-2012 Adam D. Barratt <adsb@debian.org>
6# Copyright (C) 2012 Niels Thykier <niels@thykier.net>
7#
8# New portions
9# Copyright (C) 2013 Adam D. Barratt <adsb@debian.org>
11# This program is free software; you can redistribute it and/or modify
12# it under the terms of the GNU General Public License as published by
13# the Free Software Foundation; either version 2 of the License, or
14# (at your option) any later version.
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
22import errno
23import logging
24import optparse
25import os
26import sys
27import time
28from collections import defaultdict
29from collections.abc import Callable, Container, Iterable, Iterator, Mapping, MutableSet
30from datetime import UTC, datetime
31from functools import partial
32from itertools import chain, filterfalse
33from typing import (
34 IO,
35 TYPE_CHECKING,
36 Any,
37 Literal,
38 Protocol,
39 TypeVar,
40 Union,
41 cast,
42 overload,
43)
45import apt_pkg
46import yaml
48from britney2 import (
49 BinaryPackage,
50 BinaryPackageId,
51 PackageId,
52 SourcePackage,
53 Suite,
54 SuiteClass,
55 Suites,
56 TargetSuite,
57)
58from britney2.excusedeps import DependencyState, ImpossibleDependencyState
59from britney2.policies import PolicyVerdict
61if TYPE_CHECKING: 61 ↛ 63line 61 didn't jump to line 63 because the condition on line 61 was never true
63 from _typeshed import SupportsRichComparisonT
64 from apt_pkg import TagSection
66 from .excuse import Excuse
67 from .hints import HintCollection
68 from .installability.universe import BinaryPackageUniverse
69 from .migrationitem import MigrationItem, MigrationItemFactory
71_T = TypeVar("_T")
class MigrationConstraintException(Exception):
    """Marker exception type; it adds nothing on top of ``Exception``.

    The name suggests it signals a violated migration constraint; the
    places that raise it are not part of this section of the file.
    """
78@overload
79def ifilter_except( 79 ↛ exitline 79 didn't jump to the function exit
80 container: Container[_T], iterable: Literal[None] = None
81) -> "partial[filterfalse[_T]]": ...
84@overload
85def ifilter_except( 85 ↛ exitline 85 didn't jump to the function exit
86 container: Container[_T], iterable: Iterable[_T]
87) -> "filterfalse[_T]": ...
90def ifilter_except(
91 container: Container[_T], iterable: Iterable[_T] | None = None
92) -> Union["filterfalse[_T]", "partial[filterfalse[_T]]"]:
93 """Filter out elements in container
95 If given an iterable it returns a filtered iterator, otherwise it
96 returns a function to generate filtered iterators. The latter is
97 useful if the same filter has to be (re-)used on multiple
98 iterators that are not known on beforehand.
99 """
100 if iterable is not None: 100 ↛ 101line 100 didn't jump to line 101 because the condition on line 100 was never true
101 return filterfalse(container.__contains__, iterable)
102 return cast(
103 "partial[filterfalse[_T]]", partial(filterfalse, container.__contains__)
104 )
107@overload
108def ifilter_only(container: Container[_T], iterable: Iterable[_T]) -> "filter[_T]": ... 108 ↛ exitline 108 didn't return from function 'ifilter_only' because
111@overload
112def ifilter_only(container: Container[_T], iterable: None) -> "partial[filter[_T]]": ... 112 ↛ exitline 112 didn't return from function 'ifilter_only' because
115def ifilter_only(
116 container: Container[_T], iterable: Iterable[_T] | None = None
117) -> Union["filter[_T]", "partial[filter[_T]]"]:
118 """Filter out elements in which are not in container
120 If given an iterable it returns a filtered iterator, otherwise it
121 returns a function to generate filtered iterators. The latter is
122 useful if the same filter has to be (re-)used on multiple
123 iterators that are not known on beforehand.
124 """
125 if iterable is not None: 125 ↛ 127line 125 didn't jump to line 127 because the condition on line 125 was always true
126 return filter(container.__contains__, iterable)
127 return partial(filter, container.__contains__)
130# iter_except is from the "itertools" recipe
131def iter_except(
132 func: Callable[[], _T],
133 exception: type[BaseException] | tuple[type[BaseException], ...],
134 first: Any = None,
135) -> Iterator[_T]: # pragma: no cover - itertools recipe function
136 """Call a function repeatedly until an exception is raised.
138 Converts a call-until-exception interface to an iterator interface.
139 Like __builtin__.iter(func, sentinel) but uses an exception instead
140 of a sentinel to end the loop.
142 Examples:
143 bsddbiter = iter_except(db.next, bsddb.error, db.first)
144 heapiter = iter_except(functools.partial(heappop, h), IndexError)
145 dictiter = iter_except(d.popitem, KeyError)
146 dequeiter = iter_except(d.popleft, IndexError)
147 queueiter = iter_except(q.get_nowait, Queue.Empty)
148 setiter = iter_except(s.pop, KeyError)
150 """
151 try:
152 if first is not None:
153 yield first()
154 while 1:
155 yield func()
156 except exception:
157 pass
def log_and_format_old_libraries(
    logger: logging.Logger, libs: list["MigrationItem"]
) -> None:
    """Log the old libraries as a table without header

    One INFO line is emitted per package name (in sorted order),
    listing the architectures of all items for that package in the
    order they appear in libs.

    :param logger: The logger receiving the lines.
    :param libs: The items to report; their "package" and "architecture" attributes are used.
    """
    arches_by_package: dict[str, list[str]] = {}
    for item in libs:
        arches_by_package.setdefault(item.package, []).append(item.architecture)

    for package in sorted(arches_by_package):
        logger.info(" %s: %s", package, " ".join(arches_by_package[package]))
def compute_reverse_tree(
    pkg_universe: "BinaryPackageUniverse", affected: set[BinaryPackageId]
) -> None:
    """Extend a set of packages with all their transitive reverse dependencies

    Starting from the packages in "affected", every package reported by
    pkg_universe.reverse_dependencies_of() is added, and then their
    reverse dependencies in turn, until nothing new is found.

    Nothing is returned: the set of affected packages is updated in
    place and must therefore be mutable.

    :param pkg_universe: Provides reverse_dependencies_of() for a package id.
    :param affected: The initial package ids; receives the result.
    """
    pending = list(affected)
    while pending:
        current = pending.pop()
        discovered = pkg_universe.reverse_dependencies_of(current) - affected
        affected |= discovered
        pending += discovered
def add_transitive_dependencies_flatten(
    pkg_universe: "BinaryPackageUniverse", initial_set: MutableSet[BinaryPackageId]
) -> None:
    """Extend a set of packages with all their transitive dependencies

    For every package in initial_set, each member of each group
    returned by pkg_universe.dependencies_of() is added, and then
    their dependencies in turn, until nothing new is found.  All
    alternatives are flattened into the one set.

    Nothing is returned: initial_set is updated in place and must
    therefore be mutable.

    :param pkg_universe: Provides dependencies_of() for a package id.
    :param initial_set: The initial package ids; receives the result.
    """
    pending = list(initial_set)
    while pending:
        current = pending.pop()
        fresh = set()
        for group in pkg_universe.dependencies_of(current):
            for dep_id in group:
                if dep_id not in initial_set:
                    fresh.add(dep_id)
        initial_set |= fresh
        pending.extend(fresh)
def write_nuninst(filename: str, nuninst: dict[str, set[str]]) -> None:
    """Write the non-installable report

    Write the non-installable report derived from "nuninst" to the
    file denoted by "filename".

    The file starts with a "Built on" and a "Last update" line (both
    carrying the same UTC timestamp) and a blank line, followed by one
    "<key>: <pkg> <pkg> ..." line per key of nuninst, in the order of
    the dict.  The package names on each line are sorted so that the
    output does not depend on set iteration order.

    :param filename: Path of the file to (over)write.
    :param nuninst: Mapping of report keys to the names of uninstallable packages.
    """
    # Having two fields with (almost) identical dates seems a bit
    # redundant; compute the timestamp once and use it for both.
    timestamp = time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time()))
    with open(filename, "w", encoding="utf-8") as f:
        f.write("Built on: " + timestamp + "\n")
        f.write("Last update: " + timestamp + "\n\n")
        for key, packages in nuninst.items():
            f.write("{}: {}\n".format(key, " ".join(sorted(packages))))
def read_nuninst(filename: str, architectures: set[str]) -> dict[str, set[str]]:
    """Read the non-installable report

    Parse the report stored in "filename".  Every line containing a
    colon is split at the first colon into a key and a whitespace
    separated list of package names; lines without a colon are
    ignored.  A key is kept only when the part before its first "+"
    is listed in "architectures".

    :param filename: Path of the report to read (ASCII encoded).
    :param architectures: The architectures to include in the result.
    :return: Mapping of the retained keys to their set of package names.
    """
    report: dict[str, set[str]] = {}
    with open(filename, encoding="ascii") as fd:
        for raw_line in fd:
            key, colon, packages = raw_line.strip().partition(":")
            if not colon:
                continue
            base_arch = key.partition("+")[0]
            if base_arch in architectures:
                report[key] = set(packages.split())
    return report
def newly_uninst(
    nuold: dict[str, set[str]], nunew: dict[str, set[str]]
) -> dict[str, list[str]]:
    """Return a nuninst statistic with only the newly uninstallable packages

    For every architecture present in both statistics, the packages
    listed in "nunew" but not in "nuold" are collected.

    The result maps architectures to the list of their newly
    uninstallable packages.  Architectures without regressions are
    omitted, so an empty dictionary is returned when no architecture
    regressed.

    :param nuold: The previous statistic.
    :param nunew: The current statistic.
    :return: The regressions per architecture.
    """
    regressions: dict[str, list[str]] = {}
    for arch, previously_broken in nuold.items():
        if arch not in nunew:
            # Only architectures known to both statistics are compared.
            continue
        added = [pkg for pkg in nunew[arch] if pkg not in previously_broken]
        if added:
            regressions[arch] = added
    return regressions
def format_and_log_uninst(
    logger: logging.Logger,
    architectures: Iterable[str],
    nuninst: Mapping[str, Iterable[str]],
    *,
    loglevel: int = logging.INFO,
) -> None:
    """Emit the uninstallable packages to the log

    One line is logged per architecture that has uninstallable
    packages, with the package names sorted, e.g.:
      * i386: broken-pkg1, broken-pkg2

    Architectures that are missing from nuninst or have nothing
    listed produce no output at all.

    :param logger: The logger receiving the lines.
    :param architectures: The architectures to report, in output order.
    :param nuninst: Mapping of architectures to their uninstallable packages.
    :param loglevel: The level the lines are logged at.
    """
    for arch in architectures:
        broken = nuninst.get(arch)
        if not broken:
            continue
        logger.log(loglevel, " * {}: {}".format(arch, ", ".join(sorted(broken))))
class Sorted(Protocol):
    """Call signature of a sorted()-like function used without a key

    write_heidi() uses this as the type of its "sorted" parameter,
    whose default is the builtin sorted().
    """

    def __call__(
        self,
        iterable: Iterable["SupportsRichComparisonT"],
        /,
        *,
        key: None = None,
        reverse: bool = False,
    ) -> list["SupportsRichComparisonT"]:
        """Return the elements of iterable as a list."""
def write_heidi(
    filename: str,
    target_suite: TargetSuite,
    *,
    outofsync_arches: frozenset[str] = frozenset(),
    sorted: Sorted = sorted,
) -> None:
    """Write the output HeidiResult

    The output for Heidi lists all the binary packages followed by all
    the source packages of the target suite, one per line, as:

      <pkg-name> <pkg-version> <pkg-architecture> <pkg-section>
      <src-name> <src-version> source <src-section>

    Binaries are ordered by architecture and then by name, sources by
    name.  A binary without architecture is written as "all", a
    source without section as "unknown".  Entries whose section is
    "faux" or ends in "/faux" (and binaries without a section) are
    left out.

    :param filename: Path of the file to (over)write (ASCII encoded).
    :param target_suite: Provides the "sources" and "binaries" tables.
    :param outofsync_arches: If given, it is a set of architectures
      marked as "out of sync".  On those architectures, arch:all
      binaries whose source version differs from the version of their
      source in the suite are left out to reduce the noise.
    :param sorted: Optimization to avoid "load global" in the loops.
    """

    def is_faux(section: str) -> bool:
        # Faux package; not really a part of testing
        return section == "faux" or section.endswith("/faux")

    sources_t = target_suite.sources
    packages_t = target_suite.binaries

    with open(filename, "w", encoding="ascii") as f:
        # Binary packages first, per architecture.
        for arch in sorted(packages_t):
            arch_binaries = packages_t[arch]
            for pkg_name in sorted(arch_binaries):
                pkg = arch_binaries[pkg_name]
                pkgv = pkg.version
                pkgarch = pkg.architecture or "all"
                pkgsec = pkg.section or "faux"
                if is_faux(pkgsec):
                    continue
                # When architectures are marked as "outofsync", their
                # binary versions may be lower than those of the
                # associated source package in testing.  The binary
                # package list for such architectures will include
                # arch:all packages matching those older versions, but
                # we only want the newer arch:all in testing.
                superseded_arch_all = (
                    pkg.source_version
                    and pkgarch == "all"
                    and pkg.source_version != sources_t[pkg.source].version
                    and arch in outofsync_arches
                )
                if superseded_arch_all:
                    continue
                f.write(f"{pkg_name} {pkgv} {pkgarch} {pkgsec}\n")

        # Then the source packages.
        for src_name in sorted(sources_t):
            src = sources_t[src_name]
            srcv = src.version
            srcsec = src.section or "unknown"
            if is_faux(srcsec):
                continue
            f.write(f"{src_name} {srcv} source {srcsec}\n")
def write_heidi_delta(filename: str, all_selected: list["MigrationItem"]) -> None:
    """Write the output delta

    After a "#HeidiDelta" header line, one line is written per
    selected item, in the order given:
      <src-name> <src-version>
    or (if the item is a removal):
      -<src-name> <src-version>

    Items whose architecture is not "source" get the architecture
    appended as a third field.

    :param filename: Path of the file to (over)write (ASCII encoded).
    :param all_selected: The items to list.
    """
    with open(filename, "w", encoding="ascii") as fd:
        fd.write("#HeidiDelta\n")

        for item in all_selected:
            sign = "-" if item.is_removal else ""
            entry = f"{sign}{item.package} {item.version}"
            if item.architecture != "source":
                entry = f"{entry} {item.architecture}"
            fd.write(entry + "\n")
class Opener(Protocol):
    """Call signature shared by the file openers used in write_excuses()

    write_excuses() assigns open, lzma.open or gzip.open to a variable
    of this type and calls it with mode "wt" and encoding "utf-8".
    """

    def __call__(
        self, file: str, mode: Literal["wt"], encoding: Literal["utf-8"]
    ) -> IO[Any]:
        """Open file for writing text and return the stream."""
417def write_excuses(
418 excuses: dict[str, "Excuse"] | dict[PackageId, "Excuse"],
419 dest_file: str,
420 output_format: Literal["yaml", "legacy-html"] = "yaml",
421) -> None:
422 """Write the excuses to dest_file
424 Writes a list of excuses in a specified output_format to the
425 path denoted by dest_file. The output_format can either be "yaml"
426 or "legacy-html".
427 """
428 excuselist = sorted(excuses.values(), key=lambda x: x.sortkey())
429 if output_format == "yaml":
430 os.makedirs(os.path.dirname(dest_file), exist_ok=True)
431 opener: Opener = open # type: ignore[assignment]
432 if dest_file.endswith(".xz"): 432 ↛ 433line 432 didn't jump to line 433 because the condition on line 432 was never true
433 import lzma
435 opener = lzma.open # type: ignore[assignment]
436 elif dest_file.endswith(".gz"): 436 ↛ 437line 436 didn't jump to line 437 because the condition on line 436 was never true
437 import gzip
439 opener = gzip.open # type: ignore[assignment]
440 with opener(dest_file, "wt", encoding="utf-8") as f:
441 edatalist = [e.excusedata(excuses) for e in excuselist]
442 excusesdata = {
443 "sources": edatalist,
444 "generated-date": datetime.now(UTC),
445 }
446 f.write(
447 yaml.dump(excusesdata, default_flow_style=False, allow_unicode=True)
448 )
449 elif output_format == "legacy-html":
450 with open(dest_file, "w", encoding="utf-8") as f:
451 f.write(
452 '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/REC-html40/strict.dtd">\n'
453 )
454 f.write("<html><head><title>excuses...</title>")
455 f.write(
456 '<meta http-equiv="Content-Type" content="text/html;charset=utf-8"></head><body>\n'
457 )
458 f.write(
459 "<p>Generated: "
460 + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time()))
461 + "</p>\n"
462 )
463 f.write("<ul>\n")
464 for e in excuselist:
465 f.write("<li>%s" % e.html(excuses))
466 f.write("</ul></body></html>\n")
467 else: # pragma: no cover
468 raise ValueError('Output format must be either "yaml or "legacy-html"')
def old_libraries(
    mi_factory: "MigrationItemFactory",
    suite_info: Suites,
    outofsync_arches: Iterable[str] = frozenset(),
) -> list["MigrationItem"]:
    """Detect old libraries left in the target suite for smooth transitions

    A binary in the target suite counts as an old library when its
    source version differs from the version of its source package in
    the target suite, i.e. it is no longer built from that source.
    Such binaries are still there because other packages still depend
    on them, but they should be removed as soon as possible.

    For "outofsync" architectures, outdated binaries are allowed to be
    in the target suite, so they are only reported if they are no
    longer in the (primary) source suite.

    :param mi_factory: Used to create the removal item for each old library.
    :param suite_info: Provides the target suite and the primary source suite.
    :param outofsync_arches: The architectures marked as "out of sync".
    :return: The removal items for the old libraries.
    """
    sources_t = suite_info.target_suite.sources
    binaries_t = suite_info.target_suite.binaries
    binaries_s = suite_info.primary_source_suite.binaries

    removals = []
    for arch, arch_binaries in binaries_t.items():
        for pkg_name, pkg in arch_binaries.items():
            if sources_t[pkg.source].version == pkg.source_version:
                # Built from the current source; not cruft.
                continue
            if arch in outofsync_arches and pkg_name in binaries_s[arch]:
                # Tolerated while the architecture is out of sync.
                continue
            removals.append(mi_factory.generate_removal_for_cruft_item(pkg.pkg_id))
    return removals
501def is_nuninst_asgood_generous(
502 constraints: dict[str, list[str]],
503 allow_uninst: dict[str, set[str | None]],
504 architectures: list[str],
505 old: dict[str, set[str]],
506 new: dict[str, set[str]],
507 break_arches: set[str] = cast(set[str], frozenset()),
508) -> bool:
509 """Compares the nuninst counters and constraints to see if they improved
511 Given a list of architectures, the previous and the current nuninst
512 counters, this function determines if the current nuninst counter
513 is better than the previous one. Optionally it also accepts a set
514 of "break_arches", the nuninst counter for any architecture listed
515 in this set are completely ignored.
517 If the nuninst counters are equal or better, then the constraints
518 are checked for regressions (ignoring break_arches).
520 Returns True if the new nuninst counter is better than the
521 previous and there are no constraint regressions (ignoring Break-archs).
522 Returns False otherwise.
524 """
525 diff = 0
526 for arch in architectures:
527 if arch in break_arches:
528 continue
529 diff = diff + (
530 len(new[arch] - allow_uninst[arch]) - len(old[arch] - allow_uninst[arch])
531 )
532 if diff > 0:
533 return False
534 must_be_installable = constraints["keep-installable"]
535 for arch in architectures:
536 if arch in break_arches:
537 continue
538 regression = new[arch] - old[arch]
539 if not regression.isdisjoint(must_be_installable): 539 ↛ 540line 539 didn't jump to line 540 because the condition on line 539 was never true
540 return False
541 return True
544def clone_nuninst(
545 nuninst: dict[str, set[str]],
546 *,
547 packages_s: dict[str, dict[str, BinaryPackage]] | None = None,
548 architectures: Iterable[str] | None = None,
549) -> dict[str, set[str]]:
550 """Completely or Selectively deep clone nuninst
552 Given nuninst table, the package table for a given suite and
553 a list of architectures, this function will clone the nuninst
554 table. Only the listed architectures will be deep cloned -
555 the rest will only be shallow cloned. When packages_s is given,
556 packages not listed in packages_s will be pruned from the clone
557 (if packages_s is omitted, the per architecture nuninst is cloned
558 as-is)
559 """
560 clone = nuninst.copy()
561 if architectures is None: 561 ↛ 562line 561 didn't jump to line 562 because the condition on line 561 was never true
562 return clone
563 if packages_s is not None:
564 for arch in architectures:
565 clone[arch] = {x for x in nuninst[arch] if x in packages_s[arch]}
566 clone[arch + "+all"] = {
567 x for x in nuninst[arch + "+all"] if x in packages_s[arch]
568 }
569 else:
570 for arch in architectures:
571 clone[arch] = set(nuninst[arch])
572 clone[arch + "+all"] = set(nuninst[arch + "+all"])
573 return clone
def test_installability(
    target_suite: TargetSuite,
    pkg_name: str,
    pkg_id: BinaryPackageId,
    broken: set[str],
    nuninst_arch: set[str] | None,
) -> None:
    """Test the installability of a package and record the outcome

    pkg_id is the package to check and pkg_name the name it is
    recorded under.

    broken is the set of broken packages.  pkg_name is added to it if
    target_suite reports the package as not installable and removed
    from it otherwise, so a change of installability (e.g. from
    uninstallable to installable) is reflected in the set.

    If nuninst_arch is not None then it is updated in the same way as
    broken is.
    """
    tracked = (broken,) if nuninst_arch is None else (broken, nuninst_arch)
    if target_suite.is_installable(pkg_id):
        # Either an improvement or the package was not broken before.
        for names in tracked:
            names.discard(pkg_name)
    else:
        # Either a regression or the package was already broken.
        for names in tracked:
            names.add(pkg_name)
def check_installability(
    target_suite: TargetSuite,
    binaries: dict[str, dict[str, BinaryPackage]],
    arch: str,
    updates: set[BinaryPackageId],
    check_archall: bool,
    nuninst: dict[str, set[str]],
) -> None:
    """Re-test the updated packages of one architecture and update nuninst

    Only the updates for "arch" whose version is the one currently
    listed in binaries[arch] are tested.  The outcome is always
    recorded in nuninst[arch + "+all"].  It is also recorded in the
    nuninst entry of the package's architecture, except for arch:all
    packages when check_archall is false: those are merely dropped
    from that entry.

    :param target_suite: Answers the installability question.
    :param binaries: The package table (per architecture) to check versions against.
    :param arch: The architecture to check.
    :param updates: The ids of the packages that may have changed installability.
    :param check_archall: Whether arch:all packages count for the per-architecture entry.
    :param nuninst: The table to update in place.
    """
    broken = nuninst[arch + "+all"]
    packages_t_a = binaries[arch]

    for pkg_id in updates:
        if pkg_id.architecture != arch:
            continue
        name, version, parch = pkg_id
        pkgdata = packages_t_a.get(name)
        if pkgdata is None or version != pkgdata.version:
            # Not (the version) in testing right now, ignore
            continue
        if check_archall or pkgdata.architecture != "all":
            nuninst_arch = nuninst[parch]
        else:
            # arch:all packages are only checked if requested
            nuninst[parch].discard(name)
            nuninst_arch = None
        test_installability(target_suite, name, pkg_id, broken, nuninst_arch)
635def possibly_compressed(
636 path: str, *, permitted_compressions: list[str] | None = None
637) -> str:
638 """Find and select a (possibly compressed) variant of a path
640 If the given path exists, it will be returned
642 :param path: The base path.
643 :param permitted_compressions: Alternative extensions to look for. Defaults to "gz" and "xz".
644 :return: The path given possibly with one of the permitted extensions.
645 :raises FileNotFoundError: if the path is not found
646 """
647 if os.path.exists(path): 647 ↛ 649line 647 didn't jump to line 649 because the condition on line 647 was always true
648 return path
649 if permitted_compressions is None:
650 permitted_compressions = ["gz", "xz"]
651 for ext in permitted_compressions:
652 cpath = f"{path}.{ext}"
653 if os.path.exists(cpath):
654 return cpath
655 raise FileNotFoundError(
656 errno.ENOENT, os.strerror(errno.ENOENT), path
657 ) # pragma: no cover
def create_provides_map(
    packages: dict[str, BinaryPackage],
) -> dict[str, set[tuple[str, str]]]:
    """Create a provides map from a map of binary package names and their BinaryPackage objects

    Every entry in the "provides" of a package registers that package
    (with the provided version) as a provider of the provided name.

    :param packages: A dict mapping binary package names to their BinaryPackage object
    :return: A provides map, from each provided name to the set of
      (providing package name, provided version) pairs
    """
    provides_map: dict[str, set[tuple[str, str]]] = defaultdict(set)

    for provider_name, provider in packages.items():
        # the third field of each entry is not needed here
        for provided_name, provided_version, _ in provider.provides:
            provides_map[provided_name].add((provider_name, provided_version))

    return provides_map
def read_release_file(suite_dir: str) -> "TagSection[str]":
    """Parse the "Release" file of a suite

    :param suite_dir: The directory of the suite
    :return: The first (and only) paragraph of the Release file
    :raises TypeError: if the file has more than one paragraph
    """
    release_path = os.path.join(suite_dir, "Release")
    with open(release_path) as fd:
        paragraphs = iter(apt_pkg.TagFile(fd))
        first = next(paragraphs)
        extra = next(paragraphs, None)
        if extra is not None:  # pragma: no cover
            raise TypeError(f"{release_path} has more than one paragraph")
    return first
def read_sources_file(
    filename: str,
    sources: dict[str, SourcePackage] | None = None,
    add_faux: bool = True,
    intern: Callable[[str], str] = sys.intern,
) -> dict[str, SourcePackage]:
    """Parse a single Sources file into a hash

    Parse a single Sources file into a dict mapping a source package
    name to a SourcePackage object.  If there are multiple source
    packages with the same name, then the highest versioned source
    package (that is not marked as "Extra-Source-Only") is the
    version kept in the dict.  Entries already present in "sources"
    take part in that comparison as well.

    :param filename: Path to the Sources file.  Can be compressed by any algorithm supported by apt_pkg.TagFile
    :param sources: Optional dict to add the packages to.  If given, this is also the value returned.
    :param add_faux: Add a faux arch:all binary for each source that claims it has arch:all
    :param intern: Internal optimisation / implementation detail to avoid python's "LOAD_GLOBAL" instruction in a loop
    :return: mapping from names to a source package
    """
    if sources is None:
        sources = {}

    tag_file = apt_pkg.TagFile(filename)
    # The accessor is bound once, before stepping.  This only works if
    # step() keeps refreshing that same section object -- presumably
    # that is how apt_pkg.TagFile behaves; confirm before restructuring.
    get_field = tag_file.section.get
    step = tag_file.step

    while step():
        if get_field("Extra-Source-Only", "no") == "yes":
            # Ignore sources only referenced by Built-Using
            continue
        pkg = get_field("Package")
        ver = get_field("Version")
        # There may be multiple versions of the source package
        # (in unstable) if some architectures have out-of-date
        # binaries.  We only ever consider the source with the
        # largest version for migration.
        if pkg in sources and apt_pkg.version_compare(sources[pkg].version, ver) > 0:
            continue
        maint = get_field("Maintainer")
        if maint:
            maint = intern(maint.strip())
        section = get_field("Section")
        if section:
            section = intern(section.strip())
        # Build-Depends and Build-Depends-Arch are merged into a single
        # relation string; it is None when both fields are absent.
        build_deps_arch: str | None
        build_deps_arch = ", ".join(
            x
            for x in (get_field("Build-Depends"), get_field("Build-Depends-Arch"))
            if x is not None
        )
        if build_deps_arch != "":
            build_deps_arch = sys.intern(build_deps_arch)
        else:
            build_deps_arch = None
        build_deps_indep = get_field("Build-Depends-Indep")
        if build_deps_indep is not None:
            build_deps_indep = sys.intern(build_deps_indep)

        # Adding arch:all packages to the list of binaries already to be able
        # to check for them later. Helps mitigate bug 887060 and is the
        # (partial?) answer to bug 1064428.
        binaries: set[BinaryPackageId] = set()
        if add_faux and "all" in get_field("Architecture", "").split():
            # the value "faux" in arch:faux is used elsewhere, so keep in sync
            pkg_id = BinaryPackageId(pkg + "-faux", intern("0~~~~"), intern("faux"))
            binaries.add(pkg_id)

        # An entry with an equal or lower version already in "sources"
        # is replaced here (higher ones were skipped above).
        sources[intern(pkg)] = SourcePackage(
            intern(pkg),
            intern(ver),
            section,
            binaries,
            maint,
            False,
            build_deps_arch,
            build_deps_indep,
            get_field("Testsuite", "").split(),
            # Testsuite-Triggers is split on whitespace after dropping the commas
            get_field("Testsuite-Triggers", "").replace(",", "").split(),
        )
    return sources
778def _check_and_update_packages(
779 packages: list[BinaryPackage],
780 package: BinaryPackage,
781 archqual: str | None,
782 build_depends: bool,
783) -> None:
784 """Helper for get_dependency_solvers
786 This method updates the list of packages with a given package if that
787 package is a valid (Build-)Depends.
789 :param packages: which packages are to be updated
790 :param archqual: Architecture qualifier
791 :param build_depends: If True, check if the "package" parameter is valid as a build-dependency.
792 """
794 # See also bug #971739 and #1059929
795 if archqual is None:
796 packages.append(package)
797 elif archqual == "native" and build_depends:
798 # Multi-arch handling for build-dependencies
799 # - :native is ok always
800 packages.append(package)
801 elif archqual == "any" and package.multi_arch == "allowed":
802 # Multi-arch handling for both build-dependencies and regular dependencies
803 # - :any is ok iff the target has "M-A: allowed"
804 packages.append(package)
class GetDependencySolversProto(Protocol):
    """Call signature of get_dependency_solvers()

    The parameters, their defaults and the return type are those of
    get_dependency_solvers().
    """

    def __call__(
        self,
        block: list[tuple[str, str, str]],
        binaries_s_a: dict[str, BinaryPackage],
        provides_s_a: dict[str, set[tuple[str, str]]],
        *,
        build_depends: bool = False,
        empty_set: Any = frozenset(),
    ) -> list[BinaryPackage]:
        """Return the packages satisfying the dependency block."""
def get_dependency_solvers(
    block: list[tuple[str, str, str]],
    binaries_s_a: dict[str, BinaryPackage],
    provides_s_a: dict[str, set[tuple[str, str]]],
    *,
    build_depends: bool = False,
    empty_set: Any = frozenset(),
) -> list[BinaryPackage]:
    """Find the packages which satisfy a dependency block

    Each alternative of the block is matched against the package table
    of a given suite and architecture (a la self.binaries[suite][arch]):
    first against the real package of that name, then against the
    packages providing that name.  An architecture qualifier on the
    name (the part after ":") further restricts which matches count.

    Build-dependency relations are handled as well if the named
    parameter "build_depends" is set to True.  In this case, block
    should be based on the return value from apt_pkg.parse_src_depends.

    :param block: The dependency block as parsed by apt_pkg.parse_depends (or apt_pkg.parse_src_depends
      if the "build_depends" is True)
    :param binaries_s_a: Mapping of package names to the relevant BinaryPackage
    :param provides_s_a: Mapping of package names to their providers (as generated by parse_provides)
    :param build_depends: If True, treat the "block" parameter as a build-dependency relation rather than
      a regular dependency relation.
    :param empty_set: Internal implementation detail / optimisation
    :return: the packages solving the relation (a package may be listed more than once)
    """
    solvers: list[BinaryPackage] = []

    for qualified_name, version, op in block:
        name, colon, qualifier = qualified_name.partition(":")
        archqual = qualifier if colon else None
        unversioned = op == "" and version == ""

        # the real package of that name, if its version satisfies the
        # relation
        if name in binaries_s_a:
            candidate = binaries_s_a[name]
            if unversioned or apt_pkg.check_dep(candidate.version, op, version):
                _check_and_update_packages(solvers, candidate, archqual, build_depends)

        # the packages providing that name
        for prov, prov_version in provides_s_a.get(name, empty_set):
            assert prov in binaries_s_a
            provider = binaries_s_a[prov]
            # See Policy Manual §7.5: an unversioned provides cannot
            # satisfy a versioned relation
            if unversioned or (
                prov_version != "" and apt_pkg.check_dep(prov_version, op, version)
            ):
                _check_and_update_packages(solvers, provider, archqual, build_depends)

    return solvers
def invalidate_excuses(
    excuses: dict[str, "Excuse"],
    valid: set[str],
    invalid: set[str],
    invalidated: set[str],
) -> None:
    """Invalidate impossible excuses

    This method invalidates the impossible excuses, which depend
    on invalid excuses.  All sets are updated in place.

    :param excuses: All the excuses, by name.  Their dependencies are
      registered and invalidated as part of the processing.
    :param valid: The names of the `valid' excuses; names are removed
      as their excuses get invalidated.
    :param invalid: The names of the `invalid' excuses; the excuses
      whose dependencies cannot be registered are added.
    :param invalidated: Receives the name of every excuse that was
      processed as invalid.
    """
    # make a list of all packages (source and binary) that are present in the
    # excuses we have
    excuses_packages: dict[PackageId | BinaryPackageId, set[str]] = defaultdict(set)
    for exc in excuses.values():
        for arch in exc.packages:
            for pkg_arch_id in exc.packages[arch]:
                # note that the same package can be in multiple excuses
                # eg. when unstable and TPU have the same packages
                excuses_packages[pkg_arch_id].add(exc.name)

    # create dependencies between excuses based on packages
    # (excuses_rdeps maps an excuse name to the names of the excuses
    # depending on it)
    excuses_rdeps = defaultdict(set)
    for exc in excuses.values():
        # Note that excuses_rdeps is only populated by dependencies generated
        # based on packages below. There are currently no dependencies between
        # excuses that are added directly, so this is ok.

        for pkg_dep in exc.depends_packages:
            # set of excuses, each of which can satisfy this specific
            # dependency
            # if there is a dependency on a package for which no
            # excuses exist (e.g. a cruft binary), the set will
            # contain an ImpossibleDependencyState
            dep_exc: set[str | DependencyState] = set()
            for pkg_dep_id in cast(set[BinaryPackageId], pkg_dep.deps):
                # excuses_packages is a defaultdict: looking up an
                # unknown id yields (and stores) an empty set
                pkg_excuses = excuses_packages[pkg_dep_id]
                # if the dependency isn't found, we get an empty set
                if pkg_excuses == frozenset():
                    imp_dep = ImpossibleDependencyState(
                        PolicyVerdict.REJECTED_PERMANENTLY, "%s" % (pkg_dep_id.name)
                    )
                    dep_exc.add(imp_dep)

                else:
                    dep_exc |= pkg_excuses
                    for e in pkg_excuses:
                        excuses_rdeps[e].add(exc.name)
            # a false result from add_dependency() makes the excuse invalid
            if not exc.add_dependency(dep_exc, pkg_dep.spec):
                valid.discard(exc.name)
                invalid.add(exc.name)

    # loop on the invalid excuses
    # Convert invalid to a list for deterministic results
    invalid2 = sorted(invalid)
    # invalid2 is the worklist: pop() takes names from its end until it
    # is empty (IndexError), including the names appended below
    for ename in iter_except(invalid2.pop, IndexError):
        invalidated.add(ename)
        # if there is no reverse dependency, skip the item
        if ename not in excuses_rdeps:
            continue

        # the verdict for the dependent excuses depends on whether the
        # invalid excuse itself is blocked
        rdep_verdict = PolicyVerdict.REJECTED_WAITING_FOR_ANOTHER_ITEM
        if excuses[ename].policy_verdict.is_blocked:
            rdep_verdict = PolicyVerdict.REJECTED_BLOCKED_BY_ANOTHER_ITEM

        # loop on the reverse dependencies
        for x in sorted(excuses_rdeps[ename]):
            exc = excuses[x]
            # if the item is valid and it is not marked as `forced', then we
            # invalidate this specific dependency
            if x in valid and not exc.forced:
                # mark this specific dependency as invalid
                still_valid = exc.invalidate_dependency(ename, rdep_verdict)

                # if there are no alternatives left for this dependency,
                # invalidate the excuse
                if not still_valid:
                    valid.discard(x)
                    invalid2.append(x)
def compile_nuninst(
    target_suite: TargetSuite, architectures: list[str], nobreakall_arches: list[str]
) -> dict[str, set[str]]:
    """Compile a nuninst dict from the current state of the target suite

    For every architecture the result holds two entries: `<arch>+all` with
    the names of all uninstallable binaries, and `<arch>` with the same
    names minus the architecture-independent ones, unless the architecture
    is listed in `nobreakall_arches`.

    :param target_suite: The target suite
    :param architectures: Which architectures to check
    :param nobreakall_arches: Which architectures where arch:all packages must be installable
    :return: Mapping of `<arch>` and `<arch>+all` to sets of binary names
    """
    result: dict[str, set[str]] = {}
    all_binaries = target_suite.binaries

    for arch in architectures:
        arch_binaries = all_binaries[arch]

        # every binary on this architecture that cannot be installed
        broken = {
            name
            for name, data in arch_binaries.items()
            if not target_suite.is_installable(data.pkg_id)
        }

        if arch in nobreakall_arches:
            # arch:all packages are required to be installable here too
            broken_arch_only = set(broken)
        else:
            broken_arch_only = {
                name for name in broken if arch_binaries[name].architecture != "all"
            }

        result[arch] = broken_arch_only
        result[arch + "+all"] = broken

    return result
def is_smooth_update_allowed(
    binary: BinaryPackage, smooth_updates: list[str], hints: "HintCollection"
) -> bool:
    """Check whether a binary may be kept around as a smooth update

    :param binary: The binary to check
    :param smooth_updates: Sections for which smooth updates are enabled;
      the special entry "ALL" enables them for every binary
    :param hints: Hints, searched for an "allow-smooth-update" hint that
      matches the source and source version of `binary`
    :return: True if a smooth update is allowed for `binary`
    """
    if "ALL" in smooth_updates:
        return True

    # only the last component of the section is compared
    section = binary.section.rpartition("/")[2]

    # note that the hint needs to match the source version *IN TESTING*
    return section in smooth_updates or bool(
        hints.search(
            "allow-smooth-update", package=binary.source, version=binary.source_version
        )
    )
def find_smooth_updateable_binaries(
    binaries_to_check: list[BinaryPackageId],
    source_data: SourcePackage,
    pkg_universe: "BinaryPackageUniverse",
    target_suite: TargetSuite,
    binaries_t: dict[str, dict[str, BinaryPackage]],
    binaries_s: dict[str, dict[str, BinaryPackage]],
    removals: set[BinaryPackageId] | frozenset[BinaryPackageId],
    smooth_updates: list[str],
    hints: "HintCollection",
) -> set[BinaryPackageId]:
    """Find the binaries that should be kept as smooth updates

    :param binaries_to_check: The binaries to consider
    :param source_data: The source package; a binary in `binaries_s` built
      from this source version is not a candidate
    :param pkg_universe: Provides the dependencies and reverse dependencies
    :param target_suite: The target suite
    :param binaries_t: Binaries by architecture and name; every binary to
      check is looked up here
    :param binaries_s: Binaries by architecture and name, used to detect
      newer non-cruft versions and cruft
    :param removals: Binaries that are ignored as reverse dependencies
    :param smooth_updates: Passed to is_smooth_update_allowed
    :param hints: Passed to is_smooth_update_allowed
    :return: The binaries selected for a smooth update
    """
    undecided: set[BinaryPackageId] = set()
    smoothbins: set[BinaryPackageId] = set()

    for check_pkg_id in binaries_to_check:
        pkg_name, _, arch = check_pkg_id

        cruft: set[BinaryPackageId] = set()

        source_binaries = binaries_s[arch]
        if pkg_name in source_binaries:
            newer = source_binaries[pkg_name]
            # Not a candidate for a smooth update (newer non-cruft version
            # in unstable)
            if newer.source_version == source_data.version:
                continue
            cruft.add(newer.pkg_id)

        # Maybe a candidate (cruft or removed binary): check if the config
        # allows us to smooth update it.
        if not is_smooth_update_allowed(
            binaries_t[arch][pkg_name], smooth_updates, hints
        ):
            continue

        # If the package has reverse-dependencies which are built from
        # other sources, it's a valid candidate for a smooth update.  If
        # not, it may still be a valid candidate if one of its r-deps is
        # itself a candidate, so it is noted for checking later.
        rdeps = set(pkg_universe.reverse_dependencies_of(check_pkg_id))
        # All binaries listed in "removals" are ignored, as we assume they
        # will leave at the same time as the given package.
        rdeps.difference_update(removals, binaries_to_check)

        keep = False
        if target_suite.any_of_these_are_in_the_suite(rdeps):
            with_candidate = smoothbins | {check_pkg_id}
            for rdep in rdeps:
                # Each dependency clause has a set of possible alternatives
                # that can satisfy that dependency.  If any of them is
                # outside the set of smoothbins, the dependency can be
                # satisfied even if this binary was removed, so there is no
                # need to keep it around for a smooth update.  If not, only
                # this binary can satisfy the dependency, so we should keep
                # it around until the rdep is no longer in testing.
                #
                # Cruft binaries from unstable are filtered out, because
                # they will not be added to the set of packages that will
                # be migrated.
                if any(
                    clause - cruft <= with_candidate
                    for clause in pkg_universe.dependencies_of(rdep)
                ):
                    keep = True

        if keep:
            smoothbins = with_candidate
        else:
            undecided.add(check_pkg_id)

    # Check whether we should perform a smooth update for packages which
    # are candidates but do not have r-deps outside of the current source.
    # Repeat until a full pass selects nothing new.
    while True:
        progressed = False
        for candidate_pkg_id in undecided:
            candidate_rdeps = pkg_universe.reverse_dependencies_of(candidate_pkg_id)
            if not candidate_rdeps.isdisjoint(smoothbins):
                smoothbins.add(candidate_pkg_id)
                progressed = True
        if not progressed:
            break
        undecided = {pkg_id for pkg_id in undecided if pkg_id not in smoothbins}

    return smoothbins
def find_newer_binaries(
    suite_info: Suites, pkg: BinaryPackage, add_source_for_dropped_bin: bool = False
) -> list[tuple[PackageId, Suite]]:
    """
    Find newer binaries for pkg in any of the source suites.

    :param suite_info: The suites to search; the target suite is skipped

    :param pkg: BinaryPackage (is assumed to be in the target suite)

    :param add_source_for_dropped_bin: If True, newer versions of the
      source of pkg will be added if they don't have the binary pkg

    :return: the newer binaries (or sources) and their suites
    """
    source = pkg.source
    arch = pkg.pkg_id.architecture
    package_name = pkg.pkg_id.package_name
    newer_versions: list[tuple[PackageId, Suite]] = []

    for suite in suite_info:
        if suite.suite_class == SuiteClass.TARGET_SUITE:
            continue

        suite_binaries_on_arch = suite.binaries.get(arch)
        if not suite_binaries_on_arch:
            continue

        newerbin = None
        if package_name in suite_binaries_on_arch:
            candidate = suite_binaries_on_arch[package_name]
            # We pretend a cruft binary doesn't exist and handle it as if
            # the source didn't have the binary (see below).
            if not suite.is_cruft(candidate):
                if apt_pkg.version_compare(candidate.version, pkg.version) <= 0:
                    continue
                newerbin = candidate

        if newerbin:
            n_id = newerbin.pkg_id
        else:
            if not add_source_for_dropped_bin:
                continue
            # The binary is missing from the suite (either it doesn't
            # exist, or it's cruft and we pretend it doesn't exist).  If
            # the source is missing as well there is no newer version.
            # This check has to cover the cruft case too, otherwise the
            # lookup below can raise a KeyError.
            if source not in suite.sources:
                continue
            # Add the source instead, but only if it is newer than the
            # source version pkg was built from.
            nsrc = suite.sources[source]
            if apt_pkg.version_compare(nsrc.version, pkg.source_version) <= 0:
                continue
            n_id = PackageId(source, nsrc.version, "source")

        newer_versions.append((n_id, suite))

    return newer_versions
def parse_provides(
    provides_raw: str,
    pkg_id: BinaryPackageId | None = None,
    logger: logging.Logger | None = None,
) -> list[tuple[str, str, str]]:
    """Parse a Provides field into (name, version, operator) tuples

    Entries with alternatives, or with an operator other than "" or "=",
    are skipped; a warning is logged for them when a logger is given.
    All returned strings are interned.

    :param provides_raw: The raw value of the field
    :param pkg_id: The package the field belongs to (only used in warnings)
    :param logger: Optional logger for warnings about skipped entries
    :return: One tuple per accepted entry
    """
    provides: list[tuple[str, str, str]] = []
    for or_clause in apt_pkg.parse_depends(provides_raw, False):
        if len(or_clause) != 1:  # pragma: no cover
            if logger is not None:
                msg = "Ignoring invalid provides in %s: Alternatives [%s]"
                logger.warning(msg, str(pkg_id), str(or_clause))
            continue

        # exactly one entry is left in the clause at this point
        provided, provided_version, op = or_clause[0]
        if op not in ("", "="):  # pragma: no cover
            if logger is not None:
                msg = "Ignoring invalid provides in %s: %s (%s %s)"
                logger.warning(msg, str(pkg_id), provided, op, provided_version)
            continue

        provides.append(
            (sys.intern(provided), sys.intern(provided_version), sys.intern(op))
        )
    return provides
def parse_builtusing(
    builtusing_raw: str,
    pkg_id: BinaryPackageId | None = None,
    logger: logging.Logger | None = None,
) -> list[tuple[str, str]]:
    """Parse a Built-Using field into (name, version) tuples

    Entries with alternatives, or with an operator other than "=", are
    skipped; a warning is logged for them when a logger is given.  All
    returned strings are interned.

    :param builtusing_raw: The raw value of the field
    :param pkg_id: The package the field belongs to (only used in warnings)
    :param logger: Optional logger for warnings about skipped entries
    :return: One tuple per accepted entry
    """
    built_using: list[tuple[str, str]] = []
    for or_clause in apt_pkg.parse_depends(builtusing_raw, False):
        if len(or_clause) != 1:  # pragma: no cover
            if logger is not None:
                msg = "Ignoring invalid builtusing in %s: Alternatives [%s]"
                logger.warning(msg, str(pkg_id), str(or_clause))
            continue

        # exactly one entry is left in the clause at this point
        bu, bu_version, op = or_clause[0]
        if op != "=":  # pragma: no cover
            if logger is not None:
                msg = "Ignoring invalid builtusing in %s: %s (%s %s)"
                logger.warning(msg, str(pkg_id), bu, op, bu_version)
            continue

        built_using.append((sys.intern(bu), sys.intern(bu_version)))
    return built_using
1205def parse_option(
1206 options: "optparse.Values",
1207 option_name: str,
1208 default: Any | None = None,
1209 to_bool: bool = False,
1210 to_int: bool = False,
1211 day_to_sec: bool = False,
1212) -> None:
1213 """Ensure the option exist and has a sane value
1215 :param options: dict with options
1217 :param option_name: string with the name of the option
1219 :param default: the default value for the option
1221 :param to_int: convert the input to int (defaults to sys.maxsize)
1223 :param to_bool: convert the input to bool
1225 :param day_to_sec: convert the input from days to seconds (implies to_int=True)
1226 """
1227 value = getattr(options, option_name, default)
1229 # Option was provided with no value (or default is '') so pick up the default
1230 if value == "":
1231 value = default
1233 if (to_int or day_to_sec) and value in (None, ""):
1234 value = sys.maxsize
1236 if day_to_sec:
1237 value = int(float(value) * 24 * 60 * 60) # type: ignore[arg-type]
1239 if to_int:
1240 value = int(value) # type: ignore[arg-type]
1242 if to_bool:
1243 if value and (
1244 isinstance(value, bool) or value.lower() in ("yes", "y", "true", "t", "1")
1245 ):
1246 value = True
1247 else:
1248 value = False
1250 setattr(options, option_name, value)
def filter_out_faux(binaries: Iterable[BinaryPackageId]) -> set[BinaryPackageId]:
    """Return the given binaries as a set, leaving out faux packages

    A package is treated as faux when its package name ends with
    "-faux-build-depends".
    """
    real_binaries: set[BinaryPackageId] = set()
    for pkg_id in binaries:
        if pkg_id.package_name.endswith("-faux-build-depends"):
            continue
        real_binaries.add(pkg_id)
    return real_binaries