Coverage for britney2/policies/policy.py: 91%

1378 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-06-17 09:00 +0000

1import json 

2import logging 

3import optparse 

4import os 

5import re 

6import sys 

7import time 

8from abc import ABC, abstractmethod 

9from collections import defaultdict 

10from collections.abc import Callable, Container, Generator 

11from dataclasses import dataclass, field 

12from enum import Enum, IntEnum, StrEnum, auto, unique 

13from itertools import chain 

14from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar, cast 

15from urllib.parse import quote 

16 

17import apt_pkg 

18import yaml 

19 

20from britney2 import ( 

21 BinaryPackage, 

22 BinaryPackageId, 

23 DependencyType, 

24 PackageId, 

25 SourcePackage, 

26 Suite, 

27 SuiteClass, 

28 Suites, 

29 TargetSuite, 

30) 

31from britney2.excusedeps import DependencySpec 

32from britney2.hints import ( 

33 Hint, 

34 HintAnnotate, 

35 HintCollection, 

36 HintParser, 

37 HintType, 

38 PolicyHintParserProto, 

39) 

40from britney2.inputs.suiteloader import SuiteContentLoader 

41from britney2.migrationitem import MigrationItem, MigrationItemFactory 

42from britney2.policies import ApplySrcPolicy, PolicyVerdict 

43from britney2.utils import ( 

44 GetDependencySolversProto, 

45 binaries_from_source_version, 

46 compute_reverse_tree, 

47 filter_out_faux, 

48 filter_out_faux_gen, 

49 find_newer_binaries, 

50 get_component, 

51 get_dependency_solvers, 

52 is_smooth_update_allowed, 

53 parse_option, 

54) 

55 

56if TYPE_CHECKING: 56 ↛ 57line 56 didn't jump to line 57 because the condition on line 56 was never true

57 from ..britney import Britney 

58 from ..excuse import Excuse 

59 from ..installability.universe import BinaryPackageUniverse 

60 

61 

class PolicyLoadRequest:
    """A deferred request to construct a policy.

    The request pairs a policy constructor with an optional option name.
    When an option name is given, the option's value decides whether the
    policy is loaded; otherwise the policy is always loaded.
    """

    __slots__ = ("_options_name", "_default_value", "_policy_constructor")

    def __init__(
        self,
        policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
        options_name: str | None,
        default_value: bool,
    ) -> None:
        """Store the constructor and the enablement rule.

        :param policy_constructor: Callable building the policy from the
          options and the suite information.

        :param options_name: Name of the attribute on the options object
          that toggles the policy, or None for "always load".

        :param default_value: Whether the policy is enabled when the
          option is not set.
        """
        self._options_name = options_name
        self._default_value = default_value
        self._policy_constructor = policy_constructor

    def is_enabled(self, options: optparse.Values) -> bool:
        """Return whether the policy should be loaded for these options."""
        option_name = self._options_name
        if option_name is None:
            # Unconditional requests are always created with a true default.
            assert self._default_value
            return True
        configured = getattr(options, option_name, None)
        if configured is not None:
            return configured.lower() in ("yes", "y", "true", "t")
        return self._default_value

    def load(self, options: optparse.Values, suite_info: Suites) -> "BasePolicy":
        """Construct the policy by calling the stored constructor."""
        constructor = self._policy_constructor
        return constructor(options, suite_info)

    @classmethod
    def always_load(
        cls, policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"]
    ) -> "PolicyLoadRequest":
        """Create a request that is enabled regardless of the options."""
        return cls(policy_constructor, None, True)

    @classmethod
    def conditionally_load(
        cls,
        policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
        option_name: str,
        default_value: bool,
    ) -> "PolicyLoadRequest":
        """Create a request that is toggled by the option `option_name`."""
        return cls(policy_constructor, option_name, default_value)

101 

102 

class PolicyEngine:
    """Holds the loaded policies and applies them to excuses in order."""

    def __init__(self) -> None:
        self._policies: list["BasePolicy"] = []

    def add_policy(self, policy: "BasePolicy") -> None:
        """Append a policy; policies are applied in insertion order."""
        self._policies.append(policy)

    def load_policies(
        self,
        options: optparse.Values,
        suite_info: Suites,
        policy_load_requests: list[PolicyLoadRequest],
    ) -> None:
        """Construct and register every requested policy that is enabled."""
        for request in policy_load_requests:
            if not request.is_enabled(options):
                continue
            self.add_policy(request.load(options, suite_info))

    def register_policy_hints(self, hint_parser: HintParser) -> None:
        """Let every policy register its hint types with the parser."""
        for policy in self._policies:
            policy.register_hints(hint_parser)

    def initialise(self, britney: "Britney", hints: HintCollection) -> None:
        """Hand the hints to every policy and then initialise it."""
        for policy in self._policies:
            policy.hints = hints
            policy.initialise(britney)

    def save_state(self, britney: "Britney") -> None:
        """Ask every policy to save its state."""
        for policy in self._policies:
            policy.save_state(britney)

    def apply_src_policies(
        self,
        source_t: SourcePackage | None,
        source_u: SourcePackage,
        excuse: "Excuse",
    ) -> None:
        """Apply all policies to a source migration and update the excuse.

        For each policy applicable to the excuse's suite class, the
        per-architecture and/or per-source implementation is run (as
        selected by the policy's src_policy) and the worst verdict is
        recorded in the excuse.
        """
        not_applicable = PolicyVerdict.NOT_APPLICABLE
        overall_verdict = excuse.policy_verdict
        suite_class = excuse.item.suite.suite_class
        for policy in self._policies:
            pinfo: dict[str, Any] = {}
            verdict = not_applicable
            if suite_class in policy.applicable_suites:
                if policy.src_policy.run_arch:
                    for arch in policy.options.architectures:
                        arch_verdict = policy.apply_srcarch_policy_impl(
                            pinfo, arch, source_t, source_u, excuse
                        )
                        verdict = PolicyVerdict.worst_of(verdict, arch_verdict)
                if policy.src_policy.run_src:
                    src_verdict = policy.apply_src_policy_impl(
                        pinfo, source_t, source_u, excuse
                    )
                    verdict = PolicyVerdict.worst_of(verdict, src_verdict)
            # The "verdict" key is filled in here, so the policy itself
            # must not have set it.
            assert "verdict" not in pinfo
            if verdict is not_applicable:
                continue
            excuse.policy_info[policy.policy_id] = pinfo
            pinfo["verdict"] = verdict.name
            overall_verdict = PolicyVerdict.worst_of(verdict, overall_verdict)
        excuse.policy_verdict = overall_verdict

    def apply_srcarch_policies(
        self,
        arch: str,
        source_t: SourcePackage | None,
        source_u: SourcePackage,
        excuse: "Excuse",
    ) -> None:
        """Apply all applicable policies for one architecture.

        Only the per-architecture implementation of each policy is run;
        the worst verdict is recorded in the excuse.
        """
        overall_verdict = excuse.policy_verdict
        suite_class = excuse.item.suite.suite_class
        for policy in self._policies:
            if suite_class not in policy.applicable_suites:
                continue
            pinfo: dict[str, Any] = {}
            verdict = policy.apply_srcarch_policy_impl(
                pinfo, arch, source_t, source_u, excuse
            )
            overall_verdict = PolicyVerdict.worst_of(verdict, overall_verdict)
            # The "verdict" key is filled in here, so the policy itself
            # must not have set it.
            assert "verdict" not in pinfo
            if verdict is not PolicyVerdict.NOT_APPLICABLE:
                excuse.policy_info[policy.policy_id] = pinfo
                pinfo["verdict"] = verdict.name
        excuse.policy_verdict = overall_verdict

186 

187 

