Coverage for britney2/policies/policy.py: 85%

1255 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-08 19:15 +0000

1import json 

2import logging 

3import optparse 

4import os 

5import re 

6import sys 

7import time 

8from abc import ABC, abstractmethod 

9from collections import defaultdict 

10from collections.abc import Callable, Container 

11from enum import IntEnum, unique 

12from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar, cast 

13from urllib.parse import quote 

14 

15import apt_pkg 

16import yaml 

17 

18from britney2 import ( 

19 BinaryPackage, 

20 BinaryPackageId, 

21 DependencyType, 

22 PackageId, 

23 SourcePackage, 

24 Suite, 

25 SuiteClass, 

26 Suites, 

27 TargetSuite, 

28) 

29from britney2.excusedeps import DependencySpec 

30from britney2.hints import ( 

31 Hint, 

32 HintAnnotate, 

33 HintCollection, 

34 HintParser, 

35 HintType, 

36 PolicyHintParserProto, 

37) 

38from britney2.inputs.suiteloader import SuiteContentLoader 

39from britney2.migrationitem import MigrationItem, MigrationItemFactory 

40from britney2.policies import ApplySrcPolicy, PolicyVerdict 

41from britney2.utils import ( 

42 GetDependencySolversProto, 

43 compute_reverse_tree, 

44 filter_out_faux, 

45 find_newer_binaries, 

46 get_dependency_solvers, 

47 is_smooth_update_allowed, 

48 parse_option, 

49) 

50 

51if TYPE_CHECKING: 51 ↛ 52line 51 didn't jump to line 52 because the condition on line 51 was never true

52 from ..britney import Britney 

53 from ..excuse import Excuse 

54 from ..installability.universe import BinaryPackageUniverse 

55 

56 

57class PolicyLoadRequest: 

58 __slots__ = ("_options_name", "_default_value", "_policy_constructor") 

59 

60 def __init__( 

61 self, 

62 policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"], 

63 options_name: str | None, 

64 default_value: bool, 

65 ) -> None: 

66 self._policy_constructor = policy_constructor 

67 self._options_name = options_name 

68 self._default_value = default_value 

69 

70 def is_enabled(self, options: optparse.Values) -> bool: 

71 if self._options_name is None: 

72 assert self._default_value 

73 return True 

74 actual_value = getattr(options, self._options_name, None) 

75 if actual_value is None: 

76 return self._default_value 

77 return actual_value.lower() in ("yes", "y", "true", "t") 

78 

79 def load(self, options: optparse.Values, suite_info: Suites) -> "BasePolicy": 

80 return self._policy_constructor(options, suite_info) 

81 

82 @classmethod 

83 def always_load( 

84 cls, policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"] 

85 ) -> "PolicyLoadRequest": 

86 return cls(policy_constructor, None, True) 

87 

88 @classmethod 

89 def conditionally_load( 

90 cls, 

91 policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"], 

92 option_name: str, 

93 default_value: bool, 

94 ) -> "PolicyLoadRequest": 

95 return cls(policy_constructor, option_name, default_value) 

96 

97 

98class PolicyEngine: 

99 def __init__(self) -> None: 

100 self._policies: list["BasePolicy"] = [] 

101 

102 def add_policy(self, policy: "BasePolicy") -> None: 

103 self._policies.append(policy) 

104 

105 def load_policies( 

106 self, 

107 options: optparse.Values, 

108 suite_info: Suites, 

109 policy_load_requests: list[PolicyLoadRequest], 

110 ) -> None: 

111 for policy_load_request in policy_load_requests: 

112 if policy_load_request.is_enabled(options): 

113 self.add_policy(policy_load_request.load(options, suite_info)) 

114 

115 def register_policy_hints(self, hint_parser: HintParser) -> None: 

116 for policy in self._policies: 

117 policy.register_hints(hint_parser) 

118 

119 def initialise(self, britney: "Britney", hints: HintCollection) -> None: 

120 for policy in self._policies: 

121 policy.hints = hints 

122 policy.initialise(britney) 

123 

124 def save_state(self, britney: "Britney") -> None: 

125 for policy in self._policies: 

126 policy.save_state(britney) 

127 

128 def apply_src_policies( 

129 self, 

130 item: MigrationItem, 

131 source_t: SourcePackage | None, 

132 source_u: SourcePackage, 

133 excuse: "Excuse", 

134 ) -> None: 

135 excuse_verdict = excuse.policy_verdict 

136 source_suite = item.suite 

137 suite_class = source_suite.suite_class 

138 for policy in self._policies: 

139 pinfo: dict[str, Any] = {} 

140 policy_verdict = PolicyVerdict.NOT_APPLICABLE 

141 if suite_class in policy.applicable_suites: 

142 if policy.src_policy.run_arch: 

143 for arch in policy.options.architectures: 

144 v = policy.apply_srcarch_policy_impl( 

145 pinfo, item, arch, source_t, source_u, excuse 

146 ) 

147 policy_verdict = PolicyVerdict.worst_of(policy_verdict, v) 

148 if policy.src_policy.run_src: 

149 v = policy.apply_src_policy_impl( 

150 pinfo, item, source_t, source_u, excuse 

151 ) 

152 policy_verdict = PolicyVerdict.worst_of(policy_verdict, v) 

153 # The base policy provides this field, so the subclass should leave it blank 

154 assert "verdict" not in pinfo 

155 if policy_verdict != PolicyVerdict.NOT_APPLICABLE: 

156 excuse.policy_info[policy.policy_id] = pinfo 

157 pinfo["verdict"] = policy_verdict.name 

158 excuse_verdict = PolicyVerdict.worst_of(policy_verdict, excuse_verdict) 

159 excuse.policy_verdict = excuse_verdict 

160 

161 def apply_srcarch_policies( 

162 self, 

163 item: MigrationItem, 

164 arch: str, 

165 source_t: SourcePackage | None, 

166 source_u: SourcePackage, 

167 excuse: "Excuse", 

168 ) -> None: 

169 excuse_verdict = excuse.policy_verdict 

170 source_suite = item.suite 

171 suite_class = source_suite.suite_class 

172 for policy in self._policies: 

173 pinfo: dict[str, Any] = {} 

174 if suite_class in policy.applicable_suites: 

175 policy_verdict = policy.apply_srcarch_policy_impl( 

176 pinfo, item, arch, source_t, source_u, excuse 

177 ) 

178 excuse_verdict = PolicyVerdict.worst_of(policy_verdict, excuse_verdict) 

179 # The base policy provides this field, so the subclass should leave it blank 

180 assert "verdict" not in pinfo 

181 if policy_verdict != PolicyVerdict.NOT_APPLICABLE: 

182 excuse.policy_info[policy.policy_id] = pinfo 

183 pinfo["verdict"] = policy_verdict.name 

184 excuse.policy_verdict = excuse_verdict 

185 

186 

187class BasePolicy(ABC): 

188 britney: "Britney" 

189 policy_id: str 

190 hints: HintCollection | None 

191 applicable_suites: set[SuiteClass] 

192 src_policy: ApplySrcPolicy 

193 options: optparse.Values 

194 suite_info: Suites 

195 

196 def __init__( 

197 self, 

198 options: optparse.Values, 

199 suite_info: Suites, 

200 ) -> None: 

201 """The BasePolicy constructor 

202 

203 :param options: The options member of Britney with all the 

204 config values. 

205 """ 

206 

207 @property 

208 @abstractmethod 

209 def state_dir(self) -> str: ... 209 ↛ exitline 209 didn't return from function 'state_dir' because

210 

211 def register_hints(self, hint_parser: HintParser) -> None: # pragma: no cover 

212 """Register new hints that this policy accepts 

213 

214 :param hint_parser: (see HintParser.register_hint_type) 

215 """ 

216 

217 def initialise(self, britney: "Britney") -> None: # pragma: no cover 

218 """Called once to make the policy initialise any data structures 

219 

220 This is useful for e.g. parsing files or other "heavy do-once" work. 

221 

222 :param britney: This is the instance of the "Britney" class. 

223 """ 

224 self.britney = britney 

225 

226 def save_state(self, britney: "Britney") -> None: # pragma: no cover 

227 """Called once at the end of the run to make the policy save any persistent data 

228 

229 Note this will *not* be called for "dry-runs" as such runs should not change 

230 the state. 

231 

232 :param britney: This is the instance of the "Britney" class. 

233 """ 

234 

235 def apply_src_policy_impl( 

236 self, 

237 policy_info: dict[str, Any], 

238 item: MigrationItem, 

239 source_data_tdist: SourcePackage | None, 

240 source_data_srcdist: SourcePackage, 

241 excuse: "Excuse", 

242 ) -> PolicyVerdict: # pragma: no cover 

243 """Apply a policy on a given source migration 

244 

245 Britney will call this method on a given source package, when 

246 Britney is considering to migrate it from the given source 

247 suite to the target suite. The policy will then evaluate the 

248 the migration and then return a verdict. 

249 

250 :param policy_info: A dictionary of all policy results. The 

251 policy can add a value stored in a key related to its name. 

252 (e.g. policy_info['age'] = {...}). This will go directly into 

253 the "excuses.yaml" output. 

254 

255 :param item: The migration item the policy is applied to. 

256 

257 :param source_data_tdist: Information about the source package 

258 in the target distribution (e.g. "testing"). This is the 

259 data structure in source_suite.sources[source_name] 

260 

261 :param source_data_srcdist: Information about the source 

262 package in the source distribution (e.g. "unstable" or "tpu"). 

263 This is the data structure in target_suite.sources[source_name] 

264 

265 :return: A Policy Verdict (e.g. PolicyVerdict.PASS) 

266 """ 

267 return PolicyVerdict.NOT_APPLICABLE 

268 

269 def apply_srcarch_policy_impl( 

270 self, 

271 policy_info: dict[str, Any], 

272 item: MigrationItem, 

273 arch: str, 

274 source_data_tdist: SourcePackage | None, 

275 source_data_srcdist: SourcePackage, 

276 excuse: "Excuse", 

277 ) -> PolicyVerdict: 

278 """Apply a policy on a given binary migration 

279 

280 Britney will call this method on binaries from a given source package 

281 on a given architecture, when Britney is considering to migrate them 

282 from the given source suite to the target suite. The policy will then 

283 evaluate the migration and then return a verdict. 

284 

285 :param policy_info: A dictionary of all policy results. The 

286 policy can add a value stored in a key related to its name. 

287 (e.g. policy_info['age'] = {...}). This will go directly into 

288 the "excuses.yaml" output. 

289 

290 :param item: The migration item the policy is applied to. 

291 

292 :param arch: The architecture the item is applied to. This is mostly 

293 relevant for policies where src_policy is not ApplySrcPolicy.RUN_SRC 

294 (as that is the only case where arch can differ from item.architecture) 

295 

296 :param source_data_tdist: Information about the source package 

297 in the target distribution (e.g. "testing"). This is the 

298 data structure in source_suite.sources[source_name] 

299 

300 :param source_data_srcdist: Information about the source 

301 package in the source distribution (e.g. "unstable" or "tpu"). 

302 This is the data structure in target_suite.sources[source_name] 

303 

304 :return: A Policy Verdict (e.g. PolicyVerdict.PASS) 

305 """ 

306 # if the policy doesn't implement this function, assume it's OK 

307 return PolicyVerdict.NOT_APPLICABLE 

308 

309 

310class AbstractBasePolicy(BasePolicy): 

311 """ 

312 A shared abstract class for building BasePolicy objects. 

313 

314 tests/test_policy.py:initialize_policy() needs to be able to build BasePolicy 

315 objects with just a two-item constructor, while all other uses of BasePolicy- 

316 derived objects need the 5-item constructor. So AbstractBasePolicy was split 

317 out to document this. 

318 """ 

319 

320 def __init__( 

321 self, 

322 policy_id: str, 

323 options: optparse.Values, 

324 suite_info: Suites, 

325 applicable_suites: set[SuiteClass], 

326 src_policy: ApplySrcPolicy = ApplySrcPolicy.RUN_SRC, 

327 ) -> None: 

328 """Concrete initializer. 

329 

330 :param policy_id: Identifies the policy. It will 

331 determine the key used for the excuses.yaml etc. 

332 

333 :param options: The options member of Britney with all the 

334 config values. 

335 

336 :param applicable_suites: Where this policy applies. 

337 """ 

338 self.policy_id = policy_id 

339 self.options = options 

340 self.suite_info = suite_info 

341 self.applicable_suites = applicable_suites 

342 self.src_policy = src_policy 

343 self.hints: HintCollection | None = None 

344 logger_name = ".".join((self.__class__.__module__, self.__class__.__name__)) 

345 self.logger = logging.getLogger(logger_name) 

346 

347 @property 

348 def state_dir(self) -> str: 

349 return cast(str, self.options.state_dir) 

350 

351 

352_T = TypeVar("_T") 

353 

354 

355class SimplePolicyHint(Hint, Generic[_T]): 

356 def __init__( 

357 self, 

358 user: str, 

359 hint_type: HintType, 

360 policy_parameter: _T, 

361 packages: list[MigrationItem], 

362 ) -> None: 

363 super().__init__(user, hint_type, packages) 

364 self._policy_parameter = policy_parameter 

365 

366 def __eq__(self, other: Any) -> bool: 

367 if self.type != other.type or self._policy_parameter != other._policy_parameter: 

368 return False 

369 return super().__eq__(other) 

370 

371 def str(self) -> str: 

372 return "{} {} {}".format( 

373 self._type, 

374 str(self._policy_parameter), 

375 " ".join(x.name for x in self._packages), 

376 ) 

377 

378 

379class AgeDayHint(SimplePolicyHint[int]): 

380 @property 

381 def days(self) -> int: 

382 return self._policy_parameter 

383 

384 

385class IgnoreRCBugHint(SimplePolicyHint[frozenset[str]]): 

386 @property 

387 def ignored_rcbugs(self) -> frozenset[str]: 

388 return self._policy_parameter 

389 

390 

