Coverage for britney2/excusefinder.py: 91%

346 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-08 19:15 +0000

1import logging 

2import optparse 

3from collections.abc import Iterable 

4from itertools import chain 

5from typing import TYPE_CHECKING, Any, Optional, cast 

6from urllib.parse import quote 

7 

8import apt_pkg 

9 

10from britney2 import BinaryPackage, BinaryPackageId, PackageId, Suites 

11from britney2.excuse import Excuse 

12from britney2.migrationitem import MigrationItem, MigrationItemFactory 

13from britney2.policies import PolicyVerdict 

14from britney2.utils import ( 

15 filter_out_faux, 

16 find_smooth_updateable_binaries, 

17 invalidate_excuses, 

18) 

19 

20if TYPE_CHECKING: 20 ↛ 21line 20 didn't jump to line 21 because the condition on line 20 was never true

21 from .hints import HintCollection 

22 from .installability.universe import BinaryPackageUniverse 

23 from .policies.policy import PolicyEngine 

24 

25 

26class ExcuseFinder: 

27 

28 def __init__( 

29 self, 

30 options: optparse.Values, 

31 suite_info: Suites, 

32 all_binaries: dict[BinaryPackageId, BinaryPackage], 

33 pkg_universe: "BinaryPackageUniverse", 

34 policy_engine: "PolicyEngine", 

35 mi_factory: MigrationItemFactory, 

36 hints: "HintCollection", 

37 ) -> None: 

38 logger_name = ".".join((self.__class__.__module__, self.__class__.__name__)) 

39 self.logger = logging.getLogger(logger_name) 

40 self.options = options 

41 self.suite_info = suite_info 

42 self.all_binaries = all_binaries 

43 self.pkg_universe = pkg_universe 

44 self._policy_engine = policy_engine 

45 self._migration_item_factory = mi_factory 

46 self.hints = hints 

47 self.excuses: dict[str, Excuse] = {} 

48 

49 def _get_build_link( 

50 self, arch: str, src: str, ver: str, label: str | None = None 

51 ) -> str: 

52 """Return a link to the build logs, labelled 'arch' per default""" 

53 if label is None: 

54 label = arch 

55 if self.options.build_url: 

56 url = self.options.build_url.format( 

57 arch=arch, source=quote(src), version=quote(ver) 

58 ) 

59 return f'<a href="{url}" target="_blank">{label}</a>' 

60 else: 

61 return label 

62 

63 def _should_remove_source(self, item: MigrationItem) -> bool: 

64 """Check if a source package should be removed from testing 

65 

66 This method checks if a source package should be removed from the 

67 target suite; this happens if the source package is not 

68 present in the primary source suite anymore. 

69 

70 It returns True if the package can be removed, False otherwise. 

71 In the former case, a new excuse is appended to the object 

72 attribute excuses. 

73 """ 

74 if hasattr(self.options, "partial_source"): 74 ↛ 75line 74 didn't jump to line 75 because the condition on line 74 was never true

75 return False 

76 # if the source package is available in unstable, then do nothing 

77 source_suite = self.suite_info.primary_source_suite 

78 pkg = item.package 

79 if pkg in source_suite.sources: 79 ↛ 80line 79 didn't jump to line 80 because the condition on line 79 was never true

80 return False 

81 # otherwise, add a new excuse for its removal 

82 src = item.suite.sources[pkg] 

83 excuse = Excuse(item) 

84 excuse.addinfo("Package not in %s, will try to remove" % source_suite.name) 

85 excuse.set_vers(src.version, None) 

86 if src.maintainer: 

87 excuse.set_maint(src.maintainer) 

88 if src.section: 88 ↛ 92line 88 didn't jump to line 92 because the condition on line 88 was always true

89 excuse.set_section(src.section) 

90 

91 # if the package is blocked, skip it 

92 for hint in self.hints.search("block", package=pkg, removal=True): 

93 excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY 

94 excuse.add_verdict_info( 

95 excuse.policy_verdict, 

96 "Not touching package, as requested by %s " 

97 "(contact %s-release if update is needed)" 

98 % (hint.user, self.options.distribution), 

99 ) 

100 excuse.addreason("block") 

101 self.excuses[excuse.name] = excuse 

102 return False 

103 

104 excuse.policy_verdict = PolicyVerdict.PASS 

105 self.excuses[excuse.name] = excuse 

106 return True 