class BasePolicy(ABC):
    """Interface shared by all migration policies.

    The default implementations of the hooks do nothing (or return
    PolicyVerdict.NOT_APPLICABLE); concrete policies override the ones
    they need.
    """

    britney: "Britney"
    policy_id: str
    hints: HintCollection | None
    applicable_suites: set[SuiteClass]
    src_policy: ApplySrcPolicy
    options: optparse.Values
    suite_info: Suites

    def __init__(
        self,
        options: optparse.Values,
        suite_info: Suites,
    ) -> None:
        """The BasePolicy constructor

        :param options: The options member of Britney with all the
          config values.
        """

    @property
    @abstractmethod
    def state_dir(self) -> str: ...

    def register_hints(self, hint_parser: HintParser) -> None:  # pragma: no cover
        """Register new hints that this policy accepts

        :param hint_parser: (see HintParser.register_hint_type)
        """

    def initialise(self, britney: "Britney") -> None:  # pragma: no cover
        """Called once to make the policy initialise any data structures

        This is useful for e.g. parsing files or other "heavy do-once" work.
        The base implementation only stores the Britney instance.

        :param britney: This is the instance of the "Britney" class.
        """
        self.britney = britney

    def save_state(self, britney: "Britney") -> None:  # pragma: no cover
        """Called once at the end of the run to make the policy save any persistent data

        Note this will *not* be called for "dry-runs" as such runs should not change
        the state.

        :param britney: This is the instance of the "Britney" class.
        """

    def apply_src_policy_impl(
        self,
        policy_info: dict[str, Any],
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:  # pragma: no cover
        """Apply a policy on a given source migration

        Britney will call this method on a given source package, when
        Britney is considering to migrate it from the given source
        suite to the target suite.  The policy will then evaluate the
        migration and then return a verdict.

        :param policy_info: A dictionary of all policy results.  The
          policy can add a value stored in a key related to its name.
          (e.g. policy_info['age'] = {...}).  This will go directly into
          the "excuses.yaml" output.

        :param source_data_tdist: Information about the source package
          in the target distribution (e.g. "testing"), or None when
          there is no such information.

        :param source_data_srcdist: Information about the source
          package in the source distribution (e.g. "unstable" or "tpu").

        :param excuse: The excuse being built for this migration.

        :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
        """
        return PolicyVerdict.NOT_APPLICABLE

    def apply_srcarch_policy_impl(
        self,
        policy_info: dict[str, Any],
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Apply a policy on a given binary migration

        Britney will call this method on binaries from a given source package
        on a given architecture, when Britney is considering to migrate them
        from the given source suite to the target suite.  The policy will then
        evaluate the migration and then return a verdict.

        :param policy_info: A dictionary of all policy results.  The
          policy can add a value stored in a key related to its name.
          (e.g. policy_info['age'] = {...}).  This will go directly into
          the "excuses.yaml" output.

        :param arch: The architecture the item is applied to. This is mostly
          relevant for policies where src_policy is not ApplySrcPolicy.RUN_SRC
          (as that is the only case where arch can differ from item.architecture)

        :param source_data_tdist: Information about the source package
          in the target distribution (e.g. "testing"), or None when
          there is no such information.

        :param source_data_srcdist: Information about the source
          package in the source distribution (e.g. "unstable" or "tpu").

        :param excuse: The excuse being built for this migration.

        :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
        """
        # A policy that does not override this hook has no opinion.
        return PolicyVerdict.NOT_APPLICABLE

303 

304 

class AbstractBasePolicy(BasePolicy):
    """
    Common concrete base for BasePolicy implementations.

    tests/test_policy.py:initialize_policy() has to construct BasePolicy
    objects through a two-item constructor, whereas every other user of a
    BasePolicy-derived object needs the 5-item constructor.  This class
    exists to make that split explicit.
    """

    def __init__(
        self,
        policy_id: str,
        options: optparse.Values,
        suite_info: Suites,
        applicable_suites: set[SuiteClass],
        src_policy: ApplySrcPolicy = ApplySrcPolicy.RUN_SRC,
    ) -> None:
        """Concrete initializer.

        :param policy_id: Identifies the policy.  It will
          determine the key used for the excuses.yaml etc.

        :param options: The options member of Britney with all the
          config values.

        :param suite_info: The suites the policy operates on.

        :param applicable_suites: Where this policy applies.

        :param src_policy: Whether the policy runs per source, per
          architecture, or both.
        """
        self.policy_id = policy_id
        self.options = options
        self.suite_info = suite_info
        self.applicable_suites = applicable_suites
        self.src_policy = src_policy
        self.hints: HintCollection | None = None
        # One logger per concrete policy class: "<module>.<ClassName>".
        klass = self.__class__
        self.logger = logging.getLogger(f"{klass.__module__}.{klass.__name__}")

    @property
    def state_dir(self) -> str:
        """The state directory taken from the options."""
        return cast(str, self.options.state_dir)

345 

346 

# Type of the policy-specific parameter carried by a SimplePolicyHint.
_T = TypeVar("_T")


class SimplePolicyHint(Hint, Generic[_T]):
    """A hint that carries one policy-specific parameter besides its packages."""

    def __init__(
        self,
        user: str,
        hint_type: HintType,
        policy_parameter: _T,
        packages: list[MigrationItem],
    ) -> None:
        """Create the hint.

        :param user: The user that issued the hint.

        :param hint_type: The type of the hint.

        :param policy_parameter: The (already converted) parameter value.

        :param packages: The migration items the hint applies to.
        """
        super().__init__(user, hint_type, packages)
        self._policy_parameter = policy_parameter

    def __eq__(self, other: Any) -> bool:
        """Compare type, policy parameter and then the base hint fields.

        Objects that are not SimplePolicyHint instances compare unequal
        instead of raising AttributeError on the missing attributes.
        """
        if not isinstance(other, SimplePolicyHint):
            return False
        if self.type != other.type or self._policy_parameter != other._policy_parameter:
            return False
        return super().__eq__(other)

    def str(self) -> str:
        """Render the hint as "<type> <parameter> <package names>"."""
        return "{} {} {}".format(
            self._type,
            str(self._policy_parameter),
            " ".join(x.name for x in self._packages),
        )

372 

373 

class AgeDayHint(SimplePolicyHint[int]):
    """An "age-days" hint; its policy parameter is a number of days."""

    @property
    def days(self) -> int:
        """The number of days carried by the hint."""
        return self._policy_parameter

378 

379 

class IgnoreRCBugHint(SimplePolicyHint[frozenset[str]]):
    """An "ignore-rc-bugs" hint; its policy parameter is a set of bug ids."""

    @property
    def ignored_rcbugs(self) -> frozenset[str]:
        """The bug ids carried by the hint."""
        return self._policy_parameter

384 

385 

def simple_policy_hint_parser_function(
    class_name: Callable[[str, HintType, _T, list[MigrationItem]], Hint],
    converter: Callable[[str], _T],
) -> PolicyHintParserProto:
    """Build a hint parser callback for hints of the form "<param> <items...>".

    :param class_name: Callable (typically a hint class) creating the hint
      from the user, the hint type, the converted parameter and a
      one-element list of migration items.

    :param converter: Converts the raw first argument into the parameter
      value handed to `class_name`.

    :return: A callback that adds one hint per parsed migration item.
    """

    def parse_hint(
        mi_factory: MigrationItemFactory,
        hints: HintCollection,
        who: str,
        hint_type: HintType,
        *args: str,
    ) -> None:
        # The first argument is the policy parameter, the rest are items.
        raw_parameter = args[0]
        item_specs = args[1:]
        for migration_item in mi_factory.parse_items(*item_specs):
            new_hint = class_name(
                who, hint_type, converter(raw_parameter), [migration_item]
            )
            hints.add_hint(new_hint)

    return parse_hint

405 

406 

class AgePolicy(AbstractBasePolicy):
    """Configurable Aging policy for source migrations

    The AgePolicy will let packages stay in the source suite for a pre-defined
    amount of days before letting migrate (based on their urgency, if any).

    The AgePolicy's decision is influenced by the following:

    State files:
     * ${STATE_DIR}/age-policy-urgencies: File containing urgencies for source
       packages. Note that urgencies are "sticky" and the most "urgent" urgency
       will be used (i.e. the one with lowest age-requirements).
       - This file needs to be updated externally, if the policy should take
         urgencies into consideration.  If empty (or not updated), the policy
         will simply use the default urgency (see the "Config" section below)
       - In Debian, these values are taken from the .changes file, but that is
         not a requirement for Britney.
     * ${STATE_DIR}/age-policy-dates: File containing the age of all source
       packages.
       - The policy will automatically update this file.
    Config:
     * DEFAULT_URGENCY: Name of the urgency used for packages without an urgency
       (or for unknown urgencies).  Will also be used to set the "minimum"
       aging requirements for packages not in the target suite.
     * MINDAYS_<URGENCY>: The age-requirements in days for packages with the
       given urgency.
       - Commonly used urgencies are: low, medium, high, emergency, critical
    Hints:
     * urgent <source>/<version>: Disregard the age requirements for a given
       source/version.
     * age-days X <source>/<version>: Set the age requirements for a given
       source/version to X days.  Note that X can exceed the highest
       age-requirement normally given.

    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        """Set up the policy from the options; state files are read in initialise()."""
        super().__init__("age", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE})
        self._min_days = self._generate_mindays_table()
        # Placeholder; the real value is resolved in initialise().
        self._min_days_default = 0
        # britney's "day" begins at 7pm (we want aging to occur in the 22:00Z run and we run Britney 2-4 times a day)
        # NB: _date_now is used in tests
        time_now = time.time()
        if hasattr(self.options, "fake_runtime"):
            time_now = int(self.options.fake_runtime)
            self.logger.info("overriding runtime with fake_runtime %d", time_now)

        self._date_now = int(((time_now / (60 * 60)) - 19) / 24)
        # source name -> (version, day number the version was first seen)
        self._dates: dict[str, tuple[str, int]] = {}
        # source name -> urgency name
        self._urgencies: dict[str, str] = {}
        self._default_urgency: str = self.options.default_urgency
        self._penalty_immune_urgencies: frozenset[str] = frozenset()
        if hasattr(self.options, "no_penalties"):
            self._penalty_immune_urgencies = frozenset(
                x.strip() for x in self.options.no_penalties.split()
            )
        self._bounty_min_age: int | None = None  # initialised later

    def _generate_mindays_table(self) -> dict[str, int]:
        """Collect the "mindays_<urgency>" options into an urgency -> days table.

        :raises ValueError: if a value is not an integer or is negative.
        """
        prefix = "mindays_"
        mindays: dict[str, int] = {}
        for k in dir(self.options):
            if not k.startswith(prefix):
                continue
            v = getattr(self.options, k)
            try:
                as_days = int(v)
            except ValueError as err:
                raise ValueError(
                    "Unable to parse "
                    + k
                    + " as a number of days. Must be 0 or a positive integer"
                ) from err
            if as_days < 0:
                raise ValueError(
                    "The value of " + k + " must be zero or a positive integer"
                )
            # Strip only the prefix, so that urgency names containing an
            # underscore are kept intact.
            mindays[k[len(prefix) :]] = as_days
        return mindays

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the "age-days" and "urgent" hint types."""
        hint_parser.register_hint_type(
            HintType(
                "age-days",
                simple_policy_hint_parser_function(AgeDayHint, int),
                min_args=2,
            )
        )
        hint_parser.register_hint_type(HintType("urgent"))

    def initialise(self, britney: "Britney") -> None:
        """Resolve the default age requirement and read the state files.

        :raises ValueError: if the default urgency has no MINDAYS_* entry or
          BOUNTY_MIN_AGE is neither a number nor a known urgency.
        """
        super().initialise(britney)
        if self._default_urgency not in self._min_days:  # pragma: no cover
            raise ValueError(
                "Missing age-requirement for default urgency (MINDAYS_%s)"
                % self._default_urgency
            )
        # This must be resolved before the urgencies file is read: the
        # reader uses it as the age requirement of unknown urgencies.
        self._min_days_default = self._min_days[self._default_urgency]
        self._read_dates_file()
        self._read_urgencies_file()
        try:
            self._bounty_min_age = int(self.options.bounty_min_age)
        except ValueError:
            # Not a number; it may be the name of an urgency instead.
            if self.options.bounty_min_age in self._min_days:
                self._bounty_min_age = self._min_days[self.options.bounty_min_age]
            else:  # pragma: no cover
                raise ValueError(
                    "Please fix BOUNTY_MIN_AGE in the britney configuration"
                )
        except AttributeError:
            # The option wasn't defined in the configuration
            self._bounty_min_age = 0

    def save_state(self, britney: "Britney") -> None:
        """Write the dates file."""
        super().save_state(britney)
        self._write_dates_file()

    def apply_src_policy_impl(
        self,
        age_info: dict[str, Any],
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Decide whether the source is old enough to migrate.

        The required age starts from the urgency of the upload and is then
        adjusted by bounties, penalties and hints.  Details are recorded in
        `age_info` and on the excuse.

        :return: PASS, PASS_HINTED (too young but "urgent" hinted) or
          REJECTED_TEMPORARILY (too young).
        """
        # retrieve the urgency for the upload, ignoring it if this is a NEW package
        # (not present in the target suite)
        source_name = excuse.item.package
        urgency = self._urgencies.get(source_name, self._default_urgency)

        if urgency not in self._min_days:
            age_info["unknown-urgency"] = urgency
            urgency = self._default_urgency

        if not source_data_tdist:
            if self._min_days[urgency] < self._min_days_default:
                age_info["urgency-reduced"] = {
                    "from": urgency,
                    "to": self._default_urgency,
                }
                urgency = self._default_urgency

        # (Re)start the clock for sources or versions not seen before.
        if (
            source_name not in self._dates
            or self._dates[source_name][0] != source_data_srcdist.version
        ):
            self._dates[source_name] = (source_data_srcdist.version, self._date_now)

        days_old = self._date_now - self._dates[source_name][1]
        min_days = self._min_days[urgency]
        for bounty, bounty_value in excuse.bounty.items():
            if bounty_value:
                self.logger.info(
                    "Applying bounty for %s granted by %s: %d days",
                    source_name,
                    bounty,
                    bounty_value,
                )
                excuse.addinfo(
                    "Required age reduced by %d days because of %s"
                    % (bounty_value, bounty)
                )
                assert bounty_value > 0, "negative bounties shouldn't happen"
                min_days -= bounty_value
        if urgency not in self._penalty_immune_urgencies:
            for penalty, penalty_value in excuse.penalty.items():
                if penalty_value:
                    self.logger.info(
                        "Applying penalty for %s given by %s: %d days",
                        source_name,
                        penalty,
                        penalty_value,
                    )
                    excuse.addinfo(
                        "Required age increased by %d days because of %s"
                        % (penalty_value, penalty)
                    )
                    assert (
                        penalty_value > 0
                    ), "negative penalties should be handled earlier"
                    min_days += penalty_value

        assert self._bounty_min_age is not None
        # the age in BOUNTY_MIN_AGE can be higher than the one associated with
        # the real urgency, so don't forget to take it into account
        bounty_min_age = min(self._bounty_min_age, self._min_days[urgency])
        if min_days < bounty_min_age:
            min_days = bounty_min_age
            excuse.addinfo(
                "Required age is not allowed to drop below %d days" % min_days
            )

        age_info["current-age"] = days_old

        assert self.hints is not None
        for hint in self.hints.search(
            "age-days", package=source_name, version=source_data_srcdist.version
        ):
            age_days_hint = cast("AgeDayHint", hint)

            new_req = age_days_hint.days
            age_info["age-requirement-reduced"] = {
                "new-requirement": new_req,
                "changed-by": age_days_hint.user,
            }
            if "original-age-requirement" not in age_info:
                age_info["original-age-requirement"] = min_days
            min_days = new_req

        age_info["age-requirement"] = min_days
        res = PolicyVerdict.PASS

        if days_old < min_days:
            if (
                urgent_hint := self.hints.search_first(
                    "urgent", package=source_name, version=source_data_srcdist.version
                )
            ) is not None:
                age_info["age-requirement-reduced"] = {
                    "new-requirement": 0,
                    "changed-by": urgent_hint.user,
                }
                res = PolicyVerdict.PASS_HINTED
            else:
                res = PolicyVerdict.REJECTED_TEMPORARILY

        # update excuse
        age_hint = age_info.get("age-requirement-reduced", None)
        age_min_req = age_info["age-requirement"]
        if age_hint:
            new_req = age_hint["new-requirement"]
            who = age_hint["changed-by"]
            if new_req:
                # "age-requirement" already holds the hinted value at this
                # point, so report the requirement from before the hint.
                previous_req = age_info.get("original-age-requirement", age_min_req)
                excuse.addinfo(
                    "Overriding age needed from %d days to %d by %s"
                    % (previous_req, new_req, who)
                )
                age_min_req = new_req
            else:
                excuse.addinfo("Too young, but urgency pushed by %s" % who)
                age_min_req = 0
        excuse.setdaysold(age_info["current-age"], age_min_req)

        if age_min_req == 0:
            excuse.addinfo("%d days old" % days_old)
        elif days_old < age_min_req:
            excuse.add_verdict_info(
                res, "Too young, only %d of %d days old" % (days_old, age_min_req)
            )
        else:
            excuse.addinfo("%d days old (needed %d days)" % (days_old, age_min_req))

        return res

    def _read_dates_file(self) -> None:
        """Parse the dates file

        The file in the state directory is preferred; the legacy "Dates"
        file in the target suite is used when only that one exists.  A
        missing state-directory file is created empty.
        """
        dates = self._dates
        fallback_filename = os.path.join(self.suite_info.target_suite.path, "Dates")
        using_new_name = False
        try:
            filename = os.path.join(self.state_dir, "age-policy-dates")
            if not os.path.exists(filename) and os.path.exists(fallback_filename):
                filename = fallback_filename
            else:
                using_new_name = True
        except AttributeError:
            # No state directory configured.
            if os.path.exists(fallback_filename):
                filename = fallback_filename
            else:
                raise RuntimeError("Please set STATE_DIR in the britney configuration")

        try:
            with open(filename, encoding="utf-8") as fd:
                for line in fd:
                    if line.startswith("#"):
                        # Ignore comment lines (mostly used for tests)
                        continue
                    # <source> <version> <date>
                    ln = line.split()
                    if len(ln) != 3:  # pragma: no cover
                        continue
                    try:
                        dates[ln[0]] = (ln[1], int(ln[2]))
                    except ValueError:  # pragma: no cover
                        # Deliberately skip entries with an unparsable date.
                        pass
        except FileNotFoundError:
            if not using_new_name:
                # If we are using the legacy name, then just give up
                raise
            self.logger.info("%s does not appear to exist.  Creating it", filename)
            with open(filename, mode="x", encoding="utf-8"):
                pass

    def _read_urgencies_file(self) -> None:
        """Parse the urgencies file, keeping the most urgent entry per source.

        Entries are ignored when the target suite already has that version
        (or a newer one) or when the primary source suite lacks the source
        or only has an older version.
        """
        urgencies = self._urgencies
        min_days_default = self._min_days_default
        fallback_filename = os.path.join(self.suite_info.target_suite.path, "Urgency")
        try:
            filename = os.path.join(self.state_dir, "age-policy-urgencies")
            if not os.path.exists(filename) and os.path.exists(fallback_filename):
                filename = fallback_filename
        except AttributeError:
            filename = fallback_filename

        sources_s = self.suite_info.primary_source_suite.sources
        sources_t = self.suite_info.target_suite.sources

        with open(filename, errors="surrogateescape", encoding="ascii") as fd:
            for line in fd:
                if line.startswith("#"):
                    # Ignore comment lines (mostly used for tests)
                    continue
                # <source> <version> <urgency>
                ln = line.split()
                if len(ln) != 3:
                    continue

                # read the minimum days associated with the urgencies
                urgency_old = urgencies.get(ln[0], None)
                mindays_old = self._min_days.get(urgency_old, 1000)  # type: ignore[arg-type]
                mindays_new = self._min_days.get(ln[2], min_days_default)

                # if the new urgency is lower (so the min days are higher), do nothing
                if mindays_old <= mindays_new:
                    continue

                # if the package exists in the target suite and it is more recent, do nothing
                tsrcv = sources_t.get(ln[0], None)
                if tsrcv and apt_pkg.version_compare(tsrcv.version, ln[1]) >= 0:
                    continue

                # if the package doesn't exist in the primary source suite or it is older, do nothing
                usrcv = sources_s.get(ln[0], None)
                if not usrcv or apt_pkg.version_compare(usrcv.version, ln[1]) < 0:
                    continue

                # update the urgency for the package
                urgencies[ln[0]] = ln[2]

    def _write_dates_file(self) -> None:
        """Write the dates file via a temporary file and drop the legacy file."""
        dates = self._dates
        try:
            directory = self.state_dir
            basename = "age-policy-dates"
            old_file = os.path.join(self.suite_info.target_suite.path, "Dates")
        except AttributeError:
            # No state directory configured; keep using the legacy location.
            directory = self.suite_info.target_suite.path
            basename = "Dates"
            old_file = None
        filename = os.path.join(directory, basename)
        filename_tmp = os.path.join(directory, f"{basename}_new")
        with open(filename_tmp, "w", encoding="utf-8") as fd:
            for pkg in sorted(dates):
                version, date = dates[pkg]
                fd.write("%s %s %d\n" % (pkg, version, date))
        os.rename(filename_tmp, filename)
        if old_file is not None and os.path.exists(old_file):
            self.logger.info("Removing old age-policy-dates file %s", old_file)
            os.unlink(old_file)

763 

764 

765class RCBugPolicy(AbstractBasePolicy): 

766 """RC bug regression policy for source migrations 

767 

768 The RCBugPolicy will read provided list of RC bugs and block any 

769 source upload that would introduce a *new* RC bug in the target 

770 suite. 

771 

772 The RCBugPolicy's decision is influenced by the following: 

773 

774 State files: 

775 * ${STATE_DIR}/rc-bugs-${SUITE_NAME}: File containing RC bugs for packages in 

776 the given suite (one for both primary source suite and the target suite is 

777 needed). 

778 - These files need to be updated externally. 

779 """ 

780 

781 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

782 super().__init__( 

783 "rc-bugs", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE} 

784 ) 