391def simple_policy_hint_parser_function( 

392 class_name: Callable[[str, HintType, _T, list[MigrationItem]], Hint], 

393 converter: Callable[[str], _T], 

394) -> PolicyHintParserProto: 

395 def f( 

396 mi_factory: MigrationItemFactory, 

397 hints: HintCollection, 

398 who: str, 

399 hint_type: HintType, 

400 *args: str, 

401 ) -> None: 

402 policy_parameter = args[0] 

403 args = args[1:] 

404 for item in mi_factory.parse_items(*args): 

405 hints.add_hint( 

406 class_name(who, hint_type, converter(policy_parameter), [item]) 

407 ) 

408 

409 return f 

410 

411 

412class AgePolicy(AbstractBasePolicy): 

413 """Configurable Aging policy for source migrations 

414 

415 The AgePolicy will let packages stay in the source suite for a pre-defined 

416 amount of days before letting migrate (based on their urgency, if any). 

417 

418 The AgePolicy's decision is influenced by the following: 

419 

420 State files: 

421 * ${STATE_DIR}/age-policy-urgencies: File containing urgencies for source 

422 packages. Note that urgencies are "sticky" and the most "urgent" urgency 

423 will be used (i.e. the one with lowest age-requirements). 

424 - This file needs to be updated externally, if the policy should take 

425 urgencies into consideration. If empty (or not updated), the policy 

426 will simply use the default urgency (see the "Config" section below) 

427 - In Debian, these values are taken from the .changes file, but that is 

428 not a requirement for Britney. 

429 * ${STATE_DIR}/age-policy-dates: File containing the age of all source 

430 packages. 

431 - The policy will automatically update this file. 

432 Config: 

433 * DEFAULT_URGENCY: Name of the urgency used for packages without an urgency 

434 (or for unknown urgencies). Will also be used to set the "minimum" 

435 aging requirements for packages not in the target suite. 

436 * MINDAYS_<URGENCY>: The age-requirements in days for packages with the 

437 given urgency. 

438 - Commonly used urgencies are: low, medium, high, emergency, critical 

439 Hints: 

440 * urgent <source>/<version>: Disregard the age requirements for a given 

441 source/version. 

442 * age-days X <source>/<version>: Set the age requirements for a given 

443 source/version to X days. Note that X can exceed the highest 

444 age-requirement normally given. 

445 

446 """ 

447 

448 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

449 super().__init__("age", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}) 

450 self._min_days = self._generate_mindays_table() 

451 self._min_days_default = 0 

452 # britney's "day" begins at 7pm (we want aging to occur in the 22:00Z run and we run Britney 2-4 times a day) 

453 # NB: _date_now is used in tests 

454 time_now = time.time() 

455 if hasattr(self.options, "fake_runtime"): 

456 time_now = int(self.options.fake_runtime) 

457 self.logger.info("overriding runtime with fake_runtime %d" % time_now) 

458 

459 self._date_now = int(((time_now / (60 * 60)) - 19) / 24) 

460 self._dates: dict[str, tuple[str, int]] = {} 

461 self._urgencies: dict[str, str] = {} 

462 self._default_urgency: str = self.options.default_urgency 

463 self._penalty_immune_urgencies: frozenset[str] = frozenset() 

464 if hasattr(self.options, "no_penalties"): 

465 self._penalty_immune_urgencies = frozenset( 

466 x.strip() for x in self.options.no_penalties.split() 

467 ) 

468 self._bounty_min_age: int | None = None # initialised later 

469 

470 def _generate_mindays_table(self) -> dict[str, int]: 

471 mindays: dict[str, int] = {} 

472 for k in dir(self.options): 

473 if not k.startswith("mindays_"): 

474 continue 

475 v = getattr(self.options, k) 

476 try: 

477 as_days = int(v) 

478 except ValueError: 

479 raise ValueError( 

480 "Unable to parse " 

481 + k 

482 + " as a number of days. Must be 0 or a positive integer" 

483 ) 

484 if as_days < 0: 484 ↛ 485line 484 didn't jump to line 485 because the condition on line 484 was never true

485 raise ValueError( 

486 "The value of " + k + " must be zero or a positive integer" 

487 ) 

488 mindays[k.split("_")[1]] = as_days 

489 return mindays 

490 

491 def register_hints(self, hint_parser: HintParser) -> None: 

492 hint_parser.register_hint_type( 

493 HintType( 

494 "age-days", 

495 simple_policy_hint_parser_function(AgeDayHint, int), 

496 min_args=2, 

497 ) 

498 ) 

499 hint_parser.register_hint_type(HintType("urgent")) 

500 

501 def initialise(self, britney: "Britney") -> None: 

502 super().initialise(britney) 

503 self._read_dates_file() 

504 self._read_urgencies_file() 

505 if self._default_urgency not in self._min_days: # pragma: no cover 

506 raise ValueError( 

507 "Missing age-requirement for default urgency (MINDAYS_%s)" 

508 % self._default_urgency 

509 ) 

510 self._min_days_default = self._min_days[self._default_urgency] 

511 try: 

512 self._bounty_min_age = int(self.options.bounty_min_age) 

513 except ValueError: 513 ↛ 514line 513 didn't jump to line 514 because the exception caught by line 513 didn't happen

514 if self.options.bounty_min_age in self._min_days: 

515 self._bounty_min_age = self._min_days[self.options.bounty_min_age] 

516 else: # pragma: no cover 

517 raise ValueError( 

518 "Please fix BOUNTY_MIN_AGE in the britney configuration" 

519 ) 

520 except AttributeError: 

521 # The option wasn't defined in the configuration 

522 self._bounty_min_age = 0 

523 

524 def save_state(self, britney: "Britney") -> None: 

525 super().save_state(britney) 

526 self._write_dates_file() 

527 

528 def apply_src_policy_impl( 

529 self, 

530 age_info: dict[str, Any], 

531 item: MigrationItem, 

532 source_data_tdist: SourcePackage | None, 

533 source_data_srcdist: SourcePackage, 

534 excuse: "Excuse", 

535 ) -> PolicyVerdict: 

536 # retrieve the urgency for the upload, ignoring it if this is a NEW package 

537 # (not present in the target suite) 

538 source_name = item.package 

539 urgency = self._urgencies.get(source_name, self._default_urgency) 

540 

541 if urgency not in self._min_days: 541 ↛ 542line 541 didn't jump to line 542 because the condition on line 541 was never true

542 age_info["unknown-urgency"] = urgency 

543 urgency = self._default_urgency 

544 

545 if not source_data_tdist: 

546 if self._min_days[urgency] < self._min_days_default: 

547 age_info["urgency-reduced"] = { 

548 "from": urgency, 

549 "to": self._default_urgency, 

550 } 

551 urgency = self._default_urgency 

552 

553 if source_name not in self._dates: 

554 self._dates[source_name] = (source_data_srcdist.version, self._date_now) 

555 elif self._dates[source_name][0] != source_data_srcdist.version: 

556 self._dates[source_name] = (source_data_srcdist.version, self._date_now) 

557 

558 days_old = self._date_now - self._dates[source_name][1] 

559 min_days = self._min_days[urgency] 

560 for bounty in excuse.bounty: 

561 if excuse.bounty[bounty]: 561 ↛ 560line 561 didn't jump to line 560 because the condition on line 561 was always true

562 self.logger.info( 

563 "Applying bounty for %s granted by %s: %d days", 

564 source_name, 

565 bounty, 

566 excuse.bounty[bounty], 

567 ) 

568 excuse.addinfo( 

569 "Required age reduced by %d days because of %s" 

570 % (excuse.bounty[bounty], bounty) 

571 ) 

572 assert excuse.bounty[bounty] > 0, "negative bounties shouldn't happen" 

573 min_days -= excuse.bounty[bounty] 

574 if urgency not in self._penalty_immune_urgencies: 

575 for penalty in excuse.penalty: 

576 if excuse.penalty[penalty]: 576 ↛ 575line 576 didn't jump to line 575 because the condition on line 576 was always true

577 self.logger.info( 

578 "Applying penalty for %s given by %s: %d days", 

579 source_name, 

580 penalty, 

581 excuse.penalty[penalty], 

582 ) 

583 excuse.addinfo( 

584 "Required age increased by %d days because of %s" 

585 % (excuse.penalty[penalty], penalty) 

586 ) 

587 assert ( 

588 excuse.penalty[penalty] > 0 

589 ), "negative penalties should be handled earlier" 

590 min_days += excuse.penalty[penalty] 

591 

592 assert self._bounty_min_age is not None 

593 # the age in BOUNTY_MIN_AGE can be higher than the one associated with 

594 # the real urgency, so don't forget to take it into account 

595 bounty_min_age = min(self._bounty_min_age, self._min_days[urgency]) 

596 if min_days < bounty_min_age: 

597 min_days = bounty_min_age 

598 excuse.addinfo( 

599 "Required age is not allowed to drop below %d days" % min_days 

600 ) 

601 

602 age_info["current-age"] = days_old 

603 

604 assert self.hints is not None 

605 for age_days_hint in cast( 

606 "list[AgeDayHint]", 

607 self.hints.search( 

608 "age-days", package=source_name, version=source_data_srcdist.version 

609 ), 

610 ): 

611 new_req = age_days_hint.days 

612 age_info["age-requirement-reduced"] = { 

613 "new-requirement": new_req, 

614 "changed-by": age_days_hint.user, 

615 } 

616 if "original-age-requirement" not in age_info: 616 ↛ 618line 616 didn't jump to line 618 because the condition on line 616 was always true

617 age_info["original-age-requirement"] = min_days 

618 min_days = new_req 

619 

620 age_info["age-requirement"] = min_days 

621 res = PolicyVerdict.PASS 

622 

623 if days_old < min_days: 

624 urgent_hints = self.hints.search( 

625 "urgent", package=source_name, version=source_data_srcdist.version 

626 ) 

627 if urgent_hints: 

628 age_info["age-requirement-reduced"] = { 

629 "new-requirement": 0, 

630 "changed-by": urgent_hints[0].user, 

631 } 

632 res = PolicyVerdict.PASS_HINTED 

633 else: 

634 res = PolicyVerdict.REJECTED_TEMPORARILY 

635 

636 # update excuse 

637 age_hint = age_info.get("age-requirement-reduced", None) 

638 age_min_req = age_info["age-requirement"] 

639 if age_hint: 

640 new_req = age_hint["new-requirement"] 

641 who = age_hint["changed-by"] 

642 if new_req: 

643 excuse.addinfo( 

644 "Overriding age needed from %d days to %d by %s" 

645 % (age_min_req, new_req, who) 

646 ) 

647 age_min_req = new_req 

648 else: 

649 excuse.addinfo("Too young, but urgency pushed by %s" % who) 

650 age_min_req = 0 

651 excuse.setdaysold(age_info["current-age"], age_min_req) 

652 

653 if age_min_req == 0: 

654 excuse.addinfo("%d days old" % days_old) 

655 elif days_old < age_min_req: 

656 excuse.add_verdict_info( 

657 res, "Too young, only %d of %d days old" % (days_old, age_min_req) 

658 ) 

659 else: 

660 excuse.addinfo("%d days old (needed %d days)" % (days_old, age_min_req)) 

661 

662 return res 

663 

664 def _read_dates_file(self) -> None: 

665 """Parse the dates file""" 

666 dates = self._dates 

667 fallback_filename = os.path.join(self.suite_info.target_suite.path, "Dates") 

668 using_new_name = False 

669 try: 

670 filename = os.path.join(self.state_dir, "age-policy-dates") 

671 if not os.path.exists(filename) and os.path.exists(fallback_filename): 671 ↛ 672line 671 didn't jump to line 672 because the condition on line 671 was never true

672 filename = fallback_filename 

673 else: 

674 using_new_name = True 

675 except AttributeError: 

676 if os.path.exists(fallback_filename): 

677 filename = fallback_filename 

678 else: 

679 raise RuntimeError("Please set STATE_DIR in the britney configuration") 

680 

681 try: 

682 with open(filename, encoding="utf-8") as fd: 

683 for line in fd: 

684 if line.startswith("#"): 

685 # Ignore comment lines (mostly used for tests) 

686 continue 

687 # <source> <version> <date>) 

688 ln = line.split() 

689 if len(ln) != 3: # pragma: no cover 

690 continue 

691 try: 

692 dates[ln[0]] = (ln[1], int(ln[2])) 

693 except ValueError: # pragma: no cover 

694 pass 

695 except FileNotFoundError: 

696 if not using_new_name: 696 ↛ 698line 696 didn't jump to line 698 because the condition on line 696 was never true

697 # If we using the legacy name, then just give up 

698 raise 

699 self.logger.info("%s does not appear to exist. Creating it", filename) 

700 with open(filename, mode="x", encoding="utf-8"): 

701 pass 

702 

703 def _read_urgencies_file(self) -> None: 

704 urgencies = self._urgencies 

705 min_days_default = self._min_days_default 

706 fallback_filename = os.path.join(self.suite_info.target_suite.path, "Urgency") 

707 try: 

708 filename = os.path.join(self.state_dir, "age-policy-urgencies") 

709 if not os.path.exists(filename) and os.path.exists(fallback_filename): 709 ↛ 710line 709 didn't jump to line 710 because the condition on line 709 was never true

710 filename = fallback_filename 

711 except AttributeError: 

712 filename = fallback_filename 

713 

714 sources_s = self.suite_info.primary_source_suite.sources 

715 sources_t = self.suite_info.target_suite.sources 

716 

717 with open(filename, errors="surrogateescape", encoding="ascii") as fd: 

718 for line in fd: 

719 if line.startswith("#"): 

720 # Ignore comment lines (mostly used for tests) 

721 continue 

722 # <source> <version> <urgency> 

723 ln = line.split() 

724 if len(ln) != 3: 724 ↛ 725line 724 didn't jump to line 725 because the condition on line 724 was never true

725 continue 

726 

727 # read the minimum days associated with the urgencies 

728 urgency_old = urgencies.get(ln[0], None) 

