Coverage for britney2/britney.py: 84%
768 statements
coverage.py v6.5.0, created at 2025-08-23 07:57 +0000
#!/usr/bin/python3 -u
# -*- coding: utf-8 -*-

# Copyright (C) 2001-2008 Anthony Towns <ajt@debian.org>
#                         Andreas Barth <aba@debian.org>
#                         Fabio Tranchitella <kobold@debian.org>
# Copyright (C) 2010-2013 Adam D. Barratt <adsb@debian.org>

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
"""
= Introduction =

This is the Debian testing updater script, also known as "Britney".

Packages are usually installed into the `testing' distribution after
they have undergone some degree of testing in unstable. The goal of
this software is to do this task in a smart way, keeping testing
fully installable at all times and close to being a release candidate.

Britney's source code is split between two different but related tasks:
the first one is the generation of the update excuses, while the
second tries to update testing with the valid candidates; first
each package alone, then larger and even larger sets of packages
together. Each try is accepted if testing is not more uninstallable
after the update than before.

= Data Loading =

In order to analyze the entire Debian distribution, Britney needs to
load the whole archive in memory: this means more than 10,000 packages
for twelve architectures, as well as the dependency interconnections
between them. For this reason, the memory requirements for running this
software are quite high and at least 1 gigabyte of RAM should be available.

Britney loads the source packages from the `Sources' file and the binary
packages from the `Packages_${arch}' files, where ${arch} is substituted
with the supported architectures. While loading the data, the software
analyzes the dependencies and builds a directed weighted graph in memory
with all the interconnections between the packages (see Britney.read_sources
and Britney.read_binaries).

Other than source and binary packages, Britney loads the following data:

  * BugsV, which contains the list of release-critical bugs for a given
    version of a source or binary package (see RCBugPolicy.read_bugs).

  * Dates, which contains the date of the upload of a given version
    of a source package (see Britney.read_dates).

  * Urgencies, which contains the urgency of the upload of a given
    version of a source package (see AgePolicy._read_urgencies).

  * Hints, which contains lists of commands which modify the standard behaviour
    of Britney (see Britney.read_hints).

  * Other policies typically require their own data.

For a more detailed explanation of the format of these files, please read
the documentation of the related methods. Their exact meaning is
explained in the chapter "Excuses Generation".
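As an illustration of the loading step (a simplified sketch; Britney
itself parses these files with apt_pkg.TagFile), the `Packages' and
`Sources' files are series of deb822-style paragraphs that can be
split into field/value mappings:

```python
# Simplified sketch of parsing a Packages-style file; unlike the real
# parser, this handles neither continuation lines nor signed files.
def parse_stanzas(text: str) -> list[dict[str, str]]:
    """Split deb822-style paragraphs into field/value dictionaries."""
    stanzas = []
    for block in text.strip().split("\n\n"):
        fields = {}
        for line in block.splitlines():
            key, _, value = line.partition(":")
            fields[key.strip()] = value.strip()
        stanzas.append(fields)
    return stanzas

example = """\
Package: foo
Version: 1.0-1
Architecture: amd64
Depends: libc6 (>= 2.36)

Package: bar
Version: 2.0-1
Architecture: amd64
Depends: foo (= 1.0-1)
"""

pkgs = parse_stanzas(example)
# each Depends relation (bar -> foo here) becomes an edge in the
# dependency graph mentioned above
```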
= Excuses =

An excuse is a detailed explanation of why a package can or cannot
be updated in the testing distribution from a newer package in
another distribution (like for example unstable). The main purpose
of the excuses is to be written in an HTML file which will be
published over HTTP, as well as a YAML file. The maintainers will be able
to parse them manually or automatically to find the explanation of why their
packages have been updated or not.

== Excuses generation ==

These are the steps (with references to method names) that Britney
performs to generate the update excuses.

 * If a source package is available in testing but it is not
   present in unstable and no binary packages in unstable are
   built from it, then it is marked for removal.

 * Every source package in unstable and testing-proposed-updates,
   if already present in testing, is checked for binary-NMUs, new
   or dropped binary packages in all the supported architectures
   (see Britney.should_upgrade_srcarch). The steps to detect if an
   upgrade is needed are:

    1. If there is a `remove' hint for the source package, the package
       is ignored: it will be removed and not updated.

    2. For every binary package built from the new source, it checks
       for unsatisfied dependencies, new binary packages and updated
       binary packages (binNMU), excluding the architecture-independent
       ones and packages not built from the same source.

    3. For every binary package built from the old source, it checks
       if it is still built from the new source; if this is not true
       and the package is not architecture-independent, the script
       removes it from testing.

    4. Finally, if there is something worth doing (e.g. a new or updated
       binary package) and nothing wrong, it marks the source package
       as "Valid candidate", or "Not considered" if there is something
       wrong which prevented the update.

 * Every source package in unstable and testing-proposed-updates is
   checked for upgrade (see Britney.should_upgrade_src). The steps
   to detect if an upgrade is needed are:

    1. If the source package in testing is more recent than the new one,
       the latter is ignored.

    2. If the source package doesn't exist (is fake), which means that
       a binary package refers to it but it is not present in the
       `Sources' file, the new one is ignored.

    3. If the package doesn't exist in testing, the urgency of the
       upload is ignored and set to the default (currently `low').

    4. If there is a `remove' hint for the source package, the package
       is ignored: it will be removed and not updated.

    5. If there is a `block' hint for the source package without an
       `unblock' hint, or a `block-all source' hint, the package is
       ignored.

    6. If there is a `block-udeb' hint for the source package, it will
       have the same effect as `block', but may only be cancelled by
       a subsequent `unblock-udeb' hint.

    7. If the suite is unstable, the update can go ahead only if the
       upload happened more than the minimum number of days specified
       by the urgency of the upload; if this is not true, the package is
       ignored as `too-young'. Note that the urgency is sticky, meaning
       that the highest urgency uploaded since the previous testing
       transition is taken into account.

    8. If the suite is unstable, all the architecture-dependent binary
       packages and the architecture-independent ones for the `nobreakall'
       architectures have to be built from the source we are considering.
       If this is not true, then these are called `out-of-date'
       architectures and the package is ignored.

    9. The source package must have at least one binary package, otherwise
       it is ignored.

    10. If the suite is unstable, the new source package must have no
        release critical bugs which do not also apply to the testing
        one. If this is not true, the package is ignored as `buggy'.

    11. If there is a `force' hint for the source package, then it is
        updated even if it is marked as ignored by the previous steps.

    12. If the suite is {testing-,}proposed-updates, the source package can
        be updated only if there is an explicit approval for it. Unless
        a `force' hint exists, the new package must also be available
        on all of the architectures for which it has binary packages in
        testing.

    13. If the package will not be ignored, mark it as "Valid candidate",
        otherwise mark it as "Not considered".

 * The list of `remove' hints is processed: if the requested source
   package is not already being updated or removed and the version
   actually in testing is the same specified with the `remove' hint,
   it is marked for removal.

 * The excuses are sorted by the number of days from the last upload
   (days-old) and by name.

 * A list of unconsidered excuses (for which the package is not upgraded)
   is built. Using this list, all of the excuses that depend on them are
   marked as invalid for "impossible dependencies".

 * The excuses are written in an HTML file.
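The sorting step above can be sketched as follows (a toy model with
hypothetical attribute names; the real Excuse class lives in
britney2/excuse.py and differs in detail):

```python
from dataclasses import dataclass

@dataclass
class ToyExcuse:
    name: str
    daysold: int  # days since the last upload

excuses = [ToyExcuse("zsh", 3), ToyExcuse("apt", 3), ToyExcuse("gcc", 12)]
# order by days-old, then by name to break ties
excuses.sort(key=lambda e: (e.daysold, e.name))
# → apt (3), zsh (3), gcc (12)
```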
"""
import contextlib
import logging
import optparse
import os
import sys
import time
from collections import defaultdict
from functools import reduce
from itertools import chain
from operator import attrgetter
from typing import TYPE_CHECKING, Any, Optional, cast
from collections.abc import Iterator

import apt_pkg

from britney2 import BinaryPackage, BinaryPackageId, SourcePackage, Suites
from britney2.excusefinder import ExcuseFinder
from britney2.hints import Hint, HintCollection, HintParser
from britney2.inputs.suiteloader import (
    DebMirrorLikeSuiteContentLoader,
    MissingRequiredConfigurationError,
)
from britney2.installability.builder import build_installability_tester
from britney2.installability.solver import InstallabilitySolver
from britney2.migration import MigrationManager
from britney2.migrationitem import MigrationItem, MigrationItemFactory
from britney2.policies.autopkgtest import AutopkgtestPolicy
from britney2.policies.lintian import LintianPolicy
from britney2.policies.policy import (
    AgePolicy,
    BlockPolicy,
    BuildDependsPolicy,
    BuiltOnBuilddPolicy,
    BuiltUsingPolicy,
    DependsPolicy,
    ImplicitDependencyPolicy,
    PiupartsPolicy,
    PolicyEngine,
    PolicyLoadRequest,
    RCBugPolicy,
    ReproduciblePolicy,
    ReverseRemovalPolicy,
)
from britney2.utils import (
    MigrationConstraintException,
    clone_nuninst,
    compile_nuninst,
    format_and_log_uninst,
    is_nuninst_asgood_generous,
    log_and_format_old_libraries,
    newly_uninst,
    old_libraries,
    parse_option,
    parse_provides,
    read_nuninst,
    write_excuses,
    write_heidi,
    write_heidi_delta,
    write_nuninst,
)

if TYPE_CHECKING:
    from .excuse import Excuse
    from .installability.tester import InstallabilityTester
    from .installability.universe import BinaryPackageUniverse
    from .transaction import MigrationTransactionState


__author__ = "Fabio Tranchitella and the Debian Release Team"
__version__ = "2.0"


MIGRATION_POLICIES = [
    PolicyLoadRequest.always_load(DependsPolicy),
    PolicyLoadRequest.conditionally_load(RCBugPolicy, "rcbug_enable", True),
    PolicyLoadRequest.conditionally_load(PiupartsPolicy, "piuparts_enable", True),
    PolicyLoadRequest.always_load(ImplicitDependencyPolicy),
    PolicyLoadRequest.conditionally_load(AutopkgtestPolicy, "adt_enable", True),
    PolicyLoadRequest.conditionally_load(LintianPolicy, "lintian_enable", False),
    PolicyLoadRequest.conditionally_load(ReproduciblePolicy, "repro_enable", False),
    PolicyLoadRequest.conditionally_load(AgePolicy, "age_enable", True),
    PolicyLoadRequest.always_load(BuildDependsPolicy),
    PolicyLoadRequest.always_load(BlockPolicy),
    PolicyLoadRequest.conditionally_load(
        BuiltUsingPolicy, "built_using_policy_enable", True
    ),
    PolicyLoadRequest.conditionally_load(BuiltOnBuilddPolicy, "check_buildd", False),
    PolicyLoadRequest.always_load(ReverseRemovalPolicy),
]
class Britney(object):
    """Britney, the Debian testing updater script

    This is the script that updates the testing distribution. It is executed
    each day after the installation of the updated packages. It generates the
    `Packages' files for the testing distribution, but it does so in an
    intelligent manner; it tries to avoid any inconsistency and to use only
    non-buggy packages.

    For more documentation on this script, please read the Developers Reference.
    """

    HINTS_HELPERS = (
        "easy",
        "hint",
        "remove",
        "block",
        "block-udeb",
        "unblock",
        "unblock-udeb",
        "approve",
        "remark",
        "ignore-piuparts",
        "ignore-rc-bugs",
        "force-skiptest",
        "force-badtest",
    )
    HINTS_STANDARD = ("urgent", "age-days") + HINTS_HELPERS
    # ALL = {"force", "force-hint", "block-all"} | HINTS_STANDARD | registered policy hints (not covered above)
    HINTS_ALL = "ALL"
    pkg_universe: "BinaryPackageUniverse"
    _inst_tester: "InstallabilityTester"
    constraints: dict[str, list[str]]
    suite_info: Suites
    def __init__(self) -> None:
        """Class constructor

        This method initializes and populates the data lists, which contain all
        the information needed by the other methods of the class.
        """

        # setup logging - provide the "short level name" (i.e. INFO -> I) that
        # we used to use prior to using the logging module.

        old_factory = logging.getLogRecordFactory()
        short_level_mapping = {
            "CRITICAL": "F",
            "INFO": "I",
            "WARNING": "W",
            "ERROR": "E",
            "DEBUG": "N",
        }

        def record_factory(
            *args: Any, **kwargs: Any
        ) -> logging.LogRecord:  # pragma: no cover
            record = old_factory(*args, **kwargs)
            try:
                record.shortlevelname = short_level_mapping[record.levelname]
            except KeyError:
                record.shortlevelname = record.levelname
            return record

        logging.setLogRecordFactory(record_factory)
        logging.basicConfig(
            format="{shortlevelname}: [{asctime}] - {message}",
            style="{",
            datefmt="%Y-%m-%dT%H:%M:%S%z",
            stream=sys.stdout,
        )

        self.logger = logging.getLogger()

        # Logger for "upgrade_output"; the file handler will be attached later when
        # we are ready to open the file.
        self.output_logger = logging.getLogger("britney2.output.upgrade_output")
        self.output_logger.setLevel(logging.INFO)

        # initialize the apt_pkg back-end
        apt_pkg.init()

        # parse the command line arguments
        self._policy_engine = PolicyEngine()
        self.__parse_arguments()
        assert self.suite_info is not None  # for type checking

        self.all_selected: list[MigrationItem] = []
        self.excuses: dict[str, "Excuse"] = {}
        self.upgrade_me: list[MigrationItem] = []

        if self.options.nuninst_cache:
            self.logger.info(
                "Not building the list of non-installable packages, as requested"
            )
            if self.options.print_uninst:
                nuninst = read_nuninst(
                    self.options.noninst_status, self.options.architectures
                )
                print("* summary")
                print(
                    "\n".join(
                        "%4d %s" % (len(nuninst[x]), x)
                        for x in self.options.architectures
                    )
                )
                return
        try:
            constraints_file = os.path.join(
                self.options.static_input_dir, "constraints"
            )
            faux_packages = os.path.join(self.options.static_input_dir, "faux-packages")
        except AttributeError:
            self.logger.info("The static_input_dir option is not set")
            constraints_file = None
            faux_packages = None
        if faux_packages is not None and os.path.exists(faux_packages):
            self.logger.info("Loading faux packages from %s", faux_packages)
            self._load_faux_packages(faux_packages)
        elif faux_packages is not None:
            self.logger.info("No Faux packages as %s does not exist", faux_packages)

        if constraints_file is not None and os.path.exists(constraints_file):
            self.logger.info("Loading constraints from %s", constraints_file)
            self.constraints = self._load_constraints(constraints_file)
        else:
            if constraints_file is not None:
                self.logger.info(
                    "No constraints as %s does not exist", constraints_file
                )
            self.constraints = {
                "keep-installable": [],
            }

        self.logger.info("Compiling Installability tester")
        self.pkg_universe, self._inst_tester = build_installability_tester(
            self.suite_info, self.options.architectures
        )
        target_suite = self.suite_info.target_suite
        target_suite.inst_tester = self._inst_tester

        self.allow_uninst: dict[str, set[Optional[str]]] = {}
        for arch in self.options.architectures:
            self.allow_uninst[arch] = set()
        self._migration_item_factory: MigrationItemFactory = MigrationItemFactory(
            self.suite_info
        )
        self._hint_parser: HintParser = HintParser(self._migration_item_factory)
        self._migration_manager: MigrationManager = MigrationManager(
            self.options,
            self.suite_info,
            self.all_binaries,
            self.pkg_universe,
            self.constraints,
            self.allow_uninst,
            self._migration_item_factory,
            self.hints,
        )
        if not self.options.nuninst_cache:
            self.logger.info(
                "Building the list of non-installable packages for the full archive"
            )
            self._inst_tester.compute_installability()
            nuninst = compile_nuninst(
                target_suite, self.options.architectures, self.options.nobreakall_arches
            )
            self.nuninst_orig: dict[str, set[str]] = nuninst
            for arch in self.options.architectures:
                self.logger.info(
                    "> Found %d non-installable packages for %s",
                    len(nuninst[arch]),
                    arch,
                )
                if self.options.print_uninst:
                    self.nuninst_arch_report(nuninst, arch)

            if self.options.print_uninst:
                print("* summary")
                print(
                    "\n".join(
                        map(
                            lambda x: "%4d %s" % (len(nuninst[x]), x),
                            self.options.architectures,
                        )
                    )
                )
                return
            else:
                write_nuninst(self.options.noninst_status, nuninst)

            stats = self._inst_tester.compute_stats()
            self.logger.info("> Installability tester statistics (per architecture)")
            for arch in self.options.architectures:
                arch_stat = stats[arch]
                self.logger.info("> %s", arch)
                for stat in arch_stat.stat_summary():
                    self.logger.info("> - %s", stat)
        else:
            self.logger.info("Loading uninstallability counters from cache")
            self.nuninst_orig = read_nuninst(
                self.options.noninst_status, self.options.architectures
            )

        # nuninst_orig may get updated during the upgrade process
        self.nuninst_orig_save: dict[str, set[str]] = clone_nuninst(
            self.nuninst_orig, architectures=self.options.architectures
        )

        self._policy_engine.register_policy_hints(self._hint_parser)

        try:
            self.read_hints(self.options.hintsdir)
        except AttributeError:
            self.read_hints(os.path.join(self.suite_info["unstable"].path, "Hints"))

        self._policy_engine.initialise(self, self.hints)
    def __parse_arguments(self) -> None:
        """Parse the command line arguments

        This method parses and initializes the command line arguments.
        While doing so, it preprocesses some of the options, converting them
        into a form suitable for the other methods of the class.
        """
        # initialize the parser
        parser = optparse.OptionParser(version="%prog")
        parser.add_option(
            "-v", "", action="count", dest="verbose", help="enable verbose output"
        )
        parser.add_option(
            "-c",
            "--config",
            action="store",
            dest="config",
            default="/etc/britney.conf",
            help="path for the configuration file",
        )
        parser.add_option(
            "",
            "--architectures",
            action="store",
            dest="architectures",
            default=None,
            help="override architectures from configuration file",
        )
        parser.add_option(
            "",
            "--actions",
            action="store",
            dest="actions",
            default=None,
            help="override the list of actions to be performed",
        )
        parser.add_option(
            "",
            "--hints",
            action="store",
            dest="hints",
            default=None,
            help="additional hints, separated by semicolons",
        )
        parser.add_option(
            "",
            "--hint-tester",
            action="store_true",
            dest="hint_tester",
            default=None,
            help="provide a command line interface to test hints",
        )
        parser.add_option(
            "",
            "--dry-run",
            action="store_true",
            dest="dry_run",
            default=False,
            help="disable all outputs to the testing directory",
        )
        parser.add_option(
            "",
            "--nuninst-cache",
            action="store_true",
            dest="nuninst_cache",
            default=False,
            help="do not build the non-installability status, use the cache from file",
        )
        parser.add_option(
            "",
            "--print-uninst",
            action="store_true",
            dest="print_uninst",
            default=False,
            help="just print a summary of uninstallable packages",
        )
        parser.add_option(
            "",
            "--compute-migrations",
            action="store_true",
            dest="compute_migrations",
            default=True,
            help="Compute which packages can migrate (the default)",
        )
        parser.add_option(
            "",
            "--no-compute-migrations",
            action="store_false",
            dest="compute_migrations",
            help="Do not compute which packages can migrate.",
        )
        parser.add_option(
            "",
            "--series",
            action="store",
            dest="series",
            default="",
            help="set distribution series name",
        )
        parser.add_option(
            "",
            "--distribution",
            action="store",
            dest="distribution",
            default="debian",
            help="set distribution name",
        )
        (self.options, self.args) = parser.parse_args()
        if self.options.verbose:
            if self.options.verbose > 1:
                self.logger.setLevel(logging.DEBUG)
            else:
                self.logger.setLevel(logging.INFO)
        else:
            self.logger.setLevel(logging.WARNING)
        # Historical way to get debug information (equivalent to -vv)
        try:  # pragma: no cover
            if int(os.environ.get("BRITNEY_DEBUG", "0")):
                self.logger.setLevel(logging.DEBUG)
        except ValueError:  # pragma: no cover
            pass

        # integrity checks
        if self.options.nuninst_cache and self.options.print_uninst:  # pragma: no cover
            self.logger.error("nuninst_cache and print_uninst are mutually exclusive!")
            sys.exit(1)

        # if the configuration file exists, then read it and set the additional options
        if not os.path.isfile(self.options.config):  # pragma: no cover
            self.logger.error(
                "Unable to read the configuration file (%s), exiting!",
                self.options.config,
            )
            sys.exit(1)
        self.HINTS: dict[str, Any] = {"command-line": self.HINTS_ALL}
        with open(self.options.config, encoding="utf-8") as config:
            for line in config:
                if "=" in line and not line.strip().startswith("#"):
                    k, v = line.split("=", 1)
                    k = k.strip()
                    v = v.strip()
                    if k.startswith("HINTS_"):
                        self.HINTS[k.split("_")[1].lower()] = reduce(
                            lambda x, y: x + y,
                            [
                                hasattr(self, "HINTS_" + i)
                                and getattr(self, "HINTS_" + i)
                                or (i,)
                                for i in v.split()
                            ],
                        )
                    elif not hasattr(self.options, k.lower()) or not getattr(
                        self.options, k.lower()
                    ):
                        setattr(self.options, k.lower(), v)
        parse_option(self.options, "archall_inconsistency_allowed", to_bool=True)

        suite_loader = DebMirrorLikeSuiteContentLoader(self.options)

        try:
            self.suite_info = suite_loader.load_suites()
        except MissingRequiredConfigurationError as e:  # pragma: no cover
            self.logger.error(
                "Could not load the suite content due to missing configuration: %s",
                str(e),
            )
            sys.exit(1)
        self.all_binaries = suite_loader.all_binaries()
        self.options.components = suite_loader.components
        self.options.architectures = suite_loader.architectures
        self.options.nobreakall_arches = suite_loader.nobreakall_arches
        self.options.outofsync_arches = suite_loader.outofsync_arches
        self.options.break_arches = suite_loader.break_arches
        self.options.new_arches = suite_loader.new_arches
        if self.options.series == "":
            self.options.series = self.suite_info.target_suite.name

        if self.options.heidi_output and not hasattr(
            self.options, "heidi_delta_output"
        ):
            self.options.heidi_delta_output = self.options.heidi_output + "Delta"

        self.options.smooth_updates = self.options.smooth_updates.split()

        parse_option(self.options, "ignore_cruft", to_bool=True)
        parse_option(self.options, "check_consistency_level", default=2, to_int=True)
        parse_option(self.options, "build_url")

        self._policy_engine.load_policies(
            self.options, self.suite_info, MIGRATION_POLICIES
        )
    @property
    def hints(self) -> HintCollection:
        return self._hint_parser.hints
    def _load_faux_packages(self, faux_packages_file: str) -> None:
        """Loads fake packages

        In rare cases, it is useful to create a "fake" package that can be used to
        satisfy dependencies. This is usually needed for packages that are not
        shipped directly on this mirror but are a prerequisite for using this mirror
        (e.g. some vendors provide non-distributable "setup" packages and
        contrib/non-free packages depend on these).

        :param faux_packages_file: Path to the file containing the fake package definitions
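        A minimal sketch of how the defaults below are applied to a
        hypothetical stanza (shown as a plain dict rather than an
        apt_pkg.TagFile section; the package name is made up):

        ```python
        # Hypothetical faux-packages stanza with only the mandatory field
        # and a Provides; Version, Component and Architecture are omitted.
        stanza = {"Package": "fake-flashplugin", "Provides": "flashplugin"}

        version = stanza.get("Version", "1.0-1")         # default version
        component = stanza.get("Component", "non-free")  # default component
        # non-main components get a "<component>/faux" section
        faux_section = "faux" if component == "main" else "%s/faux" % component
        # → version "1.0-1", faux_section "non-free/faux"
        ```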
        """
        tag_file = apt_pkg.TagFile(faux_packages_file)
        get_field = tag_file.section.get
        step = tag_file.step
        no = 0
        pri_source_suite = self.suite_info.primary_source_suite
        target_suite = self.suite_info.target_suite

        while step():
            no += 1
            pkg_name = get_field("Package", None)
            if pkg_name is None:  # pragma: no cover
                raise ValueError(
                    "Missing Package field in paragraph %d (file %s)"
                    % (no, faux_packages_file)
                )
            pkg_name = sys.intern(pkg_name)
            version = sys.intern(get_field("Version", "1.0-1"))
            provides_raw = get_field("Provides")
            archs_raw = get_field("Architecture", None)
            component = get_field("Component", "non-free")
            if archs_raw:
                archs = archs_raw.split()
            else:
                archs = self.options.architectures
            faux_section = "faux"
            if component != "main":
                faux_section = "%s/faux" % component
            src_data = SourcePackage(
                pkg_name,
                version,
                sys.intern(faux_section),
                set(),
                None,
                True,
                None,
                None,
                [],
                [],
            )

            target_suite.sources[pkg_name] = src_data
            pri_source_suite.sources[pkg_name] = src_data

            for arch in archs:
                pkg_id = BinaryPackageId(pkg_name, version, arch)
                if provides_raw:
                    provides = parse_provides(
                        provides_raw, pkg_id=pkg_id, logger=self.logger
                    )
                else:
                    provides = []
                bin_data = BinaryPackage(
                    version,
                    faux_section,
                    pkg_name,
                    version,
                    arch,
                    get_field("Multi-Arch"),
                    None,
                    None,
                    provides,
                    False,
                    pkg_id,
                    [],
                )

                src_data.binaries.add(pkg_id)
                target_suite.binaries[arch][pkg_name] = bin_data
                pri_source_suite.binaries[arch][pkg_name] = bin_data

                # register provided packages with the target suite provides table
                for provided_pkg, provided_version, _ in bin_data.provides:
                    target_suite.provides_table[arch][provided_pkg].add(
                        (pkg_name, provided_version)
                    )

                self.all_binaries[pkg_id] = bin_data
    def _load_constraints(self, constraints_file: str) -> dict[str, list[str]]:
        """Loads configurable constraints

        The constraints file can contain extra rules that Britney should attempt
        to satisfy. Examples can be "keep package X in testing and ensure it is
        installable".

        :param constraints_file: Path to the file containing the constraints
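        The Package-List arch-restriction syntax handled by this method
        can be sketched as follows (a simplified stand-alone model with
        hypothetical package names; the real code below additionally
        rejects commas, negation and malformed brackets):

        ```python
        # "pkg" applies on every architecture, "pkg [arch1 arch2]" only
        # on the listed ones.
        def deps_for_arch(pkg_list: list[str], arch: str) -> list[str]:
            deps = []
            for spec in pkg_list:
                parts = spec.split(None, 1)
                if len(parts) == 1:
                    deps.append(parts[0])
                else:
                    pkg, arch_res = parts
                    if arch in arch_res.strip("[]").split():
                        deps.append(pkg)
            return deps

        deps_for_arch(["init-system", "grub-pc [amd64 i386]"], "amd64")
        # → ["init-system", "grub-pc"]
        ```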
        """
        tag_file = apt_pkg.TagFile(constraints_file)
        get_field = tag_file.section.get
        step = tag_file.step
        no = 0
        faux_version = sys.intern("1")
        faux_section = sys.intern("faux")
        keep_installable: list[str] = []
        constraints = {"keep-installable": keep_installable}
        pri_source_suite = self.suite_info.primary_source_suite
        target_suite = self.suite_info.target_suite

        while step():
            no += 1
            pkg_name = get_field("Fake-Package-Name", None)
            if pkg_name is None:  # pragma: no cover
                raise ValueError(
                    "Missing Fake-Package-Name field in paragraph %d (file %s)"
                    % (no, constraints_file)
                )
            pkg_name = sys.intern(pkg_name)

            def mandatory_field(x: str) -> str:
                v: str = get_field(x, None)
                if v is None:  # pragma: no cover
                    raise ValueError(
                        "Missing %s field for %s (file %s)"
                        % (x, pkg_name, constraints_file)
                    )
                return v

            constraint = mandatory_field("Constraint")
            if constraint not in {"present-and-installable"}:  # pragma: no cover
                raise ValueError(
                    "Unsupported constraint %s for %s (file %s)"
                    % (constraint, pkg_name, constraints_file)
                )

            self.logger.info(" - constraint %s", pkg_name)

            pkg_list = [
                x.strip()
                for x in mandatory_field("Package-List").split("\n")
                if x.strip() != "" and not x.strip().startswith("#")
            ]
            src_data = SourcePackage(
                pkg_name,
                faux_version,
                faux_section,
                set(),
                None,
                True,
                None,
                None,
                [],
                [],
            )
            target_suite.sources[pkg_name] = src_data
            pri_source_suite.sources[pkg_name] = src_data
            keep_installable.append(pkg_name)
            for arch in self.options.architectures:
                deps = []
                for pkg_spec in pkg_list:
                    s = pkg_spec.split(None, 1)
                    if len(s) == 1:
                        deps.append(s[0])
                    else:
                        pkg, arch_res = s
                        if not (
                            arch_res.startswith("[") and arch_res.endswith("]")
                        ):  # pragma: no cover
                            raise ValueError(
                                "Invalid arch-restriction on %s - should be [arch1 arch2] (for %s file %s)"
                                % (pkg, pkg_name, constraints_file)
                            )
                        arch_res_l = arch_res[1:-1].split()
                        if not arch_res_l:  # pragma: no cover
                            msg = "Empty arch-restriction for %s - should be [arch1 arch2] (for %s file %s)"
                            raise ValueError(msg % (pkg, pkg_name, constraints_file))
                        for a in arch_res_l:
                            if a == arch:
                                deps.append(pkg)
                            elif "," in a or "!" in a:  # pragma: no cover
                                msg = "Invalid arch-restriction for %s: Uses comma or negation (for %s file %s)"
                                raise ValueError(
                                    msg % (pkg, pkg_name, constraints_file)
                                )
                pkg_id = BinaryPackageId(pkg_name, faux_version, arch)
                bin_data = BinaryPackage(
                    faux_version,
                    faux_section,
                    pkg_name,
                    faux_version,
                    arch,
                    "no",
                    ", ".join(deps),
                    None,
                    [],
                    False,
                    pkg_id,
                    [],
                )
                src_data.binaries.add(pkg_id)
                target_suite.binaries[arch][pkg_name] = bin_data
                pri_source_suite.binaries[arch][pkg_name] = bin_data
                self.all_binaries[pkg_id] = bin_data

        return constraints
    # Data reading/writing methods
    # ----------------------------

    def read_hints(self, hintsdir: str) -> None:
        """Read the hint commands from the specified directory

        The hint commands are read from the files contained in the directory
        specified by the `hintsdir' parameter.
        The names of the files have to be the same as the authorized users
        for the hints.

        The files contain rows with the format:

            <command> <package-name>[/<version>]

        The parsed hints are stored in the hint parser and are available
        through the `hints' property.
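        A sketch of the hint-line format documented above (a stand-alone
        toy parser; the real parsing is done by britney2.hints.HintParser
        and understands far more than this):

        ```python
        # Split "<command> <package>[/<version>] ..." into a command and
        # a list of (package, version-or-None) targets.
        def parse_hint_line(line: str) -> tuple[str, list[tuple[str, str | None]]]:
            command, *args = line.split()
            targets = []
            for arg in args:
                pkg, sep, version = arg.partition("/")
                targets.append((pkg, version if sep else None))
            return command, targets

        parse_hint_line("unblock foo/1.2-3 bar/2.0-1")
        # → ("unblock", [("foo", "1.2-3"), ("bar", "2.0-1")])
        ```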
        """
917 for who in self.HINTS.keys():
918 if who == "command-line":
919 lines = self.options.hints and self.options.hints.split(";") or ()
920 filename = "<cmd-line>"
921 self._hint_parser.parse_hints(who, self.HINTS[who], filename, lines)
922 else:
923 filename = os.path.join(hintsdir, who)
924 if not os.path.isfile(filename):
925 self.logger.error(
926 "Cannot read hints list from %s, no such file!", filename
927 )
928 continue
929 self.logger.info("Loading hints list from %s", filename)
930 with open(filename, encoding="utf-8") as f:
931 self._hint_parser.parse_hints(who, self.HINTS[who], filename, f)
933 hints = self._hint_parser.hints
935 for x in [
936 "block",
937 "block-all",
938 "block-udeb",
939 "unblock",
940 "unblock-udeb",
941 "force",
942 "urgent",
943 "remove",
944 "age-days",
945 ]:
946 z: dict[Optional[str], dict[Optional[str], tuple[Hint, str]]] = defaultdict(
947 dict
948 )
949 for hint in hints[x]:
950 package = hint.package
951 architecture = hint.architecture
952 key = (hint, hint.user)
953 if (
954 package in z
955 and architecture in z[package]
956 and z[package][architecture] != key
957 ):
958 hint2 = z[package][architecture][0]
959 if x in ["unblock", "unblock-udeb"]:
960 assert hint.version is not None
961 assert hint2.version is not None
962 if apt_pkg.version_compare(hint2.version, hint.version) < 0:
963 # This hint is for a newer version, so discard the old one
964 self.logger.warning(
965 "Overriding %s[%s] = ('%s', '%s', '%s') with ('%s', '%s', '%s')",
966 x,
967 package,
968 hint2.version,
969 hint2.architecture,
970 hint2.user,
971 hint.version,
972 hint.architecture,
973 hint.user,
974 )
975 hint2.set_active(False)
976 else:
977 # This hint is for an older version, so ignore it in favour of the new one
978 self.logger.warning(
979 "Ignoring %s[%s] = ('%s', '%s', '%s'), ('%s', '%s', '%s') is higher or equal",
980 x,
981 package,
982 hint.version,
983 hint.architecture,
984 hint.user,
985 hint2.version,
986 hint2.architecture,
987 hint2.user,
988 )
989 hint.set_active(False)
990 else:
991 self.logger.warning(
992 "Overriding %s[%s] = ('%s', '%s') with ('%s', '%s')",
993 x,
994 package,
995 hint2.user,
996 hint2,
997 hint.user,
998 hint,
999 )
1000 hint2.set_active(False)
1002 z[package][architecture] = key
1004 for hint in hints["allow-uninst"]:
1005 if hint.architecture == "source":
1006 for arch in self.options.architectures:
1007 self.allow_uninst[arch].add(hint.package)
1008 else:
1009 assert hint.architecture is not None
1010 self.allow_uninst[hint.architecture].add(hint.package)
1012 # Sanity check the hints hash
1013 if len(hints["block"]) == 0 and len(hints["block-udeb"]) == 0:
1014 self.logger.warning("WARNING: No block hints at all, not even udeb ones!")
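The unblock de-duplication above keeps, per package, only the hint for the highest version and deactivates the rest. The following standalone sketch shows that idea with a naive dotted-integer version comparison standing in for apt_pkg.version_compare (which handles full Debian version syntax, epochs and revisions included); the function name is invented for illustration.

```python
def newest_unblocks(hints):
    """hints: iterable of (package, version) pairs -> {package: winning version}."""
    def key(v):
        # Naive stand-in for apt_pkg.version_compare; only "N.N..." versions.
        return tuple(int(part) for part in v.split("."))

    winners = {}
    for package, version in hints:
        # A later hint for a higher version overrides the earlier one.
        if package not in winners or key(winners[package]) < key(version):
            winners[package] = version
    return winners
```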
1016 def write_excuses(self) -> None:
1017 """Produce and write the update excuses
1019 This method handles the update excuses generation: the packages are
1020 looked at to determine whether they are valid candidates. For the details
1021 of this procedure, please refer to the module docstring.
1022 """
1024 self.logger.info("Update Excuses generation started")
1026 mi_factory = self._migration_item_factory
1027 excusefinder = ExcuseFinder(
1028 self.options,
1029 self.suite_info,
1030 self.all_binaries,
1031 self.pkg_universe,
1032 self._policy_engine,
1033 mi_factory,
1034 self.hints,
1035 )
1037 excuses, upgrade_me = excusefinder.find_actionable_excuses()
1038 self.excuses = excuses
1040 # sort the list of candidates
1041 self.upgrade_me = sorted(upgrade_me)
1042 old_lib_removals = old_libraries(
1043 mi_factory, self.suite_info, self.options.outofsync_arches
1044 )
1045 self.upgrade_me.extend(old_lib_removals)
1046 self.output_logger.info(
1047 "List of old libraries added to upgrade_me (%d):", len(old_lib_removals)
1048 )
1049 log_and_format_old_libraries(self.output_logger, old_lib_removals)
1051 # write excuses to the output file
1052 if not self.options.dry_run:
1053 self.logger.info("> Writing Excuses to %s", self.options.excuses_output)
1054 write_excuses(
1055 excuses, self.options.excuses_output, output_format="legacy-html"
1056 )
1057 if hasattr(self.options, "excuses_yaml_output"):
1058 self.logger.info(
1059 "> Writing YAML Excuses to %s", self.options.excuses_yaml_output
1060 )
1061 write_excuses(
1062 excuses, self.options.excuses_yaml_output, output_format="yaml"
1063 )
1065 self.logger.info("Update Excuses generation completed")
1067 # Upgrade run
1068 # -----------
1070 def eval_nuninst(
1071 self,
1072 nuninst: dict[str, set[str]],
1073 original: Optional[dict[str, set[str]]] = None,
1074 ) -> str:
1075 """Return a string which represents the uninstallability counters
1077 This method returns a string which represents the uninstallability
1078 counters reading the uninstallability statistics `nuninst` and, if
1079 present, merging the results with the `original` one.
1081 An example of the output string is:
1082 1+2: i-0:a-0:a-0:h-0:i-1:m-0:m-0:p-0:a-0:m-0:s-2:s-0
1084 where the first part is the number of broken packages in non-break
1085 architectures + the total number of broken packages for all the
1086 architectures.
1087 """
1088 res = []
1089 total = 0
1090 totalbreak = 0
1091 for arch in self.options.architectures:
1092 if arch in nuninst:
1093 n = len(nuninst[arch])
1094 elif original and arch in original:
1095 n = len(original[arch])
1096 else:
1097 continue
1098 if arch in self.options.break_arches:
1099 totalbreak = totalbreak + n
1100 else:
1101 total = total + n
1102 res.append("%s-%d" % (arch[0], n))
1103 return "%d+%d: %s" % (total, totalbreak, ":".join(res))
1105 def iter_packages(
1106 self,
1107 packages: list[MigrationItem],
1108 selected: list[MigrationItem],
1109 nuninst: Optional[dict[str, set[str]]] = None,
1110 ) -> tuple[Optional[dict[str, set[str]]], list[MigrationItem]]:
1111 """Iterate over the list of actions and apply them one by one
1113 This method applies the changes from `packages` to testing, checking the uninstallability
1114 counters for every action performed. If the action does not improve them, it is reverted.
1115 The method returns the new uninstallability counters and the remaining actions if the
1116 final result is successful, otherwise (None, []).
1118 :param selected: list of already-accepted MigrationItems; extended in place
1119 :param nuninst: dict mapping each architecture to its set of uninstallable packages
1120 """
1121 assert self.suite_info is not None # for type checking
1122 group_info = {}
1123 rescheduled_packages = packages
1124 maybe_rescheduled_packages: list[MigrationItem] = []
1125 output_logger = self.output_logger
1126 solver = InstallabilitySolver(self.pkg_universe, self._inst_tester)
1127 mm = self._migration_manager
1128 target_suite = self.suite_info.target_suite
1130 for y in sorted(packages, key=attrgetter("uvname")):
1131 try:
1132 _, updates, rms, _ = mm.compute_groups(y)
1133 result = (y, sorted(updates), sorted(rms))
1134 group_info[y] = result
1135 except MigrationConstraintException as e:
1136 rescheduled_packages.remove(y)
1137 output_logger.info("not adding package to list: %s", (y.package))
1138 output_logger.info(" got exception: %s" % (repr(e)))
1140 if nuninst:
1141 nuninst_orig = nuninst
1142 else:
1143 nuninst_orig = self.nuninst_orig
1145 nuninst_last_accepted = nuninst_orig
1147 output_logger.info(
1148 "recur: [] %s %d/0", ",".join(x.uvname for x in selected), len(packages)
1149 )
1150 while rescheduled_packages:
1151 groups = [group_info[x] for x in rescheduled_packages]
1152 worklist = solver.solve_groups(groups)
1153 rescheduled_packages = []
1155 worklist.reverse()
1157 while worklist:
1158 comp = worklist.pop()
1159 comp_name = " ".join(item.uvname for item in comp)
1160 output_logger.info("trying: %s" % comp_name)
1161 with mm.start_transaction() as transaction:
1162 accepted = False
1163 try:
1164 (
1165 accepted,
1166 nuninst_after,
1167 failed_arch,
1168 new_cruft,
1169 ) = mm.migrate_items_to_target_suite(
1170 comp, nuninst_last_accepted
1171 )
1172 if accepted:
1173 selected.extend(comp)
1174 transaction.commit()
1175 output_logger.info("accepted: %s", comp_name)
1176 output_logger.info(
1177 " ori: %s", self.eval_nuninst(nuninst_orig)
1178 )
1179 output_logger.info(
1180 " pre: %s", self.eval_nuninst(nuninst_last_accepted)
1181 )
1182 output_logger.info(
1183 " now: %s", self.eval_nuninst(nuninst_after)
1184 )
1185 if new_cruft:
1186 output_logger.info(
1187 " added new cruft items to list: %s",
1188 " ".join(x.uvname for x in sorted(new_cruft)),
1189 )
1191 if len(selected) <= 20:
1192 output_logger.info(
1193 " all: %s", " ".join(x.uvname for x in selected)
1194 )
1195 else:
1196 output_logger.info(
1197 " most: (%d) .. %s",
1198 len(selected),
1199 " ".join(x.uvname for x in selected[-20:]),
1200 )
1201 if self.options.check_consistency_level >= 3:
1202 target_suite.check_suite_source_pkg_consistency(
1203 "iter_packages after commit"
1204 )
1205 nuninst_last_accepted = nuninst_after
1206 for cruft_item in new_cruft:
1207 try:
1208 _, updates, rms, _ = mm.compute_groups(cruft_item)
1209 result = (cruft_item, sorted(updates), sorted(rms))
1210 group_info[cruft_item] = result
1211 worklist.append([cruft_item])
1212 except MigrationConstraintException as e:
1213 output_logger.info(
1214 " got exception adding cruft item %s to list: %s"
1215 % (cruft_item.uvname, repr(e))
1216 )
1217 rescheduled_packages.extend(maybe_rescheduled_packages)
1218 maybe_rescheduled_packages.clear()
1219 else:
1220 transaction.rollback()
1221 broken = sorted(
1222 b
1223 for b in nuninst_after[failed_arch]
1224 if b not in nuninst_last_accepted[failed_arch]
1225 )
1226 compare_nuninst = None
1227 if any(
1228 item for item in comp if item.architecture != "source"
1229 ):
1230 compare_nuninst = nuninst_last_accepted
1231 # NB: try_migration already reverted this for us, so just print the results and move on
1232 output_logger.info(
1233 "skipped: %s (%d, %d, %d)",
1234 comp_name,
1235 len(rescheduled_packages),
1236 len(maybe_rescheduled_packages),
1237 len(worklist),
1238 )
1239 output_logger.info(
1240 " got: %s",
1241 self.eval_nuninst(nuninst_after, compare_nuninst),
1242 )
1243 output_logger.info(
1244 " * %s: %s", failed_arch, ", ".join(broken)
1245 )
1246 if self.options.check_consistency_level >= 3:
1247 target_suite.check_suite_source_pkg_consistency(
1248 "iter_packages after rollback (not accepted)"
1249 )
1251 except MigrationConstraintException as e:
1252 transaction.rollback()
1253 output_logger.info(
1254 "skipped: %s (%d, %d, %d)",
1255 comp_name,
1256 len(rescheduled_packages),
1257 len(maybe_rescheduled_packages),
1258 len(worklist),
1259 )
1260 output_logger.info(" got exception: %s" % (repr(e)))
1261 if self.options.check_consistency_level >= 3:
1262 target_suite.check_suite_source_pkg_consistency(
1263 "iter_packages after rollback (MigrationConstraintException)"
1264 )
1266 if not accepted:
1267 if len(comp) > 1:
1268 output_logger.info(
1269 " - splitting the component into single items and retrying them"
1270 )
1271 worklist.extend([item] for item in comp)
1272 else:
1273 maybe_rescheduled_packages.append(comp[0])
1275 output_logger.info(" finish: [%s]", ",".join(x.uvname for x in selected))
1276 output_logger.info("endloop: %s", self.eval_nuninst(self.nuninst_orig))
1277 output_logger.info(" now: %s", self.eval_nuninst(nuninst_last_accepted))
1278 format_and_log_uninst(
1279 output_logger,
1280 self.options.architectures,
1281 newly_uninst(self.nuninst_orig, nuninst_last_accepted),
1282 )
1283 output_logger.info("")
1285 return (nuninst_last_accepted, maybe_rescheduled_packages)
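The accept-or-split strategy driving the loop above can be sketched without the migration machinery: each component is tried as a unit; a rejected multi-item component is split into single items and retried, and a rejected single item is set aside for a later round. `try_migration` here is a hypothetical stand-in for the MigrationManager call.

```python
def process_worklist(worklist, try_migration):
    """Try components; split rejected multi-item components into singles."""
    accepted, deferred = [], []
    worklist = list(reversed(worklist))
    while worklist:
        comp = worklist.pop()
        if try_migration(comp):
            accepted.extend(comp)
        elif len(comp) > 1:
            # Retry each member on its own, as iter_packages does.
            worklist.extend([item] for item in comp)
        else:
            deferred.append(comp[0])
    return accepted, deferred
```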
1287 def do_all(
1288 self,
1289 hinttype: Optional[str] = None,
1290 init: Optional[list[MigrationItem]] = None,
1291 actions: Optional[list[MigrationItem]] = None,
1292 ) -> None:
1293 """Testing update runner
1295 This method tries to update testing checking the uninstallability
1296 counters before and after the actions to decide if the update was
1297 successful or not.
1298 """
1299 selected = []
1300 if actions:
1301 upgrade_me = actions[:]
1302 else:
1303 upgrade_me = self.upgrade_me[:]
1304 nuninst_start = self.nuninst_orig
1305 output_logger = self.output_logger
1306 target_suite = self.suite_info.target_suite
1308 # these are special parameters for hints processing
1309 force = False
1310 recurse = True
1311 nuninst_end = None
1312 extra: list[MigrationItem] = []
1313 mm = self._migration_manager
1315 if hinttype == "easy" or hinttype == "force-hint":
1316 force = hinttype == "force-hint"
1317 recurse = False
1319 # if we have a list of initial packages, check them
1320 if init:
1321 for x in init:
1322 if x not in upgrade_me:
1323 output_logger.warning(
1324 "failed: %s is not a valid candidate (or it already migrated)",
1325 x.uvname,
1326 )
1327 return None
1328 selected.append(x)
1329 upgrade_me.remove(x)
1331 output_logger.info("start: %s", self.eval_nuninst(nuninst_start))
1332 output_logger.info("orig: %s", self.eval_nuninst(nuninst_start))
1334 if not (init and not force):
1335 # No "outer" transaction needed as we will never need to rollback
1336 # (e.g. "force-hint" or a regular "main run"). Emulate the start_transaction
1337 # call from the MigrationManager, so the rest of the code follows the
1338 # same flow regardless of whether we need the transaction or not.
1340 @contextlib.contextmanager
1341 def _start_transaction() -> Iterator[Optional["MigrationTransactionState"]]:
1342 yield None
1344 else:
1345 # We will need to be able to roll back (e.g. easy or a "hint"-hint)
1346 _start_transaction = mm.start_transaction
1348 with _start_transaction() as transaction:
1349 if init:
1350 # init => a hint (e.g. "easy") - so do the hint run
1351 (_, nuninst_end, _, new_cruft) = mm.migrate_items_to_target_suite(
1352 selected, self.nuninst_orig, stop_on_first_regression=False
1353 )
1355 if recurse:
1356 # Ensure upgrade_me and selected do not overlap, if we
1357 # follow-up with a recurse ("hint"-hint).
1358 upgrade_me = [x for x in upgrade_me if x not in set(selected)]
1359 else:
1360 # On non-recursive hints check for cruft and purge it proactively in case it "fixes" the hint.
1361 cruft = [x for x in upgrade_me if x.is_cruft_removal]
1362 if new_cruft:
1363 output_logger.info(
1364 "Change added new cruft items to list: %s",
1365 " ".join(x.uvname for x in sorted(new_cruft)),
1366 )
1367 cruft.extend(new_cruft)
1368 if cruft:
1369 output_logger.info("Checking if changes enable cruft removal")
1370 (nuninst_end, remaining_cruft) = self.iter_packages(
1371 cruft, selected, nuninst=nuninst_end
1372 )
1373 output_logger.info(
1374 "Removed %d of %d cruft item(s) after the changes",
1375 len(cruft) - len(remaining_cruft),
1376 len(cruft),
1377 )
1378 new_cruft.difference_update(remaining_cruft)
1380 # Add new cruft items regardless of whether we recurse. A future run might clean
1381 # them for us.
1382 upgrade_me.extend(new_cruft)
1384 if recurse:
1385 # Either the main run or the recursive run of a "hint"-hint.
1386 (nuninst_end, extra) = self.iter_packages(
1387 upgrade_me, selected, nuninst=nuninst_end
1388 )
1390 assert nuninst_end is not None
1391 nuninst_end_str = self.eval_nuninst(nuninst_end)
1393 if not recurse:
1394 # easy or force-hint
1395 output_logger.info("easy: %s", nuninst_end_str)
1397 if not force:
1398 format_and_log_uninst(
1399 self.output_logger,
1400 self.options.architectures,
1401 newly_uninst(nuninst_start, nuninst_end),
1402 )
1404 if force:
1405 # Force implies "unconditionally better"
1406 better = True
1407 else:
1408 break_arches: set[str] = set(self.options.break_arches)
1409 if all(x.architecture in break_arches for x in selected):
1410 # If we only migrated items from break-arches, then we
1411 # do not allow any regressions on these architectures.
1412 # This usually only happens with hints
1413 break_arches = set()
1414 better = is_nuninst_asgood_generous(
1415 self.constraints,
1416 self.allow_uninst,
1417 self.options.architectures,
1418 self.nuninst_orig,
1419 nuninst_end,
1420 break_arches,
1421 )
1423 if better:
1424 # Result accepted either by force or by being better than the original result.
1425 output_logger.info(
1426 "final: %s", ",".join(sorted(x.uvname for x in selected))
1427 )
1428 output_logger.info("start: %s", self.eval_nuninst(nuninst_start))
1429 output_logger.info(" orig: %s", self.eval_nuninst(self.nuninst_orig))
1430 output_logger.info(" end: %s", nuninst_end_str)
1431 if force:
1432 broken = newly_uninst(nuninst_start, nuninst_end)
1433 if broken:
1434 output_logger.warning("force breaks:")
1435 format_and_log_uninst(
1436 self.output_logger,
1437 self.options.architectures,
1438 broken,
1439 loglevel=logging.WARNING,
1440 )
1441 else:
1442 output_logger.info("force did not break any packages")
1443 output_logger.info(
1444 "SUCCESS (%d/%d)", len(actions or self.upgrade_me), len(extra)
1445 )
1446 self.nuninst_orig = nuninst_end
1447 self.all_selected += selected
1448 if transaction:
1449 transaction.commit()
1450 if self.options.check_consistency_level >= 2:
1451 target_suite.check_suite_source_pkg_consistency(
1452 "do_all after commit"
1453 )
1454 if not actions:
1455 if recurse:
1456 self.upgrade_me = extra
1457 else:
1458 self.upgrade_me = [
1459 x for x in self.upgrade_me if x not in set(selected)
1460 ]
1461 else:
1462 output_logger.info("FAILED\n")
1463 if not transaction:
1464 # if we 'FAILED', but we cannot rollback, we will probably
1465 # leave a broken state behind
1466 # this should not happen
1467 raise AssertionError("do_all FAILED but no transaction to rollback")
1468 transaction.rollback()
1469 if self.options.check_consistency_level >= 2:
1470 target_suite.check_suite_source_pkg_consistency(
1471 "do_all after rollback"
1472 )
1474 output_logger.info("")
1476 def assert_nuninst_is_correct(self) -> None:
1477 self.logger.info("> Update complete - Verifying non-installability counters")
1479 cached_nuninst = self.nuninst_orig
1480 self._inst_tester.compute_installability()
1481 computed_nuninst = compile_nuninst(
1482 self.suite_info.target_suite,
1483 self.options.architectures,
1484 self.options.nobreakall_arches,
1485 )
1486 if cached_nuninst != computed_nuninst: # pragma: no cover
1487 only_on_break_archs = True
1488 self.logger.error(
1489 "==================== NUNINST OUT OF SYNC ========================="
1490 )
1491 for arch in self.options.architectures:
1492 expected_nuninst = set(cached_nuninst[arch])
1493 actual_nuninst = set(computed_nuninst[arch])
1494 false_negatives = actual_nuninst - expected_nuninst
1495 false_positives = expected_nuninst - actual_nuninst
1496 # Britney does not quite work correctly with
1497 # break/fucked arches, so ignore issues there for now.
1498 if (
1499 false_negatives or false_positives
1500 ) and arch not in self.options.break_arches:
1501 only_on_break_archs = False
1502 if false_negatives:
1503 self.logger.error(
1504 " %s - unnoticed nuninst: %s", arch, str(false_negatives)
1505 )
1506 if false_positives:
1507 self.logger.error(
1508 " %s - invalid nuninst: %s", arch, str(false_positives)
1509 )
1510 if false_negatives or false_positives:
1511 self.logger.info(
1512 " %s - actual nuninst: %s", arch, str(sorted(actual_nuninst))
1513 )
1514 self.logger.error(
1515 "==================== NUNINST OUT OF SYNC ========================="
1516 )
1517 if not only_on_break_archs:
1518 raise AssertionError("NUNINST OUT OF SYNC")
1519 else:
1520 self.logger.warning("Nuninst is out of sync on some break arches")
1522 self.logger.info("> All non-installability counters are ok")
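The core of the consistency check above is a per-architecture set comparison: entries only in the freshly computed counters are "unnoticed" (false negatives), entries only in the cache are "invalid" (false positives). A minimal sketch, with an invented helper name:

```python
def nuninst_diff(cached, computed):
    """Return {arch: (false_negatives, false_positives)} for out-of-sync arches."""
    diff = {}
    for arch in cached:
        expected = set(cached[arch])
        actual = set(computed.get(arch, ()))
        false_negatives = actual - expected  # broken but not recorded
        false_positives = expected - actual  # recorded but not broken
        if false_negatives or false_positives:
            diff[arch] = (false_negatives, false_positives)
    return diff
```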
1524 def upgrade_testing(self) -> None:
1525 """Upgrade testing using the packages from the source suites
1527 This method tries to upgrade testing using the packages from the
1528 source suites.
1529 Before running the do_all method, it tries the easy and force-hint
1530 commands.
1531 """
1533 output_logger = self.output_logger
1534 self.logger.info("Starting the upgrade test")
1535 output_logger.info(
1536 "Generated on: %s",
1537 time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time())),
1538 )
1539 output_logger.info("Arch order is: %s", ", ".join(self.options.architectures))
1541 if not self.options.actions:
1542 # process `easy' hints
1543 for x in self.hints["easy"]:
1544 self.do_hint("easy", x.user, x.packages)
1546 # process `force-hint' hints
1547 for x in self.hints["force-hint"]:
1548 self.do_hint("force-hint", x.user, x.packages)
1550 # run the first round of the upgrade
1551 # - do separate runs for break arches
1552 allpackages = []
1553 normpackages = self.upgrade_me[:]
1554 archpackages = {}
1555 for a in self.options.break_arches:
1556 archpackages[a] = [p for p in normpackages if p.architecture == a]
1557 normpackages = [p for p in normpackages if p not in archpackages[a]]
1558 self.upgrade_me = normpackages
1559 output_logger.info("info: main run")
1560 self.do_all()
1561 allpackages += self.upgrade_me
1562 for a in self.options.break_arches:
1563 backup = self.options.break_arches
1564 self.options.break_arches = " ".join(
1565 x for x in self.options.break_arches if x != a
1566 )
1567 self.upgrade_me = archpackages[a]
1568 output_logger.info("info: broken arch run for %s", a)
1569 self.do_all()
1570 allpackages += self.upgrade_me
1571 self.options.break_arches = backup
1572 self.upgrade_me = allpackages
1574 if self.options.actions:
1575 self.printuninstchange()
1576 return
1578 # process `hint' hints
1579 hintcnt = 0
1580 for x in self.hints["hint"][:50]:
1581 if hintcnt > 50:
1582 output_logger.info("Skipping remaining hints...")
1583 break
1584 if self.do_hint("hint", x.user, x.packages):
1585 hintcnt += 1
1587 # run the auto hinter
1588 self.run_auto_hinter()
1590 if getattr(self.options, "remove_obsolete", "yes") == "yes":
1591 # obsolete source packages
1592 # a package is obsolete if none of the binary packages in testing
1593 # are built by it
1594 self.logger.info(
1595 "> Removing obsolete source packages from the target suite"
1596 )
1597 # local copies for performance
1598 target_suite = self.suite_info.target_suite
1599 sources_t = target_suite.sources
1600 binaries_t = target_suite.binaries
1601 mi_factory = self._migration_item_factory
1602 used = set(
1603 binaries_t[arch][binary].source
1604 for arch in binaries_t
1605 for binary in binaries_t[arch]
1606 )
1607 removals = [
1608 mi_factory.parse_item(
1609 "-%s/%s" % (source, sources_t[source].version), auto_correct=False
1610 )
1611 for source in sources_t
1612 if source not in used
1613 ]
1614 if removals:
1615 output_logger.info(
1616 "Removing obsolete source packages from the target suite (%d):",
1617 len(removals),
1618 )
1619 self.do_all(actions=removals)
1621 # smooth updates
1622 removals = old_libraries(
1623 self._migration_item_factory, self.suite_info, self.options.outofsync_arches
1624 )
1625 if removals:
1626 output_logger.info(
1627 "Removing packages left in the target suite (e.g. smooth updates or cruft)"
1628 )
1629 log_and_format_old_libraries(self.output_logger, removals)
1630 self.do_all(actions=removals)
1631 removals = old_libraries(
1632 self._migration_item_factory,
1633 self.suite_info,
1634 self.options.outofsync_arches,
1635 )
1637 output_logger.info(
1638 "List of old libraries in the target suite (%d):", len(removals)
1639 )
1640 log_and_format_old_libraries(self.output_logger, removals)
1642 self.printuninstchange()
1643 if self.options.check_consistency_level >= 1:
1644 target_suite = self.suite_info.target_suite
1645 self.assert_nuninst_is_correct()
1646 target_suite.check_suite_source_pkg_consistency("end")
1648 # output files
1649 if self.options.heidi_output and not self.options.dry_run:
1650 target_suite = self.suite_info.target_suite
1652 # write HeidiResult
1653 self.logger.info("Writing Heidi results to %s", self.options.heidi_output)
1654 write_heidi(
1655 self.options.heidi_output,
1656 target_suite,
1657 outofsync_arches=self.options.outofsync_arches,
1658 )
1660 self.logger.info("Writing delta to %s", self.options.heidi_delta_output)
1661 write_heidi_delta(self.options.heidi_delta_output, self.all_selected)
1663 self.logger.info("Test completed!")
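The obsolete-source computation in upgrade_testing reduces to one set operation: a source is obsolete when no binary in the target suite was built from it. In this sketch `binaries` is flattened to arch -> binary name -> source name, whereas the real code reads richer BinaryPackage records.

```python
def obsolete_sources(sources, binaries):
    """Sources with no binary built from them in the target suite."""
    used = {
        binaries[arch][binary]
        for arch in binaries
        for binary in binaries[arch]
    }
    return sorted(src for src in sources if src not in used)
```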
1665 def printuninstchange(self) -> None:
1666 self.logger.info("Checking for newly uninstallable packages")
1667 uninst = newly_uninst(self.nuninst_orig_save, self.nuninst_orig)
1669 if uninst:
1670 self.output_logger.warning("")
1671 self.output_logger.warning(
1672 "Newly uninstallable packages in the target suite:"
1673 )
1674 format_and_log_uninst(
1675 self.output_logger,
1676 self.options.architectures,
1677 uninst,
1678 loglevel=logging.WARNING,
1679 )
1681 def hint_tester(self) -> None:
1682 """Run a command line interface to test hints
1684 This method provides a command line interface for the release team to
1685 try hints and evaluate the results.
1686 """
1687 import readline
1689 from britney2.completer import Completer
1691 histfile = os.path.expanduser("~/.britney2_history")
1692 if os.path.exists(histfile):
1693 readline.read_history_file(histfile)
1695 readline.parse_and_bind("tab: complete")
1696 readline.set_completer(Completer(self).completer)
1697 # Package names can contain "-" and we use "/" in our presentation of them as well,
1698 # so ensure readline does not split on these characters.
1699 readline.set_completer_delims(
1700 readline.get_completer_delims().replace("-", "").replace("/", "")
1701 )
1703 known_hints = self._hint_parser.registered_hints
1705 print("Britney hint tester")
1706 print()
1707 print(
1708 "Besides inputting known britney hints, the following commands are also available"
1709 )
1710 print(" * quit/exit - terminates the shell")
1711 print(
1712 " * python-console - jump into an interactive python shell (with the current loaded dataset)"
1713 )
1714 print()
1716 while True:
1717 # read the command from the command line
1718 try:
1719 user_input = input("britney> ").split()
1720 except EOFError:
1721 print("")
1722 break
1723 except KeyboardInterrupt:
1724 print("")
1725 continue
1726 # quit the hint tester
1727 if user_input and user_input[0] in ("quit", "exit"):
1728 break
1729 elif user_input and user_input[0] == "python-console":
1730 try:
1731 import britney2.console
1732 except ImportError as e:
1733 print("Failed to import britney.console module: %s" % repr(e))
1734 continue
1735 britney2.console.run_python_console(self)
1736 print("Returning to the britney hint-tester console")
1737 # run a hint
1738 elif user_input and user_input[0] in ("easy", "hint", "force-hint"):
1739 mi_factory = self._migration_item_factory
1740 try:
1741 self.do_hint(
1742 user_input[0],
1743 "hint-tester",
1744 mi_factory.parse_items(user_input[1:]),
1745 )
1746 self.printuninstchange()
1747 except KeyboardInterrupt:
1748 continue
1749 elif user_input and user_input[0] in known_hints:
1750 self._hint_parser.parse_hints(
1751 "hint-tester", self.HINTS_ALL, "<stdin>", [" ".join(user_input)]
1752 )
1753 self.write_excuses()
1755 try:
1756 readline.write_history_file(histfile)
1757 except IOError as e:
1758 self.logger.warning("Could not write %s: %s", histfile, e)
1760 def do_hint(self, hinttype: str, who: str, pkgvers: list[MigrationItem]) -> bool:
1761 """Process hints
1763 This method processes `easy`, `hint` and `force-hint` hints. If the
1764 requested version is not in the relevant source suite, then the hint
1765 is skipped.
1766 """
1768 output_logger = self.output_logger
1770 self.logger.info("> Processing '%s' hint from %s", hinttype, who)
1771 output_logger.info(
1772 "Trying %s from %s: %s",
1773 hinttype,
1774 who,
1775 " ".join("%s/%s" % (x.uvname, x.version) for x in pkgvers),
1776 )
1778 issues = []
1779 # loop on the requested packages and versions
1780 for idx in range(len(pkgvers)):
1781 pkg = pkgvers[idx]
1782 # skip removal requests
1783 if pkg.is_removal:
1784 continue
1786 suite = pkg.suite
1788 assert pkg.version is not None
1789 if pkg.package not in suite.sources:
1790 issues.append(
1791 "Source %s has no version in %s" % (pkg.package, suite.name)
1792 )
1793 elif (
1794 apt_pkg.version_compare(suite.sources[pkg.package].version, pkg.version)
1795 != 0
1796 ):
1797 issues.append(
1798 "Version mismatch, %s %s != %s"
1799 % (pkg.package, pkg.version, suite.sources[pkg.package].version)
1800 )
1801 if issues:
1802 output_logger.warning("%s: Not using hint", ", ".join(issues))
1803 return False
1805 self.do_all(hinttype, pkgvers)
1806 return True
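The validation step in do_hint can be shown on its own: a hint is rejected when the named source is absent from its source suite, or hinted at a version other than the one currently there. Plain string equality stands in for apt_pkg.version_compare, and the helper name is invented.

```python
def check_hint_items(items, suite_sources):
    """items: (package, version) pairs; suite_sources: {package: version}."""
    issues = []
    for package, version in items:
        if package not in suite_sources:
            issues.append("Source %s has no version in the suite" % package)
        elif suite_sources[package] != version:
            issues.append(
                "Version mismatch, %s %s != %s"
                % (package, version, suite_sources[package])
            )
    return issues  # empty list means the hint may be tried
```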
1808 def get_auto_hinter_hints(
1809 self, upgrade_me: list[MigrationItem]
1810 ) -> list[list[frozenset[MigrationItem]]]:
1811 """Auto-generate "easy" hints.
1813 This method attempts to generate "easy" hints for sets of packages which
1814 must migrate together. Beginning with a package which does not depend on
1815 any other package (in terms of excuses), a list of dependencies and
1816 reverse dependencies is recursively created.
1818 Once all such lists have been generated, any which are subsets of other
1819 lists are ignored in favour of the larger lists. The remaining lists are
1820 then attempted in turn as "easy" hints.
1822 We also try to auto hint circular dependencies by analyzing the update
1823 excuses relationships. If they build a circular dependency, which we already
1824 know does not work with the standard do_all algorithm, we try to `easy` them.
1825 """
1826 self.logger.info("> Processing hints from the auto hinter")
1828 sources_t = self.suite_info.target_suite.sources
1829 excuses = self.excuses
1831 def excuse_still_valid(excuse: "Excuse") -> bool:
1832 source = excuse.source
1833 assert isinstance(excuse.item, MigrationItem)
1834 arch = excuse.item.architecture
1835 # TODO for binNMUs, this check is always ok, even if the item
1836 # migrated already
1837 valid = (
1838 arch != "source"
1839 or source not in sources_t
1840 or sources_t[source].version != excuse.ver[1]
1841 )
1842 # TODO migrated items should be removed from upgrade_me, so this
1843 # should not happen
1844 if not valid:
1845 raise AssertionError("excuse no longer valid %s" % (excuse.item))
1846 return valid
1848 # consider only excuses which are valid candidates and still relevant.
1849 valid_excuses = frozenset(
1850 e.name
1851 for n, e in excuses.items()
1852 if e.item in upgrade_me and excuse_still_valid(e)
1853 )
1854 excuses_deps = {
1855 name: valid_excuses.intersection(excuse.get_deps())
1856 for name, excuse in excuses.items()
1857 if name in valid_excuses
1858 }
1859 excuses_rdeps = defaultdict(set)
1860 for name, deps in excuses_deps.items():
1861 for dep in deps:
1862 excuses_rdeps[dep].add(name)
1864 # loop on them
1865 candidates = []
1866 mincands = []
1867 seen_hints = set()
1868 for e in valid_excuses:
1869 excuse = excuses[e]
1870 if not excuse.get_deps():
1871 assert isinstance(excuse.item, MigrationItem)
1872 items = [excuse.item]
1873 orig_size = 1
1874 looped = False
1875 seen_items = set()
1876 seen_items.update(items)
1878 for item in items:
1879 assert isinstance(item, MigrationItem)
1880 # excuses which depend on "item" or are depended on by it
1881 new_items = cast(
1882 set[MigrationItem],
1883 {
1884 excuses[x].item
1885 for x in chain(
1886 excuses_deps[item.name], excuses_rdeps[item.name]
1887 )
1888 },
1889 )
1890 new_items -= seen_items
1891 items.extend(new_items)
1892 seen_items.update(new_items)
1894 if not looped and len(items) > 1:
1895 orig_size = len(items)
1896 h = frozenset(seen_items)
1897 if h not in seen_hints:
1898 mincands.append(h)
1899 seen_hints.add(h)
1900 looped = True
1901 if len(items) != orig_size:
1902 h = frozenset(seen_items)
1903 if h != mincands[-1] and h not in seen_hints:
1904 candidates.append(h)
1905 seen_hints.add(h)
1906 return [candidates, mincands]
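The grouping loop above can be sketched in isolation. In this standalone sketch the excuse names and the `deps`/`rdeps` maps are invented stand-ins for `excuses_deps`/`excuses_rdeps`: starting from an excuse, it repeatedly pulls in every excuse that depends on, or is depended on by, anything already collected — i.e. it computes a connected component of the (undirected) excuse-dependency graph.

```python
# Hypothetical excuse dependency data, standing in for excuses_deps.
from collections import defaultdict

deps = {"a": set(), "b": {"a"}, "c": {"b"}, "d": set()}

# Invert deps to get reverse dependencies, as done for excuses_rdeps.
rdeps: defaultdict[str, set[str]] = defaultdict(set)
for name, ds in deps.items():
    for d in ds:
        rdeps[d].add(name)

def component(start: str) -> frozenset[str]:
    items = [start]
    seen = {start}
    for item in items:  # the list grows while we iterate over it
        new = (deps[item] | rdeps[item]) - seen
        items.extend(new)
        seen |= new
    return frozenset(seen)

# "a" drags in "b" (which depends on it) and transitively "c";
# "d" has no relations and stays alone.
assert component("a") == {"a", "b", "c"}
assert component("d") == {"d"}
```

The "iterate a list while extending it" idiom is the same one the method uses with `items.extend(new_items)`; it terminates because `seen` only grows.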
1908 def run_auto_hinter(self) -> None:
1909 for lst in self.get_auto_hinter_hints(self.upgrade_me):
1910 for hint in lst:
1911 self.do_hint("easy", "autohinter", sorted(hint))
1913 def nuninst_arch_report(self, nuninst: dict[str, set[str]], arch: str) -> None:
1914 """Print a report of uninstallable packages for one architecture."""
1915 all = defaultdict(set)
1916 binaries_t = self.suite_info.target_suite.binaries
1917 for p in nuninst[arch]:
1918 pkg = binaries_t[arch][p]
1919 all[(pkg.source, pkg.source_version)].add(p)
1921 print("* %s" % arch)
1923 for (src, ver), pkgs in sorted(all.items()):
1924 print(" %s (%s): %s" % (src, ver, " ".join(sorted(pkgs))))
1926 print()
1928 def _remove_archall_faux_packages(self) -> None:
1929 """Remove faux packages added for the excuses phase
1931 To prevent binary packages from going missing while they are still listed
1932 by their source package, we add bin:faux packages while reading the
1933 Sources. They are used during the excuses phase to prevent packages
1934 from becoming candidates. However, they interfere in complex ways with
1935 the installability phase, so rather than making all of the migration
1936 code aware of this excuses-phase implementation detail, we remove them
1937 again.
1939 """
1940 if not self.options.archall_inconsistency_allowed:
1941 all_binaries = self.all_binaries
1942 faux_a = {x for x in all_binaries.keys() if x[2] == "faux"}
1943 for pkg_a in faux_a:
1944 del all_binaries[pkg_a]
1946 for suite in self.suite_info._suites.values():
1947 for arch in suite.binaries.keys():
1948 binaries = suite.binaries[arch]
1949 faux_b = {x for x in binaries if binaries[x].pkg_id[2] == "faux"}
1950 for pkg_b in faux_b:
1951 del binaries[pkg_b]
1952 sources = suite.sources
1953 for src in sources.keys():
1954 faux_s = {x for x in sources[src].binaries if x[2] == "faux"}
1955 sources[src].binaries -= faux_s
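The filtering done above can be shown with a minimal sketch (invented package-id tuples; the real code uses britney's package-id tuples, whose third field is checked against "faux"). The ids are collected into a set first and deleted afterwards, because a dict must not change size while it is being iterated.

```python
# Invented (name, version, arch) package ids; "faux" in the third field
# marks the synthetic packages added for the excuses phase.
binaries = {
    "libreal": ("libreal", "1.0-1", "amd64"),
    "libghost": ("libghost", "1.0-1", "faux"),
}

# Collect first, delete second -- deleting entries while iterating the
# dict directly would raise RuntimeError, hence the intermediate set
# (mirroring faux_a / faux_b / faux_s above).
faux = {name for name, pkg_id in binaries.items() if pkg_id[2] == "faux"}
for name in faux:
    del binaries[name]

assert set(binaries) == {"libreal"}
```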
1957 def main(self) -> None:
1958 """Main method
1960 This is the entry point for the class: it runs the sequence of member
1961 method calls that produce the output files.
1962 """
1963 # if running in --print-uninst mode, quit
1964 if self.options.print_uninst:
1965 return
1966 # if no actions are provided, build the excuses and sort them
1967 elif not self.options.actions:
1968 self.write_excuses()
1969 # otherwise, use the actions provided by the command line
1970 else:
1971 self.upgrade_me = self.options.actions.split()
1973 self._remove_archall_faux_packages()
1975 if self.options.compute_migrations or self.options.hint_tester:
1976 if self.options.dry_run:
1977 self.logger.info(
1978 "Upgrade output not (also) written to a separate file"
1979 " as this is a dry-run."
1980 )
1981 elif hasattr(self.options, "upgrade_output"):
1982 upgrade_output = getattr(self.options, "upgrade_output")
1983 file_handler = logging.FileHandler(
1984 upgrade_output, mode="w", encoding="utf-8"
1985 )
1986 output_formatter = logging.Formatter("%(message)s")
1987 file_handler.setFormatter(output_formatter)
1988 self.output_logger.addHandler(file_handler)
1989 self.logger.info("Logging upgrade output to %s", upgrade_output)
1990 else:
1991 self.logger.info(
1992 "Upgrade output not (also) written to a separate file"
1993 " as the UPGRADE_OUTPUT configuration is not provided."
1994 )
1996 # run the hint tester
1997 if self.options.hint_tester:
1998 self.hint_tester()
1999 # run the upgrade test
2000 else:
2001 self.upgrade_testing()
2003 self.logger.info("> Stats from the installability tester")
2004 for stat in self._inst_tester.stats.stats():
2005 self.logger.info("> %s", stat)
2006 else:
2007 self.logger.info("Migration computation skipped as requested.")
2008 if not self.options.dry_run:
2009 self._policy_engine.save_state(self)
2010 logging.shutdown()
2013if __name__ == "__main__":
2014 Britney().main()