Coverage for britney2/policies/policy.py: 91%

1328 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-04-19 18:02 +0000

1import json 

2import logging 

3import optparse 

4import os 

5import re 

6import sys 

7import time 

8from abc import ABC, abstractmethod 

9from collections import defaultdict 

10from collections.abc import Callable, Container 

11from enum import IntEnum, unique 

12from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar, cast 

13from urllib.parse import quote 

14 

15import apt_pkg 

16import yaml 

17 

18from britney2 import ( 

19 BinaryPackage, 

20 BinaryPackageId, 

21 DependencyType, 

22 PackageId, 

23 SourcePackage, 

24 Suite, 

25 SuiteClass, 

26 Suites, 

27 TargetSuite, 

28) 

29from britney2.excusedeps import DependencySpec 

30from britney2.hints import ( 

31 Hint, 

32 HintAnnotate, 

33 HintCollection, 

34 HintParser, 

35 HintType, 

36 PolicyHintParserProto, 

37) 

38from britney2.inputs.suiteloader import SuiteContentLoader 

39from britney2.migrationitem import MigrationItem, MigrationItemFactory 

40from britney2.policies import ApplySrcPolicy, PolicyVerdict 

41from britney2.utils import ( 

42 GetDependencySolversProto, 

43 binaries_from_source_version, 

44 compute_reverse_tree, 

45 filter_out_faux, 

46 find_newer_binaries, 

47 get_dependency_solvers, 

48 is_smooth_update_allowed, 

49 parse_option, 

50) 

51 

52if TYPE_CHECKING: 52 ↛ 53line 52 didn't jump to line 53 because the condition on line 52 was never true

53 from ..britney import Britney 

54 from ..excuse import Excuse 

55 from ..installability.universe import BinaryPackageUniverse 

56 

57 

58class PolicyLoadRequest: 

59 __slots__ = ("_options_name", "_default_value", "_policy_constructor") 

60 

61 def __init__( 

62 self, 

63 policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"], 

64 options_name: str | None, 

65 default_value: bool, 

66 ) -> None: 

67 self._policy_constructor = policy_constructor 

68 self._options_name = options_name 

69 self._default_value = default_value 

70 

71 def is_enabled(self, options: optparse.Values) -> bool: 

72 if self._options_name is None: 

73 assert self._default_value 

74 return True 

75 actual_value = getattr(options, self._options_name, None) 

76 if actual_value is None: 

77 return self._default_value 

78 return actual_value.lower() in ("yes", "y", "true", "t") 

79 

80 def load(self, options: optparse.Values, suite_info: Suites) -> "BasePolicy": 

81 return self._policy_constructor(options, suite_info) 

82 

83 @classmethod 

84 def always_load( 

85 cls, policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"] 

86 ) -> "PolicyLoadRequest": 

87 return cls(policy_constructor, None, True) 

88 

89 @classmethod 

90 def conditionally_load( 

91 cls, 

92 policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"], 

93 option_name: str, 

94 default_value: bool, 

95 ) -> "PolicyLoadRequest": 

96 return cls(policy_constructor, option_name, default_value) 

97 

98 

99class PolicyEngine: 

100 def __init__(self) -> None: 

101 self._policies: list["BasePolicy"] = [] 

102 

103 def add_policy(self, policy: "BasePolicy") -> None: 

104 self._policies.append(policy) 

105 

106 def load_policies( 

107 self, 

108 options: optparse.Values, 

109 suite_info: Suites, 

110 policy_load_requests: list[PolicyLoadRequest], 

111 ) -> None: 

112 for policy_load_request in policy_load_requests: 

113 if policy_load_request.is_enabled(options): 

114 self.add_policy(policy_load_request.load(options, suite_info)) 

115 

116 def register_policy_hints(self, hint_parser: HintParser) -> None: 

117 for policy in self._policies: 

118 policy.register_hints(hint_parser) 

119 

120 def initialise(self, britney: "Britney", hints: HintCollection) -> None: 

121 for policy in self._policies: 

122 policy.hints = hints 

123 policy.initialise(britney) 

124 

125 def save_state(self, britney: "Britney") -> None: 

126 for policy in self._policies: 

127 policy.save_state(britney) 

128 

129 def apply_src_policies( 

130 self, 

131 source_t: SourcePackage | None, 

132 source_u: SourcePackage, 

133 excuse: "Excuse", 

134 ) -> None: 

135 excuse_verdict = excuse.policy_verdict 

136 source_suite = excuse.item.suite 

137 suite_class = source_suite.suite_class 

138 for policy in self._policies: 

139 pinfo: dict[str, Any] = {} 

140 policy_verdict = PolicyVerdict.NOT_APPLICABLE 

141 if suite_class in policy.applicable_suites: 

142 if policy.src_policy.run_arch: 

143 for arch in policy.options.architectures: 

144 v = policy.apply_srcarch_policy_impl( 

145 pinfo, arch, source_t, source_u, excuse 

146 ) 

147 policy_verdict = PolicyVerdict.worst_of(policy_verdict, v) 

148 if policy.src_policy.run_src: 

149 v = policy.apply_src_policy_impl(pinfo, source_t, source_u, excuse) 

150 policy_verdict = PolicyVerdict.worst_of(policy_verdict, v) 

151 # The base policy provides this field, so the subclass should leave it blank 

152 assert "verdict" not in pinfo 

153 if policy_verdict != PolicyVerdict.NOT_APPLICABLE: 

154 excuse.policy_info[policy.policy_id] = pinfo 

155 pinfo["verdict"] = policy_verdict.name 

156 excuse_verdict = PolicyVerdict.worst_of(policy_verdict, excuse_verdict) 

157 excuse.policy_verdict = excuse_verdict 

158 

159 def apply_srcarch_policies( 

160 self, 

161 arch: str, 

162 source_t: SourcePackage | None, 

163 source_u: SourcePackage, 

164 excuse: "Excuse", 

165 ) -> None: 

166 excuse_verdict = excuse.policy_verdict 

167 source_suite = excuse.item.suite 

168 suite_class = source_suite.suite_class 

169 for policy in self._policies: 

170 pinfo: dict[str, Any] = {} 

171 if suite_class in policy.applicable_suites: 

172 policy_verdict = policy.apply_srcarch_policy_impl( 

173 pinfo, arch, source_t, source_u, excuse 

174 ) 

175 excuse_verdict = PolicyVerdict.worst_of(policy_verdict, excuse_verdict) 

176 # The base policy provides this field, so the subclass should leave it blank 

177 assert "verdict" not in pinfo 

178 if policy_verdict != PolicyVerdict.NOT_APPLICABLE: 

179 excuse.policy_info[policy.policy_id] = pinfo 

180 pinfo["verdict"] = policy_verdict.name 

181 excuse.policy_verdict = excuse_verdict 

182 

183 

184class BasePolicy(ABC): 

185 britney: "Britney" 

186 policy_id: str 

187 hints: HintCollection | None 

188 applicable_suites: set[SuiteClass] 

189 src_policy: ApplySrcPolicy 

190 options: optparse.Values 

191 suite_info: Suites 

192 

193 def __init__( 

194 self, 

195 options: optparse.Values, 

196 suite_info: Suites, 

197 ) -> None: 

198 """The BasePolicy constructor 

199 

200 :param options: The options member of Britney with all the 

201 config values. 

202 """ 

203 

204 @property 

205 @abstractmethod 

206 def state_dir(self) -> str: ... 206 ↛ exitline 206 didn't return from function 'state_dir' because

207 

208 def register_hints(self, hint_parser: HintParser) -> None: # pragma: no cover 

209 """Register new hints that this policy accepts 

210 

211 :param hint_parser: (see HintParser.register_hint_type) 

212 """ 

213 

214 def initialise(self, britney: "Britney") -> None: # pragma: no cover 

215 """Called once to make the policy initialise any data structures 

216 

217 This is useful for e.g. parsing files or other "heavy do-once" work. 

218 

219 :param britney: This is the instance of the "Britney" class. 

220 """ 

221 self.britney = britney 

222 

223 def save_state(self, britney: "Britney") -> None: # pragma: no cover 

224 """Called once at the end of the run to make the policy save any persistent data 

225 

226 Note this will *not* be called for "dry-runs" as such runs should not change 

227 the state. 

228 

229 :param britney: This is the instance of the "Britney" class. 

230 """ 

231 

232 def apply_src_policy_impl( 

233 self, 

234 policy_info: dict[str, Any], 

235 source_data_tdist: SourcePackage | None, 

236 source_data_srcdist: SourcePackage, 

237 excuse: "Excuse", 

238 ) -> PolicyVerdict: # pragma: no cover 

239 """Apply a policy on a given source migration 

240 

241 Britney will call this method on a given source package, when 

242 Britney is considering to migrate it from the given source 

243 suite to the target suite. The policy will then evaluate the 

244 the migration and then return a verdict. 

245 

246 :param policy_info: A dictionary of all policy results. The 

247 policy can add a value stored in a key related to its name. 

248 (e.g. policy_info['age'] = {...}). This will go directly into 

249 the "excuses.yaml" output. 

250 

251 :param source_data_tdist: Information about the source package 

252 in the target distribution (e.g. "testing"). This is the 

253 data structure in source_suite.sources[source_name] 

254 

255 :param source_data_srcdist: Information about the source 

256 package in the source distribution (e.g. "unstable" or "tpu"). 

257 This is the data structure in target_suite.sources[source_name] 

258 

259 :return: A Policy Verdict (e.g. PolicyVerdict.PASS) 

260 """ 

261 return PolicyVerdict.NOT_APPLICABLE 

262 

263 def apply_srcarch_policy_impl( 

264 self, 

265 policy_info: dict[str, Any], 

266 arch: str, 

267 source_data_tdist: SourcePackage | None, 

268 source_data_srcdist: SourcePackage, 

269 excuse: "Excuse", 

270 ) -> PolicyVerdict: 

271 """Apply a policy on a given binary migration 

272 

273 Britney will call this method on binaries from a given source package 

274 on a given architecture, when Britney is considering to migrate them 

275 from the given source suite to the target suite. The policy will then 

276 evaluate the migration and then return a verdict. 

277 

278 :param policy_info: A dictionary of all policy results. The 

279 policy can add a value stored in a key related to its name. 

280 (e.g. policy_info['age'] = {...}). This will go directly into 

281 the "excuses.yaml" output. 

282 

283 :param arch: The architecture the item is applied to. This is mostly 

284 relevant for policies where src_policy is not ApplySrcPolicy.RUN_SRC 

285 (as that is the only case where arch can differ from item.architecture) 

286 

287 :param source_data_tdist: Information about the source package 

288 in the target distribution (e.g. "testing"). This is the 

289 data structure in source_suite.sources[source_name] 

290 

291 :param source_data_srcdist: Information about the source 

292 package in the source distribution (e.g. "unstable" or "tpu"). 

293 This is the data structure in target_suite.sources[source_name] 

294 

295 :return: A Policy Verdict (e.g. PolicyVerdict.PASS) 

296 """ 

297 # if the policy doesn't implement this function, assume it's OK 

298 return PolicyVerdict.NOT_APPLICABLE 

299 

300 

301class AbstractBasePolicy(BasePolicy): 

302 """ 

303 A shared abstract class for building BasePolicy objects. 

304 

305 tests/test_policy.py:initialize_policy() needs to be able to build BasePolicy 

306 objects with just a two-item constructor, while all other uses of BasePolicy- 

307 derived objects need the 5-item constructor. So AbstractBasePolicy was split 

308 out to document this. 

309 """ 

310 

311 def __init__( 

312 self, 

313 policy_id: str, 

314 options: optparse.Values, 

315 suite_info: Suites, 

316 applicable_suites: set[SuiteClass], 

317 src_policy: ApplySrcPolicy = ApplySrcPolicy.RUN_SRC, 

318 ) -> None: 

319 """Concrete initializer. 

320 

321 :param policy_id: Identifies the policy. It will 

322 determine the key used for the excuses.yaml etc. 

323 

324 :param options: The options member of Britney with all the 

325 config values. 

326 

327 :param applicable_suites: Where this policy applies. 

328 """ 

329 self.policy_id = policy_id 

330 self.options = options 

331 self.suite_info = suite_info 

332 self.applicable_suites = applicable_suites 

333 self.src_policy = src_policy 

334 self.hints: HintCollection | None = None 

335 logger_name = ".".join((self.__class__.__module__, self.__class__.__name__)) 

336 self.logger = logging.getLogger(logger_name) 

337 

338 @property 

339 def state_dir(self) -> str: 

340 return cast(str, self.options.state_dir) 

341 

342 

343_T = TypeVar("_T") 

344 

345 

346class SimplePolicyHint(Hint, Generic[_T]): 

347 def __init__( 

348 self, 

349 user: str, 

350 hint_type: HintType, 

351 policy_parameter: _T, 

352 packages: list[MigrationItem], 

353 ) -> None: 

354 super().__init__(user, hint_type, packages) 

355 self._policy_parameter = policy_parameter 

356 

357 def __eq__(self, other: Any) -> bool: 

358 if self.type != other.type or self._policy_parameter != other._policy_parameter: 

359 return False 

360 return super().__eq__(other) 

361 

362 def str(self) -> str: 

363 return "{} {} {}".format( 

364 self._type, 

365 str(self._policy_parameter), 

366 " ".join(x.name for x in self._packages), 

367 ) 

368 

369 

370class AgeDayHint(SimplePolicyHint[int]): 

371 @property 

372 def days(self) -> int: 

373 return self._policy_parameter 

374 

375 

376class IgnoreRCBugHint(SimplePolicyHint[frozenset[str]]): 

377 @property 

378 def ignored_rcbugs(self) -> frozenset[str]: 

379 return self._policy_parameter 

380 

381 

382def simple_policy_hint_parser_function( 

383 class_name: Callable[[str, HintType, _T, list[MigrationItem]], Hint], 

384 converter: Callable[[str], _T], 

385) -> PolicyHintParserProto: 

386 def f( 

387 mi_factory: MigrationItemFactory, 

388 hints: HintCollection, 

389 who: str, 

390 hint_type: HintType, 

391 *args: str, 

392 ) -> None: 

393 policy_parameter = args[0] 

394 args = args[1:] 

395 for item in mi_factory.parse_items(*args): 

396 hints.add_hint( 

397 class_name(who, hint_type, converter(policy_parameter), [item]) 

398 ) 

399 

400 return f 

401 

402 

403class AgePolicy(AbstractBasePolicy): 

