Coverage for britney2/policies/policy.py: 84%

1240 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-10-17 17:32 +0000

1import json 

2import logging 

3import optparse 

4import os 

5import re 

6import sys 

7import time 

8from abc import ABC, abstractmethod 

9from collections import defaultdict 

10from collections.abc import Callable, Container 

11from enum import IntEnum, unique 

12from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar, cast 

13from urllib.parse import quote 

14 

15import apt_pkg 

16import yaml 

17 

18from britney2 import ( 

19 BinaryPackage, 

20 BinaryPackageId, 

21 DependencyType, 

22 PackageId, 

23 SourcePackage, 

24 Suite, 

25 SuiteClass, 

26 Suites, 

27 TargetSuite, 

28) 

29from britney2.excusedeps import DependencySpec 

30from britney2.hints import ( 

31 Hint, 

32 HintAnnotate, 

33 HintCollection, 

34 HintParser, 

35 HintType, 

36 PolicyHintParserProto, 

37) 

38from britney2.inputs.suiteloader import SuiteContentLoader 

39from britney2.migrationitem import MigrationItem, MigrationItemFactory 

40from britney2.policies import ApplySrcPolicy, PolicyVerdict 

41from britney2.utils import ( 

42 GetDependencySolversProto, 

43 compute_reverse_tree, 

44 find_newer_binaries, 

45 get_dependency_solvers, 

46 is_smooth_update_allowed, 

47 parse_option, 

48) 

49 

50if TYPE_CHECKING: 50 ↛ 51line 50 didn't jump to line 51 because the condition on line 50 was never true

51 from ..britney import Britney 

52 from ..excuse import Excuse 

53 from ..installability.universe import BinaryPackageUniverse 

54 

55 

class PolicyLoadRequest:
    """Describes how (and whether) a single policy should be instantiated.

    A request pairs a policy constructor with an optional option name that
    gates it; `is_enabled` consults that option and `load` builds the policy.
    """

    __slots__ = ("_options_name", "_default_value", "_policy_constructor")

    def __init__(
        self,
        policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
        options_name: str | None,
        default_value: bool,
    ) -> None:
        self._policy_constructor = policy_constructor
        self._options_name = options_name
        self._default_value = default_value

    def is_enabled(self, options: optparse.Values) -> bool:
        """Return True if the given options enable this policy."""
        option = self._options_name
        if option is None:
            # Unconditional requests must have been created with default True.
            assert self._default_value
            return True
        configured = getattr(options, option, None)
        if configured is None:
            return self._default_value
        return configured.lower() in ("yes", "y", "true", "t")

    def load(self, options: optparse.Values, suite_info: Suites) -> "BasePolicy":
        """Instantiate the policy described by this request."""
        return self._policy_constructor(options, suite_info)

    @classmethod
    def always_load(
        cls, policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"]
    ) -> "PolicyLoadRequest":
        """Build a request for a policy that is loaded unconditionally."""
        return cls(policy_constructor, None, True)

    @classmethod
    def conditionally_load(
        cls,
        policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
        option_name: str,
        default_value: bool,
    ) -> "PolicyLoadRequest":
        """Build a request gated on the boolean-ish option `option_name`."""
        return cls(policy_constructor, option_name, default_value)

96 

class PolicyEngine:
    """Registry of policies plus the driver that applies them to excuses."""

    def __init__(self) -> None:
        self._policies: list["BasePolicy"] = []

    def add_policy(self, policy: "BasePolicy") -> None:
        """Register a single policy with the engine."""
        self._policies.append(policy)

    def load_policies(
        self,
        options: optparse.Values,
        suite_info: Suites,
        policy_load_requests: list[PolicyLoadRequest],
    ) -> None:
        """Instantiate and register every request that the options enable."""
        for request in policy_load_requests:
            if not request.is_enabled(options):
                continue
            self.add_policy(request.load(options, suite_info))

    def register_policy_hints(self, hint_parser: HintParser) -> None:
        """Let every registered policy declare the hint types it understands."""
        for policy in self._policies:
            policy.register_hints(hint_parser)

    def initialise(self, britney: "Britney", hints: HintCollection) -> None:
        """Hand the parsed hints to each policy and run its initialisation."""
        for policy in self._policies:
            policy.hints = hints
            policy.initialise(britney)

    def save_state(self, britney: "Britney") -> None:
        """Ask each policy to persist its state at the end of the run."""
        for policy in self._policies:
            policy.save_state(britney)

    def apply_src_policies(
        self,
        item: MigrationItem,
        source_t: SourcePackage | None,
        source_u: SourcePackage,
        excuse: "Excuse",
    ) -> None:
        """Run every applicable policy for a source migration item.

        Each policy's findings end up under its `policy_id` key in
        `excuse.policy_info`, and the worst verdict over all policies becomes
        the excuse's policy verdict.
        """
        combined_verdict = excuse.policy_verdict
        suite_class = item.suite.suite_class
        for policy in self._policies:
            pinfo: dict[str, Any] = {}
            policy_verdict = PolicyVerdict.NOT_APPLICABLE
            if suite_class in policy.applicable_suites:
                if policy.src_policy.run_arch:
                    for arch in policy.options.architectures:
                        arch_verdict = policy.apply_srcarch_policy_impl(
                            pinfo, item, arch, source_t, source_u, excuse
                        )
                        policy_verdict = PolicyVerdict.worst_of(
                            policy_verdict, arch_verdict
                        )
                if policy.src_policy.run_src:
                    src_verdict = policy.apply_src_policy_impl(
                        pinfo, item, source_t, source_u, excuse
                    )
                    policy_verdict = PolicyVerdict.worst_of(
                        policy_verdict, src_verdict
                    )
            # The base policy provides this field, so the subclass should leave it blank
            assert "verdict" not in pinfo
            if policy_verdict != PolicyVerdict.NOT_APPLICABLE:
                excuse.policy_info[policy.policy_id] = pinfo
                pinfo["verdict"] = policy_verdict.name
            combined_verdict = PolicyVerdict.worst_of(policy_verdict, combined_verdict)
        excuse.policy_verdict = combined_verdict

    def apply_srcarch_policies(
        self,
        item: MigrationItem,
        arch: str,
        source_t: SourcePackage | None,
        source_u: SourcePackage,
        excuse: "Excuse",
    ) -> None:
        """Run every applicable policy for a binary (per-arch) migration item."""
        combined_verdict = excuse.policy_verdict
        suite_class = item.suite.suite_class
        for policy in self._policies:
            pinfo: dict[str, Any] = {}
            if suite_class in policy.applicable_suites:
                policy_verdict = policy.apply_srcarch_policy_impl(
                    pinfo, item, arch, source_t, source_u, excuse
                )
                combined_verdict = PolicyVerdict.worst_of(
                    policy_verdict, combined_verdict
                )
                # The base policy provides this field, so the subclass should leave it blank
                assert "verdict" not in pinfo
                if policy_verdict != PolicyVerdict.NOT_APPLICABLE:
                    excuse.policy_info[policy.policy_id] = pinfo
                    pinfo["verdict"] = policy_verdict.name
        excuse.policy_verdict = combined_verdict

184 

185 

