Coverage for britney2/policies/policy.py: 86%

1243 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-03-23 07:34 +0000

1import json 

2import logging 

3import optparse 

4import os 

5import re 

6import sys 

7import time 

8from abc import ABC, abstractmethod 

9from collections import defaultdict 

10from collections.abc import Callable, Container 

11from enum import IntEnum, unique 

12from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar, cast 

13from urllib.parse import quote 

14 

15import apt_pkg 

16import yaml 

17 

18from britney2 import ( 

19 BinaryPackage, 

20 BinaryPackageId, 

21 DependencyType, 

22 PackageId, 

23 SourcePackage, 

24 Suite, 

25 SuiteClass, 

26 Suites, 

27 TargetSuite, 

28) 

29from britney2.excusedeps import DependencySpec 

30from britney2.hints import ( 

31 Hint, 

32 HintCollection, 

33 HintParser, 

34 PolicyHintParserProto, 

35 split_into_one_hint_per_package, 

36) 

37from britney2.inputs.suiteloader import SuiteContentLoader 

38from britney2.migrationitem import MigrationItem, MigrationItemFactory 

39from britney2.policies import ApplySrcPolicy, PolicyVerdict 

40from britney2.utils import ( 

41 compute_reverse_tree, 

42 find_newer_binaries, 

43 get_dependency_solvers, 

44 is_smooth_update_allowed, 

45 parse_option, 

46 GetDependencySolversProto, 

47) 

48 

49if TYPE_CHECKING: 49 ↛ 50line 49 didn't jump to line 50, because the condition on line 49 was never true

50 from ..britney import Britney 

51 from ..excuse import Excuse 

52 from ..installability.universe import BinaryPackageUniverse 

53 

54 

class PolicyLoadRequest:
    """Pairs a policy constructor with the configuration switch enabling it.

    PolicyEngine.load_policies() consults is_enabled() for each request and
    instantiates the policy via load() only when the configuration turns it
    on (or when the request is unconditional).
    """

    __slots__ = ("_options_name", "_default_value", "_policy_constructor")

    def __init__(
        self,
        policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
        options_name: Optional[str],
        default_value: bool,
    ) -> None:
        """
        :param policy_constructor: Callable building the policy instance.
        :param options_name: Attribute on the options object that toggles the
            policy, or None for an always-enabled policy.
        :param default_value: Enabled-state assumed when the option is unset.
        """
        self._policy_constructor = policy_constructor
        self._options_name = options_name
        self._default_value = default_value

    def is_enabled(self, options: optparse.Values) -> bool:
        """Return True if the configuration enables this policy."""
        option_name = self._options_name
        if option_name is None:
            # Unconditional requests must carry a truthy default.
            assert self._default_value
            return True
        configured = getattr(options, option_name, None)
        if configured is None:
            return self._default_value
        return configured.lower() in ("yes", "y", "true", "t")

    def load(self, options: optparse.Values, suite_info: Suites) -> "BasePolicy":
        """Instantiate the policy."""
        return self._policy_constructor(options, suite_info)

    @classmethod
    def always_load(
        cls, policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"]
    ) -> "PolicyLoadRequest":
        """Create a request for a policy that is always enabled."""
        return cls(policy_constructor, None, True)

    @classmethod
    def conditionally_load(
        cls,
        policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
        option_name: str,
        default_value: bool,
    ) -> "PolicyLoadRequest":
        """Create a request gated on the named configuration option."""
        return cls(policy_constructor, option_name, default_value)

94 

95 

class PolicyEngine(object):
    """Registry that owns all configured policies and applies them to excuses.

    Policies are applied in registration order; each policy's verdict is
    folded into the excuse's overall verdict via PolicyVerdict.worst_of.
    """

    def __init__(self) -> None:
        self._policies: list["BasePolicy"] = []

    def add_policy(self, policy: "BasePolicy") -> None:
        """Register a single policy with the engine."""
        self._policies.append(policy)

    def load_policies(
        self,
        options: optparse.Values,
        suite_info: Suites,
        policy_load_requests: list[PolicyLoadRequest],
    ) -> None:
        """Instantiate and register every policy enabled by the configuration.

        :param options: Britney's configuration values (consulted per request).
        :param suite_info: Description of the suites Britney operates on.
        :param policy_load_requests: Requests describing which policies to
            construct and under which option they are enabled.
        """
        for policy_load_request in policy_load_requests:
            if policy_load_request.is_enabled(options):
                self.add_policy(policy_load_request.load(options, suite_info))

    def register_policy_hints(self, hint_parser: HintParser) -> None:
        """Let every registered policy declare its custom hint types."""
        for policy in self._policies:
            policy.register_hints(hint_parser)

    def initialise(self, britney: "Britney", hints: HintCollection) -> None:
        """Hand the parsed hints to each policy and run its one-time setup."""
        for policy in self._policies:
            policy.hints = hints
            policy.initialise(britney)

    def save_state(self, britney: "Britney") -> None:
        """Ask every policy to persist its state (not called on dry-runs)."""
        for policy in self._policies:
            policy.save_state(britney)

    def apply_src_policies(
        self,
        item: MigrationItem,
        source_t: Optional[SourcePackage],
        source_u: SourcePackage,
        excuse: "Excuse",
    ) -> None:
        """Run every applicable policy on a source migration item.

        Per-arch policies are run for every configured architecture, and the
        worst verdict across all policies is recorded on the excuse.

        :param item: The migration item under consideration.
        :param source_t: Source package data in the target suite (None if new).
        :param source_u: Source package data in the source suite.
        :param excuse: The excuse that collects verdicts and policy info.
        """
        excuse_verdict = excuse.policy_verdict
        source_suite = item.suite
        suite_class = source_suite.suite_class
        for policy in self._policies:
            pinfo: dict[str, Any] = {}
            policy_verdict = PolicyVerdict.NOT_APPLICABLE
            if suite_class in policy.applicable_suites:
                if policy.src_policy.run_arch:
                    for arch in policy.options.architectures:
                        v = policy.apply_srcarch_policy_impl(
                            pinfo, item, arch, source_t, source_u, excuse
                        )
                        policy_verdict = PolicyVerdict.worst_of(policy_verdict, v)
                if policy.src_policy.run_src:
                    v = policy.apply_src_policy_impl(
                        pinfo, item, source_t, source_u, excuse
                    )
                    policy_verdict = PolicyVerdict.worst_of(policy_verdict, v)
            # The base policy provides this field, so the subclass should leave it blank
            assert "verdict" not in pinfo
            if policy_verdict != PolicyVerdict.NOT_APPLICABLE:
                excuse.policy_info[policy.policy_id] = pinfo
                pinfo["verdict"] = policy_verdict.name
            excuse_verdict = PolicyVerdict.worst_of(policy_verdict, excuse_verdict)
        excuse.policy_verdict = excuse_verdict

    def apply_srcarch_policies(
        self,
        item: MigrationItem,
        arch: str,
        source_t: Optional[SourcePackage],
        source_u: SourcePackage,
        excuse: "Excuse",
    ) -> None:
        """Run every applicable policy on a single-architecture migration.

        :param item: The migration item under consideration.
        :param arch: The architecture the item is applied to.
        :param source_t: Source package data in the target suite (None if new).
        :param source_u: Source package data in the source suite.
        :param excuse: The excuse that collects verdicts and policy info.
        """
        excuse_verdict = excuse.policy_verdict
        source_suite = item.suite
        suite_class = source_suite.suite_class
        for policy in self._policies:
            pinfo: dict[str, Any] = {}
            if suite_class in policy.applicable_suites:
                policy_verdict = policy.apply_srcarch_policy_impl(
                    pinfo, item, arch, source_t, source_u, excuse
                )
                excuse_verdict = PolicyVerdict.worst_of(policy_verdict, excuse_verdict)
                # The base policy provides this field, so the subclass should leave it blank
                assert "verdict" not in pinfo
                if policy_verdict != PolicyVerdict.NOT_APPLICABLE:
                    excuse.policy_info[policy.policy_id] = pinfo
                    pinfo["verdict"] = policy_verdict.name
        excuse.policy_verdict = excuse_verdict

183 

184 