404 """Configurable Aging policy for source migrations 

405 

406 The AgePolicy will let packages stay in the source suite for a pre-defined 

407 amount of days before letting migrate (based on their urgency, if any). 

408 

409 The AgePolicy's decision is influenced by the following: 

410 

411 State files: 

412 * ${STATE_DIR}/age-policy-urgencies: File containing urgencies for source 

413 packages. Note that urgencies are "sticky" and the most "urgent" urgency 

414 will be used (i.e. the one with lowest age-requirements). 

415 - This file needs to be updated externally, if the policy should take 

416 urgencies into consideration. If empty (or not updated), the policy 

417 will simply use the default urgency (see the "Config" section below) 

418 - In Debian, these values are taken from the .changes file, but that is 

419 not a requirement for Britney. 

420 * ${STATE_DIR}/age-policy-dates: File containing the age of all source 

421 packages. 

422 - The policy will automatically update this file. 

423 Config: 

424 * DEFAULT_URGENCY: Name of the urgency used for packages without an urgency 

425 (or for unknown urgencies). Will also be used to set the "minimum" 

426 aging requirements for packages not in the target suite. 

427 * MINDAYS_<URGENCY>: The age-requirements in days for packages with the 

428 given urgency. 

429 - Commonly used urgencies are: low, medium, high, emergency, critical 

430 Hints: 

431 * urgent <source>/<version>: Disregard the age requirements for a given 

432 source/version. 

433 * age-days X <source>/<version>: Set the age requirements for a given 

434 source/version to X days. Note that X can exceed the highest 

435 age-requirement normally given. 

436 

437 """ 

438 

439 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

440 super().__init__("age", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}) 

441 self._min_days = self._generate_mindays_table() 

442 self._min_days_default = 0 

443 # britney's "day" begins at 7pm (we want aging to occur in the 22:00Z run and we run Britney 2-4 times a day) 

444 # NB: _date_now is used in tests 

445 time_now = time.time() 

446 if hasattr(self.options, "fake_runtime"): 

447 time_now = int(self.options.fake_runtime) 

448 self.logger.info("overriding runtime with fake_runtime %d" % time_now) 

449 

450 self._date_now = int(((time_now / (60 * 60)) - 19) / 24) 

451 self._dates: dict[str, tuple[str, int]] = {} 

452 self._urgencies: dict[str, str] = {} 

453 self._default_urgency: str = self.options.default_urgency 

454 self._penalty_immune_urgencies: frozenset[str] = frozenset() 

455 if hasattr(self.options, "no_penalties"): 

456 self._penalty_immune_urgencies = frozenset( 

457 x.strip() for x in self.options.no_penalties.split() 

458 ) 

459 self._bounty_min_age: int | None = None # initialised later 

460 

461 def _generate_mindays_table(self) -> dict[str, int]: 

462 mindays: dict[str, int] = {} 

463 for k in dir(self.options): 

464 if not k.startswith("mindays_"): 

465 continue 

466 v = getattr(self.options, k) 

467 try: 

468 as_days = int(v) 

469 except ValueError: 

470 raise ValueError( 

471 "Unable to parse " 

472 + k 

473 + " as a number of days. Must be 0 or a positive integer" 

474 ) 

475 if as_days < 0: 475 ↛ 476line 475 didn't jump to line 476 because the condition on line 475 was never true

476 raise ValueError( 

477 "The value of " + k + " must be zero or a positive integer" 

478 ) 

479 mindays[k.split("_")[1]] = as_days 

480 return mindays 

481 

482 def register_hints(self, hint_parser: HintParser) -> None: 

483 hint_parser.register_hint_type( 

484 HintType( 

485 "age-days", 

486 simple_policy_hint_parser_function(AgeDayHint, int), 

487 min_args=2, 

488 ) 

489 ) 

490 hint_parser.register_hint_type(HintType("urgent")) 

491 

492 def initialise(self, britney: "Britney") -> None: 

493 super().initialise(britney) 

494 self._read_dates_file() 

495 self._read_urgencies_file() 

496 if self._default_urgency not in self._min_days: # pragma: no cover 

497 raise ValueError( 

498 "Missing age-requirement for default urgency (MINDAYS_%s)" 

499 % self._default_urgency 

500 ) 

501 self._min_days_default = self._min_days[self._default_urgency] 

502 try: 

503 self._bounty_min_age = int(self.options.bounty_min_age) 

504 except ValueError: 504 ↛ 505line 504 didn't jump to line 505 because the exception caught by line 504 didn't happen

505 if self.options.bounty_min_age in self._min_days: 

506 self._bounty_min_age = self._min_days[self.options.bounty_min_age] 

507 else: # pragma: no cover 

508 raise ValueError( 

509 "Please fix BOUNTY_MIN_AGE in the britney configuration" 

510 ) 

511 except AttributeError: 

512 # The option wasn't defined in the configuration 

513 self._bounty_min_age = 0 

514 

515 def save_state(self, britney: "Britney") -> None: 

516 super().save_state(britney) 

517 self._write_dates_file() 

518 

519 def apply_src_policy_impl( 

520 self, 

521 age_info: dict[str, Any], 

522 source_data_tdist: SourcePackage | None, 

523 source_data_srcdist: SourcePackage, 

524 excuse: "Excuse", 

525 ) -> PolicyVerdict: 

526 # retrieve the urgency for the upload, ignoring it if this is a NEW package 

527 # (not present in the target suite) 

528 source_name = excuse.item.package 

529 urgency = self._urgencies.get(source_name, self._default_urgency) 

530 

531 if urgency not in self._min_days: 531 ↛ 532line 531 didn't jump to line 532 because the condition on line 531 was never true

532 age_info["unknown-urgency"] = urgency 

533 urgency = self._default_urgency 

534 

535 if not source_data_tdist: 

536 if self._min_days[urgency] < self._min_days_default: 

537 age_info["urgency-reduced"] = { 

538 "from": urgency, 

539 "to": self._default_urgency, 

540 } 

541 urgency = self._default_urgency 

542 

543 if source_name not in self._dates: 

544 self._dates[source_name] = (source_data_srcdist.version, self._date_now) 

545 elif self._dates[source_name][0] != source_data_srcdist.version: 

546 self._dates[source_name] = (source_data_srcdist.version, self._date_now) 

547 

548 days_old = self._date_now - self._dates[source_name][1] 

549 min_days = self._min_days[urgency] 

550 for bounty in excuse.bounty: 

551 if excuse.bounty[bounty]: 551 ↛ 550line 551 didn't jump to line 550 because the condition on line 551 was always true

552 self.logger.info( 

553 "Applying bounty for %s granted by %s: %d days", 

554 source_name, 

555 bounty, 

556 excuse.bounty[bounty], 

557 ) 

558 excuse.addinfo( 

559 "Required age reduced by %d days because of %s" 

560 % (excuse.bounty[bounty], bounty) 

561 ) 

562 assert excuse.bounty[bounty] > 0, "negative bounties shouldn't happen" 

563 min_days -= excuse.bounty[bounty] 

564 if urgency not in self._penalty_immune_urgencies: 

565 for penalty in excuse.penalty: 

566 if excuse.penalty[penalty]: 566 ↛ 565line 566 didn't jump to line 565 because the condition on line 566 was always true

567 self.logger.info( 

568 "Applying penalty for %s given by %s: %d days", 

569 source_name, 

570 penalty, 

571 excuse.penalty[penalty], 

572 ) 

573 excuse.addinfo( 

574 "Required age increased by %d days because of %s" 

575 % (excuse.penalty[penalty], penalty) 

576 ) 

577 assert ( 

578 excuse.penalty[penalty] > 0 

579 ), "negative penalties should be handled earlier" 

580 min_days += excuse.penalty[penalty] 

581 

582 assert self._bounty_min_age is not None 

583 # the age in BOUNTY_MIN_AGE can be higher than the one associated with 

584 # the real urgency, so don't forget to take it into account 

585 bounty_min_age = min(self._bounty_min_age, self._min_days[urgency]) 

586 if min_days < bounty_min_age: 

587 min_days = bounty_min_age 

588 excuse.addinfo( 

589 "Required age is not allowed to drop below %d days" % min_days 

590 ) 

591 

592 age_info["current-age"] = days_old 

593 

594 assert self.hints is not None 

595 for age_days_hint in cast( 

596 "list[AgeDayHint]", 

597 self.hints.search( 

598 "age-days", package=source_name, version=source_data_srcdist.version 

599 ), 

600 ): 

601 new_req = age_days_hint.days 

602 age_info["age-requirement-reduced"] = { 

603 "new-requirement": new_req, 

604 "changed-by": age_days_hint.user, 

605 } 

606 if "original-age-requirement" not in age_info: 606 ↛ 608line 606 didn't jump to line 608 because the condition on line 606 was always true

607 age_info["original-age-requirement"] = min_days 

608 min_days = new_req 

609 

610 age_info["age-requirement"] = min_days 

611 res = PolicyVerdict.PASS 

612 

613 if days_old < min_days: 

614 urgent_hints = self.hints.search( 

615 "urgent", package=source_name, version=source_data_srcdist.version 

616 ) 

617 if urgent_hints: 

618 age_info["age-requirement-reduced"] = { 

619 "new-requirement": 0, 

620 "changed-by": urgent_hints[0].user, 

621 } 

622 res = PolicyVerdict.PASS_HINTED 

623 else: 

624 res = PolicyVerdict.REJECTED_TEMPORARILY 

625 

626 # update excuse 

627 age_hint = age_info.get("age-requirement-reduced", None) 

628 age_min_req = age_info["age-requirement"] 

629 if age_hint: 

630 new_req = age_hint["new-requirement"] 

631 who = age_hint["changed-by"] 

632 if new_req: 

633 excuse.addinfo( 

634 "Overriding age needed from %d days to %d by %s" 

635 % (age_min_req, new_req, who) 

636 ) 

637 age_min_req = new_req 

638 else: 

639 excuse.addinfo("Too young, but urgency pushed by %s" % who) 

640 age_min_req = 0 

641 excuse.setdaysold(age_info["current-age"], age_min_req) 

642 

643 if age_min_req == 0: 

644 excuse.addinfo("%d days old" % days_old) 

645 elif days_old < age_min_req: 

646 excuse.add_verdict_info( 

647 res, "Too young, only %d of %d days old" % (days_old, age_min_req) 

648 ) 

649 else: 

650 excuse.addinfo("%d days old (needed %d days)" % (days_old, age_min_req)) 

651 

652 return res 

653 

654 def _read_dates_file(self) -> None: 

655 """Parse the dates file""" 

656 dates = self._dates 

657 fallback_filename = os.path.join(self.suite_info.target_suite.path, "Dates") 

658 using_new_name = False 

659 try: 

660 filename = os.path.join(self.state_dir, "age-policy-dates") 

661 if not os.path.exists(filename) and os.path.exists(fallback_filename): 661 ↛ 662line 661 didn't jump to line 662 because the condition on line 661 was never true

662 filename = fallback_filename 

663 else: 

664 using_new_name = True 

665 except AttributeError: 

666 if os.path.exists(fallback_filename): 

667 filename = fallback_filename 

668 else: 

669 raise RuntimeError("Please set STATE_DIR in the britney configuration") 

670 

671 try: 

672 with open(filename, encoding="utf-8") as fd: 

673 for line in fd: 

674 if line.startswith("#"): 

675 # Ignore comment lines (mostly used for tests) 

676 continue 

677 # <source> <version> <date>) 

678 ln = line.split() 

679 if len(ln) != 3: # pragma: no cover 

680 continue 

681 try: 

682 dates[ln[0]] = (ln[1], int(ln[2])) 

683 except ValueError: # pragma: no cover 

684 pass 

685 except FileNotFoundError: 

686 if not using_new_name: 686 ↛ 688line 686 didn't jump to line 688 because the condition on line 686 was never true

687 # If we using the legacy name, then just give up 

688 raise 

689 self.logger.info("%s does not appear to exist. Creating it", filename) 

690 with open(filename, mode="x", encoding="utf-8"): 

691 pass 

692 

693 def _read_urgencies_file(self) -> None: 

694 urgencies = self._urgencies 

695 min_days_default = self._min_days_default 

696 fallback_filename = os.path.join(self.suite_info.target_suite.path, "Urgency") 

697 try: 

698 filename = os.path.join(self.state_dir, "age-policy-urgencies") 

699 if not os.path.exists(filename) and os.path.exists(fallback_filename): 699 ↛ 700line 699 didn't jump to line 700 because the condition on line 699 was never true

700 filename = fallback_filename 

701 except AttributeError: 

702 filename = fallback_filename 

703 

704 sources_s = self.suite_info.primary_source_suite.sources 

705 sources_t = self.suite_info.target_suite.sources 

706 

707 with open(filename, errors="surrogateescape", encoding="ascii") as fd: 

708 for line in fd: 

709 if line.startswith("#"): 

710 # Ignore comment lines (mostly used for tests) 

711 continue 

712 # <source> <version> <urgency> 

713 ln = line.split() 

714 if len(ln) != 3: 714 ↛ 715line 714 didn't jump to line 715 because the condition on line 714 was never true

715 continue 

716 

717 # read the minimum days associated with the urgencies 

718 urgency_old = urgencies.get(ln[0], None) 

719 mindays_old = self._min_days.get(urgency_old, 1000) # type: ignore[arg-type] 

720 mindays_new = self._min_days.get(ln[2], min_days_default) 

721 

722 # if the new urgency is lower (so the min days are higher), do nothing 

723 if mindays_old <= mindays_new: 

724 continue 

725 

726 # if the package exists in the target suite and it is more recent, do nothing 

727 tsrcv = sources_t.get(ln[0], None) 

728 if tsrcv and apt_pkg.version_compare(tsrcv.version, ln[1]) >= 0: 

729 continue 

730 

731 # if the package doesn't exist in the primary source suite or it is older, do nothing 

732 usrcv = sources_s.get(ln[0], None) 

733 if not usrcv or apt_pkg.version_compare(usrcv.version, ln[1]) < 0: 733 ↛ 734line 733 didn't jump to line 734 because the condition on line 733 was never true

734 continue 

735 

736 # update the urgency for the package 

737 urgencies[ln[0]] = ln[2] 

738 

739 def _write_dates_file(self) -> None: 

740 dates = self._dates 

741 try: 

742 directory = self.state_dir 

743 basename = "age-policy-dates" 

744 old_file = os.path.join(self.suite_info.target_suite.path, "Dates") 

745 except AttributeError: 

746 directory = self.suite_info.target_suite.path 

747 basename = "Dates" 

748 old_file = None 

749 filename = os.path.join(directory, basename) 

750 filename_tmp = os.path.join(directory, "%s_new" % basename) 

751 with open(filename_tmp, "w", encoding="utf-8") as fd: 

752 for pkg in sorted(dates): 

753 version, date = dates[pkg] 

754 fd.write("%s %s %d\n" % (pkg, version, date)) 

755 os.rename(filename_tmp, filename) 