729 mindays_old = self._min_days.get(urgency_old, 1000) # type: ignore[arg-type] 

730 mindays_new = self._min_days.get(ln[2], min_days_default) 

731 

732 # if the new urgency is lower (so the min days are higher), do nothing 

733 if mindays_old <= mindays_new: 

734 continue 

735 

736 # if the package exists in the target suite and it is more recent, do nothing 

737 tsrcv = sources_t.get(ln[0], None) 

738 if tsrcv and apt_pkg.version_compare(tsrcv.version, ln[1]) >= 0: 

739 continue 

740 

741 # if the package doesn't exist in the primary source suite or it is older, do nothing 

742 usrcv = sources_s.get(ln[0], None) 

743 if not usrcv or apt_pkg.version_compare(usrcv.version, ln[1]) < 0: 743 ↛ 744line 743 didn't jump to line 744 because the condition on line 743 was never true

744 continue 

745 

746 # update the urgency for the package 

747 urgencies[ln[0]] = ln[2] 

748 

749 def _write_dates_file(self) -> None: 

750 dates = self._dates 

751 try: 

752 directory = self.state_dir 

753 basename = "age-policy-dates" 

754 old_file = os.path.join(self.suite_info.target_suite.path, "Dates") 

755 except AttributeError: 

756 directory = self.suite_info.target_suite.path 

757 basename = "Dates" 

758 old_file = None 

759 filename = os.path.join(directory, basename) 

760 filename_tmp = os.path.join(directory, "%s_new" % basename) 

761 with open(filename_tmp, "w", encoding="utf-8") as fd: 

762 for pkg in sorted(dates): 

763 version, date = dates[pkg] 

764 fd.write("%s %s %d\n" % (pkg, version, date)) 

765 os.rename(filename_tmp, filename) 

766 if old_file is not None and os.path.exists(old_file): 766 ↛ 767line 766 didn't jump to line 767 because the condition on line 766 was never true

767 self.logger.info("Removing old age-policy-dates file %s", old_file) 

768 os.unlink(old_file) 

769 

770 

771class RCBugPolicy(AbstractBasePolicy): 

772 """RC bug regression policy for source migrations 

773 

774 The RCBugPolicy will read provided list of RC bugs and block any 

775 source upload that would introduce a *new* RC bug in the target 

776 suite. 

777 

778 The RCBugPolicy's decision is influenced by the following: 

779 

780 State files: 

781 * ${STATE_DIR}/rc-bugs-${SUITE_NAME}: File containing RC bugs for packages in 

782 the given suite (one for both primary source suite and the target sutie is 

783 needed). 

784 - These files need to be updated externally. 

785 """ 

786 

787 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

788 super().__init__( 

789 "rc-bugs", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE} 

790 ) 

791 self._bugs_source: dict[str, set[str]] | None = None 

792 self._bugs_target: dict[str, set[str]] | None = None 

793 

794 def register_hints(self, hint_parser: HintParser) -> None: 

795 f = simple_policy_hint_parser_function( 

796 IgnoreRCBugHint, lambda x: frozenset(x.split(",")) 

797 ) 

798 hint_parser.register_hint_type(HintType("ignore-rc-bugs", f, min_args=2)) 

799 

800 def initialise(self, britney: "Britney") -> None: 

801 super().initialise(britney) 

802 source_suite = self.suite_info.primary_source_suite 

803 target_suite = self.suite_info.target_suite 

804 fallback_unstable = os.path.join(source_suite.path, "BugsV") 

805 fallback_testing = os.path.join(target_suite.path, "BugsV") 

806 try: 

807 filename_unstable = os.path.join( 

808 self.state_dir, "rc-bugs-%s" % source_suite.name 

809 ) 

810 filename_testing = os.path.join( 

811 self.state_dir, "rc-bugs-%s" % target_suite.name 

812 ) 

813 if ( 813 ↛ 819line 813 didn't jump to line 819

814 not os.path.exists(filename_unstable) 

815 and not os.path.exists(filename_testing) 

816 and os.path.exists(fallback_unstable) 

817 and os.path.exists(fallback_testing) 

818 ): 

819 filename_unstable = fallback_unstable 

820 filename_testing = fallback_testing 

821 except AttributeError: 

822 filename_unstable = fallback_unstable 

823 filename_testing = fallback_testing 

824 self._bugs_source = self._read_bugs(filename_unstable) 

825 self._bugs_target = self._read_bugs(filename_testing) 

826 

827 def apply_src_policy_impl( 

828 self, 

829 rcbugs_info: dict[str, Any], 

830 item: MigrationItem, 

831 source_data_tdist: SourcePackage | None, 

832 source_data_srcdist: SourcePackage, 

833 excuse: "Excuse", 

834 ) -> PolicyVerdict: 

835 assert self._bugs_source is not None # for type checking 

836 assert self._bugs_target is not None # for type checking 

837 bugs_t = set() 

838 bugs_s = set() 

839 source_name = item.package 

840 binaries_s = {x[0] for x in source_data_srcdist.binaries} 

841 try: 

842 binaries_t = {x[0] for x in source_data_tdist.binaries} # type: ignore[union-attr] 

843 except AttributeError: 

844 binaries_t = set() 

845 

846 src_key = f"src:{source_name}" 

847 if source_data_tdist and src_key in self._bugs_target: 

848 bugs_t.update(self._bugs_target[src_key]) 

849 if src_key in self._bugs_source: 

850 bugs_s.update(self._bugs_source[src_key]) 

851 

852 for pkg in binaries_s: 

853 if pkg in self._bugs_source: 

854 bugs_s |= self._bugs_source[pkg] 

855 for pkg in binaries_t: 

856 if pkg in self._bugs_target: 

857 bugs_t |= self._bugs_target[pkg] 

858 

859 # The bts seems to support filing source bugs against a binary of the 

860 # same name if that binary isn't built by any source. An example is bug 

861 # 820347 against Package: juce (in the live-2016-04-11 test). Add those 

862 # bugs too. 

863 if ( 

864 source_name not in (binaries_s | binaries_t) 

865 and source_name 

866 not in { 

867 x.package_name 

868 for x in self.suite_info.primary_source_suite.all_binaries_in_suite.keys() 

869 } 

870 and source_name 

871 not in { 

872 x.package_name 

873 for x in self.suite_info.target_suite.all_binaries_in_suite.keys() 

874 } 

875 ): 

876 if source_name in self._bugs_source: 

877 bugs_s |= self._bugs_source[source_name] 

878 if source_name in self._bugs_target: 878 ↛ 879line 878 didn't jump to line 879 because the condition on line 878 was never true

879 bugs_t |= self._bugs_target[source_name] 

880 

881 # If a package is not in the target suite, it has no RC bugs per 

882 # definition. Unfortunately, it seems that the live-data is 

883 # not always accurate (e.g. live-2011-12-13 suggests that 

884 # obdgpslogger had the same bug in testing and unstable, 

885 # but obdgpslogger was not in testing at that time). 

886 # - For the curious, obdgpslogger was removed on that day 

887 # and the BTS probably had not caught up with that fact. 

888 # (https://tracker.debian.org/news/415935) 

889 assert not bugs_t or source_data_tdist, ( 

890 "%s had bugs in the target suite but is not present" % source_name 

891 ) 

892 

893 verdict = PolicyVerdict.PASS 

894 

895 assert self.hints is not None 

896 for ignore_hint in cast( 

897 list[IgnoreRCBugHint], 

898 self.hints.search( 

899 "ignore-rc-bugs", 

900 package=source_name, 

901 version=source_data_srcdist.version, 

902 ), 

903 ): 

904 ignored_bugs = ignore_hint.ignored_rcbugs 

905 

906 # Only handle one hint for now 

907 if "ignored-bugs" in rcbugs_info: 

908 self.logger.info( 

909 "Ignoring ignore-rc-bugs hint from %s on %s due to another hint from %s", 

910 ignore_hint.user, 

911 source_name, 

912 rcbugs_info["ignored-bugs"]["issued-by"], 

913 ) 

914 continue 

915 if not ignored_bugs.isdisjoint(bugs_s): 915 ↛ 924line 915 didn't jump to line 924 because the condition on line 915 was always true

916 bugs_s -= ignored_bugs 

917 bugs_t -= ignored_bugs 

918 rcbugs_info["ignored-bugs"] = { 

919 "bugs": sorted(ignored_bugs), 

920 "issued-by": ignore_hint.user, 

921 } 

922 verdict = PolicyVerdict.PASS_HINTED 

923 else: 

924 self.logger.info( 

925 "Ignoring ignore-rc-bugs hint from %s on %s as none of %s affect the package", 

926 ignore_hint.user, 

927 source_name, 

928 str(ignored_bugs), 

929 ) 

930 

931 rcbugs_info["shared-bugs"] = sorted(bugs_s & bugs_t) 

932 rcbugs_info["unique-source-bugs"] = sorted(bugs_s - bugs_t) 

933 rcbugs_info["unique-target-bugs"] = sorted(bugs_t - bugs_s) 

934 

935 # update excuse 

936 new_bugs = rcbugs_info["unique-source-bugs"] 

937 old_bugs = rcbugs_info["unique-target-bugs"] 

938 excuse.setbugs(old_bugs, new_bugs) 

939 

940 if new_bugs: 

941 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

942 excuse.add_verdict_info( 

943 verdict, 

944 "Updating %s would introduce bugs in %s: %s" 

945 % ( 

946 source_name, 

947 self.suite_info.target_suite.name, 

948 ", ".join( 

949 [ 

950 '<a href="https://bugs.debian.org/%s">#%s</a>' 

951 % (quote(a), a) 

952 for a in new_bugs 

953 ] 

954 ), 

955 ), 

956 ) 

957 

958 if old_bugs: 

959 excuse.addinfo( 

960 "Updating %s will fix bugs in %s: %s" 

961 % ( 

962 source_name, 

963 self.suite_info.target_suite.name, 

964 ", ".join( 

965 [ 

966 '<a href="https://bugs.debian.org/%s">#%s</a>' 

967 % (quote(a), a) 

968 for a in old_bugs 

969 ] 

970 ), 

971 ) 

972 ) 

973 

974 return verdict 

975 

976 def _read_bugs(self, filename: str) -> dict[str, set[str]]: 

977 """Read the release critical bug summary from the specified file 

978 

979 The file contains rows with the format: 

980 

981 <package-name> <bug number>[,<bug number>...] 

982 

983 The method returns a dictionary where the key is the binary package 

984 name and the value is the list of open RC bugs for it. 

985 """ 

986 bugs: dict[str, set[str]] = {} 

987 self.logger.info("Loading RC bugs data from %s", filename) 

988 with open(filename, encoding="ascii") as f: 

989 for line in f: 

990 ln = line.split() 

991 if len(ln) != 2: # pragma: no cover 

992 self.logger.warning("Malformed line found in line %s", line) 

993 continue 

994 pkg = ln[0] 

995 if pkg not in bugs: 

996 bugs[pkg] = set() 

997 bugs[pkg].update(ln[1].split(",")) 

998 return bugs 

999 

1000 

1001class PiupartsPolicy(AbstractBasePolicy): 

1002 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

1003 super().__init__( 

1004 "piuparts", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE} 

1005 ) 

1006 self._piuparts_source: dict[str, tuple[str, str]] | None = None 

1007 self._piuparts_target: dict[str, tuple[str, str]] | None = None 

1008 

1009 def register_hints(self, hint_parser: HintParser) -> None: 

1010 hint_parser.register_hint_type(HintType("ignore-piuparts")) 

1011 

1012 def initialise(self, britney: "Britney") -> None: 

1013 super().initialise(britney) 

1014 source_suite = self.suite_info.primary_source_suite 

1015 target_suite = self.suite_info.target_suite 

1016 try: 

1017 filename_unstable = os.path.join( 

1018 self.state_dir, "piuparts-summary-%s.json" % source_suite.name 

1019 ) 

1020 filename_testing = os.path.join( 

1021 self.state_dir, "piuparts-summary-%s.json" % target_suite.name 

1022 ) 

1023 except AttributeError as e: # pragma: no cover 

1024 raise RuntimeError( 

1025 "Please set STATE_DIR in the britney configuration" 

1026 ) from e 

1027 self._piuparts_source = self._read_piuparts_summary( 

1028 filename_unstable, keep_url=True 

1029 ) 

1030 self._piuparts_target = self._read_piuparts_summary( 

1031 filename_testing, keep_url=False 

1032 ) 

1033 

1034 def apply_src_policy_impl( 

1035 self, 

1036 piuparts_info: dict[str, Any], 

1037 item: MigrationItem, 

1038 source_data_tdist: SourcePackage | None, 

1039 source_data_srcdist: SourcePackage, 

1040 excuse: "Excuse", 

1041 ) -> PolicyVerdict: 

1042 assert self._piuparts_source is not None # for type checking 

1043 assert self._piuparts_target is not None # for type checking 

1044 source_name = item.package 

1045 

1046 if source_name in self._piuparts_target: 

1047 testing_state = self._piuparts_target[source_name][0] 

1048 else: 

1049 testing_state = "X" 

1050 url: str | None 

1051 if source_name in self._piuparts_source: 

1052 unstable_state, url = self._piuparts_source[source_name] 

1053 else: 

1054 unstable_state = "X" 

1055 url = None 

1056 url_html = "(no link yet)" 

1057 if url is not None: 

1058 url_html = '<a href="{0}">{0}</a>'.format(url) 

1059 

1060 if unstable_state == "P": 

1061 # Not a regression 

1062 msg = f"Piuparts tested OK - {url_html}" 

1063 result = PolicyVerdict.PASS 

1064 piuparts_info["test-results"] = "pass" 

1065 elif unstable_state == "F": 

1066 if testing_state != unstable_state: 

1067 piuparts_info["test-results"] = "regression" 

1068 msg = f"Piuparts regression - {url_html}" 

1069 result = PolicyVerdict.REJECTED_PERMANENTLY 

1070 else: 

1071 piuparts_info["test-results"] = "failed" 

1072 msg = f"Piuparts failure (not a regression) - {url_html}" 