class BasePolicy(ABC):
    """Abstract interface implemented by every migration policy.

    Concrete policies are registered with a PolicyEngine, which calls the
    apply_*_policy_impl hooks for each migration item under consideration.
    """

    # The running Britney instance; set by initialise().
    britney: "Britney"
    # Key used for this policy's entry in the excuses.yaml output.
    policy_id: str
    # Parsed hints; injected by PolicyEngine.initialise().
    hints: Optional[HintCollection]
    # Which suite classes this policy applies to.
    applicable_suites: set[SuiteClass]
    # Whether the policy runs per source, per architecture, or both.
    src_policy: ApplySrcPolicy
    # Britney's configuration values.
    options: optparse.Values
    # Description of the suites Britney operates on.
    suite_info: Suites

    def __init__(
        self,
        options: optparse.Values,
        suite_info: Suites,
    ) -> None:
        """The BasePolicy constructor

        :param options: The options member of Britney with all the
        config values.
        """

    # Directory in which the policy may read and write persistent state.
    @property
    @abstractmethod
    def state_dir(self) -> str: ...

    def register_hints(self, hint_parser: HintParser) -> None:  # pragma: no cover
        """Register new hints that this policy accepts

        :param hint_parser: (see HintParser.register_hint_type)
        """

    def initialise(self, britney: "Britney") -> None:  # pragma: no cover
        """Called once to make the policy initialise any data structures

        This is useful for e.g. parsing files or other "heavy do-once" work.

        :param britney: This is the instance of the "Britney" class.
        """
        self.britney = britney

    def save_state(self, britney: "Britney") -> None:  # pragma: no cover
        """Called once at the end of the run to make the policy save any persistent data

        Note this will *not* be called for "dry-runs" as such runs should not change
        the state.

        :param britney: This is the instance of the "Britney" class.
        """

    def apply_src_policy_impl(
        self,
        policy_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:  # pragma: no cover
        """Apply a policy on a given source migration

        Britney will call this method on a given source package, when
        Britney is considering to migrate it from the given source
        suite to the target suite. The policy will then evaluate the
        the migration and then return a verdict.

        :param policy_info: A dictionary of all policy results. The
        policy can add a value stored in a key related to its name.
        (e.g. policy_info['age'] = {...}). This will go directly into
        the "excuses.yaml" output.

        :param item: The migration item the policy is applied to.

        :param source_data_tdist: Information about the source package
        in the target distribution (e.g. "testing"). This is the
        data structure in source_suite.sources[source_name]

        :param source_data_srcdist: Information about the source
        package in the source distribution (e.g. "unstable" or "tpu").
        This is the data structure in target_suite.sources[source_name]

        :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
        """
        return PolicyVerdict.NOT_APPLICABLE

    def apply_srcarch_policy_impl(
        self,
        policy_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Apply a policy on a given binary migration

        Britney will call this method on binaries from a given source package
        on a given architecture, when Britney is considering to migrate them
        from the given source suite to the target suite. The policy will then
        evaluate the migration and then return a verdict.

        :param policy_info: A dictionary of all policy results. The
        policy can add a value stored in a key related to its name.
        (e.g. policy_info['age'] = {...}). This will go directly into
        the "excuses.yaml" output.

        :param item: The migration item the policy is applied to.

        :param arch: The architecture the item is applied to. This is mostly
        relevant for policies where src_policy is not ApplySrcPolicy.RUN_SRC
        (as that is the only case where arch can differ from item.architecture)

        :param source_data_tdist: Information about the source package
        in the target distribution (e.g. "testing"). This is the
        data structure in source_suite.sources[source_name]

        :param source_data_srcdist: Information about the source
        package in the source distribution (e.g. "unstable" or "tpu").
        This is the data structure in target_suite.sources[source_name]

        :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
        """
        # if the policy doesn't implement this function, assume it's OK
        return PolicyVerdict.NOT_APPLICABLE

306 

307 

class AbstractBasePolicy(BasePolicy):
    """Common constructor shared by every concrete policy.

    Kept separate from BasePolicy so that the test suite
    (tests/test_policy.py:initialize_policy()) can construct BasePolicy
    objects with a two-argument constructor, while all real policies use
    this five-argument initializer.
    """

    def __init__(
        self,
        policy_id: str,
        options: optparse.Values,
        suite_info: Suites,
        applicable_suites: set[SuiteClass],
        src_policy: ApplySrcPolicy = ApplySrcPolicy.RUN_SRC,
    ) -> None:
        """Concrete initializer.

        :param policy_id: Identifies the policy. It will determine the key
            used for the excuses.yaml etc.
        :param options: The options member of Britney with all the config
            values.
        :param suite_info: Description of the suites Britney operates on.
        :param applicable_suites: Where this policy applies.
        :param src_policy: Whether the policy runs per source, per
            architecture, or both.
        """
        # Hints are injected later by PolicyEngine.initialise().
        self.hints: Optional[HintCollection] = None
        self.policy_id = policy_id
        self.options = options
        self.suite_info = suite_info
        self.applicable_suites = applicable_suites
        self.src_policy = src_policy
        cls = self.__class__
        self.logger = logging.getLogger(cls.__module__ + "." + cls.__name__)

    @property
    def state_dir(self) -> str:
        """Directory for persistent policy state (the STATE_DIR option)."""
        return cast(str, self.options.state_dir)

348 

349 

350_T = TypeVar("_T") 

351 

352 

class SimplePolicyHint(Hint, Generic[_T]):
    """A Hint that carries one extra policy-specific parameter.

    The parameter (e.g. a day count or a set of bug ids) is stored verbatim
    and exposed by subclasses via a named property.
    """

    def __init__(
        self,
        user: str,
        hint_type: str,
        policy_parameter: _T,
        packages: list[MigrationItem],
    ) -> None:
        super().__init__(user, hint_type, packages)
        self._policy_parameter = policy_parameter

    def __eq__(self, other: Any) -> bool:
        # Hints only compare equal when both the type and the extra
        # parameter match; the base class compares the rest.
        if self.type != other.type:
            return False
        if self._policy_parameter != other._policy_parameter:
            return False
        return super().__eq__(other)

    def str(self) -> str:
        # Textual form: "<type> <parameter> <package> <package> ...".
        package_names = " ".join(x.name for x in self._packages)
        return "%s %s %s" % (self._type, str(self._policy_parameter), package_names)

375 

376 

class AgeDayHint(SimplePolicyHint[int]):
    """An "age-days" hint; its parameter is the required age in days."""

    @property
    def days(self) -> int:
        # The age requirement (in days) requested by the hint.
        return self._policy_parameter

381 

382 

class IgnoreRCBugHint(SimplePolicyHint[frozenset[str]]):
    """An "ignore-rc-bugs" hint; its parameter is the set of bugs to ignore."""

    @property
    def ignored_rcbugs(self) -> frozenset[str]:
        # The bug identifiers this hint tells the RC bug policy to disregard.
        return self._policy_parameter

387 

388 

def simple_policy_hint_parser_function(
    class_name: Callable[[str, str, _T, list[MigrationItem]], Hint],
    converter: Callable[[str], _T],
) -> PolicyHintParserProto:
    """Build a hint-parser callback for hints of the form "<hint> <param> <item>...".

    :param class_name: Hint class (or factory) invoked as
        class_name(who, hint_name, converted_parameter, [item]) once per item.
    :param converter: Converts the raw string parameter into the value the
        hint class expects (e.g. int for "age-days").
    :return: A parser callback suitable for HintParser.register_hint_type().
    """

    def parser(
        mi_factory: MigrationItemFactory,
        hints: HintCollection,
        who: str,
        hint_name: str,
        *args: str,
    ) -> None:
        # The first argument is the policy parameter; the rest name the items.
        raw_parameter, *item_args = args
        for migration_item in mi_factory.parse_items(*item_args):
            # One hint per item; the converter runs once per item, matching
            # the original per-iteration conversion.
            hints.add_hint(
                class_name(who, hint_name, converter(raw_parameter), [migration_item])
            )

    return parser

408 

409 

class AgePolicy(AbstractBasePolicy):
    """Configurable Aging policy for source migrations

    The AgePolicy will let packages stay in the source suite for a pre-defined
    amount of days before letting migrate (based on their urgency, if any).

    The AgePolicy's decision is influenced by the following:

    State files:
     * ${STATE_DIR}/age-policy-urgencies: File containing urgencies for source
       packages. Note that urgencies are "sticky" and the most "urgent" urgency
       will be used (i.e. the one with lowest age-requirements).
       - This file needs to be updated externally, if the policy should take
         urgencies into consideration. If empty (or not updated), the policy
         will simply use the default urgency (see the "Config" section below)
       - In Debian, these values are taken from the .changes file, but that is
         not a requirement for Britney.
     * ${STATE_DIR}/age-policy-dates: File containing the age of all source
       packages.
       - The policy will automatically update this file.
    Config:
     * DEFAULT_URGENCY: Name of the urgency used for packages without an urgency
       (or for unknown urgencies). Will also be used to set the "minimum"
       aging requirements for packages not in the target suite.
     * MINDAYS_<URGENCY>: The age-requirements in days for packages with the
       given urgency.
       - Commonly used urgencies are: low, medium, high, emergency, critical
    Hints:
     * urgent <source>/<version>: Disregard the age requirements for a given
       source/version.
     * age-days X <source>/<version>: Set the age requirements for a given
       source/version to X days. Note that X can exceed the highest
       age-requirement normally given.

    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__("age", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE})
        # Urgency name -> required age in days (from the MINDAYS_* options).
        self._min_days = self._generate_mindays_table()
        # Requirement for the default urgency; set for real in initialise().
        self._min_days_default = 0
        # britney's "day" begins at 7pm (we want aging to occur in the 22:00Z run and we run Britney 2-4 times a day)
        # NB: _date_now is used in tests
        time_now = time.time()
        if hasattr(self.options, "fake_runtime"):
            # Tests may pin the clock via the FAKE_RUNTIME option.
            time_now = int(self.options.fake_runtime)
            self.logger.info("overriding runtime with fake_runtime %d" % time_now)

        # Current time expressed in britney "days" (see comment above).
        self._date_now = int(((time_now / (60 * 60)) - 19) / 24)
        # Source name -> (version, britney-day first seen); from the dates file.
        self._dates: dict[str, tuple[str, int]] = {}
        # Source name -> urgency; from the urgencies file.
        self._urgencies: dict[str, str] = {}
        self._default_urgency: str = self.options.default_urgency
        # Urgencies exempt from penalties (the NO_PENALTIES option).
        self._penalty_immune_urgencies: frozenset[str] = frozenset()
        if hasattr(self.options, "no_penalties"):
            self._penalty_immune_urgencies = frozenset(
                x.strip() for x in self.options.no_penalties.split()
            )
        self._bounty_min_age: Optional[int] = None  # initialised later

    def _generate_mindays_table(self) -> dict[str, int]:
        """Build the urgency -> minimum-age table from the MINDAYS_* options.

        :raises ValueError: if a MINDAYS_* value is not a non-negative integer.
        """
        mindays: dict[str, int] = {}
        for k in dir(self.options):
            if not k.startswith("mindays_"):
                continue
            v = getattr(self.options, k)
            try:
                as_days = int(v)
            except ValueError:
                raise ValueError(
                    "Unable to parse "
                    + k
                    + " as a number of days. Must be 0 or a positive integer"
                )
            if as_days < 0:
                raise ValueError(
                    "The value of " + k + " must be zero or a positive integer"
                )
            # "mindays_low" -> urgency "low"
            mindays[k.split("_")[1]] = as_days
        return mindays

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the "age-days" and "urgent" hint types."""
        hint_parser.register_hint_type(
            "age-days", simple_policy_hint_parser_function(AgeDayHint, int), min_args=2
        )
        hint_parser.register_hint_type("urgent", split_into_one_hint_per_package)

    def initialise(self, britney: "Britney") -> None:
        """Load the state files and resolve config-derived values.

        :raises ValueError: if MINDAYS_<DEFAULT_URGENCY> is missing, or if
            BOUNTY_MIN_AGE is neither an integer nor a known urgency name.
        """
        super().initialise(britney)
        self._read_dates_file()
        self._read_urgencies_file()
        if self._default_urgency not in self._min_days:  # pragma: no cover
            raise ValueError(
                "Missing age-requirement for default urgency (MINDAYS_%s)"
                % self._default_urgency
            )
        self._min_days_default = self._min_days[self._default_urgency]
        try:
            self._bounty_min_age = int(self.options.bounty_min_age)
        except ValueError:
            # BOUNTY_MIN_AGE may also name an urgency instead of a day count.
            if self.options.bounty_min_age in self._min_days:
                self._bounty_min_age = self._min_days[self.options.bounty_min_age]
            else:  # pragma: no cover
                raise ValueError(
                    "Please fix BOUNTY_MIN_AGE in the britney configuration"
                )
        except AttributeError:
            # The option wasn't defined in the configuration
            self._bounty_min_age = 0

    def save_state(self, britney: "Britney") -> None:
        """Persist the (possibly updated) dates file."""
        super().save_state(britney)
        self._write_dates_file()

    def apply_src_policy_impl(
        self,
        age_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check whether the item is old enough to migrate.

        Fills age_info with the current age and the (possibly hint- or
        bounty/penalty-adjusted) age requirement and records the outcome on
        the excuse.
        """
        # retrieve the urgency for the upload, ignoring it if this is a NEW package
        # (not present in the target suite)
        source_name = item.package
        urgency = self._urgencies.get(source_name, self._default_urgency)

        if urgency not in self._min_days:
            age_info["unknown-urgency"] = urgency
            urgency = self._default_urgency

        if not source_data_tdist:
            # NEW package: never let it migrate faster than the default
            # urgency would allow.
            if self._min_days[urgency] < self._min_days_default:
                age_info["urgency-reduced"] = {
                    "from": urgency,
                    "to": self._default_urgency,
                }
                urgency = self._default_urgency

        # (Re)start the aging clock when the package is first seen or when
        # its version changed since the last run.
        if source_name not in self._dates:
            self._dates[source_name] = (source_data_srcdist.version, self._date_now)
        elif self._dates[source_name][0] != source_data_srcdist.version:
            self._dates[source_name] = (source_data_srcdist.version, self._date_now)

        days_old = self._date_now - self._dates[source_name][1]
        min_days = self._min_days[urgency]
        # Bounties reduce the required age; penalties (below) increase it.
        for bounty in excuse.bounty:
            if excuse.bounty[bounty]:
                self.logger.info(
                    "Applying bounty for %s granted by %s: %d days",
                    source_name,
                    bounty,
                    excuse.bounty[bounty],
                )
                excuse.addinfo(
                    "Required age reduced by %d days because of %s"
                    % (excuse.bounty[bounty], bounty)
                )
                assert excuse.bounty[bounty] > 0, "negative bounties shouldn't happen"
                min_days -= excuse.bounty[bounty]
        if urgency not in self._penalty_immune_urgencies:
            for penalty in excuse.penalty:
                if excuse.penalty[penalty]:
                    self.logger.info(
                        "Applying penalty for %s given by %s: %d days",
                        source_name,
                        penalty,
                        excuse.penalty[penalty],
                    )
                    excuse.addinfo(
                        "Required age increased by %d days because of %s"
                        % (excuse.penalty[penalty], penalty)
                    )
                    assert (
                        excuse.penalty[penalty] > 0
                    ), "negative penalties should be handled earlier"
                    min_days += excuse.penalty[penalty]

        assert self._bounty_min_age is not None
        # the age in BOUNTY_MIN_AGE can be higher than the one associated with
        # the real urgency, so don't forget to take it into account
        bounty_min_age = min(self._bounty_min_age, self._min_days[urgency])
        if min_days < bounty_min_age:
            min_days = bounty_min_age
            excuse.addinfo(
                "Required age is not allowed to drop below %d days" % min_days
            )

        age_info["current-age"] = days_old

        assert self.hints is not None
        # "age-days" hints override the computed requirement outright.
        for age_days_hint in cast(
            "list[AgeDayHint]",
            self.hints.search(
                "age-days", package=source_name, version=source_data_srcdist.version
            ),
        ):
            new_req = age_days_hint.days
            age_info["age-requirement-reduced"] = {
                "new-requirement": new_req,
                "changed-by": age_days_hint.user,
            }
            if "original-age-requirement" not in age_info:
                age_info["original-age-requirement"] = min_days
            min_days = new_req

        age_info["age-requirement"] = min_days
        res = PolicyVerdict.PASS

        if days_old < min_days:
            # Too young; an "urgent" hint can still wave the package through.
            urgent_hints = self.hints.search(
                "urgent", package=source_name, version=source_data_srcdist.version
            )
            if urgent_hints:
                age_info["age-requirement-reduced"] = {
                    "new-requirement": 0,
                    "changed-by": urgent_hints[0].user,
                }
                res = PolicyVerdict.PASS_HINTED
            else:
                res = PolicyVerdict.REJECTED_TEMPORARILY

        # update excuse
        age_hint = age_info.get("age-requirement-reduced", None)
        age_min_req = age_info["age-requirement"]
        if age_hint:
            new_req = age_hint["new-requirement"]
            who = age_hint["changed-by"]
            if new_req:
                excuse.addinfo(
                    "Overriding age needed from %d days to %d by %s"
                    % (age_min_req, new_req, who)
                )
                age_min_req = new_req
            else:
                excuse.addinfo("Too young, but urgency pushed by %s" % who)
                age_min_req = 0
        excuse.setdaysold(age_info["current-age"], age_min_req)

        if age_min_req == 0:
            excuse.addinfo("%d days old" % days_old)
        elif days_old < age_min_req:
            excuse.add_verdict_info(
                res, "Too young, only %d of %d days old" % (days_old, age_min_req)
            )
        else:
            excuse.addinfo("%d days old (needed %d days)" % (days_old, age_min_req))

        return res

    def _read_dates_file(self) -> None:
        """Parse the dates file

        Reads ${STATE_DIR}/age-policy-dates (falling back to the legacy
        "Dates" file in the target suite directory) into self._dates, and
        creates an empty state file if the new-style one is missing.
        """
        dates = self._dates
        fallback_filename = os.path.join(self.suite_info.target_suite.path, "Dates")
        using_new_name = False
        try:
            filename = os.path.join(self.state_dir, "age-policy-dates")
            if not os.path.exists(filename) and os.path.exists(fallback_filename):
                filename = fallback_filename
            else:
                using_new_name = True
        except AttributeError:
            # No STATE_DIR configured; the legacy file is the only option.
            if os.path.exists(fallback_filename):
                filename = fallback_filename
            else:
                raise RuntimeError("Please set STATE_DIR in the britney configuration")

        try:
            with open(filename, encoding="utf-8") as fd:
                for line in fd:
                    if line.startswith("#"):
                        # Ignore comment lines (mostly used for tests)
                        continue
                    # <source> <version> <date>
                    ln = line.split()
                    if len(ln) != 3:  # pragma: no cover
                        continue
                    try:
                        dates[ln[0]] = (ln[1], int(ln[2]))
                    except ValueError:  # pragma: no cover
                        pass
        except FileNotFoundError:
            if not using_new_name:
                # If we using the legacy name, then just give up
                raise
            self.logger.info("%s does not appear to exist. Creating it", filename)
            with open(filename, mode="x", encoding="utf-8"):
                pass

    def _read_urgencies_file(self) -> None:
        """Parse the urgencies file into self._urgencies.

        Urgencies are "sticky": an entry only replaces an earlier one when it
        implies a *lower* minimum age, and only when the version is newer than
        the target suite's and present in the primary source suite.
        """
        urgencies = self._urgencies
        min_days_default = self._min_days_default
        fallback_filename = os.path.join(self.suite_info.target_suite.path, "Urgency")
        try:
            filename = os.path.join(self.state_dir, "age-policy-urgencies")
            if not os.path.exists(filename) and os.path.exists(fallback_filename):
                filename = fallback_filename
        except AttributeError:
            filename = fallback_filename

        sources_s = self.suite_info.primary_source_suite.sources
        sources_t = self.suite_info.target_suite.sources

        with open(filename, errors="surrogateescape", encoding="ascii") as fd:
            for line in fd:
                if line.startswith("#"):
                    # Ignore comment lines (mostly used for tests)
                    continue
                # <source> <version> <urgency>
                ln = line.split()
                if len(ln) != 3:
                    continue

                # read the minimum days associated with the urgencies
                urgency_old = urgencies.get(ln[0], None)
                mindays_old = self._min_days.get(urgency_old, 1000)  # type: ignore[arg-type]
                mindays_new = self._min_days.get(ln[2], min_days_default)

                # if the new urgency is lower (so the min days are higher), do nothing
                if mindays_old <= mindays_new:
                    continue

                # if the package exists in the target suite and it is more recent, do nothing
                tsrcv = sources_t.get(ln[0], None)
                if tsrcv and apt_pkg.version_compare(tsrcv.version, ln[1]) >= 0:
                    continue

                # if the package doesn't exist in the primary source suite or it is older, do nothing
                usrcv = sources_s.get(ln[0], None)
                if not usrcv or apt_pkg.version_compare(usrcv.version, ln[1]) < 0:
                    continue

                # update the urgency for the package
                urgencies[ln[0]] = ln[2]

    def _write_dates_file(self) -> None:
        """Write self._dates back out, atomically via a rename.

        Prefers the new state-dir location and removes the legacy "Dates"
        file once the new one has been written.
        """
        dates = self._dates
        try:
            directory = self.state_dir
            basename = "age-policy-dates"
            old_file = os.path.join(self.suite_info.target_suite.path, "Dates")
        except AttributeError:
            # No STATE_DIR configured; keep using the legacy location.
            directory = self.suite_info.target_suite.path
            basename = "Dates"
            old_file = None
        filename = os.path.join(directory, basename)
        filename_tmp = os.path.join(directory, "%s_new" % basename)
        with open(filename_tmp, "w", encoding="utf-8") as fd:
            for pkg in sorted(dates):
                version, date = dates[pkg]
                fd.write("%s %s %d\n" % (pkg, version, date))
        os.rename(filename_tmp, filename)
        if old_file is not None and os.path.exists(old_file):
            self.logger.info("Removing old age-policy-dates file %s", old_file)
            os.unlink(old_file)

763 

764 

765class RCBugPolicy(AbstractBasePolicy): 

766 """RC bug regression policy for source migrations 

767 

768 The RCBugPolicy will read provided list of RC bugs and block any 

769 source upload that would introduce a *new* RC bug in the target 

770 suite. 

771 

772 The RCBugPolicy's decision is influenced by the following: 

773 

774 State files: 

775 * ${STATE_DIR}/rc-bugs-${SUITE_NAME}: File containing RC bugs for packages in 

776 the given suite (one for both primary source suite and the target sutie is 

777 needed). 

778 - These files need to be updated externally. 

779 """ 

780 

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "rc-bugs", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
        )
        # Package key -> set of RC bug identifiers, for the source and the
        # target suite respectively; populated by initialise().
        self._bugs_source: Optional[dict[str, set[str]]] = None
        self._bugs_target: Optional[dict[str, set[str]]] = None

