Coverage for britney2/utils.py: 91%
469 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-03-23 07:34 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-03-23 07:34 +0000
1# -*- coding: utf-8 -*-
3# Refactored parts from britney.py, which is/was:
4# Copyright (C) 2001-2008 Anthony Towns <ajt@debian.org>
5# Andreas Barth <aba@debian.org>
6# Fabio Tranchitella <kobold@debian.org>
7# Copyright (C) 2010-2012 Adam D. Barratt <adsb@debian.org>
8# Copyright (C) 2012 Niels Thykier <niels@thykier.net>
9#
10# New portions
11# Copyright (C) 2013 Adam D. Barratt <adsb@debian.org>
13# This program is free software; you can redistribute it and/or modify
14# it under the terms of the GNU General Public License as published by
15# the Free Software Foundation; either version 2 of the License, or
16# (at your option) any later version.
18# This program is distributed in the hope that it will be useful,
19# but WITHOUT ANY WARRANTY; without even the implied warranty of
20# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21# GNU General Public License for more details.
24import errno
25import logging
26import optparse
27import os
28import sys
29import time
30from collections import defaultdict
31from collections.abc import Callable, Container, MutableSet, Iterable, Iterator, Mapping
32from datetime import datetime
33from functools import partial
34from itertools import chain, filterfalse
35from typing import (
36 IO,
37 TYPE_CHECKING,
38 Any,
39 Literal,
40 Optional,
41 Protocol,
42 TypeVar,
43 Union,
44 cast,
45 overload,
46)
48import apt_pkg
49import yaml
51from britney2 import (
52 BinaryPackage,
53 BinaryPackageId,
54 PackageId,
55 SourcePackage,
56 Suite,
57 SuiteClass,
58 Suites,
59 TargetSuite,
60)
61from britney2.excusedeps import DependencyState, ImpossibleDependencyState
62from britney2.policies import PolicyVerdict
64if TYPE_CHECKING: 64 ↛ 66line 64 didn't jump to line 66, because the condition on line 64 was never true
66 from _typeshed import SupportsRichComparisonT
67 from apt_pkg import TagSection
69 from .excuse import Excuse
70 from .hints import HintCollection
71 from .installability.universe import BinaryPackageUniverse
72 from .migrationitem import MigrationItem, MigrationItemFactory
74_T = TypeVar("_T")
77class MigrationConstraintException(Exception):
78 pass
81@overload
82def ifilter_except( 82 ↛ exitline 82 didn't jump to the function exit
83 container: Container[_T], iterable: Literal[None] = None
84) -> "partial[filterfalse[_T]]": ...
87@overload
88def ifilter_except( 88 ↛ exitline 88 didn't jump to the function exit
89 container: Container[_T], iterable: Iterable[_T]
90) -> "filterfalse[_T]": ...
93def ifilter_except(
94 container: Container[_T], iterable: Optional[Iterable[_T]] = None
95) -> Union["filterfalse[_T]", "partial[filterfalse[_T]]"]:
96 """Filter out elements in container
98 If given an iterable it returns a filtered iterator, otherwise it
99 returns a function to generate filtered iterators. The latter is
100 useful if the same filter has to be (re-)used on multiple
101 iterators that are not known on beforehand.
102 """
103 if iterable is not None: 103 ↛ 104line 103 didn't jump to line 104, because the condition on line 103 was never true
104 return filterfalse(container.__contains__, iterable)
105 return cast(
106 "partial[filterfalse[_T]]", partial(filterfalse, container.__contains__)
107 )
110@overload
111def ifilter_only(container: Container[_T], iterable: Iterable[_T]) -> "filter[_T]": ... 111 ↛ exitline 111 didn't return from function 'ifilter_only'
114@overload
115def ifilter_only(container: Container[_T], iterable: None) -> "partial[filter[_T]]": ... 115 ↛ exitline 115 didn't return from function 'ifilter_only'
118def ifilter_only(
119 container: Container[_T], iterable: Optional[Iterable[_T]] = None
120) -> Union["filter[_T]", "partial[filter[_T]]"]:
121 """Filter out elements in which are not in container
123 If given an iterable it returns a filtered iterator, otherwise it
124 returns a function to generate filtered iterators. The latter is
125 useful if the same filter has to be (re-)used on multiple
126 iterators that are not known on beforehand.
127 """
128 if iterable is not None: 128 ↛ 130line 128 didn't jump to line 130, because the condition on line 128 was never false
129 return filter(container.__contains__, iterable)
130 return partial(filter, container.__contains__)
133# iter_except is from the "itertools" recipe
134def iter_except(
135 func: Callable[[], _T],
136 exception: type[BaseException] | tuple[type[BaseException], ...],
137 first: Any = None,
138) -> Iterator[_T]: # pragma: no cover - itertools recipe function
139 """Call a function repeatedly until an exception is raised.
141 Converts a call-until-exception interface to an iterator interface.
142 Like __builtin__.iter(func, sentinel) but uses an exception instead
143 of a sentinel to end the loop.
145 Examples:
146 bsddbiter = iter_except(db.next, bsddb.error, db.first)
147 heapiter = iter_except(functools.partial(heappop, h), IndexError)
148 dictiter = iter_except(d.popitem, KeyError)
149 dequeiter = iter_except(d.popleft, IndexError)
150 queueiter = iter_except(q.get_nowait, Queue.Empty)
151 setiter = iter_except(s.pop, KeyError)
153 """
154 try:
155 if first is not None:
156 yield first()
157 while 1:
158 yield func()
159 except exception:
160 pass
163def log_and_format_old_libraries(
164 logger: logging.Logger, libs: list["MigrationItem"]
165) -> None:
166 """Format and log old libraries in a table (no header)"""
167 libraries: dict[str, list[str]] = {}
168 for i in libs:
169 pkg = i.package
170 if pkg in libraries:
171 libraries[pkg].append(i.architecture)
172 else:
173 libraries[pkg] = [i.architecture]
175 for lib in sorted(libraries):
176 logger.info(" %s: %s", lib, " ".join(libraries[lib]))
179def compute_reverse_tree(
180 pkg_universe: "BinaryPackageUniverse", affected: set[BinaryPackageId]
181) -> None:
182 """Calculate the full dependency tree for a set of packages
184 This method returns the full dependency tree for a given set of
185 packages. The first argument is an instance of the BinaryPackageUniverse
186 and the second argument are a set of BinaryPackageId.
188 The set of affected packages will be updated in place and must
189 therefore be mutable.
190 """
191 remain = list(affected)
192 while remain:
193 pkg_id = remain.pop()
194 new_pkg_ids = pkg_universe.reverse_dependencies_of(pkg_id) - affected
195 affected.update(new_pkg_ids)
196 remain.extend(new_pkg_ids)
199def add_transitive_dependencies_flatten(
200 pkg_universe: "BinaryPackageUniverse", initial_set: MutableSet[BinaryPackageId]
201) -> None:
202 """Find and include all transitive dependencies
204 This method updates the initial_set parameter to include all transitive
205 dependencies. The first argument is an instance of the BinaryPackageUniverse
206 and the second argument are a set of BinaryPackageId.
208 The set of initial packages will be updated in place and must
209 therefore be mutable.
210 """
211 remain = list(initial_set)
212 while remain:
213 pkg_id = remain.pop()
214 new_pkg_ids = {
215 x
216 for x in chain.from_iterable(pkg_universe.dependencies_of(pkg_id))
217 if x not in initial_set
218 }
219 initial_set |= new_pkg_ids
220 remain.extend(new_pkg_ids)
223def write_nuninst(filename: str, nuninst: dict[str, set[str]]) -> None:
224 """Write the non-installable report
226 Write the non-installable report derived from "nuninst" to the
227 file denoted by "filename".
228 """
229 with open(filename, "w", encoding="utf-8") as f:
230 # Having two fields with (almost) identical dates seems a bit
231 # redundant.
232 f.write(
233 "Built on: "
234 + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time()))
235 + "\n"
236 )
237 f.write(
238 "Last update: "
239 + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time()))
240 + "\n\n"
241 )
242 for k in nuninst:
243 f.write("%s: %s\n" % (k, " ".join(nuninst[k])))
246def read_nuninst(filename: str, architectures: set[str]) -> dict[str, set[str]]:
247 """Read the non-installable report
249 Read the non-installable report from the file denoted by
250 "filename" and return it. Only architectures in "architectures"
251 will be included in the report.
252 """
253 nuninst: dict[str, set[str]] = {}
254 with open(filename, encoding="ascii") as f:
255 for r in f:
256 if ":" not in r:
257 continue
258 arch, packages = r.strip().split(":", 1)
259 if arch.split("+", 1)[0] in architectures:
260 nuninst[arch] = set(packages.split())
261 return nuninst
264def newly_uninst(
265 nuold: dict[str, set[str]], nunew: dict[str, set[str]]
266) -> dict[str, list[str]]:
267 """Return a nuninst statistic with only new uninstallable packages
269 This method subtracts the uninstallable packages of the statistic
270 "nunew" from the statistic "nuold".
272 It returns a dictionary with the architectures as keys and the list
273 of uninstallable packages as values. If there are no regressions
274 on a given architecture, then the architecture will be omitted in
275 the result. Accordingly, if none of the architectures have
276 regressions an empty directory is returned.
277 """
278 res: dict[str, list[str]] = {}
279 for arch in ifilter_only(nunew, nuold):
280 arch_nuninst = [x for x in nunew[arch] if x not in nuold[arch]]
281 # Leave res empty if there are no newly uninst packages
282 if arch_nuninst:
283 res[arch] = arch_nuninst
284 return res
287def format_and_log_uninst(
288 logger: logging.Logger,
289 architectures: Iterable[str],
290 nuninst: Mapping[str, Iterable[str]],
291 *,
292 loglevel: int = logging.INFO,
293) -> None:
294 """Emits the uninstallable packages to the log
296 An example of the output string is:
297 * i386: broken-pkg1, broken-pkg2
299 Note that if there is no uninstallable packages, then nothing is emitted.
300 """
301 for arch in architectures:
302 if arch in nuninst and nuninst[arch]:
303 msg = " * %s: %s" % (arch, ", ".join(sorted(nuninst[arch])))
304 logger.log(loglevel, msg)
307class Sorted(Protocol):
308 def __call__( 308 ↛ exitline 308 didn't jump to the function exit
309 self,
310 iterable: Iterable["SupportsRichComparisonT"],
311 /,
312 *,
313 key: None = None,
314 reverse: bool = False,
315 ) -> list["SupportsRichComparisonT"]: ...
318def write_heidi(
319 filename: str,
320 target_suite: TargetSuite,
321 *,
322 outofsync_arches: frozenset[str] = frozenset(),
323 sorted: Sorted = sorted,
324) -> None:
325 """Write the output HeidiResult
327 This method write the output for Heidi, which contains all the
328 binary packages and the source packages in the form:
330 <pkg-name> <pkg-version> <pkg-architecture> <pkg-section>
331 <src-name> <src-version> source <src-section>
333 The file is written as "filename" using the sources and packages
334 from the "target_suite" parameter.
336 outofsync_arches: If given, it is a set of architectures marked
337 as "out of sync". The output file may exclude some out of date
338 arch:all packages for those architectures to reduce the noise.
340 The "X=X" parameters are optimizations to avoid "load global" in
341 the loops.
342 """
343 sources_t = target_suite.sources
344 packages_t = target_suite.binaries
346 with open(filename, "w", encoding="ascii") as f:
348 # write binary packages
349 for arch in sorted(packages_t):
350 binaries = packages_t[arch]
351 for pkg_name in sorted(binaries):
352 pkg = binaries[pkg_name]
353 pkgv = pkg.version
354 pkgarch = pkg.architecture or "all"
355 pkgsec = pkg.section or "faux"
356 if pkgsec == "faux" or pkgsec.endswith("/faux"):
357 # Faux package; not really a part of testing
358 continue
359 if ( 359 ↛ 371line 359 didn't jump to line 371
360 pkg.source_version
361 and pkgarch == "all"
362 and pkg.source_version != sources_t[pkg.source].version
363 and arch in outofsync_arches
364 ):
365 # when architectures are marked as "outofsync", their binary
366 # versions may be lower than those of the associated
367 # source package in testing. the binary package list for
368 # such architectures will include arch:all packages
369 # matching those older versions, but we only want the
370 # newer arch:all in testing
371 continue
372 f.write("%s %s %s %s\n" % (pkg_name, pkgv, pkgarch, pkgsec))
374 # write sources
375 for src_name in sorted(sources_t):
376 src = sources_t[src_name]
377 srcv = src.version
378 srcsec = src.section or "unknown"
379 if srcsec == "faux" or srcsec.endswith("/faux"):
380 # Faux package; not really a part of testing
381 continue
382 f.write("%s %s source %s\n" % (src_name, srcv, srcsec))
385def write_heidi_delta(filename: str, all_selected: list["MigrationItem"]) -> None:
386 """Write the output delta
388 This method writes the packages to be upgraded, in the form:
389 <src-name> <src-version>
390 or (if the source is to be removed):
391 -<src-name> <src-version>
393 The order corresponds to that shown in update_output.
394 """
395 with open(filename, "w", encoding="ascii") as fd:
397 fd.write("#HeidiDelta\n")
399 for item in all_selected:
400 prefix = ""
402 if item.is_removal:
403 prefix = "-"
405 if item.architecture == "source":
406 fd.write("%s%s %s\n" % (prefix, item.package, item.version))
407 else:
408 fd.write(
409 "%s%s %s %s\n"
410 % (prefix, item.package, item.version, item.architecture)
411 )
414class Opener(Protocol):
415 def __call__( 415 ↛ exitline 415 didn't jump to the function exit
416 self, file: str, mode: Literal["wt"], encoding: Literal["utf-8"]
417 ) -> IO[Any]: ...
420def write_excuses(
421 excuses: Union[dict[str, "Excuse"], dict[PackageId, "Excuse"]],
422 dest_file: str,
423 output_format: Literal["yaml", "legacy-html"] = "yaml",
424) -> None:
425 """Write the excuses to dest_file
427 Writes a list of excuses in a specified output_format to the
428 path denoted by dest_file. The output_format can either be "yaml"
429 or "legacy-html".
430 """
431 excuselist = sorted(excuses.values(), key=lambda x: x.sortkey())
432 if output_format == "yaml":
433 os.makedirs(os.path.dirname(dest_file), exist_ok=True)
434 opener: Opener = open # type: ignore[assignment]
435 if dest_file.endswith(".xz"): 435 ↛ 436line 435 didn't jump to line 436, because the condition on line 435 was never true
436 import lzma
438 opener = lzma.open # type: ignore[assignment]
439 elif dest_file.endswith(".gz"): 439 ↛ 440line 439 didn't jump to line 440, because the condition on line 439 was never true
440 import gzip
442 opener = gzip.open # type: ignore[assignment]
443 with opener(dest_file, "wt", encoding="utf-8") as f:
444 edatalist = [e.excusedata(excuses) for e in excuselist]
445 excusesdata = {
446 "sources": edatalist,
447 "generated-date": datetime.utcnow(),
448 }
449 f.write(
450 yaml.dump(excusesdata, default_flow_style=False, allow_unicode=True)
451 )
452 elif output_format == "legacy-html":
453 with open(dest_file, "w", encoding="utf-8") as f:
454 f.write(
455 '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/REC-html40/strict.dtd">\n'
456 )
457 f.write("<html><head><title>excuses...</title>")
458 f.write(
459 '<meta http-equiv="Content-Type" content="text/html;charset=utf-8"></head><body>\n'
460 )
461 f.write(
462 "<p>Generated: "
463 + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time()))
464 + "</p>\n"
465 )
466 f.write("<ul>\n")
467 for e in excuselist:
468 f.write("<li>%s" % e.html(excuses))
469 f.write("</ul></body></html>\n")
470 else: # pragma: no cover
471 raise ValueError('Output format must be either "yaml or "legacy-html"')
474def old_libraries(
475 mi_factory: "MigrationItemFactory",
476 suite_info: Suites,
477 outofsync_arches: Iterable[str] = frozenset(),
478) -> list["MigrationItem"]:
479 """Detect old libraries left in the target suite for smooth transitions
481 This method detects old libraries which are in the target suite but no
482 longer built from the source package: they are still there because
483 other packages still depend on them, but they should be removed as
484 soon as possible.
486 For "outofsync" architectures, outdated binaries are allowed to be in
487 the target suite, so they are only added to the removal list if they
488 are no longer in the (primary) source suite.
489 """
490 sources_t = suite_info.target_suite.sources
491 binaries_t = suite_info.target_suite.binaries
492 binaries_s = suite_info.primary_source_suite.binaries
493 removals = []
494 for arch in binaries_t:
495 for pkg_name in binaries_t[arch]:
496 pkg = binaries_t[arch][pkg_name]
497 if sources_t[pkg.source].version != pkg.source_version and (
498 arch not in outofsync_arches or pkg_name not in binaries_s[arch]
499 ):
500 removals.append(mi_factory.generate_removal_for_cruft_item(pkg.pkg_id))
501 return removals
504def is_nuninst_asgood_generous(
505 constraints: dict[str, list[str]],
506 allow_uninst: dict[str, set[Optional[str]]],
507 architectures: list[str],
508 old: dict[str, set[str]],
509 new: dict[str, set[str]],
510 break_arches: set[str] = cast(set[str], frozenset()),
511) -> bool:
512 """Compares the nuninst counters and constraints to see if they improved
514 Given a list of architectures, the previous and the current nuninst
515 counters, this function determines if the current nuninst counter
516 is better than the previous one. Optionally it also accepts a set
517 of "break_arches", the nuninst counter for any architecture listed
518 in this set are completely ignored.
520 If the nuninst counters are equal or better, then the constraints
521 are checked for regressions (ignoring break_arches).
523 Returns True if the new nuninst counter is better than the
524 previous and there are no constraint regressions (ignoring Break-archs).
525 Returns False otherwise.
527 """
528 diff = 0
529 for arch in architectures:
530 if arch in break_arches:
531 continue
532 diff = diff + (
533 len(new[arch] - allow_uninst[arch]) - len(old[arch] - allow_uninst[arch])
534 )
535 if diff > 0:
536 return False
537 must_be_installable = constraints["keep-installable"]
538 for arch in architectures:
539 if arch in break_arches:
540 continue
541 regression = new[arch] - old[arch]
542 if not regression.isdisjoint(must_be_installable): 542 ↛ 543line 542 didn't jump to line 543, because the condition on line 542 was never true
543 return False
544 return True
547def clone_nuninst(
548 nuninst: dict[str, set[str]],
549 *,
550 packages_s: Optional[dict[str, dict[str, BinaryPackage]]] = None,
551 architectures: Optional[Iterable[str]] = None,
552) -> dict[str, set[str]]:
553 """Completely or Selectively deep clone nuninst
555 Given nuninst table, the package table for a given suite and
556 a list of architectures, this function will clone the nuninst
557 table. Only the listed architectures will be deep cloned -
558 the rest will only be shallow cloned. When packages_s is given,
559 packages not listed in packages_s will be pruned from the clone
560 (if packages_s is omitted, the per architecture nuninst is cloned
561 as-is)
562 """
563 clone = nuninst.copy()
564 if architectures is None: 564 ↛ 565line 564 didn't jump to line 565, because the condition on line 564 was never true
565 return clone
566 if packages_s is not None:
567 for arch in architectures:
568 clone[arch] = set(x for x in nuninst[arch] if x in packages_s[arch])
569 clone[arch + "+all"] = set(
570 x for x in nuninst[arch + "+all"] if x in packages_s[arch]
571 )
572 else:
573 for arch in architectures:
574 clone[arch] = set(nuninst[arch])
575 clone[arch + "+all"] = set(nuninst[arch + "+all"])
576 return clone
579def test_installability(
580 target_suite: TargetSuite,
581 pkg_name: str,
582 pkg_id: BinaryPackageId,
583 broken: set[str],
584 nuninst_arch: Optional[set[str]],
585) -> Literal[-1, 0, 1]:
586 """Test for installability of a package on an architecture
588 (pkg_name, pkg_version, pkg_arch) is the package to check.
590 broken is the set of broken packages. If p changes
591 installability (e.g. goes from uninstallable to installable),
592 broken will be updated accordingly.
594 If nuninst_arch is not None then it also updated in the same
595 way as broken is.
596 """
597 c: Literal[-1, 0, 1] = 0
598 r = target_suite.is_installable(pkg_id)
599 if not r:
600 # not installable
601 if pkg_name not in broken:
602 # regression
603 broken.add(pkg_name)
604 c = -1
605 if nuninst_arch is not None and pkg_name not in nuninst_arch:
606 nuninst_arch.add(pkg_name)
607 else:
608 if pkg_name in broken:
609 # Improvement
610 broken.remove(pkg_name)
611 c = 1
612 if nuninst_arch is not None and pkg_name in nuninst_arch:
613 nuninst_arch.remove(pkg_name)
614 return c
617def check_installability(
618 target_suite: TargetSuite,
619 binaries: dict[str, dict[str, BinaryPackage]],
620 arch: str,
621 updates: set[BinaryPackageId],
622 check_archall: bool,
623 nuninst: dict[str, set[str]],
624) -> None:
625 broken = nuninst[arch + "+all"]
626 packages_t_a = binaries[arch]
628 for pkg_id in (x for x in updates if x.architecture == arch):
629 name, version, parch = pkg_id
630 if name not in packages_t_a:
631 continue
632 pkgdata = packages_t_a[name]
633 if version != pkgdata.version:
634 # Not the version in testing right now, ignore
635 continue
636 actual_arch = pkgdata.architecture
637 nuninst_arch = None
638 # only check arch:all packages if requested
639 if check_archall or actual_arch != "all":
640 nuninst_arch = nuninst[parch]
641 elif actual_arch == "all": 641 ↛ 643line 641 didn't jump to line 643, because the condition on line 641 was never false
642 nuninst[parch].discard(name)
643 test_installability(target_suite, name, pkg_id, broken, nuninst_arch)
646def possibly_compressed(
647 path: str, *, permitted_compressions: Optional[list[str]] = None
648) -> str:
649 """Find and select a (possibly compressed) variant of a path
651 If the given path exists, it will be returned
653 :param path: The base path.
654 :param permitted_compressions: Alternative extensions to look for. Defaults to "gz" and "xz".
655 :return: The path given possibly with one of the permitted extensions.
656 :raises FileNotFoundError: if the path is not found
657 """
658 if os.path.exists(path): 658 ↛ 660line 658 didn't jump to line 660, because the condition on line 658 was never false
659 return path
660 if permitted_compressions is None:
661 permitted_compressions = ["gz", "xz"]
662 for ext in permitted_compressions:
663 cpath = "%s.%s" % (path, ext)
664 if os.path.exists(cpath):
665 return cpath
666 raise FileNotFoundError(
667 errno.ENOENT, os.strerror(errno.ENOENT), path
668 ) # pragma: no cover
671def create_provides_map(
672 packages: dict[str, BinaryPackage],
673) -> dict[str, set[tuple[str, str]]]:
674 """Create a provides map from a map binary package names and their BinaryPackage objects
676 :param packages: A dict mapping binary package names to their BinaryPackage object
677 :return: A provides map
678 """
679 # create provides
680 provides = defaultdict(set)
682 for pkg, dpkg in packages.items():
683 # register virtual packages and real packages that provide
684 # them
685 for provided_pkg, provided_version, _ in dpkg.provides:
686 provides[provided_pkg].add((pkg, provided_version))
688 return provides
691def read_release_file(suite_dir: str) -> "TagSection[str]":
692 """Parses a given "Release" file
694 :param suite_dir: The directory to the suite
695 :return: A dict of the first (and only) paragraph in an Release file
696 """
697 release_file = os.path.join(suite_dir, "Release")
698 with open(release_file) as fd:
699 tag_file = iter(apt_pkg.TagFile(fd))
700 result = next(tag_file)
701 if next(tag_file, None) is not None: # pragma: no cover
702 raise TypeError("%s has more than one paragraph" % release_file)
703 return result
706def read_sources_file(
707 filename: str,
708 sources: Optional[dict[str, SourcePackage]] = None,
709 add_faux: bool = True,
710 intern: Callable[[str], str] = sys.intern,
711) -> dict[str, SourcePackage]:
712 """Parse a single Sources file into a hash
714 Parse a single Sources file into a dict mapping a source package
715 name to a SourcePackage object. If there are multiple source
716 packages with the same version, then highest versioned source
717 package (that is not marked as "Extra-Source-Only") is the
718 version kept in the dict.
720 :param filename: Path to the Sources file. Can be compressed by any algorithm supported by apt_pkg.TagFile
721 :param sources: Optional dict to add the packages to. If given, this is also the value returned.
722 :param add_faux: Add a faux arch:all binary for each source that claims it has arch:all
723 :param intern: Internal optimisation / implementation detail to avoid python's "LOAD_GLOBAL" instruction in a loop
724 :return: mapping from names to a source package
725 """
726 if sources is None:
727 sources = {}
729 tag_file = apt_pkg.TagFile(filename)
730 get_field = tag_file.section.get
731 step = tag_file.step
733 while step():
734 if get_field("Extra-Source-Only", "no") == "yes":
735 # Ignore sources only referenced by Built-Using
736 continue
737 pkg = get_field("Package")
738 ver = get_field("Version")
739 # There may be multiple versions of the source package
740 # (in unstable) if some architectures have out-of-date
741 # binaries. We only ever consider the source with the
742 # largest version for migration.
743 if pkg in sources and apt_pkg.version_compare(sources[pkg].version, ver) > 0:
744 continue
745 maint = get_field("Maintainer")
746 if maint: 746 ↛ 748line 746 didn't jump to line 748, because the condition on line 746 was never false
747 maint = intern(maint.strip())
748 section = get_field("Section")
749 if section: 749 ↛ 752line 749 didn't jump to line 752, because the condition on line 749 was never false
750 section = intern(section.strip())
751 build_deps_arch: Optional[str]
752 build_deps_arch = ", ".join(
753 x
754 for x in (get_field("Build-Depends"), get_field("Build-Depends-Arch"))
755 if x is not None
756 )
757 if build_deps_arch != "":
758 build_deps_arch = sys.intern(build_deps_arch)
759 else:
760 build_deps_arch = None
761 build_deps_indep = get_field("Build-Depends-Indep")
762 if build_deps_indep is not None:
763 build_deps_indep = sys.intern(build_deps_indep)
765 # Adding arch:all packages to the list of binaries already to be able
766 # to check for them later. Helps mitigate bug 887060 and is the
767 # (partial?) answer to bug 1064428.
768 binaries: set[BinaryPackageId] = set()
769 if add_faux and "all" in get_field("Architecture", "").split():
770 # the value "faux" in arch:faux is used elsewhere, so keep in sync
771 pkg_id = BinaryPackageId(pkg + "-faux", intern("0~~~~"), intern("faux"))
772 binaries.add(pkg_id)
774 sources[intern(pkg)] = SourcePackage(
775 intern(pkg),
776 intern(ver),
777 section,
778 binaries,
779 maint,
780 False,
781 build_deps_arch,
782 build_deps_indep,
783 get_field("Testsuite", "").split(),
784 get_field("Testsuite-Triggers", "").replace(",", "").split(),
785 )
786 return sources
789def _check_and_update_packages(
790 packages: list[BinaryPackage],
791 package: BinaryPackage,
792 archqual: Optional[str],
793 build_depends: bool,
794) -> None:
795 """Helper for get_dependency_solvers
797 This method updates the list of packages with a given package if that
798 package is a valid (Build-)Depends.
800 :param packages: which packages are to be updated
801 :param archqual: Architecture qualifier
802 :param build_depends: If True, check if the "package" parameter is valid as a build-dependency.
803 """
805 # See also bug #971739 and #1059929
806 if archqual is None:
807 packages.append(package)
808 elif archqual == "native" and build_depends:
809 # Multi-arch handling for build-dependencies
810 # - :native is ok always
811 packages.append(package)
812 elif archqual == "any" and package.multi_arch == "allowed":
813 # Multi-arch handling for both build-dependencies and regular dependencies
814 # - :any is ok iff the target has "M-A: allowed"
815 packages.append(package)
818class GetDependencySolversProto(Protocol):
819 def __call__( 819 ↛ exitline 819 didn't jump to the function exit
820 self,
821 block: list[tuple[str, str, str]],
822 binaries_s_a: dict[str, BinaryPackage],
823 provides_s_a: dict[str, set[tuple[str, str]]],
824 *,
825 build_depends: bool = False,
826 empty_set: Any = frozenset(),
827 ) -> list[BinaryPackage]: ...
830def get_dependency_solvers(
831 block: list[tuple[str, str, str]],
832 binaries_s_a: dict[str, BinaryPackage],
833 provides_s_a: dict[str, set[tuple[str, str]]],
834 *,
835 build_depends: bool = False,
836 empty_set: Any = frozenset(),
837) -> list[BinaryPackage]:
838 """Find the packages which satisfy a dependency block
840 This method returns the list of packages which satisfy a dependency
841 block (as returned by apt_pkg.parse_depends) in a package table
842 for a given suite and architecture (a la self.binaries[suite][arch])
844 It can also handle build-dependency relations if the named parameter
845 "build_depends" is set to True. In this case, block should be based
846 on the return value from apt_pkg.parse_src_depends.
848 :param block: The dependency block as parsed by apt_pkg.parse_depends (or apt_pkg.parse_src_depends
849 if the "build_depends" is True)
850 :param binaries_s_a: Mapping of package names to the relevant BinaryPackage
851 :param provides_s_a: Mapping of package names to their providers (as generated by parse_provides)
852 :param build_depends: If True, treat the "block" parameter as a build-dependency relation rather than
853 a regular dependency relation.
854 :param empty_set: Internal implementation detail / optimisation
855 :return: package names solving the relation
856 """
857 packages: list[BinaryPackage] = []
859 # for every package, version and operation in the block
860 for name, version, op in block:
861 if ":" in name:
862 name, archqual = name.split(":", 1)
863 else:
864 archqual = None
866 # look for the package in unstable
867 if name in binaries_s_a:
868 package = binaries_s_a[name]
869 # check the versioned dependency and architecture qualifier
870 # (if present)
871 if (op == "" and version == "") or apt_pkg.check_dep(
872 package.version, op, version
873 ):
874 _check_and_update_packages(packages, package, archqual, build_depends)
876 # look for the package in the virtual packages list and loop on them
877 for prov, prov_version in provides_s_a.get(name, empty_set):
878 assert prov in binaries_s_a
879 package = binaries_s_a[prov]
880 # See Policy Manual §7.5
881 if (op == "" and version == "") or (
882 prov_version != "" and apt_pkg.check_dep(prov_version, op, version)
883 ):
884 _check_and_update_packages(packages, package, archqual, build_depends)
886 return packages
889def invalidate_excuses(
890 excuses: dict[str, "Excuse"],
891 valid: set[str],
892 invalid: set[str],
893 invalidated: set[str],
894) -> None:
895 """Invalidate impossible excuses
897 This method invalidates the impossible excuses, which depend
898 on invalid excuses. The two parameters contains the sets of
899 `valid' and `invalid' excuses.
900 """
901 # make a list of all packages (source and binary) that are present in the
902 # excuses we have
903 excuses_packages: dict[Union[PackageId, BinaryPackageId], set[str]] = defaultdict(
904 set
905 )
906 for exc in excuses.values():
907 for arch in exc.packages:
908 for pkg_arch_id in exc.packages[arch]:
909 # note that the same package can be in multiple excuses
910 # eg. when unstable and TPU have the same packages
911 excuses_packages[pkg_arch_id].add(exc.name)
913 # create dependencies between excuses based on packages
914 excuses_rdeps = defaultdict(set)
915 for exc in excuses.values():
916 # Note that excuses_rdeps is only populated by dependencies generated
917 # based on packages below. There are currently no dependencies between
918 # excuses that are added directly, so this is ok.
920 for pkg_dep in exc.depends_packages:
921 # set of excuses, each of which can satisfy this specific
922 # dependency
923 # if there is a dependency on a package for which no
924 # excuses exist (e.g. a cruft binary), the set will
925 # contain an ImpossibleDependencyState
926 dep_exc: set[Union[str, DependencyState]] = set()
927 for pkg_dep_id in cast(set[BinaryPackageId], pkg_dep.deps):
928 pkg_excuses = excuses_packages[pkg_dep_id]
929 # if the dependency isn't found, we get an empty set
930 if pkg_excuses == frozenset():
931 imp_dep = ImpossibleDependencyState(
932 PolicyVerdict.REJECTED_PERMANENTLY, "%s" % (pkg_dep_id.name)
933 )
934 dep_exc.add(imp_dep)
936 else:
937 dep_exc |= pkg_excuses
938 for e in pkg_excuses:
939 excuses_rdeps[e].add(exc.name)
940 if not exc.add_dependency(dep_exc, pkg_dep.spec):
941 valid.discard(exc.name)
942 invalid.add(exc.name)
944 # loop on the invalid excuses
945 # Convert invalid to a list for deterministic results
946 invalid2 = sorted(invalid)
947 for ename in iter_except(invalid2.pop, IndexError):
948 invalidated.add(ename)
949 # if there is no reverse dependency, skip the item
950 if ename not in excuses_rdeps:
951 continue
953 rdep_verdict = PolicyVerdict.REJECTED_WAITING_FOR_ANOTHER_ITEM
954 if excuses[ename].policy_verdict.is_blocked:
955 rdep_verdict = PolicyVerdict.REJECTED_BLOCKED_BY_ANOTHER_ITEM
957 # loop on the reverse dependencies
958 for x in sorted(excuses_rdeps[ename]):
959 exc = excuses[x]
960 # if the item is valid and it is not marked as `forced', then we
961 # invalidate this specific dependency
962 if x in valid and not exc.forced:
963 # mark this specific dependency as invalid
964 still_valid = exc.invalidate_dependency(ename, rdep_verdict)
966 # if there are no alternatives left for this dependency,
967 # invalidate the excuse
968 if not still_valid:
969 valid.discard(x)
970 invalid2.append(x)
973def compile_nuninst(
974 target_suite: TargetSuite, architectures: list[str], nobreakall_arches: list[str]
975) -> dict[str, set[str]]:
976 """Compile a nuninst dict from the current testing
978 :param target_suite: The target suite
979 :param architectures: Which architectures to check
980 :param nobreakall_arches: Which architectures where arch:all packages must be installable
981 """
982 nuninst: dict[str, set[str]] = {}
983 binaries_t = target_suite.binaries
985 # for all the architectures
986 for arch in architectures:
987 # if it is in the nobreakall ones, check arch-independent packages too
988 check_archall = arch in nobreakall_arches
990 # check all the packages for this architecture
991 nuninst[arch] = set()
992 packages_t_a = binaries_t[arch]
993 for pkg_name, pkg_data in packages_t_a.items():
994 r = target_suite.is_installable(pkg_data.pkg_id)
995 if not r:
996 nuninst[arch].add(pkg_name)
998 # if they are not required, remove architecture-independent packages
999 nuninst[arch + "+all"] = nuninst[arch].copy()
1000 if not check_archall:
1001 for pkg_name in nuninst[arch + "+all"]:
1002 pkg_data = packages_t_a[pkg_name]
1003 if pkg_data.architecture == "all":
1004 nuninst[arch].remove(pkg_name)
1006 return nuninst
1009def is_smooth_update_allowed(
1010 binary: BinaryPackage, smooth_updates: list[str], hints: "HintCollection"
1011) -> bool:
1012 if "ALL" in smooth_updates: 1012 ↛ 1013line 1012 didn't jump to line 1013, because the condition on line 1012 was never true
1013 return True
1014 section = binary.section.split("/")[-1]
1015 if section in smooth_updates:
1016 return True
1017 if hints.search(
1018 "allow-smooth-update", package=binary.source, version=binary.source_version
1019 ):
1020 # note that this needs to match the source version *IN TESTING*
1021 return True
1022 return False
1025def find_smooth_updateable_binaries(
1026 binaries_to_check: list[BinaryPackageId],
1027 source_data: SourcePackage,
1028 pkg_universe: "BinaryPackageUniverse",
1029 target_suite: TargetSuite,
1030 binaries_t: dict[str, dict[str, BinaryPackage]],
1031 binaries_s: dict[str, dict[str, BinaryPackage]],
1032 removals: Union[set[BinaryPackageId], frozenset[BinaryPackageId]],
1033 smooth_updates: list[str],
1034 hints: "HintCollection",
1035) -> set[BinaryPackageId]:
1036 check: set[BinaryPackageId] = set()
1037 smoothbins: set[BinaryPackageId] = set()
1039 for check_pkg_id in binaries_to_check:
1040 binary, _, parch = check_pkg_id
1042 cruftbins: set[BinaryPackageId] = set()
1044 # Not a candidate for smooth up date (newer non-cruft version in unstable)
1045 if binary in binaries_s[parch]:
1046 if binaries_s[parch][binary].source_version == source_data.version:
1047 continue
1048 cruftbins.add(binaries_s[parch][binary].pkg_id)
1050 # Maybe a candidate (cruft or removed binary): check if config allows us to smooth update it.
1051 if is_smooth_update_allowed(binaries_t[parch][binary], smooth_updates, hints):
1052 # if the package has reverse-dependencies which are
1053 # built from other sources, it's a valid candidate for
1054 # a smooth update. if not, it may still be a valid
1055 # candidate if one if its r-deps is itself a candidate,
1056 # so note it for checking later
1057 rdeps = set(pkg_universe.reverse_dependencies_of(check_pkg_id))
1058 # We ignore all binaries listed in "removals" as we
1059 # assume they will leave at the same time as the
1060 # given package.
1061 rdeps.difference_update(removals, binaries_to_check)
1063 smooth_update_it = False
1064 if target_suite.any_of_these_are_in_the_suite(rdeps):
1065 combined = set(smoothbins)
1066 combined.add(check_pkg_id)
1067 for rdep in rdeps:
1068 # each dependency clause has a set of possible
1069 # alternatives that can satisfy that dependency.
1070 # if any of them is outside the set of smoothbins, the
1071 # dependency can be satisfied even if this binary was
1072 # removed, so there is no need to keep it around for a
1073 # smooth update
1074 # if not, only this binary can satisfy the dependency, so
1075 # we should keep it around until the rdep is no longer in
1076 # testing
1077 for dep_clause in pkg_universe.dependencies_of(rdep):
1078 # filter out cruft binaries from unstable, because
1079 # they will not be added to the set of packages that
1080 # will be migrated
1081 if dep_clause - cruftbins <= combined:
1082 smooth_update_it = True
1083 break
1085 if smooth_update_it:
1086 smoothbins = combined
1087 else:
1088 check.add(check_pkg_id)
1090 # check whether we should perform a smooth update for
1091 # packages which are candidates but do not have r-deps
1092 # outside of the current source
1093 while 1:
1094 found_any = False
1095 for candidate_pkg_id in check:
1096 rdeps = pkg_universe.reverse_dependencies_of(candidate_pkg_id)
1097 if not rdeps.isdisjoint(smoothbins):
1098 smoothbins.add(candidate_pkg_id)
1099 found_any = True
1100 if not found_any:
1101 break
1102 check = {x for x in check if x not in smoothbins}
1104 return smoothbins
1107def find_newer_binaries(
1108 suite_info: Suites, pkg: BinaryPackage, add_source_for_dropped_bin: bool = False
1109) -> list[tuple[PackageId, Suite]]:
1110 """
1111 Find newer binaries for pkg in any of the source suites.
1113 :param pkg: BinaryPackage (is assumed to be in the target suite)
1115 :param add_source_for_dropped_bin: If True, newer versions of the
1116 source of pkg will be added if they don't have the binary pkg
1118 :return: the newer binaries (or sources) and their suites
1119 """
1120 source = pkg.source
1121 newer_versions: list[tuple[PackageId, Suite]] = []
1122 for suite in suite_info:
1123 if suite.suite_class == SuiteClass.TARGET_SUITE:
1124 continue
1126 suite_binaries_on_arch = suite.binaries.get(pkg.pkg_id.architecture)
1127 if not suite_binaries_on_arch: 1127 ↛ 1128line 1127 didn't jump to line 1128, because the condition on line 1127 was never true
1128 continue
1130 newerbin = None
1131 if pkg.pkg_id.package_name in suite_binaries_on_arch:
1132 newerbin = suite_binaries_on_arch[pkg.pkg_id.package_name]
1133 if suite.is_cruft(newerbin):
1134 # We pretend the cruft binary doesn't exist.
1135 # We handle this as if the source didn't have the binary
1136 # (see below)
1137 newerbin = None
1138 elif apt_pkg.version_compare(newerbin.version, pkg.version) <= 0:
1139 continue
1140 else:
1141 if source not in suite.sources:
1142 # bin and source not in suite: no newer version
1143 continue
1145 if not newerbin:
1146 if not add_source_for_dropped_bin: 1146 ↛ 1147line 1146 didn't jump to line 1147, because the condition on line 1146 was never true
1147 continue
1148 # We only get here if there is a newer version of the source,
1149 # which doesn't have the binary anymore (either it doesn't
1150 # exist, or it's cruft and we pretend it doesn't exist).
1151 # Add the new source instead.
1152 nsrc = suite.sources[source]
1153 n_id = PackageId(source, nsrc.version, "source")
1154 overs = pkg.source_version
1155 if apt_pkg.version_compare(nsrc.version, overs) <= 0:
1156 continue
1157 else:
1158 n_id = newerbin.pkg_id
1160 newer_versions.append((n_id, suite))
1162 return newer_versions
1165def parse_provides(
1166 provides_raw: str,
1167 pkg_id: Optional[BinaryPackageId] = None,
1168 logger: Optional[logging.Logger] = None,
1169) -> list[tuple[str, str, str]]:
1170 parts = apt_pkg.parse_depends(provides_raw, False)
1171 nprov = []
1172 for or_clause in parts:
1173 if len(or_clause) != 1: # pragma: no cover
1174 if logger is not None:
1175 msg = "Ignoring invalid provides in %s: Alternatives [%s]"
1176 logger.warning(msg, str(pkg_id), str(or_clause))
1177 continue
1178 for part in or_clause:
1179 provided, provided_version, op = part
1180 if op != "" and op != "=": # pragma: no cover
1181 if logger is not None:
1182 msg = "Ignoring invalid provides in %s: %s (%s %s)"
1183 logger.warning(msg, str(pkg_id), provided, op, provided_version)
1184 continue
1185 provided = sys.intern(provided)
1186 provided_version = sys.intern(provided_version)
1187 part = (provided, provided_version, sys.intern(op))
1188 nprov.append(part)
1189 return nprov
1192def parse_builtusing(
1193 builtusing_raw: str,
1194 pkg_id: Optional[BinaryPackageId] = None,
1195 logger: Optional[logging.Logger] = None,
1196) -> list[tuple[str, str]]:
1197 parts = apt_pkg.parse_depends(builtusing_raw, False)
1198 nbu = []
1199 for or_clause in parts:
1200 if len(or_clause) != 1: # pragma: no cover
1201 if logger is not None:
1202 msg = "Ignoring invalid builtusing in %s: Alternatives [%s]"
1203 logger.warning(msg, str(pkg_id), str(or_clause))
1204 continue
1205 for part in or_clause:
1206 bu, bu_version, op = part
1207 if op != "=": # pragma: no cover
1208 if logger is not None:
1209 msg = "Ignoring invalid builtusing in %s: %s (%s %s)"
1210 logger.warning(msg, str(pkg_id), bu, op, bu_version)
1211 continue
1212 bu = sys.intern(bu)
1213 bu_version = sys.intern(bu_version)
1214 nbu.append((bu, bu_version))
1215 return nbu
1218def parse_option(
1219 options: "optparse.Values",
1220 option_name: str,
1221 default: Optional[Any] = None,
1222 to_bool: bool = False,
1223 to_int: bool = False,
1224 day_to_sec: bool = False,
1225) -> None:
1226 """Ensure the option exist and has a sane value
1228 :param options: dict with options
1230 :param option_name: string with the name of the option
1232 :param default: the default value for the option
1234 :param to_int: convert the input to int (defaults to sys.maxsize)
1236 :param to_bool: convert the input to bool
1238 :param day_to_sec: convert the input from days to seconds (implies to_int=True)
1239 """
1240 value = getattr(options, option_name, default)
1242 # Option was provided with no value (or default is '') so pick up the default
1243 if value == "":
1244 value = default
1246 if (to_int or day_to_sec) and value in (None, ""):
1247 value = sys.maxsize
1249 if day_to_sec:
1250 value = int(float(value) * 24 * 60 * 60) # type: ignore[arg-type]
1252 if to_int:
1253 value = int(value) # type: ignore[arg-type]
1255 if to_bool:
1256 if value and (
1257 isinstance(value, bool) or value.lower() in ("yes", "y", "true", "t", "1")
1258 ):
1259 value = True
1260 else:
1261 value = False
1263 setattr(options, option_name, value)