1073 result = PolicyVerdict.PASS 

1074 elif unstable_state == "W": 

1075 msg = f"Piuparts check waiting for test results - {url_html}" 

1076 result = PolicyVerdict.REJECTED_TEMPORARILY 

1077 piuparts_info["test-results"] = "waiting-for-test-results" 

1078 else: 

1079 msg = f"Piuparts can't test {source_name} (not a blocker) - {url_html}" 

1080 piuparts_info["test-results"] = "cannot-be-tested" 

1081 result = PolicyVerdict.PASS 

1082 

1083 if url is not None: 

1084 piuparts_info["piuparts-test-url"] = url 

1085 if result.is_rejected: 

1086 excuse.add_verdict_info(result, msg) 

1087 else: 

1088 excuse.addinfo(msg) 

1089 

1090 if result.is_rejected: 

1091 assert self.hints is not None 

1092 for ignore_hint in self.hints.search( 

1093 "ignore-piuparts", 

1094 package=source_name, 

1095 version=source_data_srcdist.version, 

1096 ): 

1097 piuparts_info["ignored-piuparts"] = {"issued-by": ignore_hint.user} 

1098 result = PolicyVerdict.PASS_HINTED 

1099 excuse.addinfo( 

1100 f"Piuparts issue ignored as requested by {ignore_hint.user}" 

1101 ) 

1102 break 

1103 

1104 return result 

1105 

1106 def _read_piuparts_summary( 

1107 self, filename: str, keep_url: bool = True 

1108 ) -> dict[str, tuple[str, str]]: 

1109 summary: dict[str, tuple[str, str]] = {} 

1110 self.logger.info("Loading piuparts report from %s", filename) 

1111 with open(filename) as fd: 1111 ↛ exitline 1111 didn't return from function '_read_piuparts_summary' because the return on line 1113 wasn't executed

1112 if os.fstat(fd.fileno()).st_size < 1: 1112 ↛ 1113line 1112 didn't jump to line 1113 because the condition on line 1112 was never true

1113 return summary 

1114 data = json.load(fd) 

1115 try: 

1116 if ( 

1117 data["_id"] != "Piuparts Package Test Results Summary" 

1118 or data["_version"] != "1.0" 

1119 ): # pragma: no cover 

1120 raise ValueError( 

1121 f"Piuparts results in {filename} does not have the correct ID or version" 

1122 ) 

1123 except KeyError as e: # pragma: no cover 

1124 raise ValueError( 

1125 f"Piuparts results in {filename} is missing id or version field" 

1126 ) from e 

1127 for source, suite_data in data["packages"].items(): 

1128 if len(suite_data) != 1: # pragma: no cover 

1129 raise ValueError( 

1130 f"Piuparts results in {filename}, the source {source} does not have " 

1131 "exactly one result set" 

1132 ) 

1133 item = next(iter(suite_data.values())) 

1134 state, _, url = item 

1135 if not keep_url: 

1136 url = None 

1137 summary[source] = (state, url) 

1138 

1139 return summary 

1140 

1141 

1142class DependsPolicy(AbstractBasePolicy): 

1143 pkg_universe: "BinaryPackageUniverse" 

1144 broken_packages: frozenset["BinaryPackageId"] 

1145 all_binaries: dict["BinaryPackageId", "BinaryPackage"] 

1146 allow_uninst: dict[str, set[str | None]] 

1147 

1148 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

1149 super().__init__( 

1150 "depends", 

1151 options, 

1152 suite_info, 

1153 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE}, 

1154 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY, 

1155 ) 

1156 self.nobreakall_arches = None 

1157 self.new_arches = None 

1158 self.break_arches = None 

1159 

1160 def initialise(self, britney: "Britney") -> None: 

1161 super().initialise(britney) 

1162 self.pkg_universe = britney.pkg_universe 

1163 self.broken_packages = self.pkg_universe.broken_packages 

1164 self.all_binaries = britney.all_binaries 

1165 self.nobreakall_arches = self.options.nobreakall_arches 

1166 self.new_arches = self.options.new_arches 

1167 self.break_arches = self.options.break_arches 

1168 self.allow_uninst = britney.allow_uninst 

1169 

1170 def apply_srcarch_policy_impl( 

1171 self, 

1172 deps_info: dict[str, Any], 

1173 item: MigrationItem, 

1174 arch: str, 

1175 source_data_tdist: SourcePackage | None, 

1176 source_data_srcdist: SourcePackage, 

1177 excuse: "Excuse", 

1178 ) -> PolicyVerdict: 

1179 verdict = PolicyVerdict.PASS 

1180 

1181 assert self.break_arches is not None 

1182 assert self.new_arches is not None 

1183 if arch in self.break_arches or arch in self.new_arches: 

1184 # we don't check these in the policy (TODO - for now?) 

1185 return verdict 

1186 

1187 source_suite = item.suite 

1188 target_suite = self.suite_info.target_suite 

1189 

1190 packages_s_a = source_suite.binaries[arch] 

1191 packages_t_a = target_suite.binaries[arch] 

1192 

1193 my_bins = sorted(filter_out_faux(excuse.packages[arch])) 

1194 

1195 arch_all_installable = set() 

1196 arch_arch_installable = set() 

1197 consider_it_regression = True 

1198 

1199 for pkg_id in my_bins: 

1200 pkg_name = pkg_id.package_name 

1201 binary_u = packages_s_a[pkg_name] 

1202 pkg_arch = binary_u.architecture 

1203 

1204 # in some cases, we want to track the uninstallability of a 

1205 # package (because the autopkgtest policy uses this), but we still 

1206 # want to allow the package to be uninstallable 

1207 skip_dep_check = False 

1208 

1209 if binary_u.source_version != source_data_srcdist.version: 

1210 # don't check cruft in unstable 

1211 continue 

1212 

1213 if item.architecture != "source" and pkg_arch == "all": 

1214 # we don't care about the existing arch: all binaries when 

1215 # checking a binNMU item, because the arch: all binaries won't 

1216 # migrate anyway 

1217 skip_dep_check = True 

1218 

1219 if pkg_arch == "all" and arch not in self.nobreakall_arches: 

1220 skip_dep_check = True 

1221 

1222 if pkg_name in self.allow_uninst[arch]: 1222 ↛ 1225line 1222 didn't jump to line 1225 because the condition on line 1222 was never true

1223 # this binary is allowed to become uninstallable, so we don't 

1224 # need to check anything 

1225 skip_dep_check = True 

1226 

1227 if pkg_name in packages_t_a: 

1228 oldbin = packages_t_a[pkg_name] 

1229 if not target_suite.is_installable(oldbin.pkg_id): 

1230 # as the current binary in testing is already 

1231 # uninstallable, the newer version is allowed to be 

1232 # uninstallable as well, so we don't need to check 

1233 # anything 

1234 skip_dep_check = True 

1235 consider_it_regression = False 

1236 

1237 if pkg_id in self.broken_packages: 

1238 if pkg_arch == "all": 

1239 arch_all_installable.add(False) 

1240 else: 

1241 arch_arch_installable.add(False) 

1242 # dependencies can't be satisfied by all the known binaries - 

1243 # this certainly won't work... 

1244 excuse.add_unsatisfiable_on_arch(arch) 

1245 if skip_dep_check: 

1246 # ...but if the binary is allowed to become uninstallable, 

1247 # we don't care 

1248 # we still want the binary to be listed as uninstallable, 

1249 continue 

1250 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

1251 if pkg_name.endswith("-faux-build-depends"): 1251 ↛ 1252line 1251 didn't jump to line 1252 because the condition on line 1251 was never true

1252 name = pkg_name.replace("-faux-build-depends", "") 

1253 excuse.add_verdict_info( 

1254 verdict, 

1255 f"src:{name} has unsatisfiable build dependency", 

1256 ) 

1257 else: 

1258 excuse.add_verdict_info( 

1259 verdict, f"{pkg_name}/{arch} has unsatisfiable dependency" 

1260 ) 

1261 excuse.addreason("depends") 

1262 else: 

1263 if pkg_arch == "all": 

1264 arch_all_installable.add(True) 

1265 else: 

1266 arch_arch_installable.add(True) 

1267 

1268 if skip_dep_check: 

1269 continue 

1270 

1271 deps = self.pkg_universe.dependencies_of(pkg_id) 

1272 

1273 for dep in deps: 

1274 # dep is a list of packages, each of which satisfy the 

1275 # dependency 

1276 

1277 if dep == frozenset(): 

1278 continue 

1279 is_ok = False 

1280 needed_for_dep = set() 

1281 

1282 for alternative in dep: 

1283 if target_suite.is_pkg_in_the_suite(alternative): 

1284 # dep can be satisfied in testing - ok 

1285 is_ok = True 

1286 elif alternative in my_bins: 

1287 # can be satisfied by binary from same item: will be 

1288 # ok if item migrates 

1289 is_ok = True 

1290 else: 

1291 needed_for_dep.add(alternative) 

1292 

1293 if not is_ok: 

1294 spec = DependencySpec(DependencyType.DEPENDS, arch) 

1295 excuse.add_package_depends(spec, needed_for_dep) 

1296 

1297 # The autopkgtest policy needs delicate trade offs for 

1298 # non-installability. The current choice (considering source 

1299 # migration and only binaries built by the version of the 

1300 # source): 

1301 # 

1302 # * Run autopkgtest if all arch:$arch binaries are installable 

1303 # (but some or all arch:all binaries are not) 

1304 # 

1305 # * Don't schedule nor wait for not installable arch:all only package 

1306 # on ! NOBREAKALL_ARCHES 

1307 # 

1308 # * Run autopkgtest if installability isn't a regression (there are (or 

1309 # rather, should) not be a lot of packages in this state, and most 

1310 # likely they'll just fail quickly) 

1311 # 

1312 # * Don't schedule, but wait otherwise 

1313 if arch_arch_installable == {True} and False in arch_all_installable: 

1314 deps_info.setdefault("autopkgtest_run_anyways", []).append(arch) 

1315 elif ( 

1316 arch not in self.nobreakall_arches 

1317 and arch_arch_installable == set() 

1318 and False in arch_all_installable 

1319 ): 

1320 deps_info.setdefault("arch_all_not_installable", []).append(arch) 

1321 elif not consider_it_regression: 

1322 deps_info.setdefault("autopkgtest_run_anyways", []).append(arch) 

1323 

1324 return verdict 

1325 

1326 

1327@unique 

1328class BuildDepResult(IntEnum): 

1329 # relation is satisfied in target 

1330 OK = 1 

1331 # relation can be satisfied by other packages in source 

1332 DEPENDS = 2 

1333 # relation cannot be satisfied 

1334 FAILED = 3 

1335 

1336 

1337class BuildDependsPolicy(AbstractBasePolicy): 

1338 

1339 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

1340 super().__init__( 

1341 "build-depends", 

1342 options, 

1343 suite_info, 

1344 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE}, 

1345 ) 

1346 self._all_buildarch: list[str] = [] 

1347 

1348 parse_option(options, "all_buildarch") 

1349 

1350 def initialise(self, britney: "Britney") -> None: 

1351 super().initialise(britney) 

1352 if self.options.all_buildarch: 

1353 self._all_buildarch = SuiteContentLoader.config_str_as_list( 

1354 self.options.all_buildarch, [] 

1355 ) 

1356 

1357 def apply_src_policy_impl( 

1358 self, 

1359 build_deps_info: dict[str, Any], 

1360 item: MigrationItem, 

1361 source_data_tdist: SourcePackage | None, 

1362 source_data_srcdist: SourcePackage, 

1363 excuse: "Excuse", 

1364 get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers, 

1365 ) -> PolicyVerdict: 

1366 verdict = PolicyVerdict.PASS 

1367 

1368 # analyze the dependency fields (if present) 

1369 if deps := source_data_srcdist.build_deps_arch: 

1370 v = self._check_build_deps( 

1371 deps, 

1372 DependencyType.BUILD_DEPENDS, 

1373 build_deps_info, 

1374 item, 

1375 source_data_tdist, 

1376 source_data_srcdist, 

1377 excuse, 

1378 get_dependency_solvers=get_dependency_solvers, 

1379 ) 

1380 verdict = PolicyVerdict.worst_of(verdict, v) 

1381 

1382 if ideps := source_data_srcdist.build_deps_indep: 

1383 v = self._check_build_deps( 

1384 ideps, 

1385 DependencyType.BUILD_DEPENDS_INDEP, 

1386 build_deps_info, 

1387 item, 

1388 source_data_tdist, 

1389 source_data_srcdist, 

1390 excuse, 

1391 get_dependency_solvers=get_dependency_solvers, 

1392 ) 

1393 verdict = PolicyVerdict.worst_of(verdict, v) 

1394 

1395 return verdict 

1396 

1397 def _get_check_archs( 

1398 self, archs: Container[str], dep_type: DependencyType 

1399 ) -> list[str]: 

1400 oos = self.options.outofsync_arches 

1401 

1402 if dep_type == DependencyType.BUILD_DEPENDS: 

1403 return [ 

1404 arch 

1405 for arch in self.options.architectures 

1406 if arch in archs and arch not in oos 

1407 ] 

1408 

1409 # first try the all buildarch 

1410 checkarchs = list(self._all_buildarch) 

1411 # then try the architectures where this source has arch specific 

1412 # binaries (in the order of the architecture config file) 

1413 checkarchs.extend( 

1414 arch 

1415 for arch in self.options.architectures 

1416 if arch in archs and arch not in checkarchs 

1417 ) 

1418 # then try all other architectures 

1419 checkarchs.extend( 

1420 arch for arch in self.options.architectures if arch not in checkarchs 

1421 ) 

1422 

1423 # and drop OUTOFSYNC_ARCHES 

1424 return [arch for arch in checkarchs if arch not in oos] 

1425 