756 if old_file is not None and os.path.exists(old_file): 756 ↛ 757line 756 didn't jump to line 757 because the condition on line 756 was never true

757 self.logger.info("Removing old age-policy-dates file %s", old_file) 

758 os.unlink(old_file) 

759 

760 

761class RCBugPolicy(AbstractBasePolicy): 

762 """RC bug regression policy for source migrations 

763 

764 The RCBugPolicy will read provided list of RC bugs and block any 

765 source upload that would introduce a *new* RC bug in the target 

766 suite. 

767 

768 The RCBugPolicy's decision is influenced by the following: 

769 

770 State files: 

771 * ${STATE_DIR}/rc-bugs-${SUITE_NAME}: File containing RC bugs for packages in 

772 the given suite (one for both primary source suite and the target sutie is 

773 needed). 

774 - These files need to be updated externally. 

775 """ 

776 

777 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

778 super().__init__( 

779 "rc-bugs", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE} 

780 ) 

781 self._bugs_source: dict[str, set[str]] | None = None 

782 self._bugs_target: dict[str, set[str]] | None = None 

783 

784 def register_hints(self, hint_parser: HintParser) -> None: 

785 f = simple_policy_hint_parser_function( 

786 IgnoreRCBugHint, lambda x: frozenset(x.split(",")) 

787 ) 

788 hint_parser.register_hint_type(HintType("ignore-rc-bugs", f, min_args=2)) 

789 

790 def initialise(self, britney: "Britney") -> None: 

791 super().initialise(britney) 

792 source_suite = self.suite_info.primary_source_suite 

793 target_suite = self.suite_info.target_suite 

794 fallback_unstable = os.path.join(source_suite.path, "BugsV") 

795 fallback_testing = os.path.join(target_suite.path, "BugsV") 

796 try: 

797 filename_unstable = os.path.join( 

798 self.state_dir, "rc-bugs-%s" % source_suite.name 

799 ) 

800 filename_testing = os.path.join( 

801 self.state_dir, "rc-bugs-%s" % target_suite.name 

802 ) 

803 if ( 803 ↛ 809line 803 didn't jump to line 809

804 not os.path.exists(filename_unstable) 

805 and not os.path.exists(filename_testing) 

806 and os.path.exists(fallback_unstable) 

807 and os.path.exists(fallback_testing) 

808 ): 

809 filename_unstable = fallback_unstable 

810 filename_testing = fallback_testing 

811 except AttributeError: 

812 filename_unstable = fallback_unstable 

813 filename_testing = fallback_testing 

814 self._bugs_source = self._read_bugs(filename_unstable) 

815 self._bugs_target = self._read_bugs(filename_testing) 

816 

817 def apply_src_policy_impl( 

818 self, 

819 rcbugs_info: dict[str, Any], 

820 source_data_tdist: SourcePackage | None, 

821 source_data_srcdist: SourcePackage, 

822 excuse: "Excuse", 

823 ) -> PolicyVerdict: 

824 assert self._bugs_source is not None # for type checking 

825 assert self._bugs_target is not None # for type checking 

826 bugs_t = set() 

827 bugs_s = set() 

828 source_name = excuse.item.package 

829 binaries_s = {x[0] for x in source_data_srcdist.binaries} 

830 try: 

831 binaries_t = {x[0] for x in source_data_tdist.binaries} # type: ignore[union-attr] 

832 except AttributeError: 

833 binaries_t = set() 

834 

835 src_key = f"src:{source_name}" 

836 if source_data_tdist and src_key in self._bugs_target: 

837 bugs_t.update(self._bugs_target[src_key]) 

838 if src_key in self._bugs_source: 

839 bugs_s.update(self._bugs_source[src_key]) 

840 

841 for pkg in binaries_s: 

842 if pkg in self._bugs_source: 

843 bugs_s |= self._bugs_source[pkg] 

844 for pkg in binaries_t: 

845 if pkg in self._bugs_target: 

846 bugs_t |= self._bugs_target[pkg] 

847 

848 # The bts seems to support filing source bugs against a binary of the 

849 # same name if that binary isn't built by any source. An example is bug 

850 # 820347 against Package: juce (in the live-2016-04-11 test). Add those 

851 # bugs too. 

852 if ( 

853 source_name not in (binaries_s | binaries_t) 

854 and source_name 

855 not in { 

856 x.package_name 

857 for x in self.suite_info.primary_source_suite.all_binaries_in_suite.keys() 

858 } 

859 and source_name 

860 not in { 

861 x.package_name 

862 for x in self.suite_info.target_suite.all_binaries_in_suite.keys() 

863 } 

864 ): 

865 if source_name in self._bugs_source: 

866 bugs_s |= self._bugs_source[source_name] 

867 if source_name in self._bugs_target: 867 ↛ 868line 867 didn't jump to line 868 because the condition on line 867 was never true

868 bugs_t |= self._bugs_target[source_name] 

869 

870 # If a package is not in the target suite, it has no RC bugs per 

871 # definition. Unfortunately, it seems that the live-data is 

872 # not always accurate (e.g. live-2011-12-13 suggests that 

873 # obdgpslogger had the same bug in testing and unstable, 

874 # but obdgpslogger was not in testing at that time). 

875 # - For the curious, obdgpslogger was removed on that day 

876 # and the BTS probably had not caught up with that fact. 

877 # (https://tracker.debian.org/news/415935) 

878 assert not bugs_t or source_data_tdist, ( 

879 "%s had bugs in the target suite but is not present" % source_name 

880 ) 

881 

882 verdict = PolicyVerdict.PASS 

883 

884 assert self.hints is not None 

885 for ignore_hint in cast( 

886 list[IgnoreRCBugHint], 

887 self.hints.search( 

888 "ignore-rc-bugs", 

889 package=source_name, 

890 version=source_data_srcdist.version, 

891 ), 

892 ): 

893 ignored_bugs = ignore_hint.ignored_rcbugs 

894 

895 # Only handle one hint for now 

896 if "ignored-bugs" in rcbugs_info: 

897 self.logger.info( 

898 "Ignoring ignore-rc-bugs hint from %s on %s due to another hint from %s", 

899 ignore_hint.user, 

900 source_name, 

901 rcbugs_info["ignored-bugs"]["issued-by"], 

902 ) 

903 continue 

904 if not ignored_bugs.isdisjoint(bugs_s): 904 ↛ 913line 904 didn't jump to line 913 because the condition on line 904 was always true

905 bugs_s -= ignored_bugs 

906 bugs_t -= ignored_bugs 

907 rcbugs_info["ignored-bugs"] = { 

908 "bugs": sorted(ignored_bugs), 

909 "issued-by": ignore_hint.user, 

910 } 

911 verdict = PolicyVerdict.PASS_HINTED 

912 else: 

913 self.logger.info( 

914 "Ignoring ignore-rc-bugs hint from %s on %s as none of %s affect the package", 

915 ignore_hint.user, 

916 source_name, 

917 str(ignored_bugs), 

918 ) 

919 

920 rcbugs_info["shared-bugs"] = sorted(bugs_s & bugs_t) 

921 rcbugs_info["unique-source-bugs"] = sorted(bugs_s - bugs_t) 

922 rcbugs_info["unique-target-bugs"] = sorted(bugs_t - bugs_s) 

923 

924 # update excuse 

925 new_bugs = rcbugs_info["unique-source-bugs"] 

926 old_bugs = rcbugs_info["unique-target-bugs"] 

927 excuse.setbugs(old_bugs, new_bugs) 

928 

929 if new_bugs: 

930 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

931 excuse.add_verdict_info( 

932 verdict, 

933 "Updating %s would introduce bugs in %s: %s" 

934 % ( 

935 source_name, 

936 self.suite_info.target_suite.name, 

937 ", ".join( 

938 [ 

939 '<a href="https://bugs.debian.org/%s">#%s</a>' 

940 % (quote(a), a) 

941 for a in new_bugs 

942 ] 

943 ), 

944 ), 

945 ) 

946 

947 if old_bugs: 

948 excuse.addinfo( 

949 "Updating %s will fix bugs in %s: %s" 

950 % ( 

951 source_name, 

952 self.suite_info.target_suite.name, 

953 ", ".join( 

954 [ 

955 '<a href="https://bugs.debian.org/%s">#%s</a>' 

956 % (quote(a), a) 

957 for a in old_bugs 

958 ] 

959 ), 

960 ) 

961 ) 

962 

963 return verdict 

964 

965 def _read_bugs(self, filename: str) -> dict[str, set[str]]: 

966 """Read the release critical bug summary from the specified file 

967 

968 The file contains rows with the format: 

969 

970 <package-name> <bug number>[,<bug number>...] 

971 

972 The method returns a dictionary where the key is the binary package 

973 name and the value is the list of open RC bugs for it. 

974 """ 

975 bugs: dict[str, set[str]] = {} 

976 self.logger.info("Loading RC bugs data from %s", filename) 

977 with open(filename, encoding="ascii") as f: 

978 for line in f: 

979 ln = line.split() 

980 if len(ln) != 2: # pragma: no cover 

981 self.logger.warning("Malformed line found in line %s", line) 

982 continue 

983 pkg = ln[0] 

984 if pkg not in bugs: 

985 bugs[pkg] = set() 

986 bugs[pkg].update(ln[1].split(",")) 

987 return bugs 

988 

989 

990class PiupartsPolicy(AbstractBasePolicy): 

991 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

992 super().__init__( 

993 "piuparts", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE} 

994 ) 

995 self._piuparts_source: dict[str, tuple[str, str]] | None = None 

996 self._piuparts_target: dict[str, tuple[str, str]] | None = None 

997 

998 def register_hints(self, hint_parser: HintParser) -> None: 

999 hint_parser.register_hint_type(HintType("ignore-piuparts")) 

1000 

1001 def initialise(self, britney: "Britney") -> None: 

1002 super().initialise(britney) 

1003 source_suite = self.suite_info.primary_source_suite 

1004 target_suite = self.suite_info.target_suite 

1005 try: 

1006 filename_unstable = os.path.join( 

1007 self.state_dir, "piuparts-summary-%s.json" % source_suite.name 

1008 ) 

1009 filename_testing = os.path.join( 

1010 self.state_dir, "piuparts-summary-%s.json" % target_suite.name 

1011 ) 

1012 except AttributeError as e: # pragma: no cover 

1013 raise RuntimeError( 

1014 "Please set STATE_DIR in the britney configuration" 

1015 ) from e 

1016 self._piuparts_source = self._read_piuparts_summary( 

1017 filename_unstable, keep_url=True 

1018 ) 

1019 self._piuparts_target = self._read_piuparts_summary( 

1020 filename_testing, keep_url=False 

1021 ) 

1022 

1023 def apply_src_policy_impl( 

1024 self, 

1025 piuparts_info: dict[str, Any], 

1026 source_data_tdist: SourcePackage | None, 

1027 source_data_srcdist: SourcePackage, 

1028 excuse: "Excuse", 

1029 ) -> PolicyVerdict: 

1030 assert self._piuparts_source is not None # for type checking 

1031 assert self._piuparts_target is not None # for type checking 

1032 source_name = excuse.item.package 

1033 

1034 if source_name in self._piuparts_target: 

1035 testing_state = self._piuparts_target[source_name][0] 

1036 else: 

1037 testing_state = "X" 

1038 url: str | None 

1039 if source_name in self._piuparts_source: 

1040 unstable_state, url = self._piuparts_source[source_name] 

1041 else: 

1042 unstable_state = "X" 

1043 url = None 

1044 url_html = "(no link yet)" 

1045 if url is not None: 

1046 url_html = '<a href="{0}">{0}</a>'.format(url) 

1047 

1048 match unstable_state: 

1049 case "P": 

1050 # Not a regression 

1051 msg = f"Piuparts tested OK - {url_html}" 

1052 result = PolicyVerdict.PASS 

1053 piuparts_info["test-results"] = "pass" 

1054 case "F" if testing_state != "F": 

1055 piuparts_info["test-results"] = "regression" 

1056 msg = f"Piuparts regression - {url_html}" 

1057 result = PolicyVerdict.REJECTED_PERMANENTLY 

1058 case "F": 

1059 piuparts_info["test-results"] = "failed" 

1060 msg = f"Piuparts failure (not a regression) - {url_html}" 

1061 result = PolicyVerdict.PASS 

1062 case "W": 

1063 msg = f"Piuparts check waiting for test results - {url_html}" 

1064 result = PolicyVerdict.REJECTED_TEMPORARILY 

1065 piuparts_info["test-results"] = "waiting-for-test-results" 

1066 case _: 

1067 msg = f"Piuparts can't test {source_name} (not a blocker) - {url_html}" 

1068 piuparts_info["test-results"] = "cannot-be-tested" 

1069 result = PolicyVerdict.PASS 

1070 

1071 if url is not None: 

1072 piuparts_info["piuparts-test-url"] = url 

1073 if result.is_rejected: 

1074 excuse.add_verdict_info(result, msg) 

1075 else: 

1076 excuse.addinfo(msg) 

1077 

1078 if result.is_rejected: 

1079 assert self.hints is not None 

1080 for ignore_hint in self.hints.search( 

1081 "ignore-piuparts", 

1082 package=source_name, 

1083 version=source_data_srcdist.version, 

1084 ): 

1085 piuparts_info["ignored-piuparts"] = {"issued-by": ignore_hint.user} 

1086 result = PolicyVerdict.PASS_HINTED 

1087 excuse.addinfo( 

1088 f"Piuparts issue ignored as requested by {ignore_hint.user}" 

1089 ) 

1090 break 

1091 

1092 return result 

1093 

1094 def _read_piuparts_summary( 

1095 self, filename: str, keep_url: bool = True 

1096 ) -> dict[str, tuple[str, str]]: 

1097 summary: dict[str, tuple[str, str]] = {} 

1098 self.logger.info("Loading piuparts report from %s", filename) 

1099 with open(filename) as fd: 1099 ↛ exitline 1099 didn't return from function '_read_piuparts_summary' because the return on line 1101 wasn't executed

1100 if os.fstat(fd.fileno()).st_size < 1: 1100 ↛ 1101line 1100 didn't jump to line 1101 because the condition on line 1100 was never true

1101 return summary 

1102 data = json.load(fd) 

1103 try: 

1104 if ( 

1105 data["_id"] != "Piuparts Package Test Results Summary" 

1106 or data["_version"] != "1.0" 

1107 ): # pragma: no cover 

1108 raise ValueError( 

1109 f"Piuparts results in {filename} does not have the correct ID or version" 

1110 ) 

1111 except KeyError as e: # pragma: no cover 

1112 raise ValueError( 

1113 f"Piuparts results in {filename} is missing id or version field" 

1114 ) from e 

1115 for source, suite_data in data["packages"].items(): 

