Coverage for britney2/excusefinder.py: 91%

346 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-29 17:21 +0000

1import logging 

2import optparse 

3from collections.abc import Iterable 

4from itertools import chain 

5from typing import TYPE_CHECKING, Any, cast 

6from urllib.parse import quote 

7 

8import apt_pkg 

9 

10from britney2 import BinaryPackage, BinaryPackageId, PackageId, Suites 

11from britney2.excuse import Excuse 

12from britney2.migrationitem import MigrationItem, MigrationItemFactory 

13from britney2.policies import PolicyVerdict 

14from britney2.utils import ( 

15 filter_out_faux, 

16 find_smooth_updateable_binaries, 

17 invalidate_excuses, 

18) 

19 

20if TYPE_CHECKING: 20 ↛ 21line 20 didn't jump to line 21 because the condition on line 20 was never true

21 from .hints import HintCollection 

22 from .installability.universe import BinaryPackageUniverse 

23 from .policies.policy import PolicyEngine 

24 

25 

class ExcuseFinder:
    """Compute excuses and the initial set of actionable migration items.

    Every ``_should_*`` method inspects one :class:`MigrationItem`, records
    its findings as an :class:`Excuse` in ``self.excuses`` (keyed by the
    excuse name) and reports whether the item is a valid candidate.
    :meth:`find_actionable_excuses` is the public entry point.
    """

    def __init__(
        self,
        options: optparse.Values,
        suite_info: Suites,
        all_binaries: dict[BinaryPackageId, BinaryPackage],
        pkg_universe: "BinaryPackageUniverse",
        policy_engine: "PolicyEngine",
        mi_factory: MigrationItemFactory,
        hints: "HintCollection",
    ) -> None:
        """Store the collaborators and start with an empty excuse table."""
        logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
        self.logger = logging.getLogger(logger_name)
        self.options = options
        self.suite_info = suite_info
        self.all_binaries = all_binaries
        self.pkg_universe = pkg_universe
        self._policy_engine = policy_engine
        self._migration_item_factory = mi_factory
        self.hints = hints
        # excuse name -> Excuse, filled in by the _should_* methods
        self.excuses: dict[str, Excuse] = {}

    def _get_build_link(
        self, arch: str, src: str, ver: str, label: str | None = None
    ) -> str:
        """Return a link to the build logs, labelled 'arch' per default

        When ``options.build_url`` is falsy no link can be built and the bare
        label is returned instead.  The source name and version are
        URL-quoted before being substituted into the URL template.
        """
        if label is None:
            label = arch
        if self.options.build_url:
            url = self.options.build_url.format(
                arch=arch, source=quote(src), version=quote(ver)
            )
            return f'<a href="{url}" target="_blank">{label}</a>'
        return label

    def _should_remove_source(self, item: MigrationItem) -> bool:
        """Check if a source package should be removed from the target suite

        A source package is a removal candidate when it is no longer present
        in the primary source suite.

        Returns False without recording an excuse when the options carry a
        ``partial_source`` attribute or when the package is still present in
        the primary source suite.  Otherwise an excuse is recorded in
        ``self.excuses`` in both outcomes: a "block" hint for the removal
        rejects the excuse permanently and False is returned; without such a
        hint the excuse passes and True is returned.
        """
        if hasattr(self.options, "partial_source"):
            return False
        # if the source package is available in the primary source suite,
        # then do nothing
        source_suite = self.suite_info.primary_source_suite
        pkg = item.package
        if pkg in source_suite.sources:
            return False
        # otherwise, add a new excuse for its removal
        src = item.suite.sources[pkg]
        excuse = Excuse(item)
        excuse.addinfo("Package not in %s, will try to remove" % source_suite.name)
        excuse.set_vers(src.version, None)
        if src.maintainer:
            excuse.set_maint(src.maintainer)
        if src.section:
            excuse.set_section(src.section)

        # if the package is blocked, skip it (only the first matching hint
        # is reported, as the loop returns immediately)
        for hint in self.hints.search("block", package=pkg, removal=True):
            excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(
                excuse.policy_verdict,
                "Not touching package, as requested by %s "
                "(contact %s-release if update is needed)"
                % (hint.user, self.options.distribution),
            )
            excuse.addreason("block")
            self.excuses[excuse.name] = excuse
            return False

        excuse.policy_verdict = PolicyVerdict.PASS
        self.excuses[excuse.name] = excuse
        return True

    def _should_upgrade_srcarch(self, item: MigrationItem) -> bool:
        """Check if a set of binary packages should be upgraded

        This method checks if the binary packages produced by the source
        package on the item's architecture should be upgraded; this can
        happen also if the migration is a binary-NMU for the given arch.

        Returns False, with a permanently rejected excuse recorded, when a
        matching "remove" hint exists.  Returns False without recording any
        excuse when there is nothing worth doing (unless
        ``options.archall_inconsistency_allowed`` is set and the excuse
        gathered detailed info).  In all other cases the excuse is recorded
        in ``self.excuses`` and ``excuse.is_valid`` is returned.
        """
        # retrieve the source packages for the target suite and source suite
        target_suite = self.suite_info.target_suite
        source_suite = item.suite
        src = item.package
        arch = item.architecture
        source_t = target_suite.sources[src]
        source_u = source_suite.sources[src]

        excuse = Excuse(item)
        excuse.set_vers(source_t.version, source_t.version)
        if source_u.maintainer:
            excuse.set_maint(source_u.maintainer)
        if source_u.section:
            excuse.set_section(source_u.section)

        # if there is a `remove' hint and the requested version is the same as the
        # version in testing, then stop here and return False
        # (as a side effect, a removal may generate such excuses for both the source
        # package and its binary packages on each architecture)
        for hint in self.hints.search("remove", package=src, version=source_t.version):
            excuse.add_hint(hint)
            excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(
                excuse.policy_verdict, "Removal request by %s" % (hint.user)
            )
            excuse.add_verdict_info(
                excuse.policy_verdict, "Trying to remove package, not update it"
            )
            self.excuses[excuse.name] = excuse
            return False

        # the starting point is that there is nothing wrong and nothing worth doing
        anywrongver = False
        anyworthdoing = False

        packages_t_a = target_suite.binaries[arch]
        packages_s_a = source_suite.binaries[arch]

        wrong_verdict = PolicyVerdict.REJECTED_PERMANENTLY

        # for every binary package produced by this source in the source suite
        # for this architecture.  The ids are de-duplicated and then sorted so
        # that the order of the excuse messages, and which binaries are seen
        # before the "Not downgrading" break below, does not depend on set
        # iteration order.
        arch_binaries = sorted(
            {x for x in filter_out_faux(source_u.binaries) if x.architecture == arch}
        )
        for pkg_id in arch_binaries:
            pkg_name = pkg_id.package_name
            # TODO filter binaries based on checks below?
            excuse.add_package(pkg_id)

            # retrieve the testing (if present) and unstable corresponding binary packages
            binary_t = packages_t_a.get(pkg_name)
            binary_u = packages_s_a[pkg_name]

            # this is the source version for the new binary package
            pkgsv = binary_u.source_version

            # if the new binary package is architecture-independent, then skip it
            if binary_u.architecture == "all":
                if pkg_id not in source_t.binaries:
                    # only add a note if the arch:all does not match the expected version
                    excuse.add_detailed_info(
                        "Ignoring %s %s (from %s) as it is arch: all"
                        % (pkg_name, binary_u.version, pkgsv)
                    )
                continue

            # if the new binary package is not from the same source as the testing one, then skip it
            # this implies that this binary migration is part of a source migration
            if source_u.version == pkgsv and source_t.version != pkgsv:
                anywrongver = True
                excuse.add_verdict_info(
                    wrong_verdict,
                    "From wrong source: %s %s (%s not %s)"
                    % (pkg_name, binary_u.version, pkgsv, source_t.version),
                )
                continue

            # cruft in unstable
            if source_u.version != pkgsv and source_t.version != pkgsv:
                if self.options.ignore_cruft:
                    excuse.add_detailed_info(
                        "Old cruft: %s %s (but ignoring cruft, so nevermind)"
                        % (pkg_name, pkgsv)
                    )
                else:
                    anywrongver = True
                    excuse.add_verdict_info(
                        wrong_verdict, f"Old cruft: {pkg_name} {pkgsv}"
                    )
                continue

            # if the source package has been updated in unstable and this is a binary migration, skip it
            # (the binaries are now out-of-date)
            if source_t.version == pkgsv and source_t.version != source_u.version:
                anywrongver = True
                excuse.add_verdict_info(
                    wrong_verdict,
                    "From wrong source: %s %s (%s not %s)"
                    % (pkg_name, binary_u.version, pkgsv, source_u.version),
                )
                continue

            # if the binary is not present in testing, then it is a new binary;
            # in this case, there is something worth doing
            if not binary_t:
                excuse.add_detailed_info(f"New binary: {pkg_name} ({binary_u.version})")
                anyworthdoing = True
                continue

            # at this point, the binary package is present in testing, so we can compare
            # the versions of the packages ...
            vcompare = apt_pkg.version_compare(binary_t.version, binary_u.version)

            # ... if updating would mean downgrading, then stop here: there is something wrong
            if vcompare > 0:
                anywrongver = True
                excuse.add_verdict_info(
                    wrong_verdict,
                    "Not downgrading: %s (%s to %s)"
                    % (pkg_name, binary_t.version, binary_u.version),
                )
                break
            # ... if updating would mean upgrading, then there is something worth doing
            elif vcompare < 0:
                excuse.add_detailed_info(
                    "Updated binary: %s (%s to %s)"
                    % (pkg_name, binary_t.version, binary_u.version)
                )
                anyworthdoing = True

        same_source = source_t.version == source_u.version
        is_primary_source = source_suite == self.suite_info.primary_source_suite

        # if there is nothing wrong and there is something worth doing or the source
        # package is not fake, then check what packages should be removed
        if not anywrongver and (anyworthdoing or not source_u.is_fakesrc):
            # we want to remove binaries that are no longer produced by the
            # new source, but there are some special cases:
            # - if this is binary-only (same_source) and not from the primary
            #   source, we don't do any removals:
            #   binNMUs in *pu on some architectures would otherwise result in
            #   the removal of binaries on other architectures
            # - for the primary source, smooth binaries in the target suite
            #   are not considered for removal
            if not same_source or is_primary_source:
                smoothbins = set()
                if is_primary_source:
                    smoothbins = find_smooth_updateable_binaries(
                        [p for p in source_t.binaries if p.architecture == arch],
                        source_u,
                        self.pkg_universe,
                        target_suite,
                        target_suite.binaries,
                        source_suite.binaries,
                        cast(frozenset["BinaryPackageId"], frozenset()),
                        self.options.smooth_updates,
                        self.hints,
                    )

                # for every binary package produced by this source in testing for this architecture
                for pkg_id in sorted(
                    x for x in source_t.binaries if x.architecture == arch
                ):
                    pkg = pkg_id.package_name
                    # if the package is architecture-independent, then ignore it
                    tpkg_data = packages_t_a[pkg]
                    if tpkg_data.architecture == "all":
                        if pkg_id not in source_u.binaries:
                            # only add a note if the arch:all does not match the expected version
                            excuse.add_detailed_info(
                                "Ignoring removal of %s as it is arch: all" % (pkg)
                            )
                        continue
                    # if the package is not produced by the new source package, then remove it from testing
                    if pkg not in packages_s_a:
                        excuse.add_detailed_info(
                            f"Removed binary: {pkg} {tpkg_data.version}"
                        )
                        # the removed binary is only interesting if this is a binary-only migration,
                        # as otherwise the updated source will already cause the binary packages
                        # to be updated
                        if same_source and pkg_id not in smoothbins:
                            # Special-case, if the binary is a candidate for a smooth update, we do not consider
                            # it "interesting" on its own.  This case happens quite often with smooth updatable
                            # packages, where the old binary "survives" a full run because it still has
                            # reverse dependencies.
                            anyworthdoing = True

        if not anyworthdoing and not (
            self.options.archall_inconsistency_allowed and excuse.detailed_info
        ):
            # nothing worth doing, we don't add an excuse to the list, we just return false
            return False

        if not anyworthdoing:
            # This source has binary differences between the target and source
            # suite, but we're not going to upgrade them.  Part of the purpose
            # of options.archall_inconsistency_allowed is to log the excuse
            # with a temporary failure such that the administrators can take
            # action so they wish.
            excuse.policy_verdict = PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
            excuse.addreason("everything-ignored")
        else:
            # there is something worth doing
            # we assume that this package will be ok, if not invalidated below
            excuse.policy_verdict = PolicyVerdict.PASS

        # if there is something wrong, reject this package
        if anywrongver:
            excuse.policy_verdict = wrong_verdict

        self._policy_engine.apply_srcarch_policies(arch, source_t, source_u, excuse)

        self.excuses[excuse.name] = excuse
        return excuse.is_valid

    def _should_upgrade_src(self, item: MigrationItem) -> bool:
        """Check if source package should be upgraded

        The analysis is performed for the source package ``item.package`` as
        found in the suite ``item.suite``.

        Returns False without recording an excuse when the source is a fake
        source or when the target suite already has the same version.
        Returns False with a permanently rejected excuse recorded when the
        target suite has a newer version.  Otherwise the out-of-date checks
        and source policies are applied, a matching "force" hint may force
        the excuse, the excuse is recorded in ``self.excuses`` and
        ``excuse.is_valid`` is returned.
        """
        src = item.package
        source_suite = item.suite
        source_u = source_suite.sources[src]
        if source_u.is_fakesrc:
            # it is a fake package created to satisfy Britney implementation details; silently ignore it
            return False

        target_suite = self.suite_info.target_suite
        # retrieve the source packages for testing (if available) and suite
        if src in target_suite.sources:
            source_t = target_suite.sources[src]
            # if testing and unstable have the same version, then this is a candidate for binary-NMUs only
            if apt_pkg.version_compare(source_t.version, source_u.version) == 0:
                return False
        else:
            source_t = None

        excuse = Excuse(item)
        excuse.set_vers(source_t and source_t.version or None, source_u.version)
        if source_u.maintainer:
            excuse.set_maint(source_u.maintainer)
        if source_u.section:
            excuse.set_section(source_u.section)
        excuse.add_package(PackageId(src, source_u.version, "source"))

        # if the version in unstable is older, then stop here with a warning in the excuse and return False
        if source_t and apt_pkg.version_compare(source_u.version, source_t.version) < 0:
            excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(
                excuse.policy_verdict,
                "ALERT: %s is newer in the target suite (%s %s)"
                % (src, source_t.version, source_u.version),
            )
            self.excuses[excuse.name] = excuse
            excuse.addreason("newerintesting")
            return False

        # the starting point is that we will update the candidate
        excuse.policy_verdict = PolicyVerdict.PASS

        # if there is a `remove' hint whose version matches the version in
        # testing or in the source suite, reject the update (the remaining
        # checks below still run)
        for hint in self.hints.search("remove", package=src):
            if (
                source_t
                and source_t.version == hint.version
                or source_u.version == hint.version
            ):
                excuse.add_hint(hint)
                excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
                excuse.add_verdict_info(
                    excuse.policy_verdict, "Removal request by %s" % (hint.user)
                )
                excuse.add_verdict_info(
                    excuse.policy_verdict, "Trying to remove package, not update it"
                )
                break

        all_binaries = self.all_binaries

        # at this point, we check the status of the builds on all the supported architectures
        # to catch the out-of-date ones
        archs_to_consider = list(self.options.architectures)
        archs_to_consider.append("all")
        for arch in archs_to_consider:
            # source version -> names of binaries still built from that version
            oodbins: dict[str, set[str]] = {}
            uptodatebins = False
            # for every binary package produced by this source in the suite for this architecture
            if arch == "all":
                consider_binaries: Iterable[BinaryPackageId] = source_u.binaries
            else:
                # Will also include arch:all for the given architecture (they are filtered out
                # below)
                consider_binaries = sorted(
                    x for x in source_u.binaries if x.architecture == arch
                )
            for pkg_id in consider_binaries:
                pkg = pkg_id.package_name

                # retrieve the binary package and its source version
                binary_u = all_binaries[pkg_id]
                pkgsv = binary_u.source_version

                # arch:all packages are treated separately from arch:arch
                if binary_u.architecture != arch:
                    continue

                # TODO filter binaries based on checks below?
                excuse.add_package(pkg_id)

                if pkg_id.package_name.endswith("-faux-build-depends"):
                    continue

                # if it wasn't built by the same source, it is out-of-date
                # if there is at least one binary on this arch which is
                # up-to-date, there is a build on this arch
                if source_u.version != pkgsv or pkg_id.architecture == "faux":
                    oodbins.setdefault(pkgsv, set()).add(pkg)
                    if pkg_id.architecture != "faux":
                        excuse.add_old_binary(pkg, pkgsv)
                    continue
                else:
                    uptodatebins = True

            # if there are out-of-date packages, warn about them in the excuse and set excuse.is_valid
            # to False to block the update; if the architecture where the package is out-of-date is
            # in the `outofsync_arches' list, then do not block the update
            if oodbins:
                oodtxt = "; ".join(
                    "{} (from {})".format(
                        ", ".join(sorted(oodbins[v])),
                        self._get_build_link(arch, src, v, label=v),
                    )
                    for v in sorted(oodbins)
                )

                if uptodatebins:
                    text = "Old binaries left on {}: {}".format(
                        self._get_build_link(arch, src, source_u.version),
                        oodtxt,
                    )
                else:
                    text = "Missing build on %s" % (
                        self._get_build_link(arch, src, source_u.version)
                    )

                if arch in self.options.outofsync_arches:
                    # NOTE(review): this message is built but never attached
                    # to the excuse in this branch -- confirm whether that is
                    # intentional.
                    text = text + " (but %s isn't keeping up, so nevermind)" % (arch)
                    if not uptodatebins:
                        excuse.missing_build_on_ood_arch(arch)
                else:
                    if uptodatebins:
                        if self.options.ignore_cruft:
                            text = text + " (but ignoring cruft, so nevermind)"
                            excuse.add_detailed_info(text)
                        else:
                            excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
                            excuse.addreason("cruft")
                            excuse.add_verdict_info(excuse.policy_verdict, text)
                    else:
                        excuse.policy_verdict = (
                            PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
                        )
                        excuse.missing_build_on_arch(arch)
                        excuse.addreason("missingbuild")
                        excuse.add_verdict_info(excuse.policy_verdict, text)
                        if excuse.old_binaries:
                            excuse.add_detailed_info(
                                f"old binaries on {arch}: {oodtxt}"
                            )

        # if the source package has no binaries, set is_valid to False to block the update
        if not any(
            x.architecture != "faux" for x in filter_out_faux(source_u.binaries)
        ):
            excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(
                excuse.policy_verdict, "%s has no binaries on any arch" % src
            )
            excuse.addreason("no-binaries")

        self._policy_engine.apply_src_policies(source_t, source_u, excuse)

        if source_suite.suite_class.is_additional_source and source_t:
            # o-o-d(ish) checks for (t-)p-u
            # This only makes sense if the package is actually in testing.
            for arch in self.options.architectures:
                # if the package in testing has no binaries on this
                # architecture, it can't be out-of-date
                if not any(
                    x
                    for x in source_t.binaries
                    if x.architecture == arch and all_binaries[x].architecture != "all"
                ):
                    continue

                # if the (t-)p-u package has produced any binaries on
                # this architecture then we assume it's ok. this allows for
                # uploads to (t-)p-u which intentionally drop binary
                # packages
                if any(
                    x
                    for x in source_suite.binaries[arch].values()
                    if x.source == src
                    and x.source_version == source_u.version
                    and x.architecture != "all"
                ):
                    continue

                text = "Not yet built on %s (relative to target suite)" % (
                    self._get_build_link(arch, src, source_u.version)
                )

                if arch in self.options.outofsync_arches:
                    text = text + " (but %s isn't keeping up, so never mind)" % (arch)
                    excuse.missing_build_on_ood_arch(arch)
                    excuse.addinfo(text)
                else:
                    excuse.policy_verdict = (
                        PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
                    )
                    excuse.missing_build_on_arch(arch)
                    excuse.addreason("missingbuild")
                    excuse.add_verdict_info(excuse.policy_verdict, text)

        # check if there is a `force' hint for this package, which allows it to go in even if it is not updateable
        forces = self.hints.search("force", package=src, version=source_u.version)
        if forces:
            # force() updates the final verdict for us
            changed_state = excuse.force()
            if changed_state:
                excuse.addinfo("Should ignore, but forced by %s" % (forces[0].user))

        self.excuses[excuse.name] = excuse
        return excuse.is_valid

    def _compute_excuses_and_initial_actionable_items(self) -> set[MigrationItem]:
        """Run all per-item checks and return the items they accepted.

        Three passes are made: removals of sources that are in the target
        suite but not in the primary source suite, upgrades (source or
        per-architecture) from every source suite, and finally explicit
        ``remove`` hints.  Excuses are recorded in ``self.excuses`` as a side
        effect.
        """
        # list of local methods and variables (for better performance)
        excuses = self.excuses
        suite_info = self.suite_info
        pri_source_suite = suite_info.primary_source_suite
        architectures = self.options.architectures
        should_remove_source = self._should_remove_source
        should_upgrade_srcarch = self._should_upgrade_srcarch
        should_upgrade_src = self._should_upgrade_src

        sources_ps = pri_source_suite.sources
        sources_t = suite_info.target_suite.sources

        # this set will contain the items which are valid candidates
        actionable_items: set[MigrationItem] = set()
        actionable_items_add = actionable_items.add  # Every . in a loop slows it down

        # for every source package in testing, check if it should be removed
        for pkg, src_t in sources_t.items():
            if pkg not in sources_ps:
                item = MigrationItem(
                    package=pkg,
                    version=src_t.version,
                    suite=suite_info.target_suite,
                    is_removal=True,
                )
                if should_remove_source(item):
                    actionable_items_add(item)

        # for every source package in the source suites, check if it should be upgraded
        for suite in (pri_source_suite, *suite_info.additional_source_suites):
            for pkg, src_s_data in suite.sources.items():
                if src_s_data.is_fakesrc:
                    continue
                src_t_data = sources_t.get(pkg)

                if (
                    src_t_data is None
                    or apt_pkg.version_compare(src_s_data.version, src_t_data.version)
                    != 0
                ):
                    item = MigrationItem(
                        package=pkg, version=src_s_data.version, suite=suite
                    )
                    # check if the source package should be upgraded
                    if should_upgrade_src(item):
                        actionable_items_add(item)
                else:
                    # package has same version in source and target suite; check if any of the
                    # binaries have changed on the various architectures
                    for arch in architectures:
                        item = MigrationItem(
                            package=pkg,
                            version=src_s_data.version,
                            architecture=arch,
                            suite=suite,
                        )
                        if should_upgrade_srcarch(item):
                            actionable_items_add(item)

        # process the `remove' hints, if the given package is not yet in actionable_items
        for hint in self.hints["remove"]:
            src_r = hint.package
            if src_r not in sources_t:
                continue

            existing_items = {x for x in actionable_items if x.package == src_r}
            if existing_items:
                # lazy %-args; names are sorted so the log line is stable
                self.logger.info(
                    "removal hint '%s' ignored due to existing item(s) %s",
                    hint,
                    sorted(i.name for i in existing_items),
                )
                continue

            tsrcv = sources_t[src_r].version
            item = MigrationItem(
                package=src_r,
                version=tsrcv,
                suite=suite_info.target_suite,
                is_removal=True,
            )

            # check if the version specified in the hint is the same as the considered package
            if tsrcv != hint.version:
                continue

            # add the removal of the package to actionable_items and build a new excuse
            excuse = Excuse(item)
            excuse.set_vers(tsrcv, None)
            excuse.addinfo("Removal request by %s" % (hint.user))
            # if the removal of the package is blocked, skip it
            blocked = False
            for blockhint in self.hints.search("block", package=src_r, removal=True):
                excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
                excuse.add_verdict_info(
                    excuse.policy_verdict,
                    "Not removing package, due to block hint by %s "
                    "(contact %s-release if update is needed)"
                    % (blockhint.user, self.options.distribution),
                )
                excuse.addreason("block")
                blocked = True

            if blocked:
                excuses[excuse.name] = excuse
                continue

            actionable_items_add(item)
            excuse.addinfo("Package is broken, will try to remove")
            excuse.add_hint(hint)
            # Using "PASS" here as "Created by a hint" != "accepted due to hint".  In a future
            # where there might be policy checks on removals, it would make sense to distinguish
            # those two states.  Not sure that future will ever be.
            excuse.policy_verdict = PolicyVerdict.PASS
            excuses[excuse.name] = excuse

        return actionable_items

    def find_actionable_excuses(self) -> tuple[dict[str, Excuse], set[MigrationItem]]:
        """Compute all excuses and return them with the valid actionable items.

        After the per-item checks, ``invalidate_excuses`` is run over the
        excuses; two consistency checks then raise ``AssertionError`` if the
        resulting valid/invalidated name sets disagree with the ``is_valid``
        flag of the excuses.
        """
        excuses = self.excuses
        actionable_items = self._compute_excuses_and_initial_actionable_items()
        valid = {x.name for x in actionable_items}

        # extract the not considered packages, which are in the excuses but not actionable
        unconsidered = {ename for ename in excuses if ename not in valid}
        invalidated: set[str] = set()

        invalidate_excuses(excuses, valid, unconsidered, invalidated)

        # check that the list of actionable items matches the list of valid
        # excuses
        assert_sets_equal(valid, {x for x in excuses if excuses[x].is_valid})

        # check that the rdeps for all invalid excuses were invalidated
        assert_sets_equal(invalidated, {x for x in excuses if not excuses[x].is_valid})

        actionable_items = {x for x in actionable_items if x.name in valid}
        return excuses, actionable_items

721 

722 

def assert_sets_equal(a: Any, b: Any) -> None:
    """Raise ``AssertionError`` naming both set differences if ``a != b``.

    Returns ``None`` when the two sets compare equal.
    """
    if a != b:
        only_in_a = a - b
        only_in_b = b - a
        raise AssertionError(f"sets not equal a-b {only_in_a} b-a {only_in_b}")