1426 def _add_info_for_arch( 

1427 self, 

1428 arch: str, 

1429 excuses_info: dict[str, list[str]], 

1430 blockers: dict[str, set[BinaryPackageId]], 

1431 results: dict[str, BuildDepResult], 

1432 dep_type: DependencyType, 

1433 target_suite: TargetSuite, 

1434 source_suite: Suite, 

1435 excuse: "Excuse", 

1436 verdict: PolicyVerdict, 

1437 ) -> PolicyVerdict: 

1438 if arch in blockers: 

1439 packages = blockers[arch] 

1440 

1441 # for the solving packages, update the excuse to add the dependencies 

1442 for p in packages: 

1443 if arch not in self.options.break_arches: 1443 ↛ 1442line 1443 didn't jump to line 1442 because the condition on line 1443 was always true

1444 spec = DependencySpec(dep_type, arch) 

1445 excuse.add_package_depends(spec, {p}) 

1446 

1447 if arch in results and results[arch] == BuildDepResult.FAILED: 

1448 verdict = PolicyVerdict.worst_of( 

1449 verdict, PolicyVerdict.REJECTED_PERMANENTLY 

1450 ) 

1451 

1452 if arch in excuses_info: 

1453 for excuse_text in excuses_info[arch]: 

1454 if verdict.is_rejected: 1454 ↛ 1457line 1454 didn't jump to line 1457 because the condition on line 1454 was always true

1455 excuse.add_verdict_info(verdict, excuse_text) 

1456 else: 

1457 excuse.addinfo(excuse_text) 

1458 

1459 return verdict 

1460 

1461 def _check_build_deps( 

1462 self, 

1463 deps: str, 

1464 dep_type: DependencyType, 

1465 build_deps_info: dict[str, Any], 

1466 item: MigrationItem, 

1467 source_data_tdist: SourcePackage | None, 

1468 source_data_srcdist: SourcePackage, 

1469 excuse: "Excuse", 

1470 get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers, 

1471 ) -> PolicyVerdict: 

1472 verdict = PolicyVerdict.PASS 

1473 any_arch_ok = dep_type == DependencyType.BUILD_DEPENDS_INDEP 

1474 

1475 britney = self.britney 

1476 

1477 # local copies for better performance 

1478 parse_src_depends = apt_pkg.parse_src_depends 

1479 

1480 source_name = item.package 

1481 source_suite = item.suite 

1482 target_suite = self.suite_info.target_suite 

1483 binaries_s = source_suite.binaries 

1484 provides_s = source_suite.provides_table 

1485 binaries_t = target_suite.binaries 

1486 provides_t = target_suite.provides_table 

1487 unsat_bd: dict[str, list[str]] = {} 

1488 relevant_archs: set[str] = { 

1489 binary.architecture 

1490 for binary in filter_out_faux(source_data_srcdist.binaries) 

1491 if britney.all_binaries[binary].architecture != "all" 

1492 } 

1493 

1494 excuses_info: dict[str, list[str]] = defaultdict(list) 

1495 blockers: dict[str, set[BinaryPackageId]] = defaultdict(set) 

1496 arch_results = {} 

1497 result_archs = defaultdict(list) 

1498 bestresult = BuildDepResult.FAILED 

1499 check_archs = self._get_check_archs(relevant_archs, dep_type) 

1500 if not check_archs: 

1501 # when the arch list is empty, we check the b-d on any arch, instead of all archs 

1502 # this happens for Build-Depens on a source package that only produces arch: all binaries 

1503 any_arch_ok = True 

1504 check_archs = self._get_check_archs( 

1505 self.options.architectures, DependencyType.BUILD_DEPENDS_INDEP 

1506 ) 

1507 

1508 for arch in check_archs: 

1509 # retrieve the binary package from the specified suite and arch 

1510 binaries_s_a = binaries_s[arch] 

1511 provides_s_a = provides_s[arch] 

1512 binaries_t_a = binaries_t[arch] 

1513 provides_t_a = provides_t[arch] 

1514 arch_results[arch] = BuildDepResult.OK 

1515 # for every dependency block (formed as conjunction of disjunction) 

1516 for block_txt in deps.split(","): 

1517 block_list = parse_src_depends(block_txt, False, arch) 

1518 # Unlike regular dependencies, some clauses of the Build-Depends(-Arch|-Indep) can be 

1519 # filtered out by (e.g.) architecture restrictions. We need to cope with this while 

1520 # keeping block_txt and block aligned. 

1521 if not block_list: 

1522 # Relation is not relevant for this architecture. 

1523 continue 

1524 block = block_list[0] 

1525 # if the block is satisfied in the target suite, then skip the block 

1526 if get_dependency_solvers( 

1527 block, binaries_t_a, provides_t_a, build_depends=True 

1528 ): 

1529 # Satisfied in the target suite; all ok. 

1530 continue 

1531 

1532 # check if the block can be satisfied in the source suite, and list the solving packages 

1533 packages = get_dependency_solvers( 

1534 block, binaries_s_a, provides_s_a, build_depends=True 

1535 ) 

1536 sources = sorted(p.source for p in packages) 

1537 

1538 # if the dependency can be satisfied by the same source package, skip the block: 

1539 # obviously both binary packages will enter the target suite together 

1540 if source_name in sources: 1540 ↛ 1541line 1540 didn't jump to line 1541 because the condition on line 1540 was never true

1541 continue 

1542 

1543 # if no package can satisfy the dependency, add this information to the excuse 

1544 if not packages: 

1545 excuses_info[arch].append( 

1546 "%s unsatisfiable %s on %s: %s" 

1547 % (source_name, dep_type, arch, block_txt.strip()) 

1548 ) 

1549 if arch not in unsat_bd: 1549 ↛ 1551line 1549 didn't jump to line 1551 because the condition on line 1549 was always true

1550 unsat_bd[arch] = [] 

1551 unsat_bd[arch].append(block_txt.strip()) 

1552 arch_results[arch] = BuildDepResult.FAILED 

1553 continue 

1554 

1555 blockers[arch].update(p.pkg_id for p in packages) 

1556 if arch_results[arch] < BuildDepResult.DEPENDS: 

1557 arch_results[arch] = BuildDepResult.DEPENDS 

1558 

1559 if any_arch_ok: 

1560 if arch_results[arch] < bestresult: 

1561 bestresult = arch_results[arch] 

1562 result_archs[arch_results[arch]].append(arch) 

1563 if bestresult == BuildDepResult.OK: 

1564 # we found an architecture where the b-deps-indep are 

1565 # satisfied in the target suite, so we can stop 

1566 break 

1567 

1568 if any_arch_ok: 

1569 arch = result_archs[bestresult][0] 

1570 excuse.add_detailed_info(f"Checking {dep_type.get_description()} on {arch}") 

1571 key = "check-%s-on-arch" % dep_type.get_reason() 

1572 build_deps_info[key] = arch 

1573 verdict = self._add_info_for_arch( 

1574 arch, 

1575 excuses_info, 

1576 blockers, 

1577 arch_results, 

1578 dep_type, 

1579 target_suite, 

1580 source_suite, 

1581 excuse, 

1582 verdict, 

1583 ) 

1584 

1585 else: 

1586 for arch in check_archs: 

1587 verdict = self._add_info_for_arch( 

1588 arch, 

1589 excuses_info, 

1590 blockers, 

1591 arch_results, 

1592 dep_type, 

1593 target_suite, 

1594 source_suite, 

1595 excuse, 

1596 verdict, 

1597 ) 

1598 

1599 if unsat_bd: 

1600 build_deps_info["unsatisfiable-arch-build-depends"] = unsat_bd 

1601 

1602 return verdict 

1603 

1604 

1605class BuiltUsingPolicy(AbstractBasePolicy): 

1606 """Built-Using policy 

1607 

1608 Binaries that incorporate (part of) another source package must list these 

1609 sources under 'Built-Using'. 

1610 

1611 This policy checks if the corresponding sources are available in the 

1612 target suite. If they are not, but they are candidates for migration, a 

1613 dependency is added. 

1614 

1615 If the binary incorporates a newer version of a source, that is not (yet) 

1616 a candidate, we don't want to accept that binary. A rebuild later in the 

1617 primary suite wouldn't fix the issue, because that would incorporate the 

1618 newer version again. 

1619 

1620 If the binary incorporates an older version of the source, a newer version 

1621 will be accepted as a replacement. We assume that this can be fixed by 

1622 rebuilding the binary at some point during the development cycle. 

1623 

1624 Requiring exact version of the source would not be useful in practice. A 

1625 newer upload of that source wouldn't be blocked by this policy, so the 

1626 built-using would be outdated anyway. 

1627 

1628 """ 

1629 

1630 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

1631 super().__init__( 

1632 "built-using", 

1633 options, 

1634 suite_info, 

1635 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE}, 

1636 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY, 

1637 ) 

1638 

1639 def initialise(self, britney: "Britney") -> None: 

1640 super().initialise(britney) 

1641 

1642 def apply_srcarch_policy_impl( 

1643 self, 

1644 build_deps_info: dict[str, Any], 

1645 item: MigrationItem, 

1646 arch: str, 

1647 source_data_tdist: SourcePackage | None, 

1648 source_data_srcdist: SourcePackage, 

1649 excuse: "Excuse", 

1650 ) -> PolicyVerdict: 

1651 verdict = PolicyVerdict.PASS 

1652 

1653 source_suite = item.suite 

1654 target_suite = self.suite_info.target_suite 

1655 binaries_s = source_suite.binaries 

1656 

1657 def check_bu_in_suite( 

1658 bu_source: str, bu_version: str, source_suite: Suite 

1659 ) -> bool: 

1660 found = False 

1661 if bu_source not in source_suite.sources: 

1662 return found 

1663 s_source = source_suite.sources[bu_source] 

1664 s_ver = s_source.version 

1665 if apt_pkg.version_compare(s_ver, bu_version) >= 0: 

1666 found = True 

1667 dep = PackageId(bu_source, s_ver, "source") 

1668 if arch in self.options.break_arches: 

1669 excuse.add_detailed_info( 

1670 "Ignoring Built-Using for %s/%s on %s" 

1671 % (pkg_name, arch, dep.uvname) 

1672 ) 

1673 else: 

1674 spec = DependencySpec(DependencyType.BUILT_USING, arch) 

1675 excuse.add_package_depends(spec, {dep}) 

1676 excuse.add_detailed_info( 

1677 f"{pkg_name}/{arch} has Built-Using on {dep.uvname}" 

1678 ) 

1679 

1680 return found 

1681 

1682 for pkg_id in sorted( 

1683 x 

1684 for x in filter_out_faux(source_data_srcdist.binaries) 

1685 if x.architecture == arch 

1686 ): 

1687 pkg_name = pkg_id.package_name 

1688 

1689 # retrieve the testing (if present) and unstable corresponding binary packages 

1690 binary_s = binaries_s[arch][pkg_name] 

1691 

1692 for bu in binary_s.builtusing: 

1693 bu_source = bu[0] 

1694 bu_version = bu[1] 

1695 found = False 

1696 if bu_source in target_suite.sources: 

1697 t_source = target_suite.sources[bu_source] 

1698 t_ver = t_source.version 

1699 if apt_pkg.version_compare(t_ver, bu_version) >= 0: 

1700 found = True 

1701 

1702 if not found: 

1703 found = check_bu_in_suite(bu_source, bu_version, source_suite) 

1704 

1705 if not found and source_suite.suite_class.is_additional_source: 

1706 found = check_bu_in_suite( 

1707 bu_source, bu_version, self.suite_info.primary_source_suite 

1708 ) 

1709 

1710 if not found: 

1711 if arch in self.options.break_arches: 

1712 excuse.add_detailed_info( 

1713 "Ignoring unsatisfiable Built-Using for %s/%s on %s %s" 

1714 % (pkg_name, arch, bu_source, bu_version) 

1715 ) 

1716 else: 

1717 verdict = PolicyVerdict.worst_of( 

1718 verdict, PolicyVerdict.REJECTED_PERMANENTLY 

1719 ) 

1720 excuse.add_verdict_info( 

1721 verdict, 

1722 "%s/%s has unsatisfiable Built-Using on %s %s" 

1723 % (pkg_name, arch, bu_source, bu_version), 

1724 ) 

1725 

1726 return verdict 

1727 

1728 

1729class BlockPolicy(AbstractBasePolicy): 

1730 BLOCK_HINT_REGEX = re.compile("^(un)?(block-?.*)$") 

1731 

1732 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

1733 super().__init__( 

1734 "block", 

1735 options, 

1736 suite_info, 

1737 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE}, 

1738 ) 

1739 self._blockall: dict[str | None, Hint] = {} 

1740 

1741 def initialise(self, britney: "Britney") -> None: 

1742 super().initialise(britney) 

1743 assert self.hints is not None 

1744 for hint in self.hints.search(type="block-all"): 

1745 self._blockall[hint.package] = hint 

1746 

1747 self._key_packages = [] 

1748 if "key" in self._blockall: 

1749 self._key_packages = self._read_key_packages() 

1750 

1751 def _read_key_packages(self) -> list[str]: 

1752 """Read the list of key packages 

1753 

1754 The file contains data in the yaml format : 

1755 

1756 - reason: <something> 

1757 source: <package> 

1758 

1759 The method returns a list of all key packages. 

1760 """ 

1761 filename = os.path.join(self.state_dir, "key_packages.yaml") 

1762 self.logger.info("Loading key packages from %s", filename) 

1763 if os.path.exists(filename): 1763 ↛ 1768line 1763 didn't jump to line 1768 because the condition on line 1763 was always true

1764 with open(filename) as f: 

1765 data = yaml.safe_load(f) 

1766 key_packages = [item["source"] for item in data] 

1767 else: 

1768 self.logger.error( 

1769 "Britney was asked to block key packages, " 

1770 + "but no key_packages.yaml file was found." 

1771 ) 

1772 sys.exit(1) 

1773 

1774 return key_packages 

1775 

1776 def register_hints(self, hint_parser: HintParser) -> None: 

1777 # block related hints are currently defined in hint.py 

1778 pass 

1779 

1780 def _check_blocked( 

1781 self, item: "MigrationItem", arch: str, version: str, excuse: "Excuse" 

1782 ) -> PolicyVerdict: 