107 

108 def _should_upgrade_srcarch(self, item: MigrationItem) -> bool: 

109 """Check if a set of binary packages should be upgraded 

110 

111 This method checks if the binary packages produced by the source 

112 package on the given architecture should be upgraded; this can 

113 happen also if the migration is a binary-NMU for the given arch. 

114 

115 It returns False if the given packages don't need to be upgraded, 

116 True otherwise. In the former case, a new excuse is appended to 

117 the object attribute excuses. 

118 """ 

119 # retrieve the source packages for testing and suite 

120 

121 target_suite = self.suite_info.target_suite 

122 source_suite = item.suite 

123 src = item.package 

124 arch = item.architecture 

125 source_t = target_suite.sources[src] 

126 source_u = source_suite.sources[src] 

127 

128 excuse = Excuse(item) 

129 excuse.set_vers(source_t.version, source_t.version) 

130 if source_u.maintainer: 130 ↛ 132line 130 didn't jump to line 132 because the condition on line 130 was always true

131 excuse.set_maint(source_u.maintainer) 

132 if source_u.section: 132 ↛ 139line 132 didn't jump to line 139 because the condition on line 132 was always true

133 excuse.set_section(source_u.section) 

134 

135 # if there is a `remove' hint and the requested version is the same as the 

136 # version in testing, then stop here and return False 

137 # (as a side effect, a removal may generate such excuses for both the source 

138 # package and its binary packages on each architecture) 

139 for hint in self.hints.search("remove", package=src, version=source_t.version): 

140 excuse.add_hint(hint) 

141 excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY 

142 excuse.add_verdict_info( 

143 excuse.policy_verdict, "Removal request by %s" % (hint.user) 

144 ) 

145 excuse.add_verdict_info( 

146 excuse.policy_verdict, "Trying to remove package, not update it" 

147 ) 

148 self.excuses[excuse.name] = excuse 

149 return False 

150 

151 # the starting point is that there is nothing wrong and nothing worth doing 

152 anywrongver = False 

153 anyworthdoing = False 

154 

155 packages_t_a = target_suite.binaries[arch] 

156 packages_s_a = source_suite.binaries[arch] 

157 

158 wrong_verdict = PolicyVerdict.REJECTED_PERMANENTLY 

159 

160 # for every binary package produced by this source in unstable for this architecture 

161 for pkg_id in { 

162 x for x in filter_out_faux(source_u.binaries) if x.architecture == arch 

163 }: 

164 pkg_name = pkg_id.package_name 

165 # TODO filter binaries based on checks below? 

166 excuse.add_package(pkg_id) 

167 

168 # retrieve the testing (if present) and unstable corresponding binary packages 

169 binary_t = packages_t_a[pkg_name] if pkg_name in packages_t_a else None 

170 binary_u = packages_s_a[pkg_name] 

171 

172 # this is the source version for the new binary package 

173 pkgsv = binary_u.source_version 

174 

175 # if the new binary package is architecture-independent, then skip it 

176 if binary_u.architecture == "all": 

177 if pkg_id not in source_t.binaries: 

178 # only add a note if the arch:all does not match the expected version 

179 excuse.add_detailed_info( 

180 "Ignoring %s %s (from %s) as it is arch: all" 

181 % (pkg_name, binary_u.version, pkgsv) 

182 ) 

183 continue 

184 

185 # if the new binary package is not from the same source as the testing one, then skip it 

186 # this implies that this binary migration is part of a source migration 

187 if source_u.version == pkgsv and source_t.version != pkgsv: 187 ↛ 188line 187 didn't jump to line 188 because the condition on line 187 was never true

188 anywrongver = True 

189 excuse.add_verdict_info( 

190 wrong_verdict, 

191 "From wrong source: %s %s (%s not %s)" 

192 % (pkg_name, binary_u.version, pkgsv, source_t.version), 

193 ) 

194 continue 

195 

196 # cruft in unstable 

197 if source_u.version != pkgsv and source_t.version != pkgsv: 

198 if self.options.ignore_cruft: 

199 excuse.add_detailed_info( 

200 "Old cruft: %s %s (but ignoring cruft, so nevermind)" 

201 % (pkg_name, pkgsv) 

202 ) 

203 else: 

204 anywrongver = True 

205 excuse.add_verdict_info( 

206 wrong_verdict, f"Old cruft: {pkg_name} {pkgsv}" 

207 ) 