1116 if len(suite_data) != 1: # pragma: no cover 

1117 raise ValueError( 

1118 f"Piuparts results in {filename}, the source {source} does not have " 

1119 "exactly one result set" 

1120 ) 

1121 item = next(iter(suite_data.values())) 

1122 state, _, url = item 

1123 if not keep_url: 

1124 url = None 

1125 summary[source] = (state, url) 

1126 

1127 return summary 

1128 

1129 

1130class DependsPolicy(AbstractBasePolicy): 

1131 pkg_universe: "BinaryPackageUniverse" 

1132 broken_packages: frozenset["BinaryPackageId"] 

1133 all_binaries: dict["BinaryPackageId", "BinaryPackage"] 

1134 allow_uninst: dict[str, set[str | None]] 

1135 

1136 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

1137 super().__init__( 

1138 "depends", 

1139 options, 

1140 suite_info, 

1141 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE}, 

1142 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY, 

1143 ) 

1144 self.nobreakall_arches = None 

1145 self.new_arches = None 

1146 self.break_arches = None 

1147 

1148 def initialise(self, britney: "Britney") -> None: 

1149 super().initialise(britney) 

1150 self.pkg_universe = britney.pkg_universe 

1151 self.broken_packages = self.pkg_universe.broken_packages 

1152 self.all_binaries = britney.all_binaries 

1153 self.nobreakall_arches = self.options.nobreakall_arches 

1154 self.new_arches = self.options.new_arches 

1155 self.break_arches = self.options.break_arches 

1156 self.allow_uninst = britney.allow_uninst 

1157 

1158 def apply_srcarch_policy_impl( 

1159 self, 

1160 deps_info: dict[str, Any], 

1161 arch: str, 

1162 source_data_tdist: SourcePackage | None, 

1163 source_data_srcdist: SourcePackage, 

1164 excuse: "Excuse", 

1165 ) -> PolicyVerdict: 

1166 verdict = PolicyVerdict.PASS 

1167 

1168 assert self.break_arches is not None 

1169 assert self.new_arches is not None 

1170 if arch in self.break_arches or arch in self.new_arches: 

1171 # we don't check these in the policy (TODO - for now?) 

1172 return verdict 

1173 

1174 item = excuse.item 

1175 source_suite = item.suite 

1176 target_suite = self.suite_info.target_suite 

1177 

1178 packages_s_a = source_suite.binaries[arch] 

1179 packages_t_a = target_suite.binaries[arch] 

1180 

1181 my_bins = sorted(filter_out_faux(excuse.packages[arch])) 

1182 

1183 arch_all_installable = set() 

1184 arch_arch_installable = set() 

1185 consider_it_regression = True 

1186 

1187 for pkg_id in my_bins: 

1188 pkg_name = pkg_id.package_name 

1189 binary_u = packages_s_a[pkg_name] 

1190 pkg_arch = binary_u.architecture 

1191 

1192 # in some cases, we want to track the uninstallability of a 

1193 # package (because the autopkgtest policy uses this), but we still 

1194 # want to allow the package to be uninstallable 

1195 skip_dep_check = False 

1196 

1197 if binary_u.source_version != source_data_srcdist.version: 

1198 # don't check cruft in unstable 

1199 continue 

1200 

1201 if item.architecture != "source" and pkg_arch == "all": 

1202 # we don't care about the existing arch: all binaries when 

1203 # checking a binNMU item, because the arch: all binaries won't 

1204 # migrate anyway 

1205 skip_dep_check = True 

1206 

1207 if pkg_arch == "all" and arch not in self.nobreakall_arches: 

1208 skip_dep_check = True 

1209 

1210 if pkg_name in self.allow_uninst[arch]: 1210 ↛ 1213line 1210 didn't jump to line 1213 because the condition on line 1210 was never true

1211 # this binary is allowed to become uninstallable, so we don't 

1212 # need to check anything 

1213 skip_dep_check = True 

1214 

1215 if pkg_name in packages_t_a: 

1216 oldbin = packages_t_a[pkg_name] 

1217 if not target_suite.is_installable(oldbin.pkg_id): 

1218 # as the current binary in testing is already 

1219 # uninstallable, the newer version is allowed to be 

1220 # uninstallable as well, so we don't need to check 

1221 # anything 

1222 skip_dep_check = True 

1223 consider_it_regression = False 

1224 

1225 if pkg_id in self.broken_packages: 

1226 if pkg_arch == "all": 

1227 arch_all_installable.add(False) 

1228 else: 

1229 arch_arch_installable.add(False) 

1230 # dependencies can't be satisfied by all the known binaries - 

1231 # this certainly won't work... 

1232 excuse.add_unsatisfiable_on_arch(arch) 

1233 if skip_dep_check: 

1234 # ...but if the binary is allowed to become uninstallable, 

1235 # we don't care 

1236 # we still want the binary to be listed as uninstallable, 

1237 continue 

1238 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

1239 if pkg_name.endswith("-faux-build-depends"): 1239 ↛ 1240line 1239 didn't jump to line 1240 because the condition on line 1239 was never true

1240 name = pkg_name.replace("-faux-build-depends", "") 

1241 excuse.add_verdict_info( 

1242 verdict, 

1243 f"src:{name} has unsatisfiable build dependency", 

1244 ) 

1245 else: 

1246 excuse.add_verdict_info( 

1247 verdict, f"{pkg_name}/{arch} has unsatisfiable dependency" 

1248 ) 

1249 excuse.addreason("depends") 

1250 else: 

1251 if pkg_arch == "all": 

1252 arch_all_installable.add(True) 

1253 else: 

1254 arch_arch_installable.add(True) 

1255 

1256 if skip_dep_check: 

1257 continue 

1258 

1259 deps = self.pkg_universe.dependencies_of(pkg_id) 

1260 

1261 for dep in deps: 

1262 # dep is a list of packages, each of which satisfy the 

1263 # dependency 

1264 

1265 if dep == frozenset(): 

1266 continue 

1267 is_ok = False 

1268 needed_for_dep = set() 

1269 

1270 for alternative in dep: 

1271 if target_suite.is_pkg_in_the_suite(alternative): 

1272 # dep can be satisfied in testing - ok 

1273 is_ok = True 

1274 elif alternative in my_bins: 

1275 # can be satisfied by binary from same item: will be 

1276 # ok if item migrates 

1277 is_ok = True 

1278 else: 

1279 needed_for_dep.add(alternative) 

1280 

1281 if not is_ok: 

1282 spec = DependencySpec(DependencyType.DEPENDS, arch) 

1283 excuse.add_package_depends(spec, needed_for_dep) 

1284 

1285 # The autopkgtest policy needs delicate trade offs for 

1286 # non-installability. The current choice (considering source 

1287 # migration and only binaries built by the version of the 

1288 # source): 

1289 # 

1290 # * Run autopkgtest if all arch:$arch binaries are installable 

1291 # (but some or all arch:all binaries are not) 

1292 # 

1293 # * Don't schedule nor wait for not installable arch:all only package 

1294 # on ! NOBREAKALL_ARCHES 

1295 # 

1296 # * Run autopkgtest if installability isn't a regression (there are (or 

1297 # rather, should) not be a lot of packages in this state, and most 

1298 # likely they'll just fail quickly) 

1299 # 

1300 # * Don't schedule, but wait otherwise 

1301 if arch_arch_installable == {True} and False in arch_all_installable: 

1302 deps_info.setdefault("autopkgtest_run_anyways", []).append(arch) 

1303 elif ( 

1304 arch not in self.nobreakall_arches 

1305 and arch_arch_installable == set() 

1306 and False in arch_all_installable 

1307 ): 

1308 deps_info.setdefault("arch_all_not_installable", []).append(arch) 

1309 elif not consider_it_regression: 

1310 deps_info.setdefault("autopkgtest_run_anyways", []).append(arch) 

1311 

1312 return verdict 

1313 

1314 

1315@unique 

1316class BuildDepResult(IntEnum): 

1317 # relation is satisfied in target 

1318 OK = 1 

1319 # relation can be satisfied by other packages in source 

1320 DEPENDS = 2 

1321 # relation cannot be satisfied 

1322 FAILED = 3 

1323 

1324 

1325class BuildDependsPolicy(AbstractBasePolicy): 

1326 

1327 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

1328 super().__init__( 

1329 "build-depends", 

1330 options, 

1331 suite_info, 

1332 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE}, 

1333 ) 

1334 self._all_buildarch: list[str] = [] 

1335 

1336 parse_option(options, "all_buildarch") 

1337 

1338 def initialise(self, britney: "Britney") -> None: 

1339 super().initialise(britney) 

1340 if self.options.all_buildarch: 

1341 self._all_buildarch = SuiteContentLoader.config_str_as_list( 

1342 self.options.all_buildarch, [] 

1343 ) 

1344 

1345 def apply_src_policy_impl( 

1346 self, 

1347 build_deps_info: dict[str, Any], 

1348 source_data_tdist: SourcePackage | None, 

1349 source_data_srcdist: SourcePackage, 

1350 excuse: "Excuse", 

1351 get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers, 

1352 ) -> PolicyVerdict: 

1353 verdict = PolicyVerdict.PASS 

1354 

1355 # analyze the dependency fields (if present) 

1356 if deps := source_data_srcdist.build_deps_arch: 

1357 v = self._check_build_deps( 

1358 deps, 

1359 DependencyType.BUILD_DEPENDS, 

1360 build_deps_info, 

1361 source_data_tdist, 

1362 source_data_srcdist, 

1363 excuse, 

1364 get_dependency_solvers=get_dependency_solvers, 

1365 ) 

1366 verdict = PolicyVerdict.worst_of(verdict, v) 

1367 

1368 if ideps := source_data_srcdist.build_deps_indep: 

1369 v = self._check_build_deps( 

1370 ideps, 

1371 DependencyType.BUILD_DEPENDS_INDEP, 

1372 build_deps_info, 

1373 source_data_tdist, 

1374 source_data_srcdist, 

1375 excuse, 

1376 get_dependency_solvers=get_dependency_solvers, 

1377 ) 

1378 verdict = PolicyVerdict.worst_of(verdict, v) 

1379 

1380 return verdict 

1381 

1382 def _get_check_archs( 

1383 self, archs: Container[str], dep_type: DependencyType 

1384 ) -> list[str]: 

1385 oos = self.options.outofsync_arches 

1386 

1387 if dep_type == DependencyType.BUILD_DEPENDS: 

1388 return [ 

1389 arch 

1390 for arch in self.options.architectures 

1391 if arch in archs and arch not in oos 

1392 ] 

1393 

1394 # first try the all buildarch 

1395 checkarchs = list(self._all_buildarch) 

1396 # then try the architectures where this source has arch specific 

1397 # binaries (in the order of the architecture config file) 

1398 checkarchs.extend( 

1399 arch 

1400 for arch in self.options.architectures 

1401 if arch in archs and arch not in checkarchs 

1402 ) 

1403 # then try all other architectures 

1404 checkarchs.extend( 

1405 arch for arch in self.options.architectures if arch not in checkarchs 

1406 ) 

1407 

1408 # and drop OUTOFSYNC_ARCHES 

1409 return [arch for arch in checkarchs if arch not in oos] 

1410 

1411 def _add_info_for_arch( 

1412 self, 

1413 arch: str, 

1414 excuses_info: dict[str, list[str]], 

1415 blockers: dict[str, set[BinaryPackageId]], 

1416 results: dict[str, BuildDepResult], 

1417 dep_type: DependencyType, 

1418 target_suite: TargetSuite, 

1419 source_suite: Suite, 

1420 excuse: "Excuse", 

1421 verdict: PolicyVerdict, 

1422 ) -> PolicyVerdict: 

1423 if arch in blockers: 

1424 packages = blockers[arch] 

1425 

1426 # for the solving packages, update the excuse to add the dependencies 

1427 for p in packages: 

1428 if arch not in self.options.break_arches: 1428 ↛ 1427line 1428 didn't jump to line 1427 because the condition on line 1428 was always true

1429 spec = DependencySpec(dep_type, arch) 

1430 excuse.add_package_depends(spec, {p}) 

1431 

1432 if arch in results and results[arch] == BuildDepResult.FAILED: 

1433 verdict = PolicyVerdict.worst_of( 

1434 verdict, PolicyVerdict.REJECTED_PERMANENTLY 

1435 ) 

1436 

1437 if arch in excuses_info: 

1438 for excuse_text in excuses_info[arch]: 

1439 if verdict.is_rejected: 1439 ↛ 1442line 1439 didn't jump to line 1442 because the condition on line 1439 was always true

1440 excuse.add_verdict_info(verdict, excuse_text) 

1441 else: 

1442 excuse.addinfo(excuse_text) 

1443 

1444 return verdict 

1445 

1446 def _check_build_deps( 

1447 self, 

1448 deps: str, 

1449 dep_type: DependencyType, 

1450 build_deps_info: dict[str, Any], 

1451 source_data_tdist: SourcePackage | None, 

1452 source_data_srcdist: SourcePackage, 

1453 excuse: "Excuse", 

1454 get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers, 

1455 ) -> PolicyVerdict: 

1456 verdict = PolicyVerdict.PASS 

1457 any_arch_ok = dep_type == DependencyType.BUILD_DEPENDS_INDEP 

1458 

1459 britney = self.britney 

1460 

1461 # local copies for better performance 

1462 parse_src_depends = apt_pkg.parse_src_depends 

1463 

1464 source_name = excuse.item.package 

1465 source_suite = excuse.item.suite 

1466 target_suite = self.suite_info.target_suite 

1467 binaries_s = source_suite.binaries 

1468 provides_s = source_suite.provides_table 

1469 binaries_t = target_suite.binaries 

1470 provides_t = target_suite.provides_table 

1471 unsat_bd: dict[str, list[str]] = {} 

1472 relevant_archs: set[str] = { 

1473 binary.architecture 

1474 for binary in filter_out_faux(source_data_srcdist.binaries) 

1475 if britney.all_binaries[binary].architecture != "all" 

1476 } 

1477 

1478 excuses_info: dict[str, list[str]] = defaultdict(list) 

1479 blockers: dict[str, set[BinaryPackageId]] = defaultdict(set) 

1480 arch_results = {} 

1481 result_archs = defaultdict(list) 

1482 bestresult = BuildDepResult.FAILED 

1483 check_archs = self._get_check_archs(relevant_archs, dep_type) 

1484 if not check_archs: 

1485 # when the arch list is empty, we check the b-d on any arch, instead of all archs 

1486 # this happens for Build-Depens on a source package that only produces arch: all binaries 

1487 any_arch_ok = True 

1488 check_archs = self._get_check_archs( 

1489 self.options.architectures, DependencyType.BUILD_DEPENDS_INDEP 

1490 ) 