1783 verdict = PolicyVerdict.PASS 

1784 blocked = {} 

1785 unblocked = {} 

1786 block_info = {} 

1787 source_suite = item.suite 

1788 suite_name = source_suite.name 

1789 src = item.package 

1790 is_primary = source_suite.suite_class == SuiteClass.PRIMARY_SOURCE_SUITE 

1791 

1792 tooltip = ( 

1793 "please contact %s-release if update is needed" % self.options.distribution 

1794 ) 

1795 

1796 assert self.hints is not None 

1797 shints = self.hints.search(package=src) 

1798 mismatches = False 

1799 r = self.BLOCK_HINT_REGEX 

1800 for hint in shints: 

1801 m = r.match(hint.type) 

1802 if m: 

1803 if m.group(1) == "un": 

1804 assert hint.suite is not None 

1805 if ( 

1806 hint.version != version 

1807 or hint.suite.name != suite_name 

1808 or (hint.architecture != arch and hint.architecture != "source") 

1809 ): 

1810 self.logger.info( 

1811 "hint mismatch: %s %s %s", version, arch, suite_name 

1812 ) 

1813 mismatches = True 

1814 else: 

1815 unblocked[m.group(2)] = hint.user 

1816 excuse.add_hint(hint) 

1817 else: 

1818 # block(-*) hint: only accepts a source, so this will 

1819 # always match 

1820 blocked[m.group(2)] = hint.user 

1821 excuse.add_hint(hint) 

1822 

1823 if "block" not in blocked and is_primary: 

1824 # if there is a specific block hint for this package, we don't 

1825 # check for the general hints 

1826 

1827 if self.options.distribution == "debian": 1827 ↛ 1834line 1827 didn't jump to line 1834 because the condition on line 1827 was always true

1828 url = "https://release.debian.org/testing/freeze_policy.html" 

1829 tooltip = ( 

1830 'Follow the <a href="%s">freeze policy</a> when applying for an unblock' 

1831 % url 

1832 ) 

1833 

1834 if "source" in self._blockall: 

1835 blocked["block"] = self._blockall["source"].user 

1836 excuse.add_hint(self._blockall["source"]) 

1837 elif ( 

1838 "new-source" in self._blockall 

1839 and src not in self.suite_info.target_suite.sources 

1840 ): 

1841 blocked["block"] = self._blockall["new-source"].user 

1842 excuse.add_hint(self._blockall["new-source"]) 

1843 # no tooltip: new sources will probably not be accepted anyway 

1844 block_info["block"] = "blocked by {}: is not in {}".format( 

1845 self._blockall["new-source"].user, 

1846 self.suite_info.target_suite.name, 

1847 ) 

1848 elif "key" in self._blockall and src in self._key_packages: 

1849 blocked["block"] = self._blockall["key"].user 

1850 excuse.add_hint(self._blockall["key"]) 

1851 block_info["block"] = "blocked by {}: is a key package ({})".format( 

1852 self._blockall["key"].user, 

1853 tooltip, 

1854 ) 

1855 elif "no-autopkgtest" in self._blockall: 

1856 if excuse.autopkgtest_results == {"PASS"}: 

1857 if not blocked: 1857 ↛ 1883line 1857 didn't jump to line 1883 because the condition on line 1857 was always true

1858 excuse.addinfo("not blocked: has successful autopkgtest") 

1859 else: 

1860 blocked["block"] = self._blockall["no-autopkgtest"].user 

1861 excuse.add_hint(self._blockall["no-autopkgtest"]) 

1862 if not excuse.autopkgtest_results: 

1863 block_info["block"] = ( 

1864 "blocked by %s: does not have autopkgtest (%s)" 

1865 % ( 

1866 self._blockall["no-autopkgtest"].user, 

1867 tooltip, 

1868 ) 

1869 ) 

1870 else: 

1871 block_info["block"] = ( 

1872 "blocked by %s: autopkgtest not fully successful (%s)" 

1873 % ( 

1874 self._blockall["no-autopkgtest"].user, 

1875 tooltip, 

1876 ) 

1877 ) 

1878 

1879 elif not is_primary: 

1880 blocked["block"] = suite_name 

1881 excuse.needs_approval = True 

1882 

1883 for block_cmd in blocked: 

1884 unblock_cmd = "un" + block_cmd 

1885 if block_cmd in unblocked: 

1886 if is_primary or block_cmd == "block-udeb": 

1887 excuse.addinfo( 

1888 "Ignoring %s request by %s, due to %s request by %s" 

1889 % ( 

1890 block_cmd, 

1891 blocked[block_cmd], 

1892 unblock_cmd, 

1893 unblocked[block_cmd], 

1894 ) 

1895 ) 

1896 else: 

1897 excuse.addinfo("Approved by %s" % (unblocked[block_cmd])) 

1898 else: 

1899 verdict = PolicyVerdict.REJECTED_NEEDS_APPROVAL 

1900 if is_primary or block_cmd == "block-udeb": 

1901 # redirect people to d-i RM for udeb things: 

1902 if block_cmd == "block-udeb": 

1903 tooltip = "please contact the d-i release manager if an update is needed" 

1904 if block_cmd in block_info: 

1905 info = block_info[block_cmd] 

1906 else: 

1907 info = ( 

1908 "Not touching package due to {} request by {} ({})".format( 

1909 block_cmd, 

1910 blocked[block_cmd], 

1911 tooltip, 

1912 ) 

1913 ) 

1914 excuse.add_verdict_info(verdict, info) 

1915 else: 

1916 excuse.add_verdict_info(verdict, "NEEDS APPROVAL BY RM") 

1917 excuse.addreason("block") 

1918 if mismatches: 

1919 excuse.add_detailed_info( 

1920 "Some hints for %s do not match this item" % src 

1921 ) 

1922 return verdict 

1923 

1924 def apply_src_policy_impl( 

1925 self, 

1926 block_info: dict[str, Any], 

1927 item: MigrationItem, 

1928 source_data_tdist: SourcePackage | None, 

1929 source_data_srcdist: SourcePackage, 

1930 excuse: "Excuse", 

1931 ) -> PolicyVerdict: 

1932 return self._check_blocked(item, "source", source_data_srcdist.version, excuse) 

1933 

1934 def apply_srcarch_policy_impl( 

1935 self, 

1936 block_info: dict[str, Any], 

1937 item: MigrationItem, 

1938 arch: str, 

1939 source_data_tdist: SourcePackage | None, 

1940 source_data_srcdist: SourcePackage, 

1941 excuse: "Excuse", 

1942 ) -> PolicyVerdict: 

1943 return self._check_blocked(item, arch, source_data_srcdist.version, excuse) 

1944 

1945 

1946class BuiltOnBuilddPolicy(AbstractBasePolicy): 

1947 

1948 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

1949 super().__init__( 

1950 "builtonbuildd", 

1951 options, 

1952 suite_info, 

1953 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE}, 

1954 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY, 

1955 ) 

1956 self._builtonbuildd: dict[str, Any] = { 

1957 "signerinfo": None, 

1958 } 

1959 

1960 def register_hints(self, hint_parser: HintParser) -> None: 

1961 hint_parser.register_hint_type( 

1962 HintType( 

1963 "allow-archall-maintainer-upload", 

1964 versioned=HintAnnotate.FORBIDDEN, 

1965 ) 

1966 ) 

1967 

1968 def initialise(self, britney: "Britney") -> None: 

1969 super().initialise(britney) 

1970 try: 

1971 filename_signerinfo = os.path.join(self.state_dir, "signers.json") 

1972 except AttributeError as e: # pragma: no cover 

1973 raise RuntimeError( 

1974 "Please set STATE_DIR in the britney configuration" 

1975 ) from e 

1976 self._builtonbuildd["signerinfo"] = self._read_signerinfo(filename_signerinfo) 

1977 

1978 def apply_srcarch_policy_impl( 

1979 self, 

1980 buildd_info: dict[str, Any], 

1981 item: MigrationItem, 

1982 arch: str, 

1983 source_data_tdist: SourcePackage | None, 

1984 source_data_srcdist: SourcePackage, 

1985 excuse: "Excuse", 

1986 ) -> PolicyVerdict: 

1987 verdict = PolicyVerdict.PASS 

1988 signers = self._builtonbuildd["signerinfo"] 

1989 

1990 if "signed-by" not in buildd_info: 

1991 buildd_info["signed-by"] = {} 

1992 

1993 source_suite = item.suite 

1994 

1995 # horrible hard-coding, but currently, we don't keep track of the 

1996 # component when loading the packages files 

1997 component = "main" 

1998 # we use the source component, because a binary in contrib can 

1999 # belong to a source in main 

2000 section = source_data_srcdist.section 

2001 if section.find("/") > -1: 

2002 component = section.split("/")[0] 

2003 

2004 packages_s_a = source_suite.binaries[arch] 

2005 assert self.hints is not None 

2006 

2007 for pkg_id in sorted( 

2008 x 

2009 for x in filter_out_faux(source_data_srcdist.binaries) 

2010 if x.architecture == arch 

2011 ): 

2012 pkg_name = pkg_id.package_name 

2013 binary_u = packages_s_a[pkg_name] 

2014 pkg_arch = binary_u.architecture 

2015 

2016 if binary_u.source_version != source_data_srcdist.version: 2016 ↛ 2017line 2016 didn't jump to line 2017 because the condition on line 2016 was never true

2017 continue 

2018 

2019 if item.architecture != "source" and pkg_arch == "all": 

2020 # we don't care about the existing arch: all binaries when 

2021 # checking a binNMU item, because the arch: all binaries won't 

2022 # migrate anyway 

2023 continue 

2024 

2025 signer = None 

2026 uid = None 

2027 uidinfo = "" 

2028 buildd_ok = False 

2029 failure_verdict = PolicyVerdict.REJECTED_PERMANENTLY 

2030 try: 

2031 signer = signers[pkg_name][pkg_id.version][pkg_arch] 

2032 if signer["buildd"]: 

2033 buildd_ok = True 

2034 uid = signer["uid"] 

2035 uidinfo = f"arch {pkg_arch} binaries uploaded by {uid}" 

2036 except KeyError: 

2037 self.logger.info( 

2038 "signer info for %s %s (%s) on %s not found " 

2039 % (pkg_name, binary_u.version, pkg_arch, arch) 

2040 ) 

2041 uidinfo = "upload info for arch %s binaries not found" % (pkg_arch) 

2042 failure_verdict = PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT 

2043 if not buildd_ok: 

2044 if component != "main": 

2045 if not buildd_ok and pkg_arch not in buildd_info["signed-by"]: 2045 ↛ 2049line 2045 didn't jump to line 2049 because the condition on line 2045 was always true

2046 excuse.add_detailed_info( 

2047 f"{uidinfo}, but package in {component}" 

2048 ) 

2049 buildd_ok = True 

2050 elif pkg_arch == "all": 

2051 allow_hints = self.hints.search( 

2052 "allow-archall-maintainer-upload", package=item.package 

2053 ) 

2054 if allow_hints: 

2055 buildd_ok = True 

2056 verdict = PolicyVerdict.worst_of( 

2057 verdict, PolicyVerdict.PASS_HINTED 

2058 ) 

2059 if pkg_arch not in buildd_info["signed-by"]: 

2060 excuse.addinfo( 

2061 "%s, but whitelisted by %s" 

2062 % (uidinfo, allow_hints[0].user) 

2063 ) 

2064 if not buildd_ok: 

2065 verdict = failure_verdict 

2066 if pkg_arch not in buildd_info["signed-by"]: 

2067 if pkg_arch == "all": 

2068 uidinfo += ( 

2069 ", a new source-only upload is needed to allow migration" 

2070 ) 

2071 excuse.add_verdict_info( 

2072 verdict, "Not built on buildd: %s" % (uidinfo) 

2073 ) 

2074 

2075 if ( 2075 ↛ 2079line 2075 didn't jump to line 2079

2076 pkg_arch in buildd_info["signed-by"] 

2077 and buildd_info["signed-by"][pkg_arch] != uid 

2078 ): 

2079 self.logger.info( 

2080 "signer mismatch for %s (%s %s) on %s: %s, while %s already listed" 

2081 % ( 

2082 pkg_name, 

2083 binary_u.source, 

2084 binary_u.source_version, 

2085 pkg_arch, 

2086 uid, 

2087 buildd_info["signed-by"][pkg_arch], 

2088 ) 

2089 ) 

2090 

2091 buildd_info["signed-by"][pkg_arch] = uid 

2092 

2093 return verdict 

2094 

2095 def _read_signerinfo(self, filename: str) -> dict[str, Any]: 

2096 signerinfo: dict[str, Any] = {} 

2097 self.logger.info("Loading signer info from %s", filename) 

2098 with open(filename) as fd: 2098 ↛ exitline 2098 didn't return from function '_read_signerinfo' because the return on line 2100 wasn't executed

2099 if os.fstat(fd.fileno()).st_size < 1: 2099 ↛ 2100line 2099 didn't jump to line 2100 because the condition on line 2099 was never true

2100 return signerinfo 

2101 signerinfo = json.load(fd) 

2102 

2103 return signerinfo 

2104 

2105 

2106class ImplicitDependencyPolicy(AbstractBasePolicy): 