208 continue 

209 

210 # if the source package has been updated in unstable and this is a binary migration, skip it 

211 # (the binaries are now out-of-date) 

212 if source_t.version == pkgsv and source_t.version != source_u.version: 212 ↛ 213line 212 didn't jump to line 213 because the condition on line 212 was never true

213 anywrongver = True 

214 excuse.add_verdict_info( 

215 wrong_verdict, 

216 "From wrong source: %s %s (%s not %s)" 

217 % (pkg_name, binary_u.version, pkgsv, source_u.version), 

218 ) 

219 continue 

220 

221 # if the binary is not present in testing, then it is a new binary; 

222 # in this case, there is something worth doing 

223 if not binary_t: 

224 excuse.add_detailed_info(f"New binary: {pkg_name} ({binary_u.version})") 

225 anyworthdoing = True 

226 continue 

227 

228 # at this point, the binary package is present in testing, so we can compare 

229 # the versions of the packages ... 

230 vcompare = apt_pkg.version_compare(binary_t.version, binary_u.version) 

231 

232 # ... if updating would mean downgrading, then stop here: there is something wrong 

233 if vcompare > 0: 233 ↛ 234line 233 didn't jump to line 234 because the condition on line 233 was never true

234 anywrongver = True 

235 excuse.add_verdict_info( 

236 wrong_verdict, 

237 "Not downgrading: %s (%s to %s)" 

238 % (pkg_name, binary_t.version, binary_u.version), 

239 ) 

240 break 

241 # ... if updating would mean upgrading, then there is something worth doing 

242 elif vcompare < 0: 

243 excuse.add_detailed_info( 

244 "Updated binary: %s (%s to %s)" 

245 % (pkg_name, binary_t.version, binary_u.version) 

246 ) 

247 anyworthdoing = True 

248 

249 srcv = source_u.version 

250 same_source = source_t.version == srcv 

251 primary_source_suite = self.suite_info.primary_source_suite 

252 is_primary_source = source_suite == primary_source_suite 

253 

254 # if there is nothing wrong and there is something worth doing or the source 

255 # package is not fake, then check what packages should be removed 

256 if not anywrongver and (anyworthdoing or not source_u.is_fakesrc): 

257 # we want to remove binaries that are no longer produced by the 

258 # new source, but there are some special cases: 

259 # - if this is binary-only (same_source) and not from the primary 

260 # source, we don't do any removals: 

261 # binNMUs in *pu on some architectures would otherwise result in 

262 # the removal of binaries on other architectures 

263 # - for the primary source, smooth binaries in the target suite 

264 # are not considered for removal 

265 if not same_source or is_primary_source: 

266 smoothbins = set() 

267 if is_primary_source: 267 ↛ 285line 267 didn't jump to line 285 because the condition on line 267 was always true

268 binaries_t = target_suite.binaries 

269 possible_smooth_updates = [ 

270 p for p in source_t.binaries if p.architecture == arch 

271 ] 

272 smoothbins = find_smooth_updateable_binaries( 

273 possible_smooth_updates, 

274 source_u, 

275 self.pkg_universe, 

276 target_suite, 

277 binaries_t, 

278 source_suite.binaries, 

279 cast(frozenset["BinaryPackageId"], frozenset()), 

280 self.options.smooth_updates, 

281 self.hints, 

282 ) 

283 

284 # for every binary package produced by this source in testing for this architecture 

285 for pkg_id in sorted( 

286 x for x in source_t.binaries if x.architecture == arch 

287 ): 

288 pkg = pkg_id.package_name 

289 # if the package is architecture-independent, then ignore it 

290 tpkg_data = packages_t_a[pkg] 

291 if tpkg_data.architecture == "all": 

292 if pkg_id not in source_u.binaries: 

293 # only add a note if the arch:all does not match the expected version 

294 excuse.add_detailed_info( 

295 "Ignoring removal of %s as it is arch: all" % (pkg) 

296 ) 

297 continue 

298 # if the package is not produced by the new source package, then remove it from testing 

299 if pkg not in packages_s_a: 

300 excuse.add_detailed_info( 

301 f"Removed binary: {pkg} {tpkg_data.version}" 

302 ) 

303 # the removed binary is only interesting if this is a binary-only migration, 

304 # as otherwise the updated source will already cause the binary packages 

305 # to be updated 