1491 

1492 for arch in check_archs: 

1493 # retrieve the binary package from the specified suite and arch 

1494 binaries_s_a = binaries_s[arch] 

1495 provides_s_a = provides_s[arch] 

1496 binaries_t_a = binaries_t[arch] 

1497 provides_t_a = provides_t[arch] 

1498 arch_results[arch] = BuildDepResult.OK 

1499 # for every dependency block (formed as conjunction of disjunction) 

1500 for block_txt in deps.split(","): 

1501 block_list = parse_src_depends(block_txt, False, arch) 

1502 # Unlike regular dependencies, some clauses of the Build-Depends(-Arch|-Indep) can be 

1503 # filtered out by (e.g.) architecture restrictions. We need to cope with this while 

1504 # keeping block_txt and block aligned. 

1505 if not block_list: 

1506 # Relation is not relevant for this architecture. 

1507 continue 

1508 block = block_list[0] 

1509 # if the block is satisfied in the target suite, then skip the block 

1510 if get_dependency_solvers( 

1511 block, binaries_t_a, provides_t_a, build_depends=True 

1512 ): 

1513 # Satisfied in the target suite; all ok. 

1514 continue 

1515 

1516 # check if the block can be satisfied in the source suite, and list the solving packages 

1517 packages = get_dependency_solvers( 

1518 block, binaries_s_a, provides_s_a, build_depends=True 

1519 ) 

1520 sources = sorted(p.source for p in packages) 

1521 

1522 # if the dependency can be satisfied by the same source package, skip the block: 

1523 # obviously both binary packages will enter the target suite together 

1524 if source_name in sources: 1524 ↛ 1525line 1524 didn't jump to line 1525 because the condition on line 1524 was never true

1525 continue 

1526 

1527 # if no package can satisfy the dependency, add this information to the excuse 

1528 if not packages: 

1529 excuses_info[arch].append( 

1530 "%s unsatisfiable %s on %s: %s" 

1531 % (source_name, dep_type, arch, block_txt.strip()) 

1532 ) 

1533 if arch not in unsat_bd: 1533 ↛ 1535line 1533 didn't jump to line 1535 because the condition on line 1533 was always true

1534 unsat_bd[arch] = [] 

1535 unsat_bd[arch].append(block_txt.strip()) 

1536 arch_results[arch] = BuildDepResult.FAILED 

1537 continue 

1538 

1539 blockers[arch].update(p.pkg_id for p in packages) 

1540 if arch_results[arch] < BuildDepResult.DEPENDS: 

1541 arch_results[arch] = BuildDepResult.DEPENDS 

1542 

1543 if any_arch_ok: 

1544 if arch_results[arch] < bestresult: 

1545 bestresult = arch_results[arch] 

1546 result_archs[arch_results[arch]].append(arch) 

1547 if bestresult == BuildDepResult.OK: 

1548 # we found an architecture where the b-deps-indep are 

1549 # satisfied in the target suite, so we can stop 

1550 break 

1551 

1552 if any_arch_ok: 

1553 arch = result_archs[bestresult][0] 

1554 excuse.add_detailed_info(f"Checking {dep_type.get_description()} on {arch}") 

1555 key = "check-%s-on-arch" % dep_type.get_reason() 

1556 build_deps_info[key] = arch 

1557 verdict = self._add_info_for_arch( 

1558 arch, 

1559 excuses_info, 

1560 blockers, 

1561 arch_results, 

1562 dep_type, 

1563 target_suite, 

1564 source_suite, 

1565 excuse, 

1566 verdict, 

1567 ) 

1568 

1569 else: 

1570 for arch in check_archs: 

1571 verdict = self._add_info_for_arch( 

1572 arch, 

1573 excuses_info, 

1574 blockers, 

1575 arch_results, 

1576 dep_type, 

1577 target_suite, 

1578 source_suite, 

1579 excuse, 

1580 verdict, 

1581 ) 

1582 

1583 if unsat_bd: 

1584 build_deps_info["unsatisfiable-arch-build-depends"] = unsat_bd 

1585 

1586 return verdict 

1587 

1588 

1589class BuiltUsingPolicy(AbstractBasePolicy): 

1590 """Built-Using policy 

1591 

1592 Binaries that incorporate (part of) another source package must list these 

1593 sources under 'Built-Using'. 

1594 

1595 This policy checks if the corresponding sources are available in the 

1596 target suite. If they are not, but they are candidates for migration, a 

1597 dependency is added. 

1598 

1599 If the binary incorporates a newer version of a source, that is not (yet) 

1600 a candidate, we don't want to accept that binary. A rebuild later in the 

1601 primary suite wouldn't fix the issue, because that would incorporate the 

1602 newer version again. 

1603 

1604 If the binary incorporates an older version of the source, a newer version 

1605 will be accepted as a replacement. We assume that this can be fixed by 

1606 rebuilding the binary at some point during the development cycle. 

1607 

1608 Requiring exact version of the source would not be useful in practice. A 

1609 newer upload of that source wouldn't be blocked by this policy, so the 

1610 built-using would be outdated anyway. 

1611 

1612 """ 

1613 

1614 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

1615 super().__init__( 

1616 "built-using", 

1617 options, 

1618 suite_info, 

1619 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE}, 

1620 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY, 

1621 ) 

1622 

1623 def initialise(self, britney: "Britney") -> None: 

1624 super().initialise(britney) 

1625 

1626 def apply_srcarch_policy_impl( 

1627 self, 

1628 build_deps_info: dict[str, Any], 

1629 arch: str, 

1630 source_data_tdist: SourcePackage | None, 

1631 source_data_srcdist: SourcePackage, 

1632 excuse: "Excuse", 

1633 ) -> PolicyVerdict: 

1634 verdict = PolicyVerdict.PASS 

1635 

1636 source_suite = excuse.item.suite 

1637 target_suite = self.suite_info.target_suite 

1638 binaries_s = source_suite.binaries 

1639 

1640 def check_bu_in_suite( 

1641 bu_source: str, bu_version: str, source_suite: Suite 

1642 ) -> bool: 

1643 found = False 

1644 if bu_source not in source_suite.sources: 

1645 return found 

1646 s_source = source_suite.sources[bu_source] 

1647 s_ver = s_source.version 

1648 if apt_pkg.version_compare(s_ver, bu_version) >= 0: 

1649 found = True 

1650 dep = PackageId(bu_source, s_ver, "source") 

1651 if arch in self.options.break_arches: 

1652 excuse.add_detailed_info( 

1653 "Ignoring Built-Using for %s/%s on %s" 

1654 % (pkg_name, arch, dep.uvname) 

1655 ) 

1656 else: 

1657 spec = DependencySpec(DependencyType.BUILT_USING, arch) 

1658 excuse.add_package_depends(spec, {dep}) 

1659 excuse.add_detailed_info( 

1660 f"{pkg_name}/{arch} has Built-Using on {dep.uvname}" 

1661 ) 

1662 

1663 return found 

1664 

1665 for pkg_id in sorted( 

1666 x 

1667 for x in filter_out_faux(source_data_srcdist.binaries) 

1668 if x.architecture == arch 

1669 ): 

1670 pkg_name = pkg_id.package_name 

1671 

1672 # retrieve the testing (if present) and unstable corresponding binary packages 

1673 binary_s = binaries_s[arch][pkg_name] 

1674 

1675 for bu in binary_s.builtusing: 

1676 bu_source = bu[0] 

1677 bu_version = bu[1] 

1678 found = False 

1679 if bu_source in target_suite.sources: 

1680 t_source = target_suite.sources[bu_source] 

1681 t_ver = t_source.version 

1682 if apt_pkg.version_compare(t_ver, bu_version) >= 0: 

1683 found = True 

1684 

1685 if not found: 

1686 found = check_bu_in_suite(bu_source, bu_version, source_suite) 

1687 

1688 if not found and source_suite.suite_class.is_additional_source: 

1689 found = check_bu_in_suite( 

1690 bu_source, bu_version, self.suite_info.primary_source_suite 

1691 ) 

1692 

1693 if not found: 

1694 if arch in self.options.break_arches: 

1695 excuse.add_detailed_info( 

1696 "Ignoring unsatisfiable Built-Using for %s/%s on %s %s" 

1697 % (pkg_name, arch, bu_source, bu_version) 

1698 ) 

1699 else: 

1700 verdict = PolicyVerdict.worst_of( 

1701 verdict, PolicyVerdict.REJECTED_PERMANENTLY 

1702 ) 

1703 excuse.add_verdict_info( 

1704 verdict, 

1705 "%s/%s has unsatisfiable Built-Using on %s %s" 

1706 % (pkg_name, arch, bu_source, bu_version), 

1707 ) 

1708 

1709 return verdict 

1710 

1711 

1712class BlockPolicy(AbstractBasePolicy): 

1713 BLOCK_HINT_REGEX = re.compile("^(un)?(block-?.*)$") 

1714 

1715 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

1716 super().__init__( 

1717 "block", 

1718 options, 

1719 suite_info, 

1720 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE}, 

1721 ) 

1722 self._blockall: dict[str | None, Hint] = {} 

1723 

1724 def initialise(self, britney: "Britney") -> None: 

1725 super().initialise(britney) 

1726 assert self.hints is not None 

1727 for hint in self.hints.search(type="block-all"): 

1728 self._blockall[hint.package] = hint 

1729 

1730 self._key_packages = [] 

1731 if "key" in self._blockall: 

1732 self._key_packages = self._read_key_packages() 

1733 

1734 def _read_key_packages(self) -> list[str]: 

1735 """Read the list of key packages 

1736 

1737 The file contains data in the yaml format : 

1738 

1739 - reason: <something> 

1740 source: <package> 

1741 

1742 The method returns a list of all key packages. 

1743 """ 

1744 filename = os.path.join(self.state_dir, "key_packages.yaml") 

1745 self.logger.info("Loading key packages from %s", filename) 

1746 if os.path.exists(filename): 1746 ↛ 1751line 1746 didn't jump to line 1751 because the condition on line 1746 was always true

1747 with open(filename) as f: 

1748 data = yaml.safe_load(f) 

1749 key_packages = [item["source"] for item in data] 

1750 else: 

1751 self.logger.error( 

1752 "Britney was asked to block key packages, " 

1753 + "but no key_packages.yaml file was found." 

1754 ) 

1755 sys.exit(1) 

1756 

1757 return key_packages 

1758 

1759 def register_hints(self, hint_parser: HintParser) -> None: 

1760 # block related hints are currently defined in hint.py 

1761 pass 

1762 

1763 def _check_blocked( 

1764 self, arch: str, version: str, excuse: "Excuse" 

1765 ) -> PolicyVerdict: 

1766 verdict = PolicyVerdict.PASS 

1767 blocked = {} 

1768 unblocked = {} 

1769 block_info = {} 

1770 source_suite = excuse.item.suite 

1771 suite_name = source_suite.name 

1772 src = excuse.item.package 

1773 is_primary = source_suite.suite_class == SuiteClass.PRIMARY_SOURCE_SUITE 

1774 

1775 tooltip = ( 

1776 "please contact %s-release if update is needed" % self.options.distribution 

1777 ) 

1778 

1779 assert self.hints is not None 

1780 shints = self.hints.search(package=src) 

1781 mismatches = False 

1782 r = self.BLOCK_HINT_REGEX 

1783 for hint in shints: 

1784 m = r.match(hint.type) 

1785 if m: 

1786 if m.group(1) == "un": 

1787 assert hint.suite is not None 

1788 if ( 

1789 hint.version != version 

1790 or hint.suite.name != suite_name 

1791 or (hint.architecture != arch and hint.architecture != "source") 

1792 ): 

1793 self.logger.info( 

1794 "hint mismatch: %s %s %s", version, arch, suite_name 

1795 ) 

1796 mismatches = True 

1797 else: 

1798 unblocked[m.group(2)] = hint.user 

1799 excuse.add_hint(hint) 

1800 else: 

1801 # block(-*) hint: only accepts a source, so this will 

1802 # always match 

1803 blocked[m.group(2)] = hint.user 

1804 excuse.add_hint(hint) 

1805 

1806 if "block" not in blocked and is_primary: 

1807 # if there is a specific block hint for this package, we don't 

1808 # check for the general hints 

1809 

1810 if self.options.distribution == "debian": 1810 ↛ 1817line 1810 didn't jump to line 1817 because the condition on line 1810 was always true

1811 url = "https://release.debian.org/testing/freeze_policy.html" 

1812 tooltip = ( 

1813 'Follow the <a href="%s">freeze policy</a> when applying for an unblock' 

1814 % url 

1815 ) 

1816 

1817 if "source" in self._blockall: 

1818 blocked["block"] = self._blockall["source"].user 

1819 excuse.add_hint(self._blockall["source"]) 

1820 elif ( 

1821 "new-source" in self._blockall 

1822 and src not in self.suite_info.target_suite.sources 

1823 ): 

1824 blocked["block"] = self._blockall["new-source"].user 

1825 excuse.add_hint(self._blockall["new-source"]) 

1826 # no tooltip: new sources will probably not be accepted anyway 

1827 block_info["block"] = "blocked by {}: is not in {}".format( 

1828 self._blockall["new-source"].user, 

1829 self.suite_info.target_suite.name, 

1830 ) 

1831 elif "key" in self._blockall and src in self._key_packages: 

1832 blocked["block"] = self._blockall["key"].user 

1833 excuse.add_hint(self._blockall["key"]) 

1834 block_info["block"] = "blocked by {}: is a key package ({})".format( 

1835 self._blockall["key"].user, 

1836 tooltip, 

1837 ) 

1838 elif "no-autopkgtest" in self._blockall: 

1839 if excuse.autopkgtest_results == {"PASS"}: 

1840 if not blocked: 1840 ↛ 1866line 1840 didn't jump to line 1866 because the condition on line 1840 was always true

1841 excuse.addinfo("not blocked: has successful autopkgtest") 

1842 else: 

1843 blocked["block"] = self._blockall["no-autopkgtest"].user 

1844 excuse.add_hint(self._blockall["no-autopkgtest"]) 

1845 if not excuse.autopkgtest_results: 

1846 block_info["block"] = ( 

1847 "blocked by %s: does not have autopkgtest (%s)" 

1848 % ( 

1849 self._blockall["no-autopkgtest"].user, 

1850 tooltip, 

1851 ) 

1852 ) 

1853 else: 

1854 block_info["block"] = ( 

1855 "blocked by %s: autopkgtest not fully successful (%s)" 

1856 % ( 

1857 self._blockall["no-autopkgtest"].user, 

1858 tooltip, 

1859 ) 

1860 ) 

1861 

1862 elif not is_primary: 

1863 blocked["block"] = suite_name 

1864 excuse.needs_approval = True 

1865 

