Coverage for britney2/britney.py: 80%
768 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-10-17 17:32 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-10-17 17:32 +0000
1#!/usr/bin/python3 -u
3# Copyright (C) 2001-2008 Anthony Towns <ajt@debian.org>
4# Andreas Barth <aba@debian.org>
5# Fabio Tranchitella <kobold@debian.org>
6# Copyright (C) 2010-2013 Adam D. Barratt <adsb@debian.org>
8# This program is free software; you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 2 of the License, or
11# (at your option) any later version.
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16# GNU General Public License for more details.
18"""
19= Introduction =
21This is the Debian testing updater script, also known as "Britney".
23Packages are usually installed into the `testing' distribution after
24they have undergone some degree of testing in unstable. The goal of
25this software is to do this task in a smart way, allowing testing
26to always be fully installable and close to being a release candidate.
28Britney's source code is split between two different but related tasks:
29the first one is the generation of the update excuses, while the
30second tries to update testing with the valid candidates; first
31each package alone, then larger and even larger sets of packages
32together. Each try is accepted if testing is not more uninstallable
33after the update than before.
35= Data Loading =
37In order to analyze the entire Debian distribution, Britney needs to
38load in memory the whole archive: this means more than 10.000 packages
39for twelve architectures, as well as the dependency interconnections
40between them. For this reason, the memory requirements for running this
41software are quite high and at least 1 gigabyte of RAM should be available.
43Britney loads the source packages from the `Sources' file and the binary
44packages from the `Packages_${arch}' files, where ${arch} is substituted
45with the supported architectures. While loading the data, the software
46analyzes the dependencies and builds a directed weighted graph in memory
47with all the interconnections between the packages (see Britney.read_sources
48and Britney.read_binaries).
50Other than source and binary packages, Britney loads the following data:
52 * rc-bugs-*, which contains the list of release-critical bugs for a given
53 version of a source or binary package (see RCBugPolicy.read_bugs).
55 * age-policy-dates, which contains the date of the upload of a given version
56 of a source package (see Britney.read_dates).
58 * age-policy-urgencies, which contains the urgency of the upload of a given
59 version of a source package (see AgePolicy._read_urgencies).
61 * Hints, which contains lists of commands which modify the standard behaviour
62 of Britney (see Britney.read_hints).
64 * Other policies typically require their own data.
66For a more detailed explanation about the format of these files, please read
67the documentation of the related methods. The exact meaning of them will be
68instead explained in the chapter "Excuses Generation".
70= Excuses =
72An excuse is a detailed explanation of why a package can or cannot
73be updated in the testing distribution from a newer package in
74another distribution (like for example unstable). The main purpose
75of the excuses is to be written in an HTML file which will be
76published over HTTP, as well as a YAML file. The maintainers will be able
77to parse it manually or automatically to find the explanation of why their
78packages have been updated or not.
80== Excuses generation ==
82These are the steps (with references to method names) that Britney
83does for the generation of the update excuses.
85 * If a source package is available in testing but it is not
86 present in unstable and no binary packages in unstable are
87 built from it, then it is marked for removal.
89 * Every source package in unstable and testing-proposed-updates,
90 if already present in testing, is checked for binary-NMUs, new
91 or dropped binary packages in all the supported architectures
92 (see Britney.should_upgrade_srcarch). The steps to detect if an
93 upgrade is needed are:
95 1. If there is a `remove' hint for the source package, the package
96 is ignored: it will be removed and not updated.
98 2. For every binary package built from the new source, it checks
99 for unsatisfied dependencies, new binary packages and updated
100 binary packages (binNMU), excluding the architecture-independent
101 ones, and packages not built from the same source.
103 3. For every binary package built from the old source, it checks
104 if it is still built from the new source; if this is not true
105 and the package is not architecture-independent, the script
106 removes it from testing.
108 4. Finally, if there is something worth doing (eg. a new or updated
109 binary package) and nothing wrong it marks the source package
110 as "Valid candidate", or "Not considered" if there is something
111 wrong which prevented the update.
113 * Every source package in unstable and testing-proposed-updates is
114 checked for upgrade (see Britney.should_upgrade_src). The steps
115 to detect if an upgrade is needed are:
117 1. If the source package in testing is more recent the new one
118 is ignored.
120 2. If the source package doesn't exist (is fake), which means that
121 a binary package refers to it but it is not present in the
122 `Sources' file, the new one is ignored.
124 3. If the package doesn't exist in testing, the urgency of the
125 upload is ignored and set to the default (actually `low').
127 4. If there is a `remove' hint for the source package, the package
128 is ignored: it will be removed and not updated.
130 5. If there is a `block' hint for the source package without an
131 `unblock` hint or a `block-all source`, the package is ignored.
133 6. If there is a `block-udeb' hint for the source package, it will
134 have the same effect as `block', but may only be cancelled by
135 a subsequent `unblock-udeb' hint.
137 7. If the suite is unstable, the update can go ahead only if the
138 upload happened more than the minimum days specified by the
139 urgency of the upload; if this is not true, the package is
140 ignored as `too-young'. Note that the urgency is sticky, meaning
141 that the highest urgency uploaded since the previous testing
142 transition is taken into account.
144 8. If the suite is unstable, all the architecture-dependent binary
145 packages and the architecture-independent ones for the `nobreakall'
146 architectures have to be built from the source we are considering.
147 If this is not true, then these are called `out-of-date'
148 architectures and the package is ignored.
150 9. The source package must have at least one binary package, otherwise
151 it is ignored.
153 10. If the suite is unstable, the new source package must have no
154 release critical bugs which do not also apply to the testing
155 one. If this is not true, the package is ignored as `buggy'.
157 11. If there is a `force' hint for the source package, then it is
158 updated even if it is marked as ignored from the previous steps.
160 12. If the suite is {testing-,}proposed-updates, the source package can
161 be updated only if there is an explicit approval for it. Unless
162 a `force' hint exists, the new package must also be available
163 on all of the architectures for which it has binary packages in
164 testing.
166 13. If the package will be ignored, mark it as "Valid candidate",
167 otherwise mark it as "Not considered".
169 * The list of `remove' hints is processed: if the requested source
170 package is not already being updated or removed and the version
171 actually in testing is the same specified with the `remove' hint,
172 it is marked for removal.
174 * The excuses are sorted by the number of days from the last upload
175 (days-old) and by name.
177 * A list of unconsidered excuses (for which the package is not upgraded)
178 is built. Using this list, all of the excuses depending on them are
179 marked as invalid "impossible dependencies".
181 * The excuses are written in an HTML file.
182"""
183import contextlib
184import logging
185import optparse
186import os
187import sys
188import time
189from collections import defaultdict
190from collections.abc import Iterator
191from functools import reduce
192from itertools import chain
193from operator import attrgetter
194from typing import TYPE_CHECKING, Any, Optional, cast
196import apt_pkg
198from britney2 import BinaryPackage, BinaryPackageId, SourcePackage, Suites
199from britney2.excusefinder import ExcuseFinder
200from britney2.hints import Hint, HintCollection, HintParser
201from britney2.inputs.suiteloader import (
202 DebMirrorLikeSuiteContentLoader,
203 MissingRequiredConfigurationError,
204)
205from britney2.installability.builder import build_installability_tester
206from britney2.installability.solver import InstallabilitySolver
207from britney2.migration import MigrationManager
208from britney2.migrationitem import MigrationItem, MigrationItemFactory
209from britney2.policies.autopkgtest import AutopkgtestPolicy
210from britney2.policies.lintian import LintianPolicy
211from britney2.policies.policy import (
212 AgePolicy,
213 BlockPolicy,
214 BuildDependsPolicy,
215 BuiltOnBuilddPolicy,
216 BuiltUsingPolicy,
217 DependsPolicy,
218 ImplicitDependencyPolicy,
219 PiupartsPolicy,
220 PolicyEngine,
221 PolicyLoadRequest,
222 RCBugPolicy,
223 ReproduciblePolicy,
224 ReverseRemovalPolicy,
225)
226from britney2.utils import (
227 MigrationConstraintException,
228 clone_nuninst,
229 compile_nuninst,
230 format_and_log_uninst,
231 is_nuninst_asgood_generous,
232 log_and_format_old_libraries,
233 newly_uninst,
234 old_libraries,
235 parse_option,
236 parse_provides,
237 read_nuninst,
238 write_excuses,
239 write_heidi,
240 write_heidi_delta,
241 write_nuninst,
242)
244if TYPE_CHECKING: 244 ↛ 245line 244 didn't jump to line 245 because the condition on line 244 was never true
245 from .excuse import Excuse
246 from .installability.tester import InstallabilityTester
247 from .installability.universe import BinaryPackageUniverse
248 from .transaction import MigrationTransactionState
251__author__ = "Fabio Tranchitella and the Debian Release Team"
252__version__ = "2.0"
255MIGRATION_POLICIES = [
256 PolicyLoadRequest.always_load(DependsPolicy),
257 PolicyLoadRequest.conditionally_load(RCBugPolicy, "rcbug_enable", True),
258 PolicyLoadRequest.conditionally_load(PiupartsPolicy, "piuparts_enable", True),
259 PolicyLoadRequest.always_load(ImplicitDependencyPolicy),
260 PolicyLoadRequest.conditionally_load(AutopkgtestPolicy, "adt_enable", True),
261 PolicyLoadRequest.conditionally_load(LintianPolicy, "lintian_enable", False),
262 PolicyLoadRequest.conditionally_load(ReproduciblePolicy, "repro_enable", False),
263 PolicyLoadRequest.conditionally_load(AgePolicy, "age_enable", True),
264 PolicyLoadRequest.always_load(BuildDependsPolicy),
265 PolicyLoadRequest.always_load(BlockPolicy),
266 PolicyLoadRequest.conditionally_load(
267 BuiltUsingPolicy, "built_using_policy_enable", True
268 ),
269 PolicyLoadRequest.conditionally_load(BuiltOnBuilddPolicy, "check_buildd", False),
270 PolicyLoadRequest.always_load(ReverseRemovalPolicy),
271]
274class Britney:
275 """Britney, the Debian testing updater script
277 This is the script that updates the testing distribution. It is executed
278 each day after the installation of the updated packages. It generates the
279 `Packages' files for the testing distribution, but it does so in an
280 intelligent manner; it tries to avoid any inconsistency and to use only
281 non-buggy packages.
283 For more documentation on this script, please read the Developers Reference.
284 """
286 HINTS_HELPERS = (
287 "easy",
288 "hint",
289 "remove",
290 "block",
291 "block-udeb",
292 "unblock",
293 "unblock-udeb",
294 "approve",
295 "remark",
296 "ignore-piuparts",
297 "ignore-rc-bugs",
298 "force-skiptest",
299 "force-badtest",
300 )
301 HINTS_STANDARD = ("urgent", "age-days") + HINTS_HELPERS
302 # ALL = {"force", "force-hint", "block-all"} | HINTS_STANDARD | registered policy hints (not covered above)
303 HINTS_ALL = "ALL"
304 pkg_universe: "BinaryPackageUniverse"
305 _inst_tester: "InstallabilityTester"
306 constraints: dict[str, list[str]]
307 suite_info: Suites
309 def __init__(self) -> None:
310 """Class constructor
312 This method initializes and populates the data lists, which contain all
313 the information needed by the other methods of the class.
314 """
316 # setup logging - provide the "short level name" (i.e. INFO -> I) that
317 # we used to use prior to using the logging module.
319 old_factory = logging.getLogRecordFactory()
320 short_level_mapping = {
321 "CRITICAL": "F",
322 "INFO": "I",
323 "WARNING": "W",
324 "ERROR": "E",
325 "DEBUG": "N",
326 }
328 def record_factory(
329 *args: Any, **kwargs: Any
330 ) -> logging.LogRecord: # pragma: no cover
331 record = old_factory(*args, **kwargs)
332 try:
333 record.shortlevelname = short_level_mapping[record.levelname]
334 except KeyError:
335 record.shortlevelname = record.levelname
336 return record
338 logging.setLogRecordFactory(record_factory)
339 logging.basicConfig(
340 format="{shortlevelname}: [{asctime}] - {message}",
341 style="{",
342 datefmt="%Y-%m-%dT%H:%M:%S%z",
343 stream=sys.stdout,
344 )
346 self.logger = logging.getLogger()
348 # Logger for "upgrade_output"; the file handler will be attached later when
349 # we are ready to open the file.
350 self.output_logger = logging.getLogger("britney2.output.upgrade_output")
351 self.output_logger.setLevel(logging.INFO)
353 # initialize the apt_pkg back-end
354 apt_pkg.init()
356 # parse the command line arguments
357 self._policy_engine = PolicyEngine()
358 self.__parse_arguments()
359 assert self.suite_info is not None # for type checking
361 self.all_selected: list[MigrationItem] = []
362 self.excuses: dict[str, "Excuse"] = {}
363 self.upgrade_me: list[MigrationItem] = []
365 if self.options.nuninst_cache: 365 ↛ 366line 365 didn't jump to line 366 because the condition on line 365 was never true
366 self.logger.info(
367 "Not building the list of non-installable packages, as requested"
368 )
369 if self.options.print_uninst:
370 nuninst = read_nuninst(
371 self.options.noninst_status, self.options.architectures
372 )
373 print("* summary")
374 print(
375 "\n".join(
376 "%4d %s" % (len(nuninst[x]), x)
377 for x in self.options.architectures
378 )
379 )
380 return
382 try:
383 constraints_file = os.path.join(
384 self.options.static_input_dir, "constraints"
385 )
386 faux_packages = os.path.join(self.options.static_input_dir, "faux-packages")
387 except AttributeError:
388 self.logger.info("The static_input_dir option is not set")
389 constraints_file = None
390 faux_packages = None
391 if faux_packages is not None and os.path.exists(faux_packages):
392 self.logger.info("Loading faux packages from %s", faux_packages)
393 self._load_faux_packages(faux_packages)
394 elif faux_packages is not None: 394 ↛ 397line 394 didn't jump to line 397 because the condition on line 394 was always true
395 self.logger.info("No Faux packages as %s does not exist", faux_packages)
397 if constraints_file is not None and os.path.exists(constraints_file):
398 self.logger.info("Loading constraints from %s", constraints_file)
399 self.constraints = self._load_constraints(constraints_file)
400 else:
401 if constraints_file is not None: 401 ↛ 405line 401 didn't jump to line 405
402 self.logger.info(
403 "No constraints as %s does not exist", constraints_file
404 )
405 self.constraints = {
406 "keep-installable": [],
407 }
409 self.logger.info("Compiling Installability tester")
410 self.pkg_universe, self._inst_tester = build_installability_tester(
411 self.suite_info, self.options.architectures
412 )
413 target_suite = self.suite_info.target_suite
414 target_suite.inst_tester = self._inst_tester
416 self.allow_uninst: dict[str, set[str | None]] = {}
417 for arch in self.options.architectures:
418 self.allow_uninst[arch] = set()
419 self._migration_item_factory: MigrationItemFactory = MigrationItemFactory(
420 self.suite_info
421 )
422 self._hint_parser: HintParser = HintParser(self._migration_item_factory)
423 self._migration_manager: MigrationManager = MigrationManager(
424 self.options,
425 self.suite_info,
426 self.all_binaries,
427 self.pkg_universe,
428 self.constraints,
429 self.allow_uninst,
430 self._migration_item_factory,
431 self.hints,
432 )
434 if not self.options.nuninst_cache: 434 ↛ 474line 434 didn't jump to line 474 because the condition on line 434 was always true
435 self.logger.info(
436 "Building the list of non-installable packages for the full archive"
437 )
438 self._inst_tester.compute_installability()
439 nuninst = compile_nuninst(
440 target_suite, self.options.architectures, self.options.nobreakall_arches
441 )
442 self.nuninst_orig: dict[str, set[str]] = nuninst
443 for arch in self.options.architectures:
444 self.logger.info(
445 "> Found %d non-installable packages for %s",
446 len(nuninst[arch]),
447 arch,
448 )
449 if self.options.print_uninst: 449 ↛ 450line 449 didn't jump to line 450 because the condition on line 449 was never true
450 self.nuninst_arch_report(nuninst, arch)
452 if self.options.print_uninst: 452 ↛ 453line 452 didn't jump to line 453 because the condition on line 452 was never true
453 print("* summary")
454 print(
455 "\n".join(
456 map(
457 lambda x: "%4d %s" % (len(nuninst[x]), x),
458 self.options.architectures,
459 )
460 )
461 )
462 return
463 else:
464 write_nuninst(self.options.noninst_status, nuninst)
466 stats = self._inst_tester.compute_stats()
467 self.logger.info("> Installability tester statistics (per architecture)")
468 for arch in self.options.architectures:
469 arch_stat = stats[arch]
470 self.logger.info("> %s", arch)
471 for stat in arch_stat.stat_summary():
472 self.logger.info("> - %s", stat)
473 else:
474 self.logger.info("Loading uninstallability counters from cache")
475 self.nuninst_orig = read_nuninst(
476 self.options.noninst_status, self.options.architectures
477 )
479 # nuninst_orig may get updated during the upgrade process
480 self.nuninst_orig_save: dict[str, set[str]] = clone_nuninst(
481 self.nuninst_orig, architectures=self.options.architectures
482 )
484 self._policy_engine.register_policy_hints(self._hint_parser)
486 try:
487 self.read_hints(self.options.hintsdir)
488 except AttributeError:
489 self.read_hints(os.path.join(self.suite_info["unstable"].path, "Hints"))
491 self._policy_engine.initialise(self, self.hints)
493 def __parse_arguments(self) -> None:
494 """Parse the command line arguments
496 This method parses and initializes the command line arguments.
497 While doing so, it preprocesses some of the options to be converted
498 in a suitable form for the other methods of the class.
499 """
500 # initialize the parser
501 parser = optparse.OptionParser(version="%prog")
502 parser.add_option(
503 "-v", "", action="count", dest="verbose", help="enable verbose output"
504 )
505 parser.add_option(
506 "-c",
507 "--config",
508 action="store",
509 dest="config",
510 default="/etc/britney.conf",
511 help="path for the configuration file",
512 )
513 parser.add_option(
514 "",
515 "--architectures",
516 action="store",
517 dest="architectures",
518 default=None,
519 help="override architectures from configuration file",
520 )
521 parser.add_option(
522 "",
523 "--actions",
524 action="store",
525 dest="actions",
526 default=None,
527 help="override the list of actions to be performed",
528 )
529 parser.add_option(
530 "",
531 "--hints",
532 action="store",
533 dest="hints",
534 default=None,
535 help="additional hints, separated by semicolons",
536 )
537 parser.add_option(
538 "",
539 "--hint-tester",
540 action="store_true",
541 dest="hint_tester",
542 default=None,
543 help="provide a command line interface to test hints",
544 )
545 parser.add_option(
546 "",
547 "--dry-run",
548 action="store_true",
549 dest="dry_run",
550 default=False,
551 help="disable all outputs to the testing directory",
552 )
553 parser.add_option(
554 "",
555 "--nuninst-cache",
556 action="store_true",
557 dest="nuninst_cache",
558 default=False,
559 help="do not build the non-installability status, use the cache from file",
560 )
561 parser.add_option(
562 "",
563 "--print-uninst",
564 action="store_true",
565 dest="print_uninst",
566 default=False,
567 help="just print a summary of uninstallable packages",
568 )
569 parser.add_option(
570 "",
571 "--compute-migrations",
572 action="store_true",
573 dest="compute_migrations",
574 default=True,
575 help="Compute which packages can migrate (the default)",
576 )
577 parser.add_option(
578 "",
579 "--no-compute-migrations",
580 action="store_false",
581 dest="compute_migrations",
582 help="Do not compute which packages can migrate.",
583 )
584 parser.add_option(
585 "",
586 "--series",
587 action="store",
588 dest="series",
589 default="",
590 help="set distribution series name",
591 )
592 parser.add_option(
593 "",
594 "--distribution",
595 action="store",
596 dest="distribution",
597 default="debian",
598 help="set distribution name",
599 )
600 (self.options, self.args) = parser.parse_args()
602 if self.options.verbose: 602 ↛ 608line 602 didn't jump to line 608 because the condition on line 602 was always true
603 if self.options.verbose > 1: 603 ↛ 604line 603 didn't jump to line 604 because the condition on line 603 was never true
604 self.logger.setLevel(logging.DEBUG)
605 else:
606 self.logger.setLevel(logging.INFO)
607 else:
608 self.logger.setLevel(logging.WARNING)
609 # Historical way to get debug information (equivalent to -vv)
610 try: # pragma: no cover
611 if int(os.environ.get("BRITNEY_DEBUG", "0")):
612 self.logger.setLevel(logging.DEBUG)
613 except ValueError: # pragma: no cover
614 pass
616 # integrity checks
617 if self.options.nuninst_cache and self.options.print_uninst: # pragma: no cover
618 self.logger.error("nuninst_cache and print_uninst are mutually exclusive!")
619 sys.exit(1)
621 # if the configuration file exists, then read it and set the additional options
622 if not os.path.isfile(self.options.config): # pragma: no cover
623 self.logger.error(
624 "Unable to read the configuration file (%s), exiting!",
625 self.options.config,
626 )
627 sys.exit(1)
629 self.HINTS: dict[str, Any] = {"command-line": self.HINTS_ALL}
630 with open(self.options.config, encoding="utf-8") as config:
631 for line in config:
632 if "=" in line and not line.strip().startswith("#"):
633 k, v = line.split("=", 1)
634 k = k.strip()
635 v = v.strip()
636 if k.startswith("HINTS_"):
637 self.HINTS[k.split("_")[1].lower()] = reduce( 637 ↛ exitline 637 didn't jump to the function exit
638 lambda x, y: x + y,
639 [
640 hasattr(self, "HINTS_" + i)
641 and getattr(self, "HINTS_" + i)
642 or (i,)
643 for i in v.split()
644 ],
645 )
646 elif not hasattr(self.options, k.lower()) or not getattr(
647 self.options, k.lower()
648 ):
649 setattr(self.options, k.lower(), v)
651 parse_option(self.options, "archall_inconsistency_allowed", to_bool=True)
653 suite_loader = DebMirrorLikeSuiteContentLoader(self.options)
655 try:
656 self.suite_info = suite_loader.load_suites()
657 except MissingRequiredConfigurationError as e: # pragma: no cover
658 self.logger.error(
659 "Could not load the suite content due to missing configuration: %s",
660 str(e),
661 )
662 sys.exit(1)
663 self.all_binaries = suite_loader.all_binaries()
664 self.options.components = suite_loader.components
665 self.options.architectures = suite_loader.architectures
666 self.options.nobreakall_arches = suite_loader.nobreakall_arches
667 self.options.outofsync_arches = suite_loader.outofsync_arches
668 self.options.break_arches = suite_loader.break_arches
669 self.options.new_arches = suite_loader.new_arches
670 if self.options.series == "": 670 ↛ 673line 670 didn't jump to line 673 because the condition on line 670 was always true
671 self.options.series = self.suite_info.target_suite.name
673 if self.options.heidi_output and not hasattr( 673 ↛ 678line 673 didn't jump to line 678 because the condition on line 673 was always true
674 self.options, "heidi_delta_output"
675 ):
676 self.options.heidi_delta_output = self.options.heidi_output + "Delta"
678 self.options.smooth_updates = self.options.smooth_updates.split()
680 parse_option(self.options, "ignore_cruft", to_bool=True)
681 parse_option(self.options, "check_consistency_level", default=2, to_int=True)
682 parse_option(self.options, "build_url")
684 self._policy_engine.load_policies(
685 self.options, self.suite_info, MIGRATION_POLICIES
686 )
688 @property
689 def hints(self) -> HintCollection:
690 return self._hint_parser.hints
692 def _load_faux_packages(self, faux_packages_file: str) -> None:
693 """Loads fake packages
695 In rare cases, it is useful to create a "fake" package that can be used to satisfy
696 dependencies. This is usually needed for packages that are not shipped directly
697 on this mirror but is a prerequisite for using this mirror (e.g. some vendors provide
698 non-distributable "setup" packages and contrib/non-free packages depend on these).
700 :param faux_packages_file: Path to the file containing the fake package definitions
701 """
702 tag_file = apt_pkg.TagFile(faux_packages_file)
703 get_field = tag_file.section.get
704 step = tag_file.step
705 no = 0
706 pri_source_suite = self.suite_info.primary_source_suite
707 target_suite = self.suite_info.target_suite
709 while step():
710 no += 1
711 pkg_name = get_field("Package", None)
712 if pkg_name is None: # pragma: no cover
713 raise ValueError(
714 "Missing Package field in paragraph %d (file %s)"
715 % (no, faux_packages_file)
716 )
717 pkg_name = sys.intern(pkg_name)
718 version = sys.intern(get_field("Version", "1.0-1"))
719 provides_raw = get_field("Provides")
720 archs_raw = get_field("Architecture", None)
721 component = get_field("Component", "non-free")
722 if archs_raw: 722 ↛ 723line 722 didn't jump to line 723 because the condition on line 722 was never true
723 archs = archs_raw.split()
724 else:
725 archs = self.options.architectures
726 faux_section = "faux"
727 if component != "main": 727 ↛ 729line 727 didn't jump to line 729 because the condition on line 727 was always true
728 faux_section = "%s/faux" % component
729 src_data = SourcePackage(
730 pkg_name,
731 version,
732 sys.intern(faux_section),
733 set(),
734 None,
735 True,
736 None,
737 None,
738 [],
739 [],
740 )
742 target_suite.sources[pkg_name] = src_data
743 pri_source_suite.sources[pkg_name] = src_data
745 for arch in archs:
746 pkg_id = BinaryPackageId(pkg_name, version, arch)
747 if provides_raw: 747 ↛ 748line 747 didn't jump to line 748 because the condition on line 747 was never true
748 provides = parse_provides(
749 provides_raw, pkg_id=pkg_id, logger=self.logger
750 )
751 else:
752 provides = []
753 bin_data = BinaryPackage(
754 version,
755 faux_section,
756 pkg_name,
757 version,
758 arch,
759 get_field("Multi-Arch"),
760 None,
761 None,
762 provides,
763 False,
764 pkg_id,
765 [],
766 )
768 src_data.binaries.add(pkg_id)
769 target_suite.binaries[arch][pkg_name] = bin_data
770 pri_source_suite.binaries[arch][pkg_name] = bin_data
772 # register provided packages with the target suite provides table
773 for provided_pkg, provided_version, _ in bin_data.provides: 773 ↛ 774line 773 didn't jump to line 774 because the loop on line 773 never started
774 target_suite.provides_table[arch][provided_pkg].add(
775 (pkg_name, provided_version)
776 )
778 self.all_binaries[pkg_id] = bin_data
780 def _load_constraints(self, constraints_file: str) -> dict[str, list[str]]:
781 """Loads configurable constraints
783 The constraints file can contain extra rules that Britney should attempt
784 to satisfy. Examples can be "keep package X in testing and ensure it is
785 installable".
787 :param constraints_file: Path to the file containing the constraints
788 """
789 tag_file = apt_pkg.TagFile(constraints_file)
790 get_field = tag_file.section.get
791 step = tag_file.step
792 no = 0
793 faux_version = sys.intern("1")
794 faux_section = sys.intern("faux")
795 keep_installable: list[str] = []
796 constraints = {"keep-installable": keep_installable}
797 pri_source_suite = self.suite_info.primary_source_suite
798 target_suite = self.suite_info.target_suite
800 while step():
801 no += 1
802 pkg_name = get_field("Fake-Package-Name", None)
803 if pkg_name is None: # pragma: no cover
804 raise ValueError(
805 "Missing Fake-Package-Name field in paragraph %d (file %s)"
806 % (no, constraints_file)
807 )
808 pkg_name = sys.intern(pkg_name)
810 def mandatory_field(x: str) -> str:
811 v: str = get_field(x, None)
812 if v is None: # pragma: no cover
813 raise ValueError(
814 "Missing %s field for %s (file %s)"
815 % (x, pkg_name, constraints_file)
816 )
817 return v
819 constraint = mandatory_field("Constraint")
820 if constraint not in {"present-and-installable"}: # pragma: no cover
821 raise ValueError(
822 "Unsupported constraint %s for %s (file %s)"
823 % (constraint, pkg_name, constraints_file)
824 )
826 self.logger.info(" - constraint %s", pkg_name)
828 pkg_list = [
829 x.strip()
830 for x in mandatory_field("Package-List").split("\n")
831 if x.strip() != "" and not x.strip().startswith("#")
832 ]
833 src_data = SourcePackage(
834 pkg_name,
835 faux_version,
836 faux_section,
837 set(),
838 None,
839 True,
840 None,
841 None,
842 [],
843 [],
844 )
845 target_suite.sources[pkg_name] = src_data
846 pri_source_suite.sources[pkg_name] = src_data
847 keep_installable.append(pkg_name)
848 for arch in self.options.architectures:
849 deps = []
850 for pkg_spec in pkg_list:
851 s = pkg_spec.split(None, 1)
852 if len(s) == 1:
853 deps.append(s[0])
854 else:
855 pkg, arch_res = s
856 if not (
857 arch_res.startswith("[") and arch_res.endswith("]")
858 ): # pragma: no cover
859 raise ValueError(
860 "Invalid arch-restriction on %s - should be [arch1 arch2] (for %s file %s)"
861 % (pkg, pkg_name, constraints_file)
862 )
863 arch_res_l = arch_res[1:-1].split()
864 if not arch_res_l: # pragma: no cover
865 msg = "Empty arch-restriction for %s: Uses comma or negation (for %s file %s)"
866 raise ValueError(msg % (pkg, pkg_name, constraints_file))
867 for a in arch_res_l:
868 if a == arch:
869 deps.append(pkg)
870 elif "," in a or "!" in a: # pragma: no cover
871 msg = "Invalid arch-restriction for %s: Uses comma or negation (for %s file %s)"
872 raise ValueError(
873 msg % (pkg, pkg_name, constraints_file)
874 )
875 pkg_id = BinaryPackageId(pkg_name, faux_version, arch)
876 bin_data = BinaryPackage(
877 faux_version,
878 faux_section,
879 pkg_name,
880 faux_version,
881 arch,
882 "no",
883 ", ".join(deps),
884 None,
885 [],
886 False,
887 pkg_id,
888 [],
889 )
890 src_data.binaries.add(pkg_id)
891 target_suite.binaries[arch][pkg_name] = bin_data
892 pri_source_suite.binaries[arch][pkg_name] = bin_data
893 self.all_binaries[pkg_id] = bin_data
895 return constraints
897 # Data reading/writing methods
898 # ----------------------------
900 def read_hints(self, hintsdir: str) -> None:
901 """Read the hint commands from the specified directory
903 The hint commands are read from the files contained in the directory
904 specified by the `hintsdir' parameter.
905 The names of the files have to be the same as the authorized users
906 for the hints.
908 The file contains rows with the format:
910 <command> <package-name>[/<version>]
912 The method returns a dictionary where the key is the command, and
913 the value is the list of affected packages.
914 """
916 for who in self.HINTS.keys():
917 if who == "command-line":
918 lines = self.options.hints and self.options.hints.split(";") or ()
919 filename = "<cmd-line>"
920 self._hint_parser.parse_hints(who, self.HINTS[who], filename, lines)
921 else:
922 filename = os.path.join(hintsdir, who)
923 if not os.path.isfile(filename): 923 ↛ 924line 923 didn't jump to line 924 because the condition on line 923 was never true
924 self.logger.error(
925 "Cannot read hints list from %s, no such file!", filename
926 )
927 continue
928 self.logger.info("Loading hints list from %s", filename)
929 with open(filename, encoding="utf-8") as f:
930 self._hint_parser.parse_hints(who, self.HINTS[who], filename, f)
932 hints = self._hint_parser.hints
934 for x in [
935 "block",
936 "block-all",
937 "block-udeb",
938 "unblock",
939 "unblock-udeb",
940 "force",
941 "urgent",
942 "remove",
943 "age-days",
944 ]:
945 z: dict[str | None, dict[str | None, tuple[Hint, str]]] = defaultdict(dict)
946 for hint in hints[x]:
947 package = hint.package
948 architecture = hint.architecture
949 key = (hint, hint.user)
950 if ( 950 ↛ 955line 950 didn't jump to line 955
951 package in z
952 and architecture in z[package]
953 and z[package][architecture] != key
954 ):
955 hint2 = z[package][architecture][0]
956 if x in ["unblock", "unblock-udeb"]:
957 assert hint.version is not None
958 assert hint2.version is not None
959 if apt_pkg.version_compare(hint2.version, hint.version) < 0:
960 # This hint is for a newer version, so discard the old one
961 self.logger.warning(
962 "Overriding %s[%s] = ('%s', '%s', '%s') with ('%s', '%s', '%s')",
963 x,
964 package,
965 hint2.version,
966 hint2.architecture,
967 hint2.user,
968 hint.version,
969 hint.architecture,
970 hint.user,
971 )
972 hint2.set_active(False)
973 else:
974 # This hint is for an older version, so ignore it in favour of the new one
975 self.logger.warning(
976 "Ignoring %s[%s] = ('%s', '%s', '%s'), ('%s', '%s', '%s') is higher or equal",
977 x,
978 package,
979 hint.version,
980 hint.architecture,
981 hint.user,
982 hint2.version,
983 hint2.architecture,
984 hint2.user,
985 )
986 hint.set_active(False)
987 else:
988 self.logger.warning(
989 "Overriding %s[%s] = ('%s', '%s') with ('%s', '%s')",
990 x,
991 package,
992 hint2.user,
993 hint2,
994 hint.user,
995 hint,
996 )
997 hint2.set_active(False)
999 z[package][architecture] = key
1001 for hint in hints["allow-uninst"]:
1002 if hint.architecture == "source":
1003 for arch in self.options.architectures:
1004 self.allow_uninst[arch].add(hint.package)
1005 else:
1006 assert hint.architecture is not None
1007 self.allow_uninst[hint.architecture].add(hint.package)
1009 # Sanity check the hints hash
1010 if len(hints["block"]) == 0 and len(hints["block-udeb"]) == 0: 1010 ↛ 1011line 1010 didn't jump to line 1011 because the condition on line 1010 was never true
1011 self.logger.warning("WARNING: No block hints at all, not even udeb ones!")
1013 def write_excuses(self) -> None:
1014 """Produce and write the update excuses
1016 This method handles the update excuses generation: the packages are
1017 looked at to determine whether they are valid candidates. For the details
1018 of this procedure, please refer to the module docstring.
1019 """
1021 self.logger.info("Update Excuses generation started")
1023 mi_factory = self._migration_item_factory
1024 excusefinder = ExcuseFinder(
1025 self.options,
1026 self.suite_info,
1027 self.all_binaries,
1028 self.pkg_universe,
1029 self._policy_engine,
1030 mi_factory,
1031 self.hints,
1032 )
1034 excuses, upgrade_me = excusefinder.find_actionable_excuses()
1035 self.excuses = excuses
1037 # sort the list of candidates
1038 self.upgrade_me = sorted(upgrade_me)
1039 old_lib_removals = old_libraries(
1040 mi_factory, self.suite_info, self.options.outofsync_arches
1041 )
1042 self.upgrade_me.extend(old_lib_removals)
1043 self.output_logger.info(
1044 "List of old libraries added to upgrade_me (%d):", len(old_lib_removals)
1045 )
1046 log_and_format_old_libraries(self.output_logger, old_lib_removals)
1048 # write excuses to the output file
1049 if not self.options.dry_run: 1049 ↛ 1062line 1049 didn't jump to line 1062 because the condition on line 1049 was always true
1050 self.logger.info("> Writing Excuses to %s", self.options.excuses_output)
1051 write_excuses(
1052 excuses, self.options.excuses_output, output_format="legacy-html"
1053 )
1054 if hasattr(self.options, "excuses_yaml_output"): 1054 ↛ 1062line 1054 didn't jump to line 1062 because the condition on line 1054 was always true
1055 self.logger.info(
1056 "> Writing YAML Excuses to %s", self.options.excuses_yaml_output
1057 )
1058 write_excuses(
1059 excuses, self.options.excuses_yaml_output, output_format="yaml"
1060 )
1062 self.logger.info("Update Excuses generation completed")
1064 # Upgrade run
1065 # -----------
1067 def eval_nuninst(
1068 self,
1069 nuninst: dict[str, set[str]],
1070 original: dict[str, set[str]] | None = None,
1071 ) -> str:
1072 """Return a string which represents the uninstallability counters
1074 This method returns a string which represents the uninstallability
1075 counters reading the uninstallability statistics `nuninst` and, if
1076 present, merging the results with the `original` one.
1078 An example of the output string is:
1079 1+2: i-0:a-0:a-0:h-0:i-1:m-0:m-0:p-0:a-0:m-0:s-2:s-0
1081 where the first part is the number of broken packages in non-break
1082 architectures + the total number of broken packages for all the
1083 architectures.
1084 """
1085 res = []
1086 total = 0
1087 totalbreak = 0
1088 for arch in self.options.architectures:
1089 if arch in nuninst: 1089 ↛ 1091line 1089 didn't jump to line 1091 because the condition on line 1089 was always true
1090 n = len(nuninst[arch])
1091 elif original and arch in original:
1092 n = len(original[arch])
1093 else:
1094 continue
1095 if arch in self.options.break_arches:
1096 totalbreak = totalbreak + n
1097 else:
1098 total = total + n
1099 res.append("%s-%d" % (arch[0], n))
1100 return "%d+%d: %s" % (total, totalbreak, ":".join(res))
1102 def iter_packages(
1103 self,
1104 packages: list[MigrationItem],
1105 selected: list[MigrationItem],
1106 nuninst: dict[str, set[str]] | None = None,
1107 ) -> tuple[dict[str, set[str]] | None, list[MigrationItem]]:
1108 """Iter on the list of actions and apply them one-by-one
1110 This method applies the changes from `packages` to testing, checking the uninstallability
1111 counters for every action performed. If the action does not improve them, it is reverted.
1112 The method returns the new uninstallability counters and the remaining actions if the
1113 final result is successful, otherwise (None, []).
1115 :param selected: list of MigrationItem?
1116 :param nuninst: dict with sets ? of ? per architecture
1117 """
1118 assert self.suite_info is not None # for type checking
1119 group_info = {}
1120 rescheduled_packages = packages
1121 maybe_rescheduled_packages: list[MigrationItem] = []
1122 output_logger = self.output_logger
1123 solver = InstallabilitySolver(self.pkg_universe, self._inst_tester)
1124 mm = self._migration_manager
1125 target_suite = self.suite_info.target_suite
1127 for y in sorted((y for y in packages), key=attrgetter("uvname")):
1128 try:
1129 _, updates, rms, _ = mm.compute_groups(y)
1130 result = (y, sorted(updates), sorted(rms))
1131 group_info[y] = result
1132 except MigrationConstraintException as e:
1133 rescheduled_packages.remove(y)
1134 output_logger.info("not adding package to list: %s", (y.package))
1135 output_logger.info(" got exception: %s" % (repr(e)))
1137 if nuninst:
1138 nuninst_orig = nuninst
1139 else:
1140 nuninst_orig = self.nuninst_orig
1142 nuninst_last_accepted = nuninst_orig
1144 output_logger.info(
1145 "recur: [] %s %d/0", ",".join(x.uvname for x in selected), len(packages)
1146 )
1147 while rescheduled_packages:
1148 groups = [group_info[x] for x in rescheduled_packages]
1149 worklist = solver.solve_groups(groups)
1150 rescheduled_packages = []
1152 worklist.reverse()
1154 while worklist:
1155 comp = worklist.pop()
1156 comp_name = " ".join(item.uvname for item in comp)
1157 output_logger.info("trying: %s" % comp_name)
1158 with mm.start_transaction() as transaction:
1159 accepted = False
1160 try:
1161 (
1162 accepted,
1163 nuninst_after,
1164 failed_arch,
1165 new_cruft,
1166 ) = mm.migrate_items_to_target_suite(
1167 comp, nuninst_last_accepted
1168 )
1169 if accepted:
1170 selected.extend(comp)
1171 transaction.commit()
1172 output_logger.info("accepted: %s", comp_name)
1173 output_logger.info(
1174 " ori: %s", self.eval_nuninst(nuninst_orig)
1175 )
1176 output_logger.info(
1177 " pre: %s", self.eval_nuninst(nuninst_last_accepted)
1178 )
1179 output_logger.info(
1180 " now: %s", self.eval_nuninst(nuninst_after)
1181 )
1182 if new_cruft:
1183 output_logger.info(
1184 " added new cruft items to list: %s",
1185 " ".join(x.uvname for x in sorted(new_cruft)),
1186 )
1188 if len(selected) <= 20:
1189 output_logger.info(
1190 " all: %s", " ".join(x.uvname for x in selected)
1191 )
1192 else:
1193 output_logger.info(
1194 " most: (%d) .. %s",
1195 len(selected),
1196 " ".join(x.uvname for x in selected[-20:]),
1197 )
1198 if self.options.check_consistency_level >= 3: 1198 ↛ 1199line 1198 didn't jump to line 1199 because the condition on line 1198 was never true
1199 target_suite.check_suite_source_pkg_consistency(
1200 "iter_packages after commit"
1201 )
1202 nuninst_last_accepted = nuninst_after
1203 for cruft_item in new_cruft:
1204 try:
1205 _, updates, rms, _ = mm.compute_groups(cruft_item)
1206 result = (cruft_item, sorted(updates), sorted(rms))
1207 group_info[cruft_item] = result
1208 worklist.append([cruft_item])
1209 except MigrationConstraintException as e:
1210 output_logger.info(
1211 " got exception adding cruft item %s to list: %s"
1212 % (cruft_item.uvname, repr(e))
1213 )
1214 rescheduled_packages.extend(maybe_rescheduled_packages)
1215 maybe_rescheduled_packages.clear()
1216 else:
1217 transaction.rollback()
1218 assert failed_arch # type checking
1219 broken = sorted(
1220 b
1221 for b in nuninst_after[failed_arch]
1222 if b not in nuninst_last_accepted[failed_arch]
1223 )
1224 compare_nuninst = None
1225 if any(
1226 item for item in comp if item.architecture != "source"
1227 ):
1228 compare_nuninst = nuninst_last_accepted
1229 # NB: try_migration already reverted this for us, so just print the results and move on
1230 output_logger.info(
1231 "skipped: %s (%d, %d, %d)",
1232 comp_name,
1233 len(rescheduled_packages),
1234 len(maybe_rescheduled_packages),
1235 len(worklist),
1236 )
1237 output_logger.info(
1238 " got: %s",
1239 self.eval_nuninst(nuninst_after, compare_nuninst),
1240 )
1241 output_logger.info(
1242 " * %s: %s", failed_arch, ", ".join(broken)
1243 )
1244 if self.options.check_consistency_level >= 3: 1244 ↛ 1245line 1244 didn't jump to line 1245 because the condition on line 1244 was never true
1245 target_suite.check_suite_source_pkg_consistency(
1246 "iter_package after rollback (not accepted)"
1247 )
1249 except MigrationConstraintException as e:
1250 transaction.rollback()
1251 output_logger.info(
1252 "skipped: %s (%d, %d, %d)",
1253 comp_name,
1254 len(rescheduled_packages),
1255 len(maybe_rescheduled_packages),
1256 len(worklist),
1257 )
1258 output_logger.info(" got exception: %s" % (repr(e)))
1259 if self.options.check_consistency_level >= 3: 1259 ↛ 1260line 1259 didn't jump to line 1260 because the condition on line 1259 was never true
1260 target_suite.check_suite_source_pkg_consistency(
1261 "iter_package after rollback (MigrationConstraintException)"
1262 )
1264 if not accepted:
1265 if len(comp) > 1:
1266 output_logger.info(
1267 " - splitting the component into single items and retrying them"
1268 )
1269 worklist.extend([item] for item in comp)
1270 else:
1271 maybe_rescheduled_packages.append(comp[0])
1273 output_logger.info(" finish: [%s]", ",".join(x.uvname for x in selected))
1274 output_logger.info("endloop: %s", self.eval_nuninst(self.nuninst_orig))
1275 output_logger.info(" now: %s", self.eval_nuninst(nuninst_last_accepted))
1276 format_and_log_uninst(
1277 output_logger,
1278 self.options.architectures,
1279 newly_uninst(self.nuninst_orig, nuninst_last_accepted),
1280 )
1281 output_logger.info("")
1283 return (nuninst_last_accepted, maybe_rescheduled_packages)
1285 def do_all(
1286 self,
1287 hinttype: str | None = None,
1288 init: list[MigrationItem] | None = None,
1289 actions: list[MigrationItem] | None = None,
1290 ) -> None:
1291 """Testing update runner
1293 This method tries to update testing checking the uninstallability
1294 counters before and after the actions to decide if the update was
1295 successful or not.
1296 """
1297 selected = []
1298 if actions:
1299 upgrade_me = actions[:]
1300 else:
1301 upgrade_me = self.upgrade_me[:]
1302 nuninst_start = self.nuninst_orig
1303 output_logger = self.output_logger
1304 target_suite = self.suite_info.target_suite
1306 # these are special parameters for hints processing
1307 force = False
1308 recurse = True
1309 nuninst_end = None
1310 extra: list[MigrationItem] = []
1311 mm = self._migration_manager
1313 if hinttype == "easy" or hinttype == "force-hint":
1314 force = hinttype == "force-hint"
1315 recurse = False
1317 # if we have a list of initial packages, check them
1318 if init:
1319 for x in init:
1320 if x not in upgrade_me:
1321 output_logger.warning(
1322 "failed: %s is not a valid candidate (or it already migrated)",
1323 x.uvname,
1324 )
1325 return None
1326 selected.append(x)
1327 upgrade_me.remove(x)
1329 output_logger.info("start: %s", self.eval_nuninst(nuninst_start))
1330 output_logger.info("orig: %s", self.eval_nuninst(nuninst_start))
1332 if not (init and not force):
1333 # No "outer" transaction needed as we will never need to rollback
1334 # (e.g. "force-hint" or a regular "main run"). Emulate the start_transaction
1335 # call from the MigrationManager, so the rest of the code follows the
1336 # same flow regardless of whether we need the transaction or not.
1338 @contextlib.contextmanager
1339 def _start_transaction() -> Iterator[Optional["MigrationTransactionState"]]:
1340 yield None
1342 else:
1343 # We will need to be able to roll back (e.g. easy or a "hint"-hint)
1344 _start_transaction = mm.start_transaction
1346 with _start_transaction() as transaction:
1347 if init:
1348 # init => a hint (e.g. "easy") - so do the hint run
1349 (_, nuninst_end, _, new_cruft) = mm.migrate_items_to_target_suite(
1350 selected, self.nuninst_orig, stop_on_first_regression=False
1351 )
1353 if recurse: 1353 ↛ 1356line 1353 didn't jump to line 1356 because the condition on line 1353 was never true
1354 # Ensure upgrade_me and selected do not overlap, if we
1355 # follow-up with a recurse ("hint"-hint).
1356 upgrade_me = [x for x in upgrade_me if x not in set(selected)]
1357 else:
1358 # On non-recursive hints check for cruft and purge it proactively in case it "fixes" the hint.
1359 cruft = [x for x in upgrade_me if x.is_cruft_removal]
1360 if new_cruft:
1361 output_logger.info(
1362 "Change added new cruft items to list: %s",
1363 " ".join(x.uvname for x in sorted(new_cruft)),
1364 )
1365 cruft.extend(new_cruft)
1366 if cruft:
1367 output_logger.info("Checking if changes enables cruft removal")
1368 (nuninst_end, remaining_cruft) = self.iter_packages(
1369 cruft, selected, nuninst=nuninst_end
1370 )
1371 output_logger.info(
1372 "Removed %d of %d cruft item(s) after the changes",
1373 len(cruft) - len(remaining_cruft),
1374 len(cruft),
1375 )
1376 new_cruft.difference_update(remaining_cruft)
1378 # Add new cruft items regardless of whether we recurse. A future run might clean
1379 # them for us.
1380 upgrade_me.extend(new_cruft)
1382 if recurse:
1383 # Either the main run or the recursive run of a "hint"-hint.
1384 (nuninst_end, extra) = self.iter_packages(
1385 upgrade_me, selected, nuninst=nuninst_end
1386 )
1388 assert nuninst_end is not None
1389 nuninst_end_str = self.eval_nuninst(nuninst_end)
1391 if not recurse:
1392 # easy or force-hint
1393 output_logger.info("easy: %s", nuninst_end_str)
1395 if not force:
1396 format_and_log_uninst(
1397 self.output_logger,
1398 self.options.architectures,
1399 newly_uninst(nuninst_start, nuninst_end),
1400 )
1402 if force:
1403 # Force implies "unconditionally better"
1404 better = True
1405 else:
1406 break_arches: set[str] = set(self.options.break_arches)
1407 if all(x.architecture in break_arches for x in selected):
1408 # If we only migrated items from break-arches, then we
1409 # do not allow any regressions on these architectures.
1410 # This usually only happens with hints
1411 break_arches = set()
1412 better = is_nuninst_asgood_generous(
1413 self.constraints,
1414 self.allow_uninst,
1415 self.options.architectures,
1416 self.nuninst_orig,
1417 nuninst_end,
1418 break_arches,
1419 )
1421 if better: 1421 ↛ 1460line 1421 didn't jump to line 1460 because the condition on line 1421 was always true
1422 # Result accepted either by force or by being better than the original result.
1423 output_logger.info(
1424 "final: %s", ",".join(sorted(x.uvname for x in selected))
1425 )
1426 output_logger.info("start: %s", self.eval_nuninst(nuninst_start))
1427 output_logger.info(" orig: %s", self.eval_nuninst(self.nuninst_orig))
1428 output_logger.info(" end: %s", nuninst_end_str)
1429 if force:
1430 broken = newly_uninst(nuninst_start, nuninst_end)
1431 if broken:
1432 output_logger.warning("force breaks:")
1433 format_and_log_uninst(
1434 self.output_logger,
1435 self.options.architectures,
1436 broken,
1437 loglevel=logging.WARNING,
1438 )
1439 else:
1440 output_logger.info("force did not break any packages")
1441 output_logger.info(
1442 "SUCCESS (%d/%d)", len(actions or self.upgrade_me), len(extra)
1443 )
1444 self.nuninst_orig = nuninst_end
1445 self.all_selected += selected
1446 if transaction:
1447 transaction.commit()
1448 if self.options.check_consistency_level >= 2: 1448 ↛ 1452line 1448 didn't jump to line 1452 because the condition on line 1448 was always true
1449 target_suite.check_suite_source_pkg_consistency(
1450 "do_all after commit"
1451 )
1452 if not actions:
1453 if recurse:
1454 self.upgrade_me = extra
1455 else:
1456 self.upgrade_me = [
1457 x for x in self.upgrade_me if x not in set(selected)
1458 ]
1459 else:
1460 output_logger.info("FAILED\n")
1461 if not transaction:
1462 # if we 'FAILED', but we cannot rollback, we will probably
1463 # leave a broken state behind
1464 # this should not happen
1465 raise AssertionError("do_all FAILED but no transaction to rollback")
1466 transaction.rollback()
1467 if self.options.check_consistency_level >= 2:
1468 target_suite.check_suite_source_pkg_consistency(
1469 "do_all after rollback"
1470 )
1472 output_logger.info("")
1474 def assert_nuninst_is_correct(self) -> None:
1475 self.logger.info("> Update complete - Verifying non-installability counters")
1477 cached_nuninst = self.nuninst_orig
1478 self._inst_tester.compute_installability()
1479 computed_nuninst = compile_nuninst(
1480 self.suite_info.target_suite,
1481 self.options.architectures,
1482 self.options.nobreakall_arches,
1483 )
1484 if cached_nuninst != computed_nuninst: # pragma: no cover
1485 only_on_break_archs = True
1486 msg_l = [
1487 "==================== NUNINST OUT OF SYNC ========================="
1488 ]
1489 for arch in self.options.architectures:
1490 expected_nuninst = set(cached_nuninst[arch])
1491 actual_nuninst = set(computed_nuninst[arch])
1492 false_negatives = actual_nuninst - expected_nuninst
1493 false_positives = expected_nuninst - actual_nuninst
1494 # Britney does not quite work correctly with
1495 # break/fucked arches, so ignore issues there for now.
1496 if (
1497 false_negatives or false_positives
1498 ) and arch not in self.options.break_arches:
1499 only_on_break_archs = False
1500 if false_negatives:
1501 msg_l.append(f" {arch} - unnoticed nuninst: {str(false_negatives)}")
1502 if false_positives:
1503 msg_l.append(f" {arch} - invalid nuninst: {str(false_positives)}")
1504 if false_negatives or false_positives:
1505 msg_l.append(
1506 f" {arch} - actual nuninst: {str(sorted(actual_nuninst))}"
1507 )
1508 msg_l.append(msg_l[0])
1509 for msg in msg_l:
1510 if only_on_break_archs:
1511 self.logger.warning(msg)
1512 else:
1513 self.logger.error(msg)
1514 if not only_on_break_archs:
1515 raise AssertionError("NUNINST OUT OF SYNC")
1516 else:
1517 self.logger.warning("Nuninst is out of sync on some break arches")
1519 self.logger.info("> All non-installability counters are ok")
1521 def upgrade_testing(self) -> None:
1522 """Upgrade testing using the packages from the source suites
1524 This method tries to upgrade testing using the packages from the
1525 source suites.
1526 Before running the do_all method, it tries the easy and force-hint
1527 commands.
1528 """
1530 output_logger = self.output_logger
1531 self.logger.info("Starting the upgrade test")
1532 output_logger.info(
1533 "Generated on: %s",
1534 time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time())),
1535 )
1536 output_logger.info("Arch order is: %s", ", ".join(self.options.architectures))
1538 if not self.options.actions: 1538 ↛ 1549line 1538 didn't jump to line 1549 because the condition on line 1538 was always true
1539 # process `easy' hints
1540 for x in self.hints["easy"]:
1541 self.do_hint("easy", x.user, x.packages)
1543 # process `force-hint' hints
1544 for x in self.hints["force-hint"]:
1545 self.do_hint("force-hint", x.user, x.packages)
1547 # run the first round of the upgrade
1548 # - do separate runs for break arches
1549 allpackages = []
1550 normpackages = self.upgrade_me[:]
1551 archpackages = {}
1552 for a in self.options.break_arches:
1553 archpackages[a] = [p for p in normpackages if p.architecture == a]
1554 normpackages = [p for p in normpackages if p not in archpackages[a]]
1555 self.upgrade_me = normpackages
1556 output_logger.info("info: main run")
1557 self.do_all()
1558 allpackages += self.upgrade_me
1559 for a in self.options.break_arches:
1560 backup = self.options.break_arches
1561 self.options.break_arches = " ".join(
1562 x for x in self.options.break_arches if x != a
1563 )
1564 self.upgrade_me = archpackages[a]
1565 output_logger.info("info: broken arch run for %s", a)
1566 self.do_all()
1567 allpackages += self.upgrade_me
1568 self.options.break_arches = backup
1569 self.upgrade_me = allpackages
1571 if self.options.actions: 1571 ↛ 1572line 1571 didn't jump to line 1572 because the condition on line 1571 was never true
1572 self.printuninstchange()
1573 return
1575 # process `hint' hints
1576 hintcnt = 0
1577 for x in self.hints["hint"][:50]:
1578 if hintcnt > 50: 1578 ↛ 1579line 1578 didn't jump to line 1579 because the condition on line 1578 was never true
1579 output_logger.info("Skipping remaining hints...")
1580 break
1581 if self.do_hint("hint", x.user, x.packages): 1581 ↛ 1577line 1581 didn't jump to line 1577 because the condition on line 1581 was always true
1582 hintcnt += 1
1584 # run the auto hinter
1585 self.run_auto_hinter()
1587 if getattr(self.options, "remove_obsolete", "yes") == "yes":
1588 # obsolete source packages
1589 # a package is obsolete if none of the binary packages in testing
1590 # are built by it
1591 self.logger.info(
1592 "> Removing obsolete source packages from the target suite"
1593 )
1594 # local copies for performance
1595 target_suite = self.suite_info.target_suite
1596 sources_t = target_suite.sources
1597 binaries_t = target_suite.binaries
1598 mi_factory = self._migration_item_factory
1599 used = {
1600 binaries_t[arch][binary].source
1601 for arch in binaries_t
1602 for binary in binaries_t[arch]
1603 }
1604 removals = [
1605 mi_factory.parse_item(
1606 f"-{source}/{sources_t[source].version}", auto_correct=False
1607 )
1608 for source in sources_t
1609 if source not in used
1610 ]
1611 if removals:
1612 output_logger.info(
1613 "Removing obsolete source packages from the target suite (%d):",
1614 len(removals),
1615 )
1616 self.do_all(actions=removals)
1618 # smooth updates
1619 removals = old_libraries(
1620 self._migration_item_factory, self.suite_info, self.options.outofsync_arches
1621 )
1622 if removals:
1623 output_logger.info(
1624 "Removing packages left in the target suite (e.g. smooth updates or cruft)"
1625 )
1626 log_and_format_old_libraries(self.output_logger, removals)
1627 self.do_all(actions=removals)
1628 removals = old_libraries(
1629 self._migration_item_factory,
1630 self.suite_info,
1631 self.options.outofsync_arches,
1632 )
1634 output_logger.info(
1635 "List of old libraries in the target suite (%d):", len(removals)
1636 )
1637 log_and_format_old_libraries(self.output_logger, removals)
1639 self.printuninstchange()
1640 if self.options.check_consistency_level >= 1: 1640 ↛ 1646line 1640 didn't jump to line 1646 because the condition on line 1640 was always true
1641 target_suite = self.suite_info.target_suite
1642 self.assert_nuninst_is_correct()
1643 target_suite.check_suite_source_pkg_consistency("end")
1645 # output files
1646 if self.options.heidi_output and not self.options.dry_run: 1646 ↛ 1660line 1646 didn't jump to line 1660 because the condition on line 1646 was always true
1647 target_suite = self.suite_info.target_suite
1649 # write HeidiResult
1650 self.logger.info("Writing Heidi results to %s", self.options.heidi_output)
1651 write_heidi(
1652 self.options.heidi_output,
1653 target_suite,
1654 outofsync_arches=self.options.outofsync_arches,
1655 )
1657 self.logger.info("Writing delta to %s", self.options.heidi_delta_output)
1658 write_heidi_delta(self.options.heidi_delta_output, self.all_selected)
1660 self.logger.info("Test completed!")
1662 def printuninstchange(self) -> None:
1663 self.logger.info("Checking for newly uninstallable packages")
1664 uninst = newly_uninst(self.nuninst_orig_save, self.nuninst_orig)
1666 if uninst:
1667 self.output_logger.info("")
1668 self.output_logger.info(
1669 "Newly uninstallable packages in the target suite (arch:all on BREAKALL_ARCHES not shown)"
1670 )
1671 format_and_log_uninst(
1672 self.output_logger,
1673 self.options.architectures,
1674 uninst,
1675 loglevel=logging.WARNING,
1676 )
1678 def hint_tester(self) -> None:
1679 """Run a command line interface to test hints
1681 This method provides a command line interface for the release team to
1682 try hints and evaluate the results.
1683 """
1684 import readline
1686 from britney2.completer import Completer
1688 histfile = os.path.expanduser("~/.britney2_history")
1689 if os.path.exists(histfile):
1690 readline.read_history_file(histfile)
1692 readline.parse_and_bind("tab: complete")
1693 readline.set_completer(Completer(self).completer)
1694 # Package names can contain "-" and we use "/" in our presentation of them as well,
1695 # so ensure readline does not split on these characters.
1696 readline.set_completer_delims(
1697 readline.get_completer_delims().replace("-", "").replace("/", "")
1698 )
1700 known_hints = self._hint_parser.registered_hint_names
1702 print("Britney hint tester")
1703 print()
1704 print(
1705 "Besides inputting known britney hints, the follow commands are also available"
1706 )
1707 print(" * quit/exit - terminates the shell")
1708 print(
1709 " * python-console - jump into an interactive python shell (with the current loaded dataset)"
1710 )
1711 print()
1713 while True:
1714 # read the command from the command line
1715 try:
1716 user_input = input("britney> ").split()
1717 except EOFError:
1718 print("")
1719 break
1720 except KeyboardInterrupt:
1721 print("")
1722 continue
1723 # quit the hint tester
1724 if user_input and user_input[0] in ("quit", "exit"):
1725 break
1726 elif user_input and user_input[0] == "python-console":
1727 try:
1728 import britney2.console
1729 except ImportError as e:
1730 print("Failed to import britney.console module: %s" % repr(e))
1731 continue
1732 britney2.console.run_python_console(self)
1733 print("Returning to the britney hint-tester console")
1734 # run a hint
1735 elif user_input and user_input[0] in ("easy", "hint", "force-hint"):
1736 mi_factory = self._migration_item_factory
1737 try:
1738 self.do_hint(
1739 user_input[0],
1740 "hint-tester",
1741 mi_factory.parse_items(user_input[1:]),
1742 )
1743 self.printuninstchange()
1744 except KeyboardInterrupt:
1745 continue
1746 elif user_input and user_input[0] in known_hints:
1747 self._hint_parser.parse_hints(
1748 "hint-tester", self.HINTS_ALL, "<stdin>", [" ".join(user_input)]
1749 )
1750 self.write_excuses()
1752 try:
1753 readline.write_history_file(histfile)
1754 except OSError as e:
1755 self.logger.warning("Could not write %s: %s", histfile, e)
1757 def do_hint(self, hinttype: str, who: str, pkgvers: list[MigrationItem]) -> bool:
1758 """Process hints
1760 This method process `easy`, `hint` and `force-hint` hints. If the
1761 requested version is not in the relevant source suite, then the hint
1762 is skipped.
1763 """
1765 output_logger = self.output_logger
1767 self.logger.info("> Processing '%s' hint from %s", hinttype, who)
1768 output_logger.info(
1769 "Trying %s from %s: %s",
1770 hinttype,
1771 who,
1772 " ".join(f"{x.uvname}/{x.version}" for x in pkgvers),
1773 )
1775 issues = []
1776 # loop on the requested packages and versions
1777 for idx in range(len(pkgvers)):
1778 pkg = pkgvers[idx]
1779 # skip removal requests
1780 if pkg.is_removal:
1781 continue
1783 suite = pkg.suite
1785 assert pkg.version is not None
1786 if pkg.package not in suite.sources: 1786 ↛ 1787line 1786 didn't jump to line 1787 because the condition on line 1786 was never true
1787 issues.append(f"Source {pkg.package} has no version in {suite.name}")
1788 elif ( 1788 ↛ 1792line 1788 didn't jump to line 1792
1789 apt_pkg.version_compare(suite.sources[pkg.package].version, pkg.version)
1790 != 0
1791 ):
1792 issues.append(
1793 "Version mismatch, %s %s != %s"
1794 % (pkg.package, pkg.version, suite.sources[pkg.package].version)
1795 )
1796 if issues: 1796 ↛ 1797line 1796 didn't jump to line 1797 because the condition on line 1796 was never true
1797 output_logger.warning("%s: Not using hint", ", ".join(issues))
1798 return False
1800 self.do_all(hinttype, pkgvers)
1801 return True
1803 def get_auto_hinter_hints(
1804 self, upgrade_me: list[MigrationItem]
1805 ) -> list[list[frozenset[MigrationItem]]]:
1806 """Auto-generate "easy" hints.
1808 This method attempts to generate "easy" hints for sets of packages which
1809 must migrate together. Beginning with a package which does not depend on
1810 any other package (in terms of excuses), a list of dependencies and
1811 reverse dependencies is recursively created.
1813 Once all such lists have been generated, any which are subsets of other
1814 lists are ignored in favour of the larger lists. The remaining lists are
1815 then attempted in turn as "easy" hints.
1817 We also try to auto hint circular dependencies analyzing the update
1818 excuses relationships. If they build a circular dependency, which we already
1819 know as not-working with the standard do_all algorithm, try to `easy` them.
1820 """
1821 self.logger.info("> Processing hints from the auto hinter")
1823 sources_t = self.suite_info.target_suite.sources
1824 excuses = self.excuses
1826 def excuse_still_valid(excuse: "Excuse") -> bool:
1827 source = excuse.source
1828 assert isinstance(excuse.item, MigrationItem)
1829 arch = excuse.item.architecture
1830 # TODO for binNMUs, this check is always ok, even if the item
1831 # migrated already
1832 valid = (
1833 arch != "source"
1834 or source not in sources_t
1835 or sources_t[source].version != excuse.ver[1]
1836 )
1837 # TODO migrated items should be removed from upgrade_me, so this
1838 # should not happen
1839 if not valid: 1839 ↛ 1840line 1839 didn't jump to line 1840 because the condition on line 1839 was never true
1840 raise AssertionError("excuse no longer valid %s" % (item))
1841 return valid
1843 # consider only excuses which are valid candidates and still relevant.
1844 valid_excuses = frozenset(
1845 e.name
1846 for n, e in excuses.items()
1847 if e.item in upgrade_me and excuse_still_valid(e)
1848 )
1849 excuses_deps = {
1850 name: valid_excuses.intersection(excuse.get_deps())
1851 for name, excuse in excuses.items()
1852 if name in valid_excuses
1853 }
1854 excuses_rdeps = defaultdict(set)
1855 for name, deps in excuses_deps.items():
1856 for dep in deps:
1857 excuses_rdeps[dep].add(name)
1859 # loop on them
1860 candidates = []
1861 mincands = []
1862 seen_hints = set()
1863 for e in valid_excuses:
1864 excuse = excuses[e]
1865 if not excuse.get_deps():
1866 assert isinstance(excuse.item, MigrationItem)
1867 items = [excuse.item]
1868 orig_size = 1
1869 looped = False
1870 seen_items = set()
1871 seen_items.update(items)
1873 for item in items:
1874 assert isinstance(item, MigrationItem)
1875 # excuses which depend on "item" or are depended on by it
1876 new_items = cast(
1877 set[MigrationItem],
1878 {
1879 excuses[x].item
1880 for x in chain(
1881 excuses_deps[item.name], excuses_rdeps[item.name]
1882 )
1883 },
1884 )
1885 new_items -= seen_items
1886 items.extend(new_items)
1887 seen_items.update(new_items)
1889 if not looped and len(items) > 1:
1890 orig_size = len(items)
1891 h = frozenset(seen_items)
1892 if h not in seen_hints: 1892 ↛ 1895line 1892 didn't jump to line 1895 because the condition on line 1892 was always true
1893 mincands.append(h)
1894 seen_hints.add(h)
1895 looped = True
1896 if len(items) != orig_size: 1896 ↛ 1897line 1896 didn't jump to line 1897 because the condition on line 1896 was never true
1897 h = frozenset(seen_items)
1898 if h != mincands[-1] and h not in seen_hints:
1899 candidates.append(h)
1900 seen_hints.add(h)
1901 return [candidates, mincands]
1903 def run_auto_hinter(self) -> None:
1904 for lst in self.get_auto_hinter_hints(self.upgrade_me):
1905 for hint in lst:
1906 self.do_hint("easy", "autohinter", sorted(hint))
1908 def nuninst_arch_report(self, nuninst: dict[str, set[str]], arch: str) -> None:
1909 """Print a report of uninstallable packages for one architecture."""
1910 all = defaultdict(set)
1911 binaries_t = self.suite_info.target_suite.binaries
1912 for p in nuninst[arch]:
1913 pkg = binaries_t[arch][p]
1914 all[(pkg.source, pkg.source_version)].add(p)
1916 print("* %s" % arch)
1918 for (src, ver), pkgs in sorted(all.items()):
1919 print(" {} ({}): {}".format(src, ver, " ".join(sorted(pkgs))))
1921 print()
1923 def _remove_archall_faux_packages(self) -> None:
1924 """Remove faux packages added for the excuses phase
1926 To prevent binary packages from going missing while they are listed by
1927 their source package we add bin:faux packages during reading in the
1928 Sources. They are used during the excuses phase to prevent packages
1929 from becoming candidates. However, they interfere in complex ways
1930 during the installability phase, so instead of having all code during
1931 migration be aware of this excuses phase implementation detail, let's
1932 remove them again.
1934 """
1935 if not self.options.archall_inconsistency_allowed: 1935 ↛ exitline 1935 didn't return from function '_remove_archall_faux_packages' because the condition on line 1935 was always true
1936 all_binaries = self.all_binaries
1937 faux_a = {x for x in all_binaries.keys() if x[2] == "faux"}
1938 for pkg_a in faux_a: 1938 ↛ 1939line 1938 didn't jump to line 1939 because the loop on line 1938 never started
1939 del all_binaries[pkg_a]
1941 for suite in self.suite_info._suites.values():
1942 for arch in suite.binaries.keys():
1943 binaries = suite.binaries[arch]
1944 faux_b = {x for x in binaries if binaries[x].pkg_id[2] == "faux"}
1945 for pkg_b in faux_b: 1945 ↛ 1946line 1945 didn't jump to line 1946 because the loop on line 1945 never started
1946 del binaries[pkg_b]
1947 sources = suite.sources
1948 for src in sources.keys():
1949 faux_s = {x for x in sources[src].binaries if x[2] == "faux"}
1950 sources[src].binaries -= faux_s
1952 def main(self) -> None:
1953 """Main method
1955 This is the entry point for the class: it includes the list of calls
1956 for the member methods which will produce the output files.
1957 """
1958 # if running in --print-uninst mode, quit
1959 if self.options.print_uninst: 1959 ↛ 1960line 1959 didn't jump to line 1960 because the condition on line 1959 was never true
1960 return
1961 # if no actions are provided, build the excuses and sort them
1962 elif not self.options.actions: 1962 ↛ 1966line 1962 didn't jump to line 1966 because the condition on line 1962 was always true
1963 self.write_excuses()
1964 # otherwise, use the actions provided by the command line
1965 else:
1966 self.upgrade_me = self.options.actions.split()
1968 self._remove_archall_faux_packages()
1970 if self.options.compute_migrations or self.options.hint_tester:
1971 if self.options.dry_run: 1971 ↛ 1972line 1971 didn't jump to line 1972 because the condition on line 1971 was never true
1972 self.logger.info(
1973 "Upgrade output not (also) written to a separate file"
1974 " as this is a dry-run."
1975 )
1976 elif hasattr(self.options, "upgrade_output"): 1976 ↛ 1986line 1976 didn't jump to line 1986 because the condition on line 1976 was always true
1977 upgrade_output = getattr(self.options, "upgrade_output")
1978 file_handler = logging.FileHandler(
1979 upgrade_output, mode="w", encoding="utf-8"
1980 )
1981 output_formatter = logging.Formatter("%(message)s")
1982 file_handler.setFormatter(output_formatter)
1983 self.output_logger.addHandler(file_handler)
1984 self.logger.info("Logging upgrade output to %s", upgrade_output)
1985 else:
1986 self.logger.info(
1987 "Upgrade output not (also) written to a separate file"
1988 " as the UPGRADE_OUTPUT configuration is not provided."
1989 )
1991 # run the hint tester
1992 if self.options.hint_tester: 1992 ↛ 1993line 1992 didn't jump to line 1993 because the condition on line 1992 was never true
1993 self.hint_tester()
1994 # run the upgrade test
1995 else:
1996 self.upgrade_testing()
1998 self.logger.info("> Stats from the installability tester")
1999 for stat in self._inst_tester.stats.stats():
2000 self.logger.info("> %s", stat)
2001 else:
2002 self.logger.info("Migration computation skipped as requested.")
2003 if not self.options.dry_run: 2003 ↛ 2005line 2003 didn't jump to line 2005 because the condition on line 2003 was always true
2004 self._policy_engine.save_state(self)
2005 logging.shutdown()
2008if __name__ == "__main__": 2008 ↛ 2009line 2008 didn't jump to line 2009 because the condition on line 2008 was never true
2009 Britney().main()