785 self._bugs_source: dict[str, set[str]] | None = None 

786 self._bugs_target: dict[str, set[str]] | None = None 

787 

788 def register_hints(self, hint_parser: HintParser) -> None: 

789 f = simple_policy_hint_parser_function( 

790 IgnoreRCBugHint, lambda x: frozenset(x.split(",")) 

791 ) 

792 hint_parser.register_hint_type(HintType("ignore-rc-bugs", f, min_args=2)) 

793 

794 def initialise(self, britney: "Britney") -> None: 

795 super().initialise(britney) 

796 source_suite = self.suite_info.primary_source_suite 

797 target_suite = self.suite_info.target_suite 

798 fallback_unstable = os.path.join(source_suite.path, "BugsV") 

799 fallback_testing = os.path.join(target_suite.path, "BugsV") 

800 try: 

801 filename_unstable = os.path.join( 

802 self.state_dir, f"rc-bugs-{source_suite.name}" 

803 ) 

804 filename_testing = os.path.join( 

805 self.state_dir, f"rc-bugs-{target_suite.name}" 

806 ) 

807 if ( 807 ↛ 813line 807 didn't jump to line 813

808 not os.path.exists(filename_unstable) 

809 and not os.path.exists(filename_testing) 

810 and os.path.exists(fallback_unstable) 

811 and os.path.exists(fallback_testing) 

812 ): 

813 filename_unstable = fallback_unstable 

814 filename_testing = fallback_testing 

815 except AttributeError: 

816 filename_unstable = fallback_unstable 

817 filename_testing = fallback_testing 

818 self._bugs_source = self._read_bugs(filename_unstable) 

819 self._bugs_target = self._read_bugs(filename_testing) 

820 

def apply_src_policy_impl(
    self,
    rcbugs_info: dict[str, Any],
    source_data_tdist: SourcePackage | None,
    source_data_srcdist: SourcePackage,
    excuse: "Excuse",
) -> PolicyVerdict:
    """Compare the RC bugs of the item in the source and target suites.

    Bugs are collected for the ``src:<name>`` key and for every binary
    package name of the source, separately for the source suite and the
    target suite.  Bugs listed in a matching ``ignore-rc-bugs`` hint are
    removed from both sets (only the first applicable hint is honoured).

    The following keys are written to ``rcbugs_info``: ``shared-bugs``,
    ``unique-source-bugs``, ``unique-target-bugs`` and, when a hint was
    applied, ``ignored-bugs``.

    Returns REJECTED_PERMANENTLY if any bug is only present on the source
    suite side, PASS_HINTED if a hint was applied and no such bug remains,
    and PASS otherwise.
    """
    assert self._bugs_source is not None  # for type checking
    assert self._bugs_target is not None  # for type checking
    bugs_t: set[str] = set()
    bugs_s: set[str] = set()
    source_name = excuse.item.package
    binaries_s = {x.package_name for x in source_data_srcdist.binaries}
    # An explicit None check instead of catching AttributeError, so that an
    # unrelated AttributeError is not silently turned into "no binaries".
    binaries_t: set[str] = set()
    if source_data_tdist is not None:
        binaries_t = {x.package_name for x in source_data_tdist.binaries}

    src_key = f"src:{source_name}"
    if source_data_tdist and src_key in self._bugs_target:
        bugs_t.update(self._bugs_target[src_key])
    if src_key in self._bugs_source:
        bugs_s.update(self._bugs_source[src_key])

    for pkg in binaries_s:
        if pkg in self._bugs_source:
            bugs_s |= self._bugs_source[pkg]
    for pkg in binaries_t:
        if pkg in self._bugs_target:
            bugs_t |= self._bugs_target[pkg]

    def is_binary_name_in(suite: Suite) -> bool:
        # Scan lazily rather than materialising a set of every binary name
        # in the suite for a single membership test.
        return any(
            x.package_name == source_name
            for x in suite.all_binaries_in_suite.keys()
        )

    # The bts seems to support filing source bugs against a binary of the
    # same name if that binary isn't built by any source. An example is bug
    # 820347 against Package: juce (in the live-2016-04-11 test). Add those
    # bugs too.
    if (
        source_name not in binaries_s
        and source_name not in binaries_t
        and not is_binary_name_in(self.suite_info.primary_source_suite)
        and not is_binary_name_in(self.suite_info.target_suite)
    ):
        if source_name in self._bugs_source:
            bugs_s |= self._bugs_source[source_name]
        if source_name in self._bugs_target:
            bugs_t |= self._bugs_target[source_name]

    # If a package is not in the target suite, it has no RC bugs per
    # definition. Unfortunately, it seems that the live-data is
    # not always accurate (e.g. live-2011-12-13 suggests that
    # obdgpslogger had the same bug in testing and unstable,
    # but obdgpslogger was not in testing at that time).
    # - For the curious, obdgpslogger was removed on that day
    #   and the BTS probably had not caught up with that fact.
    #   (https://tracker.debian.org/news/415935)
    assert not bugs_t or source_data_tdist, (
        "%s had bugs in the target suite but is not present" % source_name
    )

    verdict = PolicyVerdict.PASS

    assert self.hints is not None
    for hint in self.hints.search(
        "ignore-rc-bugs",
        package=source_name,
        version=source_data_srcdist.version,
    ):
        ignore_hint = cast(IgnoreRCBugHint, hint)
        ignored_bugs = ignore_hint.ignored_rcbugs

        # Only handle one hint for now
        if "ignored-bugs" in rcbugs_info:
            self.logger.info(
                "Ignoring ignore-rc-bugs hint from %s on %s due to another hint from %s",
                ignore_hint.user,
                source_name,
                rcbugs_info["ignored-bugs"]["issued-by"],
            )
            continue
        if not ignored_bugs.isdisjoint(bugs_s):
            bugs_s -= ignored_bugs
            bugs_t -= ignored_bugs
            rcbugs_info["ignored-bugs"] = {
                "bugs": sorted(ignored_bugs),
                "issued-by": ignore_hint.user,
            }
            verdict = PolicyVerdict.PASS_HINTED
        else:
            self.logger.info(
                "Ignoring ignore-rc-bugs hint from %s on %s as none of %s affect the package",
                ignore_hint.user,
                source_name,
                str(ignored_bugs),
            )

    rcbugs_info["shared-bugs"] = sorted(bugs_s & bugs_t)
    rcbugs_info["unique-source-bugs"] = sorted(bugs_s - bugs_t)
    rcbugs_info["unique-target-bugs"] = sorted(bugs_t - bugs_s)

    # update excuse
    new_bugs = rcbugs_info["unique-source-bugs"]
    old_bugs = rcbugs_info["unique-target-bugs"]
    excuse.setbugs(old_bugs, new_bugs)

    def bug_links(bugs: list[str]) -> str:
        # HTML links to the given bugs, comma separated.
        return ", ".join(
            f'<a href="https://bugs.debian.org/{quote(a)}">#{a}</a>' for a in bugs
        )

    target_name = self.suite_info.target_suite.name

    if new_bugs:
        verdict = PolicyVerdict.REJECTED_PERMANENTLY
        excuse.add_verdict_info(
            verdict,
            "Updating %s would introduce bugs in %s: %s"
            % (source_name, target_name, bug_links(new_bugs)),
        )

    if old_bugs:
        excuse.addinfo(
            "Updating %s will fix bugs in %s: %s"
            % (source_name, target_name, bug_links(old_bugs))
        )

    return verdict

960 

def _read_bugs(self, filename: str) -> dict[str, set[str]]:
    """Load the release critical bug summary stored in the given file.

    Every row of the file is expected to look like:

       <package-name> <bug number>[,<bug number>...]

    Rows that do not split into exactly two whitespace-separated fields
    are reported with a warning and skipped.

    Returns a dictionary mapping each package name found in the file to
    the set of bug numbers (kept as strings) listed for it.  Several rows
    for the same package are merged.
    """
    self.logger.info("Loading RC bugs data from %s", filename)
    result: dict[str, set[str]] = {}
    with open(filename, encoding="ascii") as fd:
        for row in fd:
            fields = row.split()
            if len(fields) != 2:  # pragma: no cover
                self.logger.warning("Malformed line found in line %s", row)
                continue
            package, bug_numbers = fields
            result.setdefault(package, set()).update(bug_numbers.split(","))
    return result

984 

985 

class PiupartsState(Enum):
    """State of a package as recorded in a piuparts summary."""

    FAIL = auto()
    PASS = auto()
    WAITING = auto()
    UNKNOWN = auto()

    @staticmethod
    def from_str(val: str) -> "PiupartsState":
        """Translate a one-letter state code into a PiupartsState.

        "F", "P", "W" and "X" map to FAIL, PASS, WAITING and UNKNOWN
        respectively.  Any other value raises ValueError.
        """
        if val == "F":
            return PiupartsState.FAIL
        if val == "P":
            return PiupartsState.PASS
        if val == "W":
            return PiupartsState.WAITING
        if val == "X":
            return PiupartsState.UNKNOWN
        raise ValueError(f"Invalid piuparts state {val}")

1005 

1006 

1007class PiupartsResult(StrEnum): 

1008 PASS = "pass" 

1009 REGRESSION = "regression" 

1010 FAILED = "failed" 

1011 WAITING_FOR_TESTS = "waiting-for-test-results" 

1012 CANNOT_BE_TESTED = "cannot-be-tested" 

1013 

1014 

1015class PiupartsPolicy(AbstractBasePolicy): 

1016 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

1017 super().__init__( 

1018 "piuparts", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE} 

1019 ) 

1020 self._piuparts_source: dict[str, tuple[PiupartsState, str]] | None = None 

1021 self._piuparts_target: dict[str, PiupartsState] | None = None 

1022 

1023 def register_hints(self, hint_parser: HintParser) -> None: 

1024 hint_parser.register_hint_type(HintType("ignore-piuparts")) 

1025 

1026 def initialise(self, britney: "Britney") -> None: 

1027 super().initialise(britney) 

1028 source_suite = self.suite_info.primary_source_suite 

1029 target_suite = self.suite_info.target_suite 

1030 try: 

1031 filename_unstable = os.path.join( 

1032 self.state_dir, f"piuparts-summary-{source_suite.name}.json" 

1033 ) 

1034 filename_testing = os.path.join( 

1035 self.state_dir, f"piuparts-summary-{target_suite.name}.json" 

1036 ) 

1037 except AttributeError as e: # pragma: no cover 

1038 raise RuntimeError( 

1039 "Please set STATE_DIR in the britney configuration" 

1040 ) from e 

1041 self._piuparts_source = self._read_piuparts_summary(filename_unstable) 

1042 self._piuparts_target = self._read_piuparts_summary_without_url( 

1043 filename_testing 

1044 ) 

1045 

1046 def apply_src_policy_impl( 

1047 self, 

1048 piuparts_info: dict[str, Any], 

1049 source_data_tdist: SourcePackage | None, 

1050 source_data_srcdist: SourcePackage, 

1051 excuse: "Excuse", 

1052 ) -> PolicyVerdict: 

1053 assert self._piuparts_source is not None # for type checking 

1054 assert self._piuparts_target is not None # for type checking 

1055 source_name = excuse.item.package 

1056 

1057 if source_name in self._piuparts_target: 

1058 testing_state = self._piuparts_target[source_name] 

1059 else: 

1060 testing_state = PiupartsState.UNKNOWN 

1061 url: str | None 

1062 if source_name in self._piuparts_source: 

1063 unstable_state, url = self._piuparts_source[source_name] 

1064 else: 

1065 unstable_state = PiupartsState.UNKNOWN 

1066 url = None 

1067 url_html = "(no link yet)" 

1068 if url is not None: 

1069 url_html = '<a href="{0}">{0}</a>'.format(url) 

1070 

1071 match unstable_state: 

1072 case PiupartsState.PASS: 

1073 # Not a regression 

1074 msg = f"Piuparts tested OK - {url_html}" 

1075 result = PolicyVerdict.PASS 

1076 piuparts_info["test-results"] = PiupartsResult.PASS 

1077 case PiupartsState.FAIL if testing_state is not PiupartsState.FAIL: 

1078 piuparts_info["test-results"] = PiupartsResult.REGRESSION 

1079 msg = f"Piuparts regression - {url_html}" 

1080 result = PolicyVerdict.REJECTED_PERMANENTLY 

1081 case PiupartsState.FAIL: 

1082 piuparts_info["test-results"] = PiupartsResult.FAILED 

1083 msg = f"Piuparts failure (not a regression) - {url_html}" 

1084 result = PolicyVerdict.PASS 

1085 case PiupartsState.WAITING: 

1086 msg = f"Piuparts check waiting for test results - {url_html}" 

1087 result = PolicyVerdict.REJECTED_TEMPORARILY 

1088 piuparts_info["test-results"] = PiupartsResult.WAITING_FOR_TESTS 

1089 case _: 

1090 msg = f"Piuparts can't test {source_name} (not a blocker) - {url_html}" 

1091 piuparts_info["test-results"] = PiupartsResult.CANNOT_BE_TESTED 

1092 result = PolicyVerdict.PASS 

1093 

1094 if url is not None: 

1095 piuparts_info["piuparts-test-url"] = url 

1096 if result.is_rejected: 