787 

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the "ignore-rc-bugs" hint type."""
        # The hint parameter is a comma-separated list of bug identifiers,
        # converted into a frozenset.
        f = simple_policy_hint_parser_function(
            IgnoreRCBugHint, lambda x: frozenset(x.split(","))
        )
        hint_parser.register_hint_type("ignore-rc-bugs", f, min_args=2)

793 

    def initialise(self, britney: "Britney") -> None:
        """Load the RC bug lists for the source and target suites.

        Prefers ${STATE_DIR}/rc-bugs-<suite>; falls back to the legacy
        "BugsV" files in the suite directories when the new files (or
        STATE_DIR itself) are unavailable.
        """
        super().initialise(britney)
        source_suite = self.suite_info.primary_source_suite
        target_suite = self.suite_info.target_suite
        # Legacy locations of the bug lists.
        fallback_unstable = os.path.join(source_suite.path, "BugsV")
        fallback_testing = os.path.join(target_suite.path, "BugsV")
        try:
            filename_unstable = os.path.join(
                self.state_dir, "rc-bugs-%s" % source_suite.name
            )
            filename_testing = os.path.join(
                self.state_dir, "rc-bugs-%s" % target_suite.name
            )
            # Only fall back when *both* new files are missing and *both*
            # legacy files exist; otherwise keep the new names.
            if (
                not os.path.exists(filename_unstable)
                and not os.path.exists(filename_testing)
                and os.path.exists(fallback_unstable)
                and os.path.exists(fallback_testing)
            ):
                filename_unstable = fallback_unstable
                filename_testing = fallback_testing
        except AttributeError:
            # STATE_DIR is not configured; use the legacy locations.
            filename_unstable = fallback_unstable
            filename_testing = fallback_testing
        self._bugs_source = self._read_bugs(filename_unstable)
        self._bugs_target = self._read_bugs(filename_testing)

820 

    def apply_src_policy_impl(
        self,
        rcbugs_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check whether migrating this item would add RC bugs to the target suite.

        Collects the RC bugs affecting the package (and its binaries) in the
        source and target suites, applies any "ignore-rc-bugs" hints, records
        the outcome in ``rcbugs_info`` and on the excuse, and rejects the item
        permanently if it would introduce bugs not present in the target.
        """
        assert self._bugs_source is not None  # for type checking
        assert self._bugs_target is not None  # for type checking
        bugs_t = set()  # RC bugs affecting the package in the target suite
        bugs_u = set()  # RC bugs affecting the package in the source suite
        source_name = item.package

        # Bugs may be filed against the bare source name or "src:<name>".
        for src_key in (source_name, "src:%s" % source_name):
            if source_data_tdist and src_key in self._bugs_target:
                bugs_t.update(self._bugs_target[src_key])
            if src_key in self._bugs_source:
                bugs_u.update(self._bugs_source[src_key])

        # Also collect bugs filed against the individual binary packages.
        for pkg, _, _ in source_data_srcdist.binaries:
            if pkg in self._bugs_source:
                bugs_u |= self._bugs_source[pkg]
        if source_data_tdist:
            for pkg, _, _ in source_data_tdist.binaries:
                if pkg in self._bugs_target:
                    bugs_t |= self._bugs_target[pkg]

        # If a package is not in the target suite, it has no RC bugs per
        # definition. Unfortunately, it seems that the live-data is
        # not always accurate (e.g. live-2011-12-13 suggests that
        # obdgpslogger had the same bug in testing and unstable,
        # but obdgpslogger was not in testing at that time).
        # - For the curious, obdgpslogger was removed on that day
        #   and the BTS probably had not caught up with that fact.
        #   (https://tracker.debian.org/news/415935)
        assert not bugs_t or source_data_tdist, (
            "%s had bugs in the target suite but is not present" % source_name
        )

        verdict = PolicyVerdict.PASS

        assert self.hints is not None
        for ignore_hint in cast(
            list[IgnoreRCBugHint],
            self.hints.search(
                "ignore-rc-bugs",
                package=source_name,
                version=source_data_srcdist.version,
            ),
        ):
            ignored_bugs = ignore_hint.ignored_rcbugs

            # Only handle one hint for now
            if "ignored-bugs" in rcbugs_info:
                self.logger.info(
                    "Ignoring ignore-rc-bugs hint from %s on %s due to another hint from %s",
                    ignore_hint.user,
                    source_name,
                    rcbugs_info["ignored-bugs"]["issued-by"],
                )
                continue
            # NOTE(review): only bugs_u is consulted here, so a hint listing
            # only target-suite bugs is treated as not affecting the package
            # — confirm whether that is intended.
            if not ignored_bugs.isdisjoint(bugs_u):
                bugs_u -= ignored_bugs
                bugs_t -= ignored_bugs
                rcbugs_info["ignored-bugs"] = {
                    "bugs": sorted(ignored_bugs),
                    "issued-by": ignore_hint.user,
                }
                verdict = PolicyVerdict.PASS_HINTED
            else:
                self.logger.info(
                    "Ignoring ignore-rc-bugs hint from %s on %s as none of %s affect the package",
                    ignore_hint.user,
                    source_name,
                    str(ignored_bugs),
                )

        rcbugs_info["shared-bugs"] = sorted(bugs_u & bugs_t)
        rcbugs_info["unique-source-bugs"] = sorted(bugs_u - bugs_t)
        rcbugs_info["unique-target-bugs"] = sorted(bugs_t - bugs_u)

        # update excuse
        new_bugs = rcbugs_info["unique-source-bugs"]
        old_bugs = rcbugs_info["unique-target-bugs"]
        excuse.setbugs(old_bugs, new_bugs)

        # Bugs present only in the source suite block the migration.
        if new_bugs:
            verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(
                verdict,
                "Updating %s would introduce bugs in %s: %s"
                % (
                    source_name,
                    self.suite_info.target_suite.name,
                    ", ".join(
                        [
                            '<a href="https://bugs.debian.org/%s">#%s</a>'
                            % (quote(a), a)
                            for a in new_bugs
                        ]
                    ),
                ),
            )

        # Bugs present only in the target suite are fixed by migrating.
        if old_bugs:
            excuse.addinfo(
                "Updating %s will fix bugs in %s: %s"
                % (
                    source_name,
                    self.suite_info.target_suite.name,
                    ", ".join(
                        [
                            '<a href="https://bugs.debian.org/%s">#%s</a>'
                            % (quote(a), a)
                            for a in old_bugs
                        ]
                    ),
                )
            )

        return verdict

943 

    def _read_bugs(self, filename: str) -> dict[str, set[str]]:
        """Read the release critical bug summary from the specified file

        The file contains rows with the format:

        <package-name> <bug number>[,<bug number>...]

        The method returns a dictionary where the key is the binary package
        name and the value is the set of open RC bugs for it.
        """
        bugs: dict[str, set[str]] = {}
        self.logger.info("Loading RC bugs data from %s", filename)
        with open(filename, encoding="ascii") as f:
            for line in f:
                ln = line.split()
                # Expect exactly "<package> <bug>[,<bug>...]"; skip anything else.
                if len(ln) != 2:  # pragma: no cover
                    self.logger.warning("Malformed line found in line %s", line)
                    continue
                pkg = ln[0]
                if pkg not in bugs:
                    bugs[pkg] = set()
                bugs[pkg].update(ln[1].split(","))
        return bugs

967 

968 

