Coverage for britney2/utils.py: 92%
496 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-06-17 09:00 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-06-17 09:00 +0000
1# Refactored parts from britney.py, which is/was:
2# Copyright (C) 2001-2008 Anthony Towns <ajt@debian.org>
3# Andreas Barth <aba@debian.org>
4# Fabio Tranchitella <kobold@debian.org>
5# Copyright (C) 2010-2012 Adam D. Barratt <adsb@debian.org>
6# Copyright (C) 2012 Niels Thykier <niels@thykier.net>
7#
8# New portions
9# Copyright (C) 2013 Adam D. Barratt <adsb@debian.org>
11# This program is free software; you can redistribute it and/or modify
12# it under the terms of the GNU General Public License as published by
13# the Free Software Foundation; either version 2 of the License, or
14# (at your option) any later version.
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
22import errno
23import logging
24import optparse
25import os
26import sys
27import time
28from collections import defaultdict
29from collections.abc import (
30 Callable,
31 Container,
32 Generator,
33 Iterable,
34 Iterator,
35 Mapping,
36 MutableSet,
37)
38from datetime import UTC, datetime
39from enum import Enum, StrEnum
40from functools import partial
41from itertools import chain, filterfalse
42from typing import (
43 IO,
44 TYPE_CHECKING,
45 Any,
46 Literal,
47 Protocol,
48 TypeVar,
49 Union,
50 cast,
51 overload,
52)
54import apt_pkg
55import yaml
57from britney2 import (
58 BinaryPackage,
59 BinaryPackageId,
60 MultiArch,
61 PackageId,
62 SourcePackage,
63 Suite,
64 SuiteClass,
65 Suites,
66 TargetSuite,
67)
68from britney2.excuse import Excuse
69from britney2.excusedeps import DependencyState, ImpossibleDependencyState
70from britney2.policies import PolicyVerdict
72if TYPE_CHECKING: 72 ↛ 74line 72 didn't jump to line 74 because the condition on line 72 was never true
74 from _typeshed import SupportsRichComparisonT
75 from apt_pkg import TagSection
77 from .hints import HintCollection
78 from .installability.universe import BinaryPackageUniverse
79 from .migrationitem import MigrationItem, MigrationItemFactory
81_T = TypeVar("_T")
84class MigrationConstraintException(Exception):
85 pass
88@overload
89def ifilter_except( 89 ↛ exitline 89 didn't jump to the function exit
90 container: Container[_T], iterable: Literal[None] = None
91) -> "partial[filterfalse[_T]]": ...
94@overload
95def ifilter_except( 95 ↛ exitline 95 didn't jump to the function exit
96 container: Container[_T], iterable: Iterable[_T]
97) -> "filterfalse[_T]": ...
100def ifilter_except(
101 container: Container[_T], iterable: Iterable[_T] | None = None
102) -> Union["filterfalse[_T]", "partial[filterfalse[_T]]"]:
103 """Filter out elements in container
105 If given an iterable it returns a filtered iterator, otherwise it
106 returns a function to generate filtered iterators. The latter is
107 useful if the same filter has to be (re-)used on multiple
108 iterators that are not known on beforehand.
109 """
110 if iterable is not None: 110 ↛ 111line 110 didn't jump to line 111 because the condition on line 110 was never true
111 return filterfalse(container.__contains__, iterable)
112 return cast(
113 "partial[filterfalse[_T]]", partial(filterfalse, container.__contains__)
114 )
117@overload
118def ifilter_only(container: Container[_T], iterable: Iterable[_T]) -> "filter[_T]": ... 118 ↛ exitline 118 didn't return from function 'ifilter_only' because
121@overload
122def ifilter_only(container: Container[_T], iterable: None) -> "partial[filter[_T]]": ... 122 ↛ exitline 122 didn't return from function 'ifilter_only' because
125def ifilter_only(
126 container: Container[_T], iterable: Iterable[_T] | None = None
127) -> Union["filter[_T]", "partial[filter[_T]]"]:
128 """Filter out elements in which are not in container
130 If given an iterable it returns a filtered iterator, otherwise it
131 returns a function to generate filtered iterators. The latter is
132 useful if the same filter has to be (re-)used on multiple
133 iterators that are not known on beforehand.
134 """
135 if iterable is not None: 135 ↛ 137line 135 didn't jump to line 137 because the condition on line 135 was always true
136 return filter(container.__contains__, iterable)
137 return partial(filter, container.__contains__)
140# iter_except is from the "itertools" recipe
141def iter_except(
142 func: Callable[[], _T],
143 exception: type[BaseException] | tuple[type[BaseException], ...],
144 first: Any = None,
145) -> Iterator[_T]: # pragma: no cover - itertools recipe function
146 """Call a function repeatedly until an exception is raised.
148 Converts a call-until-exception interface to an iterator interface.
149 Like __builtin__.iter(func, sentinel) but uses an exception instead
150 of a sentinel to end the loop.
152 Examples:
153 bsddbiter = iter_except(db.next, bsddb.error, db.first)
154 heapiter = iter_except(functools.partial(heappop, h), IndexError)
155 dictiter = iter_except(d.popitem, KeyError)
156 dequeiter = iter_except(d.popleft, IndexError)
157 queueiter = iter_except(q.get_nowait, Queue.Empty)
158 setiter = iter_except(s.pop, KeyError)
160 """
161 try:
162 if first is not None:
163 yield first()
164 while 1:
165 yield func()
166 except exception:
167 pass
170def log_and_format_old_libraries(
171 logger: logging.Logger, libs: list["MigrationItem"]
172) -> None:
173 """Format and log old libraries in a table (no header)"""
174 libraries: dict[str, list[str]] = {}
175 for i in libs:
176 pkg = i.package
177 if pkg in libraries:
178 libraries[pkg].append(i.architecture)
179 else:
180 libraries[pkg] = [i.architecture]
182 for lib in sorted(libraries):
183 logger.info(" %s: %s", lib, " ".join(libraries[lib]))
186def compute_reverse_tree(
187 pkg_universe: "BinaryPackageUniverse", affected: set[BinaryPackageId]
188) -> None:
189 """Calculate the full dependency tree for a set of packages
191 This method returns the full dependency tree for a given set of
192 packages. The first argument is an instance of the BinaryPackageUniverse
193 and the second argument are a set of BinaryPackageId.
195 The set of affected packages will be updated in place and must
196 therefore be mutable.
197 """
198 remain = list(affected)
199 while remain:
200 pkg_id = remain.pop()
201 new_pkg_ids = pkg_universe.reverse_dependencies_of(pkg_id) - affected
202 affected.update(new_pkg_ids)
203 remain.extend(new_pkg_ids)
206def add_transitive_dependencies_flatten(
207 pkg_universe: "BinaryPackageUniverse", initial_set: MutableSet[BinaryPackageId]
208) -> None:
209 """Find and include all transitive dependencies
211 This method updates the initial_set parameter to include all transitive
212 dependencies. The first argument is an instance of the BinaryPackageUniverse
213 and the second argument are a set of BinaryPackageId.
215 The set of initial packages will be updated in place and must
216 therefore be mutable.
217 """
218 remain = list(initial_set)
219 while remain:
220 pkg_id = remain.pop()
221 new_pkg_ids = {
222 x
223 for x in chain.from_iterable(pkg_universe.dependencies_of(pkg_id))
224 if x not in initial_set
225 }
226 initial_set |= new_pkg_ids
227 remain.extend(new_pkg_ids)
230def write_nuninst(filename: str, nuninst: dict[str, set[str]]) -> None:
231 """Write the non-installable report
233 Write the non-installable report derived from "nuninst" to the
234 file denoted by "filename".
235 """
236 with open(filename, "w", encoding="utf-8") as f:
237 # Having two fields with (almost) identical dates seems a bit
238 # redundant.
239 f.write(
240 "Built on: "
241 + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time()))
242 + "\n"
243 )
244 f.write(
245 "Last update: "
246 + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time()))
247 + "\n\n"
248 )
249 for k in nuninst:
250 f.write("{}: {}\n".format(k, " ".join(nuninst[k])))
253def read_nuninst(filename: str, architectures: set[str]) -> dict[str, set[str]]:
254 """Read the non-installable report
256 Read the non-installable report from the file denoted by
257 "filename" and return it. Only architectures in "architectures"
258 will be included in the report.
259 """
260 nuninst: dict[str, set[str]] = {}
261 with open(filename, encoding="utf-8") as f:
262 for r in f:
263 if ":" not in r:
264 continue
265 arch, packages = r.strip().split(":", 1)
266 if arch.split("+", 1)[0] in architectures:
267 nuninst[arch] = set(packages.split())
268 return nuninst
271def newly_uninst(
272 nuold: dict[str, set[str]], nunew: dict[str, set[str]]
273) -> dict[str, list[str]]:
274 """Return a nuninst statistic with only new uninstallable packages
276 This method subtracts the uninstallable packages of the statistic
277 "nunew" from the statistic "nuold".
279 It returns a dictionary with the architectures as keys and the list
280 of uninstallable packages as values. If there are no regressions
281 on a given architecture, then the architecture will be omitted in
282 the result. Accordingly, if none of the architectures have
283 regressions an empty directory is returned.
284 """
285 res: dict[str, list[str]] = {}
286 for arch in ifilter_only(nunew, nuold):
287 arch_nuninst = [x for x in nunew[arch] if x not in nuold[arch]]
288 # Leave res empty if there are no newly uninst packages
289 if arch_nuninst:
290 res[arch] = arch_nuninst
291 return res
294def format_and_log_uninst(
295 logger: logging.Logger,
296 architectures: Iterable[str],
297 nuninst: Mapping[str, Iterable[str]],
298 *,
299 loglevel: int = logging.INFO,
300) -> None:
301 """Emits the uninstallable packages to the log
303 An example of the output string is:
304 * i386: broken-pkg1, broken-pkg2
306 Note that if there is no uninstallable packages, then nothing is emitted.
307 """
308 for arch in architectures:
309 if arch in nuninst and nuninst[arch]:
310 msg = " * {}: {}".format(arch, ", ".join(sorted(nuninst[arch])))
311 logger.log(loglevel, msg)
314class Sorted(Protocol):
315 def __call__( 315 ↛ exitline 315 didn't jump to the function exit
316 self,
317 iterable: Iterable["SupportsRichComparisonT"],
318 /,
319 *,
320 key: None = None,
321 reverse: bool = False,
322 ) -> list["SupportsRichComparisonT"]: ...
325def write_heidi(
326 filename: str,
327 target_suite: TargetSuite,
328 *,
329 outofsync_arches: frozenset[str] = frozenset(),
330 sorted: Sorted = sorted,
331) -> None:
332 """Write the output HeidiResult
334 This method write the output for Heidi, which contains all the
335 binary packages and the source packages in the form:
337 <pkg-name> <pkg-version> <pkg-architecture> <pkg-section>
338 <src-name> <src-version> source <src-section>
340 The file is written as "filename" using the sources and packages
341 from the "target_suite" parameter.
343 outofsync_arches: If given, it is a set of architectures marked
344 as "out of sync". The output file may exclude some out of date
345 arch:all packages for those architectures to reduce the noise.
347 The "X=X" parameters are optimizations to avoid "load global" in
348 the loops.
349 """
350 sources_t = target_suite.sources
351 packages_t = target_suite.binaries
353 with open(filename, "w", encoding="ascii") as f:
355 # write binary packages
356 for arch in sorted(packages_t):
357 binaries = packages_t[arch]
358 for pkg_name in sorted(binaries):
359 pkg = binaries[pkg_name]
360 pkgv = pkg.version
361 pkgarch = pkg.architecture or "all"
362 pkgsec = pkg.section or "faux"
363 if pkgsec == "faux" or pkgsec.endswith("/faux"):
364 # Faux package; not really a part of testing
365 continue
366 if ( 366 ↛ 378line 366 didn't jump to line 378
367 pkg.source_version
368 and pkgarch == "all"
369 and pkg.source_version != sources_t[pkg.source].version
370 and arch in outofsync_arches
371 ):
372 # when architectures are marked as "outofsync", their binary
373 # versions may be lower than those of the associated
374 # source package in testing. the binary package list for
375 # such architectures will include arch:all packages
376 # matching those older versions, but we only want the
377 # newer arch:all in testing
378 continue
379 f.write(f"{pkg_name} {pkgv} {pkgarch} {pkgsec}\n")
381 # write sources
382 for src_name in sorted(sources_t):
383 src = sources_t[src_name]
384 srcv = src.version
385 srcsec = src.section or "unknown"
386 if srcsec == "faux" or srcsec.endswith("/faux"):
387 # Faux package; not really a part of testing
388 continue
389 f.write(f"{src_name} {srcv} source {srcsec}\n")
392def write_heidi_delta(filename: str, all_selected: list["MigrationItem"]) -> None:
393 """Write the output delta
395 This method writes the packages to be upgraded, in the form:
396 <src-name> <src-version>
397 or (if the source is to be removed):
398 -<src-name> <src-version>
400 The order corresponds to that shown in update_output.
401 """
402 with open(filename, "w", encoding="ascii") as fd:
404 fd.write("#HeidiDelta\n")
406 for item in all_selected:
407 prefix = ""
409 if item.is_removal:
410 prefix = "-"
412 if item.architecture == "source":
413 fd.write(f"{prefix}{item.package} {item.version}\n")
414 else:
415 fd.write(
416 "%s%s %s %s\n"
417 % (prefix, item.package, item.version, item.architecture)
418 )
421class Opener(Protocol):
422 def __call__( 422 ↛ exitline 422 didn't jump to the function exit
423 self, file: str, mode: Literal["wt"], encoding: Literal["utf-8"]
424 ) -> IO[Any]: ...
427class ExcusesOutputFormat(Enum):
428 YAML = 0
429 LEGACY_HTML = 1
432def write_excuses(
433 excuses: dict[str, "Excuse"] | dict[PackageId, "Excuse"],
434 dest_file: str,
435 output_format: ExcusesOutputFormat = ExcusesOutputFormat.YAML,
436) -> None:
437 """Write the excuses to dest_file
439 Writes a list of excuses in a specified output_format to the
440 path denoted by dest_file. The output_format can either be "yaml"
441 or "legacy-html".
442 """
443 excuselist = sorted(excuses.values(), key=lambda x: x.sortkey())
444 if output_format is ExcusesOutputFormat.YAML:
445 # use custom representer to avoid creation of the full list with all excuse data before starting the serialization
446 def represent_yaml_excuse(dumper: yaml.Dumper, data: Excuse) -> yaml.Node:
447 return dumper.represent_data(data.excusedata(excuses))
449 yaml.add_representer(Excuse, represent_yaml_excuse)
450 yaml.Dumper.add_multi_representer(
451 StrEnum, yaml.representer.Representer.represent_str
452 )
454 os.makedirs(os.path.dirname(dest_file), exist_ok=True)
455 opener: Opener = open # type: ignore[assignment]
456 if dest_file.endswith(".xz"): 456 ↛ 457line 456 didn't jump to line 457 because the condition on line 456 was never true
457 import lzma
459 opener = lzma.open # type: ignore[assignment]
460 elif dest_file.endswith(".gz"): 460 ↛ 461line 460 didn't jump to line 461 because the condition on line 460 was never true
461 import gzip
463 opener = gzip.open # type: ignore[assignment]
464 with opener(f"{dest_file}.new", "wt", encoding="utf-8") as f:
465 excusesdata = {
466 "sources": excuselist,
467 "generated-date": datetime.now(UTC),
468 }
469 yaml.dump(
470 excusesdata, stream=f, default_flow_style=False, allow_unicode=True
471 )
472 os.replace(f"{dest_file}.new", dest_file)
473 elif output_format is ExcusesOutputFormat.LEGACY_HTML:
474 with open(f"{dest_file}.new", "w", encoding="utf-8") as f:
475 f.write(
476 '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/REC-html40/strict.dtd">\n'
477 )
478 f.write("<html><head><title>excuses...</title>")
479 f.write(
480 '<meta http-equiv="Content-Type" content="text/html;charset=utf-8"></head><body>\n'
481 )
482 f.write(
483 "<p>Generated: "
484 + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time()))
485 + "</p>\n"
486 )
487 f.write("<ul>\n")
488 for e in excuselist:
489 f.write("<li>%s" % e.html(excuses))
490 f.write("</ul></body></html>\n")
491 os.replace(f"{dest_file}.new", dest_file)
492 else: # pragma: no cover
493 raise ValueError('Output format must be either "YAML or "LEGACY_HTML"')
496def old_libraries(
497 mi_factory: "MigrationItemFactory",
498 suite_info: Suites,
499 outofsync_arches: Iterable[str] = frozenset(),
500) -> list["MigrationItem"]:
501 """Detect old libraries left in the target suite for smooth transitions
503 This method detects old libraries which are in the target suite but no
504 longer built from the source package: they are still there because
505 other packages still depend on them, but they should be removed as
506 soon as possible.
508 For "outofsync" architectures, outdated binaries are allowed to be in
509 the target suite, so they are only added to the removal list if they
510 are no longer in the (primary) source suite.
511 """
512 sources_t = suite_info.target_suite.sources
513 binaries_t = suite_info.target_suite.binaries
514 binaries_s = suite_info.primary_source_suite.binaries
515 removals = []
516 for arch in binaries_t:
517 for pkg_name in binaries_t[arch]:
518 pkg = binaries_t[arch][pkg_name]
519 if sources_t[pkg.source].version != pkg.source_version and (
520 arch not in outofsync_arches or pkg_name not in binaries_s[arch]
521 ):
522 removals.append(mi_factory.generate_removal_for_cruft_item(pkg.pkg_id))
523 return removals
526def is_nuninst_asgood_generous(
527 constraints: dict[str, list[str]],
528 allow_uninst: dict[str, set[str | None]],
529 architectures: list[str],
530 old: dict[str, set[str]],
531 new: dict[str, set[str]],
532 break_arches: set[str] = cast(set[str], frozenset()),
533) -> bool:
534 """Compares the nuninst counters and constraints to see if they improved
536 Given a list of architectures, the previous and the current nuninst
537 counters, this function determines if the current nuninst counter
538 is better than the previous one. Optionally it also accepts a set
539 of "break_arches", the nuninst counter for any architecture listed
540 in this set are completely ignored.
542 If the nuninst counters are equal or better, then the constraints
543 are checked for regressions (ignoring break_arches).
545 Returns True if the new nuninst counter is better than the
546 previous and there are no constraint regressions (ignoring Break-archs).
547 Returns False otherwise.
549 """
550 diff = 0
551 for arch in architectures:
552 if arch in break_arches:
553 continue
554 diff = diff + (
555 len(new[arch] - allow_uninst[arch]) - len(old[arch] - allow_uninst[arch])
556 )
557 if diff > 0:
558 return False
559 must_be_installable = constraints["keep-installable"]
560 for arch in architectures:
561 if arch in break_arches:
562 continue
563 regression = new[arch] - old[arch]
564 if not regression.isdisjoint(must_be_installable): 564 ↛ 565line 564 didn't jump to line 565 because the condition on line 564 was never true
565 return False
566 return True
569def clone_nuninst(
570 nuninst: dict[str, set[str]],
571 *,
572 packages_s: dict[str, dict[str, BinaryPackage]] | None = None,
573 architectures: Iterable[str] | None = None,
574) -> dict[str, set[str]]:
575 """Completely or Selectively deep clone nuninst
577 Given nuninst table, the package table for a given suite and
578 a list of architectures, this function will clone the nuninst
579 table. Only the listed architectures will be deep cloned -
580 the rest will only be shallow cloned. When packages_s is given,
581 packages not listed in packages_s will be pruned from the clone
582 (if packages_s is omitted, the per architecture nuninst is cloned
583 as-is)
584 """
585 clone = nuninst.copy()
586 if architectures is None: 586 ↛ 587line 586 didn't jump to line 587 because the condition on line 586 was never true
587 return clone
588 if packages_s is not None:
589 for arch in architectures:
590 clone[arch] = {x for x in nuninst[arch] if x in packages_s[arch]}
591 clone[arch + "+all"] = {
592 x for x in nuninst[arch + "+all"] if x in packages_s[arch]
593 }
594 else:
595 for arch in architectures:
596 clone[arch] = set(nuninst[arch])
597 clone[arch + "+all"] = set(nuninst[arch + "+all"])
598 return clone
601def test_installability(
602 target_suite: TargetSuite,
603 pkg_name: str,
604 pkg_id: BinaryPackageId,
605 broken: set[str],
606 nuninst_arch: set[str] | None,
607) -> None:
608 """Test for installability of a package on an architecture
610 (pkg_name, pkg_version, pkg_arch) is the package to check.
612 broken is the set of broken packages. If p changes
613 installability (e.g. goes from uninstallable to installable),
614 broken will be updated accordingly.
616 If nuninst_arch is not None then it also updated in the same
617 way as broken is.
618 """
619 if not target_suite.is_installable(pkg_id):
620 # if pkg_name not in broken: regression else: already broken
621 broken.add(pkg_name)
622 if nuninst_arch is not None:
623 nuninst_arch.add(pkg_name)
624 else:
625 # if pkg_name in broken: # improvement else: already not broken
626 broken.discard(pkg_name)
627 if nuninst_arch is not None:
628 nuninst_arch.discard(pkg_name)
631def check_installability(
632 target_suite: TargetSuite,
633 binaries: dict[str, dict[str, BinaryPackage]],
634 arch: str,
635 updates: set[BinaryPackageId],
636 check_archall: bool,
637 nuninst: dict[str, set[str]],
638) -> None:
639 broken = nuninst[arch + "+all"]
640 packages_t_a = binaries[arch]
642 for pkg_id in (x for x in updates if x.architecture == arch):
643 name, version, parch = pkg_id.package_name, pkg_id.version, pkg_id.architecture
644 if name not in packages_t_a:
645 continue
646 pkgdata = packages_t_a[name]
647 if version != pkgdata.version:
648 # Not the version in testing right now, ignore
649 continue
650 actual_arch = pkgdata.architecture
651 nuninst_arch = None
652 # only check arch:all packages if requested
653 if check_archall or actual_arch != "all":
654 nuninst_arch = nuninst[parch]
655 elif actual_arch == "all": 655 ↛ 657line 655 didn't jump to line 657 because the condition on line 655 was always true
656 nuninst[parch].discard(name)
657 test_installability(target_suite, name, pkg_id, broken, nuninst_arch)
660def possibly_compressed(
661 path: str, *, permitted_compressions: list[str] | None = None
662) -> str:
663 """Find and select a (possibly compressed) variant of a path
665 If the given path exists, it will be returned
667 :param path: The base path.
668 :param permitted_compressions: Alternative extensions to look for. Defaults to "gz" and "xz".
669 :return: The path given possibly with one of the permitted extensions.
670 :raises FileNotFoundError: if the path is not found
671 """
672 if os.path.exists(path): 672 ↛ 674line 672 didn't jump to line 674 because the condition on line 672 was always true
673 return path
674 if permitted_compressions is None:
675 permitted_compressions = ["gz", "xz"]
676 for ext in permitted_compressions:
677 cpath = f"{path}.{ext}"
678 if os.path.exists(cpath):
679 return cpath
680 raise FileNotFoundError(
681 errno.ENOENT, os.strerror(errno.ENOENT), path
682 ) # pragma: no cover
685def create_provides_map(
686 packages: dict[str, BinaryPackage],
687) -> dict[str, set[tuple[str, str]]]:
688 """Create a provides map from a map binary package names and their BinaryPackage objects
690 :param packages: A dict mapping binary package names to their BinaryPackage object
691 :return: A provides map
692 """
693 # create provides
694 provides = defaultdict(set)
696 for pkg, dpkg in packages.items():
697 if dpkg.provides is None:
698 continue
699 # register virtual packages and real packages that provide
700 # them
701 for provided_pkg, provided_version, _ in dpkg.provides:
702 provides[provided_pkg].add((pkg, provided_version))
704 return provides
707def read_release_file(suite_dir: str) -> "TagSection[str]":
708 """Parses a given "Release" file
710 :param suite_dir: The directory to the suite
711 :return: A dict of the first (and only) paragraph in an Release file
712 """
713 release_file = os.path.join(suite_dir, "Release")
714 with open(release_file) as fd:
715 tag_file = iter(apt_pkg.TagFile(fd))
716 result = next(tag_file)
717 if next(tag_file, None) is not None: # pragma: no cover
718 raise TypeError("%s has more than one paragraph" % release_file)
719 return result
722def read_sources_file(
723 filename: str,
724 sources: dict[str, SourcePackage] | None = None,
725 add_faux: bool = True,
726 sources_target_suite: dict[str, SourcePackage] | None = None,
727 intern: Callable[[str], str] = sys.intern,
728) -> dict[str, SourcePackage]:
729 """Parse a single Sources file into a hash
731 Parse a single Sources file into a dict mapping a source package
732 name to a SourcePackage object. If there are multiple source
733 packages with the same version, then highest versioned source
734 package (that is not marked as "Extra-Source-Only") is the
735 version kept in the dict.
737 :param filename: Path to the Sources file. Can be compressed by any algorithm supported by apt_pkg.TagFile
738 :param sources: Optional dict to add the packages to. If given, this is also the value returned.
739 :param add_faux: Add a faux arch:all binary for each source that claims it has arch:all
740 :param sources_target_suite: SourcPackages loaded from the target suite for memory optimizations
741 :param intern: Internal optimisation / implementation detail to avoid python's "LOAD_GLOBAL" instruction in a loop
742 :return: mapping from names to a source package
743 """
744 if sources is None:
745 sources = {}
746 if sources_target_suite is None:
747 sources_target_suite = {}
749 tag_file = apt_pkg.TagFile(filename)
750 get_field = tag_file.section.get
751 step = tag_file.step
753 while step():
754 if get_field("Extra-Source-Only", "no") == "yes":
755 # Ignore sources only referenced by Built-Using
756 continue
757 pkg = get_field("Package")
758 ver = get_field("Version")
759 # There may be multiple versions of the source package
760 # (in unstable) if some architectures have out-of-date
761 # binaries. We only ever consider the source with the
762 # largest version for migration.
763 if pkg in sources and apt_pkg.version_compare(sources[pkg].version, ver) > 0:
764 continue
765 maint = get_field("Maintainer")
766 if maint: 766 ↛ 768line 766 didn't jump to line 768 because the condition on line 766 was always true
767 maint = intern(maint.strip())
768 section = get_field("Section")
769 if section: 769 ↛ 772line 769 didn't jump to line 772 because the condition on line 769 was always true
770 section = intern(section.strip())
771 build_deps_arch: str | None
772 build_deps_arch = ", ".join(
773 x
774 for x in (get_field("Build-Depends"), get_field("Build-Depends-Arch"))
775 if x is not None
776 )
777 if build_deps_arch != "":
778 build_deps_arch = sys.intern(build_deps_arch)
779 else:
780 build_deps_arch = None
781 build_deps_indep = get_field("Build-Depends-Indep")
782 if build_deps_indep is not None:
783 build_deps_indep = sys.intern(build_deps_indep)
785 # Adding arch:all packages to the list of binaries already to be able
786 # to check for them later. Helps mitigate bug 887060 and is the
787 # (partial?) answer to bug 1064428.
788 binaries: set[BinaryPackageId] = set()
789 if add_faux and "all" in get_field("Architecture", "").split():
790 # the value "faux" in arch:faux is used elsewhere, so keep in sync
791 pkg_id = BinaryPackageId(f"{pkg}-faux", intern("0~~~~"), intern("faux"))
792 binaries.add(pkg_id)
794 pkg = intern(pkg)
795 ver = intern(ver)
796 sources[pkg] = srcpkg = SourcePackage(
797 pkg,
798 ver,
799 section,
800 binaries,
801 maint,
802 False,
803 build_deps_arch,
804 build_deps_indep,
805 get_field("Testsuite", "").split(),
806 get_field("Testsuite-Triggers", "").replace(",", "").split(),
807 )
809 if (
810 srcpkg_target := sources_target_suite.get(pkg, None)
811 ) is not None and srcpkg_target.version == ver:
812 # If the source package exists and the version is the same, reuse the already stored data.
813 # Note that the binaries field may be different if cruft packages are involved.
814 srcpkg.build_deps_arch = srcpkg_target.build_deps_arch
815 srcpkg.build_deps_indep = srcpkg_target.build_deps_indep
816 srcpkg.testsuite = srcpkg_target.testsuite
817 srcpkg.testsuite_triggers = srcpkg_target.testsuite_triggers
818 return sources
821def _check_and_update_packages(
822 packages: list[BinaryPackage],
823 package: BinaryPackage,
824 archqual: str | None,
825 build_depends: bool,
826) -> None:
827 """Helper for get_dependency_solvers
829 This method updates the list of packages with a given package if that
830 package is a valid (Build-)Depends.
832 :param packages: which packages are to be updated
833 :param archqual: Architecture qualifier
834 :param build_depends: If True, check if the "package" parameter is valid as a build-dependency.
835 """
837 # See also bug #971739 and #1059929
838 if archqual is None:
839 packages.append(package)
840 elif archqual == "native" and build_depends:
841 # Multi-arch handling for build-dependencies
842 # - :native is ok always
843 packages.append(package)
844 elif archqual == "any" and package.multi_arch is MultiArch.ALLOWED:
845 # Multi-arch handling for both build-dependencies and regular dependencies
846 # - :any is ok iff the target has "M-A: allowed"
847 packages.append(package)
850class GetDependencySolversProto(Protocol):
851 def __call__( 851 ↛ exitline 851 didn't jump to the function exit
852 self,
853 block: list[tuple[str, str, str]],
854 binaries_s_a: dict[str, BinaryPackage],
855 provides_s_a: dict[str, set[tuple[str, str]]],
856 *,
857 build_depends: bool = False,
858 empty_set: Any = frozenset(),
859 ) -> list[BinaryPackage]: ...
862def get_dependency_solvers(
863 block: list[tuple[str, str, str]],
864 binaries_s_a: dict[str, BinaryPackage],
865 provides_s_a: dict[str, set[tuple[str, str]]],
866 *,
867 build_depends: bool = False,
868 empty_set: Any = frozenset(),
869) -> list[BinaryPackage]:
870 """Find the packages which satisfy a dependency block
872 This method returns the list of packages which satisfy a dependency
873 block (as returned by apt_pkg.parse_depends) in a package table
874 for a given suite and architecture (a la self.binaries[suite][arch])
876 It can also handle build-dependency relations if the named parameter
877 "build_depends" is set to True. In this case, block should be based
878 on the return value from apt_pkg.parse_src_depends.
880 :param block: The dependency block as parsed by apt_pkg.parse_depends (or apt_pkg.parse_src_depends
881 if the "build_depends" is True)
882 :param binaries_s_a: Mapping of package names to the relevant BinaryPackage
883 :param provides_s_a: Mapping of package names to their providers (as generated by parse_provides)
884 :param build_depends: If True, treat the "block" parameter as a build-dependency relation rather than
885 a regular dependency relation.
886 :param empty_set: Internal implementation detail / optimisation
887 :return: package names solving the relation
888 """
889 packages: list[BinaryPackage] = []
891 # for every package, version and operation in the block
892 for name, version, op in block:
893 if ":" in name:
894 name, archqual = name.split(":", 1)
895 else:
896 archqual = None
898 # look for the package in unstable
899 if name in binaries_s_a:
900 package = binaries_s_a[name]
901 # check the versioned dependency and architecture qualifier
902 # (if present)
903 if (op == "" and version == "") or apt_pkg.check_dep(
904 package.version, op, version
905 ):
906 _check_and_update_packages(packages, package, archqual, build_depends)
908 # look for the package in the virtual packages list and loop on them
909 for prov, prov_version in provides_s_a.get(name, empty_set):
910 assert prov in binaries_s_a
911 package = binaries_s_a[prov]
912 # See Policy Manual §7.5
913 if (op == "" and version == "") or (
914 prov_version != "" and apt_pkg.check_dep(prov_version, op, version)
915 ):
916 _check_and_update_packages(packages, package, archqual, build_depends)
918 return packages
921def invalidate_excuses(
922 excuses: dict[str, "Excuse"],
923 valid: set[str],
924 invalid: set[str],
925 invalidated: set[str],
926) -> None:
927 """Invalidate impossible excuses
929 This method invalidates the impossible excuses, which depend
930 on invalid excuses. The two parameters contains the sets of
931 `valid' and `invalid' excuses.
932 """
933 # make a list of all packages (source and binary) that are present in the
934 # excuses we have
935 excuses_packages: dict[PackageId | BinaryPackageId, set[str]] = defaultdict(set)
936 for exc in excuses.values():
937 for arch in exc.packages:
938 for pkg_arch_id in exc.packages[arch]:
939 # note that the same package can be in multiple excuses
940 # eg. when unstable and TPU have the same packages
941 excuses_packages[pkg_arch_id].add(exc.name)
943 # create dependencies between excuses based on packages
944 excuses_rdeps = defaultdict(set)
945 for exc in excuses.values():
946 # Note that excuses_rdeps is only populated by dependencies generated
947 # based on packages below. There are currently no dependencies between
948 # excuses that are added directly, so this is ok.
950 for pkg_dep in exc.depends_packages:
951 # set of excuses, each of which can satisfy this specific
952 # dependency
953 # if there is a dependency on a package for which no
954 # excuses exist (e.g. a cruft binary), the set will
955 # contain an ImpossibleDependencyState
956 dep_exc: set[str | DependencyState] = set()
957 for pkg_dep_id in cast(set[BinaryPackageId], pkg_dep.deps):
958 pkg_excuses = excuses_packages[pkg_dep_id]
959 # if the dependency isn't found, we get an empty set
960 if pkg_excuses == frozenset():
961 imp_dep = ImpossibleDependencyState(
962 PolicyVerdict.REJECTED_PERMANENTLY, "%s" % (pkg_dep_id.name)
963 )
964 dep_exc.add(imp_dep)
966 else:
967 dep_exc |= pkg_excuses
968 for e in pkg_excuses:
969 excuses_rdeps[e].add(exc.name)
970 if not exc.add_dependency(dep_exc, pkg_dep.spec):
971 valid.discard(exc.name)
972 invalid.add(exc.name)
974 # loop on the invalid excuses
975 # Convert invalid to a list for deterministic results
976 invalid2 = sorted(invalid)
977 for ename in iter_except(invalid2.pop, IndexError):
978 invalidated.add(ename)
979 # if there is no reverse dependency, skip the item
980 if ename not in excuses_rdeps:
981 continue
983 rdep_verdict = PolicyVerdict.REJECTED_WAITING_FOR_ANOTHER_ITEM
984 if excuses[ename].policy_verdict.is_blocked:
985 rdep_verdict = PolicyVerdict.REJECTED_BLOCKED_BY_ANOTHER_ITEM
987 # loop on the reverse dependencies
988 for x in sorted(excuses_rdeps[ename]):
989 exc = excuses[x]
990 # if the item is valid and it is not marked as `forced', then we
991 # invalidate this specific dependency
992 if x in valid and not exc.forced:
993 # mark this specific dependency as invalid
994 still_valid = exc.invalidate_dependency(ename, rdep_verdict)
996 # if there are no alternatives left for this dependency,
997 # invalidate the excuse
998 if not still_valid:
999 valid.discard(x)
1000 invalid2.append(x)
1003def compile_nuninst(
1004 target_suite: TargetSuite, architectures: list[str], nobreakall_arches: list[str]
1005) -> dict[str, set[str]]:
1006 """Compile a nuninst dict from the current testing
1008 :param target_suite: The target suite
1009 :param architectures: Which architectures to check
1010 :param nobreakall_arches: Which architectures where arch:all packages must be installable
1011 """
1012 nuninst: dict[str, set[str]] = {}
1013 binaries_t = target_suite.binaries
1015 # for all the architectures
1016 for arch in architectures:
1017 # if it is in the nobreakall ones, check arch-independent packages too
1018 check_archall = arch in nobreakall_arches
1020 # check all the packages for this architecture
1021 nuninst[arch] = set()
1022 packages_t_a = binaries_t[arch]
1023 for pkg_name, pkg_data in packages_t_a.items():
1024 r = target_suite.is_installable(pkg_data.pkg_id)
1025 if not r:
1026 nuninst[arch].add(pkg_name)
1028 # if they are not required, remove architecture-independent packages
1029 nuninst[arch + "+all"] = nuninst[arch].copy()
1030 if not check_archall:
1031 for pkg_name in nuninst[arch + "+all"]:
1032 pkg_data = packages_t_a[pkg_name]
1033 if pkg_data.architecture == "all":
1034 nuninst[arch].remove(pkg_name)
1036 return nuninst
1039def is_smooth_update_allowed(
1040 binary: BinaryPackage, smooth_updates: list[str], hints: "HintCollection"
1041) -> bool:
1042 if "ALL" in smooth_updates: 1042 ↛ 1043line 1042 didn't jump to line 1043 because the condition on line 1042 was never true
1043 return True
1044 section = binary.section.split("/")[-1]
1045 if section in smooth_updates:
1046 return True
1047 # note that this needs to match the source version *IN TESTING*
1048 return hints.has_hint(
1049 "allow-smooth-update", package=binary.source, version=binary.source_version
1050 )
1053def find_smooth_updateable_binaries(
1054 binaries_to_check: list[BinaryPackageId],
1055 source_data: SourcePackage,
1056 pkg_universe: "BinaryPackageUniverse",
1057 target_suite: TargetSuite,
1058 binaries_t: dict[str, dict[str, BinaryPackage]],
1059 binaries_s: dict[str, dict[str, BinaryPackage]],
1060 removals: set[BinaryPackageId] | frozenset[BinaryPackageId],
1061 smooth_updates: list[str],
1062 hints: "HintCollection",
1063) -> set[BinaryPackageId]:
1064 check: set[BinaryPackageId] = set()
1065 smoothbins: set[BinaryPackageId] = set()
1067 binaries_to_check_set = set(binaries_to_check)
1068 for check_pkg_id in binaries_to_check:
1069 binary, parch = check_pkg_id.package_name, check_pkg_id.architecture
1071 cruftbins: set[BinaryPackageId] = set()
1073 # Not a candidate for smooth up date (newer non-cruft version in unstable)
1074 if binary in binaries_s[parch]:
1075 if binaries_s[parch][binary].source_version == source_data.version:
1076 continue
1077 cruftbins.add(binaries_s[parch][binary].pkg_id)
1079 # Maybe a candidate (cruft or removed binary): check if config allows us to smooth update it.
1080 if is_smooth_update_allowed(binaries_t[parch][binary], smooth_updates, hints):
1081 # if the package has reverse-dependencies which are
1082 # built from other sources, it's a valid candidate for
1083 # a smooth update. if not, it may still be a valid
1084 # candidate if one if its r-deps is itself a candidate,
1085 # so note it for checking later
1086 #
1087 # We ignore all binaries listed in "removals" as we
1088 # assume they will leave at the same time as the
1089 # given package.
1090 rdeps = {
1091 x
1092 for x in pkg_universe.reverse_dependencies_of(check_pkg_id)
1093 if x not in removals and x not in binaries_to_check_set
1094 }
1096 smooth_update_it = False
1097 if target_suite.any_of_these_are_in_the_suite(rdeps):
1098 for rdep in rdeps:
1099 # each dependency clause has a set of possible
1100 # alternatives that can satisfy that dependency.
1101 # if any of them is outside the set of smoothbins, the
1102 # dependency can be satisfied even if this binary was
1103 # removed, so there is no need to keep it around for a
1104 # smooth update
1105 # if not, only this binary can satisfy the dependency, so
1106 # we should keep it around until the rdep is no longer in
1107 # testing
1108 for dep_clause in pkg_universe.dependencies_of(rdep):
1109 # filter out cruft binaries from unstable, because
1110 # they will not be added to the set of packages that
1111 # will be migrated
1112 if all(
1113 x in smoothbins or x == check_pkg_id
1114 for x in dep_clause
1115 if x not in cruftbins
1116 ):
1117 smoothbins.add(check_pkg_id)
1118 smooth_update_it = True
1119 break
1121 if not smooth_update_it:
1122 check.add(check_pkg_id)
1124 # check whether we should perform a smooth update for
1125 # packages which are candidates but do not have r-deps
1126 # outside of the current source
1127 while 1:
1128 found_any = False
1129 for candidate_pkg_id in check:
1130 rdeps = pkg_universe.reverse_dependencies_of(candidate_pkg_id)
1131 if not rdeps.isdisjoint(smoothbins):
1132 smoothbins.add(candidate_pkg_id)
1133 found_any = True
1134 if not found_any:
1135 break
1136 check = {x for x in check if x not in smoothbins}
1138 return smoothbins
1141def find_newer_binaries(
1142 suite_info: Suites, pkg: BinaryPackage, add_source_for_dropped_bin: bool = False
1143) -> Generator[tuple[PackageId, Suite], None, None]:
1144 """
1145 Find newer binaries for pkg in any of the source suites.
1147 :param pkg: BinaryPackage (is assumed to be in the target suite)
1149 :param add_source_for_dropped_bin: If True, newer versions of the
1150 source of pkg will be added if they don't have the binary pkg
1152 :return: the newer binaries (or sources) and their suites
1153 """
1154 source = pkg.source
1155 for suite in suite_info:
1156 if suite.suite_class is SuiteClass.TARGET_SUITE:
1157 continue
1159 suite_binaries_on_arch = suite.binaries.get(pkg.pkg_id.architecture)
1160 if not suite_binaries_on_arch: 1160 ↛ 1161line 1160 didn't jump to line 1161 because the condition on line 1160 was never true
1161 continue
1163 newerbin = None
1164 if pkg.pkg_id.package_name in suite_binaries_on_arch:
1165 newerbin = suite_binaries_on_arch[pkg.pkg_id.package_name]
1166 if suite.is_cruft(newerbin):
1167 # We pretend the cruft binary doesn't exist.
1168 # We handle this as if the source didn't have the binary
1169 # (see below)
1170 newerbin = None
1171 elif apt_pkg.version_compare(newerbin.version, pkg.version) <= 0:
1172 continue
1173 else:
1174 if source not in suite.sources:
1175 # bin and source not in suite: no newer version
1176 continue
1178 if not newerbin:
1179 if not add_source_for_dropped_bin: 1179 ↛ 1180line 1179 didn't jump to line 1180 because the condition on line 1179 was never true
1180 continue
1181 # We only get here if there is a newer version of the source,
1182 # which doesn't have the binary anymore (either it doesn't
1183 # exist, or it's cruft and we pretend it doesn't exist).
1184 # Add the new source instead.
1185 nsrc = suite.sources[source]
1186 n_id = PackageId(source, nsrc.version, "source")
1187 overs = pkg.source_version
1188 if apt_pkg.version_compare(nsrc.version, overs) <= 0:
1189 continue
1190 else:
1191 n_id = newerbin.pkg_id
1193 yield (n_id, suite)
1196def parse_provides(
1197 provides_raw: str,
1198 pkg_id: BinaryPackageId | None = None,
1199 logger: logging.Logger | None = None,
1200) -> list[tuple[str, str, str]]:
1201 parts = apt_pkg.parse_depends(provides_raw, False)
1202 nprov = []
1203 for or_clause in parts:
1204 if len(or_clause) != 1: # pragma: no cover
1205 if logger is not None:
1206 msg = "Ignoring invalid provides in %s: Alternatives [%s]"
1207 logger.warning(msg, str(pkg_id), str(or_clause))
1208 continue
1209 for part in or_clause:
1210 provided, provided_version, op = part
1211 if op != "" and op != "=": # pragma: no cover
1212 if logger is not None:
1213 msg = "Ignoring invalid provides in %s: %s (%s %s)"
1214 logger.warning(msg, str(pkg_id), provided, op, provided_version)
1215 continue
1216 provided = sys.intern(provided)
1217 provided_version = sys.intern(provided_version)
1218 part = (provided, provided_version, sys.intern(op))
1219 nprov.append(part)
1220 return nprov
1223def parse_builtusing(
1224 builtusing_raw: str,
1225 pkg_id: BinaryPackageId | None = None,
1226 logger: logging.Logger | None = None,
1227) -> list[tuple[str, str]]:
1228 parts = apt_pkg.parse_depends(builtusing_raw, False)
1229 nbu = []
1230 for or_clause in parts:
1231 if len(or_clause) != 1: # pragma: no cover
1232 if logger is not None:
1233 msg = "Ignoring invalid builtusing in %s: Alternatives [%s]"
1234 logger.warning(msg, str(pkg_id), str(or_clause))
1235 continue
1236 for part in or_clause:
1237 bu, bu_version, op = part
1238 if op != "=": # pragma: no cover
1239 if logger is not None:
1240 msg = "Ignoring invalid builtusing in %s: %s (%s %s)"
1241 logger.warning(msg, str(pkg_id), bu, op, bu_version)
1242 continue
1243 bu = sys.intern(bu)
1244 bu_version = sys.intern(bu_version)
1245 nbu.append((bu, bu_version))
1246 return nbu
1249def parse_option(
1250 options: "optparse.Values",
1251 option_name: str,
1252 default: Any | None = None,
1253 to_bool: bool = False,
1254 to_int: bool = False,
1255 day_to_sec: bool = False,
1256) -> None:
1257 """Ensure the option exist and has a sane value
1259 :param options: dict with options
1261 :param option_name: string with the name of the option
1263 :param default: the default value for the option
1265 :param to_int: convert the input to int (defaults to sys.maxsize)
1267 :param to_bool: convert the input to bool
1269 :param day_to_sec: convert the input from days to seconds (implies to_int=True)
1270 """
1271 value = getattr(options, option_name, default)
1273 # Option was provided with no value (or default is '') so pick up the default
1274 if value == "":
1275 value = default
1277 if (to_int or day_to_sec) and value in (None, ""):
1278 value = sys.maxsize
1280 if day_to_sec:
1281 value = int(float(value) * 24 * 60 * 60) # type: ignore[arg-type]
1283 if to_int:
1284 value = int(value) # type: ignore[arg-type]
1286 if to_bool:
1287 if value and (
1288 isinstance(value, bool) or value.lower() in ("yes", "y", "true", "t", "1")
1289 ):
1290 value = True
1291 else:
1292 value = False
1294 setattr(options, option_name, value)
1297def filter_out_faux_gen(
1298 binaries: Iterable[BinaryPackageId],
1299) -> Generator[BinaryPackageId, None, None]:
1300 """Generator for packages without faux packages"""
1302 for pkg in binaries:
1303 if not pkg.package_name.endswith("-faux-build-depends"):
1304 yield pkg
1307def filter_out_faux(binaries: Iterable[BinaryPackageId]) -> set[BinaryPackageId]:
1308 """Returns a set without faux packages"""
1310 return {pkg for pkg in filter_out_faux_gen(binaries)}
1313def binaries_from_source_version(
1314 source_data: SourcePackage, suite_info: Suites
1315) -> tuple[set[BinaryPackageId], str]:
1316 """Returns a set of real bid with only packages from this source version"""
1318 binaries = source_data.binaries.copy()
1319 # We don't know from which suite the source version comes
1320 for suite in suite_info.source_suites: 1320 ↛ 1331line 1320 didn't jump to line 1331 because the loop on line 1320 didn't complete
1321 # But if it's there, we assume it will have all the associated binaries
1322 if source_data.source in suite.sources: 1322 ↛ 1320line 1322 didn't jump to line 1320 because the condition on line 1322 was always true
1323 for bid in binaries.copy():
1324 if (
1325 suite.all_binaries_in_suite[bid].source_version
1326 != source_data.version
1327 ):
1328 binaries.remove(bid)
1329 break
1331 return filter_out_faux(binaries), suite.name
1334def get_component(section: str) -> str:
1335 """Returns the component based on the section"""
1337 # horrible hard-coding, but currently, we don't keep track of the component
1338 # when loading the packages files, but let's centralize it here
1339 component = "main"
1340 if "/" in section:
1341 component = section.split("/")[0]
1342 return component