1097 excuse.add_verdict_info(result, msg) 

1098 else: 

1099 excuse.addinfo(msg) 

1100 

1101 if result.is_rejected: 

1102 assert self.hints is not None 

1103 if ( 

1104 ignore_hint := self.hints.search_first( 

1105 "ignore-piuparts", 

1106 package=source_name, 

1107 version=source_data_srcdist.version, 

1108 ) 

1109 ) is not None: 

1110 piuparts_info["ignored-piuparts"] = {"issued-by": ignore_hint.user} 

1111 result = PolicyVerdict.PASS_HINTED 

1112 excuse.addinfo( 

1113 f"Piuparts issue ignored as requested by {ignore_hint.user}" 

1114 ) 

1115 

1116 return result 

1117 

1118 def _read_piuparts_summary_gen( 

1119 self, filename: str 

1120 ) -> Generator[tuple[str, PiupartsState, str], None, None]: 

1121 self.logger.info("Loading piuparts report from %s", filename) 

1122 with open(filename) as fd: 1122 ↛ exitline 1122 didn't return from function '_read_piuparts_summary_gen' because the return on line 1124 wasn't executed

1123 if os.fstat(fd.fileno()).st_size < 1: 1123 ↛ 1124line 1123 didn't jump to line 1124 because the condition on line 1123 was never true

1124 return 

1125 data = json.load(fd) 

1126 try: 

1127 if ( 

1128 data["_id"] != "Piuparts Package Test Results Summary" 

1129 or data["_version"] != "1.0" 

1130 ): # pragma: no cover 

1131 raise ValueError( 

1132 f"Piuparts results in {filename} does not have the correct ID or version" 

1133 ) 

1134 except KeyError as e: # pragma: no cover 

1135 raise ValueError( 

1136 f"Piuparts results in {filename} is missing id or version field" 

1137 ) from e 

1138 for source, suite_data in data["packages"].items(): 

1139 if len(suite_data) != 1: # pragma: no cover 

1140 raise ValueError( 

1141 f"Piuparts results in {filename}, the source {source} does not have " 

1142 "exactly one result set" 

1143 ) 

1144 item = next(iter(suite_data.values())) 

1145 state, _, url = item 

1146 yield (source, PiupartsState.from_str(state), url) 

1147 

1148 def _read_piuparts_summary( 

1149 self, filename: str 

1150 ) -> dict[str, tuple[PiupartsState, str]]: 

1151 return { 

1152 source: (state, url) 

1153 for (source, state, url) in self._read_piuparts_summary_gen(filename) 

1154 } 

1155 

1156 def _read_piuparts_summary_without_url( 

1157 self, filename: str 

1158 ) -> dict[str, PiupartsState]: 

1159 return { 

1160 source: state 

1161 for (source, state, _) in self._read_piuparts_summary_gen(filename) 

1162 } 

1163 

1164 

class DependsPolicy(AbstractBasePolicy):
    """Per-architecture check of the binaries of an item.

    For every binary of the item on the given architecture the policy
    records whether it is among the universe's broken packages, rejects
    the item when such a binary is not allowed to be uninstallable, and
    registers excuse dependencies for relations that are neither satisfied
    in the target suite nor by another binary of the same item.
    """

    pkg_universe: "BinaryPackageUniverse"
    broken_packages: frozenset["BinaryPackageId"]
    all_binaries: dict["BinaryPackageId", "BinaryPackage"]
    allow_uninst: dict[str, set[str | None]]

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "depends",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        # Copied from the options by initialise().
        self.nobreakall_arches = None
        self.new_arches = None
        self.break_arches = None

    def initialise(self, britney: "Britney") -> None:
        """Cache the package universe, binaries and architecture options."""
        super().initialise(britney)
        self.pkg_universe = britney.pkg_universe
        self.broken_packages = self.pkg_universe.broken_packages
        self.all_binaries = britney.all_binaries
        self.nobreakall_arches = self.options.nobreakall_arches
        self.new_arches = self.options.new_arches
        self.break_arches = self.options.break_arches
        self.allow_uninst = britney.allow_uninst

    def apply_srcarch_policy_impl(
        self,
        deps_info: dict[str, Any],
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check the item's binaries on ``arch``.

        Returns REJECTED_PERMANENTLY when a binary that must stay
        installable is among the broken packages, PASS otherwise.
        Architectures listed in break_arches or new_arches are not checked.
        ``deps_info`` receives the ``autopkgtest_run_anyways`` /
        ``arch_all_not_installable`` lists.
        """
        verdict = PolicyVerdict.PASS

        assert self.break_arches is not None
        assert self.new_arches is not None
        if arch in self.break_arches or arch in self.new_arches:
            # we don't check these in the policy (TODO - for now?)
            return verdict

        item = excuse.item
        source_suite = item.suite
        target_suite = self.suite_info.target_suite

        packages_s_a = source_suite.binaries[arch]
        packages_t_a = target_suite.binaries[arch]

        my_bins = sorted(filter_out_faux_gen(excuse.packages[arch]))
        # Hashed copy for the membership tests in the dependency loop below;
        # scanning the sorted list for every alternative is linear each time.
        my_bins_lookup = frozenset(my_bins)

        arch_all_installable = set()
        arch_arch_installable = set()
        consider_it_regression = True

        for pkg_id in my_bins:
            pkg_name = pkg_id.package_name
            binary_u = packages_s_a[pkg_name]
            pkg_arch = binary_u.architecture

            # in some cases, we want to track the uninstallability of a
            # package (because the autopkgtest policy uses this), but we still
            # want to allow the package to be uninstallable
            skip_dep_check = False

            if binary_u.source_version != source_data_srcdist.version:
                # don't check cruft in unstable
                continue

            if item.architecture != "source" and pkg_arch == "all":
                # we don't care about the existing arch: all binaries when
                # checking a binNMU item, because the arch: all binaries won't
                # migrate anyway
                skip_dep_check = True

            if pkg_arch == "all" and arch not in self.nobreakall_arches:
                skip_dep_check = True

            if pkg_name in self.allow_uninst[arch]:
                # this binary is allowed to become uninstallable, so we don't
                # need to check anything
                skip_dep_check = True

            if pkg_name in packages_t_a:
                oldbin = packages_t_a[pkg_name]
                if not target_suite.is_installable(oldbin.pkg_id):
                    # as the current binary in testing is already
                    # uninstallable, the newer version is allowed to be
                    # uninstallable as well, so we don't need to check
                    # anything
                    skip_dep_check = True
                    consider_it_regression = False

            is_broken = pkg_id in self.broken_packages
            if pkg_arch == "all":
                arch_all_installable.add(not is_broken)
            else:
                arch_arch_installable.add(not is_broken)

            if is_broken:
                # dependencies can't be satisfied by all the known binaries -
                # this certainly won't work...
                excuse.add_unsatisfiable_on_arch(arch)
                if skip_dep_check:
                    # ...but if the binary is allowed to become uninstallable,
                    # we don't care
                    # we still want the binary to be listed as uninstallable,
                    continue
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                if pkg_name.endswith("-faux-build-depends"):
                    name = pkg_name.removesuffix("-faux-build-depends")
                    excuse.add_verdict_info(
                        verdict,
                        f"src:{name} has unsatisfiable build dependency",
                    )
                else:
                    excuse.add_verdict_info(
                        verdict, f"{pkg_name}/{arch} has unsatisfiable dependency"
                    )
                excuse.addreason("depends")

            if skip_dep_check:
                continue

            deps = self.pkg_universe.dependencies_of(pkg_id)

            for dep in deps:
                # dep is a list of packages, each of which satisfy the
                # dependency

                if not dep:
                    continue
                is_ok = False
                needed_for_dep = set()

                for alternative in dep:
                    if target_suite.is_pkg_in_the_suite(alternative):
                        # dep can be satisfied in testing - ok
                        is_ok = True
                    elif alternative in my_bins_lookup:
                        # can be satisfied by binary from same item: will be
                        # ok if item migrates
                        is_ok = True
                    else:
                        needed_for_dep.add(alternative)

                if not is_ok:
                    spec = DependencySpec(DependencyType.DEPENDS, arch)
                    excuse.add_package_depends(spec, needed_for_dep)

        # The autopkgtest policy needs delicate trade offs for
        # non-installability. The current choice (considering source
        # migration and only binaries built by the version of the
        # source):
        #
        # * Run autopkgtest if all arch:$arch binaries are installable
        #   (but some or all arch:all binaries are not)
        #
        # * Don't schedule nor wait for not installable arch:all only package
        #   on ! NOBREAKALL_ARCHES
        #
        # * Run autopkgtest if installability isn't a regression (there are (or
        #   rather, should) not be a lot of packages in this state, and most
        #   likely they'll just fail quickly)
        #
        # * Don't schedule, but wait otherwise
        if arch_arch_installable == {True} and False in arch_all_installable:
            deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)
        elif (
            arch not in self.nobreakall_arches
            and not arch_arch_installable
            and False in arch_all_installable
        ):
            deps_info.setdefault("arch_all_not_installable", []).append(arch)
        elif not consider_it_regression:
            deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)

        return verdict

1348 

1349 

@unique
class BuildDepResult(IntEnum):
    """Outcome of checking build dependencies on one architecture.

    The members are ordered from best to worst, so results can be compared
    with ``<``.
    """

    OK = 1  # relation is satisfied in target
    DEPENDS = 2  # relation can be satisfied by other packages in source
    FAILED = 3  # relation cannot be satisfied

1358 

1359 

class BuildDependsPolicy(AbstractBasePolicy):
    """Policy checking the build dependencies of a source package.

    The architecture-specific and architecture-independent build dependency
    fields of the source are checked against the target suite first and the
    item's source suite second.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "build-depends",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
        )
        # Replaced in initialise() when the all_buildarch option is set.
        self._all_buildarch: list[str] = []

        parse_option(options, "all_buildarch")

    def initialise(self, britney: "Britney") -> None:
        """Read the list of architectures to try first for indep checks."""
        super().initialise(britney)
        configured = self.options.all_buildarch
        if configured:
            self._all_buildarch = SuiteContentLoader.config_str_as_list(
                configured, []
            )

    def apply_src_policy_impl(
        self,
        build_deps_info: dict[str, Any],
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
        get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
    ) -> PolicyVerdict:
        """Check both build dependency fields and return the worst verdict.

        A field that is empty or absent is not checked.
        """
        verdict = PolicyVerdict.PASS

        # analyze the dependency fields (if present)
        fields = (
            (source_data_srcdist.build_deps_arch, DependencyType.BUILD_DEPENDS),
            (source_data_srcdist.build_deps_indep, DependencyType.BUILD_DEPENDS_INDEP),
        )
        for field_value, dep_type in fields:
            if not field_value:
                continue
            field_verdict = self._check_build_deps(
                field_value,
                dep_type,
                build_deps_info,
                source_data_tdist,
                source_data_srcdist,
                excuse,
                get_dependency_solvers=get_dependency_solvers,
            )
            verdict = PolicyVerdict.worst_of(verdict, field_verdict)

        return verdict

    def _get_check_archs(
        self, archs: Container[str], dep_type: DependencyType
    ) -> list[str]:
        """Return the architectures on which ``dep_type`` is to be checked.

        For BUILD_DEPENDS these are the configured architectures contained
        in ``archs``.  For any other type every configured architecture is
        returned, ordered as: the all_buildarch list, then the
        architectures contained in ``archs``, then the remaining ones.
        Out-of-sync architectures are always left out.
        """
        out_of_sync = self.options.outofsync_arches
        configured = self.options.architectures

        if dep_type is DependencyType.BUILD_DEPENDS:
            return [a for a in configured if a in archs and a not in out_of_sync]

        # first try the all buildarch
        ordered = list(self._all_buildarch)
        # then try the architectures where this source has arch specific
        # binaries (in the order of the architecture config file)
        for candidate in configured:
            if candidate in archs and candidate not in ordered:
                ordered.append(candidate)
        # then try all other architectures
        for candidate in configured:
            if candidate not in ordered:
                ordered.append(candidate)

        # and drop OUTOFSYNC_ARCHES
        return [a for a in ordered if a not in out_of_sync]

    def _add_info_for_arch(
        self,
        arch: str,
        excuses_info: dict[str, list[str]],
        blockers: dict[str, set[BinaryPackageId]],
        results: dict[str, BuildDepResult],
        dep_type: DependencyType,
        target_suite: TargetSuite,
        source_suite: Suite,
        excuse: "Excuse",
        verdict: PolicyVerdict,
    ) -> PolicyVerdict:
        """Transfer the findings collected for ``arch`` to the excuse.

        Returns ``verdict``, worsened to REJECTED_PERMANENTLY when the
        result recorded for ``arch`` is FAILED.
        """
        # for the solving packages, update the excuse to add the dependencies
        if arch in blockers and arch not in self.options.break_arches:
            for solver in blockers[arch]:
                excuse.add_package_depends(DependencySpec(dep_type, arch), {solver})

        if results.get(arch) is BuildDepResult.FAILED:
            verdict = PolicyVerdict.worst_of(
                verdict, PolicyVerdict.REJECTED_PERMANENTLY
            )

        if arch in excuses_info:
            rejected = verdict.is_rejected
            for text in excuses_info[arch]:
                if rejected:
                    excuse.add_verdict_info(verdict, text)
                else:
                    excuse.addinfo(text)

        return verdict

    def _check_build_deps(
        self,
        deps: str,
        dep_type: DependencyType,
        build_deps_info: dict[str, Any],
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
        get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
    ) -> PolicyVerdict:
        """Check one build dependency field of the source.

        ``deps`` is the raw field value.  Each comma-separated relation is
        looked up in the target suite first and in the item's source suite
        second.  For BUILD_DEPENDS_INDEP (or when the source has no
        architecture-specific binaries) a single architecture is enough and
        the best one is reported; otherwise every relevant architecture is
        reported.
        """
        verdict = PolicyVerdict.PASS
        any_arch_ok = dep_type is DependencyType.BUILD_DEPENDS_INDEP

        # local copies for better performance
        parse_src_depends = apt_pkg.parse_src_depends
        all_binaries = self.britney.all_binaries

        item = excuse.item
        source_name = item.package
        source_suite = item.suite
        target_suite = self.suite_info.target_suite
        binaries_s = source_suite.binaries
        provides_s = source_suite.provides_table
        binaries_t = target_suite.binaries
        provides_t = target_suite.provides_table

        unsat_bd: dict[str, list[str]] = {}
        # architectures for which the source builds arch-specific binaries
        relevant_archs: set[str] = {
            binary.architecture
            for binary in filter_out_faux_gen(source_data_srcdist.binaries)
            if all_binaries[binary].architecture != "all"
        }

        excuses_info: dict[str, list[str]] = defaultdict(list)
        blockers: dict[str, set[BinaryPackageId]] = defaultdict(set)
        arch_results = {}
        result_archs = defaultdict(list)
        bestresult = BuildDepResult.FAILED

        check_archs = self._get_check_archs(relevant_archs, dep_type)
        if not check_archs:
            # when the arch list is empty, we check the b-d on any arch, instead of all archs
            # this happens for Build-Depends on a source package that only produces arch: all binaries
            any_arch_ok = True
            check_archs = self._get_check_archs(
                self.options.architectures, DependencyType.BUILD_DEPENDS_INDEP
            )

        for arch in check_archs:
            # retrieve the binary package from the specified suite and arch
            binaries_s_a = binaries_s[arch]
            provides_s_a = provides_s[arch]
            binaries_t_a = binaries_t[arch]
            provides_t_a = provides_t[arch]
            arch_results[arch] = BuildDepResult.OK

            # for every dependency block (formed as conjunction of disjunction)
            for block_txt in deps.split(","):
                parsed = parse_src_depends(block_txt, False, arch)
                # Unlike regular dependencies, some clauses of the Build-Depends(-Arch|-Indep) can be
                # filtered out by (e.g.) architecture restrictions.  We need to cope with this while
                # keeping block_txt and block aligned.
                if not parsed:
                    # Relation is not relevant for this architecture.
                    continue
                block = parsed[0]

                # if the block is satisfied in the target suite, then skip the block
                if get_dependency_solvers(
                    block, binaries_t_a, provides_t_a, build_depends=True
                ):
                    # Satisfied in the target suite; all ok.
                    continue

                # check if the block can be satisfied in the source suite, and list the solving packages
                solvers = get_dependency_solvers(
                    block, binaries_s_a, provides_s_a, build_depends=True
                )

                # if the dependency can be satisfied by the same source package, skip the block:
                # obviously both binary packages will enter the target suite together
                if any(source_name == p.source for p in solvers):
                    continue

                # if no package can satisfy the dependency, add this information to the excuse
                if not solvers:
                    relation = block_txt.strip()
                    excuses_info[arch].append(
                        "%s unsatisfiable %s on %s: %s"
                        % (source_name, dep_type, arch, relation)
                    )
                    unsat_bd.setdefault(arch, []).append(block_txt.strip())
                    arch_results[arch] = BuildDepResult.FAILED
                    continue

                blockers[arch].update(p.pkg_id for p in solvers)
                if arch_results[arch] < BuildDepResult.DEPENDS:
                    arch_results[arch] = BuildDepResult.DEPENDS

            if any_arch_ok:
                current = arch_results[arch]
                if current < bestresult:
                    bestresult = current
                result_archs[current].append(arch)
                if bestresult is BuildDepResult.OK:
                    # we found an architecture where the b-deps-indep are
                    # satisfied in the target suite, so we can stop
                    break

        if any_arch_ok:
            # report only the first architecture that reached the best result
            arch = result_archs[bestresult][0]
            excuse.add_detailed_info(f"Checking {dep_type.get_description()} on {arch}")
            key = "check-%s-on-arch" % dep_type.get_reason()
            build_deps_info[key] = arch
            report_archs = [arch]
        else:
            report_archs = check_archs

        for arch in report_archs:
            verdict = self._add_info_for_arch(
                arch,
                excuses_info,
                blockers,
                arch_results,
                dep_type,
                target_suite,
                source_suite,
                excuse,
                verdict,
            )

        if unsat_bd:
            build_deps_info["unsatisfiable-arch-build-depends"] = unsat_bd

        return verdict