class PiupartsPolicy(AbstractBasePolicy):
    """Policy gating migration on piuparts (install/upgrade/remove) test results.

    Reads per-suite piuparts summary JSON files from STATE_DIR and rejects
    items whose result in the source suite is a regression relative to the
    target suite.  Rejections can be overridden with "ignore-piuparts" hints.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "piuparts", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
        )
        # source name -> (state, url); url is None for the target summary
        # (loaded with keep_url=False).  Populated in initialise().
        self._piuparts_source: Optional[dict[str, tuple[str, Optional[str]]]] = None
        self._piuparts_target: Optional[dict[str, tuple[str, Optional[str]]]] = None

    def register_hints(self, hint_parser: HintParser) -> None:
        hint_parser.register_hint_type(
            "ignore-piuparts", split_into_one_hint_per_package
        )

    def initialise(self, britney: "Britney") -> None:
        """Load the piuparts summaries for the primary source and target suites."""
        super().initialise(britney)
        source_suite = self.suite_info.primary_source_suite
        target_suite = self.suite_info.target_suite
        try:
            filename_unstable = os.path.join(
                self.state_dir, "piuparts-summary-%s.json" % source_suite.name
            )
            filename_testing = os.path.join(
                self.state_dir, "piuparts-summary-%s.json" % target_suite.name
            )
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e
        self._piuparts_source = self._read_piuparts_summary(
            filename_unstable, keep_url=True
        )
        self._piuparts_target = self._read_piuparts_summary(
            filename_testing, keep_url=False
        )

    def apply_src_policy_impl(
        self,
        piuparts_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Map the piuparts states to a verdict.

        States (from the summary files): "P" pass, "F" fail, "W" waiting,
        anything else / missing is treated as "X" (cannot be tested).
        A failure only blocks if it is a regression versus the target suite.
        """
        assert self._piuparts_source is not None  # for type checking
        assert self._piuparts_target is not None  # for type checking
        source_name = item.package

        if source_name in self._piuparts_target:
            testing_state = self._piuparts_target[source_name][0]
        else:
            testing_state = "X"
        url: Optional[str]
        if source_name in self._piuparts_source:
            unstable_state, url = self._piuparts_source[source_name]
        else:
            unstable_state = "X"
            url = None
        url_html = "(no link yet)"
        if url is not None:
            url_html = '<a href="{0}">{0}</a>'.format(url)

        if unstable_state == "P":
            # Not a regression
            msg = "Piuparts tested OK - {0}".format(url_html)
            result = PolicyVerdict.PASS
            piuparts_info["test-results"] = "pass"
        elif unstable_state == "F":
            if testing_state != unstable_state:
                # Fails in source but not in target: a regression.
                piuparts_info["test-results"] = "regression"
                msg = "Rejected due to piuparts regression - {0}".format(url_html)
                result = PolicyVerdict.REJECTED_PERMANENTLY
            else:
                piuparts_info["test-results"] = "failed"
                msg = "Ignoring piuparts failure (Not a regression) - {0}".format(
                    url_html
                )
                result = PolicyVerdict.PASS
        elif unstable_state == "W":
            msg = "Waiting for piuparts test results (stalls migration) - {0}".format(
                url_html
            )
            result = PolicyVerdict.REJECTED_TEMPORARILY
            piuparts_info["test-results"] = "waiting-for-test-results"
        else:
            msg = "Cannot be tested by piuparts (not a blocker) - {0}".format(url_html)
            piuparts_info["test-results"] = "cannot-be-tested"
            result = PolicyVerdict.PASS

        if url is not None:
            piuparts_info["piuparts-test-url"] = url
        if result.is_rejected:
            excuse.add_verdict_info(result, msg)
        else:
            excuse.addinfo(msg)

        # A rejection can be downgraded to PASS_HINTED by an ignore-piuparts
        # hint for this exact source/version.
        if result.is_rejected:
            assert self.hints is not None
            for ignore_hint in self.hints.search(
                "ignore-piuparts",
                package=source_name,
                version=source_data_srcdist.version,
            ):
                piuparts_info["ignored-piuparts"] = {"issued-by": ignore_hint.user}
                result = PolicyVerdict.PASS_HINTED
                excuse.addinfo(
                    "Ignoring piuparts issue as requested by {0}".format(
                        ignore_hint.user
                    )
                )
                break

        return result

    def _read_piuparts_summary(
        self, filename: str, keep_url: bool = True
    ) -> dict[str, tuple[str, Optional[str]]]:
        """Parse a piuparts summary JSON file into {source: (state, url)}.

        With keep_url=False the url component is replaced by None.
        An empty file yields an empty mapping.
        """
        summary: dict[str, tuple[str, Optional[str]]] = {}
        self.logger.info("Loading piuparts report from %s", filename)
        with open(filename) as fd:
            if os.fstat(fd.fileno()).st_size < 1:
                return summary
            data = json.load(fd)
        try:
            if (
                data["_id"] != "Piuparts Package Test Results Summary"
                or data["_version"] != "1.0"
            ):  # pragma: no cover
                raise ValueError(
                    "Piuparts results in {0} does not have the correct ID or version".format(
                        filename
                    )
                )
        except KeyError as e:  # pragma: no cover
            raise ValueError(
                "Piuparts results in {0} is missing id or version field".format(
                    filename
                )
            ) from e
        for source, suite_data in data["packages"].items():
            # Each source is expected to carry exactly one per-suite result set.
            if len(suite_data) != 1:  # pragma: no cover
                raise ValueError(
                    "Piuparts results in {0}, the source {1} does not have exactly one result set".format(
                        filename, source
                    )
                )
            item = next(iter(suite_data.values()))
            state, _, url = item
            if not keep_url:
                url = None
            summary[source] = (state, url)

        return summary

1121 

1122 

class DependsPolicy(AbstractBasePolicy):
    """Policy rejecting items whose binaries have unsatisfiable dependencies.

    Runs per-architecture (RUN_ON_EVERY_ARCH_ONLY) and also records hints for
    the autopkgtest policy about which architectures can still be tested
    despite installability problems.
    """

    # set in initialise() from the britney instance
    pkg_universe: "BinaryPackageUniverse"
    broken_packages: frozenset["BinaryPackageId"]
    all_binaries: dict["BinaryPackageId", "BinaryPackage"]
    allow_uninst: dict[str, set[Optional[str]]]

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "depends",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        # Filled in by initialise(); None until then.
        self.nobreakall_arches = None
        self.new_arches = None
        self.break_arches = None

    def initialise(self, britney: "Britney") -> None:
        """Cache the package universe and architecture options from britney."""
        super().initialise(britney)
        self.pkg_universe = britney.pkg_universe
        self.broken_packages = self.pkg_universe.broken_packages
        self.all_binaries = britney.all_binaries
        self.nobreakall_arches = self.options.nobreakall_arches
        self.new_arches = self.options.new_arches
        self.break_arches = self.options.break_arches
        self.allow_uninst = britney.allow_uninst

    def apply_srcarch_policy_impl(
        self,
        deps_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check installability of the item's binaries on one architecture.

        Rejects permanently when a binary has unsatisfiable dependencies,
        unless the binary is allowed (or already known) to be uninstallable.
        Side effect: records autopkgtest scheduling hints in ``deps_info``.
        """
        verdict = PolicyVerdict.PASS

        assert self.break_arches is not None
        assert self.new_arches is not None
        if arch in self.break_arches or arch in self.new_arches:
            # we don't check these in the policy (TODO - for now?)
            return verdict

        source_suite = item.suite
        target_suite = self.suite_info.target_suite

        packages_s_a = source_suite.binaries[arch]
        packages_t_a = target_suite.binaries[arch]

        my_bins = sorted(excuse.packages[arch])

        # Track (non-)installability separately for arch:all and arch:any
        # binaries; the autopkgtest hints below depend on that split.
        arch_all_installable = set()
        arch_arch_installable = set()
        consider_it_regression = True

        for pkg_id in my_bins:
            pkg_name = pkg_id.package_name
            binary_u = packages_s_a[pkg_name]
            pkg_arch = binary_u.architecture

            # in some cases, we want to track the uninstallability of a
            # package (because the autopkgtest policy uses this), but we still
            # want to allow the package to be uninstallable
            skip_dep_check = False

            if binary_u.source_version != source_data_srcdist.version:
                # don't check cruft in unstable
                continue

            if item.architecture != "source" and pkg_arch == "all":
                # we don't care about the existing arch: all binaries when
                # checking a binNMU item, because the arch: all binaries won't
                # migrate anyway
                skip_dep_check = True

            if pkg_arch == "all" and arch not in self.nobreakall_arches:
                skip_dep_check = True

            if pkg_name in self.allow_uninst[arch]:
                # this binary is allowed to become uninstallable, so we don't
                # need to check anything
                skip_dep_check = True

            if pkg_name in packages_t_a:
                oldbin = packages_t_a[pkg_name]
                if not target_suite.is_installable(oldbin.pkg_id):
                    # as the current binary in testing is already
                    # uninstallable, the newer version is allowed to be
                    # uninstallable as well, so we don't need to check
                    # anything
                    skip_dep_check = True
                    consider_it_regression = False

            if pkg_id in self.broken_packages:
                if pkg_arch == "all":
                    arch_all_installable.add(False)
                else:
                    arch_arch_installable.add(False)
                # dependencies can't be satisfied by all the known binaries -
                # this certainly won't work...
                excuse.add_unsatisfiable_on_arch(arch)
                if skip_dep_check:
                    # ...but if the binary is allowed to become uninstallable,
                    # we don't care
                    # we still want the binary to be listed as uninstallable,
                    continue
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                excuse.add_verdict_info(
                    verdict, "%s/%s has unsatisfiable dependency" % (pkg_name, arch)
                )
                excuse.addreason("depends")
            else:
                if pkg_arch == "all":
                    arch_all_installable.add(True)
                else:
                    arch_arch_installable.add(True)

            if skip_dep_check:
                continue

            deps = self.pkg_universe.dependencies_of(pkg_id)

            for dep in deps:
                # dep is a list of packages, each of which satisfy the
                # dependency

                if dep == frozenset():
                    continue
                is_ok = False
                needed_for_dep = set()

                for alternative in dep:
                    if target_suite.is_pkg_in_the_suite(alternative):
                        # dep can be satisfied in testing - ok
                        is_ok = True
                    elif alternative in my_bins:
                        # can be satisfied by binary from same item: will be
                        # ok if item migrates
                        is_ok = True
                    else:
                        needed_for_dep.add(alternative)

                if not is_ok:
                    spec = DependencySpec(DependencyType.DEPENDS, arch)
                    excuse.add_package_depends(spec, needed_for_dep)

        # The autopkgtest policy needs delicate trade offs for
        # non-installability. The current choice (considering source
        # migration and only binaries built by the version of the
        # source):
        #
        # * Run autopkgtest if all arch:$arch binaries are installable
        #   (but some or all arch:all binaries are not)
        #
        # * Don't schedule nor wait for not installable arch:all only package
        #   on ! NOBREAKALL_ARCHES
        #
        # * Run autopkgtest if installability isn't a regression (there are (or
        #   rather, should) not be a lot of packages in this state, and most
        #   likely they'll just fail quickly)
        #
        # * Don't schedule, but wait otherwise
        if arch_arch_installable == {True} and False in arch_all_installable:
            deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)
        elif (
            arch not in self.nobreakall_arches
            and arch_arch_installable == set()
            and False in arch_all_installable
        ):
            deps_info.setdefault("arch_all_not_installable", []).append(arch)
        elif not consider_it_regression:
            deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)

        return verdict

1299 

1300 

@unique
class BuildDepResult(IntEnum):
    """Outcome of checking a single build-dependency relation.

    Lower values are better: the build-depends policy keeps the smallest
    result seen across the checked architectures as its best outcome.
    """

    #: the relation is already satisfied in the target suite
    OK = 1
    #: the relation can be satisfied by other packages in the source suite
    DEPENDS = 2
    #: the relation cannot be satisfied at all
    FAILED = 3

1309 

1310 

class BuildDependsPolicy(AbstractBasePolicy):
    """Policy checking that Build-Depends(-Arch|-Indep) stay satisfiable.

    A relation is acceptable when it is satisfied in the target suite or can
    be satisfied by packages that are themselves candidates in the source
    suite (which then become dependencies of the excuse).
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "build-depends",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
        )
        # architectures that may satisfy Build-Depends-Indep (ALL_BUILDARCH)
        self._all_buildarch: list[str] = []

        parse_option(options, "all_buildarch")

    def initialise(self, britney: "Britney") -> None:
        super().initialise(britney)
        if self.options.all_buildarch:
            self._all_buildarch = SuiteContentLoader.config_str_as_list(
                self.options.all_buildarch, []
            )

    def apply_src_policy_impl(
        self,
        build_deps_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
        get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
    ) -> PolicyVerdict:
        """Check Build-Depends-Arch and Build-Depends-Indep for the item.

        ``get_dependency_solvers`` is injectable for testing.
        Returns the worst verdict across both dependency fields.
        """
        verdict = PolicyVerdict.PASS

        # analyze the dependency fields (if present)
        deps = source_data_srcdist.build_deps_arch
        if deps:
            v = self._check_build_deps(
                deps,
                DependencyType.BUILD_DEPENDS,
                build_deps_info,
                item,
                source_data_tdist,
                source_data_srcdist,
                excuse,
                get_dependency_solvers=get_dependency_solvers,
            )
            verdict = PolicyVerdict.worst_of(verdict, v)

        ideps = source_data_srcdist.build_deps_indep
        if ideps:
            v = self._check_build_deps(
                ideps,
                DependencyType.BUILD_DEPENDS_INDEP,
                build_deps_info,
                item,
                source_data_tdist,
                source_data_srcdist,
                excuse,
                get_dependency_solvers=get_dependency_solvers,
            )
            verdict = PolicyVerdict.worst_of(verdict, v)

        return verdict

    def _get_check_archs(
        self, archs: Container[str], dep_type: DependencyType
    ) -> list[str]:
        """Return the ordered list of architectures on which to check deps.

        For Build-Depends: every configured architecture in ``archs`` that is
        not out of sync.  For Build-Depends-Indep: a preference-ordered list
        (ALL_BUILDARCH first, then archs with arch-specific binaries, then the
        rest), again excluding OUTOFSYNC_ARCHES.
        """
        oos = self.options.outofsync_arches

        if dep_type == DependencyType.BUILD_DEPENDS:
            return [
                arch
                for arch in self.options.architectures
                if arch in archs and arch not in oos
            ]

        # first try the all buildarch
        checkarchs = list(self._all_buildarch)
        # then try the architectures where this source has arch specific
        # binaries (in the order of the architecture config file)
        checkarchs.extend(
            arch
            for arch in self.options.architectures
            if arch in archs and arch not in checkarchs
        )
        # then try all other architectures
        checkarchs.extend(
            arch for arch in self.options.architectures if arch not in checkarchs
        )

        # and drop OUTOFSYNC_ARCHES
        return [arch for arch in checkarchs if arch not in oos]

    def _add_info_for_arch(
        self,
        arch: str,
        excuses_info: dict[str, list[str]],
        blockers: dict[str, set[BinaryPackageId]],
        results: dict[str, BuildDepResult],
        dep_type: DependencyType,
        target_suite: TargetSuite,
        source_suite: Suite,
        excuse: "Excuse",
        verdict: PolicyVerdict,
    ) -> PolicyVerdict:
        """Transfer the per-arch results gathered by _check_build_deps onto
        the excuse (dependencies and info/verdict texts) and return the
        possibly-worsened verdict."""
        if arch in blockers:
            packages = blockers[arch]

            # for the solving packages, update the excuse to add the dependencies
            for p in packages:
                if arch not in self.options.break_arches:
                    spec = DependencySpec(dep_type, arch)
                    excuse.add_package_depends(spec, {p})

        if arch in results and results[arch] == BuildDepResult.FAILED:
            verdict = PolicyVerdict.worst_of(
                verdict, PolicyVerdict.REJECTED_PERMANENTLY
            )

        if arch in excuses_info:
            for excuse_text in excuses_info[arch]:
                if verdict.is_rejected:
                    excuse.add_verdict_info(verdict, excuse_text)
                else:
                    excuse.addinfo(excuse_text)

        return verdict

    def _check_build_deps(
        self,
        deps: str,
        dep_type: DependencyType,
        build_deps_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
        get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
    ) -> PolicyVerdict:
        """Check one build-dependency field (``deps``) on the relevant archs.

        For BUILD_DEPENDS every relevant architecture must be satisfiable;
        for BUILD_DEPENDS_INDEP a single satisfiable architecture suffices
        (any_arch_ok), so the loop stops at the first fully-OK arch.
        """
        verdict = PolicyVerdict.PASS
        any_arch_ok = dep_type == DependencyType.BUILD_DEPENDS_INDEP

        britney = self.britney

        # local copies for better performance
        parse_src_depends = apt_pkg.parse_src_depends

        source_name = item.package
        source_suite = item.suite
        target_suite = self.suite_info.target_suite
        binaries_s = source_suite.binaries
        provides_s = source_suite.provides_table
        binaries_t = target_suite.binaries
        provides_t = target_suite.provides_table
        unsat_bd: dict[str, list[str]] = {}
        # architectures where this source builds arch-specific binaries
        relevant_archs: set[str] = {
            binary.architecture
            for binary in source_data_srcdist.binaries
            if britney.all_binaries[binary].architecture != "all"
        }

        excuses_info: dict[str, list[str]] = defaultdict(list)
        blockers: dict[str, set[BinaryPackageId]] = defaultdict(set)
        arch_results = {}
        result_archs = defaultdict(list)
        bestresult = BuildDepResult.FAILED
        check_archs = self._get_check_archs(relevant_archs, dep_type)
        if not check_archs:
            # when the arch list is empty, we check the b-d on any arch, instead of all archs
            # this happens for Build-Depens on a source package that only produces arch: all binaries
            any_arch_ok = True
            check_archs = self._get_check_archs(
                self.options.architectures, DependencyType.BUILD_DEPENDS_INDEP
            )

        for arch in check_archs:
            # retrieve the binary package from the specified suite and arch
            binaries_s_a = binaries_s[arch]
            provides_s_a = provides_s[arch]
            binaries_t_a = binaries_t[arch]
            provides_t_a = provides_t[arch]
            arch_results[arch] = BuildDepResult.OK
            # for every dependency block (formed as conjunction of disjunction)
            for block_txt in deps.split(","):
                block_list = parse_src_depends(block_txt, False, arch)
                # Unlike regular dependencies, some clauses of the Build-Depends(-Arch|-Indep) can be
                # filtered out by (e.g.) architecture restrictions. We need to cope with this while
                # keeping block_txt and block aligned.
                if not block_list:
                    # Relation is not relevant for this architecture.
                    continue
                block = block_list[0]
                # if the block is satisfied in the target suite, then skip the block
                if get_dependency_solvers(
                    block, binaries_t_a, provides_t_a, build_depends=True
                ):
                    # Satisfied in the target suite; all ok.
                    continue

                # check if the block can be satisfied in the source suite, and list the solving packages
                packages = get_dependency_solvers(
                    block, binaries_s_a, provides_s_a, build_depends=True
                )
                sources = sorted(p.source for p in packages)

                # if the dependency can be satisfied by the same source package, skip the block:
                # obviously both binary packages will enter the target suite together
                if source_name in sources:
                    continue

                # if no package can satisfy the dependency, add this information to the excuse
                if not packages:
                    excuses_info[arch].append(
                        "%s unsatisfiable %s on %s: %s"
                        % (source_name, dep_type, arch, block_txt.strip())
                    )
                    if arch not in unsat_bd:
                        unsat_bd[arch] = []
                    unsat_bd[arch].append(block_txt.strip())
                    arch_results[arch] = BuildDepResult.FAILED
                    continue

                blockers[arch].update(p.pkg_id for p in packages)
                if arch_results[arch] < BuildDepResult.DEPENDS:
                    arch_results[arch] = BuildDepResult.DEPENDS

            if any_arch_ok:
                if arch_results[arch] < bestresult:
                    bestresult = arch_results[arch]
                result_archs[arch_results[arch]].append(arch)
                if bestresult == BuildDepResult.OK:
                    # we found an architecture where the b-deps-indep are
                    # satisfied in the target suite, so we can stop
                    break

        if any_arch_ok:
            # Report only the best architecture found.
            arch = result_archs[bestresult][0]
            excuse.add_detailed_info(
                "Checking %s on %s" % (dep_type.get_description(), arch)
            )
            key = "check-%s-on-arch" % dep_type.get_reason()
            build_deps_info[key] = arch
            verdict = self._add_info_for_arch(
                arch,
                excuses_info,
                blockers,
                arch_results,
                dep_type,
                target_suite,
                source_suite,
                excuse,
                verdict,
            )

        else:
            # Every checked architecture contributes to the verdict.
            for arch in check_archs:
                verdict = self._add_info_for_arch(
                    arch,
                    excuses_info,
                    blockers,
                    arch_results,
                    dep_type,
                    target_suite,
                    source_suite,
                    excuse,
                    verdict,
                )

        if unsat_bd:
            build_deps_info["unsatisfiable-arch-build-depends"] = unsat_bd

        return verdict