1866 for block_cmd in blocked: 

1867 unblock_cmd = "un" + block_cmd 

1868 if block_cmd in unblocked: 

1869 if is_primary or block_cmd == "block-udeb": 

1870 excuse.addinfo( 

1871 "Ignoring %s request by %s, due to %s request by %s" 

1872 % ( 

1873 block_cmd, 

1874 blocked[block_cmd], 

1875 unblock_cmd, 

1876 unblocked[block_cmd], 

1877 ) 

1878 ) 

1879 else: 

1880 excuse.addinfo("Approved by %s" % (unblocked[block_cmd])) 

1881 else: 

1882 verdict = PolicyVerdict.REJECTED_NEEDS_APPROVAL 

1883 if is_primary or block_cmd == "block-udeb": 

1884 # redirect people to d-i RM for udeb things: 

1885 if block_cmd == "block-udeb": 

1886 tooltip = "please contact the d-i release manager if an update is needed" 

1887 if block_cmd in block_info: 

1888 info = block_info[block_cmd] 

1889 else: 

1890 info = ( 

1891 "Not touching package due to {} request by {} ({})".format( 

1892 block_cmd, 

1893 blocked[block_cmd], 

1894 tooltip, 

1895 ) 

1896 ) 

1897 excuse.add_verdict_info(verdict, info) 

1898 else: 

1899 excuse.add_verdict_info(verdict, "NEEDS APPROVAL BY RM") 

1900 excuse.addreason("block") 

1901 if mismatches: 

1902 excuse.add_detailed_info( 

1903 "Some hints for %s do not match this item" % src 

1904 ) 

1905 return verdict 

1906 

1907 def apply_src_policy_impl( 

1908 self, 

1909 block_info: dict[str, Any], 

1910 source_data_tdist: SourcePackage | None, 

1911 source_data_srcdist: SourcePackage, 

1912 excuse: "Excuse", 

1913 ) -> PolicyVerdict: 

1914 return self._check_blocked("source", source_data_srcdist.version, excuse) 

1915 

1916 def apply_srcarch_policy_impl( 

1917 self, 

1918 block_info: dict[str, Any], 

1919 arch: str, 

1920 source_data_tdist: SourcePackage | None, 

1921 source_data_srcdist: SourcePackage, 

1922 excuse: "Excuse", 

1923 ) -> PolicyVerdict: 

1924 return self._check_blocked(arch, source_data_srcdist.version, excuse) 

1925 

1926 

1927class BuiltOnBuilddPolicy(AbstractBasePolicy): 

1928 

1929 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

1930 super().__init__( 

1931 "builtonbuildd", 

1932 options, 

1933 suite_info, 

1934 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE}, 

1935 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY, 

1936 ) 

1937 self._builtonbuildd: dict[str, Any] = { 

1938 "signerinfo": None, 

1939 } 

1940 

1941 def register_hints(self, hint_parser: HintParser) -> None: 

1942 hint_parser.register_hint_type( 

1943 HintType( 

1944 "allow-archall-maintainer-upload", 

1945 versioned=HintAnnotate.FORBIDDEN, 

1946 ) 

1947 ) 

1948 

1949 def initialise(self, britney: "Britney") -> None: 

1950 super().initialise(britney) 

1951 try: 

1952 filename_signerinfo = os.path.join(self.state_dir, "signers.json") 

1953 except AttributeError as e: # pragma: no cover 

1954 raise RuntimeError( 

1955 "Please set STATE_DIR in the britney configuration" 

1956 ) from e 

1957 self._builtonbuildd["signerinfo"] = self._read_signerinfo(filename_signerinfo) 

1958 

1959 def apply_srcarch_policy_impl( 

1960 self, 

1961 buildd_info: dict[str, Any], 

1962 arch: str, 

1963 source_data_tdist: SourcePackage | None, 

1964 source_data_srcdist: SourcePackage, 

1965 excuse: "Excuse", 

1966 ) -> PolicyVerdict: 

1967 verdict = PolicyVerdict.PASS 

1968 signers = self._builtonbuildd["signerinfo"] 

1969 

1970 if "signed-by" not in buildd_info: 

1971 buildd_info["signed-by"] = {} 

1972 

1973 item = excuse.item 

1974 source_suite = item.suite 

1975 

1976 # horrible hard-coding, but currently, we don't keep track of the 

1977 # component when loading the packages files 

1978 component = "main" 

1979 # we use the source component, because a binary in contrib can 

1980 # belong to a source in main 

1981 section = source_data_srcdist.section 

1982 if section.find("/") > -1: 

1983 component = section.split("/")[0] 

1984 

1985 packages_s_a = source_suite.binaries[arch] 

1986 assert self.hints is not None 

1987 

1988 for pkg_id in sorted( 

1989 x 

1990 for x in filter_out_faux(source_data_srcdist.binaries) 

1991 if x.architecture == arch 

1992 ): 

1993 pkg_name = pkg_id.package_name 

1994 binary_u = packages_s_a[pkg_name] 

1995 pkg_arch = binary_u.architecture 

1996 

1997 if binary_u.source_version != source_data_srcdist.version: 1997 ↛ 1998line 1997 didn't jump to line 1998 because the condition on line 1997 was never true

1998 continue 

1999 

2000 if item.architecture != "source" and pkg_arch == "all": 

2001 # we don't care about the existing arch: all binaries when 

2002 # checking a binNMU item, because the arch: all binaries won't 

2003 # migrate anyway 

2004 continue 

2005 

2006 signer = None 

2007 uid = None 

2008 uidinfo = "" 

2009 buildd_ok = False 

2010 failure_verdict = PolicyVerdict.REJECTED_PERMANENTLY 

2011 try: 

2012 signer = signers[pkg_name][pkg_id.version][pkg_arch] 

2013 if signer["buildd"]: 

2014 buildd_ok = True 

2015 uid = signer["uid"] 

2016 uidinfo = f"arch {pkg_arch} binaries uploaded by {uid}" 

2017 except KeyError: 

2018 self.logger.info( 

2019 "signer info for %s %s (%s) on %s not found " 

2020 % (pkg_name, binary_u.version, pkg_arch, arch) 

2021 ) 

2022 uidinfo = "upload info for arch %s binaries not found" % (pkg_arch) 

2023 failure_verdict = PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT 

2024 if not buildd_ok: 

2025 if component != "main": 

2026 if not buildd_ok and pkg_arch not in buildd_info["signed-by"]: 2026 ↛ 2030line 2026 didn't jump to line 2030 because the condition on line 2026 was always true

2027 excuse.add_detailed_info( 

2028 f"{uidinfo}, but package in {component}" 

2029 ) 

2030 buildd_ok = True 

2031 elif pkg_arch == "all": 

2032 allow_hints = self.hints.search( 

2033 "allow-archall-maintainer-upload", package=item.package 

2034 ) 

2035 if allow_hints: 

2036 buildd_ok = True 

2037 verdict = PolicyVerdict.worst_of( 

2038 verdict, PolicyVerdict.PASS_HINTED 

2039 ) 

2040 if pkg_arch not in buildd_info["signed-by"]: 

2041 excuse.addinfo( 

2042 "%s, but whitelisted by %s" 

2043 % (uidinfo, allow_hints[0].user) 

2044 ) 

2045 if not buildd_ok: 

2046 verdict = failure_verdict 

2047 if pkg_arch not in buildd_info["signed-by"]: 

2048 if pkg_arch == "all": 

2049 uidinfo += ( 

2050 ", a new source-only upload is needed to allow migration" 

2051 ) 

2052 excuse.add_verdict_info( 

2053 verdict, "Not built on buildd: %s" % (uidinfo) 

2054 ) 

2055 

2056 if ( 2056 ↛ 2060line 2056 didn't jump to line 2060

2057 pkg_arch in buildd_info["signed-by"] 

2058 and buildd_info["signed-by"][pkg_arch] != uid 

2059 ): 

2060 self.logger.info( 

2061 "signer mismatch for %s (%s %s) on %s: %s, while %s already listed" 

2062 % ( 

2063 pkg_name, 

2064 binary_u.source, 

2065 binary_u.source_version, 

2066 pkg_arch, 

2067 uid, 

2068 buildd_info["signed-by"][pkg_arch], 

2069 ) 

2070 ) 

2071 

2072 buildd_info["signed-by"][pkg_arch] = uid 

2073 

2074 return verdict 

2075 

2076 def _read_signerinfo(self, filename: str) -> dict[str, Any]: 

2077 signerinfo: dict[str, Any] = {} 

2078 self.logger.info("Loading signer info from %s", filename) 

2079 with open(filename) as fd: 2079 ↛ exitline 2079 didn't return from function '_read_signerinfo' because the return on line 2081 wasn't executed

2080 if os.fstat(fd.fileno()).st_size < 1: 2080 ↛ 2081line 2080 didn't jump to line 2081 because the condition on line 2080 was never true

2081 return signerinfo 

2082 signerinfo = json.load(fd) 

2083 

2084 return signerinfo 

2085 

2086 

2087class ImplicitDependencyPolicy(AbstractBasePolicy): 

2088 """Implicit Dependency policy 

2089 

2090 Upgrading a package pkg-a can break the installability of a package pkg-b. 

2091 A newer version (or the removal) of pkg-b might fix the issue. In that 

2092 case, pkg-a has an 'implicit dependency' on pkg-b, because pkg-a can only 

2093 migrate if pkg-b also migrates. 

2094 

2095 This policy tries to discover a few common cases, and adds the relevant 

2096 info to the excuses. If another item is needed to fix the 

2097 uninstallability, a dependency is added. If no newer item can fix it, this 

2098 excuse will be blocked. 

2099 

2100 Note that the migration step will check the installability of every 

2101 package, so this policy doesn't need to handle every corner case. It 

2102 must, however, make sure that no excuse is unnecessarily blocked. 

2103 

2104 Some cases that should be detected by this policy: 

2105 

2106 * pkg-a is upgraded from 1.0-1 to 2.0-1, while 

2107 pkg-b has "Depends: pkg-a (<< 2.0)" 

2108 This typically happens if pkg-b has a strict dependency on pkg-a because 

2109 it uses some non-stable internal interface (examples are glibc, 

2110 binutils, python3-defaults, ...) 

2111 

2112 * pkg-a is upgraded from 1.0-1 to 2.0-1, and 

2113 pkg-a 1.0-1 has "Provides: provides-1", 

2114 pkg-a 2.0-1 has "Provides: provides-2", 

2115 pkg-b has "Depends: provides-1" 

2116 This typically happens when pkg-a has an interface that changes between 

2117 versions, and a virtual package is used to identify the version of this 

2118 interface (e.g. perl-api-x.y) 

2119 

2120 """ 

2121 

2122 _pkg_universe: "BinaryPackageUniverse" 

2123 _all_binaries: dict["BinaryPackageId", "BinaryPackage"] 

2124 _allow_uninst: dict[str, set[str | None]] 

2125 _nobreakall_arches: list[str] 

2126 

2127 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

2128 super().__init__( 

2129 "implicit-deps", 

2130 options, 

2131 suite_info, 

2132 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE}, 

2133 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY, 

2134 ) 

2135 

2136 def initialise(self, britney: "Britney") -> None: 

2137 super().initialise(britney) 

2138 self._pkg_universe = britney.pkg_universe 

2139 self._all_binaries = britney.all_binaries 

2140 self._smooth_updates = britney.options.smooth_updates 

2141 self._nobreakall_arches = self.options.nobreakall_arches 

2142 self._new_arches = self.options.new_arches 

2143 self._break_arches = self.options.break_arches 

2144 self._allow_uninst = britney.allow_uninst 

2145 self._outofsync_arches = self.options.outofsync_arches 

2146 

2147 def can_be_removed(self, pkg: BinaryPackage) -> bool: 

2148 src = pkg.source 

2149 target_suite = self.suite_info.target_suite 

2150 

2151 # TODO these conditions shouldn't be hardcoded here 

2152 # ideally, we would be able to look up excuses to see if the removal 

2153 # is in there, but in the current flow, this policy is called before 

2154 # all possible excuses exist, so there is no list for us to check 

2155 

2156 if src not in self.suite_info.primary_source_suite.sources: 

2157 # source for pkg not in unstable: candidate for removal 

2158 return True 

2159 

2160 source_t = target_suite.sources[src] 

2161 assert self.hints is not None 

2162 for hint in self.hints.search("remove", package=src, version=source_t.version): 

2163 # removal hint for the source in testing: candidate for removal 

2164 return True 

2165 

2166 if target_suite.is_cruft(pkg): 

2167 # if pkg is cruft in testing, removal will be tried 

2168 return True 

2169 

2170 # the case were the newer version of the source no longer includes the 

2171 # binary (or includes a cruft version of the binary) will be handled 

2172 # separately (in that case there might be an implicit dependency on 

2173 # the newer source) 

2174 

2175 return False 

2176 

2177 def should_skip_rdep( 

2178 self, pkg: BinaryPackage, source_name: str, myarch: str 

2179 ) -> bool: 

2180 target_suite = self.suite_info.target_suite 

2181 

2182 if not target_suite.is_pkg_in_the_suite(pkg.pkg_id): 

2183 # it is not in the target suite, migration cannot break anything 

2184 return True 

2185 

2186 if pkg.source == source_name: 

2187 # if it is built from the same source, it will be upgraded 

2188 # with the source 

2189 return True 

2190 

2191 if self.can_be_removed(pkg): 

2192 # could potentially be removed, so if that happens, it won't be 

2193 # broken 

2194 return True 

2195 

2196 if pkg.architecture == "all" and myarch not in self._nobreakall_arches: 

2197 # arch all on non nobreakarch is allowed to become uninstallable 

2198 return True 

2199 

2200 if pkg.pkg_id.package_name in self._allow_uninst[myarch]: 

2201 # there is a hint to allow this binary to become uninstallable 

2202 return True 

2203 

2204 if not target_suite.is_installable(pkg.pkg_id): 

2205 # it is already uninstallable in the target suite, migration 

2206 # cannot break anything 

2207 return True 

2208 

2209 return False 

2210 

2211 def breaks_installability( 

2212 self, 

2213 pkg_id_t: BinaryPackageId, 

2214 pkg_id_s: BinaryPackageId | None, 

2215 pkg_to_check: BinaryPackageId, 

2216 ) -> bool: 

2217 """ 

2218 Check if upgrading pkg_id_t to pkg_id_s breaks the installability of 

2219 pkg_to_check. 

2220 

2221 To check if removing pkg_id_t breaks pkg_to_check, set pkg_id_s to 

2222 None. 

2223 """ 

2224 

2225 pkg_universe = self._pkg_universe 

2226 negative_deps = pkg_universe.negative_dependencies_of(pkg_to_check) 

2227 