1621 

1622 

class BuiltUsingPolicy(AbstractBasePolicy):
    """Built-Using policy

    A binary that incorporates (part of) another source package has to
    list that source under 'Built-Using'.

    For every such entry this policy checks whether the source is
    available in the target suite.  When it is not, but it is a candidate
    for migration, a dependency on that candidate is added.

    A binary incorporating a newer version of a source that is not (yet) a
    candidate is not accepted.  Rebuilding it later in the primary suite
    would not help, because the rebuild would pick up the newer version
    again.

    When the binary incorporates an older version of the source, a newer
    version is accepted as a replacement, on the assumption that the
    binary can be rebuilt at some point during the development cycle.

    Requiring the exact version of the source would not be useful in
    practice: a newer upload of that source would not be blocked by this
    policy, so the built-using would be outdated anyway.

    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "built-using",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )

    def initialise(self, britney: "Britney") -> None:
        super().initialise(britney)

    def apply_srcarch_policy_impl(
        self,
        build_deps_info: dict[str, Any],
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check the Built-Using entries of the item's binaries on ``arch``.

        Returns REJECTED_PERMANENTLY when an entry cannot be satisfied and
        ``arch`` is not one of the break architectures, PASS otherwise.
        """
        verdict = PolicyVerdict.PASS

        item_suite = excuse.item.suite
        target_suite = self.suite_info.target_suite
        binaries_s = item_suite.binaries

        def check_bu_in_suite(bu_source: str, bu_version: str, suite: Suite) -> bool:
            # True when ``suite`` has ``bu_source`` at ``bu_version`` or
            # newer; in that case the excuse is updated accordingly.
            # ``pkg_name`` is taken from the loop below.
            if bu_source not in suite.sources:
                return False
            s_ver = suite.sources[bu_source].version
            if apt_pkg.version_compare(s_ver, bu_version) < 0:
                return False

            dep = PackageId(bu_source, s_ver, "source")
            if arch in self.options.break_arches:
                excuse.add_detailed_info(
                    "Ignoring Built-Using for %s/%s on %s"
                    % (pkg_name, arch, dep.uvname)
                )
            else:
                spec = DependencySpec(DependencyType.BUILT_USING, arch)
                excuse.add_package_depends(spec, {dep})
                excuse.add_detailed_info(
                    f"{pkg_name}/{arch} has Built-Using on {dep.uvname}"
                )
            return True

        arch_binaries = sorted(
            x
            for x in filter_out_faux_gen(source_data_srcdist.binaries)
            if x.architecture == arch
        )
        for pkg_id in arch_binaries:
            pkg_name = pkg_id.package_name

            # retrieve the corresponding binary package of the source suite
            binary_s = binaries_s[arch][pkg_name]
            if binary_s.builtusing is None:
                continue

            for bu in binary_s.builtusing:
                bu_source = bu[0]
                bu_version = bu[1]

                # look in the target suite first, ...
                found = (
                    bu_source in target_suite.sources
                    and apt_pkg.version_compare(
                        target_suite.sources[bu_source].version, bu_version
                    )
                    >= 0
                )
                # ... then in the suite of the item, ...
                if not found:
                    found = check_bu_in_suite(bu_source, bu_version, item_suite)
                # ... and for additional source suites also in the primary one
                if not found and item_suite.suite_class.is_additional_source:
                    found = check_bu_in_suite(
                        bu_source, bu_version, self.suite_info.primary_source_suite
                    )

                if found:
                    continue

                if arch in self.options.break_arches:
                    excuse.add_detailed_info(
                        "Ignoring unsatisfiable Built-Using for %s/%s on %s %s"
                        % (pkg_name, arch, bu_source, bu_version)
                    )
                    continue

                verdict = PolicyVerdict.worst_of(
                    verdict, PolicyVerdict.REJECTED_PERMANENTLY
                )
                excuse.add_verdict_info(
                    verdict,
                    "%s/%s has unsatisfiable Built-Using on %s %s"
                    % (pkg_name, arch, bu_source, bu_version),
                )

        return verdict

1746 

1747 