1581 

1582 

class BuiltUsingPolicy(AbstractBasePolicy):
    """Built-Using policy

    Binaries that incorporate (part of) another source package must list these
    sources under 'Built-Using'.

    This policy checks if the corresponding sources are available in the
    target suite. If they are not, but they are candidates for migration, a
    dependency is added.

    If the binary incorporates a newer version of a source, that is not (yet)
    a candidate, we don't want to accept that binary. A rebuild later in the
    primary suite wouldn't fix the issue, because that would incorporate the
    newer version again.

    If the binary incorporates an older version of the source, a newer version
    will be accepted as a replacement. We assume that this can be fixed by
    rebuilding the binary at some point during the development cycle.

    Requiring exact version of the source would not be useful in practice. A
    newer upload of that source wouldn't be blocked by this policy, so the
    built-using would be outdated anyway.

    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "built-using",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )

    def initialise(self, britney: "Britney") -> None:
        super().initialise(britney)

    def apply_srcarch_policy_impl(
        self,
        build_deps_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check every Built-Using relation of the item's binaries on ``arch``.

        A relation is satisfied when a source of at least the listed version
        exists in the target suite, in the item's source suite (adding a
        dependency to the excuse), or — for additional source suites — in the
        primary source suite.  Otherwise the item is rejected permanently,
        except on break architectures.
        """
        verdict = PolicyVerdict.PASS

        source_suite = item.suite
        target_suite = self.suite_info.target_suite
        binaries_s = source_suite.binaries

        def check_bu_in_suite(
            bu_source: str, bu_version: str, source_suite: Suite
        ) -> bool:
            # Look for a new-enough version of bu_source in source_suite and,
            # if found, record it as a migration dependency (unless arch is a
            # break arch).  Reads pkg_name/arch from the enclosing loop.
            found = False
            if bu_source not in source_suite.sources:
                return found
            s_source = source_suite.sources[bu_source]
            s_ver = s_source.version
            if apt_pkg.version_compare(s_ver, bu_version) >= 0:
                found = True
                dep = PackageId(bu_source, s_ver, "source")
                if arch in self.options.break_arches:
                    excuse.add_detailed_info(
                        "Ignoring Built-Using for %s/%s on %s"
                        % (pkg_name, arch, dep.uvname)
                    )
                else:
                    spec = DependencySpec(DependencyType.BUILT_USING, arch)
                    excuse.add_package_depends(spec, {dep})
                    excuse.add_detailed_info(
                        "%s/%s has Built-Using on %s" % (pkg_name, arch, dep.uvname)
                    )

            return found

        for pkg_id in sorted(
            x for x in source_data_srcdist.binaries if x.architecture == arch
        ):
            pkg_name = pkg_id.package_name

            # retrieve the testing (if present) and unstable corresponding binary packages
            binary_s = binaries_s[arch][pkg_name]

            for bu in binary_s.builtusing:
                bu_source = bu[0]
                bu_version = bu[1]
                found = False
                # First, a new-enough source already in the target suite is fine.
                if bu_source in target_suite.sources:
                    t_source = target_suite.sources[bu_source]
                    t_ver = t_source.version
                    if apt_pkg.version_compare(t_ver, bu_version) >= 0:
                        found = True

                if not found:
                    found = check_bu_in_suite(bu_source, bu_version, source_suite)

                if not found and source_suite.suite_class.is_additional_source:
                    found = check_bu_in_suite(
                        bu_source, bu_version, self.suite_info.primary_source_suite
                    )

                if not found:
                    if arch in self.options.break_arches:
                        excuse.add_detailed_info(
                            "Ignoring unsatisfiable Built-Using for %s/%s on %s %s"
                            % (pkg_name, arch, bu_source, bu_version)
                        )
                    else:
                        verdict = PolicyVerdict.worst_of(
                            verdict, PolicyVerdict.REJECTED_PERMANENTLY
                        )
                        excuse.add_verdict_info(
                            verdict,
                            "%s/%s has unsatisfiable Built-Using on %s %s"
                            % (pkg_name, arch, bu_source, bu_version),
                        )

        return verdict

1703 

1704 

1705class BlockPolicy(AbstractBasePolicy): 

1706 BLOCK_HINT_REGEX = re.compile("^(un)?(block-?.*)$") 

1707 

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "block",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
        )
        # block-all hint argument (e.g. "source", "new-source", "key") -> hint;
        # populated in initialise().
        self._blockall: dict[Optional[str], Hint] = {}

1716 

    def initialise(self, britney: "Britney") -> None:
        """Collect block-all hints and, if "block-all key" is set, load the
        key-package list from the state directory."""
        super().initialise(britney)
        assert self.hints is not None
        for hint in self.hints.search(type="block-all"):
            self._blockall[hint.package] = hint

        self._key_packages: list[str] = []
        if "key" in self._blockall:
            self._key_packages = self._read_key_packages()

1726 

    def _read_key_packages(self) -> list[str]:
        """Read the list of key packages

        The file contains data in the yaml format :

        - reason: <something>
          source: <package>

        The method returns a list of all key packages.

        Exits britney (status 1) if the file is missing while a
        "block-all key" hint is active.
        """
        filename = os.path.join(self.state_dir, "key_packages.yaml")
        self.logger.info("Loading key packages from %s", filename)
        if os.path.exists(filename):
            with open(filename) as f:
                data = yaml.safe_load(f)
                key_packages = [item["source"] for item in data]
        else:
            self.logger.error(
                "Britney was asked to block key packages, "
                + "but no key_packages.yaml file was found."
            )
            sys.exit(1)

        return key_packages

