# Refactored parts from britney.py, which is/was:
# Copyright (C) 2001-2008 Anthony Towns <ajt@debian.org>
#                         Andreas Barth <aba@debian.org>
#                         Fabio Tranchitella <kobold@debian.org>
# Copyright (C) 2010-2012 Adam D. Barratt <adsb@debian.org>
# Copyright (C) 2012 Niels Thykier <niels@thykier.net>
#
# New portions
# Copyright (C) 2013 Adam D. Barratt <adsb@debian.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

import errno
import logging
import optparse
import os
import sys
import time
from collections import defaultdict
from collections.abc import Callable, Container, Iterable, Iterator, Mapping, MutableSet
from datetime import UTC, datetime
from functools import partial
from itertools import chain, filterfalse
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    Literal,
    Optional,
    Protocol,
    TypeVar,
    Union,
    cast,
    overload,
)

import apt_pkg
import yaml

from britney2 import (
    BinaryPackage,
    BinaryPackageId,
    PackageId,
    SourcePackage,
    Suite,
    SuiteClass,
    Suites,
    TargetSuite,
)
from britney2.excusedeps import DependencyState, ImpossibleDependencyState
from britney2.policies import PolicyVerdict

if TYPE_CHECKING:
    from _typeshed import SupportsRichComparisonT
    from apt_pkg import TagSection

    from .excuse import Excuse
    from .hints import HintCollection
    from .installability.universe import BinaryPackageUniverse
    from .migrationitem import MigrationItem, MigrationItemFactory

_T = TypeVar("_T")


class MigrationConstraintException(Exception):
    pass


@overload
def ifilter_except(
    container: Container[_T], iterable: Literal[None] = None
) -> "partial[filterfalse[_T]]": ...


@overload
def ifilter_except(
    container: Container[_T], iterable: Iterable[_T]
) -> "filterfalse[_T]": ...


def ifilter_except(
    container: Container[_T], iterable: Iterable[_T] | None = None
) -> Union["filterfalse[_T]", "partial[filterfalse[_T]]"]:
    """Filter out elements in container

    If given an iterable, it returns a filtered iterator; otherwise it
    returns a function to generate filtered iterators. The latter is
    useful if the same filter has to be (re-)used on multiple
    iterators that are not known beforehand.
    """
    if iterable is not None:
        return filterfalse(container.__contains__, iterable)
    return cast(
        "partial[filterfalse[_T]]", partial(filterfalse, container.__contains__)
    )


@overload
def ifilter_only(container: Container[_T], iterable: Iterable[_T]) -> "filter[_T]": ...


@overload
def ifilter_only(container: Container[_T], iterable: None) -> "partial[filter[_T]]": ...


def ifilter_only(
    container: Container[_T], iterable: Iterable[_T] | None = None
) -> Union["filter[_T]", "partial[filter[_T]]"]:
    """Filter out elements that are not in container

    If given an iterable, it returns a filtered iterator; otherwise it
    returns a function to generate filtered iterators. The latter is
    useful if the same filter has to be (re-)used on multiple
    iterators that are not known beforehand.
    """
    if iterable is not None:
        return filter(container.__contains__, iterable)
    return partial(filter, container.__contains__)
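

# Usage sketch (editor's note, not part of the module): with no iterable,
# both helpers return a reusable filter factory; the names below are
# invented for illustration.
#
#     seen = {"glibc", "gcc"}
#     drop_seen = ifilter_except(seen)             # factory form
#     list(drop_seen(["glibc", "perl"]))           # -> ["perl"]
#     list(ifilter_only(seen, ["glibc", "perl"]))  # -> ["glibc"]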


# iter_except is from the "itertools" recipe
def iter_except(
    func: Callable[[], _T],
    exception: type[BaseException] | tuple[type[BaseException], ...],
    first: Any = None,
) -> Iterator[_T]:  # pragma: no cover - itertools recipe function
    """Call a function repeatedly until an exception is raised.

    Converts a call-until-exception interface to an iterator interface.
    Like __builtin__.iter(func, sentinel) but uses an exception instead
    of a sentinel to end the loop.

    Examples:
        bsddbiter = iter_except(db.next, bsddb.error, db.first)
        heapiter = iter_except(functools.partial(heappop, h), IndexError)
        dictiter = iter_except(d.popitem, KeyError)
        dequeiter = iter_except(d.popleft, IndexError)
        queueiter = iter_except(q.get_nowait, Queue.Empty)
        setiter = iter_except(s.pop, KeyError)
    """
    try:
        if first is not None:
            yield first()
        while 1:
            yield func()
    except exception:
        pass


def log_and_format_old_libraries(
    logger: logging.Logger, libs: list["MigrationItem"]
) -> None:
    """Format and log old libraries in a table (no header)"""
    libraries: dict[str, list[str]] = {}
    for i in libs:
        pkg = i.package
        if pkg in libraries:
            libraries[pkg].append(i.architecture)
        else:
            libraries[pkg] = [i.architecture]

    for lib in sorted(libraries):
        logger.info(" %s: %s", lib, " ".join(libraries[lib]))


def compute_reverse_tree(
    pkg_universe: "BinaryPackageUniverse", affected: set[BinaryPackageId]
) -> None:
    """Calculate the full reverse dependency tree for a set of packages

    This method computes the full reverse dependency tree for a given set
    of packages. The first argument is an instance of the
    BinaryPackageUniverse and the second argument is a set of
    BinaryPackageId.

    The set of affected packages will be updated in place and must
    therefore be mutable.
    """
    remain = list(affected)
    while remain:
        pkg_id = remain.pop()
        new_pkg_ids = pkg_universe.reverse_dependencies_of(pkg_id) - affected
        affected.update(new_pkg_ids)
        remain.extend(new_pkg_ids)
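

# Worked sketch (editor's note): given a universe where pkg_b depends on
# pkg_a and pkg_c depends on pkg_b (all names hypothetical), seeding the
# set with pkg_a grows it in place to the full reverse tree:
#
#     affected = {pkg_a}
#     compute_reverse_tree(pkg_universe, affected)
#     # affected == {pkg_a, pkg_b, pkg_c}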


def add_transitive_dependencies_flatten(
    pkg_universe: "BinaryPackageUniverse", initial_set: MutableSet[BinaryPackageId]
) -> None:
    """Find and include all transitive dependencies

    This method updates the initial_set parameter to include all transitive
    dependencies. The first argument is an instance of the
    BinaryPackageUniverse and the second argument is a set of
    BinaryPackageId.

    The set of initial packages will be updated in place and must
    therefore be mutable.
    """
    remain = list(initial_set)
    while remain:
        pkg_id = remain.pop()
        new_pkg_ids = {
            x
            for x in chain.from_iterable(pkg_universe.dependencies_of(pkg_id))
            if x not in initial_set
        }
        initial_set |= new_pkg_ids
        remain.extend(new_pkg_ids)


def write_nuninst(filename: str, nuninst: dict[str, set[str]]) -> None:
    """Write the non-installable report

    Write the non-installable report derived from "nuninst" to the
    file denoted by "filename".
    """
    with open(filename, "w", encoding="utf-8") as f:
        # Having two fields with (almost) identical dates seems a bit
        # redundant.
        f.write(
            "Built on: "
            + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time()))
            + "\n"
        )
        f.write(
            "Last update: "
            + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time()))
            + "\n\n"
        )
        for k in nuninst:
            f.write("{}: {}\n".format(k, " ".join(nuninst[k])))


def read_nuninst(filename: str, architectures: set[str]) -> dict[str, set[str]]:
    """Read the non-installable report

    Read the non-installable report from the file denoted by
    "filename" and return it. Only architectures in "architectures"
    will be included in the report.
    """
    nuninst: dict[str, set[str]] = {}
    with open(filename, encoding="ascii") as f:
        for r in f:
            if ":" not in r:
                continue
            arch, packages = r.strip().split(":", 1)
            if arch.split("+", 1)[0] in architectures:
                nuninst[arch] = set(packages.split())
    return nuninst
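

# Format sketch (editor's note): the report is two date lines, a blank
# line, and then one "<arch>: <pkg> <pkg> ..." line per counter, so a
# write/read round trip preserves the per-architecture sets:
#
#     nuninst = {"amd64": {"pkg-a"}, "amd64+all": {"pkg-a", "pkg-b"}}
#     write_nuninst("nuninst.txt", nuninst)
#     assert read_nuninst("nuninst.txt", {"amd64"}) == nuninst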


def newly_uninst(
    nuold: dict[str, set[str]], nunew: dict[str, set[str]]
) -> dict[str, list[str]]:
    """Return a nuninst statistic with only new uninstallable packages

    This method subtracts the uninstallable packages of the statistic
    "nuold" from the statistic "nunew".

    It returns a dictionary with the architectures as keys and the list
    of uninstallable packages as values. If there are no regressions
    on a given architecture, then the architecture will be omitted in
    the result. Accordingly, if none of the architectures have
    regressions an empty dictionary is returned.
    """
    res: dict[str, list[str]] = {}
    for arch in ifilter_only(nunew, nuold):
        arch_nuninst = [x for x in nunew[arch] if x not in nuold[arch]]
        # Leave res empty if there are no newly uninst packages
        if arch_nuninst:
            res[arch] = arch_nuninst
    return res
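

# Worked sketch (editor's note): only regressions survive; packages that
# were already uninstallable (or became installable) are dropped, and
# architectures without regressions are omitted entirely:
#
#     old = {"amd64": {"pkg-a"}, "i386": {"pkg-b"}}
#     new = {"amd64": {"pkg-a", "pkg-c"}, "i386": set()}
#     newly_uninst(old, new)   # -> {"amd64": ["pkg-c"]}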


def format_and_log_uninst(
    logger: logging.Logger,
    architectures: Iterable[str],
    nuninst: Mapping[str, Iterable[str]],
    *,
    loglevel: int = logging.INFO,
) -> None:
    """Emits the uninstallable packages to the log

    An example of the output string is:
      * i386: broken-pkg1, broken-pkg2

    Note that if there are no uninstallable packages, then nothing is emitted.
    """
    for arch in architectures:
        if arch in nuninst and nuninst[arch]:
            msg = " * {}: {}".format(arch, ", ".join(sorted(nuninst[arch])))
            logger.log(loglevel, msg)


class Sorted(Protocol):
    def __call__(
        self,
        iterable: Iterable["SupportsRichComparisonT"],
        /,
        *,
        key: None = None,
        reverse: bool = False,
    ) -> list["SupportsRichComparisonT"]: ...


def write_heidi(
    filename: str,
    target_suite: TargetSuite,
    *,
    outofsync_arches: frozenset[str] = frozenset(),
    sorted: Sorted = sorted,
) -> None:
    """Write the output HeidiResult

    This method writes the output for Heidi, which contains all the
    binary packages and the source packages in the form:

    <pkg-name> <pkg-version> <pkg-architecture> <pkg-section>
    <src-name> <src-version> source <src-section>

    The file is written as "filename" using the sources and packages
    from the "target_suite" parameter.

    outofsync_arches: If given, it is a set of architectures marked
    as "out of sync". The output file may exclude some out of date
    arch:all packages for those architectures to reduce the noise.

    The "X=X" parameters are optimizations to avoid "load global" in
    the loops.
    """
    sources_t = target_suite.sources
    packages_t = target_suite.binaries

    with open(filename, "w", encoding="ascii") as f:

        # write binary packages
        for arch in sorted(packages_t):
            binaries = packages_t[arch]
            for pkg_name in sorted(binaries):
                pkg = binaries[pkg_name]
                pkgv = pkg.version
                pkgarch = pkg.architecture or "all"
                pkgsec = pkg.section or "faux"
                if pkgsec == "faux" or pkgsec.endswith("/faux"):
                    # Faux package; not really a part of testing
                    continue
                if (
                    pkg.source_version
                    and pkgarch == "all"
                    and pkg.source_version != sources_t[pkg.source].version
                    and arch in outofsync_arches
                ):
                    # when architectures are marked as "outofsync", their binary
                    # versions may be lower than those of the associated
                    # source package in testing. the binary package list for
                    # such architectures will include arch:all packages
                    # matching those older versions, but we only want the
                    # newer arch:all in testing
                    continue
                f.write(f"{pkg_name} {pkgv} {pkgarch} {pkgsec}\n")

        # write sources
        for src_name in sorted(sources_t):
            src = sources_t[src_name]
            srcv = src.version
            srcsec = src.section or "unknown"
            if srcsec == "faux" or srcsec.endswith("/faux"):
                # Faux package; not really a part of testing
                continue
            f.write(f"{src_name} {srcv} source {srcsec}\n")


def write_heidi_delta(filename: str, all_selected: list["MigrationItem"]) -> None:
    """Write the output delta

    This method writes the packages to be upgraded, in the form:
    <src-name> <src-version>
    or (if the source is to be removed):
    -<src-name> <src-version>

    The order corresponds to that shown in update_output.
    """
    with open(filename, "w", encoding="ascii") as fd:

        fd.write("#HeidiDelta\n")

        for item in all_selected:
            prefix = ""

            if item.is_removal:
                prefix = "-"

            if item.architecture == "source":
                fd.write(f"{prefix}{item.package} {item.version}\n")
            else:
                fd.write(
                    "%s%s %s %s\n"
                    % (prefix, item.package, item.version, item.architecture)
                )


class Opener(Protocol):
    def __call__(
        self, file: str, mode: Literal["wt"], encoding: Literal["utf-8"]
    ) -> IO[Any]: ...


def write_excuses(
    excuses: dict[str, "Excuse"] | dict[PackageId, "Excuse"],
    dest_file: str,
    output_format: Literal["yaml", "legacy-html"] = "yaml",
) -> None:
    """Write the excuses to dest_file

    Writes a list of excuses in a specified output_format to the
    path denoted by dest_file. The output_format can either be "yaml"
    or "legacy-html".
    """
    excuselist = sorted(excuses.values(), key=lambda x: x.sortkey())
    if output_format == "yaml":
        os.makedirs(os.path.dirname(dest_file), exist_ok=True)
        opener: Opener = open  # type: ignore[assignment]
        if dest_file.endswith(".xz"):
            import lzma

            opener = lzma.open  # type: ignore[assignment]
        elif dest_file.endswith(".gz"):
            import gzip

            opener = gzip.open  # type: ignore[assignment]
        with opener(dest_file, "wt", encoding="utf-8") as f:
            edatalist = [e.excusedata(excuses) for e in excuselist]
            excusesdata = {
                "sources": edatalist,
                "generated-date": datetime.now(UTC),
            }
            f.write(
                yaml.dump(excusesdata, default_flow_style=False, allow_unicode=True)
            )
    elif output_format == "legacy-html":
        with open(dest_file, "w", encoding="utf-8") as f:
            f.write(
                '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/REC-html40/strict.dtd">\n'
            )
            f.write("<html><head><title>excuses...</title>")
            f.write(
                '<meta http-equiv="Content-Type" content="text/html;charset=utf-8"></head><body>\n'
            )
            f.write(
                "<p>Generated: "
                + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time()))
                + "</p>\n"
            )
            f.write("<ul>\n")
            for e in excuselist:
                f.write("<li>%s" % e.html(excuses))
            f.write("</ul></body></html>\n")
    else:  # pragma: no cover
        raise ValueError('Output format must be either "yaml" or "legacy-html"')


def old_libraries(
    mi_factory: "MigrationItemFactory",
    suite_info: Suites,
    outofsync_arches: Iterable[str] = frozenset(),
) -> list["MigrationItem"]:
    """Detect old libraries left in the target suite for smooth transitions

    This method detects old libraries which are in the target suite but no
    longer built from the source package: they are still there because
    other packages still depend on them, but they should be removed as
    soon as possible.

    For "outofsync" architectures, outdated binaries are allowed to be in
    the target suite, so they are only added to the removal list if they
    are no longer in the (primary) source suite.
    """
    sources_t = suite_info.target_suite.sources
    binaries_t = suite_info.target_suite.binaries
    binaries_s = suite_info.primary_source_suite.binaries
    removals = []
    for arch in binaries_t:
        for pkg_name in binaries_t[arch]:
            pkg = binaries_t[arch][pkg_name]
            if sources_t[pkg.source].version != pkg.source_version and (
                arch not in outofsync_arches or pkg_name not in binaries_s[arch]
            ):
                removals.append(mi_factory.generate_removal_for_cruft_item(pkg.pkg_id))
    return removals


def is_nuninst_asgood_generous(
    constraints: dict[str, list[str]],
    allow_uninst: dict[str, set[str | None]],
    architectures: list[str],
    old: dict[str, set[str]],
    new: dict[str, set[str]],
    break_arches: set[str] = cast(set[str], frozenset()),
) -> bool:
    """Compares the nuninst counters and constraints to see if they improved

    Given a list of architectures, the previous and the current nuninst
    counters, this function determines if the current nuninst counter
    is better than the previous one. Optionally it also accepts a set
    of "break_arches"; the nuninst counters for the architectures listed
    in this set are completely ignored.

    If the nuninst counters are equal or better, then the constraints
    are checked for regressions (ignoring break_arches).

    Returns True if the new nuninst counter is better than the
    previous one and there are no constraint regressions (ignoring
    break_arches). Returns False otherwise.
    """
    diff = 0
    for arch in architectures:
        if arch in break_arches:
            continue
        diff = diff + (
            len(new[arch] - allow_uninst[arch]) - len(old[arch] - allow_uninst[arch])
        )
    if diff > 0:
        return False
    must_be_installable = constraints["keep-installable"]
    for arch in architectures:
        if arch in break_arches:
            continue
        regression = new[arch] - old[arch]
        if not regression.isdisjoint(must_be_installable):
            return False
    return True
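

# Worked sketch (editor's note): the counters are compared as a net sum
# across architectures, while "keep-installable" constraints are checked
# per package:
#
#     constraints = {"keep-installable": ["init"]}
#     allow_uninst = {"amd64": set(), "i386": set()}
#     old = {"amd64": {"pkg-a", "pkg-b"}, "i386": set()}
#     new = {"amd64": {"pkg-a"}, "i386": {"pkg-c"}}
#     is_nuninst_asgood_generous(
#         constraints, allow_uninst, ["amd64", "i386"], old, new
#     )   # -> True: the net count did not grow and "init" did not regress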


def clone_nuninst(
    nuninst: dict[str, set[str]],
    *,
    packages_s: dict[str, dict[str, BinaryPackage]] | None = None,
    architectures: Iterable[str] | None = None,
) -> dict[str, set[str]]:
    """Completely or selectively deep clone nuninst

    Given a nuninst table, the package table for a given suite and
    a list of architectures, this function will clone the nuninst
    table. Only the listed architectures will be deep cloned -
    the rest will only be shallow cloned. When packages_s is given,
    packages not listed in packages_s will be pruned from the clone
    (if packages_s is omitted, the per architecture nuninst is cloned
    as-is).
    """
    clone = nuninst.copy()
    if architectures is None:
        return clone
    if packages_s is not None:
        for arch in architectures:
            clone[arch] = {x for x in nuninst[arch] if x in packages_s[arch]}
            clone[arch + "+all"] = {
                x for x in nuninst[arch + "+all"] if x in packages_s[arch]
            }
    else:
        for arch in architectures:
            clone[arch] = set(nuninst[arch])
            clone[arch + "+all"] = set(nuninst[arch + "+all"])
    return clone
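

# Usage sketch (editor's note): deep-clone only the architectures a
# migration touches so the original counters stay usable for comparison:
#
#     nuninst_after = clone_nuninst(nuninst, architectures=["amd64"])
#     nuninst_after["amd64"].add("pkg-a")   # nuninst["amd64"] is untouched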


def test_installability(
    target_suite: TargetSuite,
    pkg_name: str,
    pkg_id: BinaryPackageId,
    broken: set[str],
    nuninst_arch: set[str] | None,
) -> None:
    """Test for installability of a package on an architecture

    pkg_id, a (name, version, arch) tuple, is the package to check.

    broken is the set of broken packages. If the package changes
    installability (e.g. goes from uninstallable to installable),
    broken will be updated accordingly.

    If nuninst_arch is not None then it is also updated in the same
    way as broken is.
    """
    if not target_suite.is_installable(pkg_id):
        # if pkg_name not in broken: regression else: already broken
        broken.add(pkg_name)
        if nuninst_arch is not None:
            nuninst_arch.add(pkg_name)
    else:
        # if pkg_name in broken: improvement else: already not broken
        broken.discard(pkg_name)
        if nuninst_arch is not None:
            nuninst_arch.discard(pkg_name)


def check_installability(
    target_suite: TargetSuite,
    binaries: dict[str, dict[str, BinaryPackage]],
    arch: str,
    updates: set[BinaryPackageId],
    check_archall: bool,
    nuninst: dict[str, set[str]],
) -> None:
    broken = nuninst[arch + "+all"]
    packages_t_a = binaries[arch]

    for pkg_id in (x for x in updates if x.architecture == arch):
        name, version, parch = pkg_id
        if name not in packages_t_a:
            continue
        pkgdata = packages_t_a[name]
        if version != pkgdata.version:
            # Not the version in testing right now, ignore
            continue
        actual_arch = pkgdata.architecture
        nuninst_arch = None
        # only check arch:all packages if requested
        if check_archall or actual_arch != "all":
            nuninst_arch = nuninst[parch]
        elif actual_arch == "all":
            nuninst[parch].discard(name)
        test_installability(target_suite, name, pkg_id, broken, nuninst_arch)


def possibly_compressed(
    path: str, *, permitted_compressions: list[str] | None = None
) -> str:
    """Find and select a (possibly compressed) variant of a path

    If the given path exists, it will be returned.

    :param path: The base path.
    :param permitted_compressions: Alternative extensions to look for. Defaults to "gz" and "xz".
    :return: The path given possibly with one of the permitted extensions.
    :raises FileNotFoundError: if the path is not found
    """
    if os.path.exists(path):
        return path
    if permitted_compressions is None:
        permitted_compressions = ["gz", "xz"]
    for ext in permitted_compressions:
        cpath = f"{path}.{ext}"
        if os.path.exists(cpath):
            return cpath
    raise FileNotFoundError(
        errno.ENOENT, os.strerror(errno.ENOENT), path
    )  # pragma: no cover
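

# Usage sketch (editor's note): resolve "Sources" to whichever of
# "Sources", "Sources.gz" or "Sources.xz" actually exists on disk:
#
#     sources_path = possibly_compressed(os.path.join(suite_dir, "Sources"))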


def create_provides_map(
    packages: dict[str, BinaryPackage],
) -> dict[str, set[tuple[str, str]]]:
    """Create a provides map from a map of binary package names to their BinaryPackage objects

    :param packages: A dict mapping binary package names to their BinaryPackage object
    :return: A provides map
    """
    # create provides
    provides = defaultdict(set)

    for pkg, dpkg in packages.items():
        # register virtual packages and real packages that provide
        # them
        for provided_pkg, provided_version, _ in dpkg.provides:
            provides[provided_pkg].add((pkg, provided_version))

    return provides
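

# Data-shape sketch (editor's note): if "postfix" carries
# "Provides: mail-transport-agent", the resulting map contains
#
#     {"mail-transport-agent": {("postfix", "")}}
#
# where the second tuple element is the provided version ("" when the
# Provides entry is unversioned).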


def read_release_file(suite_dir: str) -> "TagSection[str]":
    """Parses a given "Release" file

    :param suite_dir: The directory of the suite
    :return: A dict of the first (and only) paragraph in a Release file
    """
    release_file = os.path.join(suite_dir, "Release")
    with open(release_file) as fd:
        tag_file = iter(apt_pkg.TagFile(fd))
        result = next(tag_file)
        if next(tag_file, None) is not None:  # pragma: no cover
            raise TypeError("%s has more than one paragraph" % release_file)
    return result


def read_sources_file(
    filename: str,
    sources: dict[str, SourcePackage] | None = None,
    add_faux: bool = True,
    intern: Callable[[str], str] = sys.intern,
) -> dict[str, SourcePackage]:
    """Parse a single Sources file into a hash

    Parse a single Sources file into a dict mapping a source package
    name to a SourcePackage object. If there are multiple versions of
    the same source package, then the highest versioned source
    package (that is not marked as "Extra-Source-Only") is the
    version kept in the dict.

    :param filename: Path to the Sources file. Can be compressed by any algorithm supported by apt_pkg.TagFile
    :param sources: Optional dict to add the packages to. If given, this is also the value returned.
    :param add_faux: Add a faux arch:all binary for each source that claims it has arch:all
    :param intern: Internal optimisation / implementation detail to avoid python's "LOAD_GLOBAL" instruction in a loop
    :return: mapping from names to a source package
    """
    if sources is None:
        sources = {}

    tag_file = apt_pkg.TagFile(filename)
    get_field = tag_file.section.get
    step = tag_file.step

    while step():
        if get_field("Extra-Source-Only", "no") == "yes":
            # Ignore sources only referenced by Built-Using
            continue
        pkg = get_field("Package")
        ver = get_field("Version")
        # There may be multiple versions of the source package
        # (in unstable) if some architectures have out-of-date
        # binaries. We only ever consider the source with the
        # largest version for migration.
        if pkg in sources and apt_pkg.version_compare(sources[pkg].version, ver) > 0:
            continue
        maint = get_field("Maintainer")
        if maint:
            maint = intern(maint.strip())
        section = get_field("Section")
        if section:
            section = intern(section.strip())
        build_deps_arch: str | None
        build_deps_arch = ", ".join(
            x
            for x in (get_field("Build-Depends"), get_field("Build-Depends-Arch"))
            if x is not None
        )
        if build_deps_arch != "":
            build_deps_arch = sys.intern(build_deps_arch)
        else:
            build_deps_arch = None
        build_deps_indep = get_field("Build-Depends-Indep")
        if build_deps_indep is not None:
            build_deps_indep = sys.intern(build_deps_indep)

        # Add arch:all packages to the list of binaries up front to be able
        # to check for them later. Helps mitigate bug 887060 and is the
        # (partial?) answer to bug 1064428.
        binaries: set[BinaryPackageId] = set()
        if add_faux and "all" in get_field("Architecture", "").split():
            # the value "faux" in arch:faux is used elsewhere, so keep in sync
            pkg_id = BinaryPackageId(pkg + "-faux", intern("0~~~~"), intern("faux"))
            binaries.add(pkg_id)

        sources[intern(pkg)] = SourcePackage(
            intern(pkg),
            intern(ver),
            section,
            binaries,
            maint,
            False,
            build_deps_arch,
            build_deps_indep,
            get_field("Testsuite", "").split(),
            get_field("Testsuite-Triggers", "").replace(",", "").split(),
        )
    return sources


def _check_and_update_packages(
    packages: list[BinaryPackage],
    package: BinaryPackage,
    archqual: str | None,
    build_depends: bool,
) -> None:
    """Helper for get_dependency_solvers

    This method updates the list of packages with the given package if
    that package is a valid (Build-)Depends.

    :param packages: which packages are to be updated
    :param package: the candidate package
    :param archqual: Architecture qualifier
    :param build_depends: If True, check if the "package" parameter is valid as a build-dependency.
    """

    # See also bug #971739 and #1059929
    if archqual is None:
        packages.append(package)
    elif archqual == "native" and build_depends:
        # Multi-arch handling for build-dependencies
        # - :native is ok always
        packages.append(package)
    elif archqual == "any" and package.multi_arch == "allowed":
        # Multi-arch handling for both build-dependencies and regular dependencies
        # - :any is ok iff the target has "M-A: allowed"
        packages.append(package)


class GetDependencySolversProto(Protocol):
    def __call__(
        self,
        block: list[tuple[str, str, str]],
        binaries_s_a: dict[str, BinaryPackage],
        provides_s_a: dict[str, set[tuple[str, str]]],
        *,
        build_depends: bool = False,
        empty_set: Any = frozenset(),
    ) -> list[BinaryPackage]: ...


def get_dependency_solvers(
    block: list[tuple[str, str, str]],
    binaries_s_a: dict[str, BinaryPackage],
    provides_s_a: dict[str, set[tuple[str, str]]],
    *,
    build_depends: bool = False,
    empty_set: Any = frozenset(),
) -> list[BinaryPackage]:
    """Find the packages which satisfy a dependency block

    This method returns the list of packages which satisfy a dependency
    block (as returned by apt_pkg.parse_depends) in a package table
    for a given suite and architecture (a la self.binaries[suite][arch]).

    It can also handle build-dependency relations if the named parameter
    "build_depends" is set to True. In this case, block should be based
    on the return value from apt_pkg.parse_src_depends.

    :param block: The dependency block as parsed by apt_pkg.parse_depends (or apt_pkg.parse_src_depends
      if "build_depends" is True)
    :param binaries_s_a: Mapping of package names to the relevant BinaryPackage
    :param provides_s_a: Mapping of package names to their providers (as generated by create_provides_map)
    :param build_depends: If True, treat the "block" parameter as a build-dependency relation rather than
      a regular dependency relation.
    :param empty_set: Internal implementation detail / optimisation
    :return: the packages solving the relation
    """
    packages: list[BinaryPackage] = []

    # for every package, version and operation in the block
    for name, version, op in block:
        if ":" in name:
            name, archqual = name.split(":", 1)
        else:
            archqual = None

        # look for the package in unstable
        if name in binaries_s_a:
            package = binaries_s_a[name]
            # check the versioned dependency and architecture qualifier
            # (if present)
            if (op == "" and version == "") or apt_pkg.check_dep(
                package.version, op, version
            ):
                _check_and_update_packages(packages, package, archqual, build_depends)

        # look for the package in the virtual packages list and loop on them
        for prov, prov_version in provides_s_a.get(name, empty_set):
            assert prov in binaries_s_a
            package = binaries_s_a[prov]
            # See Policy Manual §7.5
            if (op == "" and version == "") or (
                prov_version != "" and apt_pkg.check_dep(prov_version, op, version)
            ):
                _check_and_update_packages(packages, package, archqual, build_depends)

    return packages
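

# Usage sketch (editor's note): solve one alternative group of a parsed
# Depends field against the per-architecture tables described above
# (binaries_s_a and provides_s_a are assumed to exist):
#
#     block = apt_pkg.parse_depends("mail-transport-agent | postfix")[0]
#     solvers = get_dependency_solvers(block, binaries_s_a, provides_s_a)
#     # solvers holds the BinaryPackage for "postfix" plus every package
#     # providing "mail-transport-agent"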


def invalidate_excuses(
    excuses: dict[str, "Excuse"],
    valid: set[str],
    invalid: set[str],
    invalidated: set[str],
) -> None:
    """Invalidate impossible excuses

    This method invalidates the impossible excuses, which depend
    on invalid excuses. The "valid" and "invalid" parameters contain
    the sets of valid and invalid excuses.
    """
    # make a list of all packages (source and binary) that are present in the
    # excuses we have
    excuses_packages: dict[PackageId | BinaryPackageId, set[str]] = defaultdict(set)
    for exc in excuses.values():
        for arch in exc.packages:
            for pkg_arch_id in exc.packages[arch]:
                # note that the same package can be in multiple excuses
                # e.g. when unstable and TPU have the same packages
                excuses_packages[pkg_arch_id].add(exc.name)

    # create dependencies between excuses based on packages
    excuses_rdeps = defaultdict(set)
    for exc in excuses.values():
        # Note that excuses_rdeps is only populated by dependencies generated
        # based on packages below. There are currently no dependencies between
        # excuses that are added directly, so this is ok.

        for pkg_dep in exc.depends_packages:
            # set of excuses, each of which can satisfy this specific
            # dependency
            # if there is a dependency on a package for which no
            # excuses exist (e.g. a cruft binary), the set will
            # contain an ImpossibleDependencyState
            dep_exc: set[str | DependencyState] = set()
            for pkg_dep_id in cast(set[BinaryPackageId], pkg_dep.deps):
                pkg_excuses = excuses_packages[pkg_dep_id]
                # if the dependency isn't found, we get an empty set
                if pkg_excuses == frozenset():
                    imp_dep = ImpossibleDependencyState(
                        PolicyVerdict.REJECTED_PERMANENTLY, "%s" % (pkg_dep_id.name)
                    )
                    dep_exc.add(imp_dep)

                else:
                    dep_exc |= pkg_excuses
                    for e in pkg_excuses:
                        excuses_rdeps[e].add(exc.name)
            if not exc.add_dependency(dep_exc, pkg_dep.spec):
                valid.discard(exc.name)
                invalid.add(exc.name)

    # loop on the invalid excuses
    # Convert invalid to a list for deterministic results
    invalid2 = sorted(invalid)
    for ename in iter_except(invalid2.pop, IndexError):
        invalidated.add(ename)
        # if there is no reverse dependency, skip the item
        if ename not in excuses_rdeps:
            continue

        rdep_verdict = PolicyVerdict.REJECTED_WAITING_FOR_ANOTHER_ITEM
        if excuses[ename].policy_verdict.is_blocked:
            rdep_verdict = PolicyVerdict.REJECTED_BLOCKED_BY_ANOTHER_ITEM

        # loop on the reverse dependencies
        for x in sorted(excuses_rdeps[ename]):
            exc = excuses[x]
            # if the item is valid and it is not marked as `forced', then we
            # invalidate this specific dependency
            if x in valid and not exc.forced:
                # mark this specific dependency as invalid
                still_valid = exc.invalidate_dependency(ename, rdep_verdict)

                # if there are no alternatives left for this dependency,
                # invalidate the excuse
                if not still_valid:
                    valid.discard(x)
                    invalid2.append(x)


def compile_nuninst(
    target_suite: TargetSuite, architectures: list[str], nobreakall_arches: list[str]
) -> dict[str, set[str]]:
    """Compile a nuninst dict from the current testing

    :param target_suite: The target suite
    :param architectures: Which architectures to check
    :param nobreakall_arches: The architectures on which arch:all packages must be installable
    """
    nuninst: dict[str, set[str]] = {}
    binaries_t = target_suite.binaries

    # for all the architectures
    for arch in architectures:
        # if it is in the nobreakall ones, check arch-independent packages too
        check_archall = arch in nobreakall_arches

        # check all the packages for this architecture
        nuninst[arch] = set()
        packages_t_a = binaries_t[arch]
        for pkg_name, pkg_data in packages_t_a.items():
            r = target_suite.is_installable(pkg_data.pkg_id)
            if not r:
                nuninst[arch].add(pkg_name)

        # if they are not required, remove architecture-independent packages
        nuninst[arch + "+all"] = nuninst[arch].copy()
        if not check_archall:
            for pkg_name in nuninst[arch + "+all"]:
                pkg_data = packages_t_a[pkg_name]
                if pkg_data.architecture == "all":
                    nuninst[arch].remove(pkg_name)

    return nuninst


def is_smooth_update_allowed(
    binary: BinaryPackage, smooth_updates: list[str], hints: "HintCollection"
) -> bool:
    if "ALL" in smooth_updates:
        return True
    section = binary.section.split("/")[-1]
    if section in smooth_updates:
        return True
    if hints.search(
        "allow-smooth-update", package=binary.source, version=binary.source_version
    ):
        # note that this needs to match the source version *IN TESTING*
        return True
    return False


def find_smooth_updateable_binaries(
    binaries_to_check: list[BinaryPackageId],
    source_data: SourcePackage,
    pkg_universe: "BinaryPackageUniverse",
    target_suite: TargetSuite,
    binaries_t: dict[str, dict[str, BinaryPackage]],
    binaries_s: dict[str, dict[str, BinaryPackage]],
    removals: set[BinaryPackageId] | frozenset[BinaryPackageId],
    smooth_updates: list[str],
    hints: "HintCollection",
) -> set[BinaryPackageId]:
    check: set[BinaryPackageId] = set()
    smoothbins: set[BinaryPackageId] = set()

    for check_pkg_id in binaries_to_check:
        binary, _, parch = check_pkg_id

        cruftbins: set[BinaryPackageId] = set()

        # Not a candidate for smooth update (newer non-cruft version in unstable)
        if binary in binaries_s[parch]:
            if binaries_s[parch][binary].source_version == source_data.version:
                continue
            cruftbins.add(binaries_s[parch][binary].pkg_id)

        # Maybe a candidate (cruft or removed binary): check if config allows us to smooth update it.
        if is_smooth_update_allowed(binaries_t[parch][binary], smooth_updates, hints):
            # if the package has reverse-dependencies which are
            # built from other sources, it's a valid candidate for
            # a smooth update. if not, it may still be a valid
            # candidate if one of its r-deps is itself a candidate,
            # so note it for checking later
            rdeps = set(pkg_universe.reverse_dependencies_of(check_pkg_id))
            # We ignore all binaries listed in "removals" as we
            # assume they will leave at the same time as the
            # given package.
            rdeps.difference_update(removals, binaries_to_check)

            smooth_update_it = False
            if target_suite.any_of_these_are_in_the_suite(rdeps):
                combined = set(smoothbins)
                combined.add(check_pkg_id)
                for rdep in rdeps:
                    # each dependency clause has a set of possible
                    # alternatives that can satisfy that dependency.
                    # if any of them is outside the set of smoothbins, the
                    # dependency can be satisfied even if this binary was
                    # removed, so there is no need to keep it around for a
                    # smooth update
                    # if not, only this binary can satisfy the dependency, so
                    # we should keep it around until the rdep is no longer in
                    # testing
                    for dep_clause in pkg_universe.dependencies_of(rdep):
                        # filter out cruft binaries from unstable, because
                        # they will not be added to the set of packages that
                        # will be migrated
                        if dep_clause - cruftbins <= combined:
                            smooth_update_it = True
                            break

                if smooth_update_it:
                    smoothbins = combined
                else:
                    check.add(check_pkg_id)

    # check whether we should perform a smooth update for
    # packages which are candidates but do not have r-deps
    # outside of the current source
    while 1:
        found_any = False
        for candidate_pkg_id in check:
            rdeps = pkg_universe.reverse_dependencies_of(candidate_pkg_id)
            if not rdeps.isdisjoint(smoothbins):
                smoothbins.add(candidate_pkg_id)
                found_any = True
        if not found_any:
            break
        check = {x for x in check if x not in smoothbins}

    return smoothbins


def find_newer_binaries(
    suite_info: Suites, pkg: BinaryPackage, add_source_for_dropped_bin: bool = False
) -> list[tuple[PackageId, Suite]]:
    """
    Find newer binaries for pkg in any of the source suites.

    :param suite_info: the Suites in which to look for newer versions

    :param pkg: BinaryPackage (is assumed to be in the target suite)

    :param add_source_for_dropped_bin: If True, newer versions of the
    source of pkg will be added if they don't have the binary pkg

    :return: the newer binaries (or sources) and their suites
    """
    source = pkg.source
    newer_versions: list[tuple[PackageId, Suite]] = []
    for suite in suite_info:
        if suite.suite_class == SuiteClass.TARGET_SUITE:
            continue

        suite_binaries_on_arch = suite.binaries.get(pkg.pkg_id.architecture)
        if not suite_binaries_on_arch:
            continue

        newerbin = None
        if pkg.pkg_id.package_name in suite_binaries_on_arch:
            newerbin = suite_binaries_on_arch[pkg.pkg_id.package_name]
            if suite.is_cruft(newerbin):
                # We pretend the cruft binary doesn't exist.
                # We handle this as if the source didn't have the binary
                # (see below)
                newerbin = None
            elif apt_pkg.version_compare(newerbin.version, pkg.version) <= 0:
                continue
        else:
            if source not in suite.sources:
                # bin and source not in suite: no newer version
                continue

        if not newerbin:
            if not add_source_for_dropped_bin:
                continue
            # We only get here if there is a newer version of the source,
            # which doesn't have the binary anymore (either it doesn't
            # exist, or it's cruft and we pretend it doesn't exist).
            # Add the new source instead.
            nsrc = suite.sources[source]
            n_id = PackageId(source, nsrc.version, "source")
            overs = pkg.source_version
            if apt_pkg.version_compare(nsrc.version, overs) <= 0:
                continue
        else:
            n_id = newerbin.pkg_id

        newer_versions.append((n_id, suite))

    return newer_versions


def parse_provides(
    provides_raw: str,
    pkg_id: BinaryPackageId | None = None,
    logger: logging.Logger | None = None,
) -> list[tuple[str, str, str]]:
    parts = apt_pkg.parse_depends(provides_raw, False)
    nprov = []
    for or_clause in parts:
        if len(or_clause) != 1:  # pragma: no cover
            if logger is not None:
                msg = "Ignoring invalid provides in %s: Alternatives [%s]"
                logger.warning(msg, str(pkg_id), str(or_clause))
            continue
        for part in or_clause:
            provided, provided_version, op = part
            if op != "" and op != "=":  # pragma: no cover
                if logger is not None:
                    msg = "Ignoring invalid provides in %s: %s (%s %s)"
                    logger.warning(msg, str(pkg_id), provided, op, provided_version)
                continue
            provided = sys.intern(provided)
            provided_version = sys.intern(provided_version)
            part = (provided, provided_version, sys.intern(op))
            nprov.append(part)
    return nprov
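

# Behaviour sketch (editor's note): a Provides field is parsed into
# (name, version, op) tuples; alternatives and non-"=" relations are
# ignored with a warning, as Policy does not allow them in Provides:
#
#     parse_provides("mta, default-mta (= 1.0)")
#     # -> [("mta", "", ""), ("default-mta", "1.0", "=")]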


def parse_builtusing(
    builtusing_raw: str,
    pkg_id: BinaryPackageId | None = None,
    logger: logging.Logger | None = None,
) -> list[tuple[str, str]]:
    parts = apt_pkg.parse_depends(builtusing_raw, False)
    nbu = []
    for or_clause in parts:
        if len(or_clause) != 1:  # pragma: no cover
            if logger is not None:
                msg = "Ignoring invalid builtusing in %s: Alternatives [%s]"
                logger.warning(msg, str(pkg_id), str(or_clause))
            continue
        for part in or_clause:
            bu, bu_version, op = part
            if op != "=":  # pragma: no cover
                if logger is not None:
                    msg = "Ignoring invalid builtusing in %s: %s (%s %s)"
                    logger.warning(msg, str(pkg_id), bu, op, bu_version)
                continue
            bu = sys.intern(bu)
            bu_version = sys.intern(bu_version)
            nbu.append((bu, bu_version))
    return nbu


def parse_option(
    options: "optparse.Values",
    option_name: str,
    default: Any | None = None,
    to_bool: bool = False,
    to_int: bool = False,
    day_to_sec: bool = False,
) -> None:
    """Ensure the option exists and has a sane value

    :param options: optparse.Values holding the options

    :param option_name: string with the name of the option

    :param default: the default value for the option

    :param to_int: convert the input to int (defaults to sys.maxsize)

    :param to_bool: convert the input to bool

    :param day_to_sec: convert the input from days to seconds (implies to_int=True)
    """
    value = getattr(options, option_name, default)

    # Option was provided with no value (or default is '') so pick up the default
    if value == "":
        value = default

    if (to_int or day_to_sec) and value in (None, ""):
        value = sys.maxsize

    if day_to_sec:
        value = int(float(value) * 24 * 60 * 60)  # type: ignore[arg-type]

    if to_int:
        value = int(value)  # type: ignore[arg-type]

    if to_bool:
        if value and (
            isinstance(value, bool) or value.lower() in ("yes", "y", "true", "t", "1")
        ):
            value = True
        else:
            value = False

    setattr(options, option_name, value)
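

# Usage sketch (editor's note, option name hypothetical): normalize a
# configuration value in place:
#
#     parse_option(options, "min_age_in_days", day_to_sec=True)
#     # a set value is converted from days to seconds; an unset or empty
#     # value falls back to sys.maxsize days (effectively "no limit")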