Coverage for britney2/policies/policy.py: 85%

1249 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-12-13 17:57 +0000

1import json 

2import logging 

3import optparse 

4import os 

5import re 

6import sys 

7import time 

8from abc import ABC, abstractmethod 

9from collections import defaultdict 

10from collections.abc import Callable, Container 

11from enum import IntEnum, unique 

12from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar, cast 

13from urllib.parse import quote 

14 

15import apt_pkg 

16import yaml 

17 

18from britney2 import ( 

19 BinaryPackage, 

20 BinaryPackageId, 

21 DependencyType, 

22 PackageId, 

23 SourcePackage, 

24 Suite, 

25 SuiteClass, 

26 Suites, 

27 TargetSuite, 

28) 

29from britney2.excusedeps import DependencySpec 

30from britney2.hints import ( 

31 Hint, 

32 HintAnnotate, 

33 HintCollection, 

34 HintParser, 

35 HintType, 

36 PolicyHintParserProto, 

37) 

38from britney2.inputs.suiteloader import SuiteContentLoader 

39from britney2.migrationitem import MigrationItem, MigrationItemFactory 

40from britney2.policies import ApplySrcPolicy, PolicyVerdict 

41from britney2.utils import ( 

42 GetDependencySolversProto, 

43 compute_reverse_tree, 

44 find_newer_binaries, 

45 get_dependency_solvers, 

46 is_smooth_update_allowed, 

47 parse_option, 

48) 

49 

50if TYPE_CHECKING: 50 ↛ 51line 50 didn't jump to line 51 because the condition on line 50 was never true

51 from ..britney import Britney 

52 from ..excuse import Excuse 

53 from ..installability.universe import BinaryPackageUniverse 

54 

55 

class PolicyLoadRequest:
    """Describes how — and whether — a single policy gets instantiated.

    A request pairs a policy constructor with an optional configuration
    option name; the option's value (a yes/no style string) decides whether
    the policy is active for this run.
    """

    __slots__ = ("_options_name", "_default_value", "_policy_constructor")

    def __init__(
        self,
        policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
        options_name: str | None,
        default_value: bool,
    ) -> None:
        self._policy_constructor = policy_constructor
        self._options_name = options_name
        self._default_value = default_value

    def is_enabled(self, options: optparse.Values) -> bool:
        """Return True if this policy should be loaded according to `options`."""
        option_key = self._options_name
        if option_key is None:
            # Unconditional requests must have been created enabled.
            assert self._default_value
            return True
        configured = getattr(options, option_key, None)
        if configured is None:
            return self._default_value
        # The option is a free-form string; only these spellings mean "on".
        return configured.lower() in ("yes", "y", "true", "t")

    def load(self, options: optparse.Values, suite_info: Suites) -> "BasePolicy":
        """Instantiate the policy described by this request."""
        return self._policy_constructor(options, suite_info)

    @classmethod
    def always_load(
        cls, policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"]
    ) -> "PolicyLoadRequest":
        """Build a request that is unconditionally enabled."""
        return cls(policy_constructor, None, True)

    @classmethod
    def conditionally_load(
        cls,
        policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
        option_name: str,
        default_value: bool,
    ) -> "PolicyLoadRequest":
        """Build a request gated on the configuration option `option_name`."""
        return cls(policy_constructor, option_name, default_value)

95 

96 

class PolicyEngine:
    """Holds the set of active policies and runs them over migration items.

    The engine folds each policy's verdict into the excuse's overall verdict
    using PolicyVerdict.worst_of, and records per-policy details in
    excuse.policy_info (keyed by policy_id) for the excuses.yaml output.
    """

    def __init__(self) -> None:
        # Policies are applied in registration order.
        self._policies: list["BasePolicy"] = []

    def add_policy(self, policy: "BasePolicy") -> None:
        """Register an already-constructed policy."""
        self._policies.append(policy)

    def load_policies(
        self,
        options: optparse.Values,
        suite_info: Suites,
        policy_load_requests: list[PolicyLoadRequest],
    ) -> None:
        """Instantiate and register every request that `options` enables."""
        for policy_load_request in policy_load_requests:
            if policy_load_request.is_enabled(options):
                self.add_policy(policy_load_request.load(options, suite_info))

    def register_policy_hints(self, hint_parser: HintParser) -> None:
        """Let every policy register the hint types it understands."""
        for policy in self._policies:
            policy.register_hints(hint_parser)

    def initialise(self, britney: "Britney", hints: HintCollection) -> None:
        """Hand the parsed hints to every policy and run its one-time setup."""
        for policy in self._policies:
            policy.hints = hints
            policy.initialise(britney)

    def save_state(self, britney: "Britney") -> None:
        """Ask every policy to persist its state (not called on dry-runs)."""
        for policy in self._policies:
            policy.save_state(britney)

    def apply_src_policies(
        self,
        item: MigrationItem,
        source_t: SourcePackage | None,
        source_u: SourcePackage,
        excuse: "Excuse",
    ) -> None:
        """Run all applicable policies for a source migration and update `excuse`.

        :param item: the migration item under consideration
        :param source_t: source package data in the target suite (None if new)
        :param source_u: source package data in the source suite
        :param excuse: the excuse being built; its policy_verdict and
            policy_info are updated in place
        """
        excuse_verdict = excuse.policy_verdict
        source_suite = item.suite
        suite_class = source_suite.suite_class
        for policy in self._policies:
            pinfo: dict[str, Any] = {}
            policy_verdict = PolicyVerdict.NOT_APPLICABLE
            if suite_class in policy.applicable_suites:
                if policy.src_policy.run_arch:
                    # Some source-level policies also evaluate per
                    # architecture; fold every arch verdict in.
                    for arch in policy.options.architectures:
                        v = policy.apply_srcarch_policy_impl(
                            pinfo, item, arch, source_t, source_u, excuse
                        )
                        policy_verdict = PolicyVerdict.worst_of(policy_verdict, v)
                if policy.src_policy.run_src:
                    v = policy.apply_src_policy_impl(
                        pinfo, item, source_t, source_u, excuse
                    )
                    policy_verdict = PolicyVerdict.worst_of(policy_verdict, v)
            # The base policy provides this field, so the subclass should leave it blank
            assert "verdict" not in pinfo
            if policy_verdict != PolicyVerdict.NOT_APPLICABLE:
                # pinfo is stored first and then mutated: the excuse keeps a
                # reference to the same dict, so the verdict shows up there too.
                excuse.policy_info[policy.policy_id] = pinfo
                pinfo["verdict"] = policy_verdict.name
                excuse_verdict = PolicyVerdict.worst_of(policy_verdict, excuse_verdict)
        excuse.policy_verdict = excuse_verdict

    def apply_srcarch_policies(
        self,
        item: MigrationItem,
        arch: str,
        source_t: SourcePackage | None,
        source_u: SourcePackage,
        excuse: "Excuse",
    ) -> None:
        """Run all applicable policies for a binary (per-arch) migration.

        Same contract as apply_src_policies, restricted to one architecture.
        """
        excuse_verdict = excuse.policy_verdict
        source_suite = item.suite
        suite_class = source_suite.suite_class
        for policy in self._policies:
            pinfo: dict[str, Any] = {}
            if suite_class in policy.applicable_suites:
                policy_verdict = policy.apply_srcarch_policy_impl(
                    pinfo, item, arch, source_t, source_u, excuse
                )
                excuse_verdict = PolicyVerdict.worst_of(policy_verdict, excuse_verdict)
                # The base policy provides this field, so the subclass should leave it blank
                assert "verdict" not in pinfo
                if policy_verdict != PolicyVerdict.NOT_APPLICABLE:
                    excuse.policy_info[policy.policy_id] = pinfo
                    pinfo["verdict"] = policy_verdict.name
        excuse.policy_verdict = excuse_verdict

184 

185 