class BlockPolicy(AbstractBasePolicy):
    """Policy that holds items back when a block hint applies to them.

    Three sources of blocks are considered:

    * ``block``/``block-*`` hints for the source package, which can be
      lifted by a matching ``unblock``/``unblock-*`` hint;
    * ``block-all`` hints (``source``, ``new-source``, ``key`` and
      ``no-autopkgtest``), which only apply to items from the primary
      source suite that have no specific ``block`` hint;
    * items from a non-primary source suite, which always need approval.
    """

    # group(1) is "un" for unblock hints, group(2) is the block command
    # ("block", "block-udeb", ...) the hint refers to.
    BLOCK_HINT_REGEX = re.compile("^(un)?(block-?.*)$")

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "block",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
        )
        # block-all hints, keyed by the hint's package field
        self._blockall: dict[str | None, Hint] = {}

    def initialise(self, britney: "Britney") -> None:
        """Index the block-all hints and load the key packages if needed."""
        super().initialise(britney)
        assert self.hints is not None
        for hint in self.hints.search(type="block-all"):
            self._blockall[hint.package] = hint

        self._key_packages: frozenset[str] = frozenset()
        if "key" in self._blockall:
            self._key_packages = self._read_key_packages()

    def _read_key_packages(self) -> frozenset[str]:
        """Read the set of key packages

        The file contains data in the yaml format :

        - reason: <something>
          source: <package>

        The method returns a frozenset with the source names of all key
        packages.  If the file does not exist, an error is logged and the
        program exits with status 1.
        """
        filename = os.path.join(self.state_dir, "key_packages.yaml")
        self.logger.info("Loading key packages from %s", filename)
        if os.path.exists(filename):
            with open(filename) as f:
                data = yaml.safe_load(f)
            key_packages = frozenset(item["source"] for item in data)
        else:
            self.logger.error(
                "Britney was asked to block key packages, "
                + "but no key_packages.yaml file was found."
            )
            sys.exit(1)

        return key_packages

    def register_hints(self, hint_parser: HintParser) -> None:
        # block related hints are currently defined in the hints module
        pass

    def _check_blocked(
        self, arch: str, version: str, excuse: "Excuse"
    ) -> PolicyVerdict:
        """Determine whether the item of ``excuse`` is blocked.

        :param arch: architecture being checked ("source" for the source
            level check); unblock hints must match it or be for "source".
        :param version: version of the source in the source suite; unblock
            hints must match it.
        :param excuse: the excuse to annotate with hints, info and reasons.
        :return: ``PolicyVerdict.REJECTED_NEEDS_APPROVAL`` if at least one
            block is not lifted by a matching unblock, otherwise
            ``PolicyVerdict.PASS``.
        """
        verdict = PolicyVerdict.PASS
        # block command -> user (or suite name) that requested the block
        blocked: dict[str, Any] = {}
        # block command -> user that requested the matching unblock
        unblocked: dict[str, Any] = {}
        # block command -> ready-made message overriding the generic one
        block_info: dict[str, str] = {}
        source_suite = excuse.item.suite
        suite_name = source_suite.name
        src = excuse.item.package
        is_primary = source_suite.suite_class is SuiteClass.PRIMARY_SOURCE_SUITE

        tooltip = (
            f"please contact {self.options.distribution}-release if update is needed"
        )
        # udeb blocks are handled by the d-i release manager, so they get
        # their own tooltip; it must not replace the tooltip used for the
        # other block commands.
        udeb_tooltip = "please contact the d-i release manager if an update is needed"

        assert self.hints is not None
        mismatches = False
        r = self.BLOCK_HINT_REGEX
        for hint in self.hints.search(package=src):
            m = r.match(hint.type)
            if m:
                if m.group(1) == "un":
                    assert hint.suite is not None
                    if (
                        hint.version != version
                        or hint.suite.name != suite_name
                        or (hint.architecture != arch and hint.architecture != "source")
                    ):
                        self.logger.info(
                            "hint mismatch: %s %s %s", version, arch, suite_name
                        )
                        mismatches = True
                    else:
                        unblocked[m.group(2)] = hint.user
                        excuse.add_hint(hint)
                else:
                    # block(-*) hint: only accepts a source, so this will
                    # always match
                    blocked[m.group(2)] = hint.user
                    excuse.add_hint(hint)

        if "block" not in blocked and is_primary:
            # if there is a specific block hint for this package, we don't
            # check for the general hints

            if self.options.distribution == "debian":
                url = "https://release.debian.org/testing/freeze_policy.html"
                tooltip = f'Follow the <a href="{url}">freeze policy</a> when applying for an unblock'

            if "source" in self._blockall:
                blocked["block"] = self._blockall["source"].user
                excuse.add_hint(self._blockall["source"])
            elif (
                "new-source" in self._blockall
                and src not in self.suite_info.target_suite.sources
            ):
                blocked["block"] = self._blockall["new-source"].user
                excuse.add_hint(self._blockall["new-source"])
                # no tooltip: new sources will probably not be accepted anyway
                block_info["block"] = "blocked by {}: is not in {}".format(
                    self._blockall["new-source"].user,
                    self.suite_info.target_suite.name,
                )
            elif "key" in self._blockall and src in self._key_packages:
                blocked["block"] = self._blockall["key"].user
                excuse.add_hint(self._blockall["key"])
                block_info["block"] = "blocked by {}: is a key package ({})".format(
                    self._blockall["key"].user,
                    tooltip,
                )
            elif "no-autopkgtest" in self._blockall:
                if excuse.autopkgtest_results == {"PASS"}:
                    if not blocked:
                        excuse.addinfo("not blocked: has successful autopkgtest")
                else:
                    blocked["block"] = self._blockall["no-autopkgtest"].user
                    excuse.add_hint(self._blockall["no-autopkgtest"])
                    if not excuse.autopkgtest_results:
                        block_info["block"] = (
                            "blocked by %s: does not have autopkgtest (%s)"
                            % (
                                self._blockall["no-autopkgtest"].user,
                                tooltip,
                            )
                        )
                    else:
                        block_info["block"] = (
                            "blocked by %s: autopkgtest not fully successful (%s)"
                            % (
                                self._blockall["no-autopkgtest"].user,
                                tooltip,
                            )
                        )

        elif not is_primary:
            # items from non-primary source suites always need approval
            blocked["block"] = suite_name
            excuse.needs_approval = True

        for block_cmd in blocked:
            unblock_cmd = "un" + block_cmd
            if block_cmd in unblocked:
                if is_primary or block_cmd == "block-udeb":
                    excuse.addinfo(
                        "Ignoring %s request by %s, due to %s request by %s"
                        % (
                            block_cmd,
                            blocked[block_cmd],
                            unblock_cmd,
                            unblocked[block_cmd],
                        )
                    )
                else:
                    excuse.addinfo("Approved by %s" % (unblocked[block_cmd]))
            else:
                verdict = PolicyVerdict.REJECTED_NEEDS_APPROVAL
                if is_primary or block_cmd == "block-udeb":
                    # redirect people to d-i RM for udeb things; the choice
                    # is made per command so that it does not leak into the
                    # message of commands handled later in this loop
                    if block_cmd == "block-udeb":
                        cmd_tooltip = udeb_tooltip
                    else:
                        cmd_tooltip = tooltip
                    if block_cmd in block_info:
                        info = block_info[block_cmd]
                    else:
                        info = (
                            "Not touching package due to {} request by {} ({})".format(
                                block_cmd,
                                blocked[block_cmd],
                                cmd_tooltip,
                            )
                        )
                    excuse.add_verdict_info(verdict, info)
                else:
                    excuse.add_verdict_info(verdict, "NEEDS APPROVAL BY RM")
                excuse.addreason("block")
        if mismatches:
            excuse.add_detailed_info(
                f"Some hints for {src} do not match this item"
            )
        return verdict

    def apply_src_policy_impl(
        self,
        block_info: dict[str, Any],
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Run the block check for the source item itself."""
        return self._check_blocked("source", source_data_srcdist.version, excuse)

    def apply_srcarch_policy_impl(
        self,
        block_info: dict[str, Any],
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Run the block check for the item on architecture ``arch``."""
        return self._check_blocked(arch, source_data_srcdist.version, excuse)

1957 

1958 

class BuiltOnBuilddPolicy(AbstractBasePolicy):
    """Policy that rejects binaries that were not built on a buildd.

    The signer of every binary is looked up in ``signers.json`` from the
    state directory.  Binaries outside of ``main`` are tolerated, and
    arch:all binaries can be allowed with an
    ``allow-archall-maintainer-upload`` hint.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        applicable_suites = {
            SuiteClass.PRIMARY_SOURCE_SUITE,
            SuiteClass.ADDITIONAL_SOURCE_SUITE,
        }
        super().__init__(
            "builtonbuildd",
            options,
            suite_info,
            applicable_suites,
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        self._builtonbuildd: dict[str, Any] = {"signerinfo": None}

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the unversioned hint that allows maintainer built arch:all."""
        hint_type = HintType(
            "allow-archall-maintainer-upload",
            versioned=HintAnnotate.FORBIDDEN,
        )
        hint_parser.register_hint_type(hint_type)

    def initialise(self, britney: "Britney") -> None:
        """Load the signer information from the state directory."""
        super().initialise(britney)
        try:
            filename_signerinfo = os.path.join(self.state_dir, "signers.json")
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e
        self._builtonbuildd["signerinfo"] = self._read_signerinfo(filename_signerinfo)

    def apply_srcarch_policy_impl(
        self,
        buildd_info: dict[str, Any],
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check who uploaded the binaries of this item on ``arch``.

        ``buildd_info["signed-by"]`` records, per binary architecture, the
        uid seen for the binaries; it is also used to avoid repeating the
        same message for every binary of an architecture.
        """
        verdict = PolicyVerdict.PASS
        signer_db = self._builtonbuildd["signerinfo"]

        # same dict object as buildd_info["signed-by"], created on first use
        signed_by = buildd_info.setdefault("signed-by", {})

        item = excuse.item

        # we use the source component, because a binary in contrib can
        # belong to a source in main
        component = get_component(source_data_srcdist.section)

        suite_binaries = item.suite.binaries[arch]
        assert self.hints is not None

        arch_pkg_ids = sorted(
            candidate
            for candidate in filter_out_faux_gen(source_data_srcdist.binaries)
            if candidate.architecture == arch
        )
        for pkg_id in arch_pkg_ids:
            pkg_name = pkg_id.package_name
            binary_u = suite_binaries[pkg_name]
            pkg_arch = binary_u.architecture

            if binary_u.source_version != source_data_srcdist.version:
                continue

            if item.architecture != "source" and pkg_arch == "all":
                # we don't care about the existing arch: all binaries when
                # checking a binNMU item, because the arch: all binaries won't
                # migrate anyway
                continue

            uid = None
            uidinfo = ""
            buildd_ok = False
            failure_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            try:
                signer = signer_db[pkg_name][pkg_id.version][pkg_arch]
                if signer["buildd"]:
                    buildd_ok = True
                uid = signer["uid"]
                uidinfo = f"arch {pkg_arch} binaries uploaded by {uid}"
            except KeyError:
                self.logger.info(
                    "signer info for %s %s (%s) on %s not found",
                    pkg_name,
                    binary_u.version,
                    pkg_arch,
                    arch,
                )
                uidinfo = f"upload info for arch {pkg_arch} binaries not found"
                failure_verdict = PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT

            # only report once per binary architecture
            first_for_arch = pkg_arch not in signed_by

            if not buildd_ok:
                if component != "main":
                    if first_for_arch:
                        excuse.add_detailed_info(
                            f"{uidinfo}, but package in {component}"
                        )
                    buildd_ok = True
                elif pkg_arch == "all":
                    allow_hint = self.hints.search_first(
                        "allow-archall-maintainer-upload", package=item.package
                    )
                    if allow_hint is not None:
                        buildd_ok = True
                        verdict = PolicyVerdict.worst_of(
                            verdict, PolicyVerdict.PASS_HINTED
                        )
                        if first_for_arch:
                            excuse.addinfo(
                                f"{uidinfo}, but whitelisted by {allow_hint.user}"
                            )

            if not buildd_ok:
                verdict = failure_verdict
                if first_for_arch:
                    if pkg_arch == "all":
                        uidinfo += (
                            ", a new source-only upload is needed to allow migration"
                        )
                    excuse.add_verdict_info(verdict, f"Not built on buildd: {uidinfo}")

            if not first_for_arch and signed_by[pkg_arch] != uid:
                self.logger.info(
                    "signer mismatch for %s (%s %s) on %s: %s, while %s already listed",
                    pkg_name,
                    binary_u.source,
                    binary_u.source_version,
                    pkg_arch,
                    uid,
                    signed_by[pkg_arch],
                )

            signed_by[pkg_arch] = uid

        return verdict

    def _read_signerinfo(self, filename: str) -> dict[str, Any]:
        """Return the parsed JSON content of ``filename``.

        A zero-length file yields an empty dict.
        """
        self.logger.info("Loading signer info from %s", filename)
        with open(filename) as fd:
            if os.fstat(fd.fileno()).st_size >= 1:
                return json.load(fd)
        return {}

2111 

2112 

class ImplicitDependencyPolicy(AbstractBasePolicy):
    """Implicit Dependency policy

    Migrating a package pkg-a may make another package pkg-b uninstallable.
    If a newer version of pkg-b (or its removal) repairs that, pkg-a has an
    'implicit dependency' on pkg-b: pkg-a can only migrate together with
    pkg-b.

    This policy looks for a few common situations and records what it
    finds in the excuses.  When another item is needed to repair the
    uninstallability, a dependency is added.  When no newer item can repair
    it, the excuse is blocked.

    The migration step checks the installability of every package anyway,
    so this policy does not have to cover every corner case.  It must,
    however, never block an excuse without need.

    Cases this policy should detect:

    * pkg-a is upgraded from 1.0-1 to 2.0-1, while
      pkg-b has "Depends: pkg-a (<< 2.0)"
      Typical for a pkg-b with a strict dependency on pkg-a because it uses
      a non-stable internal interface (examples are glibc, binutils,
      python3-defaults, ...)

    * pkg-a is upgraded from 1.0-1 to 2.0-1, and
      pkg-a 1.0-1 has "Provides: provides-1",
      pkg-a 2.0-1 has "Provides: provides-2",
      pkg-b has "Depends: provides-1"
      Typical when the interface of pkg-a changes between versions and a
      virtual package identifies the interface version (e.g. perl-api-x.y)

    """

    _pkg_universe: "BinaryPackageUniverse"
    _all_binaries: dict["BinaryPackageId", "BinaryPackage"]
    _allow_uninst: dict[str, set[str | None]]
    _nobreakall_arches: list[str]

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        applicable_suites = {
            SuiteClass.PRIMARY_SOURCE_SUITE,
            SuiteClass.ADDITIONAL_SOURCE_SUITE,
        }
        super().__init__(
            "implicit-deps",
            options,
            suite_info,
            applicable_suites,
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )

    def initialise(self, britney: "Britney") -> None:
        """Cache the britney state and options this policy consults."""
        super().initialise(britney)
        self._pkg_universe = britney.pkg_universe
        self._all_binaries = britney.all_binaries
        self._smooth_updates = britney.options.smooth_updates
        self._nobreakall_arches = self.options.nobreakall_arches
        self._new_arches = self.options.new_arches
        self._break_arches = self.options.break_arches
        self._allow_uninst = britney.allow_uninst
        self._outofsync_arches = self.options.outofsync_arches

    def can_be_removed(self, pkg: BinaryPackage) -> bool:
        """Tell whether ``pkg`` is a candidate for removal from the target suite."""
        source_name = pkg.source
        testing = self.suite_info.target_suite

        # TODO these conditions shouldn't be hardcoded here
        # ideally, we would be able to look up excuses to see if the removal
        # is in there, but in the current flow, this policy is called before
        # all possible excuses exist, so there is no list for us to check

        if source_name not in self.suite_info.primary_source_suite.sources:
            # source for pkg not in unstable: candidate for removal
            return True

        testing_version = testing.sources[source_name].version
        assert self.hints is not None
        if self.hints.has_hint(
            "remove", package=source_name, version=testing_version
        ):
            # removal hint for the source in testing: candidate for removal
            return True

        if testing.is_cruft(pkg):
            # if pkg is cruft in testing, removal will be tried
            return True

        # the case were the newer version of the source no longer includes the
        # binary (or includes a cruft version of the binary) will be handled
        # separately (in that case there might be an implicit dependency on
        # the newer source)

        return False

    def should_skip_rdep(
        self, pkg: BinaryPackage, source_name: str, myarch: str
    ) -> bool:
        """Tell whether reverse dependency ``pkg`` does not need checking.

        That is the case when the migration cannot break it, or when it is
        acceptable for it to become uninstallable.
        """
        testing = self.suite_info.target_suite

        if (
            # it is not in the target suite, migration cannot break anything
            not testing.is_pkg_in_the_suite(pkg.pkg_id)
            # if it is built from the same source, it will be upgraded
            # with the source
            or pkg.source == source_name
            # could potentially be removed, so if that happens, it won't be
            # broken
            or self.can_be_removed(pkg)
        ):
            return True

        if (
            # arch all on non nobreakarch is allowed to become uninstallable
            (pkg.architecture == "all" and myarch not in self._nobreakall_arches)
            # there is a hint to allow this binary to become uninstallable
            or pkg.pkg_id.package_name in self._allow_uninst[myarch]
            # it is already uninstallable in the target suite, migration
            # cannot break anything
            or not testing.is_installable(pkg.pkg_id)
        ):
            return True

        return False

    def breaks_installability(
        self,
        pkg_id_t: BinaryPackageId,
        pkg_id_s: BinaryPackageId | None,
        pkg_to_check: BinaryPackageId,
    ) -> bool:
        """
        Tell whether replacing pkg_id_t by pkg_id_s makes pkg_to_check
        uninstallable.

        Pass None as pkg_id_s to check the effect of removing pkg_id_t.
        """

        universe = self._pkg_universe
        conflicting = universe.negative_dependencies_of(pkg_to_check)

        for clause in universe.dependencies_of(pkg_to_check):
            if pkg_id_t not in clause:
                # pkg_id_t is not one of the alternatives of this clause, so
                # upgrading pkg_id_t cannot break it
                continue

            # Look for an alternative that still satisfies the clause once
            # pkg_id_t has been replaced by pkg_id_s:
            # - an alternative that conflicts with pkg_to_check is useless
            #   (this commonly happens when breaks are added to pkg_id_s);
            # - a different binary than pkg_id_t satisfies the clause
            #   regardless of the upgrade;
            # - of the versions of the binary itself, only pkg_id_s will be
            #   in the target suite after the migration.
            satisfiable = any(
                alt not in conflicting
                and (alt.package_name != pkg_id_t.package_name or alt == pkg_id_s)
                for alt in clause
            )
            if not satisfiable:
                return True
        return False

    def check_upgrade(
        self,
        pkg_id_t: BinaryPackageId,
        pkg_id_s: BinaryPackageId | None,
        source_name: str,
        myarch: str,
        broken_binaries: set[str],
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check the reverse dependencies of ``pkg_id_t`` in the target suite.

        For every reverse dependency that the upgrade would break, either an
        implicit dependency on the newer versions that are not broken is
        added to ``excuse``, or (when there is none) the binary is added to
        ``broken_binaries`` and the excuse is rejected.
        """
        verdict = PolicyVerdict.PASS

        universe = self._pkg_universe
        binaries_by_id = self._all_binaries

        # check all rdeps of the package in testing
        for rdep_id in sorted(universe.reverse_dependencies_of(pkg_id_t)):
            rdep_bin = binaries_by_id[rdep_id]

            # check some cases where the rdep won't become uninstallable, or
            # where we don't care if it does
            if self.should_skip_rdep(rdep_bin, source_name, myarch):
                continue

            if not self.breaks_installability(pkg_id_t, pkg_id_s, rdep_id):
                # the upgrade doesn't break rdep_id, so there is no implicit
                # dependency
                continue

            # The upgrade breaks the installability of the rdep.  If a newer
            # version of the rdep solves that, there is an implicit
            # dependency.  If not, the upgrade will fail.
            fixes = set()
            for newer, _suite in find_newer_binaries(
                self.suite_info, rdep_bin, add_source_for_dropped_bin=True
            ):
                # A 'source' entry means a newer version of the source no
                # longer builds the binary: once that source migrates, the
                # binary is gone, so it cannot be uninstallable.
                if newer.architecture != "source":
                    assert isinstance(newer, BinaryPackageId)
                    if self.breaks_installability(pkg_id_t, pkg_id_s, newer):
                        continue
                fixes.add(newer)

            if fixes:
                spec = DependencySpec(DependencyType.IMPLICIT_DEPENDENCY, myarch)
                excuse.add_package_depends(spec, fixes)
                continue

            # no good newer versions: no possible solution
            broken_binaries.add(rdep_id.name)
            if pkg_id_s:
                template, subject = "migrating {} to {}", pkg_id_s
            else:
                template, subject = "removing {} from {}", pkg_id_t
            action = template.format(
                subject.name,
                self.suite_info.target_suite.name,
            )
            if rdep_id.package_name.endswith("-faux-build-depends"):
                name = rdep_id.package_name.removesuffix("-faux-build-depends")
                info = f'{action} makes Build-Depends of src:<a href="#{name}">{name}</a> uninstallable'
            else:
                info = '{0} makes <a href="#{1}">{1}</a> uninstallable'.format(
                    action, rdep_id.name
                )
            verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(verdict, info)

        return verdict

    def apply_srcarch_policy_impl(
        self,
        implicit_dep_info: dict[str, Any],
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Look for implicit dependencies of this item on ``arch``.

        The names of the binaries that would be broken without a possible
        solution are merged into ``implicit_dep_info["broken-binaries"]``.
        """
        verdict = PolicyVerdict.PASS

        if not source_data_tdist:
            # this item is not currently in testing: no implicit dependency
            return verdict

        if excuse.hasreason("missingbuild"):
            # if the build is missing, the policy would treat this as if the
            # binaries would be removed, which would give incorrect (and
            # confusing) info
            info = "missing build, not checking implicit dependencies on %s" % (arch)
            excuse.add_detailed_info(info)
            return verdict

        source_suite = excuse.item.suite
        source_name = excuse.item.package
        testing = self.suite_info.target_suite
        binaries_by_id = self._all_binaries

        # we check all binaries for this excuse that are currently in testing
        candidates = sorted(
            bin_id
            for bin_id in source_data_tdist.binaries
            if (arch == "source" or bin_id.architecture == arch)
            and bin_id.package_name in testing.binaries[bin_id.architecture]
            and bin_id.architecture not in self._new_arches
            and bin_id.architecture not in self._break_arches
            and bin_id.architecture not in self._outofsync_arches
        )

        broken_binaries: set[str] = set()

        assert self.hints is not None
        for pkg_id_t in candidates:
            mypkg = pkg_id_t.package_name
            myarch = pkg_id_t.architecture
            binaries_t_a = testing.binaries[myarch]
            binaries_s_a = source_suite.binaries[myarch]

            if testing.is_cruft(binaries_by_id[pkg_id_t]):
                # this binary is cruft in testing: it will stay around as long
                # as necessary to satisfy dependencies, so we don't need to
                # care
                continue

            # None means: the binary is not in the new version (or is cruft
            # in the source suite, which we treat the same way)
            pkg_id_s: Optional["BinaryPackageId"] = None
            if mypkg in binaries_s_a:
                mybin = binaries_s_a[mypkg]
                if mybin.source != source_name:
                    # hijack: this is too complicated to check, so we ignore
                    # it (the migration code will check the installability
                    # later anyway)
                    pkg_id_s = mybin.pkg_id
                elif mybin.source_version == source_data_srcdist.version:
                    if pkg_id_t == mybin.pkg_id:
                        # same binary (probably arch: all from a binNMU):
                        # 'upgrading' doesn't change anything, for this
                        # binary, so it won't break anything
                        continue
                    pkg_id_s = mybin.pkg_id

            if not pkg_id_s:
                if is_smooth_update_allowed(
                    binaries_t_a[mypkg], self._smooth_updates, self.hints
                ):
                    # smooth updates are allowed: the binary can stay around
                    # if that is necessary to satisfy dependencies, so we
                    # don't need to check it
                    continue

                if (
                    source_data_tdist.version == source_data_srcdist.version
                    and source_suite.suite_class is SuiteClass.ADDITIONAL_SOURCE_SUITE
                    and binaries_t_a[mypkg].architecture == "all"
                ):
                    # we're very probably migrating a binNMU built in tpu
                    # where the arch:all binaries were not copied to it as
                    # that's not needed. This policy could needlessly block.
                    continue

            upgrade_verdict = self.check_upgrade(
                pkg_id_t, pkg_id_s, source_name, myarch, broken_binaries, excuse
            )
            verdict = PolicyVerdict.worst_of(verdict, upgrade_verdict)

        # each arch is processed separately, so if we already have info from
        # other archs, we need to merge the info from this arch
        previously_broken = set(implicit_dep_info.get("broken-binaries", []))
        implicit_dep_info["broken-binaries"] = sorted(
            previously_broken | broken_binaries
        )

        return verdict

2475 

2476 

class ReverseRemovalPolicy(AbstractBasePolicy):
    """Policy that blocks sources that depend on a source with a remove hint.

    An ``ignore-reverse-remove`` hint lets an item pass nevertheless.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        applicable_suites = {
            SuiteClass.PRIMARY_SOURCE_SUITE,
            SuiteClass.ADDITIONAL_SOURCE_SUITE,
        }
        super().__init__("reverseremoval", options, suite_info, applicable_suites)

    def register_hints(self, hint_parser: HintParser) -> None:
        hint_parser.register_hint_type(HintType("ignore-reverse-remove"))

    def initialise(self, britney: "Britney") -> None:
        """Compute which sources are affected by the remove hints.

        The result maps "source/version" to the names of the hinted
        packages that cause the block.
        """
        super().initialise(britney)

        universe = britney.pkg_universe
        all_source_suites = britney.suite_info.source_suites
        testing = britney.suite_info.target_suite

        # Build set of the sources of reverse (Build-) Depends
        assert self.hints is not None

        # binary -> names of the hinted packages it (transitively) needs
        causes_by_binary: dict[BinaryPackageId, set[str]] = defaultdict(set)
        for hint in self.hints.search("remove"):
            for item in hint.packages:
                # I think we don't need to look at the target suite
                for src_suite in all_source_suites:
                    try:
                        # Explicitly not running filter_out_faux here
                        affected = set(src_suite.sources[item.uvname].binaries)
                    except KeyError:
                        continue
                    else:
                        compute_reverse_tree(universe, affected)
                        for affected_bin in affected:
                            causes_by_binary[affected_bin].add(item.uvname)

        causes_by_source: dict[str, set[str]] = defaultdict(set)
        for bin_id, causes in causes_by_binary.items():
            # If the pkg is in the target suite, there's nothing this
            # policy wants to do.
            if not testing.is_pkg_in_the_suite(bin_id):
                binary = britney.all_binaries[bin_id]
                key = "/".join((binary.source, binary.source_version))
                causes_by_source[key].update(causes)
        self._block_src_for_rm_hint = causes_by_source

    def apply_src_policy_impl(
        self,
        rev_remove_info: dict[str, Any],
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Reject the item if it needs a package that has a remove hint."""
        item = excuse.item
        if item.name not in self._block_src_for_rm_hint:
            return PolicyVerdict.PASS

        reason = ", ".join(sorted(self._block_src_for_rm_hint[item.name]))
        assert self.hints is not None
        excuse.addreason("reverseremoval")
        ignore_hint = self.hints.search_first(
            "ignore-reverse-remove", package=item.uvname, version=item.version
        )
        if ignore_hint is None:
            verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(
                verdict, "Remove hint for (transitive) dependency: %s" % reason
            )
            return verdict

        excuse.addreason("ignore-reverse-remove")
        excuse.addinfo(
            "Should block migration because of remove hint for %s, but forced by %s"
            % (reason, ignore_hint.user)
        )
        return PolicyVerdict.PASS_HINTED

2556 

2557 

2558class ReproducibleState(Enum): 

2559 BAD = auto() 

2560 FAIL = auto() 

2561 GOOD = auto() 

2562 UNKNOWN = auto() 

2563 

2564 @staticmethod 

2565 def from_str(val: str | None) -> "ReproducibleState": 

2566 match val: 

2567 case "BAD": 

2568 return ReproducibleState.BAD 

2569 case "FAIL": 2569 ↛ 2570line 2569 didn't jump to line 2570 because the pattern on line 2569 never matched

2570 return ReproducibleState.FAIL 

2571 case "GOOD": 2571 ↛ 2573line 2571 didn't jump to line 2573 because the pattern on line 2571 always matched

2572 return ReproducibleState.GOOD 

2573 case "UNKNOWN" | None: 

2574 return ReproducibleState.UNKNOWN 

2575 case _: 

2576 raise ValueError(f"Invalid reproducability state f{str}") 

2577 

2578 

@dataclass(slots=True, frozen=True)
class ReproducibleData:
    """Immutable record holding a reproducibility state and optional ids.

    ``state`` is the only positional field; the id fields are keyword-only
    and default to ``None``.
    """

    # the reproducibility state
    state: ReproducibleState
    # NOTE(review): the three ids presumably refer to the build, its
    # diffoscope log and its artifact in the reproducibility service;
    # confirm against the code that fills them in.
    build_id: str | None = field(default=None, kw_only=True)
    diffoscope_log_id: str | None = field(default=None, kw_only=True)
    artifact_id: str | None = field(default=None, kw_only=True)

2585 

2586 

2587class ReproduciblePolicy(AbstractBasePolicy): 

def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
    """Set up the policy for the primary source suite, on every arch only."""
    super().__init__(
        "reproducible",
        options,
        suite_info,
        {SuiteClass.PRIMARY_SOURCE_SUITE},
        ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
    )
    self._reproducible: dict[str, dict[tuple[str, str], ReproducibleData]] = {}

    # Default values for this policy's options: the two integer options
    # default to 0, the other options get parse_option's own default
    for int_option in ("repro_success_bounty", "repro_regression_penalty"):
        parse_option(options, int_option, default=0, to_int=True)
    for plain_option in (
        "repro_log_url",
        "repro_excuse_url",
        "repro_retry_url",
        "repro_components",
    ):
        parse_option(options, plain_option)

2605 

2606 def register_hints(self, hint_parser: HintParser) -> None: 

2607 hint_parser.register_hint_type( 

2608 HintType( 

2609 "ignore-reproducible-src", 

2610 versioned=HintAnnotate.OPTIONAL, 

2611 architectured=HintAnnotate.OPTIONAL, 

2612 ) 

2613 ) 

2614 hint_parser.register_hint_type( 

2615 HintType( 

2616 "ignore-reproducible", 

2617 versioned=HintAnnotate.OPTIONAL, 

2618 architectured=HintAnnotate.OPTIONAL, 

2619 ) 

2620 ) 

2621 

2622 def initialise(self, britney: "Britney") -> None: 

2623 super().initialise(britney) 

2624 summary = self._reproducible 

2625 

2626 valid_release_names = { 

2627 suite.codename 

2628 for suite in chain( 

2629 (britney.suite_info.target_suite,), 

2630 britney.suite_info.source_suites, 

2631 ) 

2632 } | { 

2633 suite.name 

2634 for suite in chain( 

2635 (britney.suite_info.target_suite,), 

2636 britney.suite_info.source_suites, 

2637 ) 

2638 } 

2639 

2640 assert hasattr( 

2641 self, "state_dir" 

2642 ), "Please set STATE_DIR in the britney configuration" 

2643 assert ( 

2644 self.options.repro_components 

2645 ), "Please set REPRO_COMPONENTS in the britney configuration" 

2646 for file in os.listdir(self.state_dir): 

2647 if not file.startswith("reproducible-") or not file.endswith(".json"): 2647 ↛ 2648line 2647 didn't jump to line 2648 because the condition on line 2647 was never true

2648 continue 

2649 filename = os.path.join(self.state_dir, file) 

2650 

2651 self.logger.info("Loading reproducibility report from %s", filename) 

2652 with open(filename) as fd: 2652 ↛ 2646line 2652 didn't jump to line 2646 because the continue on line 2654 wasn't executed

2653 if os.fstat(fd.fileno()).st_size < 1: 2653 ↛ 2654line 2653 didn't jump to line 2654 because the condition on line 2653 was never true

2654 continue 

2655 data = json.load(fd) 

2656 

2657 for result in data["records"]: 

2658 if ( 2658 ↛ 2662line 2658 didn't jump to line 2662 because the condition on line 2658 was never true

2659 release := result.get("release") 

2660 ) is not None and release not in valid_release_names: 

2661 # tests do not have a release set 

2662 continue 

2663 

2664 state = ReproducibleState.from_str(result.get("status")) 

2665 repo = { 

2666 key: value 

2667 for key, value in result.items() 

2668 if key in ("build_id", "diffoscope_log_id", "artifact_id") 

2669 } 

2670 

2671 summary.setdefault(result["architecture"], {})[ 

2672 (result["name"], result["version"]) 

2673 ] = ReproducibleData(state, **repo) 

2674 

2675 def _lookup_data( 

2676 self, package_name: str, version: str, arch: str 

2677 ) -> tuple[ReproducibleData, str] | None: 

2678 key = (package_name, version) 

2679 if (repo := self._reproducible[arch].get(key)) is not None: 

2680 return repo, arch 

2681 

2682 repo = self._reproducible["all"].get(key) 

2683 return (repo, "all") if repo is not None else None 

2684 

2685 def _format_link(self, bpid: BinaryPackageId, arch: str) -> str: 

2686 data = self._lookup_data(bpid.package_name, bpid.version, arch) 

2687 assert data is not None 

2688 repo, arch = data 

2689 if repo.diffoscope_log_id and (diff_id := repo.artifact_id): 2689 ↛ 2690line 2689 didn't jump to line 2690 because the condition on line 2689 was never true

2690 endpoint = f"artifacts/{diff_id}/diffoscope" 

2691 else: 

2692 endpoint = "log" 

2693 url = self.options.repro_log_url.format( 

2694 arch=arch, build_id=repo.build_id, endpoint=endpoint 

2695 ) 

2696 return f'<a href="{url}">{bpid.package_name}</a>' 

2697 

2698 def _create_link_to_log(self, arch: str, failed_bpids: set[BinaryPackageId]) -> str: 

2699 if not self.options.repro_log_url: 2699 ↛ 2700line 2699 didn't jump to line 2700 because the condition on line 2699 was never true

2700 return ": " + ", ".join(bpid.package_name for bpid in sorted(failed_bpids)) 

2701 

2702 return ": " + ", ".join( 

2703 self._format_link(bpid, arch) for bpid in sorted(failed_bpids) 

2704 ) 

2705 

2706 def apply_srcarch_policy_impl( 

2707 self, 

2708 policy_info: dict[str, Any], 

2709 arch: str, 

2710 source_data_tdist: SourcePackage | None, 

2711 source_data_srcdist: SourcePackage, 

2712 excuse: "Excuse", 

2713 ) -> PolicyVerdict: 

2714 verdict = PolicyVerdict.PASS 

2715 eligible_for_bounty = False 

2716 all_hints = [] 

2717 

2718 assert self.hints is not None # Needed for type checking / mypy 

2719 

2720 # we don't want to apply this policy (yet) on binNMUs 

2721 if excuse.item.architecture != "source": 2721 ↛ 2722line 2721 didn't jump to line 2722 because the condition on line 2721 was never true

2722 return verdict 

2723 

2724 # we're not supposed to judge on this arch 

2725 if arch not in self.options.repro_arches: 2725 ↛ 2726line 2725 didn't jump to line 2726 because the condition on line 2725 was never true

2726 return verdict 

2727 

2728 # bail out if this arch has no packages for this source (not build 

2729 # here) 

2730 if arch not in excuse.packages: 2730 ↛ 2731line 2730 didn't jump to line 2731 because the condition on line 2730 was never true

2731 return verdict 

2732 

2733 component = get_component(source_data_srcdist.section) 

2734 

2735 if ( 2735 ↛ 2739line 2735 didn't jump to line 2739

2736 self.options.repro_components 

2737 and component not in self.options.repro_components.split() 

2738 ): 

2739 return verdict 

2740 

2741 source_name = excuse.item.package 

2742 

2743 if self.options.repro_excuse_url: 2743 ↛ 2744line 2743 didn't jump to line 2744 because the condition on line 2743 was never true

2744 url = self.options.repro_excuse_url.format( 

2745 package=quote(source_name), arch=arch 

2746 ) 

2747 url_html = ' - <a href="%s">info</a>' % url 

2748 # When run on multiple archs, the last one "wins" 

2749 policy_info["status-url"] = url 

2750 else: 

2751 url = None 

2752 url_html = "" 

2753 

2754 if arch not in self._reproducible: 2754 ↛ 2755line 2754 didn't jump to line 2755 because the condition on line 2754 was never true

2755 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

2756 msg = f"No reproducibility data available at all for {arch}" 

2757 excuse.add_verdict_info(verdict, msg) 

2758 return verdict 

2759 if "all" not in self._reproducible: 2759 ↛ 2760line 2759 didn't jump to line 2760 because the condition on line 2759 was never true

2760 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

2761 msg = "No reproducibility data available at all for arch:all" 

2762 excuse.add_verdict_info(verdict, msg) 

2763 return verdict 

2764 

2765 # skip/delay policy until both arch:arch and arch:all builds are done 

2766 if (arch or "all") in excuse.missing_builds: 2766 ↛ 2767line 2766 didn't jump to line 2767 because the condition on line 2766 was never true

2767 self.logger.debug( 

2768 "%s not built for %s or all, skipping reproducible policy", 

2769 excuse.name, 

2770 arch, 

2771 ) 

2772 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

2773 excuse.add_verdict_info( 

2774 verdict, 

2775 f"Reproducibility check deferred on {arch}: missing builds{url_html}", 

2776 ) 

2777 return verdict 

2778 

2779 source_suite_state = "not-unknown" 

2780 failed_bpids: set[BinaryPackageId] = set() 

2781 # The states should either be GOOD/BAD for all binaries, UNKNOWN for all 

2782 # binaries, or missing for all binaries, but let's not assume that. 

2783 # They can be from different components after all. 

2784 bins_src, src_suite_name = binaries_from_source_version( 

2785 source_data_srcdist, self.suite_info 

2786 ) 

2787 for bpid in bins_src: 

2788 if bpid.architecture not in ("all", arch): 2788 ↛ 2789line 2788 didn't jump to line 2789 because the condition on line 2788 was never true

2789 continue 

2790 in_component = True 

2791 for suite in self.suite_info.source_suites: 

2792 if suite.name == src_suite_name and ( 2792 ↛ 2800line 2792 didn't jump to line 2800 because the condition on line 2792 was never true

2793 ( 

2794 component := get_component( 

2795 suite.all_binaries_in_suite[bpid].section 

2796 ) 

2797 ) 

2798 not in self.options.repro_components 

2799 ): 

2800 self.logger.debug( 

2801 "repro check for %s skipped due to component %s", 

2802 bpid, 

2803 component, 

2804 ) 

2805 in_component = False 

2806 break 

2807 if not in_component: 2807 ↛ 2809line 2807 didn't jump to line 2809 because the condition on line 2807 was never true

2808 # TODO: should we update the excuses text? 

2809 continue 

2810 

2811 if ( 

2812 data := self._lookup_data(bpid.package_name, bpid.version, arch) 

2813 ) is not None: 

2814 pkg_info, _ = data 

2815 self.logger.debug("repro data for %s: %s", bpid, pkg_info.state) 

2816 if pkg_info.state is ReproducibleState.BAD: 

2817 failed_bpids.add(bpid) 

2818 # not changing source_suite_state here on purpose 

2819 elif ( 2819 ↛ 2823line 2819 didn't jump to line 2823

2820 pkg_info.state is ReproducibleState.FAIL 

2821 or pkg_info.state is ReproducibleState.UNKNOWN 

2822 ): 

2823 source_suite_state = "unknown" 

2824 else: 

2825 self.logger.debug("No repro data found for %s", bpid) 

2826 # but maybe it's hinted (e.g. at the time of writing 

2827 # reproduce.debian.net has a bug where udebs go missing) 

2828 if ( 2828 ↛ 2836line 2828 didn't jump to line 2836 because the condition on line 2828 was never true

2829 bpid_hints := self.hints.search_first( 

2830 "ignore-reproducible", 

2831 package=bpid.package_name, 

2832 version=bpid.version, 

2833 architecture=bpid.architecture, 

2834 ) 

2835 ) is not None: 

2836 all_hints.append(bpid_hints) 

2837 self.logger.debug(f"repro: hint found for {source_name}: {bpid}") 

2838 else: 

2839 source_suite_state = "unknown" 

2840 break 

2841 

2842 if source_suite_state == "not-unknown": 

2843 source_suite_state = "known" 

2844 

2845 excuse_info = [] 

2846 if source_suite_state == "unknown": 

2847 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

2848 excuse_info.append( 

2849 f"Reproducibility check waiting for results on {arch}{url_html}" 

2850 ) 

2851 policy_info.setdefault("state", {}).setdefault(arch, "unavailable") 

2852 elif failed_bpids: 

2853 ignored_bpids: set[BinaryPackageId] = set() 

2854 if source_data_tdist is None: 2854 ↛ 2855line 2854 didn't jump to line 2855 because the condition on line 2854 was never true

2855 target_suite_state = "new" 

2856 else: 

2857 target_suite_state = "reproducible" 

2858 for bpid in failed_bpids: 

2859 pkg_name = bpid.package_name 

2860 for bpid_t in filter_out_faux_gen(source_data_tdist.binaries): 

2861 if bpid_t.architecture not in ("all", arch): 2861 ↛ 2862line 2861 didn't jump to line 2862 because the condition on line 2861 was never true

2862 continue 

2863 if pkg_name != bpid_t.package_name: 2863 ↛ 2864line 2863 didn't jump to line 2864 because the condition on line 2863 was never true

2864 continue 

2865 if ( 2865 ↛ 2880line 2865 didn't jump to line 2880 because the condition on line 2865 was always true

2866 data := self._lookup_data(pkg_name, bpid_t.version, arch) 

2867 ) is not None: 

2868 pkg_info, _ = data 

2869 self.logger.debug( 

2870 "testing repro data for %s: %s", bpid_t, pkg_info.state 

2871 ) 

2872 if pkg_info.state is ReproducibleState.BAD: 

2873 ignored_bpids.add(bpid) 

2874 elif ( 2874 ↛ 2878line 2874 didn't jump to line 2878

2875 pkg_info.state is ReproducibleState.FAIL 

2876 or pkg_info.state is ReproducibleState.UNKNOWN 

2877 ): 

2878 target_suite_state = "unknown" 

2879 else: 

2880 self.logger.debug( 

2881 "No testing repro data found for %s", bpid_t 

2882 ) 

2883 # This shouldn't happen as for the past migration 

2884 # to have been allowed, there should be data. 

2885 target_suite_state = "unknown" 

2886 break 

2887 

2888 # Reminder: code here is part of the non-reproducibile source-suite branch 

2889 if target_suite_state == "new": 2889 ↛ 2890line 2889 didn't jump to line 2890 because the condition on line 2889 was never true

2890 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

2891 excuse_info.append( 

2892 f"New but not reproduced on {arch}{url_html}" 

2893 f"{self._create_link_to_log(arch, failed_bpids)}" 

2894 ) 

2895 policy_info.setdefault("state", {}).setdefault( 

2896 arch, "new but not reproducible" 

2897 ) 

2898 elif target_suite_state == "unknown": 2898 ↛ 2900line 2898 didn't jump to line 2900 because the condition on line 2898 was never true

2899 # Shouldn't happen after initial bootstrap once blocking 

2900 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

2901 excuse_info.append( 

2902 f"Reproducibility check failed and now waiting for reference " 

2903 f"results on {arch}{url_html}" 

2904 f"{self._create_link_to_log(arch, failed_bpids)}" 

2905 ) 

2906 policy_info.setdefault("state", {}).setdefault( 

2907 arch, "waiting for reference" 

2908 ) 

2909 elif failed_bpids <= ignored_bpids: 

2910 # For the forseeable future we want to prevent regressions, one day 

2911 # we might want to even block these. 

2912 # verdict = PolicyVerdict.REJECTED_PERMANENTLY 

2913 excuse_info.append( 

2914 f"Not reproduced on {arch} (not a regression)" 

2915 f"{self._create_link_to_log(arch, failed_bpids)}" 

2916 ) 

2917 policy_info.setdefault("state", {}).setdefault(arch, "not reproducible") 

2918 else: 

2919 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

2920 excuse_info.append( 

2921 f"Reproducibility regression on {arch}" 

2922 f"{self._create_link_to_log(arch, failed_bpids - ignored_bpids)}" 

2923 ) 

2924 policy_info.setdefault("state", {}).setdefault(arch, "regression") 

2925 

2926 # non-reproducible source-suite cases are handled above, so here we 

2927 # handle the last of the source-suite cases 

2928 else: 

2929 excuse_info.append(f"Reproduced on {arch}{url_html}") 

2930 policy_info.setdefault("state", {}).setdefault(arch, "reproducible") 

2931 eligible_for_bounty = True 

2932 

2933 if verdict.is_rejected: 

2934 for hint_arch in ("source", arch): 

2935 if ( 

2936 ignore_hint := self.hints.search_first( 

2937 "ignore-reproducible-src", 

2938 package=source_name, 

2939 version=source_data_srcdist.version, 

2940 architecture=hint_arch, 

2941 ) 

2942 ) is not None: 

2943 # one hint is enough, take the first one encountered 

2944 verdict = PolicyVerdict.PASS_HINTED 

2945 policy_info.setdefault("hints", {}).setdefault(arch, []).append( 

2946 f"{ignore_hint.user}: {str(ignore_hint)}" 

2947 ) 

2948 if hint_arch == arch: 2948 ↛ 2951line 2948 didn't jump to line 2951 because the condition on line 2948 was always true

2949 on_arch = f" on {arch}" 

2950 else: 

2951 on_arch = "" 

2952 excuse_info.append( 

2953 f"Reproducibility issues ignored for src:{ignore_hint.package}" 

2954 f"{on_arch} as requested by {ignore_hint.user}" 

2955 ) 

2956 break 

2957 

2958 if verdict.is_rejected: 

2959 if source_suite_state == "known": 

2960 check_bpids = failed_bpids - ignored_bpids 

2961 else: 

2962 # Let's not wait for results if all binaries have a hint 

2963 check_bpids = filter_out_faux(source_data_srcdist.binaries) 

2964 missed_bpids = set() 

2965 

2966 for bpid in check_bpids: 

2967 if ( 

2968 bpid_hint := self.hints.search_first( 

2969 "ignore-reproducible", 

2970 package=bpid.package_name, 

2971 version=bpid.version, 

2972 architecture=bpid.architecture, 

2973 ) 

2974 ) is not None: 

2975 # one hint per binary is enough 

2976 all_hints.append(bpid_hint) 

2977 self.logger.debug( 

2978 "repro: hint found for %s: %s", source_name, bpid 

2979 ) 

2980 else: 

2981 missed_bpids.add(bpid) 

2982 

2983 if not missed_bpids: 

2984 verdict = PolicyVerdict.PASS_HINTED 

2985 for hint in all_hints: 

2986 policy_info.setdefault("hints", {}).setdefault(arch, []).append( 

2987 hint.user + ": " + str(hint) 

2988 ) 

2989 # TODO: we're going to print this for arch:all binaries on each arch 

2990 excuse_info.append( 

2991 f"Reproducibility issues ignored for {hint.package} on {arch} as " 

2992 f"requested by {hint.user}" 

2993 ) 

2994 elif all_hints: 2994 ↛ 2995line 2994 didn't jump to line 2995 because the condition on line 2994 was never true

2995 self.logger.info( 

2996 "repro: binary hints for %s ignored as they don't cover these binaries %s", 

2997 source_name, 

2998 missed_bpids, 

2999 ) 

3000 

3001 # A binary without results got hinted 

3002 if not verdict.is_rejected and all_hints: 

3003 for hint in all_hints: 

3004 excuse_info.append( 

3005 f"Reproducibility unknown for {hint.package} but ignored on {arch} as " 

3006 f"requested by {hint.user}" 

3007 ) 

3008 

3009 if self.options.repro_success_bounty and eligible_for_bounty: 3009 ↛ 3010line 3009 didn't jump to line 3010 because the condition on line 3009 was never true

3010 excuse.add_bounty("reproducibility", self.options.repro_success_bounty) 

3011 

3012 if verdict.is_rejected and self.options.repro_regression_penalty: 3012 ↛ 3014line 3012 didn't jump to line 3014 because the condition on line 3012 was never true

3013 # With a non-zero penalty, we shouldn't block on this policy 

3014 verdict = PolicyVerdict.PASS 

3015 if self.options.repro_regression_penalty > 0: 

3016 excuse.add_penalty( 

3017 "reproducibility", self.options.repro_regression_penalty 

3018 ) 

3019 

3020 for msg in excuse_info: 

3021 if verdict.is_rejected: 

3022 excuse.add_verdict_info(verdict, msg) 

3023 else: 

3024 excuse.addinfo(msg) 

3025 

3026 return verdict