class BasePolicy(ABC):
    # Attributes set by the concrete subclass constructor / initialise().
    britney: "Britney"
    policy_id: str
    hints: HintCollection | None
    applicable_suites: set[SuiteClass]
    src_policy: ApplySrcPolicy
    options: optparse.Values
    suite_info: Suites

    def __init__(
        self,
        options: optparse.Values,
        suite_info: Suites,
    ) -> None:
        """The BasePolicy constructor

        :param options: The options member of Britney with all the
        config values.
        """

    @property
    @abstractmethod
    def state_dir(self) -> str: ...

    def register_hints(self, hint_parser: HintParser) -> None:  # pragma: no cover
        """Declare the hint types this policy understands.

        :param hint_parser: (see HintParser.register_hint_type)
        """

    def initialise(self, britney: "Britney") -> None:  # pragma: no cover
        """One-time initialisation hook for the policy.

        Useful for e.g. parsing files or other "heavy do-once" work.

        :param britney: This is the instance of the "Britney" class.
        """
        self.britney = britney

    def save_state(self, britney: "Britney") -> None:  # pragma: no cover
        """Persist any policy state once at the end of the run.

        Note this will *not* be called for "dry-runs" as such runs should not
        change the state.

        :param britney: This is the instance of the "Britney" class.
        """

    def apply_src_policy_impl(
        self,
        policy_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:  # pragma: no cover
        """Evaluate a source migration and return a verdict.

        Britney calls this when considering migrating a source package from
        the source suite to the target suite.

        :param policy_info: A dictionary of all policy results. The policy can
        add a value stored in a key related to its name
        (e.g. policy_info['age'] = {...}). This goes directly into the
        "excuses.yaml" output.

        :param item: The migration item the policy is applied to.

        :param source_data_tdist: Information about the source package in the
        target distribution (e.g. "testing"); the data structure in
        source_suite.sources[source_name].

        :param source_data_srcdist: Information about the source package in
        the source distribution (e.g. "unstable" or "tpu"); the data structure
        in target_suite.sources[source_name].

        :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
        """
        return PolicyVerdict.NOT_APPLICABLE

    def apply_srcarch_policy_impl(
        self,
        policy_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Evaluate a per-architecture (binary) migration and return a verdict.

        Britney calls this for binaries from a given source package on a given
        architecture, when considering migrating them from the source suite to
        the target suite.

        :param policy_info: A dictionary of all policy results. The policy can
        add a value stored in a key related to its name
        (e.g. policy_info['age'] = {...}). This goes directly into the
        "excuses.yaml" output.

        :param item: The migration item the policy is applied to.

        :param arch: The architecture the item is applied to. Mostly relevant
        for policies where src_policy is not ApplySrcPolicy.RUN_SRC (the only
        case where arch can differ from item.architecture).

        :param source_data_tdist: Information about the source package in the
        target distribution (e.g. "testing"); the data structure in
        source_suite.sources[source_name].

        :param source_data_srcdist: Information about the source package in
        the source distribution (e.g. "unstable" or "tpu"); the data structure
        in target_suite.sources[source_name].

        :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
        """
        # if the policy doesn't implement this function, assume it's OK
        return PolicyVerdict.NOT_APPLICABLE

307 

308 

class AbstractBasePolicy(BasePolicy):
    """
    A shared abstract class for building BasePolicy objects.

    tests/test_policy.py:initialize_policy() needs to be able to build BasePolicy
    objects with just a two-item constructor, while all other uses of BasePolicy-
    derived objects need the 5-item constructor. So AbstractBasePolicy was split
    out to document this.
    """

    def __init__(
        self,
        policy_id: str,
        options: optparse.Values,
        suite_info: Suites,
        applicable_suites: set[SuiteClass],
        src_policy: ApplySrcPolicy = ApplySrcPolicy.RUN_SRC,
    ) -> None:
        """Concrete initializer.

        :param policy_id: Identifies the policy. It will determine the key
        used for the excuses.yaml etc.

        :param options: The options member of Britney with all the
        config values.

        :param applicable_suites: Where this policy applies.
        """
        self.policy_id = policy_id
        self.options = options
        self.suite_info = suite_info
        self.applicable_suites = applicable_suites
        self.src_policy = src_policy
        # Populated by PolicyEngine.initialise() before the policy runs.
        self.hints: HintCollection | None = None
        self.logger = logging.getLogger(
            f"{self.__class__.__module__}.{self.__class__.__name__}"
        )

    @property
    def state_dir(self) -> str:
        """Directory in which policies persist state (from options.state_dir)."""
        return cast(str, self.options.state_dir)

350 

# Type of the extra parameter carried by a SimplePolicyHint
# (e.g. int for "age-days", frozenset[str] for "ignore-rc-bugs").
_T = TypeVar("_T")

352 

353 

class SimplePolicyHint(Hint, Generic[_T]):
    """A hint that carries exactly one extra policy-specific parameter."""

    def __init__(
        self,
        user: str,
        hint_type: HintType,
        policy_parameter: _T,
        packages: list[MigrationItem],
    ) -> None:
        super().__init__(user, hint_type, packages)
        self._policy_parameter = policy_parameter

    def __eq__(self, other: Any) -> bool:
        # Compare hint type first, then the policy parameter, then defer to
        # the base class for the package comparison (short-circuiting).
        return (
            self.type == other.type
            and self._policy_parameter == other._policy_parameter
            and super().__eq__(other)
        )

    def str(self) -> str:
        """Render the hint in its textual "type param pkg..." form."""
        rendered_packages = " ".join(x.name for x in self._packages)
        return f"{self._type} {self._policy_parameter} {rendered_packages}"

376 

377 

class AgeDayHint(SimplePolicyHint[int]):
    """An "age-days" hint; the parameter is the new age requirement in days."""

    @property
    def days(self) -> int:
        """The hinted age requirement, in days."""
        return self._policy_parameter

382 

383 

class IgnoreRCBugHint(SimplePolicyHint[frozenset[str]]):
    """An "ignore-rc-bugs" hint; the parameter is the set of bugs to ignore."""

    @property
    def ignored_rcbugs(self) -> frozenset[str]:
        """The RC bug identifiers this hint tells the policy to disregard."""
        return self._policy_parameter

388 

389 

def simple_policy_hint_parser_function(
    class_name: Callable[[str, HintType, _T, list[MigrationItem]], Hint],
    converter: Callable[[str], _T],
) -> PolicyHintParserProto:
    """Build a hint parser for hints shaped like ``<type> <param> <item>...``.

    :param class_name: Hint class to instantiate (one hint per parsed item).
    :param converter: Converts the raw first argument into the hint parameter.
    """

    def parser(
        mi_factory: MigrationItemFactory,
        hints: HintCollection,
        who: str,
        hint_type: HintType,
        *args: str,
    ) -> None:
        raw_parameter, *item_args = args
        for item in mi_factory.parse_items(*item_args):
            # NOTE: converter runs per item, matching the original behaviour
            # (each hint gets its own converted parameter object).
            hints.add_hint(
                class_name(who, hint_type, converter(raw_parameter), [item])
            )

    return parser

409 

410 

class AgePolicy(AbstractBasePolicy):
    """Configurable Aging policy for source migrations

    The AgePolicy will let packages stay in the source suite for a pre-defined
    amount of days before letting migrate (based on their urgency, if any).

    The AgePolicy's decision is influenced by the following:

    State files:
     * ${STATE_DIR}/age-policy-urgencies: File containing urgencies for source
       packages. Note that urgencies are "sticky" and the most "urgent" urgency
       will be used (i.e. the one with lowest age-requirements).
       - This file needs to be updated externally, if the policy should take
         urgencies into consideration. If empty (or not updated), the policy
         will simply use the default urgency (see the "Config" section below)
       - In Debian, these values are taken from the .changes file, but that is
         not a requirement for Britney.
     * ${STATE_DIR}/age-policy-dates: File containing the age of all source
       packages.
       - The policy will automatically update this file.
    Config:
     * DEFAULT_URGENCY: Name of the urgency used for packages without an urgency
       (or for unknown urgencies). Will also be used to set the "minimum"
       aging requirements for packages not in the target suite.
     * MINDAYS_<URGENCY>: The age-requirements in days for packages with the
       given urgency.
       - Commonly used urgencies are: low, medium, high, emergency, critical
    Hints:
     * urgent <source>/<version>: Disregard the age requirements for a given
       source/version.
     * age-days X <source>/<version>: Set the age requirements for a given
       source/version to X days. Note that X can exceed the highest
       age-requirement normally given.

    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__("age", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE})
        # urgency name -> required age in days (from the MINDAYS_* options)
        self._min_days = self._generate_mindays_table()
        self._min_days_default = 0
        # britney's "day" begins at 7pm (we want aging to occur in the 22:00Z run and we run Britney 2-4 times a day)
        # NB: _date_now is used in tests
        time_now = time.time()
        if hasattr(self.options, "fake_runtime"):
            time_now = int(self.options.fake_runtime)
            self.logger.info("overriding runtime with fake_runtime %d" % time_now)

        # Current time expressed in whole "britney days" (see comment above).
        self._date_now = int(((time_now / (60 * 60)) - 19) / 24)
        # source name -> (version, britney-day that version was first seen)
        self._dates: dict[str, tuple[str, int]] = {}
        # source name -> stickiest urgency seen for that source
        self._urgencies: dict[str, str] = {}
        self._default_urgency: str = self.options.default_urgency
        # Urgencies that are exempt from age penalties (NO_PENALTIES option).
        self._penalty_immune_urgencies: frozenset[str] = frozenset()
        if hasattr(self.options, "no_penalties"):
            self._penalty_immune_urgencies = frozenset(
                x.strip() for x in self.options.no_penalties.split()
            )
        self._bounty_min_age: int | None = None  # initialised later

    def _generate_mindays_table(self) -> dict[str, int]:
        """Build the urgency -> minimum-age table from the MINDAYS_* options.

        :raises ValueError: if a MINDAYS_* value is not a non-negative integer.
        """
        mindays: dict[str, int] = {}
        for k in dir(self.options):
            if not k.startswith("mindays_"):
                continue
            v = getattr(self.options, k)
            try:
                as_days = int(v)
            except ValueError:
                raise ValueError(
                    "Unable to parse "
                    + k
                    + " as a number of days. Must be 0 or a positive integer"
                )
            if as_days < 0:
                raise ValueError(
                    "The value of " + k + " must be zero or a positive integer"
                )
            # "mindays_low" -> urgency name "low"
            mindays[k.split("_")[1]] = as_days
        return mindays

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the "age-days" (with a day-count parameter) and "urgent" hints."""
        hint_parser.register_hint_type(
            HintType(
                "age-days",
                simple_policy_hint_parser_function(AgeDayHint, int),
                min_args=2,
            )
        )
        hint_parser.register_hint_type(HintType("urgent"))

    def initialise(self, britney: "Britney") -> None:
        """Load the dates/urgencies state files and resolve BOUNTY_MIN_AGE."""
        super().initialise(britney)
        self._read_dates_file()
        self._read_urgencies_file()
        if self._default_urgency not in self._min_days:  # pragma: no cover
            raise ValueError(
                "Missing age-requirement for default urgency (MINDAYS_%s)"
                % self._default_urgency
            )
        self._min_days_default = self._min_days[self._default_urgency]
        try:
            # BOUNTY_MIN_AGE may be a number of days...
            self._bounty_min_age = int(self.options.bounty_min_age)
        except ValueError:
            # ...or the name of an urgency from the MINDAYS_* table.
            if self.options.bounty_min_age in self._min_days:
                self._bounty_min_age = self._min_days[self.options.bounty_min_age]
            else:  # pragma: no cover
                raise ValueError(
                    "Please fix BOUNTY_MIN_AGE in the britney configuration"
                )
        except AttributeError:
            # The option wasn't defined in the configuration
            self._bounty_min_age = 0

    def save_state(self, britney: "Britney") -> None:
        """Persist the (possibly updated) dates file at the end of the run."""
        super().save_state(britney)
        self._write_dates_file()

    def apply_src_policy_impl(
        self,
        age_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Compute the age verdict for a source migration.

        Determines the effective urgency, applies bounties/penalties and any
        "age-days"/"urgent" hints, records the result in ``age_info`` and on
        the excuse, and returns PASS, PASS_HINTED or REJECTED_TEMPORARILY.
        """
        # retrieve the urgency for the upload, ignoring it if this is a NEW package
        # (not present in the target suite)
        source_name = item.package
        urgency = self._urgencies.get(source_name, self._default_urgency)

        if urgency not in self._min_days:
            age_info["unknown-urgency"] = urgency
            urgency = self._default_urgency

        if not source_data_tdist:
            # NEW package: never let it age faster than the default urgency.
            if self._min_days[urgency] < self._min_days_default:
                age_info["urgency-reduced"] = {
                    "from": urgency,
                    "to": self._default_urgency,
                }
                urgency = self._default_urgency

        # (Re)start the aging clock when the package is new or its version changed.
        if source_name not in self._dates:
            self._dates[source_name] = (source_data_srcdist.version, self._date_now)
        elif self._dates[source_name][0] != source_data_srcdist.version:
            self._dates[source_name] = (source_data_srcdist.version, self._date_now)

        days_old = self._date_now - self._dates[source_name][1]
        min_days = self._min_days[urgency]
        # Bounties reduce the required age; penalties (below) increase it.
        for bounty in excuse.bounty:
            if excuse.bounty[bounty]:
                self.logger.info(
                    "Applying bounty for %s granted by %s: %d days",
                    source_name,
                    bounty,
                    excuse.bounty[bounty],
                )
                excuse.addinfo(
                    "Required age reduced by %d days because of %s"
                    % (excuse.bounty[bounty], bounty)
                )
                assert excuse.bounty[bounty] > 0, "negative bounties shouldn't happen"
                min_days -= excuse.bounty[bounty]
        if urgency not in self._penalty_immune_urgencies:
            for penalty in excuse.penalty:
                if excuse.penalty[penalty]:
                    self.logger.info(
                        "Applying penalty for %s given by %s: %d days",
                        source_name,
                        penalty,
                        excuse.penalty[penalty],
                    )
                    excuse.addinfo(
                        "Required age increased by %d days because of %s"
                        % (excuse.penalty[penalty], penalty)
                    )
                    assert (
                        excuse.penalty[penalty] > 0
                    ), "negative penalties should be handled earlier"
                    min_days += excuse.penalty[penalty]

        assert self._bounty_min_age is not None
        # the age in BOUNTY_MIN_AGE can be higher than the one associated with
        # the real urgency, so don't forget to take it into account
        bounty_min_age = min(self._bounty_min_age, self._min_days[urgency])
        if min_days < bounty_min_age:
            min_days = bounty_min_age
            excuse.addinfo(
                "Required age is not allowed to drop below %d days" % min_days
            )

        age_info["current-age"] = days_old

        assert self.hints is not None
        # "age-days" hints override the computed requirement outright.
        for age_days_hint in cast(
            "list[AgeDayHint]",
            self.hints.search(
                "age-days", package=source_name, version=source_data_srcdist.version
            ),
        ):
            new_req = age_days_hint.days
            age_info["age-requirement-reduced"] = {
                "new-requirement": new_req,
                "changed-by": age_days_hint.user,
            }
            if "original-age-requirement" not in age_info:
                age_info["original-age-requirement"] = min_days
            min_days = new_req

        age_info["age-requirement"] = min_days
        res = PolicyVerdict.PASS

        if days_old < min_days:
            # Too young; an "urgent" hint can still wave the requirement.
            urgent_hints = self.hints.search(
                "urgent", package=source_name, version=source_data_srcdist.version
            )
            if urgent_hints:
                age_info["age-requirement-reduced"] = {
                    "new-requirement": 0,
                    "changed-by": urgent_hints[0].user,
                }
                res = PolicyVerdict.PASS_HINTED
            else:
                res = PolicyVerdict.REJECTED_TEMPORARILY

        # update excuse
        age_hint = age_info.get("age-requirement-reduced", None)
        age_min_req = age_info["age-requirement"]
        if age_hint:
            new_req = age_hint["new-requirement"]
            who = age_hint["changed-by"]
            if new_req:
                excuse.addinfo(
                    "Overriding age needed from %d days to %d by %s"
                    % (age_min_req, new_req, who)
                )
                age_min_req = new_req
            else:
                excuse.addinfo("Too young, but urgency pushed by %s" % who)
                age_min_req = 0
        excuse.setdaysold(age_info["current-age"], age_min_req)

        if age_min_req == 0:
            excuse.addinfo("%d days old" % days_old)
        elif days_old < age_min_req:
            excuse.add_verdict_info(
                res, "Too young, only %d of %d days old" % (days_old, age_min_req)
            )
        else:
            excuse.addinfo("%d days old (needed %d days)" % (days_old, age_min_req))

        return res

    def _read_dates_file(self) -> None:
        """Parse the dates file

        Prefers ${STATE_DIR}/age-policy-dates, falling back to the legacy
        "Dates" file in the target suite when STATE_DIR is unset; creates the
        new file when it does not exist yet.
        """
        dates = self._dates
        fallback_filename = os.path.join(self.suite_info.target_suite.path, "Dates")
        using_new_name = False
        try:
            filename = os.path.join(self.state_dir, "age-policy-dates")
            if not os.path.exists(filename) and os.path.exists(fallback_filename):
                filename = fallback_filename
            else:
                using_new_name = True
        except AttributeError:
            if os.path.exists(fallback_filename):
                filename = fallback_filename
            else:
                raise RuntimeError("Please set STATE_DIR in the britney configuration")

        try:
            with open(filename, encoding="utf-8") as fd:
                for line in fd:
                    if line.startswith("#"):
                        # Ignore comment lines (mostly used for tests)
                        continue
                    # <source> <version> <date>
                    ln = line.split()
                    if len(ln) != 3:  # pragma: no cover
                        continue
                    try:
                        dates[ln[0]] = (ln[1], int(ln[2]))
                    except ValueError:  # pragma: no cover
                        pass
        except FileNotFoundError:
            if not using_new_name:
                # If we are using the legacy name, then just give up
                raise
            self.logger.info("%s does not appear to exist. Creating it", filename)
            with open(filename, mode="x", encoding="utf-8"):
                pass

    def _read_urgencies_file(self) -> None:
        """Parse the urgencies file, keeping the stickiest (most urgent) entry
        per source that is still relevant for the current suites."""
        urgencies = self._urgencies
        min_days_default = self._min_days_default
        fallback_filename = os.path.join(self.suite_info.target_suite.path, "Urgency")
        try:
            filename = os.path.join(self.state_dir, "age-policy-urgencies")
            if not os.path.exists(filename) and os.path.exists(fallback_filename):
                filename = fallback_filename
        except AttributeError:
            filename = fallback_filename

        sources_s = self.suite_info.primary_source_suite.sources
        sources_t = self.suite_info.target_suite.sources

        with open(filename, errors="surrogateescape", encoding="ascii") as fd:
            for line in fd:
                if line.startswith("#"):
                    # Ignore comment lines (mostly used for tests)
                    continue
                # <source> <version> <urgency>
                ln = line.split()
                if len(ln) != 3:
                    continue

                # read the minimum days associated with the urgencies
                urgency_old = urgencies.get(ln[0], None)
                mindays_old = self._min_days.get(urgency_old, 1000)  # type: ignore[arg-type]
                mindays_new = self._min_days.get(ln[2], min_days_default)

                # if the new urgency is lower (so the min days are higher), do nothing
                if mindays_old <= mindays_new:
                    continue

                # if the package exists in the target suite and it is more recent, do nothing
                tsrcv = sources_t.get(ln[0], None)
                if tsrcv and apt_pkg.version_compare(tsrcv.version, ln[1]) >= 0:
                    continue

                # if the package doesn't exist in the primary source suite or it is older, do nothing
                usrcv = sources_s.get(ln[0], None)
                if not usrcv or apt_pkg.version_compare(usrcv.version, ln[1]) < 0:
                    continue

                # update the urgency for the package
                urgencies[ln[0]] = ln[2]

    def _write_dates_file(self) -> None:
        """Atomically rewrite the dates file (write to a temp file, then rename),
        removing the legacy "Dates" file when the new location is in use."""
        dates = self._dates
        try:
            directory = self.state_dir
            basename = "age-policy-dates"
            old_file = os.path.join(self.suite_info.target_suite.path, "Dates")
        except AttributeError:
            # No STATE_DIR configured: fall back to the legacy location.
            directory = self.suite_info.target_suite.path
            basename = "Dates"
            old_file = None
        filename = os.path.join(directory, basename)
        filename_tmp = os.path.join(directory, "%s_new" % basename)
        with open(filename_tmp, "w", encoding="utf-8") as fd:
            for pkg in sorted(dates):
                version, date = dates[pkg]
                fd.write("%s %s %d\n" % (pkg, version, date))
        os.rename(filename_tmp, filename)
        if old_file is not None and os.path.exists(old_file):
            self.logger.info("Removing old age-policy-dates file %s", old_file)
            os.unlink(old_file)

769 

770class RCBugPolicy(AbstractBasePolicy): 

771 """RC bug regression policy for source migrations 

772 

773 The RCBugPolicy will read provided list of RC bugs and block any 

774 source upload that would introduce a *new* RC bug in the target 

775 suite. 

776 

777 The RCBugPolicy's decision is influenced by the following: 

778 

779 State files: 

780 * ${STATE_DIR}/rc-bugs-${SUITE_NAME}: File containing RC bugs for packages in 

781 the given suite (one for both primary source suite and the target suite is 

782 needed). 

783 - These files need to be updated externally. 

784 """ 

785 

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        # Only applies to the primary source suite; results are stored under
        # the "rc-bugs" key in the excuses output.
        super().__init__(
            "rc-bugs", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
        )
        # Package name (or "src:<name>") -> set of RC bug ids; both maps are
        # populated from the state files in initialise().
        self._bugs_source: dict[str, set[str]] | None = None
        self._bugs_target: dict[str, set[str]] | None = None

793 def register_hints(self, hint_parser: HintParser) -> None: 

794 f = simple_policy_hint_parser_function( 

795 IgnoreRCBugHint, lambda x: frozenset(x.split(",")) 

796 ) 

797 hint_parser.register_hint_type(HintType("ignore-rc-bugs", f, min_args=2)) 

798 

799 def initialise(self, britney: "Britney") -> None: 

800 super().initialise(britney) 

801 source_suite = self.suite_info.primary_source_suite 

802 target_suite = self.suite_info.target_suite 

803 fallback_unstable = os.path.join(source_suite.path, "BugsV") 

804 fallback_testing = os.path.join(target_suite.path, "BugsV") 

805 try: 

806 filename_unstable = os.path.join( 

807 self.state_dir, "rc-bugs-%s" % source_suite.name 

808 ) 

809 filename_testing = os.path.join( 

810 self.state_dir, "rc-bugs-%s" % target_suite.name 

811 ) 

812 if ( 812 ↛ 818line 812 didn't jump to line 818

813 not os.path.exists(filename_unstable) 

814 and not os.path.exists(filename_testing) 

815 and os.path.exists(fallback_unstable) 

816 and os.path.exists(fallback_testing) 

817 ): 

818 filename_unstable = fallback_unstable 

819 filename_testing = fallback_testing 

820 except AttributeError: 

821 filename_unstable = fallback_unstable 

822 filename_testing = fallback_testing 

823 self._bugs_source = self._read_bugs(filename_unstable) 

824 self._bugs_target = self._read_bugs(filename_testing) 

825 

    def apply_src_policy_impl(
        self,
        rcbugs_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Compare RC bug sets between source and target suite for one item.

        Collects the RC bugs filed against the source package (both under
        its plain name and the ``src:`` prefixed key) and against each of
        its binaries, applies any matching ``ignore-rc-bugs`` hint, and
        rejects the item permanently if migrating would introduce bugs
        that are not present in the target suite.
        """
        assert self._bugs_source is not None  # for type checking
        assert self._bugs_target is not None  # for type checking
        bugs_t = set()  # RC bugs affecting the version in the target suite
        bugs_u = set()  # RC bugs affecting the version in the source suite
        source_name = item.package

        # Bugs can be filed against the source package directly or with the
        # "src:" prefix; check both keys.
        for src_key in (source_name, "src:%s" % source_name):
            if source_data_tdist and src_key in self._bugs_target:
                bugs_t.update(self._bugs_target[src_key])
            if src_key in self._bugs_source:
                bugs_u.update(self._bugs_source[src_key])

        # Also collect bugs filed against the binary packages built from
        # this source, in each suite respectively.
        for pkg, _, _ in source_data_srcdist.binaries:
            if pkg in self._bugs_source:
                bugs_u |= self._bugs_source[pkg]
        if source_data_tdist:
            for pkg, _, _ in source_data_tdist.binaries:
                if pkg in self._bugs_target:
                    bugs_t |= self._bugs_target[pkg]

        # If a package is not in the target suite, it has no RC bugs per
        # definition.  Unfortunately, it seems that the live-data is
        # not always accurate (e.g. live-2011-12-13 suggests that
        # obdgpslogger had the same bug in testing and unstable,
        # but obdgpslogger was not in testing at that time).
        # - For the curious, obdgpslogger was removed on that day
        #   and the BTS probably had not caught up with that fact.
        #   (https://tracker.debian.org/news/415935)
        assert not bugs_t or source_data_tdist, (
            "%s had bugs in the target suite but is not present" % source_name
        )

        verdict = PolicyVerdict.PASS

        assert self.hints is not None
        for ignore_hint in cast(
            list[IgnoreRCBugHint],
            self.hints.search(
                "ignore-rc-bugs",
                package=source_name,
                version=source_data_srcdist.version,
            ),
        ):
            ignored_bugs = ignore_hint.ignored_rcbugs

            # Only handle one hint for now
            if "ignored-bugs" in rcbugs_info:
                self.logger.info(
                    "Ignoring ignore-rc-bugs hint from %s on %s due to another hint from %s",
                    ignore_hint.user,
                    source_name,
                    rcbugs_info["ignored-bugs"]["issued-by"],
                )
                continue
            # The hint only takes effect if it matches at least one bug
            # that actually affects the source-suite version.
            if not ignored_bugs.isdisjoint(bugs_u):
                bugs_u -= ignored_bugs
                bugs_t -= ignored_bugs
                rcbugs_info["ignored-bugs"] = {
                    "bugs": sorted(ignored_bugs),
                    "issued-by": ignore_hint.user,
                }
                verdict = PolicyVerdict.PASS_HINTED
            else:
                self.logger.info(
                    "Ignoring ignore-rc-bugs hint from %s on %s as none of %s affect the package",
                    ignore_hint.user,
                    source_name,
                    str(ignored_bugs),
                )

        rcbugs_info["shared-bugs"] = sorted(bugs_u & bugs_t)
        rcbugs_info["unique-source-bugs"] = sorted(bugs_u - bugs_t)
        rcbugs_info["unique-target-bugs"] = sorted(bugs_t - bugs_u)

        # update excuse
        new_bugs = rcbugs_info["unique-source-bugs"]
        old_bugs = rcbugs_info["unique-target-bugs"]
        excuse.setbugs(old_bugs, new_bugs)

        # Bugs only present in the source suite would be regressions in the
        # target suite: block the migration.
        if new_bugs:
            verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(
                verdict,
                "Updating %s would introduce bugs in %s: %s"
                % (
                    source_name,
                    self.suite_info.target_suite.name,
                    ", ".join(
                        [
                            '<a href="https://bugs.debian.org/%s">#%s</a>'
                            % (quote(a), a)
                            for a in new_bugs
                        ]
                    ),
                ),
            )

        # Bugs only present in the target suite are fixed by migrating.
        if old_bugs:
            excuse.addinfo(
                "Updating %s will fix bugs in %s: %s"
                % (
                    source_name,
                    self.suite_info.target_suite.name,
                    ", ".join(
                        [
                            '<a href="https://bugs.debian.org/%s">#%s</a>'
                            % (quote(a), a)
                            for a in old_bugs
                        ]
                    ),
                )
            )

        return verdict

948 

949 def _read_bugs(self, filename: str) -> dict[str, set[str]]: 

950 """Read the release critical bug summary from the specified file 

951 

952 The file contains rows with the format: 

953 

954 <package-name> <bug number>[,<bug number>...] 

955 

956 The method returns a dictionary where the key is the binary package 

957 name and the value is the list of open RC bugs for it. 

958 """ 

959 bugs: dict[str, set[str]] = {} 

960 self.logger.info("Loading RC bugs data from %s", filename) 

961 with open(filename, encoding="ascii") as f: 

962 for line in f: 

963 ln = line.split() 

964 if len(ln) != 2: # pragma: no cover 

965 self.logger.warning("Malformed line found in line %s", line) 

966 continue 

967 pkg = ln[0] 

968 if pkg not in bugs: 968 ↛ 962line 968 didn't jump to line 962 because the condition on line 968 was always true

969 bugs[pkg] = set() 

970 bugs[pkg].update(ln[1].split(",")) 

971 return bugs 

972 

973 

class PiupartsPolicy(AbstractBasePolicy):
    """Policy gating migration on piuparts test results.

    A piuparts failure in the source suite that is not also present for the
    target-suite version is treated as a regression and blocks migration,
    unless overridden with an ``ignore-piuparts`` hint.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "piuparts", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
        )
        # source package name -> (state, url); url is None when the summary
        # was read with keep_url=False (the target-suite summary).
        self._piuparts_source: dict[str, tuple[str, str | None]] | None = None
        self._piuparts_target: dict[str, tuple[str, str | None]] | None = None

    def register_hints(self, hint_parser: HintParser) -> None:
        hint_parser.register_hint_type(HintType("ignore-piuparts"))

    def initialise(self, britney: "Britney") -> None:
        """Load the piuparts summaries for the source and target suites.

        Raises RuntimeError when STATE_DIR is not configured.
        """
        super().initialise(britney)
        source_suite = self.suite_info.primary_source_suite
        target_suite = self.suite_info.target_suite
        try:
            filename_unstable = os.path.join(
                self.state_dir, "piuparts-summary-%s.json" % source_suite.name
            )
            filename_testing = os.path.join(
                self.state_dir, "piuparts-summary-%s.json" % target_suite.name
            )
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e
        self._piuparts_source = self._read_piuparts_summary(
            filename_unstable, keep_url=True
        )
        self._piuparts_target = self._read_piuparts_summary(
            filename_testing, keep_url=False
        )

    def apply_src_policy_impl(
        self,
        piuparts_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Judge one migration item against the piuparts summaries.

        States observed in the summaries: "P" pass, "F" fail, "W" waiting,
        anything else (including missing, mapped to "X") means the package
        cannot be tested.
        """
        assert self._piuparts_source is not None  # for type checking
        assert self._piuparts_target is not None  # for type checking
        source_name = item.package

        if source_name in self._piuparts_target:
            testing_state = self._piuparts_target[source_name][0]
        else:
            testing_state = "X"
        url: str | None
        if source_name in self._piuparts_source:
            unstable_state, url = self._piuparts_source[source_name]
        else:
            unstable_state = "X"
            url = None
        url_html = "(no link yet)"
        if url is not None:
            url_html = '<a href="{0}">{0}</a>'.format(url)

        if unstable_state == "P":
            # Not a regression
            msg = f"Piuparts tested OK - {url_html}"
            result = PolicyVerdict.PASS
            piuparts_info["test-results"] = "pass"
        elif unstable_state == "F":
            # Only a regression (target suite was not failing) is blocking.
            if testing_state != unstable_state:
                piuparts_info["test-results"] = "regression"
                msg = f"Rejected due to piuparts regression - {url_html}"
                result = PolicyVerdict.REJECTED_PERMANENTLY
            else:
                piuparts_info["test-results"] = "failed"
                msg = f"Ignoring piuparts failure (Not a regression) - {url_html}"
                result = PolicyVerdict.PASS
        elif unstable_state == "W":
            msg = f"Waiting for piuparts test results (stalls migration) - {url_html}"
            result = PolicyVerdict.REJECTED_TEMPORARILY
            piuparts_info["test-results"] = "waiting-for-test-results"
        else:
            msg = f"Cannot be tested by piuparts (not a blocker) - {url_html}"
            piuparts_info["test-results"] = "cannot-be-tested"
            result = PolicyVerdict.PASS

        if url is not None:
            piuparts_info["piuparts-test-url"] = url
        if result.is_rejected:
            excuse.add_verdict_info(result, msg)
        else:
            excuse.addinfo(msg)

        # A rejection can be overridden by an ignore-piuparts hint matching
        # this exact source version.
        if result.is_rejected:
            assert self.hints is not None
            for ignore_hint in self.hints.search(
                "ignore-piuparts",
                package=source_name,
                version=source_data_srcdist.version,
            ):
                piuparts_info["ignored-piuparts"] = {"issued-by": ignore_hint.user}
                result = PolicyVerdict.PASS_HINTED
                excuse.addinfo(
                    f"Ignoring piuparts issue as requested by {ignore_hint.user}"
                )
                break

        return result

    def _read_piuparts_summary(
        self, filename: str, keep_url: bool = True
    ) -> dict[str, tuple[str, str | None]]:
        """Parse a piuparts JSON summary file.

        Returns a mapping of source package name to (state, url); the url is
        dropped (set to None) when keep_url is False.  Raises ValueError on a
        summary with an unexpected id/version or malformed per-source data.
        """
        summary: dict[str, tuple[str, str | None]] = {}
        self.logger.info("Loading piuparts report from %s", filename)
        with open(filename) as fd:
            if os.fstat(fd.fileno()).st_size < 1:
                # Tolerate an empty summary file: no results at all.
                return summary
            data = json.load(fd)
        try:
            if (
                data["_id"] != "Piuparts Package Test Results Summary"
                or data["_version"] != "1.0"
            ):  # pragma: no cover
                raise ValueError(
                    f"Piuparts results in {filename} does not have the correct ID or version"
                )
        except KeyError as e:  # pragma: no cover
            raise ValueError(
                f"Piuparts results in {filename} is missing id or version field"
            ) from e
        for source, suite_data in data["packages"].items():
            if len(suite_data) != 1:  # pragma: no cover
                raise ValueError(
                    f"Piuparts results in {filename}, the source {source} does not have "
                    "exactly one result set"
                )
            item = next(iter(suite_data.values()))
            state, _, url = item
            if not keep_url:
                url = None
            summary[source] = (state, url)

        return summary

1113 

1114 

class DependsPolicy(AbstractBasePolicy):
    """Policy checking installability of the binaries of a migration item.

    Runs per architecture and rejects items whose binaries would be
    uninstallable in the target suite, while recording data the
    autopkgtest policy needs to decide whether to schedule tests.
    """

    # Shared state; populated from the Britney instance in initialise().
    pkg_universe: "BinaryPackageUniverse"
    broken_packages: frozenset["BinaryPackageId"]
    all_binaries: dict["BinaryPackageId", "BinaryPackage"]
    allow_uninst: dict[str, set[str | None]]

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "depends",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        # Architecture lists; copied from the options in initialise().
        self.nobreakall_arches = None
        self.new_arches = None
        self.break_arches = None

    def initialise(self, britney: "Britney") -> None:
        """Cache package-universe and architecture data from *britney*."""
        super().initialise(britney)
        self.pkg_universe = britney.pkg_universe
        self.broken_packages = self.pkg_universe.broken_packages
        self.all_binaries = britney.all_binaries
        self.nobreakall_arches = self.options.nobreakall_arches
        self.new_arches = self.options.new_arches
        self.break_arches = self.options.break_arches
        self.allow_uninst = britney.allow_uninst

    def apply_srcarch_policy_impl(
        self,
        deps_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check installability of *item*'s binaries on *arch*.

        Also records per-arch installability summaries in *deps_info* for
        the autopkgtest policy (see the comment block near the end).
        """
        verdict = PolicyVerdict.PASS

        assert self.break_arches is not None
        assert self.new_arches is not None
        if arch in self.break_arches or arch in self.new_arches:
            # we don't check these in the policy (TODO - for now?)
            return verdict

        source_suite = item.suite
        target_suite = self.suite_info.target_suite

        packages_s_a = source_suite.binaries[arch]
        packages_t_a = target_suite.binaries[arch]

        my_bins = sorted(excuse.packages[arch])

        # Track installability separately for arch:all and arch:arch
        # binaries (sets of booleans; empty set means "none seen").
        arch_all_installable = set()
        arch_arch_installable = set()
        consider_it_regression = True

        for pkg_id in my_bins:
            pkg_name = pkg_id.package_name
            binary_u = packages_s_a[pkg_name]
            pkg_arch = binary_u.architecture

            # in some cases, we want to track the uninstallability of a
            # package (because the autopkgtest policy uses this), but we still
            # want to allow the package to be uninstallable
            skip_dep_check = False

            if binary_u.source_version != source_data_srcdist.version:
                # don't check cruft in unstable
                continue

            if item.architecture != "source" and pkg_arch == "all":
                # we don't care about the existing arch: all binaries when
                # checking a binNMU item, because the arch: all binaries won't
                # migrate anyway
                skip_dep_check = True

            if pkg_arch == "all" and arch not in self.nobreakall_arches:
                skip_dep_check = True

            if pkg_name in self.allow_uninst[arch]:
                # this binary is allowed to become uninstallable, so we don't
                # need to check anything
                skip_dep_check = True

            if pkg_name in packages_t_a:
                oldbin = packages_t_a[pkg_name]
                if not target_suite.is_installable(oldbin.pkg_id):
                    # as the current binary in testing is already
                    # uninstallable, the newer version is allowed to be
                    # uninstallable as well, so we don't need to check
                    # anything
                    skip_dep_check = True
                    consider_it_regression = False

            if pkg_id in self.broken_packages:
                if pkg_arch == "all":
                    arch_all_installable.add(False)
                else:
                    arch_arch_installable.add(False)
                # dependencies can't be satisfied by all the known binaries -
                # this certainly won't work...
                excuse.add_unsatisfiable_on_arch(arch)
                if skip_dep_check:
                    # ...but if the binary is allowed to become uninstallable,
                    # we don't care
                    # we still want the binary to be listed as uninstallable,
                    continue
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                excuse.add_verdict_info(
                    verdict, f"{pkg_name}/{arch} has unsatisfiable dependency"
                )
                excuse.addreason("depends")
            else:
                if pkg_arch == "all":
                    arch_all_installable.add(True)
                else:
                    arch_arch_installable.add(True)

            if skip_dep_check:
                continue

            deps = self.pkg_universe.dependencies_of(pkg_id)

            for dep in deps:
                # dep is a list of packages, each of which satisfy the
                # dependency

                if dep == frozenset():
                    continue
                is_ok = False
                needed_for_dep = set()

                for alternative in dep:
                    if target_suite.is_pkg_in_the_suite(alternative):
                        # dep can be satisfied in testing - ok
                        is_ok = True
                    elif alternative in my_bins:
                        # can be satisfied by binary from same item: will be
                        # ok if item migrates
                        is_ok = True
                    else:
                        needed_for_dep.add(alternative)

                if not is_ok:
                    spec = DependencySpec(DependencyType.DEPENDS, arch)
                    excuse.add_package_depends(spec, needed_for_dep)

        # The autopkgtest policy needs delicate trade offs for
        # non-installability. The current choice (considering source
        # migration and only binaries built by the version of the
        # source):
        #
        # * Run autopkgtest if all arch:$arch binaries are installable
        #   (but some or all arch:all binaries are not)
        #
        # * Don't schedule nor wait for not installable arch:all only package
        #   on ! NOBREAKALL_ARCHES
        #
        # * Run autopkgtest if installability isn't a regression (there are (or
        #   rather, should) not be a lot of packages in this state, and most
        #   likely they'll just fail quickly)
        #
        # * Don't schedule, but wait otherwise
        if arch_arch_installable == {True} and False in arch_all_installable:
            deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)
        elif (
            arch not in self.nobreakall_arches
            and arch_arch_installable == set()
            and False in arch_all_installable
        ):
            deps_info.setdefault("arch_all_not_installable", []).append(arch)
        elif not consider_it_regression:
            deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)

        return verdict