2228 for dep in pkg_universe.dependencies_of(pkg_to_check): 

2229 if pkg_id_t not in dep: 

2230 # this depends doesn't have pkg_id_t as alternative, so 

2231 # upgrading pkg_id_t cannot break this dependency clause 

2232 continue 

2233 

2234 # We check all the alternatives for this dependency, to find one 

2235 # that can satisfy it when pkg_id_t is upgraded to pkg_id_s 

2236 found_alternative = False 

2237 for d in dep: 

2238 if d in negative_deps: 

2239 # If this alternative dependency conflicts with 

2240 # pkg_to_check, it cannot be used to satisfy the 

2241 # dependency. 

2242 # This commonly happens when breaks are added to pkg_id_s. 

2243 continue 

2244 

2245 if d.package_name != pkg_id_t.package_name: 

2246 # a binary different from pkg_id_t can satisfy the dep, so 

2247 # upgrading pkg_id_t won't break this dependency 

2248 found_alternative = True 

2249 break 

2250 

2251 if d != pkg_id_s: 

2252 # We want to know the impact of the upgrade of 

2253 # pkg_id_t to pkg_id_s. If pkg_id_s migrates to the 

2254 # target suite, any other version of this binary will 

2255 # not be there, so it cannot satisfy this dependency. 

2256 # This includes pkg_id_t, but also other versions. 

2257 continue 

2258 

2259 # pkg_id_s can satisfy the dep 

2260 found_alternative = True 

2261 

2262 if not found_alternative: 

2263 return True 

2264 return False 

2265 

2266 def check_upgrade( 

2267 self, 

2268 pkg_id_t: BinaryPackageId, 

2269 pkg_id_s: BinaryPackageId | None, 

2270 source_name: str, 

2271 myarch: str, 

2272 broken_binaries: set[str], 

2273 excuse: "Excuse", 

2274 ) -> PolicyVerdict: 

2275 verdict = PolicyVerdict.PASS 

2276 

2277 pkg_universe = self._pkg_universe 

2278 all_binaries = self._all_binaries 

2279 

2280 # check all rdeps of the package in testing 

2281 rdeps_t = pkg_universe.reverse_dependencies_of(pkg_id_t) 

2282 

2283 for rdep_pkg in sorted(rdeps_t): 

2284 rdep_p = all_binaries[rdep_pkg] 

2285 

2286 # check some cases where the rdep won't become uninstallable, or 

2287 # where we don't care if it does 

2288 if self.should_skip_rdep(rdep_p, source_name, myarch): 

2289 continue 

2290 

2291 if not self.breaks_installability(pkg_id_t, pkg_id_s, rdep_pkg): 

2292 # if upgrading pkg_id_t to pkg_id_s doesn't break rdep_pkg, 

2293 # there is no implicit dependency 

2294 continue 

2295 

2296 # The upgrade breaks the installability of the rdep. We need to 

2297 # find out if there is a newer version of the rdep that solves the 

2298 # uninstallability. If that is the case, there is an implicit 

2299 # dependency. If not, the upgrade will fail. 

2300 

2301 # check source versions 

2302 newer_versions = find_newer_binaries( 

2303 self.suite_info, rdep_p, add_source_for_dropped_bin=True 

2304 ) 

2305 good_newer_versions = set() 

2306 for npkg, suite in newer_versions: 

2307 if npkg.architecture == "source": 

2308 # When a newer version of the source package doesn't have 

2309 # the binary, we get the source as 'newer version'. In 

2310 # this case, the binary will not be uninstallable if the 

2311 # newer source migrates, because it is no longer there. 

2312 good_newer_versions.add(npkg) 

2313 continue 

2314 assert isinstance(npkg, BinaryPackageId) 

2315 if not self.breaks_installability(pkg_id_t, pkg_id_s, npkg): 

2316 good_newer_versions.add(npkg) 

2317 

2318 if good_newer_versions: 

2319 spec = DependencySpec(DependencyType.IMPLICIT_DEPENDENCY, myarch) 

2320 excuse.add_package_depends(spec, good_newer_versions) 

2321 else: 

2322 # no good newer versions: no possible solution 

2323 broken_binaries.add(rdep_pkg.name) 

2324 if pkg_id_s: 

2325 action = "migrating {} to {}".format( 

2326 pkg_id_s.name, 

2327 self.suite_info.target_suite.name, 

2328 ) 

2329 else: 

2330 action = "removing {} from {}".format( 

2331 pkg_id_t.name, 

2332 self.suite_info.target_suite.name, 

2333 ) 

2334 if rdep_pkg[0].endswith("-faux-build-depends"): 

2335 name = rdep_pkg[0].replace("-faux-build-depends", "") 

2336 info = f'{action} makes Build-Depends of src:<a href="#{name}">{name}</a> uninstallable' 

2337 else: 

2338 info = '{0} makes <a href="#{1}">{1}</a> uninstallable'.format( 

2339 action, rdep_pkg.name 

2340 ) 

2341 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

2342 excuse.add_verdict_info(verdict, info) 

2343 

2344 return verdict 

2345 

2346 def apply_srcarch_policy_impl( 

2347 self, 

2348 implicit_dep_info: dict[str, Any], 

2349 arch: str, 

2350 source_data_tdist: SourcePackage | None, 

2351 source_data_srcdist: SourcePackage, 

2352 excuse: "Excuse", 

2353 ) -> PolicyVerdict: 

2354 verdict = PolicyVerdict.PASS 

2355 

2356 if not source_data_tdist: 

2357 # this item is not currently in testing: no implicit dependency 

2358 return verdict 

2359 

2360 if excuse.hasreason("missingbuild"): 

2361 # if the build is missing, the policy would treat this as if the 

2362 # binaries would be removed, which would give incorrect (and 

2363 # confusing) info 

2364 info = "missing build, not checking implicit dependencies on %s" % (arch) 

2365 excuse.add_detailed_info(info) 

2366 return verdict 

2367 

2368 source_suite = excuse.item.suite 

2369 source_name = excuse.item.package 

2370 target_suite = self.suite_info.target_suite 

2371 all_binaries = self._all_binaries 

2372 

2373 # we check all binaries for this excuse that are currently in testing 

2374 relevant_binaries = [ 

2375 x 

2376 for x in source_data_tdist.binaries 

2377 if (arch == "source" or x.architecture == arch) 

2378 and x.package_name in target_suite.binaries[x.architecture] 

2379 and x.architecture not in self._new_arches 

2380 and x.architecture not in self._break_arches 

2381 and x.architecture not in self._outofsync_arches 

2382 ] 

2383 

2384 broken_binaries: set[str] = set() 

2385 

2386 assert self.hints is not None 

2387 for pkg_id_t in sorted(relevant_binaries): 

2388 mypkg = pkg_id_t.package_name 

2389 myarch = pkg_id_t.architecture 

2390 binaries_t_a = target_suite.binaries[myarch] 

2391 binaries_s_a = source_suite.binaries[myarch] 

2392 

2393 if target_suite.is_cruft(all_binaries[pkg_id_t]): 

2394 # this binary is cruft in testing: it will stay around as long 

2395 # as necessary to satisfy dependencies, so we don't need to 

2396 # care 

2397 continue 

2398 

2399 if mypkg in binaries_s_a: 

2400 mybin = binaries_s_a[mypkg] 

2401 pkg_id_s: Optional["BinaryPackageId"] = mybin.pkg_id 

2402 if mybin.source != source_name: 

2403 # hijack: this is too complicated to check, so we ignore 

2404 # it (the migration code will check the installability 

2405 # later anyway) 

2406 pass 

2407 elif mybin.source_version != source_data_srcdist.version: 

2408 # cruft in source suite: pretend the binary doesn't exist 

2409 pkg_id_s = None 

2410 elif pkg_id_t == pkg_id_s: 

2411 # same binary (probably arch: all from a binNMU): 

2412 # 'upgrading' doesn't change anything, for this binary, so 

2413 # it won't break anything 

2414 continue 

2415 else: 

2416 pkg_id_s = None 

2417 

2418 if not pkg_id_s and is_smooth_update_allowed( 

2419 binaries_t_a[mypkg], self._smooth_updates, self.hints 

2420 ): 

2421 # the binary isn't in the new version (or is cruft there), and 

2422 # smooth updates are allowed: the binary can stay around if 

2423 # that is necessary to satisfy dependencies, so we don't need 

2424 # to check it 

2425 continue 

2426 

2427 if ( 

2428 not pkg_id_s 

2429 and source_data_tdist.version == source_data_srcdist.version 

2430 and source_suite.suite_class == SuiteClass.ADDITIONAL_SOURCE_SUITE 

2431 and binaries_t_a[mypkg].architecture == "all" 

2432 ): 

2433 # we're very probably migrating a binNMU built in tpu where the arch:all 

2434 # binaries were not copied to it as that's not needed. This policy could 

2435 # needlessly block. 

2436 continue 

2437 

2438 v = self.check_upgrade( 

2439 pkg_id_t, pkg_id_s, source_name, myarch, broken_binaries, excuse 

2440 ) 

2441 verdict = PolicyVerdict.worst_of(verdict, v) 

2442 

2443 # each arch is processed separately, so if we already have info from 

2444 # other archs, we need to merge the info from this arch 

2445 broken_old = set(implicit_dep_info.get("broken-binaries", [])) 

2446 implicit_dep_info["broken-binaries"] = sorted(broken_old | broken_binaries) 

2447 

2448 return verdict 

2449 

2450 

2451class ReverseRemovalPolicy(AbstractBasePolicy): 

2452 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

2453 super().__init__( 

2454 "reverseremoval", 

2455 options, 

2456 suite_info, 

2457 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE}, 

2458 ) 

2459 

2460 def register_hints(self, hint_parser: HintParser) -> None: 

2461 hint_parser.register_hint_type(HintType("ignore-reverse-remove")) 

2462 

2463 def initialise(self, britney: "Britney") -> None: 

2464 super().initialise(britney) 

2465 

2466 pkg_universe = britney.pkg_universe 

2467 source_suites = britney.suite_info.source_suites 

2468 target_suite = britney.suite_info.target_suite 

2469 

2470 # Build set of the sources of reverse (Build-) Depends 

2471 assert self.hints is not None 

2472 hints = self.hints.search("remove") 

2473 

2474 rev_bin: dict[BinaryPackageId, set[str]] = defaultdict(set) 

2475 for hint in hints: 

2476 for item in hint.packages: 

2477 # I think we don't need to look at the target suite 

2478 for src_suite in source_suites: 

2479 try: 

2480 # Explicitly not running filter_out_faux here 

2481 my_bins = set(src_suite.sources[item.uvname].binaries) 

2482 except KeyError: 

2483 continue 

2484 compute_reverse_tree(pkg_universe, my_bins) 

2485 for this_bin in my_bins: 

2486 rev_bin.setdefault(this_bin, set()).add(item.uvname) 

2487 

2488 rev_src: dict[str, set[str]] = defaultdict(set) 

2489 for bin_pkg, reasons in rev_bin.items(): 

2490 # If the pkg is in the target suite, there's nothing this 

2491 # policy wants to do. 

2492 if target_suite.is_pkg_in_the_suite(bin_pkg): 

2493 continue 

2494 that_bin = britney.all_binaries[bin_pkg] 

2495 bin_src = that_bin.source + "/" + that_bin.source_version 

2496 rev_src.setdefault(bin_src, set()).update(reasons) 

2497 self._block_src_for_rm_hint = rev_src 

2498 

2499 def apply_src_policy_impl( 

2500 self, 

2501 rev_remove_info: dict[str, Any], 

2502 source_data_tdist: SourcePackage | None, 

2503 source_data_srcdist: SourcePackage, 

2504 excuse: "Excuse", 

2505 ) -> PolicyVerdict: 

2506 verdict = PolicyVerdict.PASS 

2507 

2508 item = excuse.item 

2509 if item.name in self._block_src_for_rm_hint: 

2510 reason = ", ".join(sorted(self._block_src_for_rm_hint[item.name])) 

2511 assert self.hints is not None 

2512 ignore_hints = self.hints.search( 

2513 "ignore-reverse-remove", package=item.uvname, version=item.version 

2514 ) 

2515 excuse.addreason("reverseremoval") 

2516 if ignore_hints: 

2517 excuse.addreason("ignore-reverse-remove") 

2518 excuse.addinfo( 

2519 "Should block migration because of remove hint for %s, but forced by %s" 

2520 % (reason, ignore_hints[0].user) 

2521 ) 

2522 verdict = PolicyVerdict.PASS_HINTED 

2523 else: 

2524 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

2525 excuse.add_verdict_info( 

2526 verdict, "Remove hint for (transitive) dependency: %s" % reason 

2527 ) 

2528 

2529 return verdict 

2530 

2531 

2532class ReproduciblePolicy(AbstractBasePolicy): 

2533 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

2534 super().__init__( 

2535 "reproducible", 

2536 options, 

2537 suite_info, 

2538 {SuiteClass.PRIMARY_SOURCE_SUITE}, 

2539 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY, 

2540 ) 

2541 self._reproducible: dict[str, Any] = {} 

2542 

2543 # Default values for this policy's options 

2544 parse_option(options, "repro_success_bounty", default=0, to_int=True) 

2545 parse_option(options, "repro_regression_penalty", default=0, to_int=True) 

2546 parse_option(options, "repro_log_url") 

2547 parse_option(options, "repro_url") 

2548 parse_option(options, "repro_retry_url") 

2549 parse_option(options, "repro_components") 

2550 

2551 def register_hints(self, hint_parser: HintParser) -> None: 

2552 hint_parser.register_hint_type( 

2553 HintType( 

2554 "ignore-reproducible-src", 

2555 versioned=HintAnnotate.OPTIONAL, 

2556 architectured=HintAnnotate.OPTIONAL, 

2557 ) 

2558 ) 

2559 hint_parser.register_hint_type( 

2560 HintType( 

2561 "ignore-reproducible", 

2562 versioned=HintAnnotate.OPTIONAL, 

2563 architectured=HintAnnotate.OPTIONAL, 

2564 ) 

2565 ) 

2566 

2567 def initialise(self, britney: "Britney") -> None: 

2568 super().initialise(britney) 

2569 summary = self._reproducible 

2570 

2571 assert hasattr( 

2572 self, "state_dir" 

2573 ), "Please set STATE_DIR in the britney configuration" 

2574 assert ( 

2575 self.options.repro_components 

2576 ), "Please set REPRO_COMPONENTS in the britney configuration" 

2577 for file in os.listdir(self.state_dir): 

2578 if not file.startswith("reproducible-"): 2578 ↛ 2579line 2578 didn't jump to line 2579 because the condition on line 2578 was never true

2579 continue 

2580 filename = os.path.join(self.state_dir, file) 