2107 """Implicit Dependency policy 

2108 

2109 Upgrading a package pkg-a can break the installability of a package pkg-b. 

2110 A newer version (or the removal) of pkg-b might fix the issue. In that 

2111 case, pkg-a has an 'implicit dependency' on pkg-b, because pkg-a can only 

2112 migrate if pkg-b also migrates. 

2113 

2114 This policy tries to discover a few common cases, and adds the relevant 

2115 info to the excuses. If another item is needed to fix the 

2116 uninstallability, a dependency is added. If no newer item can fix it, this 

2117 excuse will be blocked. 

2118 

2119 Note that the migration step will check the installability of every 

2120 package, so this policy doesn't need to handle every corner case. It 

2121 must, however, make sure that no excuse is unnecessarily blocked. 

2122 

2123 Some cases that should be detected by this policy: 

2124 

2125 * pkg-a is upgraded from 1.0-1 to 2.0-1, while 

2126 pkg-b has "Depends: pkg-a (<< 2.0)" 

2127 This typically happens if pkg-b has a strict dependency on pkg-a because 

2128 it uses some non-stable internal interface (examples are glibc, 

2129 binutils, python3-defaults, ...) 

2130 

2131 * pkg-a is upgraded from 1.0-1 to 2.0-1, and 

2132 pkg-a 1.0-1 has "Provides: provides-1", 

2133 pkg-a 2.0-1 has "Provides: provides-2", 

2134 pkg-b has "Depends: provides-1" 

2135 This typically happens when pkg-a has an interface that changes between 

2136 versions, and a virtual package is used to identify the version of this 

2137 interface (e.g. perl-api-x.y) 

2138 

2139 """ 

2140 

2141 _pkg_universe: "BinaryPackageUniverse" 

2142 _all_binaries: dict["BinaryPackageId", "BinaryPackage"] 

2143 _allow_uninst: dict[str, set[str | None]] 

2144 _nobreakall_arches: list[str] 

2145 

2146 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

2147 super().__init__( 

2148 "implicit-deps", 

2149 options, 

2150 suite_info, 

2151 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE}, 

2152 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY, 

2153 ) 

2154 

2155 def initialise(self, britney: "Britney") -> None: 

2156 super().initialise(britney) 

2157 self._pkg_universe = britney.pkg_universe 

2158 self._all_binaries = britney.all_binaries 

2159 self._smooth_updates = britney.options.smooth_updates 

2160 self._nobreakall_arches = self.options.nobreakall_arches 

2161 self._new_arches = self.options.new_arches 

2162 self._break_arches = self.options.break_arches 

2163 self._allow_uninst = britney.allow_uninst 

2164 self._outofsync_arches = self.options.outofsync_arches 

2165 

2166 def can_be_removed(self, pkg: BinaryPackage) -> bool: 

2167 src = pkg.source 

2168 target_suite = self.suite_info.target_suite 

2169 

2170 # TODO these conditions shouldn't be hardcoded here 

2171 # ideally, we would be able to look up excuses to see if the removal 

2172 # is in there, but in the current flow, this policy is called before 

2173 # all possible excuses exist, so there is no list for us to check 

2174 

2175 if src not in self.suite_info.primary_source_suite.sources: 

2176 # source for pkg not in unstable: candidate for removal 

2177 return True 

2178 

2179 source_t = target_suite.sources[src] 

2180 assert self.hints is not None 

2181 for hint in self.hints.search("remove", package=src, version=source_t.version): 

2182 # removal hint for the source in testing: candidate for removal 

2183 return True 

2184 

2185 if target_suite.is_cruft(pkg): 

2186 # if pkg is cruft in testing, removal will be tried 

2187 return True 

2188 

2189 # the case were the newer version of the source no longer includes the 

2190 # binary (or includes a cruft version of the binary) will be handled 

2191 # separately (in that case there might be an implicit dependency on 

2192 # the newer source) 

2193 

2194 return False 

2195 

2196 def should_skip_rdep( 

2197 self, pkg: BinaryPackage, source_name: str, myarch: str 

2198 ) -> bool: 

2199 target_suite = self.suite_info.target_suite 

2200 

2201 if not target_suite.is_pkg_in_the_suite(pkg.pkg_id): 

2202 # it is not in the target suite, migration cannot break anything 

2203 return True 

2204 

2205 if pkg.source == source_name: 

2206 # if it is built from the same source, it will be upgraded 

2207 # with the source 

2208 return True 

2209 

2210 if self.can_be_removed(pkg): 

2211 # could potentially be removed, so if that happens, it won't be 

2212 # broken 

2213 return True 

2214 

2215 if pkg.architecture == "all" and myarch not in self._nobreakall_arches: 

2216 # arch all on non nobreakarch is allowed to become uninstallable 

2217 return True 

2218 

2219 if pkg.pkg_id.package_name in self._allow_uninst[myarch]: 

2220 # there is a hint to allow this binary to become uninstallable 

2221 return True 

2222 

2223 if not target_suite.is_installable(pkg.pkg_id): 

2224 # it is already uninstallable in the target suite, migration 

2225 # cannot break anything 

2226 return True 

2227 

2228 return False 

2229 

2230 def breaks_installability( 

2231 self, 

2232 pkg_id_t: BinaryPackageId, 

2233 pkg_id_s: BinaryPackageId | None, 

2234 pkg_to_check: BinaryPackageId, 

2235 ) -> bool: 

2236 """ 

2237 Check if upgrading pkg_id_t to pkg_id_s breaks the installability of 

2238 pkg_to_check. 

2239 

2240 To check if removing pkg_id_t breaks pkg_to_check, set pkg_id_s to 

2241 None. 

2242 """ 

2243 

2244 pkg_universe = self._pkg_universe 

2245 negative_deps = pkg_universe.negative_dependencies_of(pkg_to_check) 

2246 

2247 for dep in pkg_universe.dependencies_of(pkg_to_check): 

2248 if pkg_id_t not in dep: 

2249 # this depends doesn't have pkg_id_t as alternative, so 

2250 # upgrading pkg_id_t cannot break this dependency clause 

2251 continue 

2252 

2253 # We check all the alternatives for this dependency, to find one 

2254 # that can satisfy it when pkg_id_t is upgraded to pkg_id_s 

2255 found_alternative = False 

2256 for d in dep: 

2257 if d in negative_deps: 

2258 # If this alternative dependency conflicts with 

2259 # pkg_to_check, it cannot be used to satisfy the 

2260 # dependency. 

2261 # This commonly happens when breaks are added to pkg_id_s. 

2262 continue 

2263 

2264 if d.package_name != pkg_id_t.package_name: 

2265 # a binary different from pkg_id_t can satisfy the dep, so 

2266 # upgrading pkg_id_t won't break this dependency 

2267 found_alternative = True 

2268 break 

2269 

2270 if d != pkg_id_s: 

2271 # We want to know the impact of the upgrade of 

2272 # pkg_id_t to pkg_id_s. If pkg_id_s migrates to the 

2273 # target suite, any other version of this binary will 

2274 # not be there, so it cannot satisfy this dependency. 

2275 # This includes pkg_id_t, but also other versions. 

2276 continue 

2277 

2278 # pkg_id_s can satisfy the dep 

2279 found_alternative = True 

2280 

2281 if not found_alternative: 

2282 return True 

2283 return False 

2284 

2285 def check_upgrade( 

2286 self, 

2287 pkg_id_t: BinaryPackageId, 

2288 pkg_id_s: BinaryPackageId | None, 

2289 source_name: str, 

2290 myarch: str, 

2291 broken_binaries: set[str], 

2292 excuse: "Excuse", 

2293 ) -> PolicyVerdict: 

2294 verdict = PolicyVerdict.PASS 

2295 

2296 pkg_universe = self._pkg_universe 

2297 all_binaries = self._all_binaries 

2298 

2299 # check all rdeps of the package in testing 

2300 rdeps_t = pkg_universe.reverse_dependencies_of(pkg_id_t) 

2301 

2302 for rdep_pkg in sorted(rdeps_t): 

2303 rdep_p = all_binaries[rdep_pkg] 

2304 

2305 # check some cases where the rdep won't become uninstallable, or 

2306 # where we don't care if it does 

2307 if self.should_skip_rdep(rdep_p, source_name, myarch): 

2308 continue 

2309 

2310 if not self.breaks_installability(pkg_id_t, pkg_id_s, rdep_pkg): 

2311 # if upgrading pkg_id_t to pkg_id_s doesn't break rdep_pkg, 

2312 # there is no implicit dependency 

2313 continue 

2314 

2315 # The upgrade breaks the installability of the rdep. We need to 

2316 # find out if there is a newer version of the rdep that solves the 

2317 # uninstallability. If that is the case, there is an implicit 

2318 # dependency. If not, the upgrade will fail. 

2319 

2320 # check source versions 

2321 newer_versions = find_newer_binaries( 

2322 self.suite_info, rdep_p, add_source_for_dropped_bin=True 

2323 ) 

2324 good_newer_versions = set() 

2325 for npkg, suite in newer_versions: 

2326 if npkg.architecture == "source": 

2327 # When a newer version of the source package doesn't have 

2328 # the binary, we get the source as 'newer version'. In 

2329 # this case, the binary will not be uninstallable if the 

2330 # newer source migrates, because it is no longer there. 

2331 good_newer_versions.add(npkg) 

2332 continue 

2333 assert isinstance(npkg, BinaryPackageId) 

2334 if not self.breaks_installability(pkg_id_t, pkg_id_s, npkg): 

2335 good_newer_versions.add(npkg) 

2336 

2337 if good_newer_versions: 

2338 spec = DependencySpec(DependencyType.IMPLICIT_DEPENDENCY, myarch) 

2339 excuse.add_package_depends(spec, good_newer_versions) 

2340 else: 

2341 # no good newer versions: no possible solution 

2342 broken_binaries.add(rdep_pkg.name) 

2343 if pkg_id_s: 

2344 action = "migrating {} to {}".format( 

2345 pkg_id_s.name, 

2346 self.suite_info.target_suite.name, 

2347 ) 

2348 else: 

2349 action = "removing {} from {}".format( 

2350 pkg_id_t.name, 

2351 self.suite_info.target_suite.name, 

2352 ) 

2353 if rdep_pkg[0].endswith("-faux-build-depends"): 

2354 name = rdep_pkg[0].replace("-faux-build-depends", "") 

2355 info = f'{action} makes Build-Depends of src:<a href="#{name}">{name}</a> uninstallable' 

2356 else: 

2357 info = '{0} makes <a href="#{1}">{1}</a> uninstallable'.format( 

2358 action, rdep_pkg.name 

2359 ) 

2360 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

2361 excuse.add_verdict_info(verdict, info) 

2362 

2363 return verdict 

2364 

2365 def apply_srcarch_policy_impl( 

2366 self, 

2367 implicit_dep_info: dict[str, Any], 

2368 item: MigrationItem, 

2369 arch: str, 

2370 source_data_tdist: SourcePackage | None, 

2371 source_data_srcdist: SourcePackage, 

2372 excuse: "Excuse", 

2373 ) -> PolicyVerdict: 

2374 verdict = PolicyVerdict.PASS 

2375 

2376 if not source_data_tdist: 

2377 # this item is not currently in testing: no implicit dependency 

2378 return verdict 

2379 

2380 if excuse.hasreason("missingbuild"): 

2381 # if the build is missing, the policy would treat this as if the 

2382 # binaries would be removed, which would give incorrect (and 

2383 # confusing) info 

2384 info = "missing build, not checking implicit dependencies on %s" % (arch) 

2385 excuse.add_detailed_info(info) 

2386 return verdict 

2387 

2388 source_suite = item.suite 

2389 source_name = item.package 

2390 target_suite = self.suite_info.target_suite 

2391 all_binaries = self._all_binaries 

2392 

2393 # we check all binaries for this excuse that are currently in testing 

2394 relevant_binaries = [ 

2395 x 

2396 for x in source_data_tdist.binaries 

2397 if (arch == "source" or x.architecture == arch) 

2398 and x.package_name in target_suite.binaries[x.architecture] 

2399 and x.architecture not in self._new_arches 

2400 and x.architecture not in self._break_arches 

2401 and x.architecture not in self._outofsync_arches 

2402 ] 

2403 

2404 broken_binaries: set[str] = set() 

2405 

2406 assert self.hints is not None 

2407 for pkg_id_t in sorted(relevant_binaries): 

2408 mypkg = pkg_id_t.package_name 

2409 myarch = pkg_id_t.architecture 

2410 binaries_t_a = target_suite.binaries[myarch] 

2411 binaries_s_a = source_suite.binaries[myarch] 

2412 

2413 if target_suite.is_cruft(all_binaries[pkg_id_t]): 

2414 # this binary is cruft in testing: it will stay around as long 

2415 # as necessary to satisfy dependencies, so we don't need to 

2416 # care 

2417 continue 

2418 

2419 if mypkg in binaries_s_a: 

2420 mybin = binaries_s_a[mypkg] 

2421 pkg_id_s: Optional["BinaryPackageId"] = mybin.pkg_id 

2422 if mybin.source != source_name: 

2423 # hijack: this is too complicated to check, so we ignore 

2424 # it (the migration code will check the installability 

2425 # later anyway) 

2426 pass 

2427 elif mybin.source_version != source_data_srcdist.version: 

2428 # cruft in source suite: pretend the binary doesn't exist 

2429 pkg_id_s = None 

2430 elif pkg_id_t == pkg_id_s: 

2431 # same binary (probably arch: all from a binNMU): 

2432 # 'upgrading' doesn't change anything, for this binary, so 

2433 # it won't break anything 

2434 continue 

2435 else: 

2436 pkg_id_s = None 

2437 

2438 if not pkg_id_s and is_smooth_update_allowed( 

2439 binaries_t_a[mypkg], self._smooth_updates, self.hints 

2440 ): 

2441 # the binary isn't in the new version (or is cruft there), and 

2442 # smooth updates are allowed: the binary can stay around if 

2443 # that is necessary to satisfy dependencies, so we don't need 

2444 # to check it 

2445 continue 

2446 

2447 if ( 

2448 not pkg_id_s 

2449 and source_data_tdist.version == source_data_srcdist.version 

2450 and source_suite.suite_class == SuiteClass.ADDITIONAL_SOURCE_SUITE 

2451 and binaries_t_a[mypkg].architecture == "all" 

2452 ): 

2453 # we're very probably migrating a binNMU built in tpu where the arch:all 

2454 # binaries were not copied to it as that's not needed. This policy could 

2455 # needlessly block. 

2456 continue 

2457 

2458 v = self.check_upgrade( 

2459 pkg_id_t, pkg_id_s, source_name, myarch, broken_binaries, excuse 

2460 ) 