306 if same_source and pkg_id not in smoothbins: 

307 # Special-case, if the binary is a candidate for a smooth update, we do not consider 

308 # it "interesting" on its own. This case happens quite often with smooth updatable 

309 # packages, where the old binary "survives" a full run because it still has 

310 # reverse dependencies. 

311 anyworthdoing = True 

312 

313 if not anyworthdoing and not ( 

314 self.options.archall_inconsistency_allowed and excuse.detailed_info 

315 ): 

316 # nothing worth doing, we don't add an excuse to the list, we just return false 

317 return False 

318 

319 if not anyworthdoing: 

320 # This source has binary differences between the target and source 

321 # suite, but we're not going to upgrade them. Part of the purpose 

322 # of options.archall_inconsistency_allowed is to log the excuse 

323 # with a temporary failure such that the administrators can take 

324 # action so they wish. 

325 excuse.policy_verdict = PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT 

326 excuse.addreason("everything-ignored") 

327 

328 else: 

329 # there is something worth doing 

330 # we assume that this package will be ok, if not invalidated below 

331 excuse.policy_verdict = PolicyVerdict.PASS 

332 

333 # if there is something something wrong, reject this package 

334 if anywrongver: 

335 excuse.policy_verdict = wrong_verdict 

336 

337 self._policy_engine.apply_srcarch_policies( 

338 item, arch, source_t, source_u, excuse 

339 ) 

340 

341 self.excuses[excuse.name] = excuse 

342 return excuse.is_valid 

343 

344 def _should_upgrade_src(self, item: MigrationItem) -> bool: 

345 """Check if source package should be upgraded 

346 

347 This method checks if a source package should be upgraded. The analysis 

348 is performed for the source package specified by the `src' parameter, 

349 for the distribution `source_suite'. 

350 

351 It returns False if the given package doesn't need to be upgraded, 

352 True otherwise. In the former case, a new excuse is appended to 

353 the object attribute excuses. 

354 """ 

355 

356 src = item.package 

357 source_suite = item.suite 

358 suite_name = source_suite.name 

359 source_u = source_suite.sources[src] 

360 if source_u.is_fakesrc: 360 ↛ 362line 360 didn't jump to line 362 because the condition on line 360 was never true

361 # it is a fake package created to satisfy Britney implementation details; silently ignore it 

362 return False 

363 

364 target_suite = self.suite_info.target_suite 

365 # retrieve the source packages for testing (if available) and suite 

366 if src in target_suite.sources: 

367 source_t = target_suite.sources[src] 

368 # if testing and unstable have the same version, then this is a candidate for binary-NMUs only 

369 if apt_pkg.version_compare(source_t.version, source_u.version) == 0: 369 ↛ 370line 369 didn't jump to line 370 because the condition on line 369 was never true

370 return False 

371 else: 

372 source_t = None 

373 

374 excuse = Excuse(item) 

375 excuse.set_vers(source_t and source_t.version or None, source_u.version) 

376 if source_u.maintainer: 376 ↛ 378line 376 didn't jump to line 378 because the condition on line 376 was always true

377 excuse.set_maint(source_u.maintainer) 

378 if source_u.section: 378 ↛ 380line 378 didn't jump to line 380 because the condition on line 378 was always true

379 excuse.set_section(source_u.section) 

380 excuse.add_package(PackageId(src, source_u.version, "source")) 

381 

382 # if the version in unstable is older, then stop here with a warning in the excuse and return False 

383 if source_t and apt_pkg.version_compare(source_u.version, source_t.version) < 0: 

384 excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY 

385 excuse.add_verdict_info( 

386 excuse.policy_verdict, 

387 "ALERT: %s is newer in the target suite (%s %s)" 

388 % (src, source_t.version, source_u.version), 

389 ) 

390 self.excuses[excuse.name] = excuse 

391 excuse.addreason("newerintesting") 

392 return False 

393 

394 # the starting point is that we will update the candidate 

395 excuse.policy_verdict = PolicyVerdict.PASS 

396 

397 # if there is a `remove' hint and the requested version is the same as the 

398 # version in testing, then stop here and return False 

399 for hint in self.hints.search("remove", package=src): 

400 if ( 400 ↛ 399line 400 didn't jump to line 399

401 source_t 

402 and source_t.version == hint.version 

403 or source_u.version == hint.version 

404 ): 

405 excuse.add_hint(hint) 

