Coverage for britney2/utils.py: 90%
469 statements
coverage.py v7.6.0, created at 2025-10-17 17:32 +0000
1# Refactored parts from britney.py, which is/was:
2# Copyright (C) 2001-2008 Anthony Towns <ajt@debian.org>
3# Andreas Barth <aba@debian.org>
4# Fabio Tranchitella <kobold@debian.org>
5# Copyright (C) 2010-2012 Adam D. Barratt <adsb@debian.org>
6# Copyright (C) 2012 Niels Thykier <niels@thykier.net>
7#
8# New portions
9# Copyright (C) 2013 Adam D. Barratt <adsb@debian.org>
11# This program is free software; you can redistribute it and/or modify
12# it under the terms of the GNU General Public License as published by
13# the Free Software Foundation; either version 2 of the License, or
14# (at your option) any later version.
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
22import errno
23import logging
24import optparse
25import os
26import sys
27import time
28from collections import defaultdict
29from collections.abc import Callable, Container, Iterable, Iterator, Mapping, MutableSet
30from datetime import UTC, datetime
31from functools import partial
32from itertools import chain, filterfalse
33from typing import (
34 IO,
35 TYPE_CHECKING,
36 Any,
37 Literal,
38 Optional,
39 Protocol,
40 TypeVar,
41 Union,
42 cast,
43 overload,
44)
46import apt_pkg
47import yaml
49from britney2 import (
50 BinaryPackage,
51 BinaryPackageId,
52 PackageId,
53 SourcePackage,
54 Suite,
55 SuiteClass,
56 Suites,
57 TargetSuite,
58)
59from britney2.excusedeps import DependencyState, ImpossibleDependencyState
60from britney2.policies import PolicyVerdict
62if TYPE_CHECKING:  # ↛ 64: the condition on line 62 was never true
64 from _typeshed import SupportsRichComparisonT
65 from apt_pkg import TagSection
67 from .excuse import Excuse
68 from .hints import HintCollection
69 from .installability.universe import BinaryPackageUniverse
70 from .migrationitem import MigrationItem, MigrationItemFactory
72_T = TypeVar("_T")
75class MigrationConstraintException(Exception):
76 pass
79@overload
80def ifilter_except(  # ↛ exit: line 80 didn't jump to the function exit
81 container: Container[_T], iterable: Literal[None] = None
82) -> "partial[filterfalse[_T]]": ...
85@overload
86def ifilter_except(  # ↛ exit: line 86 didn't jump to the function exit
87 container: Container[_T], iterable: Iterable[_T]
88) -> "filterfalse[_T]": ...
91def ifilter_except(
92 container: Container[_T], iterable: Iterable[_T] | None = None
93) -> Union["filterfalse[_T]", "partial[filterfalse[_T]]"]:
94 """Filter out elements in container
96 If given an iterable it returns a filtered iterator, otherwise it
97 returns a function to generate filtered iterators. The latter is
98 useful if the same filter has to be (re-)used on multiple
99 iterators that are not known beforehand.
100 """
101 if iterable is not None:  # ↛ 102: the condition on line 101 was never true
102 return filterfalse(container.__contains__, iterable)
103 return cast(
104 "partial[filterfalse[_T]]", partial(filterfalse, container.__contains__)
105 )
108@overload
109def ifilter_only(container: Container[_T], iterable: Iterable[_T]) -> "filter[_T]": ...  # ↛ exit: line 109 didn't return from function 'ifilter_only'
112@overload
113def ifilter_only(container: Container[_T], iterable: None) -> "partial[filter[_T]]": ...  # ↛ exit: line 113 didn't return from function 'ifilter_only'
116def ifilter_only(
117 container: Container[_T], iterable: Iterable[_T] | None = None
118) -> Union["filter[_T]", "partial[filter[_T]]"]:
119 """Filter out elements in which are not in container
121 If given an iterable it returns a filtered iterator, otherwise it
122 returns a function to generate filtered iterators. The latter is
123 useful if the same filter has to be (re-)used on multiple
124 iterators that are not known beforehand.
125 """
126 if iterable is not None:  # ↛ 128: the condition on line 126 was always true
127 return filter(container.__contains__, iterable)
128 return partial(filter, container.__contains__)
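A minimal sketch of both helpers (the example data is hypothetical, not from britney2):
    allowed = {"a", "b"}
    assert list(ifilter_only(allowed, ["a", "c", "b"])) == ["a", "b"]
    assert list(ifilter_except(allowed, ["a", "c", "b"])) == ["c"]
    # With no iterable, a reusable filter factory is returned instead.
    only_allowed = ifilter_only(allowed, None)
    assert list(only_allowed(["b", "x"])) == ["b"]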
131# iter_except is from the "itertools" recipe
132def iter_except(
133 func: Callable[[], _T],
134 exception: type[BaseException] | tuple[type[BaseException], ...],
135 first: Any = None,
136) -> Iterator[_T]: # pragma: no cover - itertools recipe function
137 """Call a function repeatedly until an exception is raised.
139 Converts a call-until-exception interface to an iterator interface.
140 Like __builtin__.iter(func, sentinel) but uses an exception instead
141 of a sentinel to end the loop.
143 Examples:
144 bsddbiter = iter_except(db.next, bsddb.error, db.first)
145 heapiter = iter_except(functools.partial(heappop, h), IndexError)
146 dictiter = iter_except(d.popitem, KeyError)
147 dequeiter = iter_except(d.popleft, IndexError)
148 queueiter = iter_except(q.get_nowait, Queue.Empty)
149 setiter = iter_except(s.pop, KeyError)
151 """
152 try:
153 if first is not None:
154 yield first()
155 while 1:
156 yield func()
157 except exception:
158 pass
161def log_and_format_old_libraries(
162 logger: logging.Logger, libs: list["MigrationItem"]
163) -> None:
164 """Format and log old libraries in a table (no header)"""
165 libraries: dict[str, list[str]] = {}
166 for i in libs:
167 pkg = i.package
168 if pkg in libraries:
169 libraries[pkg].append(i.architecture)
170 else:
171 libraries[pkg] = [i.architecture]
173 for lib in sorted(libraries):
174 logger.info(" %s: %s", lib, " ".join(libraries[lib]))
177def compute_reverse_tree(
178 pkg_universe: "BinaryPackageUniverse", affected: set[BinaryPackageId]
179) -> None:
180 """Calculate the full dependency tree for a set of packages
182 This method returns the full dependency tree for a given set of
183 packages. The first argument is an instance of the BinaryPackageUniverse
184 and the second argument is a set of BinaryPackageId.
186 The set of affected packages will be updated in place and must
187 therefore be mutable.
188 """
189 remain = list(affected)
190 while remain:
191 pkg_id = remain.pop()
192 new_pkg_ids = pkg_universe.reverse_dependencies_of(pkg_id) - affected
193 affected.update(new_pkg_ids)
194 remain.extend(new_pkg_ids)
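The worklist loop above (reused in the same shape by add_transitive_dependencies_flatten below) computes a closure; a sketch with a plain dict standing in for pkg_universe.reverse_dependencies_of (names hypothetical):
    rdeps = {"libfoo": {"app1", "app2"}, "app1": {"meta"}, "app2": set(), "meta": set()}
    affected = {"libfoo"}
    remain = list(affected)
    while remain:
        pkg = remain.pop()
        new = rdeps[pkg] - affected  # only packages not seen yet
        affected |= new
        remain.extend(new)
    assert affected == {"libfoo", "app1", "app2", "meta"}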
197def add_transitive_dependencies_flatten(
198 pkg_universe: "BinaryPackageUniverse", initial_set: MutableSet[BinaryPackageId]
199) -> None:
200 """Find and include all transitive dependencies
202 This method updates the initial_set parameter to include all transitive
203 dependencies. The first argument is an instance of the BinaryPackageUniverse
204 and the second argument is a set of BinaryPackageId.
206 The set of initial packages will be updated in place and must
207 therefore be mutable.
208 """
209 remain = list(initial_set)
210 while remain:
211 pkg_id = remain.pop()
212 new_pkg_ids = {
213 x
214 for x in chain.from_iterable(pkg_universe.dependencies_of(pkg_id))
215 if x not in initial_set
216 }
217 initial_set |= new_pkg_ids
218 remain.extend(new_pkg_ids)
221def write_nuninst(filename: str, nuninst: dict[str, set[str]]) -> None:
222 """Write the non-installable report
224 Write the non-installable report derived from "nuninst" to the
225 file denoted by "filename".
226 """
227 with open(filename, "w", encoding="utf-8") as f:
228 # Having two fields with (almost) identical dates seems a bit
229 # redundant.
230 f.write(
231 "Built on: "
232 + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time()))
233 + "\n"
234 )
235 f.write(
236 "Last update: "
237 + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time()))
238 + "\n\n"
239 )
240 for k in nuninst:
241 f.write("{}: {}\n".format(k, " ".join(nuninst[k])))
244def read_nuninst(filename: str, architectures: set[str]) -> dict[str, set[str]]:
245 """Read the non-installable report
247 Read the non-installable report from the file denoted by
248 "filename" and return it. Only architectures in "architectures"
249 will be included in the report.
250 """
251 nuninst: dict[str, set[str]] = {}
252 with open(filename, encoding="ascii") as f:
253 for r in f:
254 if ":" not in r:
255 continue
256 arch, packages = r.strip().split(":", 1)
257 if arch.split("+", 1)[0] in architectures:
258 nuninst[arch] = set(packages.split())
259 return nuninst
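A round-trip sketch of the report format (temporary path; the "Built on" and "Last update" header lines are skipped on read because their prefixes are not architectures):
    import os, tempfile
    nuninst = {"amd64": {"pkg-a", "pkg-b"}, "amd64+all": {"pkg-a"}}
    path = os.path.join(tempfile.mkdtemp(), "nuninst.txt")
    write_nuninst(path, nuninst)
    # "amd64+all" is kept because its prefix before "+" is in the given set.
    assert read_nuninst(path, {"amd64"}) == nuninst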
262def newly_uninst(
263 nuold: dict[str, set[str]], nunew: dict[str, set[str]]
264) -> dict[str, list[str]]:
265 """Return a nuninst statistic with only new uninstallable packages
267 This method subtracts the uninstallable packages of the statistic
268 "nunew" from the statistic "nuold".
270 It returns a dictionary with the architectures as keys and the list
271 of uninstallable packages as values. If there are no regressions
272 on a given architecture, then the architecture will be omitted in
273 the result. Accordingly, if none of the architectures have
274 regressions, an empty dictionary is returned.
275 """
276 res: dict[str, list[str]] = {}
277 for arch in ifilter_only(nunew, nuold):
278 arch_nuninst = [x for x in nunew[arch] if x not in nuold[arch]]
279 # Leave res empty if there are no newly uninst packages
280 if arch_nuninst:
281 res[arch] = arch_nuninst
282 return res
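For example (hypothetical counters):
    old = {"amd64": {"a"}, "i386": {"x"}}
    new = {"amd64": {"a", "b"}, "i386": {"x"}}
    # i386 has no regressions, so it is omitted from the result.
    assert newly_uninst(old, new) == {"amd64": ["b"]}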
285def format_and_log_uninst(
286 logger: logging.Logger,
287 architectures: Iterable[str],
288 nuninst: Mapping[str, Iterable[str]],
289 *,
290 loglevel: int = logging.INFO,
291) -> None:
292 """Emits the uninstallable packages to the log
294 An example of the output string is:
295 * i386: broken-pkg1, broken-pkg2
297 Note that if there are no uninstallable packages, nothing is emitted.
298 """
299 for arch in architectures:
300 if arch in nuninst and nuninst[arch]:
301 msg = " * {}: {}".format(arch, ", ".join(sorted(nuninst[arch])))
302 logger.log(loglevel, msg)
305class Sorted(Protocol):
306 def __call__(  # ↛ exit: line 306 didn't jump to the function exit
307 self,
308 iterable: Iterable["SupportsRichComparisonT"],
309 /,
310 *,
311 key: None = None,
312 reverse: bool = False,
313 ) -> list["SupportsRichComparisonT"]: ...
316def write_heidi(
317 filename: str,
318 target_suite: TargetSuite,
319 *,
320 outofsync_arches: frozenset[str] = frozenset(),
321 sorted: Sorted = sorted,
322) -> None:
323 """Write the output HeidiResult
325 This method writes the output for Heidi, which contains all the
326 binary packages and the source packages in the form:
328 <pkg-name> <pkg-version> <pkg-architecture> <pkg-section>
329 <src-name> <src-version> source <src-section>
331 The file is written as "filename" using the sources and packages
332 from the "target_suite" parameter.
334 outofsync_arches: If given, it is a set of architectures marked
335 as "out of sync". The output file may exclude some out of date
336 arch:all packages for those architectures to reduce the noise.
338 The "X=X" parameters are optimizations to avoid "load global" in
339 the loops.
340 """
341 sources_t = target_suite.sources
342 packages_t = target_suite.binaries
344 with open(filename, "w", encoding="ascii") as f:
346 # write binary packages
347 for arch in sorted(packages_t):
348 binaries = packages_t[arch]
349 for pkg_name in sorted(binaries):
350 pkg = binaries[pkg_name]
351 pkgv = pkg.version
352 pkgarch = pkg.architecture or "all"
353 pkgsec = pkg.section or "faux"
354 if pkgsec == "faux" or pkgsec.endswith("/faux"):
355 # Faux package; not really a part of testing
356 continue
357 if (  # ↛ 369: line 357 didn't jump to line 369
358 pkg.source_version
359 and pkgarch == "all"
360 and pkg.source_version != sources_t[pkg.source].version
361 and arch in outofsync_arches
362 ):
363 # when architectures are marked as "outofsync", their binary
364 # versions may be lower than those of the associated
365 # source package in testing. the binary package list for
366 # such architectures will include arch:all packages
367 # matching those older versions, but we only want the
368 # newer arch:all in testing
369 continue
370 f.write(f"{pkg_name} {pkgv} {pkgarch} {pkgsec}\n")
372 # write sources
373 for src_name in sorted(sources_t):
374 src = sources_t[src_name]
375 srcv = src.version
376 srcsec = src.section or "unknown"
377 if srcsec == "faux" or srcsec.endswith("/faux"):
378 # Faux package; not really a part of testing
379 continue
380 f.write(f"{src_name} {srcv} source {srcsec}\n")
383def write_heidi_delta(filename: str, all_selected: list["MigrationItem"]) -> None:
384 """Write the output delta
386 This method writes the packages to be upgraded, in the form:
387 <src-name> <src-version>
388 or (if the source is to be removed):
389 -<src-name> <src-version>
391 The order corresponds to that shown in update_output.
392 """
393 with open(filename, "w", encoding="ascii") as fd:
395 fd.write("#HeidiDelta\n")
397 for item in all_selected:
398 prefix = ""
400 if item.is_removal:
401 prefix = "-"
403 if item.architecture == "source":
404 fd.write(f"{prefix}{item.package} {item.version}\n")
405 else:
406 fd.write(
407 "%s%s %s %s\n"
408 % (prefix, item.package, item.version, item.architecture)
409 )
412class Opener(Protocol):
413 def __call__(  # ↛ exit: line 413 didn't jump to the function exit
414 self, file: str, mode: Literal["wt"], encoding: Literal["utf-8"]
415 ) -> IO[Any]: ...
418def write_excuses(
419 excuses: dict[str, "Excuse"] | dict[PackageId, "Excuse"],
420 dest_file: str,
421 output_format: Literal["yaml", "legacy-html"] = "yaml",
422) -> None:
423 """Write the excuses to dest_file
425 Writes a list of excuses in a specified output_format to the
426 path denoted by dest_file. The output_format can either be "yaml"
427 or "legacy-html".
428 """
429 excuselist = sorted(excuses.values(), key=lambda x: x.sortkey())
430 if output_format == "yaml":
431 os.makedirs(os.path.dirname(dest_file), exist_ok=True)
432 opener: Opener = open # type: ignore[assignment]
433 if dest_file.endswith(".xz"): 433 ↛ 434line 433 didn't jump to line 434 because the condition on line 433 was never true
434 import lzma
436 opener = lzma.open # type: ignore[assignment]
437 elif dest_file.endswith(".gz"): 437 ↛ 438line 437 didn't jump to line 438 because the condition on line 437 was never true
438 import gzip
440 opener = gzip.open # type: ignore[assignment]
441 with opener(dest_file, "wt", encoding="utf-8") as f:
442 edatalist = [e.excusedata(excuses) for e in excuselist]
443 excusesdata = {
444 "sources": edatalist,
445 "generated-date": datetime.now(UTC),
446 }
447 f.write(
448 yaml.dump(excusesdata, default_flow_style=False, allow_unicode=True)
449 )
450 elif output_format == "legacy-html":
451 with open(dest_file, "w", encoding="utf-8") as f:
452 f.write(
453 '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/REC-html40/strict.dtd">\n'
454 )
455 f.write("<html><head><title>excuses...</title>")
456 f.write(
457 '<meta http-equiv="Content-Type" content="text/html;charset=utf-8"></head><body>\n'
458 )
459 f.write(
460 "<p>Generated: "
461 + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time()))
462 + "</p>\n"
463 )
464 f.write("<ul>\n")
465 for e in excuselist:
466 f.write("<li>%s" % e.html(excuses))
467 f.write("</ul></body></html>\n")
468 else: # pragma: no cover
469 raise ValueError('Output format must be either "yaml" or "legacy-html"')
472def old_libraries(
473 mi_factory: "MigrationItemFactory",
474 suite_info: Suites,
475 outofsync_arches: Iterable[str] = frozenset(),
476) -> list["MigrationItem"]:
477 """Detect old libraries left in the target suite for smooth transitions
479 This method detects old libraries which are in the target suite but no
480 longer built from the source package: they are still there because
481 other packages still depend on them, but they should be removed as
482 soon as possible.
484 For "outofsync" architectures, outdated binaries are allowed to be in
485 the target suite, so they are only added to the removal list if they
486 are no longer in the (primary) source suite.
487 """
488 sources_t = suite_info.target_suite.sources
489 binaries_t = suite_info.target_suite.binaries
490 binaries_s = suite_info.primary_source_suite.binaries
491 removals = []
492 for arch in binaries_t:
493 for pkg_name in binaries_t[arch]:
494 pkg = binaries_t[arch][pkg_name]
495 if sources_t[pkg.source].version != pkg.source_version and (
496 arch not in outofsync_arches or pkg_name not in binaries_s[arch]
497 ):
498 removals.append(mi_factory.generate_removal_for_cruft_item(pkg.pkg_id))
499 return removals
502def is_nuninst_asgood_generous(
503 constraints: dict[str, list[str]],
504 allow_uninst: dict[str, set[str | None]],
505 architectures: list[str],
506 old: dict[str, set[str]],
507 new: dict[str, set[str]],
508 break_arches: set[str] = cast(set[str], frozenset()),
509) -> bool:
510 """Compares the nuninst counters and constraints to see if they improved
512 Given a list of architectures, the previous and the current nuninst
513 counters, this function determines if the current nuninst counter
514 is better than the previous one. Optionally it also accepts a set
515 of "break_arches", the nuninst counter for any architecture listed
516 in this set is completely ignored.
518 If the nuninst counters are equal or better, then the constraints
519 are checked for regressions (ignoring break_arches).
521 Returns True if the new nuninst counter is better than the
522 previous one and there are no constraint regressions (ignoring break_arches).
523 Returns False otherwise.
525 """
526 diff = 0
527 for arch in architectures:
528 if arch in break_arches:
529 continue
530 diff = diff + (
531 len(new[arch] - allow_uninst[arch]) - len(old[arch] - allow_uninst[arch])
532 )
533 if diff > 0:  # ↛ 534: the condition on line 533 was never true
534 return False
535 must_be_installable = constraints["keep-installable"]
536 for arch in architectures:
537 if arch in break_arches:
538 continue
539 regression = new[arch] - old[arch]
540 if not regression.isdisjoint(must_be_installable):  # ↛ 541: the condition on line 540 was never true
541 return False
542 return True
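A sketch of both acceptance checks (data hypothetical):
    constraints = {"keep-installable": ["essential-pkg"]}
    allow_uninst = {"amd64": set()}
    old = {"amd64": {"bad-old"}}
    # Same number of uninstallable packages, no constraint regression: accepted.
    assert is_nuninst_asgood_generous(
        constraints, allow_uninst, ["amd64"], old, {"amd64": {"bad-new"}}
    )
    # Same count, but a "keep-installable" package regressed: rejected.
    assert not is_nuninst_asgood_generous(
        constraints, allow_uninst, ["amd64"], old, {"amd64": {"essential-pkg"}}
    )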
545def clone_nuninst(
546 nuninst: dict[str, set[str]],
547 *,
548 packages_s: dict[str, dict[str, BinaryPackage]] | None = None,
549 architectures: Iterable[str] | None = None,
550) -> dict[str, set[str]]:
551 """Completely or Selectively deep clone nuninst
553 Given nuninst table, the package table for a given suite and
554 a list of architectures, this function will clone the nuninst
555 table. Only the listed architectures will be deep cloned -
556 the rest will only be shallow cloned. When packages_s is given,
557 packages not listed in packages_s will be pruned from the clone
558 (if packages_s is omitted, the per architecture nuninst is cloned
559 as-is)
560 """
561 clone = nuninst.copy()
562 if architectures is None:  # ↛ 563: the condition on line 562 was never true
563 return clone
564 if packages_s is not None:
565 for arch in architectures:
566 clone[arch] = {x for x in nuninst[arch] if x in packages_s[arch]}
567 clone[arch + "+all"] = {
568 x for x in nuninst[arch + "+all"] if x in packages_s[arch]
569 }
570 else:
571 for arch in architectures:
572 clone[arch] = set(nuninst[arch])
573 clone[arch + "+all"] = set(nuninst[arch + "+all"])
574 return clone
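For instance, pruning against a package table (a bare object() stands in for a BinaryPackage here, since only the keys are consulted):
    nuninst = {"amd64": {"a", "b"}, "amd64+all": {"a"}}
    packages_s = {"amd64": {"a": object()}}  # "b" no longer exists in the source suite
    clone = clone_nuninst(nuninst, packages_s=packages_s, architectures=["amd64"])
    assert clone == {"amd64": {"a"}, "amd64+all": {"a"}}
    assert nuninst["amd64"] == {"a", "b"}  # the input is left untouched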
577def test_installability(
578 target_suite: TargetSuite,
579 pkg_name: str,
580 pkg_id: BinaryPackageId,
581 broken: set[str],
582 nuninst_arch: set[str] | None,
583) -> Literal[-1, 0, 1]:
584 """Test for installability of a package on an architecture
586 (pkg_name, pkg_version, pkg_arch) is the package to check.
588 broken is the set of broken packages. If the package changes
589 installability (e.g. goes from uninstallable to installable),
590 broken will be updated accordingly.
592 If nuninst_arch is not None, then it is also updated in the same
593 way as broken is.
594 """
595 c: Literal[-1, 0, 1] = 0
596 r = target_suite.is_installable(pkg_id)
597 if not r:
598 # not installable
599 if pkg_name not in broken:
600 # regression
601 broken.add(pkg_name)
602 c = -1
603 if nuninst_arch is not None and pkg_name not in nuninst_arch:
604 nuninst_arch.add(pkg_name)
605 else:
606 if pkg_name in broken:
607 # Improvement
608 broken.remove(pkg_name)
609 c = 1
610 if nuninst_arch is not None and pkg_name in nuninst_arch:
611 nuninst_arch.remove(pkg_name)
612 return c
615def check_installability(
616 target_suite: TargetSuite,
617 binaries: dict[str, dict[str, BinaryPackage]],
618 arch: str,
619 updates: set[BinaryPackageId],
620 check_archall: bool,
621 nuninst: dict[str, set[str]],
622) -> None:
623 broken = nuninst[arch + "+all"]
624 packages_t_a = binaries[arch]
626 for pkg_id in (x for x in updates if x.architecture == arch):
627 name, version, parch = pkg_id
628 if name not in packages_t_a:
629 continue
630 pkgdata = packages_t_a[name]
631 if version != pkgdata.version:
632 # Not the version in testing right now, ignore
633 continue
634 actual_arch = pkgdata.architecture
635 nuninst_arch = None
636 # only check arch:all packages if requested
637 if check_archall or actual_arch != "all":
638 nuninst_arch = nuninst[parch]
639 elif actual_arch == "all": 639 ↛ 641line 639 didn't jump to line 641 because the condition on line 639 was always true
640 nuninst[parch].discard(name)
641 test_installability(target_suite, name, pkg_id, broken, nuninst_arch)
644def possibly_compressed(
645 path: str, *, permitted_compressions: list[str] | None = None
646) -> str:
647 """Find and select a (possibly compressed) variant of a path
649 If the given path exists, it will be returned
651 :param path: The base path.
652 :param permitted_compressions: Alternative extensions to look for. Defaults to "gz" and "xz".
653 :return: The path given possibly with one of the permitted extensions.
654 :raises FileNotFoundError: if the path is not found
655 """
656 if os.path.exists(path):  # ↛ 658: the condition on line 656 was always true
657 return path
658 if permitted_compressions is None:
659 permitted_compressions = ["gz", "xz"]
660 for ext in permitted_compressions:
661 cpath = f"{path}.{ext}"
662 if os.path.exists(cpath):
663 return cpath
664 raise FileNotFoundError(
665 errno.ENOENT, os.strerror(errno.ENOENT), path
666 ) # pragma: no cover
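For example, with only the gzip variant on disk (temporary directory):
    import os, tempfile
    d = tempfile.mkdtemp()
    open(os.path.join(d, "Sources.gz"), "w").close()
    assert possibly_compressed(os.path.join(d, "Sources")) == os.path.join(d, "Sources.gz")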
669def create_provides_map(
670 packages: dict[str, BinaryPackage],
671) -> dict[str, set[tuple[str, str]]]:
672 """Create a provides map from a map binary package names and their BinaryPackage objects
674 :param packages: A dict mapping binary package names to their BinaryPackage object
675 :return: A provides map
676 """
677 # create provides
678 provides = defaultdict(set)
680 for pkg, dpkg in packages.items():
681 # register virtual packages and real packages that provide
682 # them
683 for provided_pkg, provided_version, _ in dpkg.provides:
684 provides[provided_pkg].add((pkg, provided_version))
686 return provides
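A sketch with a minimal stand-in for BinaryPackage (only the .provides attribute is read here; the tuples follow the (name, version, op) shape produced by parse_provides below):
    from types import SimpleNamespace
    pkgs = {"mail-agent": SimpleNamespace(provides=[("mta", "", ""), ("mail-reader", "1.0", "=")])}
    provides = create_provides_map(pkgs)  # type: ignore[arg-type]
    assert provides == {"mta": {("mail-agent", "")}, "mail-reader": {("mail-agent", "1.0")}}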
689def read_release_file(suite_dir: str) -> "TagSection[str]":
690 """Parses a given "Release" file
692 :param suite_dir: The directory to the suite
693 :return: A dict of the first (and only) paragraph in a Release file
694 """
695 release_file = os.path.join(suite_dir, "Release")
696 with open(release_file) as fd:
697 tag_file = iter(apt_pkg.TagFile(fd))
698 result = next(tag_file)
699 if next(tag_file, None) is not None: # pragma: no cover
700 raise TypeError("%s has more than one paragraph" % release_file)
701 return result
704def read_sources_file(
705 filename: str,
706 sources: dict[str, SourcePackage] | None = None,
707 add_faux: bool = True,
708 intern: Callable[[str], str] = sys.intern,
709) -> dict[str, SourcePackage]:
710 """Parse a single Sources file into a hash
712 Parse a single Sources file into a dict mapping a source package
713 name to a SourcePackage object. If there are multiple versions
714 of the same source package, then the highest versioned source
715 package (that is not marked as "Extra-Source-Only") is the
716 version kept in the dict.
718 :param filename: Path to the Sources file. Can be compressed by any algorithm supported by apt_pkg.TagFile
719 :param sources: Optional dict to add the packages to. If given, this is also the value returned.
720 :param add_faux: Add a faux arch:all binary for each source that claims it has arch:all
721 :param intern: Internal optimisation / implementation detail to avoid python's "LOAD_GLOBAL" instruction in a loop
722 :return: mapping from names to a source package
723 """
724 if sources is None:
725 sources = {}
727 tag_file = apt_pkg.TagFile(filename)
728 get_field = tag_file.section.get
729 step = tag_file.step
731 while step():
732 if get_field("Extra-Source-Only", "no") == "yes":
733 # Ignore sources only referenced by Built-Using
734 continue
735 pkg = get_field("Package")
736 ver = get_field("Version")
737 # There may be multiple versions of the source package
738 # (in unstable) if some architectures have out-of-date
739 # binaries. We only ever consider the source with the
740 # largest version for migration.
741 if pkg in sources and apt_pkg.version_compare(sources[pkg].version, ver) > 0:
742 continue
743 maint = get_field("Maintainer")
744 if maint:  # ↛ 746: the condition on line 744 was always true
745 maint = intern(maint.strip())
746 section = get_field("Section")
747 if section:  # ↛ 750: the condition on line 747 was always true
748 section = intern(section.strip())
749 build_deps_arch: str | None
750 build_deps_arch = ", ".join(
751 x
752 for x in (get_field("Build-Depends"), get_field("Build-Depends-Arch"))
753 if x is not None
754 )
755 if build_deps_arch != "":
756 build_deps_arch = sys.intern(build_deps_arch)
757 else:
758 build_deps_arch = None
759 build_deps_indep = get_field("Build-Depends-Indep")
760 if build_deps_indep is not None:
761 build_deps_indep = sys.intern(build_deps_indep)
763 # Adding arch:all packages to the list of binaries already to be able
764 # to check for them later. Helps mitigate bug 887060 and is the
765 # (partial?) answer to bug 1064428.
766 binaries: set[BinaryPackageId] = set()
767 if add_faux and "all" in get_field("Architecture", "").split():
768 # the value "faux" in arch:faux is used elsewhere, so keep in sync
769 pkg_id = BinaryPackageId(pkg + "-faux", intern("0~~~~"), intern("faux"))
770 binaries.add(pkg_id)
772 sources[intern(pkg)] = SourcePackage(
773 intern(pkg),
774 intern(ver),
775 section,
776 binaries,
777 maint,
778 False,
779 build_deps_arch,
780 build_deps_indep,
781 get_field("Testsuite", "").split(),
782 get_field("Testsuite-Triggers", "").replace(",", "").split(),
783 )
784 return sources
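A sketch parsing a one-paragraph Sources file (file contents hypothetical):
    import tempfile
    text = (
        "Package: hello\n"
        "Version: 2.10-3\n"
        "Maintainer: Jane Doe <jane@example.org>\n"
        "Section: devel\n"
        "Architecture: any all\n"
    )
    with tempfile.NamedTemporaryFile("w", suffix="_Sources", delete=False) as f:
        f.write(text)
    sources = read_sources_file(f.name)
    assert sources["hello"].version == "2.10-3"
    # "all" in Architecture plus add_faux=True yields one faux arch:all binary.
    assert len(sources["hello"].binaries) == 1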
787def _check_and_update_packages(
788 packages: list[BinaryPackage],
789 package: BinaryPackage,
790 archqual: str | None,
791 build_depends: bool,
792) -> None:
793 """Helper for get_dependency_solvers
795 This method appends the given package to the list of packages if that
796 package is a valid solver for the (build-)dependency.
798 :param packages: which packages are to be updated
799 :param archqual: Architecture qualifier
800 :param build_depends: If True, check if the "package" parameter is valid as a build-dependency.
801 """
803 # See also bug #971739 and #1059929
804 if archqual is None:
805 packages.append(package)
806 elif archqual == "native" and build_depends:
807 # Multi-arch handling for build-dependencies
808 # - :native is ok always
809 packages.append(package)
810 elif archqual == "any" and package.multi_arch == "allowed": 810 ↛ 813line 810 didn't jump to line 813 because the condition on line 810 was never true
811 # Multi-arch handling for both build-dependencies and regular dependencies
812 # - :any is ok iff the target has "M-A: allowed"
813 packages.append(package)
816class GetDependencySolversProto(Protocol):
817 def __call__(  # ↛ exit: line 817 didn't jump to the function exit
818 self,
819 block: list[tuple[str, str, str]],
820 binaries_s_a: dict[str, BinaryPackage],
821 provides_s_a: dict[str, set[tuple[str, str]]],
822 *,
823 build_depends: bool = False,
824 empty_set: Any = frozenset(),
825 ) -> list[BinaryPackage]: ...
828def get_dependency_solvers(
829 block: list[tuple[str, str, str]],
830 binaries_s_a: dict[str, BinaryPackage],
831 provides_s_a: dict[str, set[tuple[str, str]]],
832 *,
833 build_depends: bool = False,
834 empty_set: Any = frozenset(),
835) -> list[BinaryPackage]:
836 """Find the packages which satisfy a dependency block
838 This method returns the list of packages which satisfy a dependency
839 block (as returned by apt_pkg.parse_depends) in a package table
840 for a given suite and architecture (a la self.binaries[suite][arch])
842 It can also handle build-dependency relations if the named parameter
843 "build_depends" is set to True. In this case, block should be based
844 on the return value from apt_pkg.parse_src_depends.
846 :param block: The dependency block as parsed by apt_pkg.parse_depends (or apt_pkg.parse_src_depends
847 if the "build_depends" is True)
848 :param binaries_s_a: Mapping of package names to the relevant BinaryPackage
849 :param provides_s_a: Mapping of package names to their providers (as generated by create_provides_map)
850 :param build_depends: If True, treat the "block" parameter as a build-dependency relation rather than
851 a regular dependency relation.
852 :param empty_set: Internal implementation detail / optimisation
853 :return: the packages solving the relation
854 """
855 packages: list[BinaryPackage] = []
857 # for every package, version and operation in the block
858 for name, version, op in block:
859 if ":" in name:
860 name, archqual = name.split(":", 1)
861 else:
862 archqual = None
864 # look for the package in unstable
865 if name in binaries_s_a:
866 package = binaries_s_a[name]
867 # check the versioned dependency and architecture qualifier
868 # (if present)
869 if (op == "" and version == "") or apt_pkg.check_dep(
870 package.version, op, version
871 ):
872 _check_and_update_packages(packages, package, archqual, build_depends)
874 # look for the package in the virtual packages list and loop on them
875 for prov, prov_version in provides_s_a.get(name, empty_set):
876 assert prov in binaries_s_a
877 package = binaries_s_a[prov]
878 # See Policy Manual §7.5
879 if (op == "" and version == "") or (
880 prov_version != "" and apt_pkg.check_dep(prov_version, op, version)
881 ):
882 _check_and_update_packages(packages, package, archqual, build_depends)
884 return packages
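The "block" parameter is one OR-group from apt_pkg's dependency parser; for illustration (apt_pkg.init() is harmless if already done):
    import apt_pkg
    apt_pkg.init()
    # Each relation becomes a (name, version, op) tuple; the outer list groups alternatives.
    block = apt_pkg.parse_depends("mail-transport-agent | exim4 (>= 4.90)", False)[0]
    # block is roughly [('mail-transport-agent', '', ''), ('exim4', '4.90', '>=')]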
887def invalidate_excuses(
888 excuses: dict[str, "Excuse"],
889 valid: set[str],
890 invalid: set[str],
891 invalidated: set[str],
892) -> None:
893 """Invalidate impossible excuses
895 This method invalidates the impossible excuses, which depend
896 on invalid excuses. The `valid' and `invalid' parameters contain
897 the sets of valid and invalid excuses, respectively.
898 """
899 # make a list of all packages (source and binary) that are present in the
900 # excuses we have
901 excuses_packages: dict[PackageId | BinaryPackageId, set[str]] = defaultdict(set)
902 for exc in excuses.values():
903 for arch in exc.packages:
904 for pkg_arch_id in exc.packages[arch]:
905 # note that the same package can be in multiple excuses
906 # e.g. when unstable and TPU have the same packages
907 excuses_packages[pkg_arch_id].add(exc.name)
909 # create dependencies between excuses based on packages
910 excuses_rdeps = defaultdict(set)
911 for exc in excuses.values():
912 # Note that excuses_rdeps is only populated by dependencies generated
913 # based on packages below. There are currently no dependencies between
914 # excuses that are added directly, so this is ok.
916 for pkg_dep in exc.depends_packages:
917 # set of excuses, each of which can satisfy this specific
918 # dependency
919 # if there is a dependency on a package for which no
920 # excuses exist (e.g. a cruft binary), the set will
921 # contain an ImpossibleDependencyState
922 dep_exc: set[str | DependencyState] = set()
923 for pkg_dep_id in cast(set[BinaryPackageId], pkg_dep.deps):
924 pkg_excuses = excuses_packages[pkg_dep_id]
925 # if the dependency isn't found, we get an empty set
926 if pkg_excuses == frozenset():
927 imp_dep = ImpossibleDependencyState(
928 PolicyVerdict.REJECTED_PERMANENTLY, "%s" % (pkg_dep_id.name)
929 )
930 dep_exc.add(imp_dep)
932 else:
933 dep_exc |= pkg_excuses
934 for e in pkg_excuses:
935 excuses_rdeps[e].add(exc.name)
936 if not exc.add_dependency(dep_exc, pkg_dep.spec):
937 valid.discard(exc.name)
938 invalid.add(exc.name)
940 # loop on the invalid excuses
941 # Convert invalid to a list for deterministic results
942 invalid2 = sorted(invalid)
943 for ename in iter_except(invalid2.pop, IndexError):
944 invalidated.add(ename)
945 # if there is no reverse dependency, skip the item
946 if ename not in excuses_rdeps:
947 continue
949 rdep_verdict = PolicyVerdict.REJECTED_WAITING_FOR_ANOTHER_ITEM
950 if excuses[ename].policy_verdict.is_blocked:
951 rdep_verdict = PolicyVerdict.REJECTED_BLOCKED_BY_ANOTHER_ITEM
953 # loop on the reverse dependencies
954 for x in sorted(excuses_rdeps[ename]):
955 exc = excuses[x]
956 # if the item is valid and it is not marked as `forced', then we
957 # invalidate this specific dependency
958 if x in valid and not exc.forced:
959 # mark this specific dependency as invalid
960 still_valid = exc.invalidate_dependency(ename, rdep_verdict)
962 # if there are no alternatives left for this dependency,
963 # invalidate the excuse
964 if not still_valid:
965 valid.discard(x)
966 invalid2.append(x)
969def compile_nuninst(
970 target_suite: TargetSuite, architectures: list[str], nobreakall_arches: list[str]
971) -> dict[str, set[str]]:
972 """Compile a nuninst dict from the current testing
974 :param target_suite: The target suite
975 :param architectures: Which architectures to check
976 :param nobreakall_arches: Architectures on which arch:all packages must be installable
977 """
978 nuninst: dict[str, set[str]] = {}
979 binaries_t = target_suite.binaries
981 # for all the architectures
982 for arch in architectures:
983 # if it is in the nobreakall ones, check arch-independent packages too
984 check_archall = arch in nobreakall_arches
986 # check all the packages for this architecture
987 nuninst[arch] = set()
988 packages_t_a = binaries_t[arch]
989 for pkg_name, pkg_data in packages_t_a.items():
990 r = target_suite.is_installable(pkg_data.pkg_id)
991 if not r:
992 nuninst[arch].add(pkg_name)
994 # if they are not required, remove architecture-independent packages
995 nuninst[arch + "+all"] = nuninst[arch].copy()
996 if not check_archall:
997 for pkg_name in nuninst[arch + "+all"]:
998 pkg_data = packages_t_a[pkg_name]
999 if pkg_data.architecture == "all":
1000 nuninst[arch].remove(pkg_name)
1002 return nuninst
1005def is_smooth_update_allowed(
1006 binary: BinaryPackage, smooth_updates: list[str], hints: "HintCollection"
1007) -> bool:
1008 if "ALL" in smooth_updates: 1008 ↛ 1009line 1008 didn't jump to line 1009 because the condition on line 1008 was never true
1009 return True
1010 section = binary.section.split("/")[-1]
1011 if section in smooth_updates:
1012 return True
1013 if hints.search(  # ↛ 1017: the condition on line 1013 was never true
1014 "allow-smooth-update", package=binary.source, version=binary.source_version
1015 ):
1016 # note that this needs to match the source version *IN TESTING*
1017 return True
1018 return False
1021def find_smooth_updateable_binaries(
1022 binaries_to_check: list[BinaryPackageId],
1023 source_data: SourcePackage,
1024 pkg_universe: "BinaryPackageUniverse",
1025 target_suite: TargetSuite,
1026 binaries_t: dict[str, dict[str, BinaryPackage]],
1027 binaries_s: dict[str, dict[str, BinaryPackage]],
1028 removals: set[BinaryPackageId] | frozenset[BinaryPackageId],
1029 smooth_updates: list[str],
1030 hints: "HintCollection",
1031) -> set[BinaryPackageId]:
1032 check: set[BinaryPackageId] = set()
1033 smoothbins: set[BinaryPackageId] = set()
1035 for check_pkg_id in binaries_to_check:
1036 binary, _, parch = check_pkg_id
1038 cruftbins: set[BinaryPackageId] = set()
1040 # Not a candidate for smooth update (newer non-cruft version in unstable)
1041 if binary in binaries_s[parch]:
1042 if binaries_s[parch][binary].source_version == source_data.version:
1043 continue
1044 cruftbins.add(binaries_s[parch][binary].pkg_id)
1046 # Maybe a candidate (cruft or removed binary): check if config allows us to smooth update it.
1047 if is_smooth_update_allowed(binaries_t[parch][binary], smooth_updates, hints):
1048 # if the package has reverse-dependencies which are
1049 # built from other sources, it's a valid candidate for
1050 # a smooth update. if not, it may still be a valid
1051 # candidate if one of its r-deps is itself a candidate,
1052 # so note it for checking later
1053 rdeps = set(pkg_universe.reverse_dependencies_of(check_pkg_id))
1054 # We ignore all binaries listed in "removals" as we
1055 # assume they will leave at the same time as the
1056 # given package.
1057 rdeps.difference_update(removals, binaries_to_check)
1059 smooth_update_it = False
1060 if target_suite.any_of_these_are_in_the_suite(rdeps):
1061 combined = set(smoothbins)
1062 combined.add(check_pkg_id)
1063 for rdep in rdeps:
1064 # each dependency clause has a set of possible
1065 # alternatives that can satisfy that dependency.
1066 # if any of them is outside the set of smoothbins, the
1067 # dependency can be satisfied even if this binary was
1068 # removed, so there is no need to keep it around for a
1069 # smooth update
1070 # if not, only this binary can satisfy the dependency, so
1071 # we should keep it around until the rdep is no longer in
1072 # testing
1073 for dep_clause in pkg_universe.dependencies_of(rdep):
1074 # filter out cruft binaries from unstable, because
1075 # they will not be added to the set of packages that
1076 # will be migrated
1077 if dep_clause - cruftbins <= combined:
1078 smooth_update_it = True
1079 break
1081 if smooth_update_it:
1082 smoothbins = combined
1083 else:
1084 check.add(check_pkg_id)
1086 # check whether we should perform a smooth update for
1087 # packages which are candidates but do not have r-deps
1088 # outside of the current source
1089 while 1:
1090 found_any = False
1091 for candidate_pkg_id in check:
1092 rdeps = pkg_universe.reverse_dependencies_of(candidate_pkg_id)
1093 if not rdeps.isdisjoint(smoothbins):
1094 smoothbins.add(candidate_pkg_id)
1095 found_any = True
1096 if not found_any:
1097 break
1098 check = {x for x in check if x not in smoothbins}
1100 return smoothbins
1103def find_newer_binaries(
1104 suite_info: Suites, pkg: BinaryPackage, add_source_for_dropped_bin: bool = False
1105) -> list[tuple[PackageId, Suite]]:
1106 """
1107 Find newer binaries for pkg in any of the source suites.
1109 :param pkg: BinaryPackage (is assumed to be in the target suite)
1111 :param add_source_for_dropped_bin: If True, newer versions of the
1112 source of pkg will be added if they don't have the binary pkg
1114 :return: the newer binaries (or sources) and their suites
1115 """
1116 source = pkg.source
1117 newer_versions: list[tuple[PackageId, Suite]] = []
1118 for suite in suite_info:
1119 if suite.suite_class == SuiteClass.TARGET_SUITE:
1120 continue
1122 suite_binaries_on_arch = suite.binaries.get(pkg.pkg_id.architecture)
1123 if not suite_binaries_on_arch:  # ↛ 1124: the condition on line 1123 was never true
1124 continue
1126 newerbin = None
1127 if pkg.pkg_id.package_name in suite_binaries_on_arch:
1128 newerbin = suite_binaries_on_arch[pkg.pkg_id.package_name]
1129 if suite.is_cruft(newerbin):
1130 # We pretend the cruft binary doesn't exist.
1131 # We handle this as if the source didn't have the binary
1132 # (see below)
1133 newerbin = None
1134 elif apt_pkg.version_compare(newerbin.version, pkg.version) <= 0:
1135 continue
1136 else:
1137 if source not in suite.sources:
1138 # bin and source not in suite: no newer version
1139 continue
1141 if not newerbin:
1142 if not add_source_for_dropped_bin:  # ↛ 1143: the condition on line 1142 was never true
1143 continue
1144 # We only get here if there is a newer version of the source,
1145 # which doesn't have the binary anymore (either it doesn't
1146 # exist, or it's cruft and we pretend it doesn't exist).
1147 # Add the new source instead.
1148 nsrc = suite.sources[source]
1149 n_id = PackageId(source, nsrc.version, "source")
1150 overs = pkg.source_version
1151 if apt_pkg.version_compare(nsrc.version, overs) <= 0:
1152 continue
1153 else:
1154 n_id = newerbin.pkg_id
1156 newer_versions.append((n_id, suite))
1158 return newer_versions
1161def parse_provides(
1162 provides_raw: str,
1163 pkg_id: BinaryPackageId | None = None,
1164 logger: logging.Logger | None = None,
1165) -> list[tuple[str, str, str]]:
1166 parts = apt_pkg.parse_depends(provides_raw, False)
1167 nprov = []
1168 for or_clause in parts:
1169 if len(or_clause) != 1: # pragma: no cover
1170 if logger is not None:
1171 msg = "Ignoring invalid provides in %s: Alternatives [%s]"
1172 logger.warning(msg, str(pkg_id), str(or_clause))
1173 continue
1174 for part in or_clause:
1175 provided, provided_version, op = part
1176 if op != "" and op != "=": # pragma: no cover
1177 if logger is not None:
1178 msg = "Ignoring invalid provides in %s: %s (%s %s)"
1179 logger.warning(msg, str(pkg_id), provided, op, provided_version)
1180 continue
1181 provided = sys.intern(provided)
1182 provided_version = sys.intern(provided_version)
1183 part = (provided, provided_version, sys.intern(op))
1184 nprov.append(part)
1185 return nprov
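For example (alternatives and version relations other than "=" would be skipped as invalid, with a warning only when a logger is passed):
    parsed = parse_provides("mta, mail-reader (= 1.0)")
    # parsed is roughly [('mta', '', ''), ('mail-reader', '1.0', '=')]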
1188def parse_builtusing(
1189 builtusing_raw: str,
1190 pkg_id: BinaryPackageId | None = None,
1191 logger: logging.Logger | None = None,
1192) -> list[tuple[str, str]]:
1193 parts = apt_pkg.parse_depends(builtusing_raw, False)
1194 nbu = []
1195 for or_clause in parts:
1196 if len(or_clause) != 1: # pragma: no cover
1197 if logger is not None:
1198 msg = "Ignoring invalid builtusing in %s: Alternatives [%s]"
1199 logger.warning(msg, str(pkg_id), str(or_clause))
1200 continue
1201 for part in or_clause:
1202 bu, bu_version, op = part
1203 if op != "=": # pragma: no cover
1204 if logger is not None:
1205 msg = "Ignoring invalid builtusing in %s: %s (%s %s)"
1206 logger.warning(msg, str(pkg_id), bu, op, bu_version)
1207 continue
1208 bu = sys.intern(bu)
1209 bu_version = sys.intern(bu_version)
1210 nbu.append((bu, bu_version))
1211 return nbu
1214def parse_option(
1215 options: "optparse.Values",
1216 option_name: str,
1217 default: Any | None = None,
1218 to_bool: bool = False,
1219 to_int: bool = False,
1220 day_to_sec: bool = False,
1221) -> None:
1222 """Ensure the option exist and has a sane value
1224 :param options: optparse.Values object holding the options
1226 :param option_name: string with the name of the option
1228 :param default: the default value for the option
1230 :param to_int: convert the input to int (defaults to sys.maxsize)
1232 :param to_bool: convert the input to bool
1234 :param day_to_sec: convert the input from days to seconds (implies to_int=True)
1235 """
1236 value = getattr(options, option_name, default)
1238 # Option was provided with no value (or default is '') so pick up the default
1239 if value == "":
1240 value = default
1242 if (to_int or day_to_sec) and value in (None, ""):
1243 value = sys.maxsize
1245 if day_to_sec:
1246 value = int(float(value) * 24 * 60 * 60) # type: ignore[arg-type]
1248 if to_int:
1249 value = int(value) # type: ignore[arg-type]
1251 if to_bool:
1252 if value and (
1253 isinstance(value, bool) or value.lower() in ("yes", "y", "true", "t", "1")
1254 ):
1255 value = True
1256 else:
1257 value = False
1259 setattr(options, option_name, value)
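A usage sketch with hypothetical option names:
    import optparse
    opts = optparse.Values({"smooth_updates": "", "min_age": "5"})
    parse_option(opts, "smooth_updates", default=[])  # empty string falls back to the default
    parse_option(opts, "min_age", day_to_sec=True)    # "5" days -> 432000 seconds
    assert opts.smooth_updates == []
    assert opts.min_age == 5 * 24 * 60 * 60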