1291 

1292 

@unique
class BuildDepResult(IntEnum):
    """Outcome of checking one Build-Depends(-Arch|-Indep) relation.

    Ordered so that a *lower* value is a better result:

    * ``OK``      - relation is satisfied in target
    * ``DEPENDS`` - relation can be satisfied by other packages in source
    * ``FAILED``  - relation cannot be satisfied
    """

    OK = 1
    DEPENDS = 2
    FAILED = 3

1301 

1302 

class BuildDependsPolicy(AbstractBasePolicy):
    """Policy checking that Build-Depends(-Arch|-Indep) can be satisfied.

    A relation satisfied in the target suite is fine; one satisfiable only
    from the source suite adds a dependency to the excuse; an unsatisfiable
    one rejects the item.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "build-depends",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
        )
        # Preferred architectures for checking Build-Depends-Indep
        # (ALL_BUILDARCH option); parsed in initialise().
        self._all_buildarch: list[str] = []

        parse_option(options, "all_buildarch")

    def initialise(self, britney: "Britney") -> None:
        super().initialise(britney)
        if self.options.all_buildarch:
            self._all_buildarch = SuiteContentLoader.config_str_as_list(
                self.options.all_buildarch, []
            )

    def apply_src_policy_impl(
        self,
        build_deps_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
        get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
    ) -> PolicyVerdict:
        """Check both dependency classes and combine the worst verdict."""
        verdict = PolicyVerdict.PASS

        # analyze the dependency fields (if present)
        if deps := source_data_srcdist.build_deps_arch:
            v = self._check_build_deps(
                deps,
                DependencyType.BUILD_DEPENDS,
                build_deps_info,
                item,
                source_data_tdist,
                source_data_srcdist,
                excuse,
                get_dependency_solvers=get_dependency_solvers,
            )
            verdict = PolicyVerdict.worst_of(verdict, v)

        if ideps := source_data_srcdist.build_deps_indep:
            v = self._check_build_deps(
                ideps,
                DependencyType.BUILD_DEPENDS_INDEP,
                build_deps_info,
                item,
                source_data_tdist,
                source_data_srcdist,
                excuse,
                get_dependency_solvers=get_dependency_solvers,
            )
            verdict = PolicyVerdict.worst_of(verdict, v)

        return verdict

    def _get_check_archs(
        self, archs: Container[str], dep_type: DependencyType
    ) -> list[str]:
        """Return the architectures to check *dep_type* on, in priority order.

        For Build-Depends: the configured architectures the source builds
        on.  For Build-Depends-Indep: ALL_BUILDARCH first, then the
        source's own architectures, then the rest.  OUTOFSYNC_ARCHES are
        always excluded.
        """
        oos = self.options.outofsync_arches

        if dep_type == DependencyType.BUILD_DEPENDS:
            return [
                arch
                for arch in self.options.architectures
                if arch in archs and arch not in oos
            ]

        # first try the all buildarch
        checkarchs = list(self._all_buildarch)
        # then try the architectures where this source has arch specific
        # binaries (in the order of the architecture config file)
        checkarchs.extend(
            arch
            for arch in self.options.architectures
            if arch in archs and arch not in checkarchs
        )
        # then try all other architectures
        checkarchs.extend(
            arch for arch in self.options.architectures if arch not in checkarchs
        )

        # and drop OUTOFSYNC_ARCHES
        return [arch for arch in checkarchs if arch not in oos]

    def _add_info_for_arch(
        self,
        arch: str,
        excuses_info: dict[str, list[str]],
        blockers: dict[str, set[BinaryPackageId]],
        results: dict[str, BuildDepResult],
        dep_type: DependencyType,
        target_suite: TargetSuite,
        source_suite: Suite,
        excuse: "Excuse",
        verdict: PolicyVerdict,
    ) -> PolicyVerdict:
        """Fold the per-arch check results for *arch* into the excuse.

        Returns the (possibly worsened) verdict.
        """
        if arch in blockers:
            packages = blockers[arch]

            # for the solving packages, update the excuse to add the dependencies
            for p in packages:
                if arch not in self.options.break_arches:
                    spec = DependencySpec(dep_type, arch)
                    excuse.add_package_depends(spec, {p})

        if arch in results and results[arch] == BuildDepResult.FAILED:
            verdict = PolicyVerdict.worst_of(
                verdict, PolicyVerdict.REJECTED_PERMANENTLY
            )

        if arch in excuses_info:
            for excuse_text in excuses_info[arch]:
                if verdict.is_rejected:
                    excuse.add_verdict_info(verdict, excuse_text)
                else:
                    excuse.addinfo(excuse_text)

        return verdict

    def _check_build_deps(
        self,
        deps: str,
        dep_type: DependencyType,
        build_deps_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
        get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
    ) -> PolicyVerdict:
        """Check one Build-Depends(-Arch|-Indep) field on the relevant archs.

        For BUILD_DEPENDS every relevant architecture must succeed; for
        BUILD_DEPENDS_INDEP one successful architecture suffices
        (any_arch_ok), and the best one found is reported.
        """
        verdict = PolicyVerdict.PASS
        any_arch_ok = dep_type == DependencyType.BUILD_DEPENDS_INDEP

        britney = self.britney

        # local copies for better performance
        parse_src_depends = apt_pkg.parse_src_depends

        source_name = item.package
        source_suite = item.suite
        target_suite = self.suite_info.target_suite
        binaries_s = source_suite.binaries
        provides_s = source_suite.provides_table
        binaries_t = target_suite.binaries
        provides_t = target_suite.provides_table
        unsat_bd: dict[str, list[str]] = {}
        # Architectures where the source builds arch-specific binaries.
        relevant_archs: set[str] = {
            binary.architecture
            for binary in source_data_srcdist.binaries
            if britney.all_binaries[binary].architecture != "all"
        }

        excuses_info: dict[str, list[str]] = defaultdict(list)
        blockers: dict[str, set[BinaryPackageId]] = defaultdict(set)
        arch_results = {}
        result_archs = defaultdict(list)
        bestresult = BuildDepResult.FAILED
        check_archs = self._get_check_archs(relevant_archs, dep_type)
        if not check_archs:
            # when the arch list is empty, we check the b-d on any arch, instead of all archs
            # this happens for Build-Depens on a source package that only produces arch: all binaries
            any_arch_ok = True
            check_archs = self._get_check_archs(
                self.options.architectures, DependencyType.BUILD_DEPENDS_INDEP
            )

        for arch in check_archs:
            # retrieve the binary package from the specified suite and arch
            binaries_s_a = binaries_s[arch]
            provides_s_a = provides_s[arch]
            binaries_t_a = binaries_t[arch]
            provides_t_a = provides_t[arch]
            arch_results[arch] = BuildDepResult.OK
            # for every dependency block (formed as conjunction of disjunction)
            for block_txt in deps.split(","):
                block_list = parse_src_depends(block_txt, False, arch)
                # Unlike regular dependencies, some clauses of the Build-Depends(-Arch|-Indep) can be
                # filtered out by (e.g.) architecture restrictions. We need to cope with this while
                # keeping block_txt and block aligned.
                if not block_list:
                    # Relation is not relevant for this architecture.
                    continue
                block = block_list[0]
                # if the block is satisfied in the target suite, then skip the block
                if get_dependency_solvers(
                    block, binaries_t_a, provides_t_a, build_depends=True
                ):
                    # Satisfied in the target suite; all ok.
                    continue

                # check if the block can be satisfied in the source suite, and list the solving packages
                packages = get_dependency_solvers(
                    block, binaries_s_a, provides_s_a, build_depends=True
                )
                sources = sorted(p.source for p in packages)

                # if the dependency can be satisfied by the same source package, skip the block:
                # obviously both binary packages will enter the target suite together
                if source_name in sources:
                    continue

                # if no package can satisfy the dependency, add this information to the excuse
                if not packages:
                    excuses_info[arch].append(
                        "%s unsatisfiable %s on %s: %s"
                        % (source_name, dep_type, arch, block_txt.strip())
                    )
                    if arch not in unsat_bd:
                        unsat_bd[arch] = []
                    unsat_bd[arch].append(block_txt.strip())
                    arch_results[arch] = BuildDepResult.FAILED
                    continue

                blockers[arch].update(p.pkg_id for p in packages)
                if arch_results[arch] < BuildDepResult.DEPENDS:
                    arch_results[arch] = BuildDepResult.DEPENDS

            if any_arch_ok:
                # Track the best result seen so far (lower enum = better).
                if arch_results[arch] < bestresult:
                    bestresult = arch_results[arch]
                result_archs[arch_results[arch]].append(arch)
                if bestresult == BuildDepResult.OK:
                    # we found an architecture where the b-deps-indep are
                    # satisfied in the target suite, so we can stop
                    break

        if any_arch_ok:
            # Report only the best architecture found.
            arch = result_archs[bestresult][0]
            excuse.add_detailed_info(f"Checking {dep_type.get_description()} on {arch}")
            key = "check-%s-on-arch" % dep_type.get_reason()
            build_deps_info[key] = arch
            verdict = self._add_info_for_arch(
                arch,
                excuses_info,
                blockers,
                arch_results,
                dep_type,
                target_suite,
                source_suite,
                excuse,
                verdict,
            )

        else:
            # Every checked architecture contributes to the verdict.
            for arch in check_archs:
                verdict = self._add_info_for_arch(
                    arch,
                    excuses_info,
                    blockers,
                    arch_results,
                    dep_type,
                    target_suite,
                    source_suite,
                    excuse,
                    verdict,
                )

        if unsat_bd:
            build_deps_info["unsatisfiable-arch-build-depends"] = unsat_bd

        return verdict

1569 

1570 

class BuiltUsingPolicy(AbstractBasePolicy):
    """Built-Using policy

    Binaries that incorporate (part of) another source package must list these
    sources under 'Built-Using'.

    This policy checks if the corresponding sources are available in the
    target suite. If they are not, but they are candidates for migration, a
    dependency is added.

    If the binary incorporates a newer version of a source, that is not (yet)
    a candidate, we don't want to accept that binary. A rebuild later in the
    primary suite wouldn't fix the issue, because that would incorporate the
    newer version again.

    If the binary incorporates an older version of the source, a newer version
    will be accepted as a replacement. We assume that this can be fixed by
    rebuilding the binary at some point during the development cycle.

    Requiring exact version of the source would not be useful in practice. A
    newer upload of that source wouldn't be blocked by this policy, so the
    built-using would be outdated anyway.

    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "built-using",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )

    def initialise(self, britney: "Britney") -> None:
        # No extra state needed beyond the base class initialisation.
        super().initialise(britney)

    def apply_srcarch_policy_impl(
        self,
        build_deps_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Verify the Built-Using fields of *item*'s binaries on *arch*.

        A Built-Using reference is satisfied by the target suite, failing
        that by the item's own source suite (adding a migration dependency),
        and, for additional source suites, by the primary source suite.
        """
        verdict = PolicyVerdict.PASS

        source_suite = item.suite
        target_suite = self.suite_info.target_suite
        binaries_s = source_suite.binaries

        def check_bu_in_suite(
            bu_source: str, bu_version: str, source_suite: Suite
        ) -> bool:
            # NOTE: closes over pkg_name/arch from the enclosing loop; only
            # called within the same iteration, so the binding is current.
            found = False
            if bu_source not in source_suite.sources:
                return found
            s_source = source_suite.sources[bu_source]
            s_ver = s_source.version
            # A newer-or-equal version in the suite satisfies the reference.
            if apt_pkg.version_compare(s_ver, bu_version) >= 0:
                found = True
                dep = PackageId(bu_source, s_ver, "source")
                if arch in self.options.break_arches:
                    excuse.add_detailed_info(
                        "Ignoring Built-Using for %s/%s on %s"
                        % (pkg_name, arch, dep.uvname)
                    )
                else:
                    spec = DependencySpec(DependencyType.BUILT_USING, arch)
                    excuse.add_package_depends(spec, {dep})
                    excuse.add_detailed_info(
                        f"{pkg_name}/{arch} has Built-Using on {dep.uvname}"
                    )

            return found

        for pkg_id in sorted(
            x for x in source_data_srcdist.binaries if x.architecture == arch
        ):
            pkg_name = pkg_id.package_name

            # retrieve the testing (if present) and unstable corresponding binary packages
            binary_s = binaries_s[arch][pkg_name]

            for bu in binary_s.builtusing:
                bu_source = bu[0]
                bu_version = bu[1]
                found = False
                # First try the target suite itself.
                if bu_source in target_suite.sources:
                    t_source = target_suite.sources[bu_source]
                    t_ver = t_source.version
                    if apt_pkg.version_compare(t_ver, bu_version) >= 0:
                        found = True

                if not found:
                    found = check_bu_in_suite(bu_source, bu_version, source_suite)

                if not found and source_suite.suite_class.is_additional_source:
                    found = check_bu_in_suite(
                        bu_source, bu_version, self.suite_info.primary_source_suite
                    )

                if not found:
                    if arch in self.options.break_arches:
                        excuse.add_detailed_info(
                            "Ignoring unsatisfiable Built-Using for %s/%s on %s %s"
                            % (pkg_name, arch, bu_source, bu_version)
                        )
                    else:
                        verdict = PolicyVerdict.worst_of(
                            verdict, PolicyVerdict.REJECTED_PERMANENTLY
                        )
                        excuse.add_verdict_info(
                            verdict,
                            "%s/%s has unsatisfiable Built-Using on %s %s"
                            % (pkg_name, arch, bu_source, bu_version),
                        )

        return verdict

1691 

1692 

1693class BlockPolicy(AbstractBasePolicy): 

1694 BLOCK_HINT_REGEX = re.compile("^(un)?(block-?.*)$") 

1695 

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        """Set up the block policy for both source suite classes."""
        super().__init__(
            "block",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
        )
        # "block-all" hints keyed by their package field (e.g. "source",
        # "key"); populated in initialise().
        self._blockall: dict[str | None, Hint] = {}

1704 

1705 def initialise(self, britney: "Britney") -> None: 

1706 super().initialise(britney) 

1707 assert self.hints is not None 

1708 for hint in self.hints.search(type="block-all"): 

1709 self._blockall[hint.package] = hint 

1710 

1711 self._key_packages = [] 

1712 if "key" in self._blockall: 

1713 self._key_packages = self._read_key_packages() 

1714 

1715 def _read_key_packages(self) -> list[str]: 

1716 """Read the list of key packages 

1717 

1718 The file contains data in the yaml format : 

1719 

1720 - reason: <something> 

1721 source: <package> 

1722 

1723 The method returns a list of all key packages. 

1724 """ 

1725 filename = os.path.join(self.state_dir, "key_packages.yaml") 

1726 self.logger.info("Loading key packages from %s", filename) 

1727 if os.path.exists(filename): 1727 ↛ 1732line 1727 didn't jump to line 1732 because the condition on line 1727 was always true

1728 with open(filename) as f: 

1729 data = yaml.safe_load(f) 

1730 key_packages = [item["source"] for item in data] 

1731 else: 

1732 self.logger.error( 

1733 "Britney was asked to block key packages, " 

1734 + "but no key_packages.yaml file was found." 

1735 ) 

1736 sys.exit(1) 

1737 

1738 return key_packages 

1739 

1740 def register_hints(self, hint_parser: HintParser) -> None: 

1741 # block related hints are currently defined in hint.py 

1742 pass 

1743 

1744 def _check_blocked( 

1745 self, item: "MigrationItem", arch: str, version: str, excuse: "Excuse" 

1746 ) -> PolicyVerdict: 

1747 verdict = PolicyVerdict.PASS 

1748 blocked = {} 

1749 unblocked = {} 

1750 block_info = {} 

1751 source_suite = item.suite 

1752 suite_name = source_suite.name 

1753 src = item.package 

1754 is_primary = source_suite.suite_class == SuiteClass.PRIMARY_SOURCE_SUITE 

1755 

1756 tooltip = ( 

1757 "please contact %s-release if update is needed" % self.options.distribution 

1758 ) 

1759 

1760 assert self.hints is not None 

1761 shints = self.hints.search(package=src) 

1762 mismatches = False 

1763 r = self.BLOCK_HINT_REGEX 

1764 for hint in shints: 

1765 m = r.match(hint.type) 

1766 if m: 

1767 if m.group(1) == "un": 

1768 assert hint.suite is not None 

1769 if ( 

1770 hint.version != version 

1771 or hint.suite.name != suite_name 

1772 or (hint.architecture != arch and hint.architecture != "source") 

1773 ): 

1774 self.logger.info( 

1775 "hint mismatch: %s %s %s", version, arch, suite_name 

1776 ) 

1777 mismatches = True 

1778 else: 

1779 unblocked[m.group(2)] = hint.user 

1780 excuse.add_hint(hint) 

1781 else: 

1782 # block(-*) hint: only accepts a source, so this will 

1783 # always match 

1784 blocked[m.group(2)] = hint.user 

1785 excuse.add_hint(hint) 

1786 

1787 if "block" not in blocked and is_primary: 

1788 # if there is a specific block hint for this package, we don't 

1789 # check for the general hints 

1790 

1791 if self.options.distribution == "debian": 1791 ↛ 1798line 1791 didn't jump to line 1798 because the condition on line 1791 was always true

1792 url = "https://release.debian.org/testing/freeze_policy.html" 

1793 tooltip = ( 

1794 'Follow the <a href="%s">freeze policy</a> when applying for an unblock' 

1795 % url 

1796 ) 

1797 

1798 if "source" in self._blockall: 

1799 blocked["block"] = self._blockall["source"].user 

1800 excuse.add_hint(self._blockall["source"]) 

1801 elif ( 

1802 "new-source" in self._blockall 

1803 and src not in self.suite_info.target_suite.sources 

1804 ): 

1805 blocked["block"] = self._blockall["new-source"].user 

1806 excuse.add_hint(self._blockall["new-source"]) 

1807 # no tooltip: new sources will probably not be accepted anyway 

1808 block_info["block"] = "blocked by {}: is not in {}".format( 

1809 self._blockall["new-source"].user, 

1810 self.suite_info.target_suite.name, 

1811 ) 

1812 elif "key" in self._blockall and src in self._key_packages: 

1813 blocked["block"] = self._blockall["key"].user 

1814 excuse.add_hint(self._blockall["key"]) 

1815 block_info["block"] = "blocked by {}: is a key package ({})".format( 

1816 self._blockall["key"].user, 

1817 tooltip, 

1818 ) 

1819 elif "no-autopkgtest" in self._blockall: 

1820 if excuse.autopkgtest_results == {"PASS"}: 

1821 if not blocked: 1821 ↛ 1847line 1821 didn't jump to line 1847 because the condition on line 1821 was always true

1822 excuse.addinfo("not blocked: has successful autopkgtest") 

1823 else: 

1824 blocked["block"] = self._blockall["no-autopkgtest"].user 

1825 excuse.add_hint(self._blockall["no-autopkgtest"]) 

1826 if not excuse.autopkgtest_results: 

1827 block_info["block"] = ( 

1828 "blocked by %s: does not have autopkgtest (%s)" 

1829 % ( 

1830 self._blockall["no-autopkgtest"].user, 

1831 tooltip, 

1832 ) 

1833 ) 

1834 else: 

1835 block_info["block"] = ( 

1836 "blocked by %s: autopkgtest not fully successful (%s)" 

1837 % ( 

1838 self._blockall["no-autopkgtest"].user, 

1839 tooltip, 

1840 ) 

1841 ) 

1842 

1843 elif not is_primary: 

1844 blocked["block"] = suite_name 

1845 excuse.needs_approval = True 

1846 

1847 for block_cmd in blocked: 

1848 unblock_cmd = "un" + block_cmd 

1849 if block_cmd in unblocked: 

1850 if is_primary or block_cmd == "block-udeb": 

1851 excuse.addinfo( 

1852 "Ignoring %s request by %s, due to %s request by %s" 

1853 % ( 

1854 block_cmd, 

1855 blocked[block_cmd], 

1856 unblock_cmd, 

1857 unblocked[block_cmd], 

1858 ) 

1859 ) 

1860 else: 

1861 excuse.addinfo("Approved by %s" % (unblocked[block_cmd])) 

1862 else: 

1863 verdict = PolicyVerdict.REJECTED_NEEDS_APPROVAL 

1864 if is_primary or block_cmd == "block-udeb": 

1865 # redirect people to d-i RM for udeb things: 

1866 if block_cmd == "block-udeb": 

1867 tooltip = "please contact the d-i release manager if an update is needed" 

1868 if block_cmd in block_info: 

1869 info = block_info[block_cmd] 

1870 else: 

1871 info = ( 

1872 "Not touching package due to {} request by {} ({})".format( 

1873 block_cmd, 

1874 blocked[block_cmd], 

1875 tooltip, 

1876 ) 

1877 ) 

1878 excuse.add_verdict_info(verdict, info) 

1879 else: 

1880 excuse.add_verdict_info(verdict, "NEEDS APPROVAL BY RM") 

1881 excuse.addreason("block") 

1882 if mismatches: 

1883 excuse.add_detailed_info( 

1884 "Some hints for %s do not match this item" % src 

1885 ) 

1886 return verdict 

1887 

1888 def apply_src_policy_impl( 

1889 self, 

1890 block_info: dict[str, Any], 

1891 item: MigrationItem, 

1892 source_data_tdist: SourcePackage | None, 

1893 source_data_srcdist: SourcePackage, 

1894 excuse: "Excuse", 

1895 ) -> PolicyVerdict: 

1896 return self._check_blocked(item, "source", source_data_srcdist.version, excuse) 

1897 

1898 def apply_srcarch_policy_impl( 

1899 self, 

1900 block_info: dict[str, Any], 

1901 item: MigrationItem, 

1902 arch: str, 

1903 source_data_tdist: SourcePackage | None, 

1904 source_data_srcdist: SourcePackage, 

1905 excuse: "Excuse", 

1906 ) -> PolicyVerdict: 

1907 return self._check_blocked(item, arch, source_data_srcdist.version, excuse) 

1908 

1909 

class BuiltOnBuilddPolicy(AbstractBasePolicy):
    """Reject binaries whose uploads were not signed by a buildd.

    Two exceptions are implemented below: packages outside the "main"
    component are always accepted (presumably because those components are
    not built on buildds — confirmed by the unconditional acceptance in
    the code), and arch:all maintainer uploads can be whitelisted via the
    ``allow-archall-maintainer-upload`` hint.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "builtonbuildd",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        # "signerinfo": package name -> version -> architecture -> record
        # with at least "buildd" (bool) and "uid" (str) keys; populated
        # from signers.json in initialise().
        self._builtonbuildd: dict[str, Any] = {
            "signerinfo": None,
        }

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the (unversioned) allow-archall-maintainer-upload hint."""
        hint_parser.register_hint_type(
            HintType(
                "allow-archall-maintainer-upload",
                versioned=HintAnnotate.FORBIDDEN,
            )
        )

    def initialise(self, britney: "Britney") -> None:
        """Load the signer information from signers.json in STATE_DIR."""
        super().initialise(britney)
        try:
            filename_signerinfo = os.path.join(self.state_dir, "signers.json")
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e
        self._builtonbuildd["signerinfo"] = self._read_signerinfo(filename_signerinfo)

    def apply_srcarch_policy_impl(
        self,
        buildd_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check each binary of this item on ``arch`` against the signer data.

        Records the signer uid per binary architecture in
        ``buildd_info["signed-by"]`` (also used to avoid duplicating the
        excuse text) and rejects when a binary was not built on a buildd,
        unless one of the documented exceptions applies.
        """
        verdict = PolicyVerdict.PASS
        signers = self._builtonbuildd["signerinfo"]

        if "signed-by" not in buildd_info:
            buildd_info["signed-by"] = {}

        source_suite = item.suite

        # horrible hard-coding, but currently, we don't keep track of the
        # component when loading the packages files
        component = "main"
        # we use the source component, because a binary in contrib can
        # belong to a source in main
        section = source_data_srcdist.section
        if section.find("/") > -1:
            component = section.split("/")[0]

        packages_s_a = source_suite.binaries[arch]
        assert self.hints is not None

        for pkg_id in sorted(
            x for x in source_data_srcdist.binaries if x.architecture == arch
        ):
            pkg_name = pkg_id.package_name
            binary_u = packages_s_a[pkg_name]
            pkg_arch = binary_u.architecture

            # skip binaries not built from the source version under
            # consideration (cruft)
            if binary_u.source_version != source_data_srcdist.version:
                continue

            if item.architecture != "source" and pkg_arch == "all":
                # we don't care about the existing arch: all binaries when
                # checking a binNMU item, because the arch: all binaries won't
                # migrate anyway
                continue

            signer = None
            uid = None
            uidinfo = ""
            buildd_ok = False
            failure_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            try:
                signer = signers[pkg_name][pkg_id.version][pkg_arch]
                if signer["buildd"]:
                    buildd_ok = True
                uid = signer["uid"]
                uidinfo = f"arch {pkg_arch} binaries uploaded by {uid}"
            except KeyError:
                # No signer data: we cannot tell whether this was a buildd
                # upload, so the rejection may be temporary.
                self.logger.info(
                    "signer info for %s %s (%s) on %s not found ",
                    pkg_name,
                    binary_u.version,
                    pkg_arch,
                    arch,
                )
                uidinfo = "upload info for arch %s binaries not found" % (pkg_arch)
                failure_verdict = PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
            if not buildd_ok:
                if component != "main":
                    # non-main packages are accepted even when not built on
                    # a buildd; only mention it once per architecture
                    if pkg_arch not in buildd_info["signed-by"]:
                        excuse.add_detailed_info(
                            f"{uidinfo}, but package in {component}"
                        )
                    buildd_ok = True
                elif pkg_arch == "all":
                    # arch:all maintainer uploads can be whitelisted by hint
                    allow_hints = self.hints.search(
                        "allow-archall-maintainer-upload", package=item.package
                    )
                    if allow_hints:
                        buildd_ok = True
                        verdict = PolicyVerdict.worst_of(
                            verdict, PolicyVerdict.PASS_HINTED
                        )
                        if pkg_arch not in buildd_info["signed-by"]:
                            excuse.addinfo(
                                "%s, but whitelisted by %s"
                                % (uidinfo, allow_hints[0].user)
                            )
            if not buildd_ok:
                verdict = failure_verdict
                if pkg_arch not in buildd_info["signed-by"]:
                    if pkg_arch == "all":
                        uidinfo += (
                            ", a new source-only upload is needed to allow migration"
                        )
                    excuse.add_verdict_info(
                        verdict, "Not built on buildd: %s" % (uidinfo)
                    )

            if (
                pkg_arch in buildd_info["signed-by"]
                and buildd_info["signed-by"][pkg_arch] != uid
            ):
                # multiple signers for the same binary architecture within
                # one item: log it; the previous uid is overwritten below
                self.logger.info(
                    "signer mismatch for %s (%s %s) on %s: %s, while %s already listed",
                    pkg_name,
                    binary_u.source,
                    binary_u.source_version,
                    pkg_arch,
                    uid,
                    buildd_info["signed-by"][pkg_arch],
                )

            buildd_info["signed-by"][pkg_arch] = uid

        return verdict

    def _read_signerinfo(self, filename: str) -> dict[str, Any]:
        """Parse the signers JSON file; an empty file yields an empty dict."""
        signerinfo: dict[str, Any] = {}
        self.logger.info("Loading signer info from %s", filename)
        with open(filename) as fd:
            if os.fstat(fd.fileno()).st_size < 1:
                return signerinfo
            signerinfo = json.load(fd)

        return signerinfo

2066 

2067 

class ImplicitDependencyPolicy(AbstractBasePolicy):
    """Implicit Dependency policy

    Upgrading a package pkg-a can break the installability of a package pkg-b.
    A newer version (or the removal) of pkg-b might fix the issue. In that
    case, pkg-a has an 'implicit dependency' on pkg-b, because pkg-a can only
    migrate if pkg-b also migrates.

    This policy tries to discover a few common cases, and adds the relevant
    info to the excuses. If another item is needed to fix the
    uninstallability, a dependency is added. If no newer item can fix it, this
    excuse will be blocked.

    Note that the migration step will check the installability of every
    package, so this policy doesn't need to handle every corner case. It
    must, however, make sure that no excuse is unnecessarily blocked.

    Some cases that should be detected by this policy:

    * pkg-a is upgraded from 1.0-1 to 2.0-1, while
      pkg-b has "Depends: pkg-a (<< 2.0)"
      This typically happens if pkg-b has a strict dependency on pkg-a because
      it uses some non-stable internal interface (examples are glibc,
      binutils, python3-defaults, ...)

    * pkg-a is upgraded from 1.0-1 to 2.0-1, and
      pkg-a 1.0-1 has "Provides: provides-1",
      pkg-a 2.0-1 has "Provides: provides-2",
      pkg-b has "Depends: provides-1"
      This typically happens when pkg-a has an interface that changes between
      versions, and a virtual package is used to identify the version of this
      interface (e.g. perl-api-x.y)

    """

    # Caches taken from the Britney instance in initialise().
    _pkg_universe: "BinaryPackageUniverse"
    _all_binaries: dict["BinaryPackageId", "BinaryPackage"]
    _allow_uninst: dict[str, set[str | None]]
    _nobreakall_arches: list[str]

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "implicit-deps",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )

    def initialise(self, britney: "Britney") -> None:
        """Cache references to the package universe and option-derived sets."""
        super().initialise(britney)
        self._pkg_universe = britney.pkg_universe
        self._all_binaries = britney.all_binaries
        self._smooth_updates = britney.options.smooth_updates
        self._nobreakall_arches = self.options.nobreakall_arches
        self._new_arches = self.options.new_arches
        self._break_arches = self.options.break_arches
        self._allow_uninst = britney.allow_uninst
        self._outofsync_arches = self.options.outofsync_arches

    def can_be_removed(self, pkg: BinaryPackage) -> bool:
        """Return True if ``pkg`` is a candidate for removal from the target.

        A binary that may be removed cannot be broken by a migration, so
        callers use this to skip it.
        """
        src = pkg.source
        target_suite = self.suite_info.target_suite

        # TODO these conditions shouldn't be hardcoded here
        # ideally, we would be able to look up excuses to see if the removal
        # is in there, but in the current flow, this policy is called before
        # all possible excuses exist, so there is no list for us to check

        if src not in self.suite_info.primary_source_suite.sources:
            # source for pkg not in unstable: candidate for removal
            return True

        source_t = target_suite.sources[src]
        assert self.hints is not None
        for hint in self.hints.search("remove", package=src, version=source_t.version):
            # removal hint for the source in testing: candidate for removal
            return True

        if target_suite.is_cruft(pkg):
            # if pkg is cruft in testing, removal will be tried
            return True

        # the case were the newer version of the source no longer includes the
        # binary (or includes a cruft version of the binary) will be handled
        # separately (in that case there might be an implicit dependency on
        # the newer source)

        return False

    def should_skip_rdep(
        self, pkg: BinaryPackage, source_name: str, myarch: str
    ) -> bool:
        """Return True if reverse dependency ``pkg`` needs no further checks.

        Covers the cases where the migration cannot break ``pkg``, or where
        breaking it is explicitly tolerated (hints, arch policy).
        """
        target_suite = self.suite_info.target_suite

        if not target_suite.is_pkg_in_the_suite(pkg.pkg_id):
            # it is not in the target suite, migration cannot break anything
            return True

        if pkg.source == source_name:
            # if it is built from the same source, it will be upgraded
            # with the source
            return True

        if self.can_be_removed(pkg):
            # could potentially be removed, so if that happens, it won't be
            # broken
            return True

        if pkg.architecture == "all" and myarch not in self._nobreakall_arches:
            # arch all on non nobreakarch is allowed to become uninstallable
            return True

        if pkg.pkg_id.package_name in self._allow_uninst[myarch]:
            # there is a hint to allow this binary to become uninstallable
            return True

        if not target_suite.is_installable(pkg.pkg_id):
            # it is already uninstallable in the target suite, migration
            # cannot break anything
            return True

        return False

    def breaks_installability(
        self,
        pkg_id_t: BinaryPackageId,
        pkg_id_s: BinaryPackageId | None,
        pkg_to_check: BinaryPackageId,
    ) -> bool:
        """
        Check if upgrading pkg_id_t to pkg_id_s breaks the installability of
        pkg_to_check.

        To check if removing pkg_id_t breaks pkg_to_check, set pkg_id_s to
        None.
        """

        pkg_universe = self._pkg_universe
        negative_deps = pkg_universe.negative_dependencies_of(pkg_to_check)

        for dep in pkg_universe.dependencies_of(pkg_to_check):
            if pkg_id_t not in dep:
                # this depends doesn't have pkg_id_t as alternative, so
                # upgrading pkg_id_t cannot break this dependency clause
                continue

            # We check all the alternatives for this dependency, to find one
            # that can satisfy it when pkg_id_t is upgraded to pkg_id_s
            found_alternative = False
            for d in dep:
                if d in negative_deps:
                    # If this alternative dependency conflicts with
                    # pkg_to_check, it cannot be used to satisfy the
                    # dependency.
                    # This commonly happens when breaks are added to pkg_id_s.
                    continue

                if d.package_name != pkg_id_t.package_name:
                    # a binary different from pkg_id_t can satisfy the dep, so
                    # upgrading pkg_id_t won't break this dependency
                    found_alternative = True
                    break

                if d != pkg_id_s:
                    # We want to know the impact of the upgrade of
                    # pkg_id_t to pkg_id_s. If pkg_id_s migrates to the
                    # target suite, any other version of this binary will
                    # not be there, so it cannot satisfy this dependency.
                    # This includes pkg_id_t, but also other versions.
                    continue

                # pkg_id_s can satisfy the dep
                found_alternative = True

            if not found_alternative:
                return True
        return False

    def check_upgrade(
        self,
        pkg_id_t: BinaryPackageId,
        pkg_id_s: BinaryPackageId | None,
        source_name: str,
        myarch: str,
        broken_binaries: set[str],
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check the reverse dependencies of ``pkg_id_t`` for breakage.

        For each reverse dependency that the upgrade (or removal, when
        ``pkg_id_s`` is None) would break, either add an implicit dependency
        on a newer version that fixes it, or reject the excuse and record
        the broken binary in ``broken_binaries``.
        """
        verdict = PolicyVerdict.PASS

        pkg_universe = self._pkg_universe
        all_binaries = self._all_binaries

        # check all rdeps of the package in testing
        rdeps_t = pkg_universe.reverse_dependencies_of(pkg_id_t)

        for rdep_pkg in sorted(rdeps_t):
            rdep_p = all_binaries[rdep_pkg]

            # check some cases where the rdep won't become uninstallable, or
            # where we don't care if it does
            if self.should_skip_rdep(rdep_p, source_name, myarch):
                continue

            if not self.breaks_installability(pkg_id_t, pkg_id_s, rdep_pkg):
                # if upgrading pkg_id_t to pkg_id_s doesn't break rdep_pkg,
                # there is no implicit dependency
                continue

            # The upgrade breaks the installability of the rdep. We need to
            # find out if there is a newer version of the rdep that solves the
            # uninstallability. If that is the case, there is an implicit
            # dependency. If not, the upgrade will fail.

            # check source versions
            newer_versions = find_newer_binaries(
                self.suite_info, rdep_p, add_source_for_dropped_bin=True
            )
            good_newer_versions = set()
            for npkg, suite in newer_versions:
                if npkg.architecture == "source":
                    # When a newer version of the source package doesn't have
                    # the binary, we get the source as 'newer version'. In
                    # this case, the binary will not be uninstallable if the
                    # newer source migrates, because it is no longer there.
                    good_newer_versions.add(npkg)
                    continue
                assert isinstance(npkg, BinaryPackageId)
                if not self.breaks_installability(pkg_id_t, pkg_id_s, npkg):
                    good_newer_versions.add(npkg)

            if good_newer_versions:
                spec = DependencySpec(DependencyType.IMPLICIT_DEPENDENCY, myarch)
                excuse.add_package_depends(spec, good_newer_versions)
            else:
                # no good newer versions: no possible solution
                broken_binaries.add(rdep_pkg.name)
                if pkg_id_s:
                    action = "migrating {} to {}".format(
                        pkg_id_s.name,
                        self.suite_info.target_suite.name,
                    )
                else:
                    action = "removing {} from {}".format(
                        pkg_id_t.name,
                        self.suite_info.target_suite.name,
                    )
                info = '{0} makes <a href="#{1}">{1}</a> uninstallable'.format(
                    action, rdep_pkg.name
                )
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                excuse.add_verdict_info(verdict, info)

        return verdict

    def apply_srcarch_policy_impl(
        self,
        implicit_dep_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Run the implicit-dependency checks for this item on ``arch``.

        Selects the relevant binaries currently in the target suite,
        determines their counterpart in the source suite (if any), and
        delegates the per-binary checks to check_upgrade().  The set of
        broken binaries is merged across architectures into
        ``implicit_dep_info``.
        """
        verdict = PolicyVerdict.PASS

        if not source_data_tdist:
            # this item is not currently in testing: no implicit dependency
            return verdict

        if excuse.hasreason("missingbuild"):
            # if the build is missing, the policy would treat this as if the
            # binaries would be removed, which would give incorrect (and
            # confusing) info
            info = "missing build, not checking implicit dependencies on %s" % (arch)
            excuse.add_detailed_info(info)
            return verdict

        source_suite = item.suite
        source_name = item.package
        target_suite = self.suite_info.target_suite
        all_binaries = self._all_binaries

        # we check all binaries for this excuse that are currently in testing
        relevant_binaries = [
            x
            for x in source_data_tdist.binaries
            if (arch == "source" or x.architecture == arch)
            and x.package_name in target_suite.binaries[x.architecture]
            and x.architecture not in self._new_arches
            and x.architecture not in self._break_arches
            and x.architecture not in self._outofsync_arches
        ]

        broken_binaries: set[str] = set()

        assert self.hints is not None
        for pkg_id_t in sorted(relevant_binaries):
            mypkg = pkg_id_t.package_name
            myarch = pkg_id_t.architecture
            binaries_t_a = target_suite.binaries[myarch]
            binaries_s_a = source_suite.binaries[myarch]

            if target_suite.is_cruft(all_binaries[pkg_id_t]):
                # this binary is cruft in testing: it will stay around as long
                # as necessary to satisfy dependencies, so we don't need to
                # care
                continue

            if mypkg in binaries_s_a:
                mybin = binaries_s_a[mypkg]
                pkg_id_s: Optional["BinaryPackageId"] = mybin.pkg_id
                if mybin.source != source_name:
                    # hijack: this is too complicated to check, so we ignore
                    # it (the migration code will check the installability
                    # later anyway)
                    pass
                elif mybin.source_version != source_data_srcdist.version:
                    # cruft in source suite: pretend the binary doesn't exist
                    pkg_id_s = None
                elif pkg_id_t == pkg_id_s:
                    # same binary (probably arch: all from a binNMU):
                    # 'upgrading' doesn't change anything, for this binary, so
                    # it won't break anything
                    continue
            else:
                pkg_id_s = None

            if not pkg_id_s and is_smooth_update_allowed(
                binaries_t_a[mypkg], self._smooth_updates, self.hints
            ):
                # the binary isn't in the new version (or is cruft there), and
                # smooth updates are allowed: the binary can stay around if
                # that is necessary to satisfy dependencies, so we don't need
                # to check it
                continue

            if (
                not pkg_id_s
                and source_data_tdist.version == source_data_srcdist.version
                and source_suite.suite_class == SuiteClass.ADDITIONAL_SOURCE_SUITE
                and binaries_t_a[mypkg].architecture == "all"
            ):
                # we're very probably migrating a binNMU built in tpu where the arch:all
                # binaries were not copied to it as that's not needed. This policy could
                # needlessly block.
                continue

            v = self.check_upgrade(
                pkg_id_t, pkg_id_s, source_name, myarch, broken_binaries, excuse
            )
            verdict = PolicyVerdict.worst_of(verdict, v)

        # each arch is processed separately, so if we already have info from
        # other archs, we need to merge the info from this arch
        broken_old = set()
        if "implicit-deps" not in implicit_dep_info:
            implicit_dep_info["implicit-deps"] = {}
        else:
            broken_old = set(implicit_dep_info["implicit-deps"]["broken-binaries"])

        implicit_dep_info["implicit-deps"]["broken-binaries"] = sorted(
            broken_old | broken_binaries
        )

        return verdict

2434 

2435 

class ReverseRemovalPolicy(AbstractBasePolicy):
    """Block sources whose (transitive) reverse dependencies have a remove hint.

    For every ``remove`` hint, all binaries that transitively depend on the
    hinted source's binaries are computed; sources providing such binaries
    that are not already in the target suite get rejected, unless forced by
    an ``ignore-reverse-remove`` hint.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "reverseremoval",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
        )

    def register_hints(self, hint_parser: HintParser) -> None:
        hint_parser.register_hint_type(HintType("ignore-reverse-remove"))

    def initialise(self, britney: "Britney") -> None:
        """Precompute which sources are blocked by which remove hints.

        Populates ``self._block_src_for_rm_hint``, mapping
        "source/source_version" to the set of remove-hinted source names
        that cause the block.
        """
        super().initialise(britney)

        pkg_universe = britney.pkg_universe
        source_suites = britney.suite_info.source_suites
        target_suite = britney.suite_info.target_suite

        # Build set of the sources of reverse (Build-) Depends
        assert self.hints is not None
        hints = self.hints.search("remove")

        # binary -> names of the remove-hinted sources it (transitively)
        # depends on
        rev_bin: dict[BinaryPackageId, set[str]] = defaultdict(set)
        for hint in hints:
            for item in hint.packages:
                # I think we don't need to look at the target suite
                for src_suite in source_suites:
                    try:
                        my_bins = set(src_suite.sources[item.uvname].binaries)
                    except KeyError:
                        continue
                    # expands my_bins in place to the reverse-dependency tree
                    compute_reverse_tree(pkg_universe, my_bins)
                    for this_bin in my_bins:
                        rev_bin[this_bin].add(item.uvname)

        rev_src: dict[str, set[str]] = defaultdict(set)
        for bin_pkg, reasons in rev_bin.items():
            # If the pkg is in the target suite, there's nothing this
            # policy wants to do.
            if target_suite.is_pkg_in_the_suite(bin_pkg):
                continue
            that_bin = britney.all_binaries[bin_pkg]
            bin_src = that_bin.source + "/" + that_bin.source_version
            rev_src[bin_src].update(reasons)
        self._block_src_for_rm_hint = rev_src

    def apply_src_policy_impl(
        self,
        rev_remove_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Reject the item when a remove hint covers one of its dependencies.

        An ignore-reverse-remove hint matching the exact item turns the
        rejection into a hinted pass.
        """
        verdict = PolicyVerdict.PASS

        if item.name in self._block_src_for_rm_hint:
            reason = ", ".join(sorted(self._block_src_for_rm_hint[item.name]))
            assert self.hints is not None
            ignore_hints = self.hints.search(
                "ignore-reverse-remove", package=item.uvname, version=item.version
            )
            excuse.addreason("reverseremoval")
            if ignore_hints:
                excuse.addreason("ignore-reverse-remove")
                excuse.addinfo(
                    "Should block migration because of remove hint for %s, but forced by %s"
                    % (reason, ignore_hints[0].user)
                )
                verdict = PolicyVerdict.PASS_HINTED
            else:
                excuse.addinfo("Remove hint for (transitive) dependency: %s" % reason)
                verdict = PolicyVerdict.REJECTED_PERMANENTLY

        return verdict

2512 

2513 

2514class ReproduciblePolicy(AbstractBasePolicy): 

2515 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

2516 super().__init__( 

2517 "reproducible", 

2518 options, 

2519 suite_info, 

2520 {SuiteClass.PRIMARY_SOURCE_SUITE}, 

2521 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY, 

2522 ) 

2523 self._reproducible: dict[str, Any] = { 

2524 "source": {}, 

2525 "target": {}, 

2526 } 

2527 

2528 # Default values for this policy's options 

2529 parse_option(options, "repro_success_bounty", default=0, to_int=True) 

2530 parse_option(options, "repro_regression_penalty", default=0, to_int=True) 

2531 parse_option(options, "repro_url") 

2532 parse_option(options, "repro_retry_url") 

2533 parse_option(options, "repro_components") 

2534 

2535 def register_hints(self, hint_parser: HintParser) -> None: 

2536 hint_parser.register_hint_type( 

2537 HintType("ignore-reproducible", architectured=HintAnnotate.OPTIONAL) 

2538 ) 

2539 

    def initialise(self, britney: "Britney") -> None:
        """Load reproducibility status for the source and target suites.

        Reads reproducible.json from STATE_DIR; both the suite name and its
        codename are passed as acceptable labels for each suite.
        """
        super().initialise(britney)
        source_suite = self.suite_info.primary_source_suite
        target_suite = self.suite_info.target_suite
        try:
            filename = os.path.join(self.state_dir, "reproducible.json")
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e

        self._reproducible = self._read_repro_status(
            filename,
            source={source_suite.name, source_suite.codename},
            target={target_suite.name, target_suite.codename},
        )

2556 

    def apply_srcarch_policy_impl(
        self,
        reproducible_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Judge one (source package, architecture) pair on reproducibility.

        Compares the package's reproducibility test result in the source
        suite against the reference result for the version in the target
        suite (both previously loaded into ``self._reproducible``) and
        records the outcome on *excuse* and in *reproducible_info*.

        :param reproducible_info: policy-info dict mutated with test
            results, the test URL and any applied ignore hints
        :param item: the migration item being judged
        :param arch: the architecture being judged
        :param source_data_tdist: source data in the target suite, or
            None when the package is new there
        :param source_data_srcdist: source data in the source suite
        :param excuse: excuse to attach info, bounties and penalties to
        :return: the PolicyVerdict for this (source, arch) pair
        :raises KeyError: on a reproducibility status string not covered
            by the known state lists below
        """
        verdict = PolicyVerdict.PASS

        # we don't want to apply this policy (yet) on binNMUs
        if item.architecture != "source":
            return verdict

        # we're not supposed to judge on this arch
        if arch not in self.options.repro_arches:
            return verdict

        # bail out if this arch has no packages for this source (not build
        # here)
        if arch not in excuse.packages:
            return verdict

        # horrible hard-coding, but currently, we don't keep track of the
        # component when loading the packages files
        component = "main"
        if "/" in (section := source_data_srcdist.section):
            component = section.split("/")[0]

        # when a component filter is configured, only judge listed components
        if (
            self.options.repro_components
            and component not in self.options.repro_components
        ):
            return verdict

        source_name = item.package
        try:
            tar_res = self._reproducible["target"][arch]
            src_res = self._reproducible["source"][arch]
        except KeyError:
            # the loaded report contained no results for this arch at all
            verdict = PolicyVerdict.REJECTED_TEMPORARILY
            msg = "No reproducible data available at all for %s" % arch
            excuse.add_verdict_info(verdict, msg)
            return verdict

        # Reference state from the target suite: a result only counts when
        # it was produced for the exact version currently in that suite.
        if source_data_tdist is None:
            target_suite_state = "new"
        elif source_name not in tar_res:
            target_suite_state = "unknown"
        elif tar_res[source_name]["version"] == source_data_tdist.version:
            target_suite_state = tar_res[source_name]["status"]
        else:
            target_suite_state = "stale"

        # Same version check for the candidate in the source suite.
        if source_name in src_res and src_res[source_name]["version"] == item.version:
            source_suite_state = src_res[source_name]["status"]
        else:
            source_suite_state = "unknown"

        # status of ['E404', 'FTBFS', 'FTBR', 'NFU', 'blacklisted', 'depwait',
        # 'reproducible', 'timeout'] with ['new', 'stale', 'unknown']
        wait_states = ("E404", "depwait", "stale", "timeout", "unknown")
        no_build_states = ("FTBFS", "NFU", "blacklisted")

        # if this package doesn't build on this architecture, we don't need to
        # judge it
        # FTBFS: Fails to build from source on r-b infra
        # NFU: the package explicitly doesn't support building on this arch
        # blacklisted: per package per arch per suite
        if source_suite_state in no_build_states:
            return verdict
        # Assume depwait in the source suite only are intermittent (might not
        # be true, e.g. with new build depends)
        if source_suite_state == target_suite_state and target_suite_state == "depwait":
            return verdict

        # Build the HTML link(s) shown next to the verdict message, if a
        # results URL template is configured.
        if self.options.repro_url:
            url = self.options.repro_url.format(package=quote(source_name), arch=arch)
            url_html = ' - <a href="%s">info</a>' % url
            if self.options.repro_retry_url:
                url_html += (
                    ' <a href="%s">♻ </a>'
                    % self.options.repro_retry_url.format(
                        package=quote(source_name), arch=arch
                    )
                )
            # When run on multiple archs, the last one "wins"
            reproducible_info["reproducible-test-url"] = url
        else:
            url = None
            url_html = ""

        eligible_for_bounty = False
        if source_suite_state == "reproducible":
            verdict = PolicyVerdict.PASS
            msg = f"Reproducible on {arch}{url_html}"
            reproducible_info.setdefault("test-results", []).append(
                "reproducible on %s" % arch
            )
            eligible_for_bounty = True
        elif source_suite_state == "FTBR":
            # Candidate fails to build reproducibly; how bad that is
            # depends on the reference state in the target suite.
            if target_suite_state == "new":
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                msg = f"New but not reproducible on {arch}{url_html}"
                reproducible_info.setdefault("test-results", []).append(
                    "new but not reproducible on %s" % arch
                )
            elif target_suite_state in wait_states:
                verdict = PolicyVerdict.REJECTED_TEMPORARILY
                msg = "Waiting for reproducibility reference results on {}{}".format(
                    arch,
                    url_html,
                )
                reproducible_info.setdefault("test-results", []).append(
                    "waiting-for-reference-results on %s" % arch
                )
            elif target_suite_state == "reproducible":
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                msg = f"Reproducibility regression on {arch}{url_html}"
                reproducible_info.setdefault("test-results", []).append(
                    "regression on %s" % arch
                )
            elif target_suite_state == "FTBR":
                verdict = PolicyVerdict.PASS
                msg = "Ignoring non-reproducibility on {} (not a regression){}".format(
                    arch,
                    url_html,
                )
                reproducible_info.setdefault("test-results", []).append(
                    "not reproducible on %s" % arch
                )
            else:
                # per the status list above, the remaining reference states
                # are the no-build ones (FTBFS/NFU/blacklisted)
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                msg = "No reference result, but not reproducibility on {}{}".format(
                    arch,
                    url_html,
                )
                reproducible_info.setdefault("test-results", []).append(
                    f"reference {target_suite_state} on {arch}"
                )
        elif source_suite_state in wait_states:
            verdict = PolicyVerdict.REJECTED_TEMPORARILY
            msg = f"Waiting for reproducibility test results on {arch}{url_html}"
            reproducible_info.setdefault("test-results", []).append(
                "waiting-for-test-results on %s" % arch
            )
        else:
            # the state lists above are meant to be exhaustive; anything
            # else indicates a new/unexpected status in the report
            raise KeyError("Unhandled reproducibility state %s" % source_suite_state)

        if verdict.is_rejected:
            assert self.hints is not None
            # an ignore hint may target either all of "source" or this
            # specific architecture; the first matching hint wins
            for hint_arch in ("source", arch):
                for ignore_hint in self.hints.search(
                    "ignore-reproducible",
                    package=source_name,
                    version=source_data_srcdist.version,
                    architecture=hint_arch,
                ):
                    verdict = PolicyVerdict.PASS_HINTED
                    reproducible_info.setdefault("ignored-reproducible", {}).setdefault(
                        arch, {}
                    ).setdefault("issued-by", []).append(ignore_hint.user)
                    excuse.addinfo(
                        "Ignoring reproducibility issue on %s as requested "
                        "by %s" % (arch, ignore_hint.user)
                    )
                    break

        if self.options.repro_success_bounty and eligible_for_bounty:
            excuse.add_bounty("reproducibility", self.options.repro_success_bounty)

        if self.options.repro_regression_penalty and verdict in {
            PolicyVerdict.REJECTED_PERMANENTLY,
            PolicyVerdict.REJECTED_TEMPORARILY,
        }:
            if self.options.repro_regression_penalty > 0:
                excuse.add_penalty(
                    "reproducibility", self.options.repro_regression_penalty
                )
            # In case we give penalties instead of blocking, we must always pass
            verdict = PolicyVerdict.PASS

        if verdict.is_rejected:
            excuse.add_verdict_info(verdict, msg)
        else:
            excuse.addinfo(msg)

        return verdict

2746 

2747 def _read_repro_status( 

2748 self, filename: str, source: set[str], target: set[str] 

2749 ) -> dict[str, dict[str, str]]: 

2750 summary = self._reproducible 

2751 self.logger.info("Loading reproducibility report from %s", filename) 

2752 with open(filename) as fd: 

2753 if os.fstat(fd.fileno()).st_size < 1: 

2754 return summary 

2755 data = json.load(fd) 

2756 

2757 for result in data: 

2758 if result["suite"] in source: 

2759 summary["source"].setdefault(result["architecture"], {})[ 

2760 result["package"] 

2761 ] = result 

2762 if result["suite"] in target: 

2763 summary["target"].setdefault(result["architecture"], {})[ 

2764 result["package"] 

2765 ] = result 

2766 

2767 return summary