406 excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY 

407 excuse.add_verdict_info( 

408 excuse.policy_verdict, "Removal request by %s" % (hint.user) 

409 ) 

410 excuse.add_verdict_info( 

411 excuse.policy_verdict, "Trying to remove package, not update it" 

412 ) 

413 break 

414 

415 all_binaries = self.all_binaries 

416 

417 # at this point, we check the status of the builds on all the supported architectures 

418 # to catch the out-of-date ones 

419 archs_to_consider = list(self.options.architectures) 

420 archs_to_consider.append("all") 

421 for arch in archs_to_consider: 

422 oodbins: dict[str, set[str]] = {} 

423 uptodatebins = False 

424 # for every binary package produced by this source in the suite for this architecture 

425 if arch == "all": 

426 consider_binaries: Iterable[BinaryPackageId] = source_u.binaries 

427 else: 

428 # Will also include arch:all for the given architecture (they are filtered out 

429 # below) 

430 consider_binaries = sorted( 

431 x for x in source_u.binaries if x.architecture == arch 

432 ) 

433 for pkg_id in consider_binaries: 

434 pkg = pkg_id.package_name 

435 

436 # retrieve the binary package and its source version 

437 binary_u = all_binaries[pkg_id] 

438 pkgsv = binary_u.source_version 

439 

440 # arch:all packages are treated separately from arch:arch 

441 if binary_u.architecture != arch: 

442 continue 

443 

444 # TODO filter binaries based on checks below? 

445 excuse.add_package(pkg_id) 

446 

447 if pkg_id.package_name.endswith("-faux-build-depends"): 

448 continue 

449 

450 # if it wasn't built by the same source, it is out-of-date 

451 # if there is at least one binary on this arch which is 

452 # up-to-date, there is a build on this arch 

453 if source_u.version != pkgsv or pkg_id.architecture == "faux": 

454 if pkgsv not in oodbins: 

455 oodbins[pkgsv] = set() 

456 oodbins[pkgsv].add(pkg) 

457 if pkg_id.architecture != "faux": 

458 excuse.add_old_binary(pkg, pkgsv) 

459 continue 

460 else: 

461 uptodatebins = True 

462 

463 # if there are out-of-date packages, warn about them in the excuse and set excuse.is_valid 

464 # to False to block the update; if the architecture where the package is out-of-date is 

465 # in the `outofsync_arches' list, then do not block the update 

466 if oodbins: 

467 oodtxt = "" 

468 for v in sorted(oodbins): 

469 if oodtxt: 469 ↛ 470line 469 didn't jump to line 470 because the condition on line 469 was never true

470 oodtxt = oodtxt + "; " 

471 oodtxt = oodtxt + "{} (from {})".format( 

472 ", ".join(sorted(oodbins[v])), 

473 self._get_build_link(arch, src, v, label=v), 

474 ) 

475 

476 if uptodatebins: 

477 text = "Old binaries left on {}: {}".format( 

478 self._get_build_link(arch, src, source_u.version), 

479 oodtxt, 

480 ) 

481 else: 

482 text = "Missing build on %s" % ( 

483 self._get_build_link(arch, src, source_u.version) 

484 ) 

485 

486 if arch in self.options.outofsync_arches: 

487 text = text + " (but %s isn't keeping up, so nevermind)" % (arch) 

488 if not uptodatebins: 488 ↛ 421line 488 didn't jump to line 421 because the condition on line 488 was always true

489 excuse.missing_build_on_ood_arch(arch) 

490 else: 

491 if uptodatebins: 

492 if self.options.ignore_cruft: 

493 text = text + " (but ignoring cruft, so nevermind)" 

494 excuse.add_detailed_info(text) 

495 else: 

496 excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY 

497 excuse.addreason("cruft") 

498 excuse.add_verdict_info(excuse.policy_verdict, text) 

499 else: 

500 excuse.policy_verdict = ( 

501 PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT 

502 ) 

503 excuse.missing_build_on_arch(arch) 

504 excuse.addreason("missingbuild") 

505 excuse.add_verdict_info(excuse.policy_verdict, text) 

506 if excuse.old_binaries: 

507 excuse.add_detailed_info( 

508 f"old binaries on {arch}: {oodtxt}" 

509 ) 

510 

511 # if the source package has no binaries, set is_valid to False to block the update 

