Coverage for britney2/britney.py: 83%

769 statements  

coverage.py v7.6.0, created at 2026-01-08 19:15 +0000

1#!/usr/bin/python3 -u 

2 

3# Copyright (C) 2001-2008 Anthony Towns <ajt@debian.org> 

4# Andreas Barth <aba@debian.org> 

5# Fabio Tranchitella <kobold@debian.org> 

6# Copyright (C) 2010-2013 Adam D. Barratt <adsb@debian.org> 

7 

8# This program is free software; you can redistribute it and/or modify 

9# it under the terms of the GNU General Public License as published by 

10# the Free Software Foundation; either version 2 of the License, or 

11# (at your option) any later version. 

12 

13# This program is distributed in the hope that it will be useful, 

14# but WITHOUT ANY WARRANTY; without even the implied warranty of 

15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

16# GNU General Public License for more details. 

17 

18""" 

19= Introduction = 

20 

21This is the Debian testing updater script, also known as "Britney". 

22 

23Packages are usually installed into the `testing' distribution after 

24they have undergone some degree of testing in unstable. The goal of 

25this software is to do this task in a smart way, allowing testing 

26to always be fully installable and close to being a release candidate. 

27 

28Britney's source code is split between two different but related tasks: 

29the first one is the generation of the update excuses, while the 

30second tries to update testing with the valid candidates: first

31each package alone, then progressively larger sets of packages

32together. Each attempt is accepted if testing is no more uninstallable

33after the update than before.
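
The acceptance rule above can be sketched as follows (a simplified
illustration with hypothetical names, not Britney's actual
is_nuninst_asgood_generous):

```python
# Sketch: a migration attempt is acceptable if no architecture ends up
# with more uninstallable packages than before. Hypothetical helper,
# not Britney's real is_nuninst_asgood_generous.
def nuninst_not_worse(
    before: dict[str, set[str]], after: dict[str, set[str]]
) -> bool:
    return all(len(after[arch]) <= len(before[arch]) for arch in before)

before = {"amd64": {"pkg-a"}, "arm64": set()}
after = {"amd64": set(), "arm64": {"pkg-b"}}
print(nuninst_not_worse(before, after))  # False: arm64 got worse
```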

34 

35= Data Loading = 

36 

37In order to analyze the entire Debian distribution, Britney needs to 

38load the whole archive into memory: this means more than 10,000 packages

39for twelve architectures, as well as the dependency interconnections 

40between them. For this reason, the memory requirements for running this 

41software are quite high and at least 1 gigabyte of RAM should be available. 

42 

43Britney loads the source packages from the `Sources' file and the binary 

44packages from the `Packages_${arch}' files, where ${arch} is substituted 

45with the supported architectures. While loading the data, the software 

46analyzes the dependencies and builds a directed weighted graph in memory 

47with all the interconnections between the packages (see Britney.read_sources 

48and Britney.read_binaries). 

49 

50In addition to the source and binary packages, Britney loads the following data:

51 

52 * rc-bugs-*, which contains the list of release-critical bugs for a given 

53 version of a source or binary package (see RCBugPolicy.read_bugs). 

54 

55 * age-policy-dates, which contains the date of the upload of a given version 

56 of a source package (see Britney.read_dates). 

57 

58 * age-policy-urgencies, which contains the urgency of the upload of a given 

59 version of a source package (see AgePolicy._read_urgencies). 

60 

61 * Hints, which contains lists of commands which modify the standard behaviour 

62 of Britney (see Britney.read_hints). 

63 

64 * Other policies typically require their own data. 

65 
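
As a rough illustration of how a hints file is structured (one command per
line; the exact syntax is documented in Britney.read_hints, and the
package/version shown here is hypothetical):

```python
# Sketch: split a hint line of the form "<command> <arg> ..." into its
# command and arguments. Illustrative only; see Britney.read_hints and
# britney2.hints.HintParser for the real parsing logic.
def parse_hint_line(line: str) -> tuple[str, list[str]]:
    command, *args = line.split()
    return command, args

cmd, args = parse_hint_line("unblock foo/1.2-1")
print(cmd, args)  # unblock ['foo/1.2-1']
```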

66For a more detailed explanation about the format of these files, please read 

67the documentation of the related methods. Their exact meaning is

68explained instead in the chapter "Excuses generation".

69 

70= Excuses = 

71 

72An excuse is a detailed explanation of why a package can or cannot 

73be updated in the testing distribution from a newer package in 

74another distribution (for example, unstable). The excuses are mainly

75written to an HTML file which is published over HTTP, as well as to a

76YAML file. Maintainers can then read or parse these files, manually or

77automatically, to find the explanation of why their

78packages have or have not been updated.

79 

80== Excuses generation == 

81 

82These are the steps (with references to method names) that Britney 

83follows to generate the update excuses.

84 

85 * If a source package is available in testing but it is not 

86 present in unstable and no binary packages in unstable are 

87 built from it, then it is marked for removal. 

88 

89 * Every source package in unstable and testing-proposed-updates, 

90 if already present in testing, is checked for binary-NMUs, new 

91 or dropped binary packages in all the supported architectures 

92 (see Britney.should_upgrade_srcarch). The steps to detect if an 

93 upgrade is needed are: 

94 

95 1. If there is a `remove' hint for the source package, the package 

96 is ignored: it will be removed and not updated. 

97 

98 2. For every binary package built from the new source, it checks 

99 for unsatisfied dependencies, new binary packages and updated 

100 binary packages (binNMU), excluding the architecture-independent 

101 ones, and packages not built from the same source. 

102 

103 3. For every binary package built from the old source, it checks 

104 if it is still built from the new source; if this is not true 

105 and the package is not architecture-independent, the script 

106 removes it from testing. 

107 

108 4. Finally, if there is something worth doing (e.g. a new or updated

109 binary package) and nothing is wrong, it marks the source package

110 as "Valid candidate", or "Not considered" if there is something 

111 wrong which prevented the update. 

112 

113 * Every source package in unstable and testing-proposed-updates is 

114 checked for upgrade (see Britney.should_upgrade_src). The steps 

115 to detect if an upgrade is needed are: 

116 

117 1. If the source package in testing is more recent, the new one

118 is ignored. 

119 

120 2. If the source package doesn't exist (is fake), which means that 

121 a binary package refers to it but it is not present in the 

122 `Sources' file, the new one is ignored. 

123 

124 3. If the package doesn't exist in testing, the urgency of the 

125 upload is ignored and set to the default (actually `low'). 

126 

127 4. If there is a `remove' hint for the source package, the package 

128 is ignored: it will be removed and not updated. 

129 

130 5. If there is a `block' hint for the source package without an 

131 `unblock` hint or a `block-all source`, the package is ignored. 

132 

133 6. If there is a `block-udeb' hint for the source package, it will 

134 have the same effect as `block', but may only be cancelled by 

135 a subsequent `unblock-udeb' hint. 

136 

137 7. If the suite is unstable, the update can go ahead only if the 

138 upload happened more than the minimum days specified by the 

139 urgency of the upload; if this is not true, the package is 

140 ignored as `too-young'. Note that the urgency is sticky, meaning 

141 that the highest urgency uploaded since the previous testing 

142 transition is taken into account. 

143 

144 8. If the suite is unstable, all the architecture-dependent binary 

145 packages and the architecture-independent ones for the `nobreakall' 

146 architectures have to be built from the source we are considering. 

147 If this is not true, then these are called `out-of-date' 

148 architectures and the package is ignored. 

149 

150 9. The source package must have at least one binary package, otherwise 

151 it is ignored. 

152 

153 10. If the suite is unstable, the new source package must have no 

154 release critical bugs which do not also apply to the testing 

155 one. If this is not true, the package is ignored as `buggy'. 

156 

157 11. If there is a `force' hint for the source package, then it is 

158 updated even if it is marked as ignored from the previous steps. 

159 

160 12. If the suite is {testing-,}proposed-updates, the source package can 

161 be updated only if there is an explicit approval for it. Unless 

162 a `force' hint exists, the new package must also be available 

163 on all of the architectures for which it has binary packages in 

164 testing. 

165 

166 13. If the package has not been ignored, mark it as "Valid candidate",

167 otherwise mark it as "Not considered". 

168 

169 * The list of `remove' hints is processed: if the requested source 

170 package is not already being updated or removed and the version 

171 actually in testing is the same specified with the `remove' hint, 

172 it is marked for removal. 

173 

174 * The excuses are sorted by the number of days from the last upload 

175 (days-old) and by name. 

176 

177 * A list of unconsidered excuses (for which the package is not upgraded) 

178 is built. Using this list, all of the excuses depending on them are 

179 marked as invalid "impossible dependencies". 

180 

181 * The excuses are written in an HTML file. 
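
The invalidation step above can be sketched like this (a simplified
illustration with hypothetical names, not Britney's actual data
structures):

```python
# Sketch: invalidate every excuse that depends, directly or
# transitively, on an unconsidered excuse.
def invalidate(excuse_deps: dict[str, list[str]], unconsidered: set[str]) -> set[str]:
    # excuse_deps maps a package to the packages its migration depends on
    invalid = set(unconsidered)
    changed = True
    while changed:
        changed = False
        for pkg, deps in excuse_deps.items():
            if pkg not in invalid and any(d in invalid for d in deps):
                invalid.add(pkg)
                changed = True
    return invalid

deps = {"a": [], "b": ["a"], "c": ["b"]}
print(sorted(invalidate(deps, {"a"})))  # ['a', 'b', 'c']
```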

182""" 

183import contextlib 

184import logging 

185import optparse 

186import os 

187import sys 

188import time 

189from collections import defaultdict 

190from collections.abc import Iterator 

191from functools import reduce 

192from itertools import chain 

193from operator import attrgetter 

194from typing import TYPE_CHECKING, Any, Optional, cast 

195 

196import apt_pkg 

197 

198from britney2 import BinaryPackage, BinaryPackageId, SourcePackage, Suites 

199from britney2.excusefinder import ExcuseFinder 

200from britney2.hints import Hint, HintCollection, HintParser 

201from britney2.inputs.suiteloader import ( 

202 DebMirrorLikeSuiteContentLoader, 

203 MissingRequiredConfigurationError, 

204) 

205from britney2.installability.builder import build_installability_tester 

206from britney2.installability.solver import InstallabilitySolver 

207from britney2.migration import MigrationManager 

208from britney2.migrationitem import MigrationItem, MigrationItemFactory 

209from britney2.policies.autopkgtest import AutopkgtestPolicy 

210from britney2.policies.lintian import LintianPolicy 

211from britney2.policies.policy import ( 

212 AgePolicy, 

213 BlockPolicy, 

214 BuildDependsPolicy, 

215 BuiltOnBuilddPolicy, 

216 BuiltUsingPolicy, 

217 DependsPolicy, 

218 ImplicitDependencyPolicy, 

219 PiupartsPolicy, 

220 PolicyEngine, 

221 PolicyLoadRequest, 

222 RCBugPolicy, 

223 ReproduciblePolicy, 

224 ReverseRemovalPolicy, 

225) 

226from britney2.utils import ( 

227 MigrationConstraintException, 

228 clone_nuninst, 

229 compile_nuninst, 

230 format_and_log_uninst, 

231 is_nuninst_asgood_generous, 

232 log_and_format_old_libraries, 

233 newly_uninst, 

234 old_libraries, 

235 parse_option, 

236 parse_provides, 

237 read_nuninst, 

238 write_excuses, 

239 write_heidi, 

240 write_heidi_delta, 

241 write_nuninst, 

242) 

243 

244if TYPE_CHECKING: 244 ↛ 245 (line 244 didn't jump to line 245 because the condition on line 244 was never true)

245 from .excuse import Excuse 

246 from .installability.tester import InstallabilityTester 

247 from .installability.universe import BinaryPackageUniverse 

248 from .transaction import MigrationTransactionState 

249 

250 

251__author__ = "Fabio Tranchitella and the Debian Release Team" 

252__version__ = "2.0" 

253 

254 

255MIGRATION_POLICIES = [ 

256 PolicyLoadRequest.always_load(DependsPolicy), 

257 PolicyLoadRequest.conditionally_load(RCBugPolicy, "rcbug_enable", True), 

258 PolicyLoadRequest.conditionally_load(PiupartsPolicy, "piuparts_enable", True), 

259 PolicyLoadRequest.always_load(ImplicitDependencyPolicy), 

260 PolicyLoadRequest.conditionally_load(AutopkgtestPolicy, "adt_enable", True), 

261 PolicyLoadRequest.conditionally_load(LintianPolicy, "lintian_enable", False), 

262 PolicyLoadRequest.conditionally_load(ReproduciblePolicy, "repro_enable", False), 

263 PolicyLoadRequest.conditionally_load(AgePolicy, "age_enable", True), 

264 PolicyLoadRequest.always_load(BuildDependsPolicy), 

265 PolicyLoadRequest.always_load(BlockPolicy), 

266 PolicyLoadRequest.conditionally_load( 

267 BuiltUsingPolicy, "built_using_policy_enable", True 

268 ), 

269 PolicyLoadRequest.conditionally_load(BuiltOnBuilddPolicy, "check_buildd", False), 

270 PolicyLoadRequest.always_load(ReverseRemovalPolicy), 

271] 

272 

273 

274class Britney: 

275 """Britney, the Debian testing updater script 

276 

277 This is the script that updates the testing distribution. It is executed 

278 each day after the installation of the updated packages. It generates the 

279 `Packages' files for the testing distribution, but it does so in an 

280 intelligent manner; it tries to avoid any inconsistency and to use only 

281 non-buggy packages. 

282 

283 For more documentation on this script, please read the Developers Reference. 

284 """ 

285 

286 HINTS_HELPERS = ( 

287 "easy", 

288 "hint", 

289 "remove", 

290 "block", 

291 "block-udeb", 

292 "unblock", 

293 "unblock-udeb", 

294 "approve", 

295 "remark", 

296 "ignore-piuparts", 

297 "ignore-rc-bugs", 

298 "force-skiptest", 

299 "force-badtest", 

300 ) 

301 HINTS_STANDARD = ("urgent", "age-days") + HINTS_HELPERS 

302 # ALL = {"force", "force-hint", "block-all"} | HINTS_STANDARD | registered policy hints (not covered above) 

303 HINTS_ALL = "ALL" 

304 pkg_universe: "BinaryPackageUniverse" 

305 _inst_tester: "InstallabilityTester" 

306 constraints: dict[str, list[str]] 

307 suite_info: Suites 

308 

309 def __init__(self) -> None: 

310 """Class constructor 

311 

312 This method initializes and populates the data lists, which contain all 

313 the information needed by the other methods of the class. 

314 """ 

315 

316 # setup logging - provide the "short level name" (i.e. INFO -> I) that 

317 # we used to use prior to using the logging module. 

318 

319 old_factory = logging.getLogRecordFactory() 

320 short_level_mapping = { 

321 "CRITICAL": "F", 

322 "INFO": "I", 

323 "WARNING": "W", 

324 "ERROR": "E", 

325 "DEBUG": "N", 

326 } 

327 

328 def record_factory( 

329 *args: Any, **kwargs: Any 

330 ) -> logging.LogRecord: # pragma: no cover 

331 record = old_factory(*args, **kwargs) 

332 try: 

333 record.shortlevelname = short_level_mapping[record.levelname] 

334 except KeyError: 

335 record.shortlevelname = record.levelname 

336 return record 

337 

338 logging.setLogRecordFactory(record_factory) 

339 logging.basicConfig( 

340 format="{shortlevelname}: [{asctime}] - {message}", 

341 style="{", 

342 datefmt="%Y-%m-%dT%H:%M:%S%z", 

343 stream=sys.stdout, 

344 ) 

345 

346 self.logger = logging.getLogger() 

347 

348 # Logger for "upgrade_output"; the file handler will be attached later when 

349 # we are ready to open the file. 

350 self.output_logger = logging.getLogger("britney2.output.upgrade_output") 

351 self.output_logger.setLevel(logging.INFO) 

352 

353 # initialize the apt_pkg back-end 

354 apt_pkg.init() 

355 

356 # parse the command line arguments 

357 self._policy_engine = PolicyEngine() 

358 self.__parse_arguments() 

359 assert self.suite_info is not None # for type checking 

360 

361 self.all_selected: list[MigrationItem] = [] 

362 self.excuses: dict[str, "Excuse"] = {} 

363 self.upgrade_me: list[MigrationItem] = [] 

364 

365 if self.options.nuninst_cache: 365 ↛ 366 (line 365 didn't jump to line 366 because the condition on line 365 was never true)

366 self.logger.info( 

367 "Not building the list of non-installable packages, as requested" 

368 ) 

369 if self.options.print_uninst: 

370 nuninst = read_nuninst( 

371 self.options.noninst_status, self.options.architectures 

372 ) 

373 print("* summary") 

374 print( 

375 "\n".join( 

376 "%4d %s" % (len(nuninst[x]), x) 

377 for x in self.options.architectures 

378 ) 

379 ) 

380 return 

381 

382 try: 

383 constraints_file = os.path.join( 

384 self.options.static_input_dir, "constraints" 

385 ) 

386 faux_packages = os.path.join(self.options.static_input_dir, "faux-packages") 

387 except AttributeError: 

388 self.logger.info("The static_input_dir option is not set") 

389 constraints_file = None 

390 faux_packages = None 

391 if faux_packages is not None and os.path.exists(faux_packages): 

392 self.logger.info("Loading faux packages from %s", faux_packages) 

393 self._load_faux_packages(faux_packages) 

394 elif faux_packages is not None: 394 ↛ 397 (line 394 didn't jump to line 397 because the condition on line 394 was always true)

395 self.logger.info("No Faux packages as %s does not exist", faux_packages) 

396 

397 if constraints_file is not None and os.path.exists(constraints_file): 

398 self.logger.info("Loading constraints from %s", constraints_file) 

399 self.constraints = self._load_constraints(constraints_file) 

400 else: 

401 if constraints_file is not None: 401 ↛ 405 (line 401 didn't jump to line 405)

402 self.logger.info( 

403 "No constraints as %s does not exist", constraints_file 

404 ) 

405 self.constraints = { 

406 "keep-installable": [], 

407 } 

408 

409 self.logger.info("Compiling Installability tester") 

410 self.pkg_universe, self._inst_tester = build_installability_tester( 

411 self.suite_info, self.options.architectures 

412 ) 

413 target_suite = self.suite_info.target_suite 

414 target_suite.inst_tester = self._inst_tester 

415 

416 self.allow_uninst: dict[str, set[str | None]] = {} 

417 for arch in self.options.architectures: 

418 self.allow_uninst[arch] = set() 

419 self._migration_item_factory: MigrationItemFactory = MigrationItemFactory( 

420 self.suite_info 

421 ) 

422 self._hint_parser: HintParser = HintParser(self._migration_item_factory) 

423 self._migration_manager: MigrationManager = MigrationManager( 

424 self.options, 

425 self.suite_info, 

426 self.all_binaries, 

427 self.pkg_universe, 

428 self.constraints, 

429 self.allow_uninst, 

430 self._migration_item_factory, 

431 self.hints, 

432 ) 

433 

434 if not self.options.nuninst_cache: 434 ↛ 474 (line 434 didn't jump to line 474 because the condition on line 434 was always true)

435 self.logger.info( 

436 "Building the list of non-installable packages for the full archive" 

437 ) 

438 self._inst_tester.compute_installability() 

439 nuninst = compile_nuninst( 

440 target_suite, self.options.architectures, self.options.nobreakall_arches 

441 ) 

442 self.nuninst_orig: dict[str, set[str]] = nuninst 

443 for arch in self.options.architectures: 

444 self.logger.info( 

445 "> Found %d non-installable packages for %s", 

446 len(nuninst[arch]), 

447 arch, 

448 ) 

449 if self.options.print_uninst: 449 ↛ 450 (line 449 didn't jump to line 450 because the condition on line 449 was never true)

450 self.nuninst_arch_report(nuninst, arch) 

451 

452 if self.options.print_uninst: 452 ↛ 453 (line 452 didn't jump to line 453 because the condition on line 452 was never true)

453 print("* summary") 

454 print( 

455 "\n".join( 

456 map( 

457 lambda x: "%4d %s" % (len(nuninst[x]), x), 

458 self.options.architectures, 

459 ) 

460 ) 

461 ) 

462 return 

463 else: 

464 write_nuninst(self.options.noninst_status, nuninst) 

465 

466 stats = self._inst_tester.compute_stats() 

467 self.logger.info("> Installability tester statistics (per architecture)") 

468 for arch in self.options.architectures: 

469 arch_stat = stats[arch] 

470 self.logger.info("> %s", arch) 

471 for stat in arch_stat.stat_summary(): 

472 self.logger.info("> - %s", stat) 

473 else: 

474 self.logger.info("Loading uninstallability counters from cache") 

475 self.nuninst_orig = read_nuninst( 

476 self.options.noninst_status, self.options.architectures 

477 ) 

478 

479 # nuninst_orig may get updated during the upgrade process 

480 self.nuninst_orig_save: dict[str, set[str]] = clone_nuninst( 

481 self.nuninst_orig, architectures=self.options.architectures 

482 ) 

483 

484 self._policy_engine.register_policy_hints(self._hint_parser) 

485 

486 try: 

487 self.read_hints(self.options.hintsdir) 

488 except AttributeError: 

489 self.read_hints(os.path.join(self.suite_info["unstable"].path, "Hints")) 

490 

491 self._policy_engine.initialise(self, self.hints) 

492 

493 def __parse_arguments(self) -> None: 

494 """Parse the command line arguments 

495 

496 This method parses and initializes the command line arguments. 

497 While doing so, it preprocesses some of the options to be converted 

498 in a suitable form for the other methods of the class. 

499 """ 

500 # initialize the parser 

501 parser = optparse.OptionParser(version="%prog") 

502 parser.add_option( 

503 "-v", "", action="count", dest="verbose", help="enable verbose output" 

504 ) 

505 parser.add_option( 

506 "-c", 

507 "--config", 

508 action="store", 

509 dest="config", 

510 default="/etc/britney.conf", 

511 help="path for the configuration file", 

512 ) 

513 parser.add_option( 

514 "", 

515 "--architectures", 

516 action="store", 

517 dest="architectures", 

518 default=None, 

519 help="override architectures from configuration file", 

520 ) 

521 parser.add_option( 

522 "", 

523 "--actions", 

524 action="store", 

525 dest="actions", 

526 default=None, 

527 help="override the list of actions to be performed", 

528 ) 

529 parser.add_option( 

530 "", 

531 "--hints", 

532 action="store", 

533 dest="hints", 

534 default=None, 

535 help="additional hints, separated by semicolons", 

536 ) 

537 parser.add_option( 

538 "", 

539 "--hint-tester", 

540 action="store_true", 

541 dest="hint_tester", 

542 default=None, 

543 help="provide a command line interface to test hints", 

544 ) 

545 parser.add_option( 

546 "", 

547 "--dry-run", 

548 action="store_true", 

549 dest="dry_run", 

550 default=False, 

551 help="disable all outputs to the testing directory", 

552 ) 

553 parser.add_option( 

554 "", 

555 "--nuninst-cache", 

556 action="store_true", 

557 dest="nuninst_cache", 

558 default=False, 

559 help="do not build the non-installability status, use the cache from file", 

560 ) 

561 parser.add_option( 

562 "", 

563 "--print-uninst", 

564 action="store_true", 

565 dest="print_uninst", 

566 default=False, 

567 help="just print a summary of uninstallable packages", 

568 ) 

569 parser.add_option( 

570 "", 

571 "--compute-migrations", 

572 action="store_true", 

573 dest="compute_migrations", 

574 default=True, 

575 help="Compute which packages can migrate (the default)", 

576 ) 

577 parser.add_option( 

578 "", 

579 "--no-compute-migrations", 

580 action="store_false", 

581 dest="compute_migrations", 

582 help="Do not compute which packages can migrate.", 

583 ) 

584 parser.add_option( 

585 "", 

586 "--series", 

587 action="store", 

588 dest="series", 

589 default="", 

590 help="set distribution series name", 

591 ) 

592 parser.add_option( 

593 "", 

594 "--distribution", 

595 action="store", 

596 dest="distribution", 

597 default="debian", 

598 help="set distribution name", 

599 ) 

600 (self.options, self.args) = parser.parse_args() 

601 

602 if self.options.verbose: 602 ↛ 608 (line 602 didn't jump to line 608 because the condition on line 602 was always true)

603 if self.options.verbose > 1: 603 ↛ 604 (line 603 didn't jump to line 604 because the condition on line 603 was never true)

604 self.logger.setLevel(logging.DEBUG) 

605 else: 

606 self.logger.setLevel(logging.INFO) 

607 else: 

608 self.logger.setLevel(logging.WARNING) 

609 # Historical way to get debug information (equivalent to -vv) 

610 try: # pragma: no cover 

611 if int(os.environ.get("BRITNEY_DEBUG", "0")): 

612 self.logger.setLevel(logging.DEBUG) 

613 except ValueError: # pragma: no cover 

614 pass 

615 

616 # integrity checks 

617 if self.options.nuninst_cache and self.options.print_uninst: # pragma: no cover 

618 self.logger.error("nuninst_cache and print_uninst are mutually exclusive!") 

619 sys.exit(1) 

620 

621 # if the configuration file exists, then read it and set the additional options 

622 if not os.path.isfile(self.options.config): # pragma: no cover 

623 self.logger.error( 

624 "Unable to read the configuration file (%s), exiting!", 

625 self.options.config, 

626 ) 

627 sys.exit(1) 

628 

629 self.HINTS: dict[str, Any] = {"command-line": self.HINTS_ALL} 

630 with open(self.options.config, encoding="utf-8") as config: 

631 for line in config: 

632 if "=" in line and not line.strip().startswith("#"): 

633 k, v = line.split("=", 1) 

634 k = k.strip() 

635 v = v.strip() 

636 if k.startswith("HINTS_"): 

637 self.HINTS[k.split("_")[1].lower()] = reduce( 637 ↛ exit (line 637 didn't jump to the function exit)

638 lambda x, y: x + y, 

639 [ 

640 hasattr(self, "HINTS_" + i) 

641 and getattr(self, "HINTS_" + i) 

642 or (i,) 

643 for i in v.split() 

644 ], 

645 ) 

646 elif not hasattr(self.options, k.lower()) or not getattr( 

647 self.options, k.lower() 

648 ): 

649 setattr(self.options, k.lower(), v) 

650 

651 parse_option(self.options, "archall_inconsistency_allowed", to_bool=True) 

652 parse_option( 

653 self.options, "be_strict_with_build_deps", default=True, to_bool=True 

654 ) 

655 

656 suite_loader = DebMirrorLikeSuiteContentLoader(self.options) 

657 

658 try: 

659 self.suite_info = suite_loader.load_suites() 

660 except MissingRequiredConfigurationError as e: # pragma: no cover 

661 self.logger.error( 

662 "Could not load the suite content due to missing configuration: %s", 

663 str(e), 

664 ) 

665 sys.exit(1) 

666 self.all_binaries = suite_loader.all_binaries() 

667 self.options.components = suite_loader.components 

668 self.options.architectures = suite_loader.architectures 

669 self.options.nobreakall_arches = suite_loader.nobreakall_arches 

670 self.options.outofsync_arches = suite_loader.outofsync_arches 

671 self.options.break_arches = suite_loader.break_arches 

672 self.options.new_arches = suite_loader.new_arches 

673 if self.options.series == "": 673 ↛ 676 (line 673 didn't jump to line 676 because the condition on line 673 was always true)

674 self.options.series = self.suite_info.target_suite.name 

675 

676 if self.options.heidi_output and not hasattr( 676 ↛ 681 (line 676 didn't jump to line 681 because the condition on line 676 was always true)

677 self.options, "heidi_delta_output" 

678 ): 

679 self.options.heidi_delta_output = self.options.heidi_output + "Delta" 

680 

681 self.options.smooth_updates = self.options.smooth_updates.split() 

682 

683 parse_option(self.options, "ignore_cruft", to_bool=True) 

684 parse_option(self.options, "check_consistency_level", default=2, to_int=True) 

685 parse_option(self.options, "build_url") 

686 

687 self._policy_engine.load_policies( 

688 self.options, self.suite_info, MIGRATION_POLICIES 

689 ) 

690 

691 @property 

692 def hints(self) -> HintCollection: 

693 return self._hint_parser.hints 

694 

695 def _load_faux_packages(self, faux_packages_file: str) -> None: 

696 """Loads fake packages 

697 

698 In rare cases, it is useful to create a "fake" package that can be used to satisfy 

699 dependencies. This is usually needed for packages that are not shipped directly 

700 on this mirror but are a prerequisite for using this mirror (e.g. some vendors provide 

701 non-distributable "setup" packages and contrib/non-free packages depend on these). 

702 

703 :param faux_packages_file: Path to the file containing the fake package definitions 

704 """ 

705 tag_file = apt_pkg.TagFile(faux_packages_file) 

706 get_field = tag_file.section.get 

707 step = tag_file.step 

708 no = 0 

709 pri_source_suite = self.suite_info.primary_source_suite 

710 target_suite = self.suite_info.target_suite 

711 

712 while step(): 

713 no += 1 

714 pkg_name = get_field("Package", None) 

715 if pkg_name is None: # pragma: no cover 

716 raise ValueError( 

717 "Missing Package field in paragraph %d (file %s)" 

718 % (no, faux_packages_file) 

719 ) 

720 pkg_name = sys.intern(pkg_name) 

721 version = sys.intern(get_field("Version", "1.0-1")) 

722 provides_raw = get_field("Provides") 

723 archs_raw = get_field("Architecture", None) 

724 component = get_field("Component", "non-free") 

725 if archs_raw: 725 ↛ 726 (line 725 didn't jump to line 726 because the condition on line 725 was never true)

726 archs = archs_raw.split() 

727 else: 

728 archs = self.options.architectures 

729 faux_section = "faux" 

730 if component != "main": 730 ↛ 732 (line 730 didn't jump to line 732 because the condition on line 730 was always true)

731 faux_section = "%s/faux" % component 

732 src_data = SourcePackage( 

733 pkg_name, 

734 version, 

735 sys.intern(faux_section), 

736 set(), 

737 None, 

738 True, 

739 None, 

740 None, 

741 [], 

742 [], 

743 ) 

744 

745 target_suite.sources[pkg_name] = src_data 

746 pri_source_suite.sources[pkg_name] = src_data 

747 

748 for arch in archs: 

749 pkg_id = BinaryPackageId(pkg_name, version, arch) 

750 if provides_raw: 750 ↛ 751 (line 750 didn't jump to line 751 because the condition on line 750 was never true)

751 provides = parse_provides( 

752 provides_raw, pkg_id=pkg_id, logger=self.logger 

753 ) 

754 else: 

755 provides = [] 

756 bin_data = BinaryPackage( 

757 version, 

758 faux_section, 

759 pkg_name, 

760 version, 

761 arch, 

762 get_field("Multi-Arch"), 

763 None, 

764 None, 

765 provides, 

766 False, 

767 pkg_id, 

768 [], 

769 ) 

770 

771 src_data.binaries.add(pkg_id) 

772 target_suite.binaries[arch][pkg_name] = bin_data 

773 pri_source_suite.binaries[arch][pkg_name] = bin_data 

774 

775 # register provided packages with the target suite provides table 

776 for provided_pkg, provided_version, _ in bin_data.provides: 776 ↛ 777 (line 776 didn't jump to line 777 because the loop on line 776 never started)

777 target_suite.provides_table[arch][provided_pkg].add( 

778 (pkg_name, provided_version) 

779 ) 

780 

781 self.all_binaries[pkg_id] = bin_data 

782 

783 def _load_constraints(self, constraints_file: str) -> dict[str, list[str]]: 

784 """Loads configurable constraints 

785 

786 The constraints file can contain extra rules that Britney should attempt 

787 to satisfy. Examples can be "keep package X in testing and ensure it is 

788 installable". 

789 

790 :param constraints_file: Path to the file containing the constraints 

791 """ 

792 tag_file = apt_pkg.TagFile(constraints_file) 

793 get_field = tag_file.section.get 

794 step = tag_file.step 

795 no = 0 

796 faux_version = sys.intern("1") 

797 faux_section = sys.intern("faux") 

798 keep_installable: list[str] = [] 

799 constraints = {"keep-installable": keep_installable} 

800 pri_source_suite = self.suite_info.primary_source_suite 

801 target_suite = self.suite_info.target_suite 

802 

803 while step(): 

804 no += 1 

805 pkg_name = get_field("Fake-Package-Name", None) 

806 if pkg_name is None: # pragma: no cover 

807 raise ValueError( 

808 "Missing Fake-Package-Name field in paragraph %d (file %s)" 

809 % (no, constraints_file) 

810 ) 

811 pkg_name = sys.intern(pkg_name) 

812 

813 def mandatory_field(x: str) -> str: 

814 v: str = get_field(x, None) 

815 if v is None: # pragma: no cover 

816 raise ValueError( 

817 "Missing %s field for %s (file %s)" 

818 % (x, pkg_name, constraints_file) 

819 ) 

820 return v 

821 

822 constraint = mandatory_field("Constraint") 

823 if constraint not in {"present-and-installable"}: # pragma: no cover 

824 raise ValueError( 

825 "Unsupported constraint %s for %s (file %s)" 

826 % (constraint, pkg_name, constraints_file) 

827 ) 

828 

829 self.logger.info(" - constraint %s", pkg_name) 

830 

831 pkg_list = [ 

832 x.strip() 

833 for x in mandatory_field("Package-List").split("\n") 

834 if x.strip() != "" and not x.strip().startswith("#") 

835 ] 

836 src_data = SourcePackage( 

837 pkg_name, 

838 faux_version, 

839 faux_section, 

840 set(), 

841 None, 

842 True, 

843 None, 

844 None, 

845 [], 

846 [], 

847 ) 

848 target_suite.sources[pkg_name] = src_data 

849 pri_source_suite.sources[pkg_name] = src_data 

850 keep_installable.append(pkg_name) 

851 for arch in self.options.architectures: 

852 deps = [] 

853 for pkg_spec in pkg_list: 

854 s = pkg_spec.split(None, 1) 

855 if len(s) == 1: 

856 deps.append(s[0]) 

857 else: 

858 pkg, arch_res = s 

859 if not ( 

860 arch_res.startswith("[") and arch_res.endswith("]") 

861 ): # pragma: no cover 

862 raise ValueError( 

863 "Invalid arch-restriction on %s - should be [arch1 arch2] (for %s file %s)" 

864 % (pkg, pkg_name, constraints_file) 

865 ) 

866 arch_res_l = arch_res[1:-1].split() 

867 if not arch_res_l: # pragma: no cover 

868 msg = "Empty arch-restriction for %s (for %s file %s)"

869 raise ValueError(msg % (pkg, pkg_name, constraints_file)) 

870 for a in arch_res_l: 

871 if a == arch: 

872 deps.append(pkg) 

873 elif "," in a or "!" in a: # pragma: no cover 

874 msg = "Invalid arch-restriction for %s: Uses comma or negation (for %s file %s)" 

875 raise ValueError( 

876 msg % (pkg, pkg_name, constraints_file) 

877 ) 

878 pkg_id = BinaryPackageId(pkg_name, faux_version, arch) 

879 bin_data = BinaryPackage( 

880 faux_version, 

881 faux_section, 

882 pkg_name, 

883 faux_version, 

884 arch, 

885 "no", 

886 ", ".join(deps), 

887 None, 

888 [], 

889 False, 

890 pkg_id, 

891 [], 

892 ) 

893 src_data.binaries.add(pkg_id) 

894 target_suite.binaries[arch][pkg_name] = bin_data 

895 pri_source_suite.binaries[arch][pkg_name] = bin_data 

896 self.all_binaries[pkg_id] = bin_data 

897 

898 return constraints 

899 
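The paragraph-based constraints format parsed above relies on apt_pkg.TagFile; a stdlib-only sketch of the same idea is shown below. The parser is a simplified stand-in (no folding of comments or signatures), and the example field values merely mimic the format described in the docstring:

```python
def parse_paragraphs(text):
    """Split deb822-style text into paragraphs of field -> value maps."""
    paragraphs, fields, last = [], {}, None
    for line in text.splitlines():
        if not line.strip():
            if fields:
                paragraphs.append(fields)
            fields, last = {}, None
            continue
        if line[0].isspace() and last is not None:
            # continuation line, e.g. entries of a multi-line Package-List
            fields[last] += "\n" + line.strip()
        else:
            key, _, value = line.partition(":")
            last = key.strip()
            fields[last] = value.strip()
    if fields:
        paragraphs.append(fields)
    return paragraphs


example = """\
Fake-Package-Name: faux-essential
Constraint: present-and-installable
Package-List:
 base-files
 dpkg [amd64 i386]
"""
paras = parse_paragraphs(example)
```

Each paragraph then maps field names to values, so `get_field` above corresponds to a plain dict lookup per paragraph.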

900 # Data reading/writing methods 

901 # ---------------------------- 

902 

903 def read_hints(self, hintsdir: str) -> None: 

904 """Read the hint commands from the specified directory 

905 

906 The hint commands are read from the files contained in the directory 

907 specified by the `hintsdir' parameter. 

908 The names of the files have to be the same as the authorized users 

909 for the hints. 

910 

911 The file contains rows with the format: 

912 

913 <command> <package-name>[/<version>] 

914 

915 The parsed hints are stored in the hint parser and are available

916 afterwards through the `hints' property, keyed by hint command.

917 """ 

918 

919 for who in self.HINTS.keys(): 

920 if who == "command-line": 

921 lines = self.options.hints and self.options.hints.split(";") or () 

922 filename = "<cmd-line>" 

923 self._hint_parser.parse_hints(who, self.HINTS[who], filename, lines) 

924 else: 

925 filename = os.path.join(hintsdir, who) 

926 if not os.path.isfile(filename): [926 ↛ 927: line 926 didn't jump to line 927 because the condition on line 926 was never true]

927 self.logger.error( 

928 "Cannot read hints list from %s, no such file!", filename 

929 ) 

930 continue 

931 self.logger.info("Loading hints list from %s", filename) 

932 with open(filename, encoding="utf-8") as f: 

933 self._hint_parser.parse_hints(who, self.HINTS[who], filename, f) 

934 

935 hints = self._hint_parser.hints 

936 

937 for x in [ 

938 "block", 

939 "block-all", 

940 "block-udeb", 

941 "unblock", 

942 "unblock-udeb", 

943 "force", 

944 "urgent", 

945 "remove", 

946 "age-days", 

947 ]: 

948 z: dict[str | None, dict[str | None, tuple[Hint, str]]] = defaultdict(dict) 

949 for hint in hints[x]: 

950 package = hint.package 

951 architecture = hint.architecture 

952 key = (hint, hint.user) 

953 if ( 

954 package in z 

955 and architecture in z[package] 

956 and z[package][architecture] != key 

957 ): 

958 hint2 = z[package][architecture][0] 

959 if x in ["unblock", "unblock-udeb"]: [959 ↛ 991: line 959 didn't jump to line 991 because the condition on line 959 was always true]

960 assert hint.version is not None 

961 assert hint2.version is not None 

962 if apt_pkg.version_compare(hint2.version, hint.version) < 0: 

963 # This hint is for a newer version, so discard the old one 

964 self.logger.warning( 

965 "Overriding %s[%s] = ('%s', '%s', '%s') with ('%s', '%s', '%s')", 

966 x, 

967 package, 

968 hint2.version, 

969 hint2.architecture, 

970 hint2.user, 

971 hint.version, 

972 hint.architecture, 

973 hint.user, 

974 ) 

975 hint2.set_active(False) 

976 else: 

977 # This hint is for an older version, so ignore it in favour of the new one 

978 self.logger.warning( 

979 "Ignoring %s[%s] = ('%s', '%s', '%s'), ('%s', '%s', '%s') is higher or equal", 

980 x, 

981 package, 

982 hint.version, 

983 hint.architecture, 

984 hint.user, 

985 hint2.version, 

986 hint2.architecture, 

987 hint2.user, 

988 ) 

989 hint.set_active(False) 

990 else: 

991 self.logger.warning( 

992 "Overriding %s[%s] = ('%s', '%s') with ('%s', '%s')", 

993 x, 

994 package, 

995 hint2.user, 

996 hint2, 

997 hint.user, 

998 hint, 

999 ) 

1000 hint2.set_active(False) 

1001 

1002 z[package][architecture] = key 

1003 

1004 for hint in hints["allow-uninst"]: 

1005 if hint.architecture == "source": 

1006 for arch in self.options.architectures: 

1007 self.allow_uninst[arch].add(hint.package) 

1008 else: 

1009 assert hint.architecture is not None 

1010 self.allow_uninst[hint.architecture].add(hint.package) 

1011 

1012 # Sanity check the hints hash 

1013 if len(hints["block"]) == 0 and len(hints["block-udeb"]) == 0: [1013 ↛ 1014: line 1013 didn't jump to line 1014 because the condition on line 1013 was never true]

1014 self.logger.warning("WARNING: No block hints at all, not even udeb ones!") 

1015 
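The unblock/unblock-udeb deduplication above keeps only the hint for the highest version, using apt_pkg.version_compare. A minimal sketch of that policy, assuming plain dotted numeric versions instead of full Debian version strings:

```python
def pick_newest(hints):
    """Keep only the highest-versioned hint per package, as the
    unblock deduplication does (simplified version comparison)."""
    def as_key(version):
        # assumption: dotted numeric versions; real Debian versions
        # need apt_pkg.version_compare (epochs, tildes, revisions)
        return tuple(int(p) for p in version.split("."))

    newest = {}
    for pkg, version, user in hints:
        if pkg not in newest or as_key(version) > as_key(newest[pkg][0]):
            newest[pkg] = (version, user)
    return newest


result = pick_newest([
    ("glibc", "2.36", "alice"),
    ("glibc", "2.38", "bob"),
    ("dpkg", "1.21", "alice"),
])
```

In the method itself the losing hint is additionally deactivated via set_active(False) and the override is logged.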

1016 def write_excuses(self) -> None: 

1017 """Produce and write the update excuses 

1018 

1019 This method handles the update excuses generation: the packages are 

1020 looked at to determine whether they are valid candidates. For the details 

1021 of this procedure, please refer to the module docstring. 

1022 """ 

1023 

1024 self.logger.info("Update Excuses generation started") 

1025 

1026 mi_factory = self._migration_item_factory 

1027 excusefinder = ExcuseFinder( 

1028 self.options, 

1029 self.suite_info, 

1030 self.all_binaries, 

1031 self.pkg_universe, 

1032 self._policy_engine, 

1033 mi_factory, 

1034 self.hints, 

1035 ) 

1036 

1037 excuses, upgrade_me = excusefinder.find_actionable_excuses() 

1038 self.excuses = excuses 

1039 

1040 # sort the list of candidates 

1041 self.upgrade_me = sorted(upgrade_me) 

1042 old_lib_removals = old_libraries( 

1043 mi_factory, self.suite_info, self.options.outofsync_arches 

1044 ) 

1045 self.upgrade_me.extend(old_lib_removals) 

1046 self.output_logger.info( 

1047 "List of old libraries added to upgrade_me (%d):", len(old_lib_removals) 

1048 ) 

1049 log_and_format_old_libraries(self.output_logger, old_lib_removals) 

1050 

1051 # write excuses to the output file 

1052 if not self.options.dry_run: [1052 ↛ 1065: line 1052 didn't jump to line 1065 because the condition on line 1052 was always true]

1053 self.logger.info("> Writing Excuses to %s", self.options.excuses_output) 

1054 write_excuses( 

1055 excuses, self.options.excuses_output, output_format="legacy-html" 

1056 ) 

1057 if hasattr(self.options, "excuses_yaml_output"): [1057 ↛ 1065: line 1057 didn't jump to line 1065 because the condition on line 1057 was always true]

1058 self.logger.info( 

1059 "> Writing YAML Excuses to %s", self.options.excuses_yaml_output 

1060 ) 

1061 write_excuses( 

1062 excuses, self.options.excuses_yaml_output, output_format="yaml" 

1063 ) 

1064 

1065 self.logger.info("Update Excuses generation completed") 

1066 

1067 # Upgrade run 

1068 # ----------- 

1069 

1070 def eval_nuninst( 

1071 self, 

1072 nuninst: dict[str, set[str]], 

1073 original: dict[str, set[str]] | None = None, 

1074 ) -> str: 

1075 """Return a string which represents the uninstallability counters 

1076 

1077 This method returns a string which represents the uninstallability 

1078 counters reading the uninstallability statistics `nuninst` and, if 

1079 present, merging the results with the `original` one. 

1080 

1081 An example of the output string is: 

1082 1+2: i-0:a-0:a-0:h-0:i-1:m-0:m-0:p-0:a-0:m-0:s-2:s-0 

1083 

1084 where the first part is the number of broken packages in non-break 

1085 architectures + the total number of broken packages for all the 

1086 architectures. 

1087 """ 

1088 res = [] 

1089 total = 0 

1090 totalbreak = 0 

1091 for arch in self.options.architectures: 

1092 if arch in nuninst: [1092 ↛ 1094: line 1092 didn't jump to line 1094 because the condition on line 1092 was always true]

1093 n = len(nuninst[arch]) 

1094 elif original and arch in original: 

1095 n = len(original[arch]) 

1096 else: 

1097 continue 

1098 if arch in self.options.break_arches: 

1099 totalbreak = totalbreak + n 

1100 else: 

1101 total = total + n 

1102 res.append("%s-%d" % (arch[0], n)) 

1103 return "%d+%d: %s" % (total, totalbreak, ":".join(res)) 

1104 
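A self-contained re-implementation of the counter string format (the architecture names and break set below are made up for illustration, and missing architectures are simply counted as zero rather than skipped as in the method above):

```python
def eval_counters(nuninst, architectures, break_arches):
    """Render uninstallability counters as 'total+break: a-n:...'."""
    res, total, totalbreak = [], 0, 0
    for arch in architectures:
        n = len(nuninst.get(arch, ()))
        if arch in break_arches:
            totalbreak += n
        else:
            total += n
        # first letter of the architecture plus its broken-package count
        res.append("%s-%d" % (arch[0], n))
    return "%d+%d: %s" % (total, totalbreak, ":".join(res))


out = eval_counters(
    {"amd64": {"foo"}, "i386": {"foo", "bar"}, "hurd-i386": {"baz", "qux"}},
    ["amd64", "i386", "hurd-i386"],
    {"hurd-i386"},
)
```

With hurd-i386 as a break arch this yields "3+2: a-1:i-2:h-2": three broken packages on non-break architectures plus two on break architectures.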

1105 def iter_packages( 

1106 self, 

1107 packages: list[MigrationItem], 

1108 selected: list[MigrationItem], 

1109 nuninst: dict[str, set[str]] | None = None, 

1110 ) -> tuple[dict[str, set[str]] | None, list[MigrationItem]]: 

1111 """Iterate over the list of actions and apply them one by one

1112 

1113 This method applies the changes from `packages` to testing, checking the uninstallability 

1114 counters for every action performed. If the action does not improve them, it is reverted. 

1115 The method returns the new uninstallability counters and the remaining actions if the 

1116 final result is successful, otherwise (None, []). 

1117 

1118 :param selected: list of already-accepted MigrationItems (extended in place)

1119 :param nuninst: uninstallability counters (a set of package names per architecture); defaults to the cached original counters

1120 """ 

1121 assert self.suite_info is not None # for type checking 

1122 group_info = {} 

1123 rescheduled_packages = packages 

1124 maybe_rescheduled_packages: list[MigrationItem] = [] 

1125 output_logger = self.output_logger 

1126 solver = InstallabilitySolver(self.pkg_universe, self._inst_tester) 

1127 mm = self._migration_manager 

1128 target_suite = self.suite_info.target_suite 

1129 

1130 for y in sorted((y for y in packages), key=attrgetter("uvname")): 

1131 try: 

1132 _, updates, rms, _ = mm.compute_groups(y) 

1133 result = (y, sorted(updates), sorted(rms)) 

1134 group_info[y] = result 

1135 except MigrationConstraintException as e: 

1136 rescheduled_packages.remove(y) 

1137 output_logger.info("not adding package to list: %s", (y.package)) 

1138 output_logger.info(" got exception: %s" % (repr(e))) 

1139 

1140 if nuninst: 

1141 nuninst_orig = nuninst 

1142 else: 

1143 nuninst_orig = self.nuninst_orig 

1144 

1145 nuninst_last_accepted = nuninst_orig 

1146 

1147 output_logger.info( 

1148 "recur: [] %s %d/0", ",".join(x.uvname for x in selected), len(packages) 

1149 ) 

1150 while rescheduled_packages: 

1151 groups = [group_info[x] for x in rescheduled_packages] 

1152 worklist = solver.solve_groups(groups) 

1153 rescheduled_packages = [] 

1154 

1155 worklist.reverse() 

1156 

1157 while worklist: 

1158 comp = worklist.pop() 

1159 comp_name = " ".join(item.uvname for item in comp) 

1160 output_logger.info("trying: %s" % comp_name) 

1161 with mm.start_transaction() as transaction: 

1162 accepted = False 

1163 try: 

1164 ( 

1165 accepted, 

1166 nuninst_after, 

1167 failed_arch, 

1168 new_cruft, 

1169 ) = mm.migrate_items_to_target_suite( 

1170 comp, nuninst_last_accepted 

1171 ) 

1172 if accepted: 

1173 selected.extend(comp) 

1174 transaction.commit() 

1175 output_logger.info("accepted: %s", comp_name) 

1176 output_logger.info( 

1177 " ori: %s", self.eval_nuninst(nuninst_orig) 

1178 ) 

1179 output_logger.info( 

1180 " pre: %s", self.eval_nuninst(nuninst_last_accepted) 

1181 ) 

1182 output_logger.info( 

1183 " now: %s", self.eval_nuninst(nuninst_after) 

1184 ) 

1185 if new_cruft: 

1186 output_logger.info( 

1187 " added new cruft items to list: %s", 

1188 " ".join(x.uvname for x in sorted(new_cruft)), 

1189 ) 

1190 

1191 if len(selected) <= 20: 

1192 output_logger.info( 

1193 " all: %s", " ".join(x.uvname for x in selected) 

1194 ) 

1195 else: 

1196 output_logger.info( 

1197 " most: (%d) .. %s", 

1198 len(selected), 

1199 " ".join(x.uvname for x in selected[-20:]), 

1200 ) 

1201 if self.options.check_consistency_level >= 3: [1201 ↛ 1202: line 1201 didn't jump to line 1202 because the condition on line 1201 was never true]

1202 target_suite.check_suite_source_pkg_consistency( 

1203 "iter_packages after commit" 

1204 ) 

1205 nuninst_last_accepted = nuninst_after 

1206 for cruft_item in new_cruft: 

1207 try: 

1208 _, updates, rms, _ = mm.compute_groups(cruft_item) 

1209 result = (cruft_item, sorted(updates), sorted(rms)) 

1210 group_info[cruft_item] = result 

1211 worklist.append([cruft_item]) 

1212 except MigrationConstraintException as e: 

1213 output_logger.info( 

1214 " got exception adding cruft item %s to list: %s" 

1215 % (cruft_item.uvname, repr(e)) 

1216 ) 

1217 rescheduled_packages.extend(maybe_rescheduled_packages) 

1218 maybe_rescheduled_packages.clear() 

1219 else: 

1220 transaction.rollback() 

1221 assert failed_arch # type checking 

1222 broken = sorted( 

1223 b 

1224 for b in nuninst_after[failed_arch] 

1225 if b not in nuninst_last_accepted[failed_arch] 

1226 ) 

1227 compare_nuninst = None 

1228 if any( 

1229 item for item in comp if item.architecture != "source" 

1230 ): 

1231 compare_nuninst = nuninst_last_accepted 

1232 # NB: try_migration already reverted this for us, so just print the results and move on 

1233 output_logger.info( 

1234 "skipped: %s (%d, %d, %d)", 

1235 comp_name, 

1236 len(rescheduled_packages), 

1237 len(maybe_rescheduled_packages), 

1238 len(worklist), 

1239 ) 

1240 output_logger.info( 

1241 " got: %s", 

1242 self.eval_nuninst(nuninst_after, compare_nuninst), 

1243 ) 

1244 output_logger.info( 

1245 " * %s: %s", failed_arch, ", ".join(broken) 

1246 ) 

1247 if self.options.check_consistency_level >= 3: [1247 ↛ 1248: line 1247 didn't jump to line 1248 because the condition on line 1247 was never true]

1248 target_suite.check_suite_source_pkg_consistency( 

1249 "iter_package after rollback (not accepted)" 

1250 ) 

1251 

1252 except MigrationConstraintException as e: 

1253 transaction.rollback() 

1254 output_logger.info( 

1255 "skipped: %s (%d, %d, %d)", 

1256 comp_name, 

1257 len(rescheduled_packages), 

1258 len(maybe_rescheduled_packages), 

1259 len(worklist), 

1260 ) 

1261 output_logger.info(" got exception: %s" % (repr(e))) 

1262 if self.options.check_consistency_level >= 3: [1262 ↛ 1263: line 1262 didn't jump to line 1263 because the condition on line 1262 was never true]

1263 target_suite.check_suite_source_pkg_consistency( 

1264 "iter_package after rollback (MigrationConstraintException)" 

1265 ) 

1266 

1267 if not accepted: 

1268 if len(comp) > 1: 

1269 output_logger.info( 

1270 " - splitting the component into single items and retrying them" 

1271 ) 

1272 worklist.extend([item] for item in comp) 

1273 else: 

1274 maybe_rescheduled_packages.append(comp[0]) 

1275 

1276 output_logger.info(" finish: [%s]", ",".join(x.uvname for x in selected)) 

1277 output_logger.info("endloop: %s", self.eval_nuninst(self.nuninst_orig)) 

1278 output_logger.info(" now: %s", self.eval_nuninst(nuninst_last_accepted)) 

1279 format_and_log_uninst( 

1280 output_logger, 

1281 self.options.architectures, 

1282 newly_uninst(self.nuninst_orig, nuninst_last_accepted), 

1283 ) 

1284 output_logger.info("") 

1285 

1286 return (nuninst_last_accepted, maybe_rescheduled_packages) 

1287 
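Stripped of transactions and dependency solving, the commit/rollback decision in iter_packages reduces to a greedy loop that keeps a change only if a (single, toy) uninstallability counter does not worsen; the names and deltas below are invented:

```python
def greedy_migrate(baseline, deltas):
    """Accept each change only if the counter does not worsen,
    mirroring the commit/rollback decision in iter_packages."""
    accepted, current = [], baseline
    for name, delta in deltas:
        candidate = current + delta
        if candidate <= current:
            current = candidate  # "commit" the transaction
            accepted.append(name)
        # else: "rollback" - the candidate state is discarded
    return current, accepted


final, ok = greedy_migrate(
    5, [("src1", -2), ("src2", +3), ("src3", 0), ("src4", -1)]
)
```

The real method also reschedules skipped items, splits failed multi-item components into single items, and tracks per-architecture counters instead of one number.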

1288 def do_all( 

1289 self, 

1290 hinttype: str | None = None, 

1291 init: list[MigrationItem] | None = None, 

1292 actions: list[MigrationItem] | None = None, 

1293 ) -> None: 

1294 """Testing update runner 

1295 

1296 This method tries to update testing checking the uninstallability 

1297 counters before and after the actions to decide if the update was 

1298 successful or not. 

1299 """ 

1300 selected = [] 

1301 if actions: 

1302 upgrade_me = actions[:] 

1303 else: 

1304 upgrade_me = self.upgrade_me[:] 

1305 nuninst_start = self.nuninst_orig 

1306 output_logger = self.output_logger 

1307 target_suite = self.suite_info.target_suite 

1308 

1309 # these are special parameters for hints processing 

1310 force = False 

1311 recurse = True 

1312 nuninst_end = None 

1313 extra: list[MigrationItem] = [] 

1314 mm = self._migration_manager 

1315 

1316 if hinttype == "easy" or hinttype == "force-hint": 

1317 force = hinttype == "force-hint" 

1318 recurse = False 

1319 

1320 # if we have a list of initial packages, check them 

1321 if init: 

1322 for x in init: 

1323 if x not in upgrade_me: 

1324 output_logger.warning( 

1325 "failed: %s is not a valid candidate (or it already migrated)", 

1326 x.uvname, 

1327 ) 

1328 return None 

1329 selected.append(x) 

1330 upgrade_me.remove(x) 

1331 

1332 output_logger.info("start: %s", self.eval_nuninst(nuninst_start)) 

1333 output_logger.info("orig: %s", self.eval_nuninst(nuninst_start)) 

1334 

1335 if not (init and not force): 

1336 # No "outer" transaction needed as we will never need to rollback 

1337 # (e.g. "force-hint" or a regular "main run"). Emulate the start_transaction 

1338 # call from the MigrationManager, so the rest of the code follows the 

1339 # same flow regardless of whether we need the transaction or not. 

1340 

1341 @contextlib.contextmanager 

1342 def _start_transaction() -> Iterator[Optional["MigrationTransactionState"]]: 

1343 yield None 

1344 

1345 else: 

1346 # We will need to be able to roll back (e.g. easy or a "hint"-hint) 

1347 _start_transaction = mm.start_transaction 

1348 

1349 with _start_transaction() as transaction: 

1350 if init: 

1351 # init => a hint (e.g. "easy") - so do the hint run 

1352 (_, nuninst_end, _, new_cruft) = mm.migrate_items_to_target_suite( 

1353 selected, self.nuninst_orig, stop_on_first_regression=False 

1354 ) 

1355 

1356 if recurse: 

1357 # Ensure upgrade_me and selected do not overlap, if we 

1358 # follow-up with a recurse ("hint"-hint). 

1359 upgrade_me = [x for x in upgrade_me if x not in set(selected)] 

1360 else: 

1361 # On non-recursive hints check for cruft and purge it proactively in case it "fixes" the hint. 

1362 cruft = [x for x in upgrade_me if x.is_cruft_removal] 

1363 if new_cruft: 

1364 output_logger.info( 

1365 "Change added new cruft items to list: %s", 

1366 " ".join(x.uvname for x in sorted(new_cruft)), 

1367 ) 

1368 cruft.extend(new_cruft) 

1369 if cruft: 

1370 output_logger.info("Checking if changes enable cruft removal")

1371 (nuninst_end, remaining_cruft) = self.iter_packages( 

1372 cruft, selected, nuninst=nuninst_end 

1373 ) 

1374 output_logger.info( 

1375 "Removed %d of %d cruft item(s) after the changes", 

1376 len(cruft) - len(remaining_cruft), 

1377 len(cruft), 

1378 ) 

1379 new_cruft.difference_update(remaining_cruft) 

1380 

1381 # Add new cruft items regardless of whether we recurse. A future run might clean 

1382 # them for us. 

1383 upgrade_me.extend(new_cruft) 

1384 

1385 if recurse: 

1386 # Either the main run or the recursive run of a "hint"-hint. 

1387 (nuninst_end, extra) = self.iter_packages( 

1388 upgrade_me, selected, nuninst=nuninst_end 

1389 ) 

1390 

1391 assert nuninst_end is not None 

1392 nuninst_end_str = self.eval_nuninst(nuninst_end) 

1393 

1394 if not recurse: 

1395 # easy or force-hint 

1396 output_logger.info("easy: %s", nuninst_end_str) 

1397 

1398 if not force: 

1399 format_and_log_uninst( 

1400 self.output_logger, 

1401 self.options.architectures, 

1402 newly_uninst(nuninst_start, nuninst_end), 

1403 ) 

1404 

1405 if force: 

1406 # Force implies "unconditionally better" 

1407 better = True 

1408 else: 

1409 break_arches: set[str] = set(self.options.break_arches) 

1410 if all(x.architecture in break_arches for x in selected): 

1411 # If we only migrated items from break-arches, then we 

1412 # do not allow any regressions on these architectures. 

1413 # This usually only happens with hints 

1414 break_arches = set() 

1415 better = is_nuninst_asgood_generous( 

1416 self.constraints, 

1417 self.allow_uninst, 

1418 self.options.architectures, 

1419 self.nuninst_orig, 

1420 nuninst_end, 

1421 break_arches, 

1422 ) 

1423 

1424 if better: 

1425 # Result accepted either by force or by being better than the original result. 

1426 output_logger.info( 

1427 "final: %s", ",".join(sorted(x.uvname for x in selected)) 

1428 ) 

1429 output_logger.info("start: %s", self.eval_nuninst(nuninst_start)) 

1430 output_logger.info(" orig: %s", self.eval_nuninst(self.nuninst_orig)) 

1431 output_logger.info(" end: %s", nuninst_end_str) 

1432 if force: 

1433 broken = newly_uninst(nuninst_start, nuninst_end) 

1434 if broken: 

1435 output_logger.warning("force breaks:") 

1436 format_and_log_uninst( 

1437 self.output_logger, 

1438 self.options.architectures, 

1439 broken, 

1440 loglevel=logging.WARNING, 

1441 ) 

1442 else: 

1443 output_logger.info("force did not break any packages") 

1444 output_logger.info( 

1445 "SUCCESS (%d/%d)", len(actions or self.upgrade_me), len(extra) 

1446 ) 

1447 self.nuninst_orig = nuninst_end 

1448 self.all_selected += selected 

1449 if transaction: 

1450 transaction.commit() 

1451 if self.options.check_consistency_level >= 2: [1451 ↛ 1455: line 1451 didn't jump to line 1455 because the condition on line 1451 was always true]

1452 target_suite.check_suite_source_pkg_consistency( 

1453 "do_all after commit" 

1454 ) 

1455 if not actions: 

1456 if recurse: 

1457 self.upgrade_me = extra 

1458 else: 

1459 self.upgrade_me = [ 

1460 x for x in self.upgrade_me if x not in set(selected) 

1461 ] 

1462 else: 

1463 output_logger.info("FAILED\n") 

1464 if not transaction: [1464 ↛ 1468: line 1464 didn't jump to line 1468 because the condition on line 1464 was never true]

1465 # if we 'FAILED', but we cannot rollback, we will probably 

1466 # leave a broken state behind 

1467 # this should not happen 

1468 raise AssertionError("do_all FAILED but no transaction to rollback") 

1469 transaction.rollback() 

1470 if self.options.check_consistency_level >= 2: [1470 ↛ 1349: line 1470 didn't jump to line 1349]

1471 target_suite.check_suite_source_pkg_consistency( 

1472 "do_all after rollback" 

1473 ) 

1474 

1475 output_logger.info("") 

1476 
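The optional-transaction trick at the top of do_all (yielding None from a no-op context manager when rollback can never be needed) is a general pattern; a minimal sketch with a hypothetical Transaction class standing in for MigrationTransactionState:

```python
import contextlib


class Transaction:
    """Hypothetical stand-in for MigrationTransactionState."""

    def __init__(self):
        self.state = "open"

    def commit(self):
        self.state = "committed"

    def rollback(self):
        self.state = "rolled-back"


@contextlib.contextmanager
def maybe_transaction(needed):
    """Yield a real transaction when rollback may be needed, else None,
    so callers follow one code path in both cases."""
    if needed:
        yield Transaction()
    else:
        yield None


with maybe_transaction(True) as txn:
    txn.commit()
with maybe_transaction(False) as no_txn:
    pass  # "if transaction:" guards are simply skipped
```

This keeps the body of do_all identical for a force-hint (no rollback possible) and an easy hint (rollback required on failure).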

1477 def assert_nuninst_is_correct(self) -> None: 

1478 self.logger.info("> Update complete - Verifying non-installability counters") 

1479 

1480 cached_nuninst = self.nuninst_orig 

1481 self._inst_tester.compute_installability() 

1482 computed_nuninst = compile_nuninst( 

1483 self.suite_info.target_suite, 

1484 self.options.architectures, 

1485 self.options.nobreakall_arches, 

1486 ) 

1487 if cached_nuninst != computed_nuninst: # pragma: no cover 

1488 only_on_break_archs = True 

1489 msg_l = [ 

1490 "==================== NUNINST OUT OF SYNC =========================" 

1491 ] 

1492 for arch in self.options.architectures: 

1493 expected_nuninst = set(cached_nuninst[arch]) 

1494 actual_nuninst = set(computed_nuninst[arch]) 

1495 false_negatives = actual_nuninst - expected_nuninst 

1496 false_positives = expected_nuninst - actual_nuninst 

1497 # Britney does not quite work correctly with 

1498 # break arches, so ignore issues there for now.

1499 if ( 

1500 false_negatives or false_positives 

1501 ) and arch not in self.options.break_arches: 

1502 only_on_break_archs = False 

1503 if false_negatives: 

1504 msg_l.append(f" {arch} - unnoticed nuninst: {str(false_negatives)}") 

1505 if false_positives: 

1506 msg_l.append(f" {arch} - invalid nuninst: {str(false_positives)}") 

1507 if false_negatives or false_positives: 

1508 msg_l.append( 

1509 f" {arch} - actual nuninst: {str(sorted(actual_nuninst))}" 

1510 ) 

1511 msg_l.append(msg_l[0]) 

1512 for msg in msg_l: 

1513 if only_on_break_archs: 

1514 self.logger.warning(msg) 

1515 else: 

1516 self.logger.error(msg) 

1517 if not only_on_break_archs: 

1518 raise AssertionError("NUNINST OUT OF SYNC") 

1519 else: 

1520 self.logger.warning("Nuninst is out of sync on some break arches") 

1521 

1522 self.logger.info("> All non-installability counters are ok") 

1523 

1524 def upgrade_testing(self) -> None: 

1525 """Upgrade testing using the packages from the source suites 

1526 

1527 This method tries to upgrade testing using the packages from the 

1528 source suites. 

1529 Before running the do_all method, it tries the easy and force-hint 

1530 commands. 

1531 """ 

1532 

1533 output_logger = self.output_logger 

1534 self.logger.info("Starting the upgrade test") 

1535 output_logger.info( 

1536 "Generated on: %s", 

1537 time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time())), 

1538 ) 

1539 output_logger.info("Arch order is: %s", ", ".join(self.options.architectures)) 

1540 

1541 if not self.options.actions: [1541 ↛ 1552: line 1541 didn't jump to line 1552 because the condition on line 1541 was always true]

1542 # process `easy' hints 

1543 for x in self.hints["easy"]: 

1544 self.do_hint("easy", x.user, x.packages) 

1545 

1546 # process `force-hint' hints 

1547 for x in self.hints["force-hint"]: 

1548 self.do_hint("force-hint", x.user, x.packages) 

1549 

1550 # run the first round of the upgrade 

1551 # - do separate runs for break arches 

1552 allpackages = [] 

1553 normpackages = self.upgrade_me[:] 

1554 archpackages = {} 

1555 for a in self.options.break_arches: 

1556 archpackages[a] = [p for p in normpackages if p.architecture == a] 

1557 normpackages = [p for p in normpackages if p not in archpackages[a]] 

1558 self.upgrade_me = normpackages 

1559 output_logger.info("info: main run") 

1560 self.do_all() 

1561 allpackages += self.upgrade_me 

1562 for a in self.options.break_arches: 

1563 backup = self.options.break_arches 

1564 self.options.break_arches = " ".join( 

1565 x for x in self.options.break_arches if x != a 

1566 ) 

1567 self.upgrade_me = archpackages[a] 

1568 output_logger.info("info: broken arch run for %s", a) 

1569 self.do_all() 

1570 allpackages += self.upgrade_me 

1571 self.options.break_arches = backup 

1572 self.upgrade_me = allpackages 

1573 

1574 if self.options.actions: [1574 ↛ 1575: line 1574 didn't jump to line 1575 because the condition on line 1574 was never true]

1575 self.printuninstchange() 

1576 return 

1577 

1578 # process `hint' hints 

1579 hintcnt = 0 

1580 for x in self.hints["hint"][:50]: 

1581 if hintcnt > 50: [1581 ↛ 1582: line 1581 didn't jump to line 1582 because the condition on line 1581 was never true]

1582 output_logger.info("Skipping remaining hints...") 

1583 break 

1584 if self.do_hint("hint", x.user, x.packages): [1584 ↛ 1580: line 1584 didn't jump to line 1580 because the condition on line 1584 was always true]

1585 hintcnt += 1 

1586 

1587 # run the auto hinter 

1588 self.run_auto_hinter() 

1589 

1590 if getattr(self.options, "remove_obsolete", "yes") == "yes": 

1591 # obsolete source packages 

1592 # a package is obsolete if none of the binary packages in testing 

1593 # are built by it 

1594 self.logger.info( 

1595 "> Removing obsolete source packages from the target suite" 

1596 ) 

1597 # local copies for performance 

1598 target_suite = self.suite_info.target_suite 

1599 sources_t = target_suite.sources 

1600 binaries_t = target_suite.binaries 

1601 mi_factory = self._migration_item_factory 

1602 used = { 

1603 binaries_t[arch][binary].source 

1604 for arch in binaries_t 

1605 for binary in binaries_t[arch] 

1606 if not binary.endswith("-faux-build-depends") 

1607 } 

1608 removals = [ 

1609 mi_factory.parse_item( 

1610 f"-{source}/{sources_t[source].version}", auto_correct=False 

1611 ) 

1612 for source in sources_t 

1613 if source not in used 

1614 ] 

1615 if removals: 

1616 output_logger.info( 

1617 "Removing obsolete source packages from the target suite (%d):", 

1618 len(removals), 

1619 ) 

1620 self.do_all(actions=removals) 

1621 

1622 # smooth updates 

1623 removals = old_libraries( 

1624 self._migration_item_factory, self.suite_info, self.options.outofsync_arches 

1625 ) 

1626 if removals: 

1627 output_logger.info( 

1628 "Removing packages left in the target suite (e.g. smooth updates or cruft)" 

1629 ) 

1630 log_and_format_old_libraries(self.output_logger, removals) 

1631 self.do_all(actions=removals) 

1632 removals = old_libraries( 

1633 self._migration_item_factory, 

1634 self.suite_info, 

1635 self.options.outofsync_arches, 

1636 ) 

1637 

1638 output_logger.info( 

1639 "List of old libraries in the target suite (%d):", len(removals) 

1640 ) 

1641 log_and_format_old_libraries(self.output_logger, removals) 

1642 

1643 self.printuninstchange() 

1644 if self.options.check_consistency_level >= 1: [1644 ↛ 1650: line 1644 didn't jump to line 1650 because the condition on line 1644 was always true]

1645 target_suite = self.suite_info.target_suite 

1646 self.assert_nuninst_is_correct() 

1647 target_suite.check_suite_source_pkg_consistency("end") 

1648 

1649 # output files 

1650 if self.options.heidi_output and not self.options.dry_run: [1650 ↛ 1664: line 1650 didn't jump to line 1664 because the condition on line 1650 was always true]

1651 target_suite = self.suite_info.target_suite 

1652 

1653 # write HeidiResult 

1654 self.logger.info("Writing Heidi results to %s", self.options.heidi_output) 

1655 write_heidi( 

1656 self.options.heidi_output, 

1657 target_suite, 

1658 outofsync_arches=self.options.outofsync_arches, 

1659 ) 

1660 

1661 self.logger.info("Writing delta to %s", self.options.heidi_delta_output) 

1662 write_heidi_delta(self.options.heidi_delta_output, self.all_selected) 

1663 

1664 self.logger.info("Test completed!") 

1665 
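The break-arches handling in upgrade_testing boils down to partitioning the candidate list before the main run, then doing one extra run per break architecture; a sketch of the partition step (tuples stand in for MigrationItems, and the architecture names are examples):

```python
def partition_by_arch(items, break_arches):
    """Split (name, arch) candidates into the main-run list plus one
    bucket per break architecture, as upgrade_testing does."""
    buckets = {a: [] for a in break_arches}
    main = []
    for item in items:
        _, arch = item
        if arch in buckets:
            buckets[arch].append(item)
        else:
            main.append(item)
    return main, buckets


main, buckets = partition_by_arch(
    [("foo", "amd64"), ("bar", "hurd-i386"), ("baz", "source")],
    ["hurd-i386"],
)
```

During each per-arch run the code above temporarily removes that architecture from break_arches, so regressions there are not forgiven for its own candidates.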

1666 def printuninstchange(self) -> None: 

1667 self.logger.info("Checking for newly uninstallable packages") 

1668 uninst = newly_uninst(self.nuninst_orig_save, self.nuninst_orig) 

1669 

1670 if uninst: 

1671 self.output_logger.info("") 

1672 self.output_logger.info( 

1673 "Newly uninstallable packages in the target suite (arch:all on BREAKALL_ARCHES not shown)" 

1674 ) 

1675 format_and_log_uninst( 

1676 self.output_logger, 

1677 self.options.architectures, 

1678 uninst, 

1679 loglevel=logging.WARNING, 

1680 ) 

1681 

1682 def hint_tester(self) -> None: 

1683 """Run a command line interface to test hints 

1684 

1685 This method provides a command line interface for the release team to 

1686 try hints and evaluate the results. 

1687 """ 

1688 import readline 

1689 

1690 from britney2.completer import Completer 

1691 

1692 histfile = os.path.expanduser("~/.britney2_history") 

1693 if os.path.exists(histfile): 

1694 readline.read_history_file(histfile) 

1695 

1696 readline.parse_and_bind("tab: complete") 

1697 readline.set_completer(Completer(self).completer) 

1698 # Package names can contain "-" and we use "/" in our presentation of them as well, 

1699 # so ensure readline does not split on these characters. 

1700 readline.set_completer_delims( 

1701 readline.get_completer_delims().replace("-", "").replace("/", "") 

1702 ) 

1703 

1704 known_hints = self._hint_parser.registered_hint_names 

1705 

1706 print("Britney hint tester") 

1707 print() 

1708 print( 

1709 "Besides inputting known britney hints, the following commands are also available"

1710 ) 

1711 print(" * quit/exit - terminates the shell") 

1712 print( 

1713 " * python-console - jump into an interactive python shell (with the current loaded dataset)" 

1714 ) 

1715 print() 

1716 

1717 while True: 

1718 # read the command from the command line 

1719 try: 

1720 user_input = input("britney> ").split() 

1721 except EOFError: 

1722 print("") 

1723 break 

1724 except KeyboardInterrupt: 

1725 print("") 

1726 continue 

1727 # quit the hint tester 

1728 if user_input and user_input[0] in ("quit", "exit"): 

1729 break 

1730 elif user_input and user_input[0] == "python-console": 

1731 try: 

1732 import britney2.console 

1733 except ImportError as e: 

1734 print("Failed to import britney.console module: %s" % repr(e)) 

1735 continue 

1736 britney2.console.run_python_console(self) 

1737 print("Returning to the britney hint-tester console") 

1738 # run a hint 

1739 elif user_input and user_input[0] in ("easy", "hint", "force-hint"): 

1740 mi_factory = self._migration_item_factory 

1741 try: 

1742 self.do_hint( 

1743 user_input[0], 

1744 "hint-tester", 

1745 mi_factory.parse_items(user_input[1:]), 

1746 ) 

1747 self.printuninstchange() 

1748 except KeyboardInterrupt: 

1749 continue 

1750 elif user_input and user_input[0] in known_hints: 

1751 self._hint_parser.parse_hints( 

1752 "hint-tester", self.HINTS_ALL, "<stdin>", [" ".join(user_input)] 

1753 ) 

1754 self.write_excuses() 

1755 

1756 try: 

1757 readline.write_history_file(histfile) 

1758 except OSError as e: 

1759 self.logger.warning("Could not write %s: %s", histfile, e) 

1760 

1761 def do_hint(self, hinttype: str, who: str, pkgvers: list[MigrationItem]) -> bool: 

1762 """Process hints 

1763 

1764 This method processes `easy`, `hint` and `force-hint` hints. If the 

1765 requested version is not in the relevant source suite, then the hint 

1766 is skipped. 

1767 """ 

1768 

1769 output_logger = self.output_logger 

1770 

1771 self.logger.info("> Processing '%s' hint from %s", hinttype, who) 

1772 output_logger.info( 

1773 "Trying %s from %s: %s", 

1774 hinttype, 

1775 who, 

1776 " ".join(f"{x.uvname}/{x.version}" for x in pkgvers), 

1777 ) 

1778 

1779 issues = [] 

1780 # loop on the requested packages and versions 

1781 for idx in range(len(pkgvers)): 

1782 pkg = pkgvers[idx] 

1783 # skip removal requests 

1784 if pkg.is_removal: 

1785 continue 

1786 

1787 suite = pkg.suite 

1788 

1789 assert pkg.version is not None 

1790 if pkg.package not in suite.sources: 1790 ↛ 1791 (line 1790 didn't jump to line 1791 because the condition on line 1790 was never true)

1791 issues.append(f"Source {pkg.package} has no version in {suite.name}") 

1792 elif ( 1792 ↛ 1796 (line 1792 didn't jump to line 1796)

1793 apt_pkg.version_compare(suite.sources[pkg.package].version, pkg.version) 

1794 != 0 

1795 ): 

1796 issues.append( 

1797 "Version mismatch, %s %s != %s" 

1798 % (pkg.package, pkg.version, suite.sources[pkg.package].version) 

1799 ) 

1800 if issues: 1800 ↛ 1801 (line 1800 didn't jump to line 1801 because the condition on line 1800 was never true)

1801 output_logger.warning("%s: Not using hint", ", ".join(issues)) 

1802 return False 

1803 

1804 self.do_all(hinttype, pkgvers) 

1805 return True 
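
The validation loop in do_hint can be sketched standalone. Everything here is an illustrative stand-in: `validate_hint`, the plain `(package, version)` tuples, and simple string equality in place of `apt_pkg.version_compare` are assumptions for the sketch, not britney API:

```python
# Sketch of do_hint's validation step (assumed names; the real code works
# on MigrationItem objects and compares with apt_pkg.version_compare).
def validate_hint(pkgvers: list[tuple[str, str]],
                  suite_sources: dict[str, str]) -> list[str]:
    """Return a list of problems; an empty list means the hint is usable."""
    issues = []
    for pkg, ver in pkgvers:
        if pkg not in suite_sources:
            # the hinted source is absent from the source suite
            issues.append(f"Source {pkg} has no version in the suite")
        elif suite_sources[pkg] != ver:
            # the hinted version does not match what the suite carries
            issues.append(
                f"Version mismatch, {pkg} {ver} != {suite_sources[pkg]}")
    return issues
```

As in do_hint, any issue at all causes the whole hint to be rejected rather than partially applied.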

1806 

1807 def get_auto_hinter_hints( 

1808 self, upgrade_me: list[MigrationItem] 

1809 ) -> list[list[frozenset[MigrationItem]]]: 

1810 """Auto-generate "easy" hints. 

1811 

1812 This method attempts to generate "easy" hints for sets of packages which 

1813 must migrate together. Beginning with a package which does not depend on 

1814 any other package (in terms of excuses), a list of dependencies and 

1815 reverse dependencies is recursively created. 

1816 

1817 Once all such lists have been generated, any which are subsets of other 

1818 lists are ignored in favour of the larger lists. The remaining lists are 

1819 then attempted in turn as "easy" hints. 

1820 

1821 We also try to auto-hint circular dependencies by analyzing the update 

1822 excuses relationships. If they form a circular dependency, which we already 

1823 know does not work with the standard do_all algorithm, we try to `easy` them. 

1824 """ 

1825 self.logger.info("> Processing hints from the auto hinter") 

1826 

1827 sources_t = self.suite_info.target_suite.sources 

1828 excuses = self.excuses 

1829 

1830 def excuse_still_valid(excuse: "Excuse") -> bool: 

1831 source = excuse.source 

1832 assert isinstance(excuse.item, MigrationItem) 

1833 arch = excuse.item.architecture 

1834 # TODO for binNMUs, this check is always ok, even if the item 

1835 # migrated already 

1836 valid = ( 

1837 arch != "source" 

1838 or source not in sources_t 

1839 or sources_t[source].version != excuse.ver[1] 

1840 ) 

1841 # TODO migrated items should be removed from upgrade_me, so this 

1842 # should not happen 

1843 if not valid: 1843 ↛ 1844 (line 1843 didn't jump to line 1844 because the condition on line 1843 was never true)

1844 raise AssertionError("excuse no longer valid %s" % (excuse.item)) 

1845 return valid 

1846 

1847 # consider only excuses which are valid candidates and still relevant. 

1848 valid_excuses = frozenset( 

1849 e.name 

1850 for n, e in excuses.items() 

1851 if e.item in upgrade_me and excuse_still_valid(e) 

1852 ) 

1853 excuses_deps = { 

1854 name: valid_excuses.intersection(excuse.get_deps()) 

1855 for name, excuse in excuses.items() 

1856 if name in valid_excuses 

1857 } 

1858 excuses_rdeps = defaultdict(set) 

1859 for name, deps in excuses_deps.items(): 

1860 for dep in deps: 

1861 excuses_rdeps[dep].add(name) 

1862 

1863 # loop on them 

1864 candidates = [] 

1865 mincands = [] 

1866 seen_hints = set() 

1867 for e in valid_excuses: 

1868 excuse = excuses[e] 

1869 if not excuse.get_deps(): 

1870 assert isinstance(excuse.item, MigrationItem) 

1871 items = [excuse.item] 

1872 orig_size = 1 

1873 looped = False 

1874 seen_items = set() 

1875 seen_items.update(items) 

1876 

1877 for item in items: 

1878 assert isinstance(item, MigrationItem) 

1879 # excuses which depend on "item" or are depended on by it 

1880 new_items = cast( 

1881 set[MigrationItem], 

1882 { 

1883 excuses[x].item 

1884 for x in chain( 

1885 excuses_deps[item.name], excuses_rdeps[item.name] 

1886 ) 

1887 }, 

1888 ) 

1889 new_items -= seen_items 

1890 items.extend(new_items) 

1891 seen_items.update(new_items) 

1892 

1893 if not looped and len(items) > 1: 

1894 orig_size = len(items) 

1895 h = frozenset(seen_items) 

1896 if h not in seen_hints: 1896 ↛ 1899 (line 1896 didn't jump to line 1899 because the condition on line 1896 was always true)

1897 mincands.append(h) 

1898 seen_hints.add(h) 

1899 looped = True 

1900 if len(items) != orig_size: 1900 ↛ 1901 (line 1900 didn't jump to line 1901 because the condition on line 1900 was never true)

1901 h = frozenset(seen_items) 

1902 if h != mincands[-1] and h not in seen_hints: 

1903 candidates.append(h) 

1904 seen_hints.add(h) 

1905 return [candidates, mincands] 
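
The docstring's "any which are subsets of other lists are ignored in favour of the larger lists" step can be sketched in isolation. `drop_subsets` is a hypothetical helper written for this sketch, not a britney function:

```python
# Keep only maximal groups: a group that is a strict subset of another
# group is redundant, since trying the larger "easy" hint covers it.
def drop_subsets(groups: list[set[str]]) -> list[frozenset[str]]:
    """Return the groups that are not strict subsets of another group."""
    sets = [frozenset(g) for g in groups]
    return [s for s in sets
            if not any(s < other for other in sets)]
```

For example, with groups `{a}`, `{a, b}` and `{c}`, only `{a, b}` and `{c}` survive, and each surviving group becomes one candidate "easy" hint.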

1906 

1907 def run_auto_hinter(self) -> None: 

1908 for lst in self.get_auto_hinter_hints(self.upgrade_me): 

1909 for hint in lst: 

1910 self.do_hint("easy", "autohinter", sorted(hint)) 

1911 

1912 def nuninst_arch_report(self, nuninst: dict[str, set[str]], arch: str) -> None: 

1913 """Print a report of uninstallable packages for one architecture.""" 

1914 all = defaultdict(set) 

1915 binaries_t = self.suite_info.target_suite.binaries 

1916 for p in nuninst[arch]: 

1917 pkg = binaries_t[arch][p] 

1918 all[(pkg.source, pkg.source_version)].add(p) 

1919 

1920 print("* %s" % arch) 

1921 

1922 for (src, ver), pkgs in sorted(all.items()): 

1923 print(" {} ({}): {}".format(src, ver, " ".join(sorted(pkgs)))) 

1924 

1925 print() 

1926 

1927 def _remove_archall_faux_packages(self) -> None: 

1928 """Remove faux packages added for the excuses phase 

1929 

1930 To prevent binary packages from going missing while they are still listed 

1931 by their source package, we add bin:faux packages while reading the 

1932 Sources. They are used during the excuses phase to prevent packages 

1933 from becoming candidates. However, they interfere in complex ways 

1934 during the installability phase, so instead of making all migration 

1935 code aware of this excuses-phase implementation detail, we simply 

1936 remove them again. 

1937 

1938 """ 

1939 if not self.options.archall_inconsistency_allowed: 

1940 all_binaries = self.all_binaries 

1941 faux_a = {x for x in all_binaries.keys() if x[2] == "faux"} 

1942 for pkg_a in faux_a: 

1943 del all_binaries[pkg_a] 

1944 

1945 for suite in self.suite_info._suites.values(): 

1946 for arch in suite.binaries.keys(): 

1947 binaries = suite.binaries[arch] 

1948 faux_b = {x for x in binaries if binaries[x].pkg_id[2] == "faux"} 

1949 for pkg_b in faux_b: 

1950 del binaries[pkg_b] 

1951 sources = suite.sources 

1952 for src in sources.keys(): 

1953 faux_s = {x for x in sources[src].binaries if x[2] == "faux"} 

1954 sources[src].binaries -= faux_s 
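
The sweep above can be sketched on plain dicts. `strip_faux` is a hypothetical helper for illustration; the only assumption taken from the code above is that entries are keyed by tuples with the architecture in slot 2, where "faux" marks the synthetic packages:

```python
# Drop every binary whose (name, version, arch) key has "faux" in the
# architecture slot, mirroring the faux_a/faux_b removal loops above.
def strip_faux(binaries: dict[tuple[str, str, str], object]) -> None:
    """Remove faux entries in place; collect keys first, then delete."""
    for pkg_id in [k for k in binaries if k[2] == "faux"]:
        del binaries[pkg_id]
```

Collecting the keys into a list before deleting matters: deleting from a dict while iterating over it directly raises `RuntimeError`.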

1955 

1956 def main(self) -> None: 

1957 """Main method 

1958 

1959 This is the entry point for the class: it includes the list of calls 

1960 for the member methods which will produce the output files. 

1961 """ 

1962 # if running in --print-uninst mode, quit 

1963 if self.options.print_uninst: 1963 ↛ 1964 (line 1963 didn't jump to line 1964 because the condition on line 1963 was never true)

1964 return 

1965 # if no actions are provided, build the excuses and sort them 

1966 elif not self.options.actions: 1966 ↛ 1970 (line 1966 didn't jump to line 1970 because the condition on line 1966 was always true)

1967 self.write_excuses() 

1968 # otherwise, use the actions provided by the command line 

1969 else: 

1970 self.upgrade_me = self.options.actions.split() 

1971 

1972 self._remove_archall_faux_packages() 

1973 

1974 if self.options.compute_migrations or self.options.hint_tester: 

1975 if self.options.dry_run: 1975 ↛ 1976 (line 1975 didn't jump to line 1976 because the condition on line 1975 was never true)

1976 self.logger.info( 

1977 "Upgrade output not (also) written to a separate file" 

1978 " as this is a dry-run." 

1979 ) 

1980 elif hasattr(self.options, "upgrade_output"): 1980 ↛ 1990 (line 1980 didn't jump to line 1990 because the condition on line 1980 was always true)

1981 upgrade_output = self.options.upgrade_output 

1982 file_handler = logging.FileHandler( 

1983 upgrade_output, mode="w", encoding="utf-8" 

1984 ) 

1985 output_formatter = logging.Formatter("%(message)s") 

1986 file_handler.setFormatter(output_formatter) 

1987 self.output_logger.addHandler(file_handler) 

1988 self.logger.info("Logging upgrade output to %s", upgrade_output) 

1989 else: 

1990 self.logger.info( 

1991 "Upgrade output not (also) written to a separate file" 

1992 " as the UPGRADE_OUTPUT configuration is not provided." 

1993 ) 

1994 

1995 # run the hint tester 

1996 if self.options.hint_tester: 1996 ↛ 1997 (line 1996 didn't jump to line 1997 because the condition on line 1996 was never true)

1997 self.hint_tester() 

1998 # run the upgrade test 

1999 else: 

2000 self.upgrade_testing() 

2001 

2002 self.logger.info("> Stats from the installability tester") 

2003 for stat in self._inst_tester.stats.stats(): 

2004 self.logger.info("> %s", stat) 

2005 else: 

2006 self.logger.info("Migration computation skipped as requested.") 

2007 if not self.options.dry_run: 2007 ↛ 2009 (line 2007 didn't jump to line 2009 because the condition on line 2007 was always true)

2008 self._policy_engine.save_state(self) 

2009 logging.shutdown() 

2010 

2011 

2012 if __name__ == "__main__": 2012 ↛ 2013 (line 2012 didn't jump to line 2013 because the condition on line 2012 was never true)

2013 Britney().main()