Coverage for britney2/britney.py: 83%
769 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-08 19:15 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-08 19:15 +0000
1#!/usr/bin/python3 -u
3# Copyright (C) 2001-2008 Anthony Towns <ajt@debian.org>
4# Andreas Barth <aba@debian.org>
5# Fabio Tranchitella <kobold@debian.org>
6# Copyright (C) 2010-2013 Adam D. Barratt <adsb@debian.org>
8# This program is free software; you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 2 of the License, or
11# (at your option) any later version.
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16# GNU General Public License for more details.
18"""
19= Introduction =
21This is the Debian testing updater script, also known as "Britney".
23Packages are usually installed into the `testing' distribution after
24they have undergone some degree of testing in unstable. The goal of
25this software is to do this task in a smart way, allowing testing
26to always be fully installable and close to being a release candidate.
28Britney's source code is split between two different but related tasks:
29the first one is the generation of the update excuses, while the
30second tries to update testing with the valid candidates; first
31each package alone, then larger and even larger sets of packages
32together. Each try is accepted if testing is not more uninstallable
33after the update than before.
35= Data Loading =
37In order to analyze the entire Debian distribution, Britney needs to
38load in memory the whole archive: this means more than 10.000 packages
39for twelve architectures, as well as the dependency interconnections
40between them. For this reason, the memory requirements for running this
41software are quite high and at least 1 gigabyte of RAM should be available.
43Britney loads the source packages from the `Sources' file and the binary
44packages from the `Packages_${arch}' files, where ${arch} is substituted
45with the supported architectures. While loading the data, the software
46analyzes the dependencies and builds a directed weighted graph in memory
47with all the interconnections between the packages (see Britney.read_sources
48and Britney.read_binaries).
50Other than source and binary packages, Britney loads the following data:
52 * rc-bugs-*, which contains the list of release-critical bugs for a given
53 version of a source or binary package (see RCBugPolicy.read_bugs).
55 * age-policy-dates, which contains the date of the upload of a given version
56 of a source package (see Britney.read_dates).
58 * age-policy-urgencies, which contains the urgency of the upload of a given
59 version of a source package (see AgePolicy._read_urgencies).
61 * Hints, which contains lists of commands which modify the standard behaviour
62 of Britney (see Britney.read_hints).
64 * Other policies typically require their own data.
66For a more detailed explanation about the format of these files, please read
67the documentation of the related methods. The exact meaning of them will be
68instead explained in the chapter "Excuses Generation".
70= Excuses =
72An excuse is a detailed explanation of why a package can or cannot
73be updated in the testing distribution from a newer package in
74another distribution (like for example unstable). The main purpose
75of the excuses is to be written in an HTML file which will be
76published over HTTP, as well as a YAML file. The maintainers will be able
77to parse it manually or automatically to find the explanation of why their
78packages have been updated or not.
80== Excuses generation ==
82These are the steps (with references to method names) that Britney
83does for the generation of the update excuses.
85 * If a source package is available in testing but it is not
86 present in unstable and no binary packages in unstable are
87 built from it, then it is marked for removal.
89 * Every source package in unstable and testing-proposed-updates,
90 if already present in testing, is checked for binary-NMUs, new
91 or dropped binary packages in all the supported architectures
92 (see Britney.should_upgrade_srcarch). The steps to detect if an
93 upgrade is needed are:
95 1. If there is a `remove' hint for the source package, the package
96 is ignored: it will be removed and not updated.
98 2. For every binary package built from the new source, it checks
99 for unsatisfied dependencies, new binary packages and updated
100 binary packages (binNMU), excluding the architecture-independent
101 ones, and packages not built from the same source.
103 3. For every binary package built from the old source, it checks
104 if it is still built from the new source; if this is not true
105 and the package is not architecture-independent, the script
106 removes it from testing.
108 4. Finally, if there is something worth doing (eg. a new or updated
109 binary package) and nothing wrong it marks the source package
110 as "Valid candidate", or "Not considered" if there is something
111 wrong which prevented the update.
113 * Every source package in unstable and testing-proposed-updates is
114 checked for upgrade (see Britney.should_upgrade_src). The steps
115 to detect if an upgrade is needed are:
117 1. If the source package in testing is more recent the new one
118 is ignored.
120 2. If the source package doesn't exist (is fake), which means that
121 a binary package refers to it but it is not present in the
122 `Sources' file, the new one is ignored.
124 3. If the package doesn't exist in testing, the urgency of the
125 upload is ignored and set to the default (actually `low').
127 4. If there is a `remove' hint for the source package, the package
128 is ignored: it will be removed and not updated.
130 5. If there is a `block' hint for the source package without an
131 `unblock` hint or a `block-all source`, the package is ignored.
133 6. If there is a `block-udeb' hint for the source package, it will
134 have the same effect as `block', but may only be cancelled by
135 a subsequent `unblock-udeb' hint.
137 7. If the suite is unstable, the update can go ahead only if the
138 upload happened more than the minimum days specified by the
139 urgency of the upload; if this is not true, the package is
140 ignored as `too-young'. Note that the urgency is sticky, meaning
141 that the highest urgency uploaded since the previous testing
142 transition is taken into account.
144 8. If the suite is unstable, all the architecture-dependent binary
145 packages and the architecture-independent ones for the `nobreakall'
146 architectures have to be built from the source we are considering.
147 If this is not true, then these are called `out-of-date'
148 architectures and the package is ignored.
150 9. The source package must have at least one binary package, otherwise
151 it is ignored.
153 10. If the suite is unstable, the new source package must have no
154 release critical bugs which do not also apply to the testing
155 one. If this is not true, the package is ignored as `buggy'.
157 11. If there is a `force' hint for the source package, then it is
158 updated even if it is marked as ignored from the previous steps.
160 12. If the suite is {testing-,}proposed-updates, the source package can
161 be updated only if there is an explicit approval for it. Unless
162 a `force' hint exists, the new package must also be available
163 on all of the architectures for which it has binary packages in
164 testing.
166 13. If the package will be ignored, mark it as "Valid candidate",
167 otherwise mark it as "Not considered".
169 * The list of `remove' hints is processed: if the requested source
170 package is not already being updated or removed and the version
171 actually in testing is the same specified with the `remove' hint,
172 it is marked for removal.
174 * The excuses are sorted by the number of days from the last upload
175 (days-old) and by name.
177 * A list of unconsidered excuses (for which the package is not upgraded)
178 is built. Using this list, all of the excuses depending on them are
179 marked as invalid "impossible dependencies".
181 * The excuses are written in an HTML file.
182"""
183import contextlib
184import logging
185import optparse
186import os
187import sys
188import time
189from collections import defaultdict
190from collections.abc import Iterator
191from functools import reduce
192from itertools import chain
193from operator import attrgetter
194from typing import TYPE_CHECKING, Any, Optional, cast
196import apt_pkg
198from britney2 import BinaryPackage, BinaryPackageId, SourcePackage, Suites
199from britney2.excusefinder import ExcuseFinder
200from britney2.hints import Hint, HintCollection, HintParser
201from britney2.inputs.suiteloader import (
202 DebMirrorLikeSuiteContentLoader,
203 MissingRequiredConfigurationError,
204)
205from britney2.installability.builder import build_installability_tester
206from britney2.installability.solver import InstallabilitySolver
207from britney2.migration import MigrationManager
208from britney2.migrationitem import MigrationItem, MigrationItemFactory
209from britney2.policies.autopkgtest import AutopkgtestPolicy
210from britney2.policies.lintian import LintianPolicy
211from britney2.policies.policy import (
212 AgePolicy,
213 BlockPolicy,
214 BuildDependsPolicy,
215 BuiltOnBuilddPolicy,
216 BuiltUsingPolicy,
217 DependsPolicy,
218 ImplicitDependencyPolicy,
219 PiupartsPolicy,
220 PolicyEngine,
221 PolicyLoadRequest,
222 RCBugPolicy,
223 ReproduciblePolicy,
224 ReverseRemovalPolicy,
225)
226from britney2.utils import (
227 MigrationConstraintException,
228 clone_nuninst,
229 compile_nuninst,
230 format_and_log_uninst,
231 is_nuninst_asgood_generous,
232 log_and_format_old_libraries,
233 newly_uninst,
234 old_libraries,
235 parse_option,
236 parse_provides,
237 read_nuninst,
238 write_excuses,
239 write_heidi,
240 write_heidi_delta,
241 write_nuninst,
242)
244if TYPE_CHECKING: 244 ↛ 245line 244 didn't jump to line 245 because the condition on line 244 was never true
245 from .excuse import Excuse
246 from .installability.tester import InstallabilityTester
247 from .installability.universe import BinaryPackageUniverse
248 from .transaction import MigrationTransactionState
251__author__ = "Fabio Tranchitella and the Debian Release Team"
252__version__ = "2.0"
255MIGRATION_POLICIES = [
256 PolicyLoadRequest.always_load(DependsPolicy),
257 PolicyLoadRequest.conditionally_load(RCBugPolicy, "rcbug_enable", True),
258 PolicyLoadRequest.conditionally_load(PiupartsPolicy, "piuparts_enable", True),
259 PolicyLoadRequest.always_load(ImplicitDependencyPolicy),
260 PolicyLoadRequest.conditionally_load(AutopkgtestPolicy, "adt_enable", True),
261 PolicyLoadRequest.conditionally_load(LintianPolicy, "lintian_enable", False),
262 PolicyLoadRequest.conditionally_load(ReproduciblePolicy, "repro_enable", False),
263 PolicyLoadRequest.conditionally_load(AgePolicy, "age_enable", True),
264 PolicyLoadRequest.always_load(BuildDependsPolicy),
265 PolicyLoadRequest.always_load(BlockPolicy),
266 PolicyLoadRequest.conditionally_load(
267 BuiltUsingPolicy, "built_using_policy_enable", True
268 ),
269 PolicyLoadRequest.conditionally_load(BuiltOnBuilddPolicy, "check_buildd", False),
270 PolicyLoadRequest.always_load(ReverseRemovalPolicy),
271]
274class Britney:
275 """Britney, the Debian testing updater script
277 This is the script that updates the testing distribution. It is executed
278 each day after the installation of the updated packages. It generates the
279 `Packages' files for the testing distribution, but it does so in an
280 intelligent manner; it tries to avoid any inconsistency and to use only
281 non-buggy packages.
283 For more documentation on this script, please read the Developers Reference.
284 """
286 HINTS_HELPERS = (
287 "easy",
288 "hint",
289 "remove",
290 "block",
291 "block-udeb",
292 "unblock",
293 "unblock-udeb",
294 "approve",
295 "remark",
296 "ignore-piuparts",
297 "ignore-rc-bugs",
298 "force-skiptest",
299 "force-badtest",
300 )
301 HINTS_STANDARD = ("urgent", "age-days") + HINTS_HELPERS
302 # ALL = {"force", "force-hint", "block-all"} | HINTS_STANDARD | registered policy hints (not covered above)
303 HINTS_ALL = "ALL"
304 pkg_universe: "BinaryPackageUniverse"
305 _inst_tester: "InstallabilityTester"
306 constraints: dict[str, list[str]]
307 suite_info: Suites
309 def __init__(self) -> None:
310 """Class constructor
312 This method initializes and populates the data lists, which contain all
313 the information needed by the other methods of the class.
314 """
316 # setup logging - provide the "short level name" (i.e. INFO -> I) that
317 # we used to use prior to using the logging module.
319 old_factory = logging.getLogRecordFactory()
320 short_level_mapping = {
321 "CRITICAL": "F",
322 "INFO": "I",
323 "WARNING": "W",
324 "ERROR": "E",
325 "DEBUG": "N",
326 }
328 def record_factory(
329 *args: Any, **kwargs: Any
330 ) -> logging.LogRecord: # pragma: no cover
331 record = old_factory(*args, **kwargs)
332 try:
333 record.shortlevelname = short_level_mapping[record.levelname]
334 except KeyError:
335 record.shortlevelname = record.levelname
336 return record
338 logging.setLogRecordFactory(record_factory)
339 logging.basicConfig(
340 format="{shortlevelname}: [{asctime}] - {message}",
341 style="{",
342 datefmt="%Y-%m-%dT%H:%M:%S%z",
343 stream=sys.stdout,
344 )
346 self.logger = logging.getLogger()
348 # Logger for "upgrade_output"; the file handler will be attached later when
349 # we are ready to open the file.
350 self.output_logger = logging.getLogger("britney2.output.upgrade_output")
351 self.output_logger.setLevel(logging.INFO)
353 # initialize the apt_pkg back-end
354 apt_pkg.init()
356 # parse the command line arguments
357 self._policy_engine = PolicyEngine()
358 self.__parse_arguments()
359 assert self.suite_info is not None # for type checking
361 self.all_selected: list[MigrationItem] = []
362 self.excuses: dict[str, "Excuse"] = {}
363 self.upgrade_me: list[MigrationItem] = []
365 if self.options.nuninst_cache: 365 ↛ 366line 365 didn't jump to line 366 because the condition on line 365 was never true
366 self.logger.info(
367 "Not building the list of non-installable packages, as requested"
368 )
369 if self.options.print_uninst:
370 nuninst = read_nuninst(
371 self.options.noninst_status, self.options.architectures
372 )
373 print("* summary")
374 print(
375 "\n".join(
376 "%4d %s" % (len(nuninst[x]), x)
377 for x in self.options.architectures
378 )
379 )
380 return
382 try:
383 constraints_file = os.path.join(
384 self.options.static_input_dir, "constraints"
385 )
386 faux_packages = os.path.join(self.options.static_input_dir, "faux-packages")
387 except AttributeError:
388 self.logger.info("The static_input_dir option is not set")
389 constraints_file = None
390 faux_packages = None
391 if faux_packages is not None and os.path.exists(faux_packages):
392 self.logger.info("Loading faux packages from %s", faux_packages)
393 self._load_faux_packages(faux_packages)
394 elif faux_packages is not None: 394 ↛ 397line 394 didn't jump to line 397 because the condition on line 394 was always true
395 self.logger.info("No Faux packages as %s does not exist", faux_packages)
397 if constraints_file is not None and os.path.exists(constraints_file):
398 self.logger.info("Loading constraints from %s", constraints_file)
399 self.constraints = self._load_constraints(constraints_file)
400 else:
401 if constraints_file is not None: 401 ↛ 405line 401 didn't jump to line 405
402 self.logger.info(
403 "No constraints as %s does not exist", constraints_file
404 )
405 self.constraints = {
406 "keep-installable": [],
407 }
409 self.logger.info("Compiling Installability tester")
410 self.pkg_universe, self._inst_tester = build_installability_tester(
411 self.suite_info, self.options.architectures
412 )
413 target_suite = self.suite_info.target_suite
414 target_suite.inst_tester = self._inst_tester
416 self.allow_uninst: dict[str, set[str | None]] = {}
417 for arch in self.options.architectures:
418 self.allow_uninst[arch] = set()
419 self._migration_item_factory: MigrationItemFactory = MigrationItemFactory(
420 self.suite_info
421 )
422 self._hint_parser: HintParser = HintParser(self._migration_item_factory)
423 self._migration_manager: MigrationManager = MigrationManager(
424 self.options,
425 self.suite_info,
426 self.all_binaries,
427 self.pkg_universe,
428 self.constraints,
429 self.allow_uninst,
430 self._migration_item_factory,
431 self.hints,
432 )
434 if not self.options.nuninst_cache: 434 ↛ 474line 434 didn't jump to line 474 because the condition on line 434 was always true
435 self.logger.info(
436 "Building the list of non-installable packages for the full archive"
437 )
438 self._inst_tester.compute_installability()
439 nuninst = compile_nuninst(
440 target_suite, self.options.architectures, self.options.nobreakall_arches
441 )
442 self.nuninst_orig: dict[str, set[str]] = nuninst
443 for arch in self.options.architectures:
444 self.logger.info(
445 "> Found %d non-installable packages for %s",
446 len(nuninst[arch]),
447 arch,
448 )
449 if self.options.print_uninst: 449 ↛ 450line 449 didn't jump to line 450 because the condition on line 449 was never true
450 self.nuninst_arch_report(nuninst, arch)
452 if self.options.print_uninst: 452 ↛ 453line 452 didn't jump to line 453 because the condition on line 452 was never true
453 print("* summary")
454 print(
455 "\n".join(
456 map(
457 lambda x: "%4d %s" % (len(nuninst[x]), x),
458 self.options.architectures,
459 )
460 )
461 )
462 return
463 else:
464 write_nuninst(self.options.noninst_status, nuninst)
466 stats = self._inst_tester.compute_stats()
467 self.logger.info("> Installability tester statistics (per architecture)")
468 for arch in self.options.architectures:
469 arch_stat = stats[arch]
470 self.logger.info("> %s", arch)
471 for stat in arch_stat.stat_summary():
472 self.logger.info("> - %s", stat)
473 else:
474 self.logger.info("Loading uninstallability counters from cache")
475 self.nuninst_orig = read_nuninst(
476 self.options.noninst_status, self.options.architectures
477 )
479 # nuninst_orig may get updated during the upgrade process
480 self.nuninst_orig_save: dict[str, set[str]] = clone_nuninst(
481 self.nuninst_orig, architectures=self.options.architectures
482 )
484 self._policy_engine.register_policy_hints(self._hint_parser)
486 try:
487 self.read_hints(self.options.hintsdir)
488 except AttributeError:
489 self.read_hints(os.path.join(self.suite_info["unstable"].path, "Hints"))
491 self._policy_engine.initialise(self, self.hints)
493 def __parse_arguments(self) -> None:
494 """Parse the command line arguments
496 This method parses and initializes the command line arguments.
497 While doing so, it preprocesses some of the options to be converted
498 in a suitable form for the other methods of the class.
499 """
500 # initialize the parser
501 parser = optparse.OptionParser(version="%prog")
502 parser.add_option(
503 "-v", "", action="count", dest="verbose", help="enable verbose output"
504 )
505 parser.add_option(
506 "-c",
507 "--config",
508 action="store",
509 dest="config",
510 default="/etc/britney.conf",
511 help="path for the configuration file",
512 )
513 parser.add_option(
514 "",
515 "--architectures",
516 action="store",
517 dest="architectures",
518 default=None,
519 help="override architectures from configuration file",
520 )
521 parser.add_option(
522 "",
523 "--actions",
524 action="store",
525 dest="actions",
526 default=None,
527 help="override the list of actions to be performed",
528 )
529 parser.add_option(
530 "",
531 "--hints",
532 action="store",
533 dest="hints",
534 default=None,
535 help="additional hints, separated by semicolons",
536 )
537 parser.add_option(
538 "",
539 "--hint-tester",
540 action="store_true",
541 dest="hint_tester",
542 default=None,
543 help="provide a command line interface to test hints",
544 )
545 parser.add_option(
546 "",
547 "--dry-run",
548 action="store_true",
549 dest="dry_run",
550 default=False,
551 help="disable all outputs to the testing directory",
552 )
553 parser.add_option(
554 "",
555 "--nuninst-cache",
556 action="store_true",
557 dest="nuninst_cache",
558 default=False,
559 help="do not build the non-installability status, use the cache from file",
560 )
561 parser.add_option(
562 "",
563 "--print-uninst",
564 action="store_true",
565 dest="print_uninst",
566 default=False,
567 help="just print a summary of uninstallable packages",
568 )
569 parser.add_option(
570 "",
571 "--compute-migrations",
572 action="store_true",
573 dest="compute_migrations",
574 default=True,
575 help="Compute which packages can migrate (the default)",
576 )
577 parser.add_option(
578 "",
579 "--no-compute-migrations",
580 action="store_false",
581 dest="compute_migrations",
582 help="Do not compute which packages can migrate.",
583 )
584 parser.add_option(
585 "",
586 "--series",
587 action="store",
588 dest="series",
589 default="",
590 help="set distribution series name",
591 )
592 parser.add_option(
593 "",
594 "--distribution",
595 action="store",
596 dest="distribution",
597 default="debian",
598 help="set distribution name",
599 )
600 (self.options, self.args) = parser.parse_args()
602 if self.options.verbose: 602 ↛ 608line 602 didn't jump to line 608 because the condition on line 602 was always true
603 if self.options.verbose > 1: 603 ↛ 604line 603 didn't jump to line 604 because the condition on line 603 was never true
604 self.logger.setLevel(logging.DEBUG)
605 else:
606 self.logger.setLevel(logging.INFO)
607 else:
608 self.logger.setLevel(logging.WARNING)
609 # Historical way to get debug information (equivalent to -vv)
610 try: # pragma: no cover
611 if int(os.environ.get("BRITNEY_DEBUG", "0")):
612 self.logger.setLevel(logging.DEBUG)
613 except ValueError: # pragma: no cover
614 pass
616 # integrity checks
617 if self.options.nuninst_cache and self.options.print_uninst: # pragma: no cover
618 self.logger.error("nuninst_cache and print_uninst are mutually exclusive!")
619 sys.exit(1)
621 # if the configuration file exists, then read it and set the additional options
622 if not os.path.isfile(self.options.config): # pragma: no cover
623 self.logger.error(
624 "Unable to read the configuration file (%s), exiting!",
625 self.options.config,
626 )
627 sys.exit(1)
629 self.HINTS: dict[str, Any] = {"command-line": self.HINTS_ALL}
630 with open(self.options.config, encoding="utf-8") as config:
631 for line in config:
632 if "=" in line and not line.strip().startswith("#"):
633 k, v = line.split("=", 1)
634 k = k.strip()
635 v = v.strip()
636 if k.startswith("HINTS_"):
637 self.HINTS[k.split("_")[1].lower()] = reduce( 637 ↛ exitline 637 didn't jump to the function exit
638 lambda x, y: x + y,
639 [
640 hasattr(self, "HINTS_" + i)
641 and getattr(self, "HINTS_" + i)
642 or (i,)
643 for i in v.split()
644 ],
645 )
646 elif not hasattr(self.options, k.lower()) or not getattr(
647 self.options, k.lower()
648 ):
649 setattr(self.options, k.lower(), v)
651 parse_option(self.options, "archall_inconsistency_allowed", to_bool=True)
652 parse_option(
653 self.options, "be_strict_with_build_deps", default=True, to_bool=True
654 )
656 suite_loader = DebMirrorLikeSuiteContentLoader(self.options)
658 try:
659 self.suite_info = suite_loader.load_suites()
660 except MissingRequiredConfigurationError as e: # pragma: no cover
661 self.logger.error(
662 "Could not load the suite content due to missing configuration: %s",
663 str(e),
664 )
665 sys.exit(1)
666 self.all_binaries = suite_loader.all_binaries()
667 self.options.components = suite_loader.components
668 self.options.architectures = suite_loader.architectures
669 self.options.nobreakall_arches = suite_loader.nobreakall_arches
670 self.options.outofsync_arches = suite_loader.outofsync_arches
671 self.options.break_arches = suite_loader.break_arches
672 self.options.new_arches = suite_loader.new_arches
673 if self.options.series == "": 673 ↛ 676line 673 didn't jump to line 676 because the condition on line 673 was always true
674 self.options.series = self.suite_info.target_suite.name
676 if self.options.heidi_output and not hasattr( 676 ↛ 681line 676 didn't jump to line 681 because the condition on line 676 was always true
677 self.options, "heidi_delta_output"
678 ):
679 self.options.heidi_delta_output = self.options.heidi_output + "Delta"
681 self.options.smooth_updates = self.options.smooth_updates.split()
683 parse_option(self.options, "ignore_cruft", to_bool=True)
684 parse_option(self.options, "check_consistency_level", default=2, to_int=True)
685 parse_option(self.options, "build_url")
687 self._policy_engine.load_policies(
688 self.options, self.suite_info, MIGRATION_POLICIES
689 )
691 @property
692 def hints(self) -> HintCollection:
693 return self._hint_parser.hints
695 def _load_faux_packages(self, faux_packages_file: str) -> None:
696 """Loads fake packages
698 In rare cases, it is useful to create a "fake" package that can be used to satisfy
699 dependencies. This is usually needed for packages that are not shipped directly
700 on this mirror but is a prerequisite for using this mirror (e.g. some vendors provide
701 non-distributable "setup" packages and contrib/non-free packages depend on these).
703 :param faux_packages_file: Path to the file containing the fake package definitions
704 """
705 tag_file = apt_pkg.TagFile(faux_packages_file)
706 get_field = tag_file.section.get
707 step = tag_file.step
708 no = 0
709 pri_source_suite = self.suite_info.primary_source_suite
710 target_suite = self.suite_info.target_suite
712 while step():
713 no += 1
714 pkg_name = get_field("Package", None)
715 if pkg_name is None: # pragma: no cover
716 raise ValueError(
717 "Missing Package field in paragraph %d (file %s)"
718 % (no, faux_packages_file)
719 )
720 pkg_name = sys.intern(pkg_name)
721 version = sys.intern(get_field("Version", "1.0-1"))
722 provides_raw = get_field("Provides")
723 archs_raw = get_field("Architecture", None)
724 component = get_field("Component", "non-free")
725 if archs_raw: 725 ↛ 726line 725 didn't jump to line 726 because the condition on line 725 was never true
726 archs = archs_raw.split()
727 else:
728 archs = self.options.architectures
729 faux_section = "faux"
730 if component != "main": 730 ↛ 732line 730 didn't jump to line 732 because the condition on line 730 was always true
731 faux_section = "%s/faux" % component
732 src_data = SourcePackage(
733 pkg_name,
734 version,
735 sys.intern(faux_section),
736 set(),
737 None,
738 True,
739 None,
740 None,
741 [],
742 [],
743 )
745 target_suite.sources[pkg_name] = src_data
746 pri_source_suite.sources[pkg_name] = src_data
748 for arch in archs:
749 pkg_id = BinaryPackageId(pkg_name, version, arch)
750 if provides_raw: 750 ↛ 751line 750 didn't jump to line 751 because the condition on line 750 was never true
751 provides = parse_provides(
752 provides_raw, pkg_id=pkg_id, logger=self.logger
753 )
754 else:
755 provides = []
756 bin_data = BinaryPackage(
757 version,
758 faux_section,
759 pkg_name,
760 version,
761 arch,
762 get_field("Multi-Arch"),
763 None,
764 None,
765 provides,
766 False,
767 pkg_id,
768 [],
769 )
771 src_data.binaries.add(pkg_id)
772 target_suite.binaries[arch][pkg_name] = bin_data
773 pri_source_suite.binaries[arch][pkg_name] = bin_data
775 # register provided packages with the target suite provides table
776 for provided_pkg, provided_version, _ in bin_data.provides: 776 ↛ 777line 776 didn't jump to line 777 because the loop on line 776 never started
777 target_suite.provides_table[arch][provided_pkg].add(
778 (pkg_name, provided_version)
779 )
781 self.all_binaries[pkg_id] = bin_data
783 def _load_constraints(self, constraints_file: str) -> dict[str, list[str]]:
784 """Loads configurable constraints
786 The constraints file can contain extra rules that Britney should attempt
787 to satisfy. Examples can be "keep package X in testing and ensure it is
788 installable".
790 :param constraints_file: Path to the file containing the constraints
791 """
792 tag_file = apt_pkg.TagFile(constraints_file)
793 get_field = tag_file.section.get
794 step = tag_file.step
795 no = 0
796 faux_version = sys.intern("1")
797 faux_section = sys.intern("faux")
798 keep_installable: list[str] = []
799 constraints = {"keep-installable": keep_installable}
800 pri_source_suite = self.suite_info.primary_source_suite
801 target_suite = self.suite_info.target_suite
803 while step():
804 no += 1
805 pkg_name = get_field("Fake-Package-Name", None)
806 if pkg_name is None: # pragma: no cover
807 raise ValueError(
808 "Missing Fake-Package-Name field in paragraph %d (file %s)"
809 % (no, constraints_file)
810 )
811 pkg_name = sys.intern(pkg_name)
813 def mandatory_field(x: str) -> str:
814 v: str = get_field(x, None)
815 if v is None: # pragma: no cover
816 raise ValueError(
817 "Missing %s field for %s (file %s)"
818 % (x, pkg_name, constraints_file)
819 )
820 return v
822 constraint = mandatory_field("Constraint")
823 if constraint not in {"present-and-installable"}: # pragma: no cover
824 raise ValueError(
825 "Unsupported constraint %s for %s (file %s)"
826 % (constraint, pkg_name, constraints_file)
827 )
829 self.logger.info(" - constraint %s", pkg_name)
831 pkg_list = [
832 x.strip()
833 for x in mandatory_field("Package-List").split("\n")
834 if x.strip() != "" and not x.strip().startswith("#")
835 ]
836 src_data = SourcePackage(
837 pkg_name,
838 faux_version,
839 faux_section,
840 set(),
841 None,
842 True,
843 None,
844 None,
845 [],
846 [],
847 )
848 target_suite.sources[pkg_name] = src_data
849 pri_source_suite.sources[pkg_name] = src_data
850 keep_installable.append(pkg_name)
851 for arch in self.options.architectures:
852 deps = []
853 for pkg_spec in pkg_list:
854 s = pkg_spec.split(None, 1)
855 if len(s) == 1:
856 deps.append(s[0])
857 else:
858 pkg, arch_res = s
859 if not (
860 arch_res.startswith("[") and arch_res.endswith("]")
861 ): # pragma: no cover
862 raise ValueError(
863 "Invalid arch-restriction on %s - should be [arch1 arch2] (for %s file %s)"
864 % (pkg, pkg_name, constraints_file)
865 )
866 arch_res_l = arch_res[1:-1].split()
867 if not arch_res_l: # pragma: no cover
868 msg = "Empty arch-restriction for %s: Uses comma or negation (for %s file %s)"
869 raise ValueError(msg % (pkg, pkg_name, constraints_file))
870 for a in arch_res_l:
871 if a == arch:
872 deps.append(pkg)
873 elif "," in a or "!" in a: # pragma: no cover
874 msg = "Invalid arch-restriction for %s: Uses comma or negation (for %s file %s)"
875 raise ValueError(
876 msg % (pkg, pkg_name, constraints_file)
877 )
878 pkg_id = BinaryPackageId(pkg_name, faux_version, arch)
879 bin_data = BinaryPackage(
880 faux_version,
881 faux_section,
882 pkg_name,
883 faux_version,
884 arch,
885 "no",
886 ", ".join(deps),
887 None,
888 [],
889 False,
890 pkg_id,
891 [],
892 )
893 src_data.binaries.add(pkg_id)
894 target_suite.binaries[arch][pkg_name] = bin_data
895 pri_source_suite.binaries[arch][pkg_name] = bin_data
896 self.all_binaries[pkg_id] = bin_data
898 return constraints
900 # Data reading/writing methods
901 # ----------------------------
903 def read_hints(self, hintsdir: str) -> None:
904 """Read the hint commands from the specified directory
906 The hint commands are read from the files contained in the directory
907 specified by the `hintsdir' parameter.
908 The names of the files have to be the same as the authorized users
909 for the hints.
911 The file contains rows with the format:
913 <command> <package-name>[/<version>]
915 The method returns a dictionary where the key is the command, and
916 the value is the list of affected packages.
917 """
919 for who in self.HINTS.keys():
920 if who == "command-line":
921 lines = self.options.hints and self.options.hints.split(";") or ()
922 filename = "<cmd-line>"
923 self._hint_parser.parse_hints(who, self.HINTS[who], filename, lines)
924 else:
925 filename = os.path.join(hintsdir, who)
926 if not os.path.isfile(filename): 926 ↛ 927line 926 didn't jump to line 927 because the condition on line 926 was never true
927 self.logger.error(
928 "Cannot read hints list from %s, no such file!", filename
929 )
930 continue
931 self.logger.info("Loading hints list from %s", filename)
932 with open(filename, encoding="utf-8") as f:
933 self._hint_parser.parse_hints(who, self.HINTS[who], filename, f)
935 hints = self._hint_parser.hints
937 for x in [
938 "block",
939 "block-all",
940 "block-udeb",
941 "unblock",
942 "unblock-udeb",
943 "force",
944 "urgent",
945 "remove",
946 "age-days",
947 ]:
948 z: dict[str | None, dict[str | None, tuple[Hint, str]]] = defaultdict(dict)
949 for hint in hints[x]:
950 package = hint.package
951 architecture = hint.architecture
952 key = (hint, hint.user)
953 if (
954 package in z
955 and architecture in z[package]
956 and z[package][architecture] != key
957 ):
958 hint2 = z[package][architecture][0]
959 if x in ["unblock", "unblock-udeb"]: 959 ↛ 991line 959 didn't jump to line 991 because the condition on line 959 was always true
960 assert hint.version is not None
961 assert hint2.version is not None
962 if apt_pkg.version_compare(hint2.version, hint.version) < 0:
963 # This hint is for a newer version, so discard the old one
964 self.logger.warning(
965 "Overriding %s[%s] = ('%s', '%s', '%s') with ('%s', '%s', '%s')",
966 x,
967 package,
968 hint2.version,
969 hint2.architecture,
970 hint2.user,
971 hint.version,
972 hint.architecture,
973 hint.user,
974 )
975 hint2.set_active(False)
976 else:
977 # This hint is for an older version, so ignore it in favour of the new one
978 self.logger.warning(
979 "Ignoring %s[%s] = ('%s', '%s', '%s'), ('%s', '%s', '%s') is higher or equal",
980 x,
981 package,
982 hint.version,
983 hint.architecture,
984 hint.user,
985 hint2.version,
986 hint2.architecture,
987 hint2.user,
988 )
989 hint.set_active(False)
990 else:
991 self.logger.warning(
992 "Overriding %s[%s] = ('%s', '%s') with ('%s', '%s')",
993 x,
994 package,
995 hint2.user,
996 hint2,
997 hint.user,
998 hint,
999 )
1000 hint2.set_active(False)
1002 z[package][architecture] = key
1004 for hint in hints["allow-uninst"]:
1005 if hint.architecture == "source":
1006 for arch in self.options.architectures:
1007 self.allow_uninst[arch].add(hint.package)
1008 else:
1009 assert hint.architecture is not None
1010 self.allow_uninst[hint.architecture].add(hint.package)
1012 # Sanity check the hints hash
1013 if len(hints["block"]) == 0 and len(hints["block-udeb"]) == 0: 1013 ↛ 1014line 1013 didn't jump to line 1014 because the condition on line 1013 was never true
1014 self.logger.warning("WARNING: No block hints at all, not even udeb ones!")
1016 def write_excuses(self) -> None:
1017 """Produce and write the update excuses
1019 This method handles the update excuses generation: the packages are
1020 looked at to determine whether they are valid candidates. For the details
1021 of this procedure, please refer to the module docstring.
1022 """
1024 self.logger.info("Update Excuses generation started")
1026 mi_factory = self._migration_item_factory
1027 excusefinder = ExcuseFinder(
1028 self.options,
1029 self.suite_info,
1030 self.all_binaries,
1031 self.pkg_universe,
1032 self._policy_engine,
1033 mi_factory,
1034 self.hints,
1035 )
1037 excuses, upgrade_me = excusefinder.find_actionable_excuses()
1038 self.excuses = excuses
1040 # sort the list of candidates
1041 self.upgrade_me = sorted(upgrade_me)
1042 old_lib_removals = old_libraries(
1043 mi_factory, self.suite_info, self.options.outofsync_arches
1044 )
1045 self.upgrade_me.extend(old_lib_removals)
1046 self.output_logger.info(
1047 "List of old libraries added to upgrade_me (%d):", len(old_lib_removals)
1048 )
1049 log_and_format_old_libraries(self.output_logger, old_lib_removals)
1051 # write excuses to the output file
1052 if not self.options.dry_run: 1052 ↛ 1065line 1052 didn't jump to line 1065 because the condition on line 1052 was always true
1053 self.logger.info("> Writing Excuses to %s", self.options.excuses_output)
1054 write_excuses(
1055 excuses, self.options.excuses_output, output_format="legacy-html"
1056 )
1057 if hasattr(self.options, "excuses_yaml_output"): 1057 ↛ 1065line 1057 didn't jump to line 1065 because the condition on line 1057 was always true
1058 self.logger.info(
1059 "> Writing YAML Excuses to %s", self.options.excuses_yaml_output
1060 )
1061 write_excuses(
1062 excuses, self.options.excuses_yaml_output, output_format="yaml"
1063 )
1065 self.logger.info("Update Excuses generation completed")
1067 # Upgrade run
1068 # -----------
1070 def eval_nuninst(
1071 self,
1072 nuninst: dict[str, set[str]],
1073 original: dict[str, set[str]] | None = None,
1074 ) -> str:
1075 """Return a string which represents the uninstallability counters
1077 This method returns a string which represents the uninstallability
1078 counters reading the uninstallability statistics `nuninst` and, if
1079 present, merging the results with the `original` one.
1081 An example of the output string is:
1082 1+2: i-0:a-0:a-0:h-0:i-1:m-0:m-0:p-0:a-0:m-0:s-2:s-0
1084 where the first part is the number of broken packages in non-break
1085 architectures + the total number of broken packages for all the
1086 architectures.
1087 """
1088 res = []
1089 total = 0
1090 totalbreak = 0
1091 for arch in self.options.architectures:
1092 if arch in nuninst: 1092 ↛ 1094line 1092 didn't jump to line 1094 because the condition on line 1092 was always true
1093 n = len(nuninst[arch])
1094 elif original and arch in original:
1095 n = len(original[arch])
1096 else:
1097 continue
1098 if arch in self.options.break_arches:
1099 totalbreak = totalbreak + n
1100 else:
1101 total = total + n
1102 res.append("%s-%d" % (arch[0], n))
1103 return "%d+%d: %s" % (total, totalbreak, ":".join(res))
1105 def iter_packages(
1106 self,
1107 packages: list[MigrationItem],
1108 selected: list[MigrationItem],
1109 nuninst: dict[str, set[str]] | None = None,
1110 ) -> tuple[dict[str, set[str]] | None, list[MigrationItem]]:
1111 """Iter on the list of actions and apply them one-by-one
1113 This method applies the changes from `packages` to testing, checking the uninstallability
1114 counters for every action performed. If the action does not improve them, it is reverted.
1115 The method returns the new uninstallability counters and the remaining actions if the
1116 final result is successful, otherwise (None, []).
1118 :param selected: list of MigrationItem?
1119 :param nuninst: dict with sets ? of ? per architecture
1120 """
1121 assert self.suite_info is not None # for type checking
1122 group_info = {}
1123 rescheduled_packages = packages
1124 maybe_rescheduled_packages: list[MigrationItem] = []
1125 output_logger = self.output_logger
1126 solver = InstallabilitySolver(self.pkg_universe, self._inst_tester)
1127 mm = self._migration_manager
1128 target_suite = self.suite_info.target_suite
1130 for y in sorted((y for y in packages), key=attrgetter("uvname")):
1131 try:
1132 _, updates, rms, _ = mm.compute_groups(y)
1133 result = (y, sorted(updates), sorted(rms))
1134 group_info[y] = result
1135 except MigrationConstraintException as e:
1136 rescheduled_packages.remove(y)
1137 output_logger.info("not adding package to list: %s", (y.package))
1138 output_logger.info(" got exception: %s" % (repr(e)))
1140 if nuninst:
1141 nuninst_orig = nuninst
1142 else:
1143 nuninst_orig = self.nuninst_orig
1145 nuninst_last_accepted = nuninst_orig
1147 output_logger.info(
1148 "recur: [] %s %d/0", ",".join(x.uvname for x in selected), len(packages)
1149 )
1150 while rescheduled_packages:
1151 groups = [group_info[x] for x in rescheduled_packages]
1152 worklist = solver.solve_groups(groups)
1153 rescheduled_packages = []
1155 worklist.reverse()
1157 while worklist:
1158 comp = worklist.pop()
1159 comp_name = " ".join(item.uvname for item in comp)
1160 output_logger.info("trying: %s" % comp_name)
1161 with mm.start_transaction() as transaction:
1162 accepted = False
1163 try:
1164 (
1165 accepted,
1166 nuninst_after,
1167 failed_arch,
1168 new_cruft,
1169 ) = mm.migrate_items_to_target_suite(
1170 comp, nuninst_last_accepted
1171 )
1172 if accepted:
1173 selected.extend(comp)
1174 transaction.commit()
1175 output_logger.info("accepted: %s", comp_name)
1176 output_logger.info(
1177 " ori: %s", self.eval_nuninst(nuninst_orig)
1178 )
1179 output_logger.info(
1180 " pre: %s", self.eval_nuninst(nuninst_last_accepted)
1181 )
1182 output_logger.info(
1183 " now: %s", self.eval_nuninst(nuninst_after)
1184 )
1185 if new_cruft:
1186 output_logger.info(
1187 " added new cruft items to list: %s",
1188 " ".join(x.uvname for x in sorted(new_cruft)),
1189 )
1191 if len(selected) <= 20:
1192 output_logger.info(
1193 " all: %s", " ".join(x.uvname for x in selected)
1194 )
1195 else:
1196 output_logger.info(
1197 " most: (%d) .. %s",
1198 len(selected),
1199 " ".join(x.uvname for x in selected[-20:]),
1200 )
1201 if self.options.check_consistency_level >= 3: 1201 ↛ 1202line 1201 didn't jump to line 1202 because the condition on line 1201 was never true
1202 target_suite.check_suite_source_pkg_consistency(
1203 "iter_packages after commit"
1204 )
1205 nuninst_last_accepted = nuninst_after
1206 for cruft_item in new_cruft:
1207 try:
1208 _, updates, rms, _ = mm.compute_groups(cruft_item)
1209 result = (cruft_item, sorted(updates), sorted(rms))
1210 group_info[cruft_item] = result
1211 worklist.append([cruft_item])
1212 except MigrationConstraintException as e:
1213 output_logger.info(
1214 " got exception adding cruft item %s to list: %s"
1215 % (cruft_item.uvname, repr(e))
1216 )
1217 rescheduled_packages.extend(maybe_rescheduled_packages)
1218 maybe_rescheduled_packages.clear()
1219 else:
1220 transaction.rollback()
1221 assert failed_arch # type checking
1222 broken = sorted(
1223 b
1224 for b in nuninst_after[failed_arch]
1225 if b not in nuninst_last_accepted[failed_arch]
1226 )
1227 compare_nuninst = None
1228 if any(
1229 item for item in comp if item.architecture != "source"
1230 ):
1231 compare_nuninst = nuninst_last_accepted
1232 # NB: try_migration already reverted this for us, so just print the results and move on
1233 output_logger.info(
1234 "skipped: %s (%d, %d, %d)",
1235 comp_name,
1236 len(rescheduled_packages),
1237 len(maybe_rescheduled_packages),
1238 len(worklist),
1239 )
1240 output_logger.info(
1241 " got: %s",
1242 self.eval_nuninst(nuninst_after, compare_nuninst),
1243 )
1244 output_logger.info(
1245 " * %s: %s", failed_arch, ", ".join(broken)
1246 )
1247 if self.options.check_consistency_level >= 3: 1247 ↛ 1248line 1247 didn't jump to line 1248 because the condition on line 1247 was never true
1248 target_suite.check_suite_source_pkg_consistency(
1249 "iter_package after rollback (not accepted)"
1250 )
1252 except MigrationConstraintException as e:
1253 transaction.rollback()
1254 output_logger.info(
1255 "skipped: %s (%d, %d, %d)",
1256 comp_name,
1257 len(rescheduled_packages),
1258 len(maybe_rescheduled_packages),
1259 len(worklist),
1260 )
1261 output_logger.info(" got exception: %s" % (repr(e)))
1262 if self.options.check_consistency_level >= 3: 1262 ↛ 1263line 1262 didn't jump to line 1263 because the condition on line 1262 was never true
1263 target_suite.check_suite_source_pkg_consistency(
1264 "iter_package after rollback (MigrationConstraintException)"
1265 )
1267 if not accepted:
1268 if len(comp) > 1:
1269 output_logger.info(
1270 " - splitting the component into single items and retrying them"
1271 )
1272 worklist.extend([item] for item in comp)
1273 else:
1274 maybe_rescheduled_packages.append(comp[0])
1276 output_logger.info(" finish: [%s]", ",".join(x.uvname for x in selected))
1277 output_logger.info("endloop: %s", self.eval_nuninst(self.nuninst_orig))
1278 output_logger.info(" now: %s", self.eval_nuninst(nuninst_last_accepted))
1279 format_and_log_uninst(
1280 output_logger,
1281 self.options.architectures,
1282 newly_uninst(self.nuninst_orig, nuninst_last_accepted),
1283 )
1284 output_logger.info("")
1286 return (nuninst_last_accepted, maybe_rescheduled_packages)
1288 def do_all(
1289 self,
1290 hinttype: str | None = None,
1291 init: list[MigrationItem] | None = None,
1292 actions: list[MigrationItem] | None = None,
1293 ) -> None:
1294 """Testing update runner
1296 This method tries to update testing checking the uninstallability
1297 counters before and after the actions to decide if the update was
1298 successful or not.
1299 """
1300 selected = []
1301 if actions:
1302 upgrade_me = actions[:]
1303 else:
1304 upgrade_me = self.upgrade_me[:]
1305 nuninst_start = self.nuninst_orig
1306 output_logger = self.output_logger
1307 target_suite = self.suite_info.target_suite
1309 # these are special parameters for hints processing
1310 force = False
1311 recurse = True
1312 nuninst_end = None
1313 extra: list[MigrationItem] = []
1314 mm = self._migration_manager
1316 if hinttype == "easy" or hinttype == "force-hint":
1317 force = hinttype == "force-hint"
1318 recurse = False
1320 # if we have a list of initial packages, check them
1321 if init:
1322 for x in init:
1323 if x not in upgrade_me:
1324 output_logger.warning(
1325 "failed: %s is not a valid candidate (or it already migrated)",
1326 x.uvname,
1327 )
1328 return None
1329 selected.append(x)
1330 upgrade_me.remove(x)
1332 output_logger.info("start: %s", self.eval_nuninst(nuninst_start))
1333 output_logger.info("orig: %s", self.eval_nuninst(nuninst_start))
1335 if not (init and not force):
1336 # No "outer" transaction needed as we will never need to rollback
1337 # (e.g. "force-hint" or a regular "main run"). Emulate the start_transaction
1338 # call from the MigrationManager, so the rest of the code follows the
1339 # same flow regardless of whether we need the transaction or not.
1341 @contextlib.contextmanager
1342 def _start_transaction() -> Iterator[Optional["MigrationTransactionState"]]:
1343 yield None
1345 else:
1346 # We will need to be able to roll back (e.g. easy or a "hint"-hint)
1347 _start_transaction = mm.start_transaction
1349 with _start_transaction() as transaction:
1350 if init:
1351 # init => a hint (e.g. "easy") - so do the hint run
1352 (_, nuninst_end, _, new_cruft) = mm.migrate_items_to_target_suite(
1353 selected, self.nuninst_orig, stop_on_first_regression=False
1354 )
1356 if recurse:
1357 # Ensure upgrade_me and selected do not overlap, if we
1358 # follow-up with a recurse ("hint"-hint).
1359 upgrade_me = [x for x in upgrade_me if x not in set(selected)]
1360 else:
1361 # On non-recursive hints check for cruft and purge it proactively in case it "fixes" the hint.
1362 cruft = [x for x in upgrade_me if x.is_cruft_removal]
1363 if new_cruft:
1364 output_logger.info(
1365 "Change added new cruft items to list: %s",
1366 " ".join(x.uvname for x in sorted(new_cruft)),
1367 )
1368 cruft.extend(new_cruft)
1369 if cruft:
1370 output_logger.info("Checking if changes enables cruft removal")
1371 (nuninst_end, remaining_cruft) = self.iter_packages(
1372 cruft, selected, nuninst=nuninst_end
1373 )
1374 output_logger.info(
1375 "Removed %d of %d cruft item(s) after the changes",
1376 len(cruft) - len(remaining_cruft),
1377 len(cruft),
1378 )
1379 new_cruft.difference_update(remaining_cruft)
1381 # Add new cruft items regardless of whether we recurse. A future run might clean
1382 # them for us.
1383 upgrade_me.extend(new_cruft)
1385 if recurse:
1386 # Either the main run or the recursive run of a "hint"-hint.
1387 (nuninst_end, extra) = self.iter_packages(
1388 upgrade_me, selected, nuninst=nuninst_end
1389 )
1391 assert nuninst_end is not None
1392 nuninst_end_str = self.eval_nuninst(nuninst_end)
1394 if not recurse:
1395 # easy or force-hint
1396 output_logger.info("easy: %s", nuninst_end_str)
1398 if not force:
1399 format_and_log_uninst(
1400 self.output_logger,
1401 self.options.architectures,
1402 newly_uninst(nuninst_start, nuninst_end),
1403 )
1405 if force:
1406 # Force implies "unconditionally better"
1407 better = True
1408 else:
1409 break_arches: set[str] = set(self.options.break_arches)
1410 if all(x.architecture in break_arches for x in selected):
1411 # If we only migrated items from break-arches, then we
1412 # do not allow any regressions on these architectures.
1413 # This usually only happens with hints
1414 break_arches = set()
1415 better = is_nuninst_asgood_generous(
1416 self.constraints,
1417 self.allow_uninst,
1418 self.options.architectures,
1419 self.nuninst_orig,
1420 nuninst_end,
1421 break_arches,
1422 )
1424 if better:
1425 # Result accepted either by force or by being better than the original result.
1426 output_logger.info(
1427 "final: %s", ",".join(sorted(x.uvname for x in selected))
1428 )
1429 output_logger.info("start: %s", self.eval_nuninst(nuninst_start))
1430 output_logger.info(" orig: %s", self.eval_nuninst(self.nuninst_orig))
1431 output_logger.info(" end: %s", nuninst_end_str)
1432 if force:
1433 broken = newly_uninst(nuninst_start, nuninst_end)
1434 if broken:
1435 output_logger.warning("force breaks:")
1436 format_and_log_uninst(
1437 self.output_logger,
1438 self.options.architectures,
1439 broken,
1440 loglevel=logging.WARNING,
1441 )
1442 else:
1443 output_logger.info("force did not break any packages")
1444 output_logger.info(
1445 "SUCCESS (%d/%d)", len(actions or self.upgrade_me), len(extra)
1446 )
1447 self.nuninst_orig = nuninst_end
1448 self.all_selected += selected
1449 if transaction:
1450 transaction.commit()
1451 if self.options.check_consistency_level >= 2: 1451 ↛ 1455line 1451 didn't jump to line 1455 because the condition on line 1451 was always true
1452 target_suite.check_suite_source_pkg_consistency(
1453 "do_all after commit"
1454 )
1455 if not actions:
1456 if recurse:
1457 self.upgrade_me = extra
1458 else:
1459 self.upgrade_me = [
1460 x for x in self.upgrade_me if x not in set(selected)
1461 ]
1462 else:
1463 output_logger.info("FAILED\n")
1464 if not transaction: 1464 ↛ 1468line 1464 didn't jump to line 1468 because the condition on line 1464 was never true
1465 # if we 'FAILED', but we cannot rollback, we will probably
1466 # leave a broken state behind
1467 # this should not happen
1468 raise AssertionError("do_all FAILED but no transaction to rollback")
1469 transaction.rollback()
1470 if self.options.check_consistency_level >= 2: 1470 ↛ 1349line 1470 didn't jump to line 1349
1471 target_suite.check_suite_source_pkg_consistency(
1472 "do_all after rollback"
1473 )
1475 output_logger.info("")
1477 def assert_nuninst_is_correct(self) -> None:
1478 self.logger.info("> Update complete - Verifying non-installability counters")
1480 cached_nuninst = self.nuninst_orig
1481 self._inst_tester.compute_installability()
1482 computed_nuninst = compile_nuninst(
1483 self.suite_info.target_suite,
1484 self.options.architectures,
1485 self.options.nobreakall_arches,
1486 )
1487 if cached_nuninst != computed_nuninst: # pragma: no cover
1488 only_on_break_archs = True
1489 msg_l = [
1490 "==================== NUNINST OUT OF SYNC ========================="
1491 ]
1492 for arch in self.options.architectures:
1493 expected_nuninst = set(cached_nuninst[arch])
1494 actual_nuninst = set(computed_nuninst[arch])
1495 false_negatives = actual_nuninst - expected_nuninst
1496 false_positives = expected_nuninst - actual_nuninst
1497 # Britney does not quite work correctly with
1498 # break/fucked arches, so ignore issues there for now.
1499 if (
1500 false_negatives or false_positives
1501 ) and arch not in self.options.break_arches:
1502 only_on_break_archs = False
1503 if false_negatives:
1504 msg_l.append(f" {arch} - unnoticed nuninst: {str(false_negatives)}")
1505 if false_positives:
1506 msg_l.append(f" {arch} - invalid nuninst: {str(false_positives)}")
1507 if false_negatives or false_positives:
1508 msg_l.append(
1509 f" {arch} - actual nuninst: {str(sorted(actual_nuninst))}"
1510 )
1511 msg_l.append(msg_l[0])
1512 for msg in msg_l:
1513 if only_on_break_archs:
1514 self.logger.warning(msg)
1515 else:
1516 self.logger.error(msg)
1517 if not only_on_break_archs:
1518 raise AssertionError("NUNINST OUT OF SYNC")
1519 else:
1520 self.logger.warning("Nuninst is out of sync on some break arches")
1522 self.logger.info("> All non-installability counters are ok")
1524 def upgrade_testing(self) -> None:
1525 """Upgrade testing using the packages from the source suites
1527 This method tries to upgrade testing using the packages from the
1528 source suites.
1529 Before running the do_all method, it tries the easy and force-hint
1530 commands.
1531 """
1533 output_logger = self.output_logger
1534 self.logger.info("Starting the upgrade test")
1535 output_logger.info(
1536 "Generated on: %s",
1537 time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time())),
1538 )
1539 output_logger.info("Arch order is: %s", ", ".join(self.options.architectures))
1541 if not self.options.actions: 1541 ↛ 1552line 1541 didn't jump to line 1552 because the condition on line 1541 was always true
1542 # process `easy' hints
1543 for x in self.hints["easy"]:
1544 self.do_hint("easy", x.user, x.packages)
1546 # process `force-hint' hints
1547 for x in self.hints["force-hint"]:
1548 self.do_hint("force-hint", x.user, x.packages)
1550 # run the first round of the upgrade
1551 # - do separate runs for break arches
1552 allpackages = []
1553 normpackages = self.upgrade_me[:]
1554 archpackages = {}
1555 for a in self.options.break_arches:
1556 archpackages[a] = [p for p in normpackages if p.architecture == a]
1557 normpackages = [p for p in normpackages if p not in archpackages[a]]
1558 self.upgrade_me = normpackages
1559 output_logger.info("info: main run")
1560 self.do_all()
1561 allpackages += self.upgrade_me
1562 for a in self.options.break_arches:
1563 backup = self.options.break_arches
1564 self.options.break_arches = " ".join(
1565 x for x in self.options.break_arches if x != a
1566 )
1567 self.upgrade_me = archpackages[a]
1568 output_logger.info("info: broken arch run for %s", a)
1569 self.do_all()
1570 allpackages += self.upgrade_me
1571 self.options.break_arches = backup
1572 self.upgrade_me = allpackages
1574 if self.options.actions: 1574 ↛ 1575line 1574 didn't jump to line 1575 because the condition on line 1574 was never true
1575 self.printuninstchange()
1576 return
1578 # process `hint' hints
1579 hintcnt = 0
1580 for x in self.hints["hint"][:50]:
1581 if hintcnt > 50: 1581 ↛ 1582line 1581 didn't jump to line 1582 because the condition on line 1581 was never true
1582 output_logger.info("Skipping remaining hints...")
1583 break
1584 if self.do_hint("hint", x.user, x.packages): 1584 ↛ 1580line 1584 didn't jump to line 1580 because the condition on line 1584 was always true
1585 hintcnt += 1
1587 # run the auto hinter
1588 self.run_auto_hinter()
1590 if getattr(self.options, "remove_obsolete", "yes") == "yes":
1591 # obsolete source packages
1592 # a package is obsolete if none of the binary packages in testing
1593 # are built by it
1594 self.logger.info(
1595 "> Removing obsolete source packages from the target suite"
1596 )
1597 # local copies for performance
1598 target_suite = self.suite_info.target_suite
1599 sources_t = target_suite.sources
1600 binaries_t = target_suite.binaries
1601 mi_factory = self._migration_item_factory
1602 used = {
1603 binaries_t[arch][binary].source
1604 for arch in binaries_t
1605 for binary in binaries_t[arch]
1606 if not binary.endswith("-faux-build-depends")
1607 }
1608 removals = [
1609 mi_factory.parse_item(
1610 f"-{source}/{sources_t[source].version}", auto_correct=False
1611 )
1612 for source in sources_t
1613 if source not in used
1614 ]
1615 if removals:
1616 output_logger.info(
1617 "Removing obsolete source packages from the target suite (%d):",
1618 len(removals),
1619 )
1620 self.do_all(actions=removals)
1622 # smooth updates
1623 removals = old_libraries(
1624 self._migration_item_factory, self.suite_info, self.options.outofsync_arches
1625 )
1626 if removals:
1627 output_logger.info(
1628 "Removing packages left in the target suite (e.g. smooth updates or cruft)"
1629 )
1630 log_and_format_old_libraries(self.output_logger, removals)
1631 self.do_all(actions=removals)
1632 removals = old_libraries(
1633 self._migration_item_factory,
1634 self.suite_info,
1635 self.options.outofsync_arches,
1636 )
1638 output_logger.info(
1639 "List of old libraries in the target suite (%d):", len(removals)
1640 )
1641 log_and_format_old_libraries(self.output_logger, removals)
1643 self.printuninstchange()
1644 if self.options.check_consistency_level >= 1: 1644 ↛ 1650line 1644 didn't jump to line 1650 because the condition on line 1644 was always true
1645 target_suite = self.suite_info.target_suite
1646 self.assert_nuninst_is_correct()
1647 target_suite.check_suite_source_pkg_consistency("end")
1649 # output files
1650 if self.options.heidi_output and not self.options.dry_run: 1650 ↛ 1664line 1650 didn't jump to line 1664 because the condition on line 1650 was always true
1651 target_suite = self.suite_info.target_suite
1653 # write HeidiResult
1654 self.logger.info("Writing Heidi results to %s", self.options.heidi_output)
1655 write_heidi(
1656 self.options.heidi_output,
1657 target_suite,
1658 outofsync_arches=self.options.outofsync_arches,
1659 )
1661 self.logger.info("Writing delta to %s", self.options.heidi_delta_output)
1662 write_heidi_delta(self.options.heidi_delta_output, self.all_selected)
1664 self.logger.info("Test completed!")
1666 def printuninstchange(self) -> None:
1667 self.logger.info("Checking for newly uninstallable packages")
1668 uninst = newly_uninst(self.nuninst_orig_save, self.nuninst_orig)
1670 if uninst:
1671 self.output_logger.info("")
1672 self.output_logger.info(
1673 "Newly uninstallable packages in the target suite (arch:all on BREAKALL_ARCHES not shown)"
1674 )
1675 format_and_log_uninst(
1676 self.output_logger,
1677 self.options.architectures,
1678 uninst,
1679 loglevel=logging.WARNING,
1680 )
1682 def hint_tester(self) -> None:
1683 """Run a command line interface to test hints
1685 This method provides a command line interface for the release team to
1686 try hints and evaluate the results.
1687 """
1688 import readline
1690 from britney2.completer import Completer
1692 histfile = os.path.expanduser("~/.britney2_history")
1693 if os.path.exists(histfile):
1694 readline.read_history_file(histfile)
1696 readline.parse_and_bind("tab: complete")
1697 readline.set_completer(Completer(self).completer)
1698 # Package names can contain "-" and we use "/" in our presentation of them as well,
1699 # so ensure readline does not split on these characters.
1700 readline.set_completer_delims(
1701 readline.get_completer_delims().replace("-", "").replace("/", "")
1702 )
1704 known_hints = self._hint_parser.registered_hint_names
1706 print("Britney hint tester")
1707 print()
1708 print(
1709 "Besides inputting known britney hints, the follow commands are also available"
1710 )
1711 print(" * quit/exit - terminates the shell")
1712 print(
1713 " * python-console - jump into an interactive python shell (with the current loaded dataset)"
1714 )
1715 print()
1717 while True:
1718 # read the command from the command line
1719 try:
1720 user_input = input("britney> ").split()
1721 except EOFError:
1722 print("")
1723 break
1724 except KeyboardInterrupt:
1725 print("")
1726 continue
1727 # quit the hint tester
1728 if user_input and user_input[0] in ("quit", "exit"):
1729 break
1730 elif user_input and user_input[0] == "python-console":
1731 try:
1732 import britney2.console
1733 except ImportError as e:
1734 print("Failed to import britney.console module: %s" % repr(e))
1735 continue
1736 britney2.console.run_python_console(self)
1737 print("Returning to the britney hint-tester console")
1738 # run a hint
1739 elif user_input and user_input[0] in ("easy", "hint", "force-hint"):
1740 mi_factory = self._migration_item_factory
1741 try:
1742 self.do_hint(
1743 user_input[0],
1744 "hint-tester",
1745 mi_factory.parse_items(user_input[1:]),
1746 )
1747 self.printuninstchange()
1748 except KeyboardInterrupt:
1749 continue
1750 elif user_input and user_input[0] in known_hints:
1751 self._hint_parser.parse_hints(
1752 "hint-tester", self.HINTS_ALL, "<stdin>", [" ".join(user_input)]
1753 )
1754 self.write_excuses()
1756 try:
1757 readline.write_history_file(histfile)
1758 except OSError as e:
1759 self.logger.warning("Could not write %s: %s", histfile, e)
1761 def do_hint(self, hinttype: str, who: str, pkgvers: list[MigrationItem]) -> bool:
1762 """Process hints
1764 This method process `easy`, `hint` and `force-hint` hints. If the
1765 requested version is not in the relevant source suite, then the hint
1766 is skipped.
1767 """
1769 output_logger = self.output_logger
1771 self.logger.info("> Processing '%s' hint from %s", hinttype, who)
1772 output_logger.info(
1773 "Trying %s from %s: %s",
1774 hinttype,
1775 who,
1776 " ".join(f"{x.uvname}/{x.version}" for x in pkgvers),
1777 )
1779 issues = []
1780 # loop on the requested packages and versions
1781 for idx in range(len(pkgvers)):
1782 pkg = pkgvers[idx]
1783 # skip removal requests
1784 if pkg.is_removal:
1785 continue
1787 suite = pkg.suite
1789 assert pkg.version is not None
1790 if pkg.package not in suite.sources: 1790 ↛ 1791line 1790 didn't jump to line 1791 because the condition on line 1790 was never true
1791 issues.append(f"Source {pkg.package} has no version in {suite.name}")
1792 elif ( 1792 ↛ 1796line 1792 didn't jump to line 1796
1793 apt_pkg.version_compare(suite.sources[pkg.package].version, pkg.version)
1794 != 0
1795 ):
1796 issues.append(
1797 "Version mismatch, %s %s != %s"
1798 % (pkg.package, pkg.version, suite.sources[pkg.package].version)
1799 )
1800 if issues: 1800 ↛ 1801line 1800 didn't jump to line 1801 because the condition on line 1800 was never true
1801 output_logger.warning("%s: Not using hint", ", ".join(issues))
1802 return False
1804 self.do_all(hinttype, pkgvers)
1805 return True
1807 def get_auto_hinter_hints(
1808 self, upgrade_me: list[MigrationItem]
1809 ) -> list[list[frozenset[MigrationItem]]]:
1810 """Auto-generate "easy" hints.
1812 This method attempts to generate "easy" hints for sets of packages which
1813 must migrate together. Beginning with a package which does not depend on
1814 any other package (in terms of excuses), a list of dependencies and
1815 reverse dependencies is recursively created.
1817 Once all such lists have been generated, any which are subsets of other
1818 lists are ignored in favour of the larger lists. The remaining lists are
1819 then attempted in turn as "easy" hints.
1821 We also try to auto hint circular dependencies analyzing the update
1822 excuses relationships. If they build a circular dependency, which we already
1823 know as not-working with the standard do_all algorithm, try to `easy` them.
1824 """
1825 self.logger.info("> Processing hints from the auto hinter")
1827 sources_t = self.suite_info.target_suite.sources
1828 excuses = self.excuses
1830 def excuse_still_valid(excuse: "Excuse") -> bool:
1831 source = excuse.source
1832 assert isinstance(excuse.item, MigrationItem)
1833 arch = excuse.item.architecture
1834 # TODO for binNMUs, this check is always ok, even if the item
1835 # migrated already
1836 valid = (
1837 arch != "source"
1838 or source not in sources_t
1839 or sources_t[source].version != excuse.ver[1]
1840 )
1841 # TODO migrated items should be removed from upgrade_me, so this
1842 # should not happen
1843 if not valid: 1843 ↛ 1844line 1843 didn't jump to line 1844 because the condition on line 1843 was never true
1844 raise AssertionError("excuse no longer valid %s" % (item))
1845 return valid
1847 # consider only excuses which are valid candidates and still relevant.
1848 valid_excuses = frozenset(
1849 e.name
1850 for n, e in excuses.items()
1851 if e.item in upgrade_me and excuse_still_valid(e)
1852 )
1853 excuses_deps = {
1854 name: valid_excuses.intersection(excuse.get_deps())
1855 for name, excuse in excuses.items()
1856 if name in valid_excuses
1857 }
1858 excuses_rdeps = defaultdict(set)
1859 for name, deps in excuses_deps.items():
1860 for dep in deps:
1861 excuses_rdeps[dep].add(name)
1863 # loop on them
1864 candidates = []
1865 mincands = []
1866 seen_hints = set()
1867 for e in valid_excuses:
1868 excuse = excuses[e]
1869 if not excuse.get_deps():
1870 assert isinstance(excuse.item, MigrationItem)
1871 items = [excuse.item]
1872 orig_size = 1
1873 looped = False
1874 seen_items = set()
1875 seen_items.update(items)
1877 for item in items:
1878 assert isinstance(item, MigrationItem)
1879 # excuses which depend on "item" or are depended on by it
1880 new_items = cast(
1881 set[MigrationItem],
1882 {
1883 excuses[x].item
1884 for x in chain(
1885 excuses_deps[item.name], excuses_rdeps[item.name]
1886 )
1887 },
1888 )
1889 new_items -= seen_items
1890 items.extend(new_items)
1891 seen_items.update(new_items)
1893 if not looped and len(items) > 1:
1894 orig_size = len(items)
1895 h = frozenset(seen_items)
1896 if h not in seen_hints: 1896 ↛ 1899line 1896 didn't jump to line 1899 because the condition on line 1896 was always true
1897 mincands.append(h)
1898 seen_hints.add(h)
1899 looped = True
1900 if len(items) != orig_size: 1900 ↛ 1901line 1900 didn't jump to line 1901 because the condition on line 1900 was never true
1901 h = frozenset(seen_items)
1902 if h != mincands[-1] and h not in seen_hints:
1903 candidates.append(h)
1904 seen_hints.add(h)
1905 return [candidates, mincands]
1907 def run_auto_hinter(self) -> None:
1908 for lst in self.get_auto_hinter_hints(self.upgrade_me):
1909 for hint in lst:
1910 self.do_hint("easy", "autohinter", sorted(hint))
1912 def nuninst_arch_report(self, nuninst: dict[str, set[str]], arch: str) -> None:
1913 """Print a report of uninstallable packages for one architecture."""
1914 all = defaultdict(set)
1915 binaries_t = self.suite_info.target_suite.binaries
1916 for p in nuninst[arch]:
1917 pkg = binaries_t[arch][p]
1918 all[(pkg.source, pkg.source_version)].add(p)
1920 print("* %s" % arch)
1922 for (src, ver), pkgs in sorted(all.items()):
1923 print(" {} ({}): {}".format(src, ver, " ".join(sorted(pkgs))))
1925 print()
1927 def _remove_archall_faux_packages(self) -> None:
1928 """Remove faux packages added for the excuses phase
1930 To prevent binary packages from going missing while they are listed by
1931 their source package we add bin:faux packages during reading in the
1932 Sources. They are used during the excuses phase to prevent packages
1933 from becoming candidates. However, they interfere in complex ways
1934 during the installability phase, so instead of having all code during
1935 migration be aware of this excuses phase implementation detail, let's
1936 remove them again.
1938 """
1939 if not self.options.archall_inconsistency_allowed:
1940 all_binaries = self.all_binaries
1941 faux_a = {x for x in all_binaries.keys() if x[2] == "faux"}
1942 for pkg_a in faux_a:
1943 del all_binaries[pkg_a]
1945 for suite in self.suite_info._suites.values():
1946 for arch in suite.binaries.keys():
1947 binaries = suite.binaries[arch]
1948 faux_b = {x for x in binaries if binaries[x].pkg_id[2] == "faux"}
1949 for pkg_b in faux_b:
1950 del binaries[pkg_b]
1951 sources = suite.sources
1952 for src in sources.keys():
1953 faux_s = {x for x in sources[src].binaries if x[2] == "faux"}
1954 sources[src].binaries -= faux_s
1956 def main(self) -> None:
1957 """Main method
1959 This is the entry point for the class: it includes the list of calls
1960 for the member methods which will produce the output files.
1961 """
1962 # if running in --print-uninst mode, quit
1963 if self.options.print_uninst: 1963 ↛ 1964line 1963 didn't jump to line 1964 because the condition on line 1963 was never true
1964 return
1965 # if no actions are provided, build the excuses and sort them
1966 elif not self.options.actions: 1966 ↛ 1970line 1966 didn't jump to line 1970 because the condition on line 1966 was always true
1967 self.write_excuses()
1968 # otherwise, use the actions provided by the command line
1969 else:
1970 self.upgrade_me = self.options.actions.split()
1972 self._remove_archall_faux_packages()
1974 if self.options.compute_migrations or self.options.hint_tester:
1975 if self.options.dry_run: 1975 ↛ 1976line 1975 didn't jump to line 1976 because the condition on line 1975 was never true
1976 self.logger.info(
1977 "Upgrade output not (also) written to a separate file"
1978 " as this is a dry-run."
1979 )
1980 elif hasattr(self.options, "upgrade_output"): 1980 ↛ 1990line 1980 didn't jump to line 1990 because the condition on line 1980 was always true
1981 upgrade_output = getattr(self.options, "upgrade_output")
1982 file_handler = logging.FileHandler(
1983 upgrade_output, mode="w", encoding="utf-8"
1984 )
1985 output_formatter = logging.Formatter("%(message)s")
1986 file_handler.setFormatter(output_formatter)
1987 self.output_logger.addHandler(file_handler)
1988 self.logger.info("Logging upgrade output to %s", upgrade_output)
1989 else:
1990 self.logger.info(
1991 "Upgrade output not (also) written to a separate file"
1992 " as the UPGRADE_OUTPUT configuration is not provided."
1993 )
1995 # run the hint tester
1996 if self.options.hint_tester: 1996 ↛ 1997line 1996 didn't jump to line 1997 because the condition on line 1996 was never true
1997 self.hint_tester()
1998 # run the upgrade test
1999 else:
2000 self.upgrade_testing()
2002 self.logger.info("> Stats from the installability tester")
2003 for stat in self._inst_tester.stats.stats():
2004 self.logger.info("> %s", stat)
2005 else:
2006 self.logger.info("Migration computation skipped as requested.")
2007 if not self.options.dry_run: 2007 ↛ 2009line 2007 didn't jump to line 2009 because the condition on line 2007 was always true
2008 self._policy_engine.save_state(self)
2009 logging.shutdown()
2012if __name__ == "__main__": 2012 ↛ 2013line 2012 didn't jump to line 2013 because the condition on line 2012 was never true
2013 Britney().main()