512 if not { 

513 x for x in filter_out_faux(source_u.binaries) if x.architecture != "faux" 

514 }: 

515 excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY 

516 excuse.add_verdict_info( 

517 excuse.policy_verdict, "%s has no binaries on any arch" % src 

518 ) 

519 excuse.addreason("no-binaries") 

520 

521 self._policy_engine.apply_src_policies(item, source_t, source_u, excuse) 

522 

523 if source_suite.suite_class.is_additional_source and source_t: 

524 # o-o-d(ish) checks for (t-)p-u 

525 # This only makes sense if the package is actually in testing. 

526 for arch in self.options.architectures: 

527 # if the package in testing has no binaries on this 

528 # architecture, it can't be out-of-date 

529 if not any( 

530 x 

531 for x in source_t.binaries 

532 if x.architecture == arch and all_binaries[x].architecture != "all" 

533 ): 

534 continue 

535 

536 # if the (t-)p-u package has produced any binaries on 

537 # this architecture then we assume it's ok. this allows for 

538 # uploads to (t-)p-u which intentionally drop binary 

539 # packages 

540 if any( 

541 x 

542 for x in source_suite.binaries[arch].values() 

543 if x.source == src 

544 and x.source_version == source_u.version 

545 and x.architecture != "all" 

546 ): 

547 continue 

548 

549 # TODO: Find a way to avoid hardcoding pu/stable relation. 

550 if suite_name == "pu": 550 ↛ 551line 550 didn't jump to line 551 because the condition on line 550 was never true

551 base = "stable" 

552 else: 

553 base = target_suite.name 

554 text = "Not yet built on %s (relative to target suite)" % ( 

555 self._get_build_link(arch, src, source_u.version) 

556 ) 

557 

558 if arch in self.options.outofsync_arches: 558 ↛ 559line 558 didn't jump to line 559 because the condition on line 558 was never true

559 text = text + " (but %s isn't keeping up, so never mind)" % (arch) 

560 excuse.missing_build_on_ood_arch(arch) 

561 excuse.addinfo(text) 

562 else: 

563 excuse.policy_verdict = ( 

564 PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT 

565 ) 

566 excuse.missing_build_on_arch(arch) 

567 excuse.addreason("missingbuild") 

568 excuse.add_verdict_info(excuse.policy_verdict, text) 

569 

570 # check if there is a `force' hint for this package, which allows it to go in even if it is not updateable 

571 forces = self.hints.search("force", package=src, version=source_u.version) 

572 if forces: 

573 # force() updates the final verdict for us 

574 changed_state = excuse.force() 

575 if changed_state: 

576 excuse.addinfo("Should ignore, but forced by %s" % (forces[0].user)) 

577 

578 self.excuses[excuse.name] = excuse 

579 return excuse.is_valid 

580 

581 def _compute_excuses_and_initial_actionable_items(self) -> set[MigrationItem]: 

582 # list of local methods and variables (for better performance) 

583 excuses = self.excuses 

584 suite_info = self.suite_info 

585 pri_source_suite = suite_info.primary_source_suite 

586 architectures = self.options.architectures 

587 should_remove_source = self._should_remove_source 

588 should_upgrade_srcarch = self._should_upgrade_srcarch 

589 should_upgrade_src = self._should_upgrade_src 

590 

591 sources_ps = pri_source_suite.sources 

592 sources_t = suite_info.target_suite.sources 

593 

594 # this set will contain the packages which are valid candidates; 

595 # if a package is going to be removed, it will have a "-" prefix 

596 actionable_items: set[MigrationItem] = set() 

597 actionable_items_add = actionable_items.add # Every . in a loop slows it down 

598 

599 # for every source package in testing, check if it should be removed 

600 for pkg in sources_t: 

601 if pkg not in sources_ps: 

602 src_t = sources_t[pkg] 

603 item = MigrationItem( 

604 package=pkg, 

605 version=src_t.version, 

606 suite=suite_info.target_suite, 

607 is_removal=True, 

608 ) 

609 if should_remove_source(item): 

610 actionable_items_add(item) 

611 

612 # for every source package in the source suites, check if it should be upgraded 

613 for suite in chain((pri_source_suite, *suite_info.additional_source_suites)): 

614 sources_s = suite.sources 

615 for pkg in sources_s: 

616 src_s_data = sources_s[pkg] 

617 if src_s_data.is_fakesrc: 

618 continue 