2581 

2582 self.logger.info("Loading reproducibility report from %s", filename) 

2583 with open(filename) as fd: 2583 ↛ 2577line 2583 didn't jump to line 2577 because the continue on line 2585 wasn't executed

2584 if os.fstat(fd.fileno()).st_size < 1: 2584 ↛ 2585line 2584 didn't jump to line 2585 because the condition on line 2584 was never true

2585 continue 

2586 data = json.load(fd) 

2587 

2588 for result in data["records"]: 

2589 if result["component"] in self.options.repro_components: 2589 ↛ 2588line 2589 didn't jump to line 2588 because the condition on line 2589 was always true

2590 summary.setdefault(result["architecture"], {})[ 

2591 (result["name"], result["version"]) 

2592 ] = result 

2593 

2594 def _create_link_to_log( 

2595 self, arch: str, src: str, failed_bpids: set[BinaryPackageId] 

2596 ) -> str: 

2597 link = "" 

2598 for bpid in sorted(failed_bpids): 

2599 link += f"{bpid[0]}, " 

2600 link = link.strip(", ") # remove end 

2601 

2602 if self.options.repro_log_url: 2602 ↛ 2611line 2602 didn't jump to line 2611 because the condition on line 2602 was always true

2603 # log should be the same for all binaries, using the last bpid 

2604 try: 

2605 log = self._reproducible[arch][(bpid[0], bpid[1])]["build_id"] 

2606 except KeyError: 

2607 log = self._reproducible["all"][(bpid[0], bpid[1])]["build_id"] 

2608 url_log = self.options.repro_log_url.format(package=src, arch=arch, log=log) 

2609 return f': <a href="{url_log}">{link}</a>' 

2610 else: 

2611 return ": " + link 

2612 

2613 def apply_srcarch_policy_impl( 

2614 self, 

2615 policy_info: dict[str, Any], 

2616 arch: str, 

2617 source_data_tdist: SourcePackage | None, 

2618 source_data_srcdist: SourcePackage, 

2619 excuse: "Excuse", 

2620 ) -> PolicyVerdict: 

2621 verdict = PolicyVerdict.PASS 

2622 eligible_for_bounty = False 

2623 

2624 # we don't want to apply this policy (yet) on binNMUs 

2625 if excuse.item.architecture != "source": 2625 ↛ 2626line 2625 didn't jump to line 2626 because the condition on line 2625 was never true

2626 return verdict 

2627 

2628 # we're not supposed to judge on this arch 

2629 if arch not in self.options.repro_arches: 2629 ↛ 2630line 2629 didn't jump to line 2630 because the condition on line 2629 was never true

2630 return verdict 

2631 

2632 # bail out if this arch has no packages for this source (not build 

2633 # here) 

2634 if arch not in excuse.packages: 2634 ↛ 2635line 2634 didn't jump to line 2635 because the condition on line 2634 was never true

2635 return verdict 

2636 

2637 # horrible hard-coding, but currently, we don't keep track of the 

2638 # component when loading the packages files 

2639 component = "main" 

2640 if "/" in (section := source_data_srcdist.section): 2640 ↛ 2641line 2640 didn't jump to line 2641 because the condition on line 2640 was never true

2641 component = section.split("/")[0] 

2642 

2643 if ( 2643 ↛ 2647line 2643 didn't jump to line 2647

2644 self.options.repro_components 

2645 and component not in self.options.repro_components.split() 

2646 ): 

2647 return verdict 

2648 

2649 source_name = excuse.item.package 

2650 

2651 if self.options.repro_url: 2651 ↛ 2652line 2651 didn't jump to line 2652 because the condition on line 2651 was never true

2652 url = self.options.repro_url.format(package=quote(source_name), arch=arch) 

2653 url_html = ' - <a href="%s">info</a>' % url 

2654 if self.options.repro_retry_url: 

2655 url_html += ( 

2656 ' <a href="%s">♻ </a>' 

2657 % self.options.repro_retry_url.format( 

2658 package=quote(source_name), arch=arch 

2659 ) 

2660 ) 

2661 # When run on multiple archs, the last one "wins" 

2662 policy_info["status-url"] = url 

2663 else: 

2664 url = None 

2665 url_html = "" 

2666 

2667 try: 

2668 repro = self._reproducible[arch].copy() 

2669 except KeyError: 

2670 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

2671 msg = f"No reproducibility data available at all for {arch}" 

2672 excuse.add_verdict_info(verdict, msg) 

2673 return verdict 

2674 try: 

2675 repro |= self._reproducible["all"] 

2676 except KeyError: 

2677 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

2678 msg = "No reproducibility data available at all for arch:all" 

2679 excuse.add_verdict_info(verdict, msg) 

2680 return verdict 

2681 

2682 # skip/delay policy until both arch:arch and arch:all builds are done 

2683 if (arch or "all") in excuse.missing_builds: 2683 ↛ 2684line 2683 didn't jump to line 2684 because the condition on line 2683 was never true

2684 self.logger.debug( 

2685 f"{excuse.name} not built for {arch} or all, skipping reproducible policy", 

2686 ) 

2687 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

2688 excuse.add_verdict_info( 

2689 verdict, 

2690 f"Reproducibility check deferred on {arch}: missing builds", 

2691 ) 

2692 return verdict 

2693 

2694 source_suite_state = "not-unknown" 

2695 failed_bpids: set[BinaryPackageId] = set() 

2696 # The states should either be GOOD/BAD for all binaries, UNKNOWN for all 

2697 # binaries, or missing for all binaries, but let's not assume that. 

2698 for bpid in binaries_from_source_version(source_data_srcdist, self.suite_info): 

2699 if bpid[2] not in ("all", arch): 2699 ↛ 2700line 2699 didn't jump to line 2700 because the condition on line 2699 was never true

2700 continue 

2701 try: 

2702 state = repro[(bpid[0], bpid[1])]["status"] 

2703 assert state in [ 

2704 "BAD", 

2705 "FAIL", 

2706 "GOOD", 

2707 "UNKNOWN", 

2708 None, 

2709 ], f"Unexpected reproducible state {state}" 

2710 self.logger.debug(f"repro data for {bpid}: {state}") 

2711 if state == "BAD": 

2712 failed_bpids.add(bpid) 

2713 # not changing source_suite_state here on purpose 

2714 elif state is None or state in ("FAIL", "UNKNOWN"): 2714 ↛ 2715line 2714 didn't jump to line 2715 because the condition on line 2714 was never true

2715 source_suite_state = "unknown" 

2716 except KeyError: 

2717 self.logger.debug(f"No repro data found for {bpid}") 

2718 source_suite_state = "unknown" 

2719 break 

2720 

2721 if source_suite_state == "not-unknown": 

2722 source_suite_state = "known" 

2723 

2724 excuse_info = [] 

2725 if source_suite_state == "unknown": 

2726 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

2727 excuse_info.append( 

2728 f"Reproducibility check waiting for results on {arch}{url_html}" 

2729 ) 

2730 policy_info.setdefault("state", {}).setdefault(arch, "unavailable") 

2731 elif failed_bpids: 

2732 ignored_bpids: set[BinaryPackageId] = set() 

2733 if source_data_tdist is None: 2733 ↛ 2734line 2733 didn't jump to line 2734 because the condition on line 2733 was never true

2734 target_suite_state = "new" 

2735 else: 

2736 target_suite_state = "reproducible" 

2737 for bpid in list(failed_bpids): 

2738 pkg_name = bpid[0] 

2739 for bpid_t in filter_out_faux(source_data_tdist.binaries): 

2740 if bpid_t[2] not in ("all", arch): 2740 ↛ 2741line 2740 didn't jump to line 2741 because the condition on line 2740 was never true

2741 continue 

2742 if pkg_name != bpid_t[0]: 2742 ↛ 2743line 2742 didn't jump to line 2743 because the condition on line 2742 was never true

2743 continue 

2744 try: 

2745 state = repro[(pkg_name, bpid_t[1])]["status"] 

2746 assert state in [ 

2747 "BAD", 

2748 "FAIL", 

2749 "GOOD", 

2750 "UNKNOWN", 

2751 None, 

2752 ], f"Unexpected reproducible state {state}" 

2753 self.logger.debug( 

2754 f"testing repro data for {bpid_t}: {state}" 

2755 ) 

2756 if state == "BAD": 

2757 ignored_bpids.add(bpid) 

2758 elif state is None or state in ("FAIL", "UNKNOWN"): 2758 ↛ 2759line 2758 didn't jump to line 2759 because the condition on line 2758 was never true

2759 target_suite_state = "unknown" 

2760 except KeyError: 

2761 self.logger.debug( 

2762 f"No testing repro data found for {bpid_t}" 

2763 ) 

2764 # This shouldn't happen as for the past migration 

2765 # to have been allowed, there should be data. 

2766 target_suite_state = "unknown" 

2767 break 

2768 

2769 # Reminder: code here is part of the non-reproducibile source-suite branch 

2770 if target_suite_state == "new": 2770 ↛ 2771line 2770 didn't jump to line 2771 because the condition on line 2770 was never true

2771 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

2772 excuse_info.append( 

2773 f"New but not reproduced on {arch}{url_html}" 

2774 f"{self._create_link_to_log(arch, source_name, failed_bpids)}" 

2775 ) 

2776 policy_info.setdefault("state", {}).setdefault( 

2777 arch, "new but not reproducible" 

2778 ) 

2779 elif target_suite_state == "unknown": 2779 ↛ 2781line 2779 didn't jump to line 2781 because the condition on line 2779 was never true

2780 # Shouldn't happen after initial bootstrap once blocking 

2781 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

2782 excuse_info.append( 

2783 f"Reproducibility check failed and now waiting for reference " 

2784 f"results on {arch}{url_html}" 

2785 ) 

2786 policy_info.setdefault("state", {}).setdefault( 

2787 arch, "waiting for reference" 

2788 ) 

2789 elif len(failed_bpids - ignored_bpids) == 0: 

2790 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

2791 excuse_info.append( 

2792 f"Not reproduced on {arch} (not a regression)" 

2793 f"{self._create_link_to_log(arch, source_name, failed_bpids)}" 

2794 ) 

2795 policy_info.setdefault("state", {}).setdefault(arch, "not reproducible") 

2796 else: 

2797 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

2798 excuse_info.append( 

2799 f"Reproducibility regression on {arch}" 

2800 f"{self._create_link_to_log(arch, source_name, failed_bpids - ignored_bpids)}" 

2801 ) 

2802 policy_info.setdefault("state", {}).setdefault(arch, "regression") 

2803 

2804 # non-reproducible source-suite cases are handled above, so here we 

2805 # handle the last of the source-suite cases 

2806 else: 

2807 excuse_info.append(f"Reproduced on {arch}{url_html}") 

2808 policy_info.setdefault("state", {}).setdefault(arch, "reproducible") 

2809 eligible_for_bounty = True 

2810 

2811 if verdict.is_rejected: 

2812 assert self.hints is not None 

2813 for hint_arch in ("source", arch): 

2814 ignore_hints = self.hints.search( 

2815 "ignore-reproducible-src", 

2816 package=source_name, 

2817 version=source_data_srcdist.version, 

2818 architecture=hint_arch, 

2819 ) 

2820 # one hint is enough, take the first one encountered 

2821 if ignore_hints: 

2822 verdict = PolicyVerdict.PASS_HINTED 

2823 policy_info.setdefault("hints", {}).setdefault(arch, []).append( 

2824 ignore_hints[0].user + ": " + str(ignore_hints[0]) 

2825 ) 

2826 if hint_arch == arch: 2826 ↛ 2829line 2826 didn't jump to line 2829 because the condition on line 2826 was always true

2827 on_arch = f" on {arch}" 

2828 else: 

2829 on_arch = "" 

2830 excuse_info.append( 

2831 f"Reproducibility issues ignored for src:{ignore_hints[0].package}" 

2832 f"{on_arch} as requested by {ignore_hints[0].user}" 

2833 ) 

2834 break 

2835 

2836 if verdict.is_rejected: 

2837 all_hints = [] 

2838 all_hinted = True 

2839 any_hinted = False 

2840 

2841 if source_suite_state == "known": 

2842 check_bpids = failed_bpids 

2843 else: 

2844 # Let's not wait for results if all binaries have a hint 

2845 check_bpids = filter_out_faux(source_data_srcdist.binaries) 

2846 missed_bpids = set() 

2847 

2848 for bpid in check_bpids: 

2849 bpid_hints = self.hints.search( 

2850 "ignore-reproducible", 

2851 package=bpid[0], 

2852 version=bpid[1], 

2853 architecture=bpid[2], 

2854 ) 

2855 if bpid_hints: 

2856 # one hint per binary is enough 

2857 all_hints.append(bpid_hints[0]) 

2858 any_hinted = True 

2859 self.logger.debug( 

2860 f"repro: hint found for {source_name}: {bpid}" 

2861 ) 

2862 else: 

2863 missed_bpids.add(bpid) 

2864 all_hinted = False 

2865 

2866 if all_hinted: 

2867 verdict = PolicyVerdict.PASS_HINTED 

2868 for hint in all_hints: 

2869 policy_info.setdefault("hints", {}).setdefault(arch, []).append( 

2870 hint.user + ": " + str(hint) 

2871 ) 

2872 # TODO: we're going to print this for arch:all binaries on each arch 

2873 excuse_info.append( 

2874 f"Reproducibility issues ignored for {hint.package} on {arch} as " 

2875 f"requested by {hint.user}" 

2876 ) 

2877 elif any_hinted: 2877 ↛ 2878line 2877 didn't jump to line 2878 because the condition on line 2877 was never true

2878 self.logger.info( 

2879 f"repro: binary hints for {source_name} ignored as they don't " 

2880 f"cover these binaries {missed_bpids}" 

2881 ) 

2882 

2883 if self.options.repro_success_bounty and eligible_for_bounty: 2883 ↛ 2884line 2883 didn't jump to line 2884 because the condition on line 2883 was never true

2884 excuse.add_bounty("reproducibility", self.options.repro_success_bounty) 

2885 

2886 if verdict.is_rejected and self.options.repro_regression_penalty: 2886 ↛ 2888line 2886 didn't jump to line 2888 because the condition on line 2886 was never true

2887 # With a non-zero penalty, we shouldn't block on this policy 

2888 verdict = PolicyVerdict.PASS 

2889 if self.options.repro_regression_penalty > 0: 

2890 excuse.add_penalty( 

2891 "reproducibility", self.options.repro_regression_penalty 

2892 ) 

2893 

2894 for msg in excuse_info: 

2895 if verdict.is_rejected: 

2896 excuse.add_verdict_info(verdict, msg) 

2897 else: 

2898 excuse.addinfo(msg) 

2899 

2900 return verdict