class BasePolicy(ABC):
    """Abstract base class for all migration policies.

    A policy inspects a migration item and returns a PolicyVerdict; the
    PolicyEngine combines the verdicts of all applicable policies into the
    excuse's overall verdict.
    """

    britney: "Britney"
    # Key under which this policy's results appear in excuses.yaml.
    policy_id: str
    # Parsed hints; assigned by PolicyEngine.initialise before use.
    hints: HintCollection | None
    # Suite classes (e.g. primary source suite) this policy applies to.
    applicable_suites: set[SuiteClass]
    # Whether the policy runs per-source, per-architecture, or both.
    src_policy: ApplySrcPolicy
    options: optparse.Values
    suite_info: Suites

    def __init__(
        self,
        options: optparse.Values,
        suite_info: Suites,
    ) -> None:
        """The BasePolicy constructor

        :param options: The options member of Britney with all the
        config values.
        """

    @property
    @abstractmethod
    def state_dir(self) -> str: ...

    def register_hints(self, hint_parser: HintParser) -> None:  # pragma: no cover
        """Register new hints that this policy accepts

        :param hint_parser: (see HintParser.register_hint_type)
        """

    def initialise(self, britney: "Britney") -> None:  # pragma: no cover
        """Called once to make the policy initialise any data structures

        This is useful for e.g. parsing files or other "heavy do-once" work.

        :param britney: This is the instance of the "Britney" class.
        """
        self.britney = britney

    def save_state(self, britney: "Britney") -> None:  # pragma: no cover
        """Called once at the end of the run to make the policy save any persistent data

        Note this will *not* be called for "dry-runs" as such runs should not change
        the state.

        :param britney: This is the instance of the "Britney" class.
        """

    def apply_src_policy_impl(
        self,
        policy_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:  # pragma: no cover
        """Apply a policy on a given source migration

        Britney will call this method on a given source package, when
        Britney is considering to migrate it from the given source
        suite to the target suite. The policy will then evaluate the
        migration and then return a verdict.

        :param policy_info: A dictionary of all policy results. The
        policy can add a value stored in a key related to its name.
        (e.g. policy_info['age'] = {...}). This will go directly into
        the "excuses.yaml" output.

        :param item: The migration item the policy is applied to.

        :param source_data_tdist: Information about the source package
        in the target distribution (e.g. "testing"). This is the
        data structure in source_suite.sources[source_name]

        :param source_data_srcdist: Information about the source
        package in the source distribution (e.g. "unstable" or "tpu").
        This is the data structure in target_suite.sources[source_name]

        :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
        """
        return PolicyVerdict.NOT_APPLICABLE

    def apply_srcarch_policy_impl(
        self,
        policy_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Apply a policy on a given binary migration

        Britney will call this method on binaries from a given source package
        on a given architecture, when Britney is considering to migrate them
        from the given source suite to the target suite. The policy will then
        evaluate the migration and then return a verdict.

        :param policy_info: A dictionary of all policy results. The
        policy can add a value stored in a key related to its name.
        (e.g. policy_info['age'] = {...}). This will go directly into
        the "excuses.yaml" output.

        :param item: The migration item the policy is applied to.

        :param arch: The architecture the item is applied to. This is mostly
        relevant for policies where src_policy is not ApplySrcPolicy.RUN_SRC
        (as that is the only case where arch can differ from item.architecture)

        :param source_data_tdist: Information about the source package
        in the target distribution (e.g. "testing"). This is the
        data structure in source_suite.sources[source_name]

        :param source_data_srcdist: Information about the source
        package in the source distribution (e.g. "unstable" or "tpu").
        This is the data structure in target_suite.sources[source_name]

        :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
        """
        # if the policy doesn't implement this function, assume it's OK
        return PolicyVerdict.NOT_APPLICABLE

307 

308 

class AbstractBasePolicy(BasePolicy):
    """Common constructor logic shared by the concrete policies.

    tests/test_policy.py:initialize_policy() needs to be able to build
    BasePolicy objects with just a two-item constructor, while all other
    uses of BasePolicy-derived objects need the 5-item constructor. This
    class was split out to document that distinction.
    """

    def __init__(
        self,
        policy_id: str,
        options: optparse.Values,
        suite_info: Suites,
        applicable_suites: set[SuiteClass],
        src_policy: ApplySrcPolicy = ApplySrcPolicy.RUN_SRC,
    ) -> None:
        """Concrete initializer.

        :param policy_id: Identifies the policy; determines the key used in
            excuses.yaml etc.
        :param options: The options member of Britney with all the config
            values.
        :param applicable_suites: Where this policy applies.
        :param src_policy: Whether the policy runs per-source,
            per-architecture, or both.
        """
        self.policy_id = policy_id
        self.options = options
        self.suite_info = suite_info
        self.applicable_suites = applicable_suites
        self.src_policy = src_policy
        # Hints are injected later by PolicyEngine.initialise.
        self.hints: HintCollection | None = None
        # One logger per concrete policy class, named module.ClassName.
        self.logger = logging.getLogger(
            f"{self.__class__.__module__}.{self.__class__.__name__}"
        )

    @property
    def state_dir(self) -> str:
        """Directory holding the policies' persistent state (STATE_DIR)."""
        return cast(str, self.options.state_dir)

349 

350 

# Type of the single parameter carried by SimplePolicyHint subclasses.
_T = TypeVar("_T")

352 

353 

class SimplePolicyHint(Hint, Generic[_T]):
    """A hint carrying exactly one policy-specific parameter.

    Subclasses expose `_policy_parameter` under a domain-specific property
    name (e.g. AgeDayHint.days, IgnoreRCBugHint.ignored_rcbugs).
    """

    def __init__(
        self,
        user: str,
        hint_type: HintType,
        policy_parameter: _T,
        packages: list[MigrationItem],
    ) -> None:
        super().__init__(user, hint_type, packages)
        self._policy_parameter = policy_parameter

    def __eq__(self, other: Any) -> bool:
        # NOTE(review): assumes `other` also carries `_policy_parameter`;
        # comparing against a plain Hint would raise AttributeError — confirm
        # hints are only ever compared against hints of the same shape.
        if self.type != other.type or self._policy_parameter != other._policy_parameter:
            return False
        return super().__eq__(other)

    def str(self) -> str:
        # Textual ("hints file") form: "<type> <parameter> <item>...".
        return "{} {} {}".format(
            self._type,
            str(self._policy_parameter),
            " ".join(x.name for x in self._packages),
        )

376 

377 

class AgeDayHint(SimplePolicyHint[int]):
    """An "age-days" hint: overrides the required age for an item."""

    @property
    def days(self) -> int:
        # Number of days of required age set by the hint.
        return self._policy_parameter

382 

383 

class IgnoreRCBugHint(SimplePolicyHint[frozenset[str]]):
    """An "ignore-rc-bugs" hint: RC bugs to disregard for an item."""

    @property
    def ignored_rcbugs(self) -> frozenset[str]:
        # The set of bug numbers the hint tells the RC bug policy to ignore.
        return self._policy_parameter

388 

389 

def simple_policy_hint_parser_function(
    class_name: "Callable[[str, HintType, _T, list[MigrationItem]], Hint]",
    converter: "Callable[[str], _T]",
) -> "PolicyHintParserProto":
    """Build a parser for hints of the form "<type> <parameter> <item>...".

    :param class_name: Constructor of the concrete hint class; called once
        per parsed migration item.
    :param converter: Converts the raw (string) policy parameter into its
        typed value (e.g. `int` for the "age-days" hint).
    :return: A parser function suitable as a HintType's parser callback.
    """

    def f(
        mi_factory: "MigrationItemFactory",
        hints: "HintCollection",
        who: str,
        hint_type: "HintType",
        *args: str,
    ) -> None:
        # The first argument is the policy parameter; the remaining
        # arguments name the migration items the hint applies to.
        policy_parameter, *item_args = args
        for item in mi_factory.parse_items(*item_args):
            hints.add_hint(
                class_name(who, hint_type, converter(policy_parameter), [item])
            )

    return f

409 

410 

class AgePolicy(AbstractBasePolicy):
    """Configurable Aging policy for source migrations

    The AgePolicy will let packages stay in the source suite for a pre-defined
    amount of days before letting migrate (based on their urgency, if any).

    The AgePolicy's decision is influenced by the following:

    State files:
     * ${STATE_DIR}/age-policy-urgencies: File containing urgencies for source
       packages. Note that urgencies are "sticky" and the most "urgent" urgency
       will be used (i.e. the one with lowest age-requirements).
       - This file needs to be updated externally, if the policy should take
         urgencies into consideration. If empty (or not updated), the policy
         will simply use the default urgency (see the "Config" section below)
       - In Debian, these values are taken from the .changes file, but that is
         not a requirement for Britney.
     * ${STATE_DIR}/age-policy-dates: File containing the age of all source
       packages.
       - The policy will automatically update this file.
    Config:
     * DEFAULT_URGENCY: Name of the urgency used for packages without an urgency
       (or for unknown urgencies). Will also be used to set the "minimum"
       aging requirements for packages not in the target suite.
     * MINDAYS_<URGENCY>: The age-requirements in days for packages with the
       given urgency.
       - Commonly used urgencies are: low, medium, high, emergency, critical
    Hints:
     * urgent <source>/<version>: Disregard the age requirements for a given
       source/version.
     * age-days X <source>/<version>: Set the age requirements for a given
       source/version to X days. Note that X can exceed the highest
       age-requirement normally given.

    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__("age", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE})
        self._min_days = self._generate_mindays_table()
        self._min_days_default = 0
        # britney's "day" begins at 7pm (we want aging to occur in the 22:00Z run and we run Britney 2-4 times a day)
        # NB: _date_now is used in tests
        time_now = time.time()
        if hasattr(self.options, "fake_runtime"):
            time_now = int(self.options.fake_runtime)
            self.logger.info("overriding runtime with fake_runtime %d" % time_now)

        # Current "britney day" number, measured in whole days since epoch
        # with the day boundary shifted to 19:00 (see comment above).
        self._date_now = int(((time_now / (60 * 60)) - 19) / 24)
        # source name -> (version, day the version was first seen)
        self._dates: dict[str, tuple[str, int]] = {}
        # source name -> sticky urgency (most urgent seen so far)
        self._urgencies: dict[str, str] = {}
        self._default_urgency: str = self.options.default_urgency
        # Urgencies exempt from penalty days (NO_PENALTIES config).
        self._penalty_immune_urgencies: frozenset[str] = frozenset()
        if hasattr(self.options, "no_penalties"):
            self._penalty_immune_urgencies = frozenset(
                x.strip() for x in self.options.no_penalties.split()
            )
        self._bounty_min_age: int | None = None  # initialised later

    def _generate_mindays_table(self) -> dict[str, int]:
        """Build the urgency -> required-days map from MINDAYS_* options.

        :raises ValueError: if a MINDAYS_* option is not a non-negative integer
        """
        mindays: dict[str, int] = {}
        for k in dir(self.options):
            if not k.startswith("mindays_"):
                continue
            v = getattr(self.options, k)
            try:
                as_days = int(v)
            except ValueError:
                raise ValueError(
                    "Unable to parse "
                    + k
                    + " as a number of days. Must be 0 or a positive integer"
                )
            if as_days < 0:
                raise ValueError(
                    "The value of " + k + " must be zero or a positive integer"
                )
            # "mindays_high" -> mindays["high"]
            mindays[k.split("_")[1]] = as_days
        return mindays

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the "age-days" and "urgent" hint types."""
        hint_parser.register_hint_type(
            HintType(
                "age-days",
                simple_policy_hint_parser_function(AgeDayHint, int),
                min_args=2,
            )
        )
        hint_parser.register_hint_type(HintType("urgent"))

    def initialise(self, britney: "Britney") -> None:
        """Load the dates/urgencies state files and validate the config."""
        super().initialise(britney)
        self._read_dates_file()
        self._read_urgencies_file()
        if self._default_urgency not in self._min_days:  # pragma: no cover
            raise ValueError(
                "Missing age-requirement for default urgency (MINDAYS_%s)"
                % self._default_urgency
            )
        self._min_days_default = self._min_days[self._default_urgency]
        try:
            # BOUNTY_MIN_AGE may be a plain number of days...
            self._bounty_min_age = int(self.options.bounty_min_age)
        except ValueError:
            # ... or the name of an urgency whose MINDAYS value is used.
            if self.options.bounty_min_age in self._min_days:
                self._bounty_min_age = self._min_days[self.options.bounty_min_age]
            else:  # pragma: no cover
                raise ValueError(
                    "Please fix BOUNTY_MIN_AGE in the britney configuration"
                )
        except AttributeError:
            # The option wasn't defined in the configuration
            self._bounty_min_age = 0

    def save_state(self, britney: "Britney") -> None:
        """Persist the (possibly updated) first-seen dates."""
        super().save_state(britney)
        self._write_dates_file()

    def apply_src_policy_impl(
        self,
        age_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check whether the upload is old enough to migrate.

        Applies sticky urgencies, bounty/penalty day adjustments and the
        "age-days"/"urgent" hints, records the outcome in `age_info` and on
        the excuse, and returns the resulting verdict.
        """
        # retrieve the urgency for the upload, ignoring it if this is a NEW package
        # (not present in the target suite)
        source_name = item.package
        urgency = self._urgencies.get(source_name, self._default_urgency)

        if urgency not in self._min_days:
            # Unknown urgency: record it and fall back to the default.
            age_info["unknown-urgency"] = urgency
            urgency = self._default_urgency

        if not source_data_tdist:
            # NEW packages never age faster than the default urgency allows.
            if self._min_days[urgency] < self._min_days_default:
                age_info["urgency-reduced"] = {
                    "from": urgency,
                    "to": self._default_urgency,
                }
                urgency = self._default_urgency

        # (Re)start the aging clock when the package or version is new to us.
        if source_name not in self._dates:
            self._dates[source_name] = (source_data_srcdist.version, self._date_now)
        elif self._dates[source_name][0] != source_data_srcdist.version:
            self._dates[source_name] = (source_data_srcdist.version, self._date_now)

        days_old = self._date_now - self._dates[source_name][1]
        min_days = self._min_days[urgency]
        # Bounties (e.g. from autopkgtests) reduce the required age...
        for bounty in excuse.bounty:
            if excuse.bounty[bounty]:
                self.logger.info(
                    "Applying bounty for %s granted by %s: %d days",
                    source_name,
                    bounty,
                    excuse.bounty[bounty],
                )
                excuse.addinfo(
                    "Required age reduced by %d days because of %s"
                    % (excuse.bounty[bounty], bounty)
                )
                assert excuse.bounty[bounty] > 0, "negative bounties shouldn't happen"
                min_days -= excuse.bounty[bounty]
        # ... while penalties increase it, unless the urgency is exempt.
        if urgency not in self._penalty_immune_urgencies:
            for penalty in excuse.penalty:
                if excuse.penalty[penalty]:
                    self.logger.info(
                        "Applying penalty for %s given by %s: %d days",
                        source_name,
                        penalty,
                        excuse.penalty[penalty],
                    )
                    excuse.addinfo(
                        "Required age increased by %d days because of %s"
                        % (excuse.penalty[penalty], penalty)
                    )
                    assert (
                        excuse.penalty[penalty] > 0
                    ), "negative penalties should be handled earlier"
                    min_days += excuse.penalty[penalty]

        assert self._bounty_min_age is not None
        # the age in BOUNTY_MIN_AGE can be higher than the one associated with
        # the real urgency, so don't forget to take it into account
        bounty_min_age = min(self._bounty_min_age, self._min_days[urgency])
        if min_days < bounty_min_age:
            min_days = bounty_min_age
            excuse.addinfo(
                "Required age is not allowed to drop below %d days" % min_days
            )

        age_info["current-age"] = days_old

        assert self.hints is not None
        # "age-days" hints override the computed requirement outright.
        for age_days_hint in cast(
            "list[AgeDayHint]",
            self.hints.search(
                "age-days", package=source_name, version=source_data_srcdist.version
            ),
        ):
            new_req = age_days_hint.days
            age_info["age-requirement-reduced"] = {
                "new-requirement": new_req,
                "changed-by": age_days_hint.user,
            }
            if "original-age-requirement" not in age_info:
                age_info["original-age-requirement"] = min_days
            min_days = new_req

        age_info["age-requirement"] = min_days
        res = PolicyVerdict.PASS

        if days_old < min_days:
            # Too young; an "urgent" hint can still wave the package through.
            urgent_hints = self.hints.search(
                "urgent", package=source_name, version=source_data_srcdist.version
            )
            if urgent_hints:
                age_info["age-requirement-reduced"] = {
                    "new-requirement": 0,
                    "changed-by": urgent_hints[0].user,
                }
                res = PolicyVerdict.PASS_HINTED
            else:
                res = PolicyVerdict.REJECTED_TEMPORARILY

        # update excuse
        age_hint = age_info.get("age-requirement-reduced", None)
        age_min_req = age_info["age-requirement"]
        if age_hint:
            new_req = age_hint["new-requirement"]
            who = age_hint["changed-by"]
            if new_req:
                excuse.addinfo(
                    "Overriding age needed from %d days to %d by %s"
                    % (age_min_req, new_req, who)
                )
                age_min_req = new_req
            else:
                excuse.addinfo("Too young, but urgency pushed by %s" % who)
                age_min_req = 0
        excuse.setdaysold(age_info["current-age"], age_min_req)

        if age_min_req == 0:
            excuse.addinfo("%d days old" % days_old)
        elif days_old < age_min_req:
            excuse.add_verdict_info(
                res, "Too young, only %d of %d days old" % (days_old, age_min_req)
            )
        else:
            excuse.addinfo("%d days old (needed %d days)" % (days_old, age_min_req))

        return res

    def _read_dates_file(self) -> None:
        """Parse the dates file into self._dates, creating it if missing."""
        dates = self._dates
        fallback_filename = os.path.join(self.suite_info.target_suite.path, "Dates")
        using_new_name = False
        try:
            filename = os.path.join(self.state_dir, "age-policy-dates")
            if not os.path.exists(filename) and os.path.exists(fallback_filename):
                filename = fallback_filename
            else:
                using_new_name = True
        except AttributeError:
            # state_dir is unset; fall back to the legacy location if present.
            if os.path.exists(fallback_filename):
                filename = fallback_filename
            else:
                raise RuntimeError("Please set STATE_DIR in the britney configuration")

        try:
            with open(filename, encoding="utf-8") as fd:
                for line in fd:
                    if line.startswith("#"):
                        # Ignore comment lines (mostly used for tests)
                        continue
                    # Line format: <source> <version> <date>
                    ln = line.split()
                    if len(ln) != 3:  # pragma: no cover
                        continue
                    try:
                        dates[ln[0]] = (ln[1], int(ln[2]))
                    except ValueError:  # pragma: no cover
                        pass
        except FileNotFoundError:
            if not using_new_name:
                # If we are using the legacy name, then just give up
                raise
            self.logger.info("%s does not appear to exist. Creating it", filename)
            with open(filename, mode="x", encoding="utf-8"):
                pass

    def _read_urgencies_file(self) -> None:
        """Parse the urgencies file into self._urgencies (sticky semantics)."""
        urgencies = self._urgencies
        min_days_default = self._min_days_default
        fallback_filename = os.path.join(self.suite_info.target_suite.path, "Urgency")
        try:
            filename = os.path.join(self.state_dir, "age-policy-urgencies")
            if not os.path.exists(filename) and os.path.exists(fallback_filename):
                filename = fallback_filename
        except AttributeError:
            filename = fallback_filename

        sources_s = self.suite_info.primary_source_suite.sources
        sources_t = self.suite_info.target_suite.sources

        with open(filename, errors="surrogateescape", encoding="ascii") as fd:
            for line in fd:
                if line.startswith("#"):
                    # Ignore comment lines (mostly used for tests)
                    continue
                # Line format: <source> <version> <urgency>
                ln = line.split()
                if len(ln) != 3:
                    continue

                # read the minimum days associated with the urgencies
                urgency_old = urgencies.get(ln[0], None)
                mindays_old = self._min_days.get(urgency_old, 1000)  # type: ignore[arg-type]
                mindays_new = self._min_days.get(ln[2], min_days_default)

                # if the new urgency is lower (so the min days are higher), do nothing
                if mindays_old <= mindays_new:
                    continue

                # if the package exists in the target suite and it is more recent, do nothing
                tsrcv = sources_t.get(ln[0], None)
                if tsrcv and apt_pkg.version_compare(tsrcv.version, ln[1]) >= 0:
                    continue

                # if the package doesn't exist in the primary source suite or it is older, do nothing
                usrcv = sources_s.get(ln[0], None)
                if not usrcv or apt_pkg.version_compare(usrcv.version, ln[1]) < 0:
                    continue

                # update the urgency for the package
                urgencies[ln[0]] = ln[2]

    def _write_dates_file(self) -> None:
        """Write self._dates back to disk atomically (write + rename)."""
        dates = self._dates
        try:
            directory = self.state_dir
            basename = "age-policy-dates"
            old_file = os.path.join(self.suite_info.target_suite.path, "Dates")
        except AttributeError:
            # state_dir is unset; keep using the legacy file.
            directory = self.suite_info.target_suite.path
            basename = "Dates"
            old_file = None
        filename = os.path.join(directory, basename)
        filename_tmp = os.path.join(directory, "%s_new" % basename)
        with open(filename_tmp, "w", encoding="utf-8") as fd:
            for pkg in sorted(dates):
                version, date = dates[pkg]
                fd.write("%s %s %d\n" % (pkg, version, date))
        os.rename(filename_tmp, filename)
        if old_file is not None and os.path.exists(old_file):
            # Clean up the legacy file once the new location is in use.
            self.logger.info("Removing old age-policy-dates file %s", old_file)
            os.unlink(old_file)

768 

769 

770class RCBugPolicy(AbstractBasePolicy): 

771 """RC bug regression policy for source migrations 

772 

773 The RCBugPolicy will read provided list of RC bugs and block any 

774 source upload that would introduce a *new* RC bug in the target 

775 suite. 

776 

777 The RCBugPolicy's decision is influenced by the following: 

778 

779 State files: 

780 * ${STATE_DIR}/rc-bugs-${SUITE_NAME}: File containing RC bugs for packages in 

781 the given suite (one for both primary source suite and the target sutie is 

782 needed). 

783 - These files need to be updated externally. 

784 """ 

785 

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "rc-bugs", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
        )
        # Package key ("src:<name>" or binary name) -> set of RC bug numbers;
        # populated by initialise().
        self._bugs_source: dict[str, set[str]] | None = None
        self._bugs_target: dict[str, set[str]] | None = None

792 

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the "ignore-rc-bugs" hint type."""
        # The hint's first argument is a comma-separated list of bug numbers.
        f = simple_policy_hint_parser_function(
            IgnoreRCBugHint, lambda x: frozenset(x.split(","))
        )
        hint_parser.register_hint_type(HintType("ignore-rc-bugs", f, min_args=2))

798 

    def initialise(self, britney: "Britney") -> None:
        """Load the RC bug lists for the primary source and target suites."""
        super().initialise(britney)
        source_suite = self.suite_info.primary_source_suite
        target_suite = self.suite_info.target_suite
        # Legacy per-suite locations used before STATE_DIR existed.
        fallback_unstable = os.path.join(source_suite.path, "BugsV")
        fallback_testing = os.path.join(target_suite.path, "BugsV")
        try:
            filename_unstable = os.path.join(
                self.state_dir, "rc-bugs-%s" % source_suite.name
            )
            filename_testing = os.path.join(
                self.state_dir, "rc-bugs-%s" % target_suite.name
            )
            # Fall back to the legacy files only when neither new file
            # exists and both legacy files do.
            if (
                not os.path.exists(filename_unstable)
                and not os.path.exists(filename_testing)
                and os.path.exists(fallback_unstable)
                and os.path.exists(fallback_testing)
            ):
                filename_unstable = fallback_unstable
                filename_testing = fallback_testing
        except AttributeError:
            # STATE_DIR not configured; use the legacy locations.
            filename_unstable = fallback_unstable
            filename_testing = fallback_testing
        self._bugs_source = self._read_bugs(filename_unstable)
        self._bugs_target = self._read_bugs(filename_testing)

825 

    def apply_src_policy_impl(
        self,
        rcbugs_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Reject *item* if it would introduce new RC bugs into the target suite.

        Collects the RC bug numbers filed against the source package and all
        of its binaries in both the source and the target suite, then:

        * bugs present only in the source suite cause a permanent rejection;
        * bugs present only in the target suite are reported as being fixed;
        * an ``ignore-rc-bugs`` hint can wave through specific bug numbers
          (at most one such hint is honoured per item).

        The sorted bug lists are recorded in *rcbugs_info* and the verdict
        details are added to *excuse*.
        """
        assert self._bugs_source is not None  # for type checking
        assert self._bugs_target is not None  # for type checking
        bugs_t = set()
        bugs_s = set()
        source_name = item.package
        # Names of the binaries built by this source in each suite; the
        # source may be absent from the target suite (AttributeError below).
        binaries_s = {x[0] for x in source_data_srcdist.binaries}
        try:
            binaries_t = {x[0] for x in source_data_tdist.binaries}  # type: ignore[union-attr]
        except AttributeError:
            binaries_t = set()

        # Bugs filed directly against the source package use a "src:" key in
        # the parsed bug files.
        src_key = f"src:{source_name}"
        if source_data_tdist and src_key in self._bugs_target:
            bugs_t.update(self._bugs_target[src_key])
        if src_key in self._bugs_source:
            bugs_s.update(self._bugs_source[src_key])

        # Add the bugs filed against each binary package of the source.
        for pkg in binaries_s:
            if pkg in self._bugs_source:
                bugs_s |= self._bugs_source[pkg]
        for pkg in binaries_t:
            if pkg in self._bugs_target:
                bugs_t |= self._bugs_target[pkg]

        # The bts seems to support filing source bugs against a binary of the
        # same name if that binary isn't built by any source. An example is bug
        # 820347 against Package: juce (in the live-2016-04-11 test). Add those
        # bugs too.
        if (
            source_name not in (binaries_s | binaries_t)
            and source_name
            not in {
                x.package_name
                for x in self.suite_info.primary_source_suite.all_binaries_in_suite.keys()
            }
            and source_name
            not in {
                x.package_name
                for x in self.suite_info.target_suite.all_binaries_in_suite.keys()
            }
        ):
            if source_name in self._bugs_source:
                bugs_s |= self._bugs_source[source_name]
            if source_name in self._bugs_target:
                bugs_t |= self._bugs_target[source_name]

        # If a package is not in the target suite, it has no RC bugs per
        # definition. Unfortunately, it seems that the live-data is
        # not always accurate (e.g. live-2011-12-13 suggests that
        # obdgpslogger had the same bug in testing and unstable,
        # but obdgpslogger was not in testing at that time).
        # - For the curious, obdgpslogger was removed on that day
        # and the BTS probably had not caught up with that fact.
        # (https://tracker.debian.org/news/415935)
        assert not bugs_t or source_data_tdist, (
            "%s had bugs in the target suite but is not present" % source_name
        )

        verdict = PolicyVerdict.PASS

        assert self.hints is not None
        for ignore_hint in cast(
            list[IgnoreRCBugHint],
            self.hints.search(
                "ignore-rc-bugs",
                package=source_name,
                version=source_data_srcdist.version,
            ),
        ):
            ignored_bugs = ignore_hint.ignored_rcbugs

            # Only handle one hint for now
            if "ignored-bugs" in rcbugs_info:
                self.logger.info(
                    "Ignoring ignore-rc-bugs hint from %s on %s due to another hint from %s",
                    ignore_hint.user,
                    source_name,
                    rcbugs_info["ignored-bugs"]["issued-by"],
                )
                continue
            # A hint only takes effect if it actually names a bug that is
            # currently open against the source-suite version.
            if not ignored_bugs.isdisjoint(bugs_s):
                bugs_s -= ignored_bugs
                bugs_t -= ignored_bugs
                rcbugs_info["ignored-bugs"] = {
                    "bugs": sorted(ignored_bugs),
                    "issued-by": ignore_hint.user,
                }
                verdict = PolicyVerdict.PASS_HINTED
            else:
                self.logger.info(
                    "Ignoring ignore-rc-bugs hint from %s on %s as none of %s affect the package",
                    ignore_hint.user,
                    source_name,
                    str(ignored_bugs),
                )

        rcbugs_info["shared-bugs"] = sorted(bugs_s & bugs_t)
        rcbugs_info["unique-source-bugs"] = sorted(bugs_s - bugs_t)
        rcbugs_info["unique-target-bugs"] = sorted(bugs_t - bugs_s)

        # update excuse
        new_bugs = rcbugs_info["unique-source-bugs"]
        old_bugs = rcbugs_info["unique-target-bugs"]
        excuse.setbugs(old_bugs, new_bugs)

        # Bugs that exist only in the source suite would be regressions in
        # the target suite: permanent rejection (until the bugs are closed).
        if new_bugs:
            verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(
                verdict,
                "Updating %s would introduce bugs in %s: %s"
                % (
                    source_name,
                    self.suite_info.target_suite.name,
                    ", ".join(
                        [
                            '<a href="https://bugs.debian.org/%s">#%s</a>'
                            % (quote(a), a)
                            for a in new_bugs
                        ]
                    ),
                ),
            )

        if old_bugs:
            excuse.addinfo(
                "Updating %s will fix bugs in %s: %s"
                % (
                    source_name,
                    self.suite_info.target_suite.name,
                    ", ".join(
                        [
                            '<a href="https://bugs.debian.org/%s">#%s</a>'
                            % (quote(a), a)
                            for a in old_bugs
                        ]
                    ),
                )
            )

        return verdict

974 

975 def _read_bugs(self, filename: str) -> dict[str, set[str]]: 

976 """Read the release critical bug summary from the specified file 

977 

978 The file contains rows with the format: 

979 

980 <package-name> <bug number>[,<bug number>...] 

981 

982 The method returns a dictionary where the key is the binary package 

983 name and the value is the list of open RC bugs for it. 

984 """ 

985 bugs: dict[str, set[str]] = {} 

986 self.logger.info("Loading RC bugs data from %s", filename) 

987 with open(filename, encoding="ascii") as f: 

988 for line in f: 

989 ln = line.split() 

990 if len(ln) != 2: # pragma: no cover 

991 self.logger.warning("Malformed line found in line %s", line) 

992 continue 

993 pkg = ln[0] 

994 if pkg not in bugs: 

995 bugs[pkg] = set() 

996 bugs[pkg].update(ln[1].split(",")) 

997 return bugs 

998 

999 

class PiupartsPolicy(AbstractBasePolicy):
    """Policy based on piuparts (install/upgrade/removal) test results.

    Per-suite JSON summary files are loaded from ``state_dir``.  A piuparts
    failure in the source suite that is not also present in the target
    suite counts as a regression and blocks migration; the
    ``ignore-piuparts`` hint can override such a rejection.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "piuparts", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
        )
        # source package name -> (state, url).  The url is None for every
        # entry of the target summary, which is loaded with keep_url=False.
        self._piuparts_source: dict[str, tuple[str, str | None]] | None = None
        self._piuparts_target: dict[str, tuple[str, str | None]] | None = None

    def register_hints(self, hint_parser: HintParser) -> None:
        hint_parser.register_hint_type(HintType("ignore-piuparts"))

    def initialise(self, britney: "Britney") -> None:
        """Load the piuparts summaries for the source and target suites.

        Raises RuntimeError when STATE_DIR is not configured (self.state_dir
        missing triggers the AttributeError).
        """
        super().initialise(britney)
        source_suite = self.suite_info.primary_source_suite
        target_suite = self.suite_info.target_suite
        try:
            filename_unstable = os.path.join(
                self.state_dir, "piuparts-summary-%s.json" % source_suite.name
            )
            filename_testing = os.path.join(
                self.state_dir, "piuparts-summary-%s.json" % target_suite.name
            )
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e
        self._piuparts_source = self._read_piuparts_summary(
            filename_unstable, keep_url=True
        )
        self._piuparts_target = self._read_piuparts_summary(
            filename_testing, keep_url=False
        )

    def apply_src_policy_impl(
        self,
        piuparts_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Judge *item* based on its piuparts state in the source suite.

        States seen in the summary: "P" (pass), "F" (fail), "W" (waiting
        for test results); any other value — including the locally
        substituted "X" — means the package cannot be tested and is not a
        blocker.  Only an "F" that is not also an "F" in the target suite
        (i.e. a regression) rejects; "W" rejects temporarily.
        """
        assert self._piuparts_source is not None  # for type checking
        assert self._piuparts_target is not None  # for type checking
        source_name = item.package

        if source_name in self._piuparts_target:
            testing_state = self._piuparts_target[source_name][0]
        else:
            testing_state = "X"
        url: str | None
        if source_name in self._piuparts_source:
            unstable_state, url = self._piuparts_source[source_name]
        else:
            unstable_state = "X"
            url = None
        url_html = "(no link yet)"
        if url is not None:
            url_html = '<a href="{0}">{0}</a>'.format(url)

        if unstable_state == "P":
            # Not a regression
            msg = f"Piuparts tested OK - {url_html}"
            result = PolicyVerdict.PASS
            piuparts_info["test-results"] = "pass"
        elif unstable_state == "F":
            if testing_state != unstable_state:
                piuparts_info["test-results"] = "regression"
                msg = f"Piuparts regression - {url_html}"
                result = PolicyVerdict.REJECTED_PERMANENTLY
            else:
                piuparts_info["test-results"] = "failed"
                msg = f"Piuparts failure (not a regression) - {url_html}"
                result = PolicyVerdict.PASS
        elif unstable_state == "W":
            msg = f"Piuparts check waiting for test results - {url_html}"
            result = PolicyVerdict.REJECTED_TEMPORARILY
            piuparts_info["test-results"] = "waiting-for-test-results"
        else:
            msg = f"Piuparts can't test {source_name} (not a blocker) - {url_html}"
            piuparts_info["test-results"] = "cannot-be-tested"
            result = PolicyVerdict.PASS

        if url is not None:
            piuparts_info["piuparts-test-url"] = url
        if result.is_rejected:
            excuse.add_verdict_info(result, msg)
        else:
            excuse.addinfo(msg)

        # A rejection can be overridden by an ignore-piuparts hint matching
        # the exact source version; only the first matching hint is used.
        if result.is_rejected:
            assert self.hints is not None
            for ignore_hint in self.hints.search(
                "ignore-piuparts",
                package=source_name,
                version=source_data_srcdist.version,
            ):
                piuparts_info["ignored-piuparts"] = {"issued-by": ignore_hint.user}
                result = PolicyVerdict.PASS_HINTED
                excuse.addinfo(
                    f"Piuparts issue ignored as requested by {ignore_hint.user}"
                )
                break

        return result

    def _read_piuparts_summary(
        self, filename: str, keep_url: bool = True
    ) -> dict[str, tuple[str, str | None]]:
        """Parse a piuparts JSON summary file.

        Returns a mapping from source package name to a ``(state, url)``
        tuple; the url is replaced with None when *keep_url* is false.
        An empty file yields an empty summary.

        Raises ValueError when the file does not look like a piuparts
        summary (wrong or missing id/version) or when a source has more
        than one result set.
        """
        summary: dict[str, tuple[str, str | None]] = {}
        self.logger.info("Loading piuparts report from %s", filename)
        with open(filename) as fd:
            if os.fstat(fd.fileno()).st_size < 1:
                return summary
            data = json.load(fd)
        try:
            if (
                data["_id"] != "Piuparts Package Test Results Summary"
                or data["_version"] != "1.0"
            ):  # pragma: no cover
                # Bug fix: these messages were f-strings without any
                # placeholder; interpolate the offending filename.
                raise ValueError(
                    f"Piuparts results in {filename} does not have the correct ID or version"
                )
        except KeyError as e:  # pragma: no cover
            raise ValueError(
                f"Piuparts results in {filename} is missing id or version field"
            ) from e
        for source, suite_data in data["packages"].items():
            if len(suite_data) != 1:  # pragma: no cover
                raise ValueError(
                    f"Piuparts results in {filename}, the source {source} does not have "
                    "exactly one result set"
                )
            item = next(iter(suite_data.values()))
            state, _, url = item
            if not keep_url:
                url = None
            summary[source] = (state, url)

        return summary

1139 

1140 

class DependsPolicy(AbstractBasePolicy):
    """Policy checking installability of the binaries of a migration item.

    Runs once per architecture (RUN_ON_EVERY_ARCH_ONLY) and rejects items
    whose binaries have unsatisfiable dependencies, while also recording
    data consumed by the autopkgtest policy.
    """

    # All of these are populated in initialise() from the britney instance.
    pkg_universe: "BinaryPackageUniverse"
    broken_packages: frozenset["BinaryPackageId"]
    all_binaries: dict["BinaryPackageId", "BinaryPackage"]
    allow_uninst: dict[str, set[str | None]]

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "depends",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        # Architecture lists copied from self.options in initialise().
        self.nobreakall_arches = None
        self.new_arches = None
        self.break_arches = None

    def initialise(self, britney: "Britney") -> None:
        """Cache the package universe and architecture config from *britney*."""
        super().initialise(britney)
        self.pkg_universe = britney.pkg_universe
        self.broken_packages = self.pkg_universe.broken_packages
        self.all_binaries = britney.all_binaries
        self.nobreakall_arches = self.options.nobreakall_arches
        self.new_arches = self.options.new_arches
        self.break_arches = self.options.break_arches
        self.allow_uninst = britney.allow_uninst

    def apply_srcarch_policy_impl(
        self,
        deps_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check installability of *item*'s binaries on *arch*.

        Rejects the item when a binary whose dependencies cannot be
        satisfied would actually migrate; for binaries that are allowed to
        be (or already are) uninstallable, the state is only recorded.
        Also fills *deps_info* with hints for the autopkgtest policy (see
        the trade-off comment near the end).
        """
        verdict = PolicyVerdict.PASS

        assert self.break_arches is not None
        assert self.new_arches is not None
        if arch in self.break_arches or arch in self.new_arches:
            # we don't check these in the policy (TODO - for now?)
            return verdict

        source_suite = item.suite
        target_suite = self.suite_info.target_suite

        packages_s_a = source_suite.binaries[arch]
        packages_t_a = target_suite.binaries[arch]

        my_bins = sorted(excuse.packages[arch])

        # Track (as sets of booleans) whether the arch:all and arch:$arch
        # binaries of this source were installable; evaluated at the end to
        # decide what to tell the autopkgtest policy.
        arch_all_installable = set()
        arch_arch_installable = set()
        consider_it_regression = True

        for pkg_id in my_bins:
            pkg_name = pkg_id.package_name
            binary_u = packages_s_a[pkg_name]
            pkg_arch = binary_u.architecture

            # in some cases, we want to track the uninstallability of a
            # package (because the autopkgtest policy uses this), but we still
            # want to allow the package to be uninstallable
            skip_dep_check = False

            if binary_u.source_version != source_data_srcdist.version:
                # don't check cruft in unstable
                continue

            if item.architecture != "source" and pkg_arch == "all":
                # we don't care about the existing arch: all binaries when
                # checking a binNMU item, because the arch: all binaries won't
                # migrate anyway
                skip_dep_check = True

            if pkg_arch == "all" and arch not in self.nobreakall_arches:
                skip_dep_check = True

            if pkg_name in self.allow_uninst[arch]:
                # this binary is allowed to become uninstallable, so we don't
                # need to check anything
                skip_dep_check = True

            if pkg_name in packages_t_a:
                oldbin = packages_t_a[pkg_name]
                if not target_suite.is_installable(oldbin.pkg_id):
                    # as the current binary in testing is already
                    # uninstallable, the newer version is allowed to be
                    # uninstallable as well, so we don't need to check
                    # anything
                    skip_dep_check = True
                    consider_it_regression = False

            if pkg_id in self.broken_packages:
                if pkg_arch == "all":
                    arch_all_installable.add(False)
                else:
                    arch_arch_installable.add(False)
                # dependencies can't be satisfied by all the known binaries -
                # this certainly won't work...
                excuse.add_unsatisfiable_on_arch(arch)
                if skip_dep_check:
                    # ...but if the binary is allowed to become uninstallable,
                    # we don't care
                    # we still want the binary to be listed as uninstallable,
                    continue
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                excuse.add_verdict_info(
                    verdict, f"{pkg_name}/{arch} has unsatisfiable dependency"
                )
                excuse.addreason("depends")
            else:
                if pkg_arch == "all":
                    arch_all_installable.add(True)
                else:
                    arch_arch_installable.add(True)

            if skip_dep_check:
                continue

            deps = self.pkg_universe.dependencies_of(pkg_id)

            for dep in deps:
                # dep is a list of packages, each of which satisfy the
                # dependency

                if dep == frozenset():
                    continue
                is_ok = False
                needed_for_dep = set()

                for alternative in dep:
                    if target_suite.is_pkg_in_the_suite(alternative):
                        # dep can be satisfied in testing - ok
                        is_ok = True
                    elif alternative in my_bins:
                        # can be satisfied by binary from same item: will be
                        # ok if item migrates
                        is_ok = True
                    else:
                        needed_for_dep.add(alternative)

                if not is_ok:
                    # Record the unsatisfied alternatives as dependencies of
                    # the excuse; migration may still proceed if they migrate.
                    spec = DependencySpec(DependencyType.DEPENDS, arch)
                    excuse.add_package_depends(spec, needed_for_dep)

        # The autopkgtest policy needs delicate trade offs for
        # non-installability. The current choice (considering source
        # migration and only binaries built by the version of the
        # source):
        #
        # * Run autopkgtest if all arch:$arch binaries are installable
        #   (but some or all arch:all binaries are not)
        #
        # * Don't schedule nor wait for not installable arch:all only package
        #   on ! NOBREAKALL_ARCHES
        #
        # * Run autopkgtest if installability isn't a regression (there are (or
        #   rather, should) not be a lot of packages in this state, and most
        #   likely they'll just fail quickly)
        #
        # * Don't schedule, but wait otherwise
        if arch_arch_installable == {True} and False in arch_all_installable:
            deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)
        elif (
            arch not in self.nobreakall_arches
            and arch_arch_installable == set()
            and False in arch_all_installable
        ):
            deps_info.setdefault("arch_all_not_installable", []).append(arch)
        elif not consider_it_regression:
            deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)

        return verdict

1317 

1318 

@unique
class BuildDepResult(IntEnum):
    """Outcome of checking one Build-Depends relation, ordered best to worst."""

    OK = 1  # relation is satisfied in target
    DEPENDS = 2  # relation can be satisfied by other packages in source
    FAILED = 3  # relation cannot be satisfied

1327 

1328 

class BuildDependsPolicy(AbstractBasePolicy):
    """Policy checking that Build-Depends(-Arch|-Indep) of a source can be
    satisfied in the target suite, or at least by packages that are also
    migration candidates.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "build-depends",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
        )
        # Architectures on which arch:all packages are built; parsed from
        # the ALL_BUILDARCH option in initialise().
        self._all_buildarch: list[str] = []

        parse_option(options, "all_buildarch")

    def initialise(self, britney: "Britney") -> None:
        super().initialise(britney)
        if self.options.all_buildarch:
            self._all_buildarch = SuiteContentLoader.config_str_as_list(
                self.options.all_buildarch, []
            )

    def apply_src_policy_impl(
        self,
        build_deps_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
        get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
    ) -> PolicyVerdict:
        """Check Build-Depends-Arch and Build-Depends-Indep of *item*.

        Each field (when present) is checked via _check_build_deps; the
        worst of the two verdicts wins.  *get_dependency_solvers* is
        injectable for testing.
        """
        verdict = PolicyVerdict.PASS

        # analyze the dependency fields (if present)
        if deps := source_data_srcdist.build_deps_arch:
            v = self._check_build_deps(
                deps,
                DependencyType.BUILD_DEPENDS,
                build_deps_info,
                item,
                source_data_tdist,
                source_data_srcdist,
                excuse,
                get_dependency_solvers=get_dependency_solvers,
            )
            verdict = PolicyVerdict.worst_of(verdict, v)

        if ideps := source_data_srcdist.build_deps_indep:
            v = self._check_build_deps(
                ideps,
                DependencyType.BUILD_DEPENDS_INDEP,
                build_deps_info,
                item,
                source_data_tdist,
                source_data_srcdist,
                excuse,
                get_dependency_solvers=get_dependency_solvers,
            )
            verdict = PolicyVerdict.worst_of(verdict, v)

        return verdict

    def _get_check_archs(
        self, archs: Container[str], dep_type: DependencyType
    ) -> list[str]:
        """Return the ordered list of architectures to check *dep_type* on.

        For Build-Depends every (in-sync) architecture in *archs* is
        checked.  For Build-Depends-Indep only one architecture needs to
        satisfy the relation, so the list is a preference order instead.
        """
        oos = self.options.outofsync_arches

        if dep_type == DependencyType.BUILD_DEPENDS:
            return [
                arch
                for arch in self.options.architectures
                if arch in archs and arch not in oos
            ]

        # first try the all buildarch
        checkarchs = list(self._all_buildarch)
        # then try the architectures where this source has arch specific
        # binaries (in the order of the architecture config file)
        checkarchs.extend(
            arch
            for arch in self.options.architectures
            if arch in archs and arch not in checkarchs
        )
        # then try all other architectures
        checkarchs.extend(
            arch for arch in self.options.architectures if arch not in checkarchs
        )

        # and drop OUTOFSYNC_ARCHES
        return [arch for arch in checkarchs if arch not in oos]

    def _add_info_for_arch(
        self,
        arch: str,
        excuses_info: dict[str, list[str]],
        blockers: dict[str, set[BinaryPackageId]],
        results: dict[str, BuildDepResult],
        dep_type: DependencyType,
        target_suite: TargetSuite,
        source_suite: Suite,
        excuse: "Excuse",
        verdict: PolicyVerdict,
    ) -> PolicyVerdict:
        """Fold the per-arch result for *arch* into *excuse* and *verdict*.

        Registers the blocking packages as dependencies of the excuse,
        downgrades the verdict on a FAILED result and attaches the
        collected excuse texts.  Returns the (possibly worsened) verdict.
        """
        if arch in blockers:
            packages = blockers[arch]

            # for the solving packages, update the excuse to add the dependencies
            for p in packages:
                if arch not in self.options.break_arches:
                    spec = DependencySpec(dep_type, arch)
                    excuse.add_package_depends(spec, {p})

        if arch in results and results[arch] == BuildDepResult.FAILED:
            verdict = PolicyVerdict.worst_of(
                verdict, PolicyVerdict.REJECTED_PERMANENTLY
            )

        if arch in excuses_info:
            for excuse_text in excuses_info[arch]:
                if verdict.is_rejected:
                    excuse.add_verdict_info(verdict, excuse_text)
                else:
                    excuse.addinfo(excuse_text)

        return verdict

    def _check_build_deps(
        self,
        deps: str,
        dep_type: DependencyType,
        build_deps_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
        get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
    ) -> PolicyVerdict:
        """Check the single dependency field *deps* (a comma-joined relation
        string) of *item* and return the resulting verdict.

        Build-Depends must hold on every relevant architecture;
        Build-Depends-Indep (any_arch_ok) only needs one architecture with
        the best achievable result.  Findings are recorded in
        *build_deps_info* and on *excuse*.
        """
        verdict = PolicyVerdict.PASS
        any_arch_ok = dep_type == DependencyType.BUILD_DEPENDS_INDEP

        britney = self.britney

        # local copies for better performance
        parse_src_depends = apt_pkg.parse_src_depends

        source_name = item.package
        source_suite = item.suite
        target_suite = self.suite_info.target_suite
        binaries_s = source_suite.binaries
        provides_s = source_suite.provides_table
        binaries_t = target_suite.binaries
        provides_t = target_suite.provides_table
        unsat_bd: dict[str, list[str]] = {}
        # Architectures where this source builds arch-specific binaries.
        relevant_archs: set[str] = {
            binary.architecture
            for binary in source_data_srcdist.binaries
            if britney.all_binaries[binary].architecture != "all"
        }

        excuses_info: dict[str, list[str]] = defaultdict(list)
        blockers: dict[str, set[BinaryPackageId]] = defaultdict(set)
        arch_results = {}
        result_archs = defaultdict(list)
        bestresult = BuildDepResult.FAILED
        check_archs = self._get_check_archs(relevant_archs, dep_type)
        if not check_archs:
            # when the arch list is empty, we check the b-d on any arch, instead of all archs
            # this happens for Build-Depens on a source package that only produces arch: all binaries
            any_arch_ok = True
            check_archs = self._get_check_archs(
                self.options.architectures, DependencyType.BUILD_DEPENDS_INDEP
            )

        for arch in check_archs:
            # retrieve the binary package from the specified suite and arch
            binaries_s_a = binaries_s[arch]
            provides_s_a = provides_s[arch]
            binaries_t_a = binaries_t[arch]
            provides_t_a = provides_t[arch]
            arch_results[arch] = BuildDepResult.OK
            # for every dependency block (formed as conjunction of disjunction)
            for block_txt in deps.split(","):
                block_list = parse_src_depends(block_txt, False, arch)
                # Unlike regular dependencies, some clauses of the Build-Depends(-Arch|-Indep) can be
                # filtered out by (e.g.) architecture restrictions. We need to cope with this while
                # keeping block_txt and block aligned.
                if not block_list:
                    # Relation is not relevant for this architecture.
                    continue
                block = block_list[0]
                # if the block is satisfied in the target suite, then skip the block
                if get_dependency_solvers(
                    block, binaries_t_a, provides_t_a, build_depends=True
                ):
                    # Satisfied in the target suite; all ok.
                    continue

                # check if the block can be satisfied in the source suite, and list the solving packages
                packages = get_dependency_solvers(
                    block, binaries_s_a, provides_s_a, build_depends=True
                )
                sources = sorted(p.source for p in packages)

                # if the dependency can be satisfied by the same source package, skip the block:
                # obviously both binary packages will enter the target suite together
                if source_name in sources:
                    continue

                # if no package can satisfy the dependency, add this information to the excuse
                if not packages:
                    excuses_info[arch].append(
                        "%s unsatisfiable %s on %s: %s"
                        % (source_name, dep_type, arch, block_txt.strip())
                    )
                    if arch not in unsat_bd:
                        unsat_bd[arch] = []
                    unsat_bd[arch].append(block_txt.strip())
                    arch_results[arch] = BuildDepResult.FAILED
                    continue

                blockers[arch].update(p.pkg_id for p in packages)
                if arch_results[arch] < BuildDepResult.DEPENDS:
                    arch_results[arch] = BuildDepResult.DEPENDS

            if any_arch_ok:
                if arch_results[arch] < bestresult:
                    bestresult = arch_results[arch]
                result_archs[arch_results[arch]].append(arch)
                if bestresult == BuildDepResult.OK:
                    # we found an architecture where the b-deps-indep are
                    # satisfied in the target suite, so we can stop
                    break

        if any_arch_ok:
            # Report only the best architecture found for b-d-indep.
            arch = result_archs[bestresult][0]
            excuse.add_detailed_info(f"Checking {dep_type.get_description()} on {arch}")
            key = "check-%s-on-arch" % dep_type.get_reason()
            build_deps_info[key] = arch
            verdict = self._add_info_for_arch(
                arch,
                excuses_info,
                blockers,
                arch_results,
                dep_type,
                target_suite,
                source_suite,
                excuse,
                verdict,
            )

        else:
            for arch in check_archs:
                verdict = self._add_info_for_arch(
                    arch,
                    excuses_info,
                    blockers,
                    arch_results,
                    dep_type,
                    target_suite,
                    source_suite,
                    excuse,
                    verdict,
                )

        if unsat_bd:
            build_deps_info["unsatisfiable-arch-build-depends"] = unsat_bd

        return verdict

1595 

1596 

class BuiltUsingPolicy(AbstractBasePolicy):
    """Built-Using policy

    Binaries that incorporate (part of) another source package must list these
    sources under 'Built-Using'.

    This policy checks if the corresponding sources are available in the
    target suite. If they are not, but they are candidates for migration, a
    dependency is added.

    If the binary incorporates a newer version of a source, that is not (yet)
    a candidate, we don't want to accept that binary. A rebuild later in the
    primary suite wouldn't fix the issue, because that would incorporate the
    newer version again.

    If the binary incorporates an older version of the source, a newer version
    will be accepted as a replacement. We assume that this can be fixed by
    rebuilding the binary at some point during the development cycle.

    Requiring exact version of the source would not be useful in practice. A
    newer upload of that source wouldn't be blocked by this policy, so the
    built-using would be outdated anyway.

    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "built-using",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )

    def initialise(self, britney: "Britney") -> None:
        super().initialise(britney)

    def apply_srcarch_policy_impl(
        self,
        build_deps_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check the Built-Using fields of *item*'s binaries on *arch*.

        Each Built-Using relation must be satisfiable (at the listed
        version or newer) by the target suite, the item's own source
        suite, or — for additional source suites — the primary source
        suite; otherwise the item is rejected.  On break architectures
        unsatisfiable relations are only reported, not enforced.
        """
        verdict = PolicyVerdict.PASS

        source_suite = item.suite
        target_suite = self.suite_info.target_suite
        binaries_s = source_suite.binaries

        def check_bu_in_suite(
            bu_source: str, bu_version: str, source_suite: Suite
        ) -> bool:
            # Return True if *source_suite* carries bu_source at >= bu_version.
            # On a match (outside break arches) a migration dependency on
            # that source is added to the excuse.  Note: reads pkg_name from
            # the enclosing loop.
            found = False
            if bu_source not in source_suite.sources:
                return found
            s_source = source_suite.sources[bu_source]
            s_ver = s_source.version
            if apt_pkg.version_compare(s_ver, bu_version) >= 0:
                found = True
                dep = PackageId(bu_source, s_ver, "source")
                if arch in self.options.break_arches:
                    excuse.add_detailed_info(
                        "Ignoring Built-Using for %s/%s on %s"
                        % (pkg_name, arch, dep.uvname)
                    )
                else:
                    spec = DependencySpec(DependencyType.BUILT_USING, arch)
                    excuse.add_package_depends(spec, {dep})
                    excuse.add_detailed_info(
                        f"{pkg_name}/{arch} has Built-Using on {dep.uvname}"
                    )

            return found

        for pkg_id in sorted(
            x for x in source_data_srcdist.binaries if x.architecture == arch
        ):
            pkg_name = pkg_id.package_name

            # retrieve the testing (if present) and unstable corresponding binary packages
            binary_s = binaries_s[arch][pkg_name]

            for bu in binary_s.builtusing:
                bu_source = bu[0]
                bu_version = bu[1]
                found = False
                # Satisfied in the target suite itself?
                if bu_source in target_suite.sources:
                    t_source = target_suite.sources[bu_source]
                    t_ver = t_source.version
                    if apt_pkg.version_compare(t_ver, bu_version) >= 0:
                        found = True

                if not found:
                    found = check_bu_in_suite(bu_source, bu_version, source_suite)

                if not found and source_suite.suite_class.is_additional_source:
                    found = check_bu_in_suite(
                        bu_source, bu_version, self.suite_info.primary_source_suite
                    )

                if not found:
                    if arch in self.options.break_arches:
                        excuse.add_detailed_info(
                            "Ignoring unsatisfiable Built-Using for %s/%s on %s %s"
                            % (pkg_name, arch, bu_source, bu_version)
                        )
                    else:
                        verdict = PolicyVerdict.worst_of(
                            verdict, PolicyVerdict.REJECTED_PERMANENTLY
                        )
                        excuse.add_verdict_info(
                            verdict,
                            "%s/%s has unsatisfiable Built-Using on %s %s"
                            % (pkg_name, arch, bu_source, bu_version),
                        )

        return verdict

1717 

1718 

class BlockPolicy(AbstractBasePolicy):
    """Policy implementing block/unblock (and block-all) hints.

    A matching ``block``/``block-udeb``/... hint prevents a source from
    migrating until a corresponding ``unblock`` hint with matching version,
    suite and architecture is given.  ``block-all`` hints block whole
    categories of sources ("source", "new-source", "key", "no-autopkgtest").
    Items from non-primary source suites are implicitly blocked and need
    explicit approval.
    """

    # group(1) is "un" for unblock hints; group(2) is the block command
    # itself (e.g. "block", "block-udeb").
    BLOCK_HINT_REGEX = re.compile("^(un)?(block-?.*)$")

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "block",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
        )
        # "block-all" hints keyed by their argument (e.g. "source", "key");
        # filled in initialise().
        self._blockall: dict[str | None, Hint] = {}

    def initialise(self, britney: "Britney") -> None:
        """Cache block-all hints and, if needed, the key-package list."""
        super().initialise(britney)
        assert self.hints is not None
        for hint in self.hints.search(type="block-all"):
            self._blockall[hint.package] = hint

        self._key_packages: list[str] = []
        if "key" in self._blockall:
            self._key_packages = self._read_key_packages()

    def _read_key_packages(self) -> list[str]:
        """Read the list of key packages

        The file contains data in the yaml format :

        - reason: <something>
          source: <package>

        The method returns a list of all key packages.
        """
        filename = os.path.join(self.state_dir, "key_packages.yaml")
        self.logger.info("Loading key packages from %s", filename)
        if os.path.exists(filename):
            with open(filename) as f:
                data = yaml.safe_load(f)
            key_packages = [item["source"] for item in data]
        else:
            # A "block-all key" hint without the data file is a fatal
            # configuration error.
            self.logger.error(
                "Britney was asked to block key packages, "
                + "but no key_packages.yaml file was found."
            )
            sys.exit(1)

        return key_packages

    def register_hints(self, hint_parser: HintParser) -> None:
        # block related hints are currently defined in hint.py
        pass

    def _check_blocked(
        self, item: "MigrationItem", arch: str, version: str, excuse: "Excuse"
    ) -> PolicyVerdict:
        """Determine whether this item is blocked by (block-all) hints.

        Collects all block/unblock hints matching the source, applies the
        relevant "block-all" category if no specific block hint exists,
        then pairs each block command with a matching unblock.  Returns
        PASS when unblocked, otherwise REJECTED_NEEDS_APPROVAL.
        """
        verdict = PolicyVerdict.PASS
        # block command -> hint user (who issued it)
        blocked = {}
        unblocked = {}
        # optional custom per-command info text for the excuse
        block_info = {}
        source_suite = item.suite
        suite_name = source_suite.name
        src = item.package
        is_primary = source_suite.suite_class == SuiteClass.PRIMARY_SOURCE_SUITE

        tooltip = (
            "please contact %s-release if update is needed" % self.options.distribution
        )

        assert self.hints is not None
        shints = self.hints.search(package=src)
        mismatches = False
        r = self.BLOCK_HINT_REGEX
        for hint in shints:
            m = r.match(hint.type)
            if m:
                if m.group(1) == "un":
                    # unblock hints must match version, suite and arch
                    # exactly (or be arch "source") to take effect
                    assert hint.suite is not None
                    if (
                        hint.version != version
                        or hint.suite.name != suite_name
                        or (hint.architecture != arch and hint.architecture != "source")
                    ):
                        self.logger.info(
                            "hint mismatch: %s %s %s", version, arch, suite_name
                        )
                        mismatches = True
                    else:
                        unblocked[m.group(2)] = hint.user
                        excuse.add_hint(hint)
                else:
                    # block(-*) hint: only accepts a source, so this will
                    # always match
                    blocked[m.group(2)] = hint.user
                    excuse.add_hint(hint)

        if "block" not in blocked and is_primary:
            # if there is a specific block hint for this package, we don't
            # check for the general hints

            if self.options.distribution == "debian":
                url = "https://release.debian.org/testing/freeze_policy.html"
                tooltip = (
                    'Follow the <a href="%s">freeze policy</a> when applying for an unblock'
                    % url
                )

            if "source" in self._blockall:
                blocked["block"] = self._blockall["source"].user
                excuse.add_hint(self._blockall["source"])
            elif (
                "new-source" in self._blockall
                and src not in self.suite_info.target_suite.sources
            ):
                blocked["block"] = self._blockall["new-source"].user
                excuse.add_hint(self._blockall["new-source"])
                # no tooltip: new sources will probably not be accepted anyway
                block_info["block"] = "blocked by {}: is not in {}".format(
                    self._blockall["new-source"].user,
                    self.suite_info.target_suite.name,
                )
            elif "key" in self._blockall and src in self._key_packages:
                blocked["block"] = self._blockall["key"].user
                excuse.add_hint(self._blockall["key"])
                block_info["block"] = "blocked by {}: is a key package ({})".format(
                    self._blockall["key"].user,
                    tooltip,
                )
            elif "no-autopkgtest" in self._blockall:
                # a fully passing autopkgtest lifts this category of block
                if excuse.autopkgtest_results == {"PASS"}:
                    if not blocked:
                        excuse.addinfo("not blocked: has successful autopkgtest")
                else:
                    blocked["block"] = self._blockall["no-autopkgtest"].user
                    excuse.add_hint(self._blockall["no-autopkgtest"])
                    if not excuse.autopkgtest_results:
                        block_info["block"] = (
                            "blocked by %s: does not have autopkgtest (%s)"
                            % (
                                self._blockall["no-autopkgtest"].user,
                                tooltip,
                            )
                        )
                    else:
                        block_info["block"] = (
                            "blocked by %s: autopkgtest not fully successful (%s)"
                            % (
                                self._blockall["no-autopkgtest"].user,
                                tooltip,
                            )
                        )

        elif not is_primary:
            # items from additional source suites (e.g. tpu) are always
            # blocked until approved
            blocked["block"] = suite_name
            excuse.needs_approval = True

        for block_cmd in blocked:
            unblock_cmd = "un" + block_cmd
            if block_cmd in unblocked:
                if is_primary or block_cmd == "block-udeb":
                    excuse.addinfo(
                        "Ignoring %s request by %s, due to %s request by %s"
                        % (
                            block_cmd,
                            blocked[block_cmd],
                            unblock_cmd,
                            unblocked[block_cmd],
                        )
                    )
                else:
                    excuse.addinfo("Approved by %s" % (unblocked[block_cmd]))
            else:
                verdict = PolicyVerdict.REJECTED_NEEDS_APPROVAL
                if is_primary or block_cmd == "block-udeb":
                    # redirect people to d-i RM for udeb things:
                    if block_cmd == "block-udeb":
                        tooltip = "please contact the d-i release manager if an update is needed"
                    if block_cmd in block_info:
                        info = block_info[block_cmd]
                    else:
                        info = (
                            "Not touching package due to {} request by {} ({})".format(
                                block_cmd,
                                blocked[block_cmd],
                                tooltip,
                            )
                        )
                    excuse.add_verdict_info(verdict, info)
                else:
                    excuse.add_verdict_info(verdict, "NEEDS APPROVAL BY RM")
                excuse.addreason("block")
        if mismatches:
            excuse.add_detailed_info(
                "Some hints for %s do not match this item" % src
            )
        return verdict

    def apply_src_policy_impl(
        self,
        block_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Apply the block policy to a source migration item."""
        return self._check_blocked(item, "source", source_data_srcdist.version, excuse)

    def apply_srcarch_policy_impl(
        self,
        block_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Apply the block policy to a per-architecture (binNMU) item."""
        return self._check_blocked(item, arch, source_data_srcdist.version, excuse)

1934 

1935 

class BuiltOnBuilddPolicy(AbstractBasePolicy):
    """Policy rejecting binaries that were not built on official buildds.

    Signer information (per package/version/arch) is loaded from
    ``signers.json`` in the state directory.  Exceptions: packages outside
    main, and arch:all uploads whitelisted via the
    ``allow-archall-maintainer-upload`` hint.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "builtonbuildd",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        # signer data loaded in initialise(); kept in a dict to allow more
        # per-policy state later
        self._builtonbuildd: dict[str, Any] = {
            "signerinfo": None,
        }

    def register_hints(self, hint_parser: HintParser) -> None:
        hint_parser.register_hint_type(
            HintType(
                "allow-archall-maintainer-upload",
                versioned=HintAnnotate.FORBIDDEN,
            )
        )

    def initialise(self, britney: "Britney") -> None:
        """Load the signer info file; requires STATE_DIR to be configured."""
        super().initialise(britney)
        try:
            filename_signerinfo = os.path.join(self.state_dir, "signers.json")
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e
        self._builtonbuildd["signerinfo"] = self._read_signerinfo(filename_signerinfo)

    def apply_srcarch_policy_impl(
        self,
        buildd_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check every binary of this source on *arch* for a buildd signature.

        Records the accepted signer per package-arch in
        ``buildd_info["signed-by"]`` and rejects the excuse when a binary
        was maintainer-uploaded without an applicable exception.
        """
        verdict = PolicyVerdict.PASS
        signers = self._builtonbuildd["signerinfo"]

        if "signed-by" not in buildd_info:
            buildd_info["signed-by"] = {}

        source_suite = item.suite

        # horribe hard-coding, but currently, we don't keep track of the
        # component when loading the packages files
        component = "main"
        # we use the source component, because a binary in contrib can
        # belong to a source in main
        section = source_data_srcdist.section
        if section.find("/") > -1:
            component = section.split("/")[0]

        packages_s_a = source_suite.binaries[arch]
        assert self.hints is not None

        for pkg_id in sorted(
            x for x in source_data_srcdist.binaries if x.architecture == arch
        ):
            pkg_name = pkg_id.package_name
            binary_u = packages_s_a[pkg_name]
            pkg_arch = binary_u.architecture

            # skip cruft binaries built from an older source version
            if binary_u.source_version != source_data_srcdist.version:
                continue

            if item.architecture != "source" and pkg_arch == "all":
                # we don't care about the existing arch: all binaries when
                # checking a binNMU item, because the arch: all binaries won't
                # migrate anyway
                continue

            signer = None
            uid = None
            uidinfo = ""
            buildd_ok = False
            failure_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            try:
                signer = signers[pkg_name][pkg_id.version][pkg_arch]
                if signer["buildd"]:
                    buildd_ok = True
                uid = signer["uid"]
                uidinfo = f"arch {pkg_arch} binaries uploaded by {uid}"
            except KeyError:
                # no signer data: we cannot tell whether this is permanent
                self.logger.info(
                    "signer info for %s %s (%s) on %s not found "
                    % (pkg_name, binary_u.version, pkg_arch, arch)
                )
                uidinfo = "upload info for arch %s binaries not found" % (pkg_arch)
                failure_verdict = PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
            if not buildd_ok:
                if component != "main":
                    # non-main packages are exempt from the buildd requirement
                    if not buildd_ok and pkg_arch not in buildd_info["signed-by"]:
                        excuse.add_detailed_info(
                            f"{uidinfo}, but package in {component}"
                        )
                    buildd_ok = True
                elif pkg_arch == "all":
                    # arch:all maintainer uploads can be whitelisted by hint
                    allow_hints = self.hints.search(
                        "allow-archall-maintainer-upload", package=item.package
                    )
                    if allow_hints:
                        buildd_ok = True
                        verdict = PolicyVerdict.worst_of(
                            verdict, PolicyVerdict.PASS_HINTED
                        )
                        if pkg_arch not in buildd_info["signed-by"]:
                            excuse.addinfo(
                                "%s, but whitelisted by %s"
                                % (uidinfo, allow_hints[0].user)
                            )
            if not buildd_ok:
                verdict = failure_verdict
                # only report each pkg_arch once per excuse
                if pkg_arch not in buildd_info["signed-by"]:
                    if pkg_arch == "all":
                        uidinfo += (
                            ", a new source-only upload is needed to allow migration"
                        )
                    excuse.add_verdict_info(
                        verdict, "Not built on buildd: %s" % (uidinfo)
                    )

            if (
                pkg_arch in buildd_info["signed-by"]
                and buildd_info["signed-by"][pkg_arch] != uid
            ):
                # differing signers for the same arch: log it; the last one
                # seen wins below
                self.logger.info(
                    "signer mismatch for %s (%s %s) on %s: %s, while %s already listed"
                    % (
                        pkg_name,
                        binary_u.source,
                        binary_u.source_version,
                        pkg_arch,
                        uid,
                        buildd_info["signed-by"][pkg_arch],
                    )
                )

            buildd_info["signed-by"][pkg_arch] = uid

        return verdict

    def _read_signerinfo(self, filename: str) -> dict[str, Any]:
        """Load signer info from *filename*; empty file yields an empty dict."""
        signerinfo: dict[str, Any] = {}
        self.logger.info("Loading signer info from %s", filename)
        with open(filename) as fd:
            if os.fstat(fd.fileno()).st_size < 1:
                return signerinfo
            signerinfo = json.load(fd)

        return signerinfo

2092 

2093 

class ImplicitDependencyPolicy(AbstractBasePolicy):
    """Implicit Dependency policy

    Upgrading a package pkg-a can break the installability of a package pkg-b.
    A newer version (or the removal) of pkg-b might fix the issue. In that
    case, pkg-a has an 'implicit dependency' on pkg-b, because pkg-a can only
    migrate if pkg-b also migrates.

    This policy tries to discover a few common cases, and adds the relevant
    info to the excuses. If another item is needed to fix the
    uninstallability, a dependency is added. If no newer item can fix it, this
    excuse will be blocked.

    Note that the migration step will check the installability of every
    package, so this policy doesn't need to handle every corner case. It
    must, however, make sure that no excuse is unnecessarily blocked.

    Some cases that should be detected by this policy:

    * pkg-a is upgraded from 1.0-1 to 2.0-1, while
      pkg-b has "Depends: pkg-a (<< 2.0)"
      This typically happens if pkg-b has a strict dependency on pkg-a because
      it uses some non-stable internal interface (examples are glibc,
      binutils, python3-defaults, ...)

    * pkg-a is upgraded from 1.0-1 to 2.0-1, and
      pkg-a 1.0-1 has "Provides: provides-1",
      pkg-a 2.0-1 has "Provides: provides-2",
      pkg-b has "Depends: provides-1"
      This typically happens when pkg-a has an interface that changes between
      versions, and a virtual package is used to identify the version of this
      interface (e.g. perl-api-x.y)

    """

    _pkg_universe: "BinaryPackageUniverse"
    _all_binaries: dict["BinaryPackageId", "BinaryPackage"]
    _allow_uninst: dict[str, set[str | None]]
    _nobreakall_arches: list[str]

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "implicit-deps",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )

    def initialise(self, britney: "Britney") -> None:
        """Cache references to britney-wide state used by the checks."""
        super().initialise(britney)
        self._pkg_universe = britney.pkg_universe
        self._all_binaries = britney.all_binaries
        self._smooth_updates = britney.options.smooth_updates
        self._nobreakall_arches = self.options.nobreakall_arches
        self._new_arches = self.options.new_arches
        self._break_arches = self.options.break_arches
        self._allow_uninst = britney.allow_uninst
        self._outofsync_arches = self.options.outofsync_arches

    def can_be_removed(self, pkg: BinaryPackage) -> bool:
        """Return True if *pkg* is a candidate for removal from the target suite."""
        src = pkg.source
        target_suite = self.suite_info.target_suite

        # TODO these conditions shouldn't be hardcoded here
        # ideally, we would be able to look up excuses to see if the removal
        # is in there, but in the current flow, this policy is called before
        # all possible excuses exist, so there is no list for us to check

        if src not in self.suite_info.primary_source_suite.sources:
            # source for pkg not in unstable: candidate for removal
            return True

        source_t = target_suite.sources[src]
        assert self.hints is not None
        for hint in self.hints.search("remove", package=src, version=source_t.version):
            # removal hint for the source in testing: candidate for removal
            return True

        if target_suite.is_cruft(pkg):
            # if pkg is cruft in testing, removal will be tried
            return True

        # the case were the newer version of the source no longer includes the
        # binary (or includes a cruft version of the binary) will be handled
        # separately (in that case there might be an implicit dependency on
        # the newer source)

        return False

    def should_skip_rdep(
        self, pkg: BinaryPackage, source_name: str, myarch: str
    ) -> bool:
        """Return True if breaking reverse-dependency *pkg* is acceptable/irrelevant."""
        target_suite = self.suite_info.target_suite

        if not target_suite.is_pkg_in_the_suite(pkg.pkg_id):
            # it is not in the target suite, migration cannot break anything
            return True

        if pkg.source == source_name:
            # if it is built from the same source, it will be upgraded
            # with the source
            return True

        if self.can_be_removed(pkg):
            # could potentially be removed, so if that happens, it won't be
            # broken
            return True

        if pkg.architecture == "all" and myarch not in self._nobreakall_arches:
            # arch all on non nobreakarch is allowed to become uninstallable
            return True

        if pkg.pkg_id.package_name in self._allow_uninst[myarch]:
            # there is a hint to allow this binary to become uninstallable
            return True

        if not target_suite.is_installable(pkg.pkg_id):
            # it is already uninstallable in the target suite, migration
            # cannot break anything
            return True

        return False

    def breaks_installability(
        self,
        pkg_id_t: BinaryPackageId,
        pkg_id_s: BinaryPackageId | None,
        pkg_to_check: BinaryPackageId,
    ) -> bool:
        """
        Check if upgrading pkg_id_t to pkg_id_s breaks the installability of
        pkg_to_check.

        To check if removing pkg_id_t breaks pkg_to_check, set pkg_id_s to
        None.
        """

        pkg_universe = self._pkg_universe
        negative_deps = pkg_universe.negative_dependencies_of(pkg_to_check)

        for dep in pkg_universe.dependencies_of(pkg_to_check):
            if pkg_id_t not in dep:
                # this depends doesn't have pkg_id_t as alternative, so
                # upgrading pkg_id_t cannot break this dependency clause
                continue

            # We check all the alternatives for this dependency, to find one
            # that can satisfy it when pkg_id_t is upgraded to pkg_id_s
            found_alternative = False
            for d in dep:
                if d in negative_deps:
                    # If this alternative dependency conflicts with
                    # pkg_to_check, it cannot be used to satisfy the
                    # dependency.
                    # This commonly happens when breaks are added to pkg_id_s.
                    continue

                if d.package_name != pkg_id_t.package_name:
                    # a binary different from pkg_id_t can satisfy the dep, so
                    # upgrading pkg_id_t won't break this dependency
                    found_alternative = True
                    break

                if d != pkg_id_s:
                    # We want to know the impact of the upgrade of
                    # pkg_id_t to pkg_id_s. If pkg_id_s migrates to the
                    # target suite, any other version of this binary will
                    # not be there, so it cannot satisfy this dependency.
                    # This includes pkg_id_t, but also other versions.
                    continue

                # pkg_id_s can satisfy the dep
                found_alternative = True

            if not found_alternative:
                return True
        return False

    def check_upgrade(
        self,
        pkg_id_t: BinaryPackageId,
        pkg_id_s: BinaryPackageId | None,
        source_name: str,
        myarch: str,
        broken_binaries: set[str],
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check reverse dependencies of pkg_id_t for breakage by this upgrade.

        For each breakage, either records an implicit dependency on a newer
        item that fixes it, or rejects the excuse (adding the binary to
        *broken_binaries*).
        """
        verdict = PolicyVerdict.PASS

        pkg_universe = self._pkg_universe
        all_binaries = self._all_binaries

        # check all rdeps of the package in testing
        rdeps_t = pkg_universe.reverse_dependencies_of(pkg_id_t)

        for rdep_pkg in sorted(rdeps_t):
            rdep_p = all_binaries[rdep_pkg]

            # check some cases where the rdep won't become uninstallable, or
            # where we don't care if it does
            if self.should_skip_rdep(rdep_p, source_name, myarch):
                continue

            if not self.breaks_installability(pkg_id_t, pkg_id_s, rdep_pkg):
                # if upgrading pkg_id_t to pkg_id_s doesn't break rdep_pkg,
                # there is no implicit dependency
                continue

            # The upgrade breaks the installability of the rdep. We need to
            # find out if there is a newer version of the rdep that solves the
            # uninstallability. If that is the case, there is an implicit
            # dependency. If not, the upgrade will fail.

            # check source versions
            newer_versions = find_newer_binaries(
                self.suite_info, rdep_p, add_source_for_dropped_bin=True
            )
            good_newer_versions = set()
            for npkg, suite in newer_versions:
                if npkg.architecture == "source":
                    # When a newer version of the source package doesn't have
                    # the binary, we get the source as 'newer version'. In
                    # this case, the binary will not be uninstallable if the
                    # newer source migrates, because it is no longer there.
                    good_newer_versions.add(npkg)
                    continue
                assert isinstance(npkg, BinaryPackageId)
                if not self.breaks_installability(pkg_id_t, pkg_id_s, npkg):
                    good_newer_versions.add(npkg)

            if good_newer_versions:
                spec = DependencySpec(DependencyType.IMPLICIT_DEPENDENCY, myarch)
                excuse.add_package_depends(spec, good_newer_versions)
            else:
                # no good newer versions: no possible solution
                broken_binaries.add(rdep_pkg.name)
                if pkg_id_s:
                    action = "migrating {} to {}".format(
                        pkg_id_s.name,
                        self.suite_info.target_suite.name,
                    )
                else:
                    action = "removing {} from {}".format(
                        pkg_id_t.name,
                        self.suite_info.target_suite.name,
                    )
                info = '{0} makes <a href="#{1}">{1}</a> uninstallable'.format(
                    action, rdep_pkg.name
                )
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                excuse.add_verdict_info(verdict, info)

        return verdict

    def apply_srcarch_policy_impl(
        self,
        implicit_dep_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Run the implicit-dependency check for this item on *arch*.

        Records the accumulated broken binaries (merged across archs) in
        ``implicit_dep_info["implicit-deps"]["broken-binaries"]``.
        """
        verdict = PolicyVerdict.PASS

        if not source_data_tdist:
            # this item is not currently in testing: no implicit dependency
            return verdict

        if excuse.hasreason("missingbuild"):
            # if the build is missing, the policy would treat this as if the
            # binaries would be removed, which would give incorrect (and
            # confusing) info
            info = "missing build, not checking implicit dependencies on %s" % (arch)
            excuse.add_detailed_info(info)
            return verdict

        source_suite = item.suite
        source_name = item.package
        target_suite = self.suite_info.target_suite
        all_binaries = self._all_binaries

        # we check all binaries for this excuse that are currently in testing
        relevant_binaries = [
            x
            for x in source_data_tdist.binaries
            if (arch == "source" or x.architecture == arch)
            and x.package_name in target_suite.binaries[x.architecture]
            and x.architecture not in self._new_arches
            and x.architecture not in self._break_arches
            and x.architecture not in self._outofsync_arches
        ]

        broken_binaries: set[str] = set()

        assert self.hints is not None
        for pkg_id_t in sorted(relevant_binaries):
            mypkg = pkg_id_t.package_name
            myarch = pkg_id_t.architecture
            binaries_t_a = target_suite.binaries[myarch]
            binaries_s_a = source_suite.binaries[myarch]

            if target_suite.is_cruft(all_binaries[pkg_id_t]):
                # this binary is cruft in testing: it will stay around as long
                # as necessary to satisfy dependencies, so we don't need to
                # care
                continue

            if mypkg in binaries_s_a:
                mybin = binaries_s_a[mypkg]
                pkg_id_s: Optional["BinaryPackageId"] = mybin.pkg_id
                if mybin.source != source_name:
                    # hijack: this is too complicated to check, so we ignore
                    # it (the migration code will check the installability
                    # later anyway)
                    pass
                elif mybin.source_version != source_data_srcdist.version:
                    # cruft in source suite: pretend the binary doesn't exist
                    pkg_id_s = None
                elif pkg_id_t == pkg_id_s:
                    # same binary (probably arch: all from a binNMU):
                    # 'upgrading' doesn't change anything, for this binary, so
                    # it won't break anything
                    continue
            else:
                pkg_id_s = None

            if not pkg_id_s and is_smooth_update_allowed(
                binaries_t_a[mypkg], self._smooth_updates, self.hints
            ):
                # the binary isn't in the new version (or is cruft there), and
                # smooth updates are allowed: the binary can stay around if
                # that is necessary to satisfy dependencies, so we don't need
                # to check it
                continue

            if (
                not pkg_id_s
                and source_data_tdist.version == source_data_srcdist.version
                and source_suite.suite_class == SuiteClass.ADDITIONAL_SOURCE_SUITE
                and binaries_t_a[mypkg].architecture == "all"
            ):
                # we're very probably migrating a binNMU built in tpu where the arch:all
                # binaries were not copied to it as that's not needed. This policy could
                # needlessly block.
                continue

            v = self.check_upgrade(
                pkg_id_t, pkg_id_s, source_name, myarch, broken_binaries, excuse
            )
            verdict = PolicyVerdict.worst_of(verdict, v)

        # each arch is processed separately, so if we already have info from
        # other archs, we need to merge the info from this arch
        broken_old = set()
        if "implicit-deps" not in implicit_dep_info:
            implicit_dep_info["implicit-deps"] = {}
        else:
            broken_old = set(implicit_dep_info["implicit-deps"]["broken-binaries"])

        implicit_dep_info["implicit-deps"]["broken-binaries"] = sorted(
            broken_old | broken_binaries
        )

        return verdict

2460 

2461 

class ReverseRemovalPolicy(AbstractBasePolicy):
    """Policy blocking sources that (transitively) depend on a package
    with a pending ``remove`` hint.

    During initialise() it walks the reverse dependency tree of every
    hinted removal and records, per source, which removal hints would be
    violated by its migration.  Affected items are rejected unless an
    ``ignore-reverse-remove`` hint overrides the block.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "reverseremoval",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
        )

    def register_hints(self, hint_parser: HintParser) -> None:
        hint_parser.register_hint_type(HintType("ignore-reverse-remove"))

    def initialise(self, britney: "Britney") -> None:
        """Precompute the sources blocked by each remove hint.

        Builds ``self._block_src_for_rm_hint``: a mapping from
        "source/version" to the set of hinted-for-removal sources that
        (transitively) reach it via reverse (build-)dependencies.
        """
        super().initialise(britney)

        pkg_universe = britney.pkg_universe
        source_suites = britney.suite_info.source_suites
        target_suite = britney.suite_info.target_suite

        # Build set of the sources of reverse (Build-) Depends
        assert self.hints is not None
        hints = self.hints.search("remove")

        # binary -> set of hinted sources whose removal affects it
        rev_bin: dict[BinaryPackageId, set[str]] = defaultdict(set)
        for hint in hints:
            for item in hint.packages:
                # I think we don't need to look at the target suite
                for src_suite in source_suites:
                    try:
                        my_bins = set(src_suite.sources[item.uvname].binaries)
                    except KeyError:
                        continue
                    # expands my_bins in place to the full reverse tree
                    compute_reverse_tree(pkg_universe, my_bins)
                    for this_bin in my_bins:
                        # rev_bin is a defaultdict, so index directly
                        # (setdefault would allocate a throwaway set per call)
                        rev_bin[this_bin].add(item.uvname)

        rev_src: dict[str, set[str]] = defaultdict(set)
        for bin_pkg, reasons in rev_bin.items():
            # If the pkg is in the target suite, there's nothing this
            # policy wants to do.
            if target_suite.is_pkg_in_the_suite(bin_pkg):
                continue
            that_bin = britney.all_binaries[bin_pkg]
            bin_src = that_bin.source + "/" + that_bin.source_version
            rev_src[bin_src].update(reasons)
        self._block_src_for_rm_hint = rev_src

    def apply_src_policy_impl(
        self,
        rev_remove_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Reject the item if it is blocked by a remove hint, unless an
        ``ignore-reverse-remove`` hint forces it through."""
        verdict = PolicyVerdict.PASS

        if item.name in self._block_src_for_rm_hint:
            reason = ", ".join(sorted(self._block_src_for_rm_hint[item.name]))
            assert self.hints is not None
            ignore_hints = self.hints.search(
                "ignore-reverse-remove", package=item.uvname, version=item.version
            )
            excuse.addreason("reverseremoval")
            if ignore_hints:
                excuse.addreason("ignore-reverse-remove")
                excuse.addinfo(
                    "Should block migration because of remove hint for %s, but forced by %s"
                    % (reason, ignore_hints[0].user)
                )
                verdict = PolicyVerdict.PASS_HINTED
            else:
                excuse.addinfo("Remove hint for (transitive) dependency: %s" % reason)
                verdict = PolicyVerdict.REJECTED_PERMANENTLY

        return verdict

2538 

2539 

2540class ReproduciblePolicy(AbstractBasePolicy): 

2541 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

2542 super().__init__( 

2543 "reproducible", 

2544 options, 

2545 suite_info, 

2546 {SuiteClass.PRIMARY_SOURCE_SUITE}, 

2547 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY, 

2548 ) 

2549 self._reproducible: dict[str, Any] = { 

2550 "source": {}, 

2551 "target": {}, 

2552 } 

2553 

2554 # Default values for this policy's options 

2555 parse_option(options, "repro_success_bounty", default=0, to_int=True) 

2556 parse_option(options, "repro_regression_penalty", default=0, to_int=True) 

2557 parse_option(options, "repro_url") 

2558 parse_option(options, "repro_retry_url") 

2559 parse_option(options, "repro_components") 

2560 

2561 def register_hints(self, hint_parser: HintParser) -> None: 

2562 hint_parser.register_hint_type( 

2563 HintType("ignore-reproducible", architectured=HintAnnotate.OPTIONAL) 

2564 ) 

2565 

    def initialise(self, britney: "Britney") -> None:
        """Load reproducibility status for the source and target suites.

        Reads ``reproducible.json`` from the state directory; both the
        suite name and its codename are accepted as labels in the data
        file.  Raises RuntimeError when STATE_DIR is not configured.
        """
        super().initialise(britney)
        source_suite = self.suite_info.primary_source_suite
        target_suite = self.suite_info.target_suite
        try:
            filename = os.path.join(self.state_dir, "reproducible.json")
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e

        self._reproducible = self._read_repro_status(
            filename,
            source={source_suite.name, source_suite.codename},
            target={target_suite.name, target_suite.codename},
        )

def apply_srcarch_policy_impl(
    self,
    reproducible_info: dict[str, Any],
    item: MigrationItem,
    arch: str,
    source_data_tdist: SourcePackage | None,
    source_data_srcdist: SourcePackage,
    excuse: "Excuse",
) -> PolicyVerdict:
    """Judge one migration item on one architecture by build reproducibility.

    Compares the package's reproducibility status in the source suite with
    the reference status in the target suite and maps the combination to a
    :class:`PolicyVerdict`.  As side effects, machine-readable details are
    recorded in ``reproducible_info`` (keys like ``test-results``,
    ``reproducible-test-url`` and ``ignored-reproducible``) and
    human-readable messages, bounties and penalties are attached to
    ``excuse``.

    :param reproducible_info: policy-info dict for the excuses output,
        filled in by this method
    :param item: the migration item being judged
    :param arch: the architecture to judge on
    :param source_data_tdist: source package data in the target suite, or
        None when the package is new there
    :param source_data_srcdist: source package data in the source suite
    :param excuse: the excuse collecting verdict messages for this item
    :return: the verdict for this item on this architecture
    :raises KeyError: on a reproducibility status value this code does not
        know about
    """
    verdict = PolicyVerdict.PASS

    # we don't want to apply this policy (yet) on binNMUs
    if item.architecture != "source":
        return verdict

    # we're not supposed to judge on this arch
    if arch not in self.options.repro_arches:
        return verdict

    # bail out if this arch has no packages for this source (not build
    # here)
    if arch not in excuse.packages:
        return verdict

    # horrible hard-coding, but currently, we don't keep track of the
    # component when loading the packages files
    # (section looks like "component/section" for non-main packages)
    component = "main"
    if "/" in (section := source_data_srcdist.section):
        component = section.split("/")[0]

    # only judge components the configuration asks us to judge (an empty
    # repro_components means: judge everything)
    if (
        self.options.repro_components
        and component not in self.options.repro_components
    ):
        return verdict

    source_name = item.package
    try:
        # per-arch result maps: package name -> result record from the
        # reproducibility JSON report (loaded in initialise())
        tar_res = self._reproducible["target"][arch]
        src_res = self._reproducible["source"][arch]
    except KeyError:
        # no data for this arch at all: hold the item until data appears
        verdict = PolicyVerdict.REJECTED_TEMPORARILY
        msg = "No reproducible data available at all for %s" % arch
        excuse.add_verdict_info(verdict, msg)
        return verdict

    # Determine the reference status in the target suite.  A result only
    # counts when it was produced for the exact version currently in the
    # target suite; otherwise it is "stale".
    if source_data_tdist is None:
        target_suite_state = "new"
    elif source_name not in tar_res:
        target_suite_state = "unknown"
    elif tar_res[source_name]["version"] == source_data_tdist.version:
        target_suite_state = tar_res[source_name]["status"]
    else:
        target_suite_state = "stale"

    # Same for the candidate version in the source suite.
    if source_name in src_res and src_res[source_name]["version"] == item.version:
        source_suite_state = src_res[source_name]["status"]
    else:
        source_suite_state = "unknown"

    # status of ['E404', 'FTBFS', 'FTBR', 'NFU', 'blacklisted', 'depwait',
    # 'reproducible', 'timeout'] with ['new', 'stale', 'unknown']
    wait_states = ("E404", "depwait", "stale", "timeout", "unknown")
    no_build_states = ("FTBFS", "NFU", "blacklisted")

    # if this package doesn't build on this architecture, we don't need to
    # judge it
    # FTBFS: Fails to build from source on r-b infra
    # NFU: the package explicitly doesn't support building on this arch
    # blacklisted: per package per arch per suite
    if source_suite_state in no_build_states:
        return verdict
    # Assume depwait in the source suite only are intermittent (might not
    # be true, e.g. with new build depends)
    if source_suite_state == target_suite_state and target_suite_state == "depwait":
        return verdict

    # Build the "info"/"retry" link suffix for the human-readable messages.
    if self.options.repro_url:
        url = self.options.repro_url.format(package=quote(source_name), arch=arch)
        url_html = ' - <a href="%s">info</a>' % url
        if self.options.repro_retry_url:
            url_html += (
                ' <a href="%s">♻ </a>'
                % self.options.repro_retry_url.format(
                    package=quote(source_name), arch=arch
                )
            )
        # When run on multiple archs, the last one "wins"
        reproducible_info["reproducible-test-url"] = url
    else:
        url = None
        url_html = ""

    # Map (source_suite_state, target_suite_state) to a verdict.  Every
    # branch below that does not raise assigns `msg`, which is consumed at
    # the end of this method.
    eligible_for_bounty = False
    if source_suite_state == "reproducible":
        verdict = PolicyVerdict.PASS
        msg = f"Reproducible on {arch}{url_html}"
        reproducible_info.setdefault("test-results", []).append(
            "reproducible on %s" % arch
        )
        eligible_for_bounty = True
    elif source_suite_state == "FTBR":
        # Fails To Build Reproducibly: whether that blocks migration
        # depends on the reference state in the target suite.
        if target_suite_state == "new":
            verdict = PolicyVerdict.REJECTED_PERMANENTLY
            msg = f"New but not reproducible on {arch}{url_html}"
            reproducible_info.setdefault("test-results", []).append(
                "new but not reproducible on %s" % arch
            )
        elif target_suite_state in wait_states:
            verdict = PolicyVerdict.REJECTED_TEMPORARILY
            msg = "Waiting for reproducibility reference results on {}{}".format(
                arch,
                url_html,
            )
            reproducible_info.setdefault("test-results", []).append(
                "waiting-for-reference-results on %s" % arch
            )
        elif target_suite_state == "reproducible":
            verdict = PolicyVerdict.REJECTED_PERMANENTLY
            msg = f"Reproducibility regression on {arch}{url_html}"
            reproducible_info.setdefault("test-results", []).append(
                "regression on %s" % arch
            )
        elif target_suite_state == "FTBR":
            # not a regression: already unreproducible in the target suite
            verdict = PolicyVerdict.PASS
            msg = "Ignoring non-reproducibility on {} (not a regression){}".format(
                arch,
                url_html,
            )
            reproducible_info.setdefault("test-results", []).append(
                "not reproducible on %s" % arch
            )
        else:
            verdict = PolicyVerdict.REJECTED_PERMANENTLY
            msg = "No reference result, but not reproducibility on {}{}".format(
                arch,
                url_html,
            )
            reproducible_info.setdefault("test-results", []).append(
                f"reference {target_suite_state} on {arch}"
            )
    elif source_suite_state in wait_states:
        verdict = PolicyVerdict.REJECTED_TEMPORARILY
        msg = f"Waiting for reproducibility test results on {arch}{url_html}"
        reproducible_info.setdefault("test-results", []).append(
            "waiting-for-test-results on %s" % arch
        )
    else:
        raise KeyError("Unhandled reproducibility state %s" % source_suite_state)

    # An ignore-reproducible hint (for "source" or for this specific arch)
    # can override a rejection.  `break` only leaves the inner search loop,
    # so both hint architectures are considered.
    if verdict.is_rejected:
        assert self.hints is not None
        for hint_arch in ("source", arch):
            for ignore_hint in self.hints.search(
                "ignore-reproducible",
                package=source_name,
                version=source_data_srcdist.version,
                architecture=hint_arch,
            ):
                verdict = PolicyVerdict.PASS_HINTED
                reproducible_info.setdefault("ignored-reproducible", {}).setdefault(
                    arch, {}
                ).setdefault("issued-by", []).append(ignore_hint.user)
                excuse.addinfo(
                    "Ignoring reproducibility issue on %s as requested "
                    "by %s" % (arch, ignore_hint.user)
                )
                break

    if self.options.repro_success_bounty and eligible_for_bounty:
        excuse.add_bounty("reproducibility", self.options.repro_success_bounty)

    if self.options.repro_regression_penalty and verdict in {
        PolicyVerdict.REJECTED_PERMANENTLY,
        PolicyVerdict.REJECTED_TEMPORARILY,
    }:
        if self.options.repro_regression_penalty > 0:
            excuse.add_penalty(
                "reproducibility", self.options.repro_regression_penalty
            )
        # In case we give penalties instead of blocking, we must always pass
        verdict = PolicyVerdict.PASS

    if verdict.is_rejected:
        excuse.add_verdict_info(verdict, msg)
    else:
        excuse.addinfo(msg)

    return verdict

2772 

2773 def _read_repro_status( 

2774 self, filename: str, source: set[str], target: set[str] 

2775 ) -> dict[str, dict[str, str]]: 

2776 summary = self._reproducible 

2777 self.logger.info("Loading reproducibility report from %s", filename) 

2778 with open(filename) as fd: 

2779 if os.fstat(fd.fileno()).st_size < 1: 

2780 return summary 

2781 data = json.load(fd) 

2782 

2783 for result in data: 

2784 if result["suite"] in source: 

2785 summary["source"].setdefault(result["architecture"], {})[ 

2786 result["package"] 

2787 ] = result 

2788 if result["suite"] in target: 

2789 summary["target"].setdefault(result["architecture"], {})[ 

2790 result["package"] 

2791 ] = result 

2792 

2793 return summary