2461 verdict = PolicyVerdict.worst_of(verdict, v) 

2462 

2463 # each arch is processed separately, so if we already have info from 

2464 # other archs, we need to merge the info from this arch 

2465 broken_old = set() 

2466 if "implicit-deps" not in implicit_dep_info: 

2467 implicit_dep_info["implicit-deps"] = {} 

2468 else: 

2469 broken_old = set(implicit_dep_info["implicit-deps"]["broken-binaries"]) 

2470 

2471 implicit_dep_info["implicit-deps"]["broken-binaries"] = sorted( 

2472 broken_old | broken_binaries 

2473 ) 

2474 

2475 return verdict 

2476 

2477 

2478class ReverseRemovalPolicy(AbstractBasePolicy): 

2479 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

2480 super().__init__( 

2481 "reverseremoval", 

2482 options, 

2483 suite_info, 

2484 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE}, 

2485 ) 

2486 

2487 def register_hints(self, hint_parser: HintParser) -> None: 

2488 hint_parser.register_hint_type(HintType("ignore-reverse-remove")) 

2489 

2490 def initialise(self, britney: "Britney") -> None: 

2491 super().initialise(britney) 

2492 

2493 pkg_universe = britney.pkg_universe 

2494 source_suites = britney.suite_info.source_suites 

2495 target_suite = britney.suite_info.target_suite 

2496 

2497 # Build set of the sources of reverse (Build-) Depends 

2498 assert self.hints is not None 

2499 hints = self.hints.search("remove") 

2500 

2501 rev_bin: dict[BinaryPackageId, set[str]] = defaultdict(set) 

2502 for hint in hints: 

2503 for item in hint.packages: 

2504 # I think we don't need to look at the target suite 

2505 for src_suite in source_suites: 

2506 try: 

2507 # Explicitly not running filter_out_faux here 

2508 my_bins = set(src_suite.sources[item.uvname].binaries) 

2509 except KeyError: 

2510 continue 

2511 compute_reverse_tree(pkg_universe, my_bins) 

2512 for this_bin in my_bins: 

2513 rev_bin.setdefault(this_bin, set()).add(item.uvname) 

2514 

2515 rev_src: dict[str, set[str]] = defaultdict(set) 

2516 for bin_pkg, reasons in rev_bin.items(): 

2517 # If the pkg is in the target suite, there's nothing this 

2518 # policy wants to do. 

2519 if target_suite.is_pkg_in_the_suite(bin_pkg): 

2520 continue 

2521 that_bin = britney.all_binaries[bin_pkg] 

2522 bin_src = that_bin.source + "/" + that_bin.source_version 

2523 rev_src.setdefault(bin_src, set()).update(reasons) 

2524 self._block_src_for_rm_hint = rev_src 

2525 

2526 def apply_src_policy_impl( 

2527 self, 

2528 rev_remove_info: dict[str, Any], 

2529 item: MigrationItem, 

2530 source_data_tdist: SourcePackage | None, 

2531 source_data_srcdist: SourcePackage, 

2532 excuse: "Excuse", 

2533 ) -> PolicyVerdict: 

2534 verdict = PolicyVerdict.PASS 

2535 

2536 if item.name in self._block_src_for_rm_hint: 

2537 reason = ", ".join(sorted(self._block_src_for_rm_hint[item.name])) 

2538 assert self.hints is not None 

2539 ignore_hints = self.hints.search( 

2540 "ignore-reverse-remove", package=item.uvname, version=item.version 

2541 ) 

2542 excuse.addreason("reverseremoval") 

2543 if ignore_hints: 

2544 excuse.addreason("ignore-reverse-remove") 

2545 excuse.addinfo( 

2546 "Should block migration because of remove hint for %s, but forced by %s" 

2547 % (reason, ignore_hints[0].user) 

2548 ) 

2549 verdict = PolicyVerdict.PASS_HINTED 

2550 else: 

2551 excuse.addinfo("Remove hint for (transitive) dependency: %s" % reason) 

2552 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

2553 

2554 return verdict 

2555 

2556 

2557class ReproduciblePolicy(AbstractBasePolicy): 

2558 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

2559 super().__init__( 

2560 "reproducible", 

2561 options, 

2562 suite_info, 

2563 {SuiteClass.PRIMARY_SOURCE_SUITE}, 

2564 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY, 

2565 ) 

2566 self._reproducible: dict[str, Any] = { 

2567 "source": {}, 

2568 "target": {}, 

2569 } 

2570 

2571 # Default values for this policy's options 

2572 parse_option(options, "repro_success_bounty", default=0, to_int=True) 

2573 parse_option(options, "repro_regression_penalty", default=0, to_int=True) 

2574 parse_option(options, "repro_url") 

2575 parse_option(options, "repro_retry_url") 

2576 parse_option(options, "repro_components") 

2577 

2578 def register_hints(self, hint_parser: HintParser) -> None: 

2579 hint_parser.register_hint_type( 

2580 HintType("ignore-reproducible", architectured=HintAnnotate.OPTIONAL) 

2581 ) 

2582 

2583 def initialise(self, britney: "Britney") -> None: 

2584 super().initialise(britney) 

2585 source_suite = self.suite_info.primary_source_suite 

2586 target_suite = self.suite_info.target_suite 

2587 try: 

2588 filename = os.path.join(self.state_dir, "reproducible.json") 

2589 except AttributeError as e: # pragma: no cover 

2590 raise RuntimeError( 

2591 "Please set STATE_DIR in the britney configuration" 

2592 ) from e 

2593 

2594 self._reproducible = self._read_repro_status( 

2595 filename, 

2596 source={source_suite.name, source_suite.codename}, 

2597 target={target_suite.name, target_suite.codename}, 

2598 ) 

2599 

2600 def apply_srcarch_policy_impl( 

2601 self, 

2602 reproducible_info: dict[str, Any], 

2603 item: MigrationItem, 

2604 arch: str, 

2605 source_data_tdist: SourcePackage | None, 

2606 source_data_srcdist: SourcePackage, 

2607 excuse: "Excuse", 

2608 ) -> PolicyVerdict: 

2609 verdict = PolicyVerdict.PASS 

2610 

2611 # we don't want to apply this policy (yet) on binNMUs 

2612 if item.architecture != "source": 

2613 return verdict 

2614 

2615 # we're not supposed to judge on this arch 

2616 if arch not in self.options.repro_arches: 

2617 return verdict 

2618 

2619 # bail out if this arch has no packages for this source (not build 

2620 # here) 

2621 if arch not in excuse.packages: 

2622 return verdict 

2623 

2624 # horrible hard-coding, but currently, we don't keep track of the 

2625 # component when loading the packages files 

2626 component = "main" 

2627 if "/" in (section := source_data_srcdist.section): 

2628 component = section.split("/")[0] 

2629 

2630 if ( 

2631 self.options.repro_components 

2632 and component not in self.options.repro_components 

2633 ): 

2634 return verdict 

2635 

2636 source_name = item.package 

2637 try: 

2638 tar_res = self._reproducible["target"][arch] 

2639 src_res = self._reproducible["source"][arch] 

2640 except KeyError: 

2641 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

2642 msg = "No reproducible data available at all for %s" % arch 

2643 excuse.add_verdict_info(verdict, msg) 

2644 return verdict 

2645 

2646 if source_data_tdist is None: 

2647 target_suite_state = "new" 

2648 elif source_name not in tar_res: 

2649 target_suite_state = "unknown" 

2650 elif tar_res[source_name]["version"] == source_data_tdist.version: 

2651 target_suite_state = tar_res[source_name]["status"] 

2652 else: 

2653 target_suite_state = "stale" 

2654 

2655 if source_name in src_res and src_res[source_name]["version"] == item.version: 

2656 source_suite_state = src_res[source_name]["status"] 

2657 else: 

2658 source_suite_state = "unknown" 

2659 

2660 # status of ['E404', 'FTBFS', 'FTBR', 'NFU', 'blacklisted', 'depwait', 

2661 # 'reproducible', 'timeout'] with ['new', 'stale', 'unknown'] 

2662 wait_states = ("E404", "depwait", "stale", "timeout", "unknown") 

2663 no_build_states = ("FTBFS", "NFU", "blacklisted") 

2664 

2665 # if this package doesn't build on this architecture, we don't need to 

2666 # judge it 

2667 # FTBFS: Fails to build from source on r-b infra 

2668 # NFU: the package explicitly doesn't support building on this arch 

2669 # blacklisted: per package per arch per suite 

2670 if source_suite_state in no_build_states: 

2671 return verdict 

2672 # Assume depwait in the source suite only are intermittent (might not 

2673 # be true, e.g. with new build depends) 

2674 if source_suite_state == target_suite_state and target_suite_state == "depwait": 

2675 return verdict 

2676 

2677 if self.options.repro_url: 

2678 url = self.options.repro_url.format(package=quote(source_name), arch=arch) 

2679 url_html = ' - <a href="%s">info</a>' % url 

2680 if self.options.repro_retry_url: 

2681 url_html += ( 

2682 ' <a href="%s">♻ </a>' 

2683 % self.options.repro_retry_url.format( 

2684 package=quote(source_name), arch=arch 

2685 ) 

2686 ) 

2687 # When run on multiple archs, the last one "wins" 

2688 reproducible_info["reproducible-test-url"] = url 

2689 else: 

2690 url = None 

2691 url_html = "" 

2692 

2693 eligible_for_bounty = False 

2694 if source_suite_state == "reproducible": 

2695 verdict = PolicyVerdict.PASS 

2696 msg = f"Reproducible on {arch}{url_html}" 

2697 reproducible_info.setdefault("test-results", []).append( 

2698 "reproducible on %s" % arch 

2699 ) 

2700 eligible_for_bounty = True 

2701 elif source_suite_state == "FTBR": 

2702 if target_suite_state == "new": 

2703 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

2704 msg = f"New but not reproducible on {arch}{url_html}" 

2705 reproducible_info.setdefault("test-results", []).append( 

2706 "new but not reproducible on %s" % arch 

2707 ) 

2708 elif target_suite_state in wait_states: 

2709 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

2710 msg = "Waiting for reproducibility reference results on {}{}".format( 

2711 arch, 

2712 url_html, 

2713 ) 

2714 reproducible_info.setdefault("test-results", []).append( 

2715 "waiting-for-reference-results on %s" % arch 

2716 ) 

2717 elif target_suite_state == "reproducible": 

2718 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

2719 msg = f"Reproducibility regression on {arch}{url_html}" 

2720 reproducible_info.setdefault("test-results", []).append( 

2721 "regression on %s" % arch 

2722 ) 

2723 elif target_suite_state == "FTBR": 

2724 verdict = PolicyVerdict.PASS 

2725 msg = "Ignoring non-reproducibility on {} (not a regression){}".format( 

2726 arch, 

2727 url_html, 

2728 ) 

2729 reproducible_info.setdefault("test-results", []).append( 

2730 "not reproducible on %s" % arch 

2731 ) 

2732 else: 

2733 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

2734 msg = "No reference result, but not reproducibility on {}{}".format( 

2735 arch, 

2736 url_html, 

2737 ) 

2738 reproducible_info.setdefault("test-results", []).append( 

2739 f"reference {target_suite_state} on {arch}" 

2740 ) 

2741 elif source_suite_state in wait_states: 

2742 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

2743 msg = f"Waiting for reproducibility test results on {arch}{url_html}" 

2744 reproducible_info.setdefault("test-results", []).append( 

2745 "waiting-for-test-results on %s" % arch 

2746 ) 

2747 else: 

2748 raise KeyError("Unhandled reproducibility state %s" % source_suite_state) 

2749 

2750 if verdict.is_rejected: 

2751 assert self.hints is not None 

2752 for hint_arch in ("source", arch): 

2753 for ignore_hint in self.hints.search( 

2754 "ignore-reproducible", 

2755 package=source_name, 

2756 version=source_data_srcdist.version, 

2757 architecture=hint_arch, 

2758 ): 

2759 verdict = PolicyVerdict.PASS_HINTED 

2760 reproducible_info.setdefault("ignored-reproducible", {}).setdefault( 

2761 arch, {} 

2762 ).setdefault("issued-by", []).append(ignore_hint.user) 

2763 excuse.addinfo( 

2764 "Ignoring reproducibility issue on %s as requested " 

2765 "by %s" % (arch, ignore_hint.user) 

2766 ) 

2767 break 

2768 

2769 if self.options.repro_success_bounty and eligible_for_bounty: 

2770 excuse.add_bounty("reproducibility", self.options.repro_success_bounty) 

2771 

2772 if self.options.repro_regression_penalty and verdict in { 

2773 PolicyVerdict.REJECTED_PERMANENTLY, 

2774 PolicyVerdict.REJECTED_TEMPORARILY, 

2775 }: 

2776 if self.options.repro_regression_penalty > 0: 

2777 excuse.add_penalty( 

2778 "reproducibility", self.options.repro_regression_penalty 

2779 ) 

2780 # In case we give penalties instead of blocking, we must always pass 

2781 verdict = PolicyVerdict.PASS 

2782 

2783 if verdict.is_rejected: 

2784 excuse.add_verdict_info(verdict, msg) 

2785 else: 

2786 excuse.addinfo(msg) 

2787 

2788 return verdict 

2789 

2790 def _read_repro_status( 

2791 self, filename: str, source: set[str], target: set[str] 

2792 ) -> dict[str, dict[str, str]]: 

2793 summary = self._reproducible 

2794 self.logger.info("Loading reproducibility report from %s", filename) 

2795 with open(filename) as fd: 

2796 if os.fstat(fd.fileno()).st_size < 1: 

2797 return summary 

2798 data = json.load(fd) 

2799 

2800 for result in data: 

2801 if result["suite"] in source: 

2802 summary["source"].setdefault(result["architecture"], {})[ 

2803 result["package"] 

2804 ] = result 

2805 if result["suite"] in target: 

2806 summary["target"].setdefault(result["architecture"], {})[ 

2807 result["package"] 

2808 ] = result 

2809 

2810 return summary