1751 

1752 def register_hints(self, hint_parser: HintParser) -> None: 

1753 # block related hints are currently defined in hint.py 

1754 pass 

1755 

def _check_blocked(
    self, item: "MigrationItem", arch: str, version: str, excuse: "Excuse"
) -> PolicyVerdict:
    """Check whether block/unblock hints prevent this item from migrating.

    Evaluates package-specific block/unblock hints, the general
    "block-all" hints gathered in initialise(), and (for non-primary
    source suites) the implicit approval requirement.

    :param item: the migration item being judged
    :param arch: architecture being checked ("source" for the
        source-level check)
    :param version: version of the item in the source suite
    :param excuse: excuse that hints, info and verdict texts are added to
    :return: PASS (possibly with info) or REJECTED_NEEDS_APPROVAL
    """
    verdict = PolicyVerdict.PASS
    # block command (e.g. "block", "block-udeb") -> user who set the hint
    blocked = {}
    # unblock command suffix -> user who set the matching unblock hint
    unblocked = {}
    # optional, more specific rejection text per block command
    block_info = {}
    source_suite = item.suite
    suite_name = source_suite.name
    src = item.package
    is_primary = source_suite.suite_class == SuiteClass.PRIMARY_SOURCE_SUITE

    # Default guidance shown in rejection messages; may be replaced with
    # more specific text below (freeze policy, d-i RM contact, ...).
    tooltip = (
        "please contact %s-release if update is needed" % self.options.distribution
    )

    assert self.hints is not None
    shints = self.hints.search(package=src)
    mismatches = False
    r = self.BLOCK_HINT_REGEX
    for hint in shints:
        m = r.match(hint.type)
        if m:
            if m.group(1) == "un":
                # unblock hint: it only neutralises the block when
                # version, suite and architecture all match this item.
                assert hint.suite is not None
                if (
                    hint.version != version
                    or hint.suite.name != suite_name
                    or (hint.architecture != arch and hint.architecture != "source")
                ):
                    self.logger.info(
                        "hint mismatch: %s %s %s", version, arch, suite_name
                    )
                    mismatches = True
                else:
                    unblocked[m.group(2)] = hint.user
                    excuse.add_hint(hint)
            else:
                # block(-*) hint: only accepts a source, so this will
                # always match
                blocked[m.group(2)] = hint.user
                excuse.add_hint(hint)

    if "block" not in blocked and is_primary:
        # if there is a specific block hint for this package, we don't
        # check for the general hints

        if self.options.distribution == "debian":
            url = "https://release.debian.org/testing/freeze_policy.html"
            tooltip = (
                'Follow the <a href="%s">freeze policy</a> when applying for an unblock'
                % url
            )

        if "source" in self._blockall:
            # "block-all source": everything is blocked.
            blocked["block"] = self._blockall["source"].user
            excuse.add_hint(self._blockall["source"])
        elif (
            "new-source" in self._blockall
            and src not in self.suite_info.target_suite.sources
        ):
            blocked["block"] = self._blockall["new-source"].user
            excuse.add_hint(self._blockall["new-source"])
            # no tooltip: new sources will probably not be accepted anyway
            block_info["block"] = "blocked by %s: is not in %s" % (
                self._blockall["new-source"].user,
                self.suite_info.target_suite.name,
            )
        elif "key" in self._blockall and src in self._key_packages:
            blocked["block"] = self._blockall["key"].user
            excuse.add_hint(self._blockall["key"])
            block_info["block"] = "blocked by %s: is a key package (%s)" % (
                self._blockall["key"].user,
                tooltip,
            )
        elif "no-autopkgtest" in self._blockall:
            # Only a fully passing autopkgtest lets the package through.
            if excuse.autopkgtest_results == {"PASS"}:
                if not blocked:
                    excuse.addinfo("not blocked: has successful autopkgtest")
            else:
                blocked["block"] = self._blockall["no-autopkgtest"].user
                excuse.add_hint(self._blockall["no-autopkgtest"])
                if not excuse.autopkgtest_results:
                    block_info["block"] = (
                        "blocked by %s: does not have autopkgtest (%s)"
                        % (
                            self._blockall["no-autopkgtest"].user,
                            tooltip,
                        )
                    )
                else:
                    block_info["block"] = (
                        "blocked by %s: autopkgtest not fully successful (%s)"
                        % (
                            self._blockall["no-autopkgtest"].user,
                            tooltip,
                        )
                    )

    elif not is_primary:
        # Items from additional source suites (e.g. *-proposed-updates)
        # always need explicit approval.
        blocked["block"] = suite_name
        excuse.needs_approval = True

    # Match every block against a corresponding unblock, producing the
    # final verdict and the user-visible explanation.
    for block_cmd in blocked:
        unblock_cmd = "un" + block_cmd
        if block_cmd in unblocked:
            if is_primary or block_cmd == "block-udeb":
                excuse.addinfo(
                    "Ignoring %s request by %s, due to %s request by %s"
                    % (
                        block_cmd,
                        blocked[block_cmd],
                        unblock_cmd,
                        unblocked[block_cmd],
                    )
                )
            else:
                excuse.addinfo("Approved by %s" % (unblocked[block_cmd]))
        else:
            verdict = PolicyVerdict.REJECTED_NEEDS_APPROVAL
            if is_primary or block_cmd == "block-udeb":
                # redirect people to d-i RM for udeb things:
                if block_cmd == "block-udeb":
                    tooltip = "please contact the d-i release manager if an update is needed"
                if block_cmd in block_info:
                    info = block_info[block_cmd]
                else:
                    info = "Not touching package due to %s request by %s (%s)" % (
                        block_cmd,
                        blocked[block_cmd],
                        tooltip,
                    )
                excuse.add_verdict_info(verdict, info)
            else:
                excuse.add_verdict_info(verdict, "NEEDS APPROVAL BY RM")
            excuse.addreason("block")
    if mismatches:
        excuse.add_detailed_info(
            "Some hints for %s do not match this item" % src
        )
    return verdict

1897 

def apply_src_policy_impl(
    self,
    block_info: dict[str, Any],
    item: MigrationItem,
    source_data_tdist: Optional[SourcePackage],
    source_data_srcdist: SourcePackage,
    excuse: "Excuse",
) -> PolicyVerdict:
    """Source-level check: delegate to _check_blocked for arch "source"."""
    version = source_data_srcdist.version
    return self._check_blocked(item, "source", version, excuse)

1907 

def apply_srcarch_policy_impl(
    self,
    block_info: dict[str, Any],
    item: MigrationItem,
    arch: str,
    source_data_tdist: Optional[SourcePackage],
    source_data_srcdist: SourcePackage,
    excuse: "Excuse",
) -> PolicyVerdict:
    """Per-architecture check: delegate to _check_blocked for this arch."""
    version = source_data_srcdist.version
    return self._check_blocked(item, arch, version, excuse)

1918 

1919 