619 src_t_data = sources_t.get(pkg) 

620 

621 if ( 

622 src_t_data is None 

623 or apt_pkg.version_compare(src_s_data.version, src_t_data.version) 

624 != 0 

625 ): 

626 item = MigrationItem( 

627 package=pkg, version=src_s_data.version, suite=suite 

628 ) 

629 # check if the source package should be upgraded 

630 if should_upgrade_src(item): 

631 actionable_items_add(item) 

632 else: 

633 # package has same version in source and target suite; check if any of the 

634 # binaries have changed on the various architectures 

635 for arch in architectures: 

636 item = MigrationItem( 

637 package=pkg, 

638 version=src_s_data.version, 

639 architecture=arch, 

640 suite=suite, 

641 ) 

642 if should_upgrade_srcarch(item): 

643 actionable_items_add(item) 

644 

645 # process the `remove' hints, if the given package is not yet in actionable_items 

646 for hint in self.hints["remove"]: 

647 src_r = hint.package 

648 if src_r not in sources_t: 

649 continue 

650 

651 existing_items = {x for x in actionable_items if x.package == src_r} 

652 if existing_items: 

653 self.logger.info( 

654 "removal hint '%s' ignored due to existing item(s) %s" 

655 % (hint, [i.name for i in existing_items]) 

656 ) 

657 continue 

658 

659 tsrcv = sources_t[src_r].version 

660 item = MigrationItem( 

661 package=src_r, 

662 version=tsrcv, 

663 suite=suite_info.target_suite, 

664 is_removal=True, 

665 ) 

666 

667 # check if the version specified in the hint is the same as the considered package 

668 if tsrcv != hint.version: 668 ↛ 669line 668 didn't jump to line 669 because the condition on line 668 was never true

669 continue 

670 

671 # add the removal of the package to actionable_items and build a new excuse 

672 excuse = Excuse(item) 

673 excuse.set_vers(tsrcv, None) 

674 excuse.addinfo("Removal request by %s" % (hint.user)) 

675 # if the removal of the package is blocked, skip it 

676 blocked = False 

677 for blockhint in self.hints.search("block", package=src_r, removal=True): 

678 excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY 

679 excuse.add_verdict_info( 

680 excuse.policy_verdict, 

681 "Not removing package, due to block hint by %s " 

682 "(contact %s-release if update is needed)" 

683 % (blockhint.user, self.options.distribution), 

684 ) 

685 excuse.addreason("block") 

686 blocked = True 

687 

688 if blocked: 

689 excuses[excuse.name] = excuse 

690 continue 

691 

692 actionable_items_add(item) 

693 excuse.addinfo("Package is broken, will try to remove") 

694 excuse.add_hint(hint) 

695 # Using "PASS" here as "Created by a hint" != "accepted due to hint". In a future 

696 # where there might be policy checks on removals, it would make sense to distinguish 

697 # those two states. Not sure that future will ever be. 

698 excuse.policy_verdict = PolicyVerdict.PASS 

699 excuses[excuse.name] = excuse 

700 

701 return actionable_items 

702 

703 def find_actionable_excuses(self) -> tuple[dict[str, Excuse], set[MigrationItem]]: 

704 excuses = self.excuses 

705 actionable_items = self._compute_excuses_and_initial_actionable_items() 

706 valid = {x.name for x in actionable_items} 

707 

708 # extract the not considered packages, which are in the excuses but not in upgrade_me 

709 unconsidered = {ename for ename in excuses if ename not in valid} 

710 invalidated: set[str] = set() 

711 

712 invalidate_excuses(excuses, valid, unconsidered, invalidated) 

713 

714 # check that the list of actionable items matches the list of valid 

715 # excuses 

716 assert_sets_equal(valid, {x for x in excuses if excuses[x].is_valid}) 

717 

718 # check that the rdeps for all invalid excuses were invalidated 

719 assert_sets_equal(invalidated, {x for x in excuses if not excuses[x].is_valid}) 

720 

721 actionable_items = {x for x in actionable_items if x.name in valid} 

722 return excuses, actionable_items 

723 

724 

def assert_sets_equal(a: Any, b: Any) -> None:
    """Raise AssertionError, reporting both set differences, unless a == b."""
    if a == b:
        return
    only_in_a = a - b
    only_in_b = b - a
    raise AssertionError(f"sets not equal a-b {only_in_a} b-a {only_in_b}")