class BuiltOnBuilddPolicy(AbstractBasePolicy):
    """Reject binaries that were not built and signed by a build daemon.

    For every architecture, the uploader of each binary is looked up in
    signers.json (from STATE_DIR).  Maintainer-uploaded binaries block
    migration, with two exceptions visible below: binaries whose source
    is outside "main", and arch:all binaries whitelisted through an
    "allow-archall-maintainer-upload" hint.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "builtonbuildd",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        # "signerinfo" is populated from signers.json in initialise().
        self._builtonbuildd: dict[str, Any] = {
            "signerinfo": None,
        }

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the hint that whitelists maintainer arch:all uploads."""
        hint_parser.register_hint_type(
            "allow-archall-maintainer-upload", split_into_one_hint_per_package
        )

    def initialise(self, britney: "Britney") -> None:
        """Load the signer information; STATE_DIR must be configured."""
        super().initialise(britney)
        try:
            # self.state_dir raises AttributeError when STATE_DIR is unset.
            filename_signerinfo = os.path.join(self.state_dir, "signers.json")
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e
        self._builtonbuildd["signerinfo"] = self._read_signerinfo(filename_signerinfo)

    def apply_srcarch_policy_impl(
        self,
        buildd_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check every binary of this item on *arch* for a buildd signature.

        Records the accepted signer per binary architecture in
        buildd_info["signed-by"] (part of the policy's excuse data) and
        returns the combined verdict for this architecture.
        """
        verdict = PolicyVerdict.PASS
        signers = self._builtonbuildd["signerinfo"]

        if "signed-by" not in buildd_info:
            buildd_info["signed-by"] = {}

        source_suite = item.suite

        # horrible hard-coding, but currently, we don't keep track of the
        # component when loading the packages files
        component = "main"
        # we use the source component, because a binary in contrib can
        # belong to a source in main
        section = source_data_srcdist.section
        if section.find("/") > -1:
            component = section.split("/")[0]

        packages_s_a = source_suite.binaries[arch]
        assert self.hints is not None

        for pkg_id in sorted(
            x for x in source_data_srcdist.binaries if x.architecture == arch
        ):
            pkg_name = pkg_id.package_name
            binary_u = packages_s_a[pkg_name]
            pkg_arch = binary_u.architecture

            # skip binaries built from a different (cruft) source version
            if binary_u.source_version != source_data_srcdist.version:
                continue

            if item.architecture != "source" and pkg_arch == "all":
                # we don't care about the existing arch: all binaries when
                # checking a binNMU item, because the arch: all binaries won't
                # migrate anyway
                continue

            signer = None
            uid = None
            uidinfo = ""
            buildd_ok = False
            failure_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            try:
                signer = signers[pkg_name][pkg_id.version][pkg_arch]
                if signer["buildd"]:
                    buildd_ok = True
                uid = signer["uid"]
                uidinfo = "arch %s binaries uploaded by %s" % (pkg_arch, uid)
            except KeyError:
                # missing signer data might still show up later, so this
                # rejection is not necessarily permanent
                self.logger.info(
                    "signer info for %s %s (%s) on %s not found "
                    % (pkg_name, binary_u.version, pkg_arch, arch)
                )
                uidinfo = "upload info for arch %s binaries not found" % (pkg_arch)
                failure_verdict = PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
            if not buildd_ok:
                if component != "main":
                    # no buildds outside main: accept maintainer uploads,
                    # but mention it in the excuse once per architecture
                    if not buildd_ok and pkg_arch not in buildd_info["signed-by"]:
                        excuse.add_detailed_info(
                            "%s, but package in %s" % (uidinfo, component)
                        )
                    buildd_ok = True
                elif pkg_arch == "all":
                    # arch:all maintainer uploads can be whitelisted by hint
                    allow_hints = self.hints.search(
                        "allow-archall-maintainer-upload", package=item.package
                    )
                    if allow_hints:
                        buildd_ok = True
                        verdict = PolicyVerdict.worst_of(
                            verdict, PolicyVerdict.PASS_HINTED
                        )
                        if pkg_arch not in buildd_info["signed-by"]:
                            excuse.addinfo(
                                "%s, but whitelisted by %s"
                                % (uidinfo, allow_hints[0].user)
                            )
            if not buildd_ok:
                verdict = failure_verdict
                if pkg_arch not in buildd_info["signed-by"]:
                    if pkg_arch == "all":
                        uidinfo += (
                            ", a new source-only upload is needed to allow migration"
                        )
                    excuse.add_verdict_info(
                        verdict, "Not built on buildd: %s" % (uidinfo)
                    )

            # all binaries of one architecture should share a signer;
            # log when they don't (first one recorded wins below)
            if (
                pkg_arch in buildd_info["signed-by"]
                and buildd_info["signed-by"][pkg_arch] != uid
            ):
                self.logger.info(
                    "signer mismatch for %s (%s %s) on %s: %s, while %s already listed"
                    % (
                        pkg_name,
                        binary_u.source,
                        binary_u.source_version,
                        pkg_arch,
                        uid,
                        buildd_info["signed-by"][pkg_arch],
                    )
                )

            buildd_info["signed-by"][pkg_arch] = uid

        return verdict

    def _read_signerinfo(self, filename: str) -> dict[str, Any]:
        """Load signer info from *filename*; an empty file yields {}."""
        signerinfo: dict[str, Any] = {}
        self.logger.info("Loading signer info from %s", filename)
        with open(filename) as fd:
            # an empty file is not valid JSON; treat it as "no data"
            if os.fstat(fd.fileno()).st_size < 1:
                return signerinfo
            signerinfo = json.load(fd)

        return signerinfo

2073 

2074 

class ImplicitDependencyPolicy(AbstractBasePolicy):
    """Implicit Dependency policy

    Upgrading a package pkg-a can break the installability of a package pkg-b.
    A newer version (or the removal) of pkg-b might fix the issue. In that
    case, pkg-a has an 'implicit dependency' on pkg-b, because pkg-a can only
    migrate if pkg-b also migrates.

    This policy tries to discover a few common cases, and adds the relevant
    info to the excuses. If another item is needed to fix the
    uninstallability, a dependency is added. If no newer item can fix it, this
    excuse will be blocked.

    Note that the migration step will check the installability of every
    package, so this policy doesn't need to handle every corner case. It
    must, however, make sure that no excuse is unnecessarily blocked.

    Some cases that should be detected by this policy:

    * pkg-a is upgraded from 1.0-1 to 2.0-1, while
      pkg-b has "Depends: pkg-a (<< 2.0)"
      This typically happens if pkg-b has a strict dependency on pkg-a because
      it uses some non-stable internal interface (examples are glibc,
      binutils, python3-defaults, ...)

    * pkg-a is upgraded from 1.0-1 to 2.0-1, and
      pkg-a 1.0-1 has "Provides: provides-1",
      pkg-a 2.0-1 has "Provides: provides-2",
      pkg-b has "Depends: provides-1"
      This typically happens when pkg-a has an interface that changes between
      versions, and a virtual package is used to identify the version of this
      interface (e.g. perl-api-x.y)

    """

    # all of these are filled in from the Britney instance in initialise()
    _pkg_universe: "BinaryPackageUniverse"
    _all_binaries: dict["BinaryPackageId", "BinaryPackage"]
    _allow_uninst: dict[str, set[Optional[str]]]
    _nobreakall_arches: list[str]

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "implicit-deps",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )

    def initialise(self, britney: "Britney") -> None:
        """Cache package universe, binaries and arch options from britney."""
        super().initialise(britney)
        self._pkg_universe = britney.pkg_universe
        self._all_binaries = britney.all_binaries
        self._smooth_updates = britney.options.smooth_updates
        self._nobreakall_arches = self.options.nobreakall_arches
        self._new_arches = self.options.new_arches
        self._break_arches = self.options.break_arches
        self._allow_uninst = britney.allow_uninst
        self._outofsync_arches = self.options.outofsync_arches

    def can_be_removed(self, pkg: BinaryPackage) -> bool:
        """Return True when *pkg* is a candidate for removal from testing."""
        src = pkg.source
        target_suite = self.suite_info.target_suite

        # TODO these conditions shouldn't be hardcoded here
        # ideally, we would be able to look up excuses to see if the removal
        # is in there, but in the current flow, this policy is called before
        # all possible excuses exist, so there is no list for us to check

        if src not in self.suite_info.primary_source_suite.sources:
            # source for pkg not in unstable: candidate for removal
            return True

        source_t = target_suite.sources[src]
        assert self.hints is not None
        for hint in self.hints.search("remove", package=src, version=source_t.version):
            # removal hint for the source in testing: candidate for removal
            return True

        if target_suite.is_cruft(pkg):
            # if pkg is cruft in testing, removal will be tried
            return True

        # the case were the newer version of the source no longer includes the
        # binary (or includes a cruft version of the binary) will be handled
        # separately (in that case there might be an implicit dependency on
        # the newer source)

        return False

    def should_skip_rdep(
        self, pkg: BinaryPackage, source_name: str, myarch: str
    ) -> bool:
        """Return True when breaking reverse dependency *pkg* is acceptable.

        Covers the cases where the rdep either cannot become uninstallable
        by this migration, or where its uninstallability is tolerated.
        """
        target_suite = self.suite_info.target_suite

        if not target_suite.is_pkg_in_the_suite(pkg.pkg_id):
            # it is not in the target suite, migration cannot break anything
            return True

        if pkg.source == source_name:
            # if it is built from the same source, it will be upgraded
            # with the source
            return True

        if self.can_be_removed(pkg):
            # could potentially be removed, so if that happens, it won't be
            # broken
            return True

        if pkg.architecture == "all" and myarch not in self._nobreakall_arches:
            # arch all on non nobreakarch is allowed to become uninstallable
            return True

        if pkg.pkg_id.package_name in self._allow_uninst[myarch]:
            # there is a hint to allow this binary to become uninstallable
            return True

        if not target_suite.is_installable(pkg.pkg_id):
            # it is already uninstallable in the target suite, migration
            # cannot break anything
            return True

        return False

    def breaks_installability(
        self,
        pkg_id_t: BinaryPackageId,
        pkg_id_s: Optional[BinaryPackageId],
        pkg_to_check: BinaryPackageId,
    ) -> bool:
        """
        Check if upgrading pkg_id_t to pkg_id_s breaks the installability of
        pkg_to_check.

        To check if removing pkg_id_t breaks pkg_to_check, set pkg_id_s to
        None.
        """

        pkg_universe = self._pkg_universe
        negative_deps = pkg_universe.negative_dependencies_of(pkg_to_check)

        for dep in pkg_universe.dependencies_of(pkg_to_check):
            if pkg_id_t not in dep:
                # this depends doesn't have pkg_id_t as alternative, so
                # upgrading pkg_id_t cannot break this dependency clause
                continue

            # We check all the alternatives for this dependency, to find one
            # that can satisfy it when pkg_id_t is upgraded to pkg_id_s
            found_alternative = False
            for d in dep:
                if d in negative_deps:
                    # If this alternative dependency conflicts with
                    # pkg_to_check, it cannot be used to satisfy the
                    # dependency.
                    # This commonly happens when breaks are added to pkg_id_s.
                    continue

                if d.package_name != pkg_id_t.package_name:
                    # a binary different from pkg_id_t can satisfy the dep, so
                    # upgrading pkg_id_t won't break this dependency
                    found_alternative = True
                    break

                if d != pkg_id_s:
                    # We want to know the impact of the upgrade of
                    # pkg_id_t to pkg_id_s. If pkg_id_s migrates to the
                    # target suite, any other version of this binary will
                    # not be there, so it cannot satisfy this dependency.
                    # This includes pkg_id_t, but also other versions.
                    continue

                # pkg_id_s can satisfy the dep
                found_alternative = True

            if not found_alternative:
                return True
        return False

    def check_upgrade(
        self,
        pkg_id_t: BinaryPackageId,
        pkg_id_s: Optional[BinaryPackageId],
        source_name: str,
        myarch: str,
        broken_binaries: set[str],
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Judge the upgrade (or removal) of one binary on one architecture.

        For every reverse dependency that would become uninstallable, either
        add an implicit dependency on a newer version that fixes it, or
        reject the excuse and record the binary in *broken_binaries*.

        :param pkg_id_t: the binary currently in the target suite
        :param pkg_id_s: its replacement in the source suite (None = removal)
        :param broken_binaries: accumulator, extended with unfixable rdeps
        :return: PASS, or REJECTED_PERMANENTLY when an rdep has no fix
        """
        verdict = PolicyVerdict.PASS

        pkg_universe = self._pkg_universe
        all_binaries = self._all_binaries

        # check all rdeps of the package in testing
        rdeps_t = pkg_universe.reverse_dependencies_of(pkg_id_t)

        for rdep_pkg in sorted(rdeps_t):
            rdep_p = all_binaries[rdep_pkg]

            # check some cases where the rdep won't become uninstallable, or
            # where we don't care if it does
            if self.should_skip_rdep(rdep_p, source_name, myarch):
                continue

            if not self.breaks_installability(pkg_id_t, pkg_id_s, rdep_pkg):
                # if upgrading pkg_id_t to pkg_id_s doesn't break rdep_pkg,
                # there is no implicit dependency
                continue

            # The upgrade breaks the installability of the rdep. We need to
            # find out if there is a newer version of the rdep that solves the
            # uninstallability. If that is the case, there is an implicit
            # dependency. If not, the upgrade will fail.

            # check source versions
            newer_versions = find_newer_binaries(
                self.suite_info, rdep_p, add_source_for_dropped_bin=True
            )
            good_newer_versions = set()
            for npkg, suite in newer_versions:
                if npkg.architecture == "source":
                    # When a newer version of the source package doesn't have
                    # the binary, we get the source as 'newer version'. In
                    # this case, the binary will not be uninstallable if the
                    # newer source migrates, because it is no longer there.
                    good_newer_versions.add(npkg)
                    continue
                assert isinstance(npkg, BinaryPackageId)
                if not self.breaks_installability(pkg_id_t, pkg_id_s, npkg):
                    good_newer_versions.add(npkg)

            if good_newer_versions:
                spec = DependencySpec(DependencyType.IMPLICIT_DEPENDENCY, myarch)
                excuse.add_package_depends(spec, good_newer_versions)
            else:
                # no good newer versions: no possible solution
                broken_binaries.add(rdep_pkg.name)
                if pkg_id_s:
                    action = "migrating %s to %s" % (
                        pkg_id_s.name,
                        self.suite_info.target_suite.name,
                    )
                else:
                    action = "removing %s from %s" % (
                        pkg_id_t.name,
                        self.suite_info.target_suite.name,
                    )
                info = '{0} makes <a href="#{1}">{1}</a> uninstallable'.format(
                    action, rdep_pkg.name
                )
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                excuse.add_verdict_info(verdict, info)

        return verdict

    def apply_srcarch_policy_impl(
        self,
        implicit_dep_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check all binaries of this item on *arch* for implicit dependencies.

        Aggregates the per-binary verdicts from check_upgrade() and merges
        the list of broken binaries for this arch into the policy info
        (other architectures are processed separately).
        """
        verdict = PolicyVerdict.PASS

        if not source_data_tdist:
            # this item is not currently in testing: no implicit dependency
            return verdict

        if excuse.hasreason("missingbuild"):
            # if the build is missing, the policy would treat this as if the
            # binaries would be removed, which would give incorrect (and
            # confusing) info
            info = "missing build, not checking implicit dependencies on %s" % (arch)
            excuse.add_detailed_info(info)
            return verdict

        source_suite = item.suite
        source_name = item.package
        target_suite = self.suite_info.target_suite
        all_binaries = self._all_binaries

        # we check all binaries for this excuse that are currently in testing
        relevant_binaries = [
            x
            for x in source_data_tdist.binaries
            if (arch == "source" or x.architecture == arch)
            and x.package_name in target_suite.binaries[x.architecture]
            and x.architecture not in self._new_arches
            and x.architecture not in self._break_arches
            and x.architecture not in self._outofsync_arches
        ]

        broken_binaries: set[str] = set()

        assert self.hints is not None
        for pkg_id_t in sorted(relevant_binaries):
            mypkg = pkg_id_t.package_name
            myarch = pkg_id_t.architecture
            binaries_t_a = target_suite.binaries[myarch]
            binaries_s_a = source_suite.binaries[myarch]

            if target_suite.is_cruft(all_binaries[pkg_id_t]):
                # this binary is cruft in testing: it will stay around as long
                # as necessary to satisfy dependencies, so we don't need to
                # care
                continue

            # determine which binary (if any) replaces pkg_id_t when the
            # item migrates
            if mypkg in binaries_s_a:
                mybin = binaries_s_a[mypkg]
                pkg_id_s: Optional["BinaryPackageId"] = mybin.pkg_id
                if mybin.source != source_name:
                    # hijack: this is too complicated to check, so we ignore
                    # it (the migration code will check the installability
                    # later anyway)
                    pass
                elif mybin.source_version != source_data_srcdist.version:
                    # cruft in source suite: pretend the binary doesn't exist
                    pkg_id_s = None
                elif pkg_id_t == pkg_id_s:
                    # same binary (probably arch: all from a binNMU):
                    # 'upgrading' doesn't change anything, for this binary, so
                    # it won't break anything
                    continue
            else:
                pkg_id_s = None

            if not pkg_id_s and is_smooth_update_allowed(
                binaries_t_a[mypkg], self._smooth_updates, self.hints
            ):
                # the binary isn't in the new version (or is cruft there), and
                # smooth updates are allowed: the binary can stay around if
                # that is necessary to satisfy dependencies, so we don't need
                # to check it
                continue

            if (
                not pkg_id_s
                and source_data_tdist.version == source_data_srcdist.version
                and source_suite.suite_class == SuiteClass.ADDITIONAL_SOURCE_SUITE
                and binaries_t_a[mypkg].architecture == "all"
            ):
                # we're very probably migrating a binNMU built in tpu where the arch:all
                # binaries were not copied to it as that's not needed. This policy could
                # needlessly block.
                continue

            v = self.check_upgrade(
                pkg_id_t, pkg_id_s, source_name, myarch, broken_binaries, excuse
            )
            verdict = PolicyVerdict.worst_of(verdict, v)

        # each arch is processed separately, so if we already have info from
        # other archs, we need to merge the info from this arch
        broken_old = set()
        if "implicit-deps" not in implicit_dep_info:
            implicit_dep_info["implicit-deps"] = {}
        else:
            broken_old = set(implicit_dep_info["implicit-deps"]["broken-binaries"])

        implicit_dep_info["implicit-deps"]["broken-binaries"] = sorted(
            broken_old | broken_binaries
        )

        return verdict

2441 

2442 

class ReverseRemovalPolicy(AbstractBasePolicy):
    """Block sources whose (transitive) reverse dependencies have a remove hint.

    When a source is scheduled for removal via a ``remove`` hint, anything
    that directly or transitively (Build-)Depends on its binaries should
    not migrate, unless overridden with an ``ignore-reverse-remove`` hint.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "reverseremoval",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
        )

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the hint that overrides this policy's rejections."""
        hint_parser.register_hint_type(
            "ignore-reverse-remove", split_into_one_hint_per_package
        )

    def initialise(self, britney: "Britney") -> None:
        """Precompute which sources are affected by remove hints.

        Builds self._block_src_for_rm_hint: a mapping from
        "source/version" to the set of removal-hinted sources that the
        source (transitively) depends on.
        """
        super().initialise(britney)

        pkg_universe = britney.pkg_universe
        source_suites = britney.suite_info.source_suites
        target_suite = britney.suite_info.target_suite

        # Build set of the sources of reverse (Build-) Depends
        assert self.hints is not None
        hints = self.hints.search("remove")

        # binary -> set of removal-hinted source names it depends on.
        # Note: a plain defaultdict index suffices here; the previous
        # setdefault(this_bin, set()) allocated a throwaway set per access.
        rev_bin: dict[BinaryPackageId, set[str]] = defaultdict(set)
        for hint in hints:
            for item in hint.packages:
                # I think we don't need to look at the target suite
                for src_suite in source_suites:
                    try:
                        my_bins = set(src_suite.sources[item.uvname].binaries)
                    except KeyError:
                        continue
                    # extends my_bins in place with the full reverse
                    # dependency closure
                    compute_reverse_tree(pkg_universe, my_bins)
                    for this_bin in my_bins:
                        rev_bin[this_bin].add(item.uvname)

        rev_src: dict[str, set[str]] = defaultdict(set)
        for bin_pkg, reasons in rev_bin.items():
            # If the pkg is in the target suite, there's nothing this
            # policy wants to do.
            if target_suite.is_pkg_in_the_suite(bin_pkg):
                continue
            that_bin = britney.all_binaries[bin_pkg]
            bin_src = that_bin.source + "/" + that_bin.source_version
            rev_src[bin_src].update(reasons)
        self._block_src_for_rm_hint = rev_src

    def apply_src_policy_impl(
        self,
        rev_remove_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Reject the item when a remove hint covers one of its dependencies.

        An ignore-reverse-remove hint for the exact item turns the
        rejection into PASS_HINTED.
        """
        verdict = PolicyVerdict.PASS

        if item.name in self._block_src_for_rm_hint:
            reason = ", ".join(sorted(self._block_src_for_rm_hint[item.name]))
            assert self.hints is not None
            ignore_hints = self.hints.search(
                "ignore-reverse-remove", package=item.uvname, version=item.version
            )
            excuse.addreason("reverseremoval")
            if ignore_hints:
                excuse.addreason("ignore-reverse-remove")
                excuse.addinfo(
                    "Should block migration because of remove hint for %s, but forced by %s"
                    % (reason, ignore_hints[0].user)
                )
                verdict = PolicyVerdict.PASS_HINTED
            else:
                excuse.addinfo("Remove hint for (transitive) dependency: %s" % reason)
                verdict = PolicyVerdict.REJECTED_PERMANENTLY

        return verdict

2521 

2522 

2523class ReproduciblePolicy(AbstractBasePolicy): 

def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
    """Set up the reproducible-builds policy and its option defaults."""
    super().__init__(
        "reproducible",
        options,
        suite_info,
        {SuiteClass.PRIMARY_SOURCE_SUITE},
        ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
    )
    # Per-suite reproducibility results, loaded in initialise().
    self._reproducible: dict[str, Any] = {"source": {}, "target": {}}

    # Default values for this policy's options
    for option_name, option_kwargs in (
        ("repro_success_bounty", {"default": 0, "to_int": True}),
        ("repro_regression_penalty", {"default": 0, "to_int": True}),
        ("repro_url", {}),
        ("repro_retry_url", {}),
        ("repro_components", {}),
    ):
        parse_option(options, option_name, **option_kwargs)

2543 

def register_hints(self, hint_parser: HintParser) -> None:
    """Make the hint parser accept per-package ignore-reproducible hints."""
    hint_type = "ignore-reproducible"
    hint_parser.register_hint_type(hint_type, split_into_one_hint_per_package)

2548 

def initialise(self, britney: "Britney") -> None:
    """Load reproducibility test results from STATE_DIR."""
    super().initialise(britney)
    src_suite = self.suite_info.primary_source_suite
    tgt_suite = self.suite_info.target_suite
    try:
        # self.state_dir raises AttributeError when STATE_DIR is unset.
        status_file = os.path.join(self.state_dir, "reproducible.json")
    except AttributeError as e:  # pragma: no cover
        raise RuntimeError(
            "Please set STATE_DIR in the britney configuration"
        ) from e

    # Results are keyed by either suite name or codename; accept both.
    self._reproducible = self._read_repro_status(
        status_file,
        source={src_suite.name, src_suite.codename},
        target={tgt_suite.name, tgt_suite.codename},
    )

2565 

2566 def apply_srcarch_policy_impl( 

2567 self, 

2568 reproducible_info: dict[str, Any], 

2569 item: MigrationItem, 

2570 arch: str, 

2571 source_data_tdist: Optional[SourcePackage], 

2572 source_data_srcdist: SourcePackage, 

2573 excuse: "Excuse", 

2574 ) -> PolicyVerdict: 

2575 verdict = PolicyVerdict.PASS 

2576 

2577 # we don't want to apply this policy (yet) on binNMUs 

2578 if item.architecture != "source": 

2579 return verdict 

2580 

2581 # we're not supposed to judge on this arch 

2582 if arch not in self.options.repro_arches: 

2583 return verdict 

2584 

2585 # bail out if this arch has no packages for this source (not build 

2586 # here) 

2587 if arch not in excuse.packages: 

2588 return verdict 

2589 

2590 # horrible hard-coding, but currently, we don't keep track of the 

2591 # component when loading the packages files 

2592 component = "main" 

2593 section = source_data_srcdist.section 

2594 if "/" in section: 

2595 component = section.split("/")[0] 

2596 

2597 if ( 

2598 self.options.repro_components 

2599 and component not in self.options.repro_components 

2600 ): 

2601 return verdict 

2602 

2603 source_name = item.package 

2604 try: 

2605 tar_res = self._reproducible["target"][arch] 

2606 src_res = self._reproducible["source"][arch] 

2607 except KeyError: 

2608 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

2609 msg = "No reproducible data available at all for %s" % arch 

2610 excuse.add_verdict_info(verdict, msg) 

2611 return verdict 

2612 

2613 if source_data_tdist is None: 

2614 target_suite_state = "new" 

2615 elif source_name not in tar_res: 

2616 target_suite_state = "unknown" 

2617 elif tar_res[source_name]["version"] == source_data_tdist.version: 

2618 target_suite_state = tar_res[source_name]["status"] 

2619 else: 

2620 target_suite_state = "stale" 

2621 

2622 if source_name in src_res and src_res[source_name]["version"] == item.version: 

2623 source_suite_state = src_res[source_name]["status"] 

2624 else: 

2625 source_suite_state = "unknown" 

2626 

2627 # status of ['E404', 'FTBFS', 'FTBR', 'NFU', 'blacklisted', 'depwait', 

2628 # 'reproducible', 'timeout'] with ['new', 'stale', 'unknown'] 

2629 wait_states = ("E404", "depwait", "stale", "timeout", "unknown") 

2630 no_build_states = ("FTBFS", "NFU", "blacklisted") 

2631 

2632 # if this package doesn't build on this architecture, we don't need to 

2633 # judge it 

2634 # FTBFS: Fails to build from source on r-b infra 

2635 # NFU: the package explicitly doesn't support building on this arch 

2636 # blacklisted: per package per arch per suite 

2637 if source_suite_state in no_build_states: 

2638 return verdict 

2639 # Assume depwait in the source suite only are intermittent (might not 

2640 # be true, e.g. with new build depends) 

2641 if source_suite_state == target_suite_state and target_suite_state == "depwait": 

2642 return verdict 

2643 

2644 if self.options.repro_url: 

2645 url = self.options.repro_url.format(package=quote(source_name), arch=arch) 

2646 url_html = ' - <a href="%s">info</a>' % url 

2647 if self.options.repro_retry_url: 

2648 url_html += ( 

2649 ' <a href="%s">♻ </a>' 

2650 % self.options.repro_retry_url.format( 

2651 package=quote(source_name), arch=arch 

2652 ) 

2653 ) 

2654 # When run on multiple archs, the last one "wins" 

2655 reproducible_info["reproducible-test-url"] = url 

2656 else: 

2657 url = None 

2658 url_html = "" 

2659 

2660 eligible_for_bounty = False 

2661 if source_suite_state == "reproducible": 

2662 verdict = PolicyVerdict.PASS 

2663 msg = "Reproducible on %s%s" % (arch, url_html) 

2664 reproducible_info.setdefault("test-results", []).append( 

2665 "reproducible on %s" % arch 

2666 ) 

2667 eligible_for_bounty = True 

2668 elif source_suite_state == "FTBR": 

2669 if target_suite_state == "new": 

2670 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

2671 msg = "New but not reproducible on %s%s" % (arch, url_html) 

2672 reproducible_info.setdefault("test-results", []).append( 

2673 "new but not reproducible on %s" % arch 

2674 ) 

2675 elif target_suite_state in wait_states: 

2676 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

2677 msg = "Waiting for reproducibility reference results on %s%s" % ( 

2678 arch, 

2679 url_html, 

2680 ) 

2681 reproducible_info.setdefault("test-results", []).append( 

2682 "waiting-for-reference-results on %s" % arch 

2683 ) 

2684 elif target_suite_state == "reproducible": 

2685 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

2686 msg = "Reproducibility regression on %s%s" % (arch, url_html) 

2687 reproducible_info.setdefault("test-results", []).append( 

2688 "regression on %s" % arch 

2689 ) 

2690 elif target_suite_state == "FTBR": 

2691 verdict = PolicyVerdict.PASS 

2692 msg = "Ignoring non-reproducibility on %s (not a regression)%s" % ( 

2693 arch, 

2694 url_html, 

2695 ) 

2696 reproducible_info.setdefault("test-results", []).append( 

2697 "not reproducible on %s" % arch 

2698 ) 

2699 else: 

2700 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

2701 msg = "No reference result, but not reproducibility on %s%s" % ( 

2702 arch, 

2703 url_html, 

2704 ) 

2705 reproducible_info.setdefault("test-results", []).append( 

2706 "reference %s on %s" % (target_suite_state, arch) 

2707 ) 

2708 elif source_suite_state in wait_states: 

2709 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

2710 msg = "Waiting for reproducibility test results on %s%s" % (arch, url_html) 

2711 reproducible_info.setdefault("test-results", []).append( 

2712 "waiting-for-test-results on %s" % arch 

2713 ) 

2714 else: 

2715 raise KeyError("Unhandled reproducibility state %s" % source_suite_state) 

2716 

2717 if verdict.is_rejected: 

2718 assert self.hints is not None 

2719 for hint_arch in ("source", arch): 

2720 for ignore_hint in self.hints.search( 

2721 "ignore-reproducible", 

2722 package=source_name, 

2723 version=source_data_srcdist.version, 

2724 architecture=hint_arch, 

2725 ): 

2726 verdict = PolicyVerdict.PASS_HINTED 

2727 reproducible_info.setdefault("ignored-reproducible", {}).setdefault( 

2728 arch, {} 

2729 ).setdefault("issued-by", []).append(ignore_hint.user) 

2730 excuse.addinfo( 

2731 "Ignoring reproducibility issue on %s as requested " 

2732 "by %s" % (arch, ignore_hint.user) 

2733 ) 

2734 break 

2735 

2736 if self.options.repro_success_bounty and eligible_for_bounty: 

2737 excuse.add_bounty("reproducibility", self.options.repro_success_bounty) 

2738 

2739 if self.options.repro_regression_penalty and verdict in { 

2740 PolicyVerdict.REJECTED_PERMANENTLY, 

2741 PolicyVerdict.REJECTED_TEMPORARILY, 

2742 }: 

2743 if self.options.repro_regression_penalty > 0: 

2744 excuse.add_penalty( 

2745 "reproducibility", self.options.repro_regression_penalty 

2746 ) 

2747 # In case we give penalties instead of blocking, we must always pass 

2748 verdict = PolicyVerdict.PASS 

2749 

2750 if verdict.is_rejected: 

2751 excuse.add_verdict_info(verdict, msg) 

2752 else: 

2753 excuse.addinfo(msg) 

2754 

2755 return verdict 

2756 

2757 def _read_repro_status( 

2758 self, filename: str, source: set[str], target: set[str] 

2759 ) -> dict[str, dict[str, str]]: 

2760 summary = self._reproducible 

2761 self.logger.info("Loading reproducibility report from %s", filename) 

2762 with open(filename) as fd: 

2763 if os.fstat(fd.fileno()).st_size < 1: 

2764 return summary 

2765 data = json.load(fd) 

2766 

2767 for result in data: 

2768 if result["suite"] in source: 

2769 summary["source"].setdefault(result["architecture"], {})[ 

2770 result["package"] 

2771 ] = result 

2772 if result["suite"] in target: 

2773 summary["target"].setdefault(result["architecture"], {})[ 

2774 result["package"] 

2775 ] = result 

2776 

2777 return summary