Coverage for britney2/policies/policy.py: 91%

1331 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-29 17:21 +0000

1import json 

2import logging 

3import optparse 

4import os 

5import re 

6import sys 

7import time 

8from abc import ABC, abstractmethod 

9from collections import defaultdict 

10from collections.abc import Callable, Container 

11from enum import IntEnum, unique 

12from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar, cast 

13from urllib.parse import quote 

14 

15import apt_pkg 

16import yaml 

17 

18from britney2 import ( 

19 BinaryPackage, 

20 BinaryPackageId, 

21 DependencyType, 

22 PackageId, 

23 SourcePackage, 

24 Suite, 

25 SuiteClass, 

26 Suites, 

27 TargetSuite, 

28) 

29from britney2.excusedeps import DependencySpec 

30from britney2.hints import ( 

31 Hint, 

32 HintAnnotate, 

33 HintCollection, 

34 HintParser, 

35 HintType, 

36 PolicyHintParserProto, 

37) 

38from britney2.inputs.suiteloader import SuiteContentLoader 

39from britney2.migrationitem import MigrationItem, MigrationItemFactory 

40from britney2.policies import ApplySrcPolicy, PolicyVerdict 

41from britney2.utils import ( 

42 GetDependencySolversProto, 

43 compute_reverse_tree, 

44 filter_out_faux, 

45 find_newer_binaries, 

46 get_dependency_solvers, 

47 is_smooth_update_allowed, 

48 parse_option, 

49) 

50 

51if TYPE_CHECKING: 51 ↛ 52line 51 didn't jump to line 52 because the condition on line 51 was never true

52 from ..britney import Britney 

53 from ..excuse import Excuse 

54 from ..installability.universe import BinaryPackageUniverse 

55 

56 

class PolicyLoadRequest:
    """A deferred request to construct a policy, optionally gated by a config option.

    A request bundles the policy constructor with the name of the option
    that switches the policy on or off and the value to assume when that
    option is absent.  Use the :meth:`always_load` and
    :meth:`conditionally_load` factories rather than the constructor.
    """

    __slots__ = ("_options_name", "_default_value", "_policy_constructor")

    def __init__(
        self,
        policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
        options_name: str | None,
        default_value: bool,
    ) -> None:
        """Store the constructor and the enabling rule.

        :param policy_constructor: Callable invoked as
          ``policy_constructor(options, suite_info)`` to build the policy.

        :param options_name: Attribute of the options object that decides
          whether the policy is enabled, or None for "always enabled".

        :param default_value: Result of :meth:`is_enabled` when the option
          is not set.  Must be true when ``options_name`` is None.
        """
        self._default_value = default_value
        self._options_name = options_name
        self._policy_constructor = policy_constructor

    def is_enabled(self, options: optparse.Values) -> bool:
        """Return whether the policy should be loaded for the given options.

        The option value (if present) is compared case-insensitively
        against "yes", "y", "true" and "t"; anything else disables the
        policy.
        """
        option_name = self._options_name
        if option_name is not None:
            configured = getattr(options, option_name, None)
            if configured is not None:
                return configured.lower() in ("yes", "y", "true", "t")
            # Option not set (or set to None): fall back to the default.
            return self._default_value
        # Unconditional requests are always created with a true default.
        assert self._default_value
        return True

    def load(self, options: optparse.Values, suite_info: Suites) -> "BasePolicy":
        """Construct and return the policy."""
        construct = self._policy_constructor
        return construct(options, suite_info)

    @classmethod
    def always_load(
        cls, policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"]
    ) -> "PolicyLoadRequest":
        """Create a request for a policy that cannot be disabled."""
        return cls(policy_constructor, options_name=None, default_value=True)

    @classmethod
    def conditionally_load(
        cls,
        policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
        option_name: str,
        default_value: bool,
    ) -> "PolicyLoadRequest":
        """Create a request whose policy is toggled by the option ``option_name``."""
        return cls(
            policy_constructor,
            options_name=option_name,
            default_value=default_value,
        )

96 

97 

class PolicyEngine:
    """Keeps the list of active policies and forwards calls to each of them.

    Policies are consulted in the order in which they were added.
    """

    def __init__(self) -> None:
        self._policies: list["BasePolicy"] = []

    def add_policy(self, policy: "BasePolicy") -> None:
        """Append an already constructed policy to the engine."""
        self._policies.append(policy)

    def load_policies(
        self,
        options: optparse.Values,
        suite_info: Suites,
        policy_load_requests: list[PolicyLoadRequest],
    ) -> None:
        """Construct and add every requested policy that is enabled by ``options``."""
        for request in policy_load_requests:
            if not request.is_enabled(options):
                continue
            self.add_policy(request.load(options, suite_info))

    def register_policy_hints(self, hint_parser: HintParser) -> None:
        """Let every policy register its hint types with ``hint_parser``."""
        for policy in self._policies:
            policy.register_hints(hint_parser)

    def initialise(self, britney: "Britney", hints: HintCollection) -> None:
        """Hand the parsed hints to every policy and then initialise it."""
        for policy in self._policies:
            # The hints must be in place before the policy initialises itself.
            policy.hints = hints
            policy.initialise(britney)

    def save_state(self, britney: "Britney") -> None:
        """Ask every policy to persist its state."""
        for policy in self._policies:
            policy.save_state(britney)

    def apply_src_policies(
        self,
        source_t: SourcePackage | None,
        source_u: SourcePackage,
        excuse: "Excuse",
    ) -> None:
        """Run all policies for a source migration and record the results on ``excuse``.

        For each policy that applies to the suite class of the excuse's
        item, the per-architecture hook is run for every architecture in
        ``policy.options.architectures`` (when ``src_policy.run_arch`` is
        set) and then the source hook (when ``src_policy.run_src`` is
        set).  The worst verdict of those calls is the policy's verdict;
        unless it is NOT_APPLICABLE, the policy's info dict (plus a
        "verdict" key) is stored in ``excuse.policy_info``.  Finally
        ``excuse.policy_verdict`` is set to the worst verdict seen.

        :param source_t: The source package in the target suite, or None.

        :param source_u: The source package in the source suite.

        :param excuse: The excuse being built for this migration.
        """
        worst_so_far = excuse.policy_verdict
        suite_class = excuse.item.suite.suite_class
        for policy in self._policies:
            info: dict[str, Any] = {}
            verdict = PolicyVerdict.NOT_APPLICABLE
            if suite_class in policy.applicable_suites:
                mode = policy.src_policy
                if mode.run_arch:
                    for arch in policy.options.architectures:
                        arch_verdict = policy.apply_srcarch_policy_impl(
                            info, arch, source_t, source_u, excuse
                        )
                        verdict = PolicyVerdict.worst_of(verdict, arch_verdict)
                if policy.src_policy.run_src:
                    src_verdict = policy.apply_src_policy_impl(
                        info, source_t, source_u, excuse
                    )
                    verdict = PolicyVerdict.worst_of(verdict, src_verdict)
            # The base policy provides this field, so the subclass should leave it blank
            assert "verdict" not in info
            if verdict != PolicyVerdict.NOT_APPLICABLE:
                excuse.policy_info[policy.policy_id] = info
                info["verdict"] = verdict.name
            worst_so_far = PolicyVerdict.worst_of(verdict, worst_so_far)
        excuse.policy_verdict = worst_so_far

    def apply_srcarch_policies(
        self,
        arch: str,
        source_t: SourcePackage | None,
        source_u: SourcePackage,
        excuse: "Excuse",
    ) -> None:
        """Run the per-architecture hook of all applicable policies for ``arch``.

        Policies whose ``applicable_suites`` do not contain the suite class
        of the excuse's item are skipped.  Results are recorded on
        ``excuse`` in the same way as :meth:`apply_src_policies` does.

        :param arch: The architecture being considered.

        :param source_t: The source package in the target suite, or None.

        :param source_u: The source package in the source suite.

        :param excuse: The excuse being built for this migration.
        """
        worst_so_far = excuse.policy_verdict
        suite_class = excuse.item.suite.suite_class
        for policy in self._policies:
            if suite_class not in policy.applicable_suites:
                continue
            info: dict[str, Any] = {}
            verdict = policy.apply_srcarch_policy_impl(
                info, arch, source_t, source_u, excuse
            )
            worst_so_far = PolicyVerdict.worst_of(verdict, worst_so_far)
            # The base policy provides this field, so the subclass should leave it blank
            assert "verdict" not in info
            if verdict != PolicyVerdict.NOT_APPLICABLE:
                excuse.policy_info[policy.policy_id] = info
                info["verdict"] = verdict.name
        excuse.policy_verdict = worst_so_far

181 

182 

class BasePolicy(ABC):
    """Interface implemented by every migration policy.

    PolicyEngine drives a policy through register_hints(), initialise(),
    the apply_*_policy_impl() hooks and finally save_state().  The default
    hook implementations return PolicyVerdict.NOT_APPLICABLE, so a
    subclass only overrides the hooks it cares about.
    """

    # The "Britney" instance; assigned by initialise().
    britney: "Britney"
    # Key under which the policy's results are stored in excuse.policy_info.
    policy_id: str
    # The parsed hints; assigned by PolicyEngine.initialise() before
    # initialise() is called.
    hints: HintCollection | None
    # Suite classes (of the migration item's suite) the policy is run for.
    applicable_suites: set[SuiteClass]
    # Selects which of the source / per-architecture hooks
    # PolicyEngine.apply_src_policies() runs (via run_src / run_arch).
    src_policy: ApplySrcPolicy
    # The options member of Britney with all the config values.
    options: optparse.Values
    suite_info: Suites

    def __init__(
        self,
        options: optparse.Values,
        suite_info: Suites,
    ) -> None:
        """The BasePolicy constructor

        Note that this constructor does not store its arguments.

        :param options: The options member of Britney with all the
          config values.

        :param suite_info: The suites Britney is working on.
        """

    @property
    @abstractmethod
    def state_dir(self) -> str:
        """Directory holding the policy's state files."""
        ...

    def register_hints(self, hint_parser: HintParser) -> None:  # pragma: no cover
        """Register new hints that this policy accepts

        :param hint_parser: (see HintParser.register_hint_type)
        """

    def initialise(self, britney: "Britney") -> None:  # pragma: no cover
        """Called once to make the policy initialise any data structures

        This is useful for e.g. parsing files or other "heavy do-once" work.
        The base implementation only stores the britney instance on the
        policy.

        :param britney: This is the instance of the "Britney" class.
        """
        self.britney = britney

    def save_state(self, britney: "Britney") -> None:  # pragma: no cover
        """Called once at the end of the run to make the policy save any persistent data

        Note this will *not* be called for "dry-runs" as such runs should not change
        the state.

        :param britney: This is the instance of the "Britney" class.
        """

    def apply_src_policy_impl(
        self,
        policy_info: dict[str, Any],
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:  # pragma: no cover
        """Apply a policy on a given source migration

        Britney will call this method on a given source package, when
        Britney is considering to migrate it from the given source
        suite to the target suite.  The policy will then evaluate the
        migration and then return a verdict.

        :param policy_info: A dictionary for the results of this policy.
          The policy can add any key except "verdict", which is filled
          in by the PolicyEngine.  Unless the verdict is NOT_APPLICABLE,
          the dictionary is stored under the policy's id in
          excuse.policy_info and goes directly into the "excuses.yaml"
          output.

        :param source_data_tdist: Information about the source package
          in the target distribution (e.g. "testing"), or None if the
          package is not present there.  This is the data structure in
          target_suite.sources[source_name]

        :param source_data_srcdist: Information about the source
          package in the source distribution (e.g. "unstable" or "tpu").
          This is the data structure in source_suite.sources[source_name]

        :param excuse: The excuse being built for this migration.

        :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
        """
        return PolicyVerdict.NOT_APPLICABLE

    def apply_srcarch_policy_impl(
        self,
        policy_info: dict[str, Any],
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Apply a policy on a given binary migration

        Britney will call this method on binaries from a given source package
        on a given architecture, when Britney is considering to migrate them
        from the given source suite to the target suite.  The policy will then
        evaluate the migration and then return a verdict.

        :param policy_info: A dictionary for the results of this policy.
          The policy can add any key except "verdict", which is filled
          in by the PolicyEngine.  Unless the verdict is NOT_APPLICABLE,
          the dictionary is stored under the policy's id in
          excuse.policy_info and goes directly into the "excuses.yaml"
          output.

        :param arch: The architecture the item is applied to. This is mostly
          relevant for policies where src_policy is not ApplySrcPolicy.RUN_SRC
          (as that is the only case where arch can differ from item.architecture)

        :param source_data_tdist: Information about the source package
          in the target distribution (e.g. "testing"), or None if the
          package is not present there.  This is the data structure in
          target_suite.sources[source_name]

        :param source_data_srcdist: Information about the source
          package in the source distribution (e.g. "unstable" or "tpu").
          This is the data structure in source_suite.sources[source_name]

        :param excuse: The excuse being built for this migration.

        :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
        """
        # if the policy doesn't implement this function, assume it's OK
        return PolicyVerdict.NOT_APPLICABLE

298 

299 

class AbstractBasePolicy(BasePolicy):
    """
    Common base class for concrete policies.

    tests/test_policy.py:initialize_policy() has to construct BasePolicy
    objects with only the two-argument constructor, whereas every other
    BasePolicy-derived object is built with the five-argument constructor
    below.  AbstractBasePolicy exists as a separate class to document that
    split.
    """

    def __init__(
        self,
        policy_id: str,
        options: optparse.Values,
        suite_info: Suites,
        applicable_suites: set[SuiteClass],
        src_policy: ApplySrcPolicy = ApplySrcPolicy.RUN_SRC,
    ) -> None:
        """Concrete initializer.

        :param policy_id: Identifies the policy.  It will
          determine the key used for the excuses.yaml etc.

        :param options: The options member of Britney with all the
          config values.

        :param suite_info: The suites Britney is working on.

        :param applicable_suites: Where this policy applies.

        :param src_policy: Which hooks (source and/or per-architecture)
          are run for source migrations.
        """
        # One logger per concrete policy class, named "<module>.<class>".
        policy_class = self.__class__
        self.logger = logging.getLogger(
            f"{policy_class.__module__}.{policy_class.__name__}"
        )
        # No hints until the PolicyEngine provides them.
        self.hints: HintCollection | None = None
        self.src_policy = src_policy
        self.applicable_suites = applicable_suites
        self.suite_info = suite_info
        self.options = options
        self.policy_id = policy_id

    @property
    def state_dir(self) -> str:
        """The ``state_dir`` value of the options."""
        directory = self.options.state_dir
        return cast(str, directory)

340 

341 

# Type of the policy-specific parameter carried by a SimplePolicyHint.
_T = TypeVar("_T")

343 

344 

class SimplePolicyHint(Hint, Generic[_T]):
    """A hint that carries one policy-specific parameter besides its packages."""

    def __init__(
        self,
        user: str,
        hint_type: HintType,
        policy_parameter: _T,
        packages: list[MigrationItem],
    ) -> None:
        """Create the hint.

        :param user: Forwarded to the Hint constructor.

        :param hint_type: Forwarded to the Hint constructor.

        :param policy_parameter: The (already converted) parameter of the hint.

        :param packages: Forwarded to the Hint constructor.
        """
        super().__init__(user, hint_type, packages)
        self._policy_parameter = policy_parameter

    def __eq__(self, other: Any) -> bool:
        """Hints are equal when type and parameter match and the base class agrees."""
        if self.type != other.type:
            return False
        if self._policy_parameter != other._policy_parameter:
            return False
        return super().__eq__(other)

    def str(self) -> str:
        """Render the hint as "<type> <parameter> <package names...>"."""
        package_names = " ".join(pkg.name for pkg in self._packages)
        parameter = str(self._policy_parameter)
        return f"{self._type} {parameter} {package_names}"

367 

368 

class AgeDayHint(SimplePolicyHint[int]):
    """Hint whose policy parameter is an integer number of days.

    AgePolicy registers it for the "age-days" hint type.
    """

    @property
    def days(self) -> int:
        """The policy parameter of the hint, i.e. the requested age in days."""
        return self._policy_parameter

373 

374 

class IgnoreRCBugHint(SimplePolicyHint[frozenset[str]]):
    """Hint whose policy parameter is a frozenset of bug identifiers.

    RCBugPolicy registers it for the "ignore-rc-bugs" hint type and builds
    the set by splitting the hint's first argument on ",".
    """

    @property
    def ignored_rcbugs(self) -> frozenset[str]:
        """The policy parameter of the hint, i.e. the bugs to ignore."""
        return self._policy_parameter

379 

380 

def simple_policy_hint_parser_function(
    class_name: Callable[[str, HintType, _T, list[MigrationItem]], Hint],
    converter: Callable[[str], _T],
) -> PolicyHintParserProto:
    """Build a hint parser for hints of the form "<hint> <parameter> <item>...".

    :param class_name: Factory called as
      ``class_name(who, hint_type, parameter, [item])`` to create one hint
      per migration item.

    :param converter: Converts the raw (string) parameter of the hint into
      the value handed to ``class_name``.  It is called exactly once per
      hint line, so the converted value is shared by all hints created
      from that line and an invalid parameter is rejected before any hint
      is added.

    :return: The parser function to pass to HintType.
    """

    def f(
        mi_factory: MigrationItemFactory,
        hints: HintCollection,
        who: str,
        hint_type: HintType,
        *args: str,
    ) -> None:
        # The first argument is the policy parameter, the rest are the items.
        # Convert once, up front: the value does not depend on the item.
        policy_parameter = converter(args[0])
        for item in mi_factory.parse_items(*args[1:]):
            hints.add_hint(class_name(who, hint_type, policy_parameter, [item]))

    return f

400 

401 

class AgePolicy(AbstractBasePolicy):
    """Configurable Aging policy for source migrations

    The AgePolicy will let packages stay in the source suite for a pre-defined
    amount of days before letting migrate (based on their urgency, if any).

    The AgePolicy's decision is influenced by the following:

    State files:
     * ${STATE_DIR}/age-policy-urgencies: File containing urgencies for source
       packages. Note that urgencies are "sticky" and the most "urgent" urgency
       will be used (i.e. the one with lowest age-requirements).
       - This file needs to be updated externally, if the policy should take
         urgencies into consideration.  If empty (or not updated), the policy
         will simply use the default urgency (see the "Config" section below)
       - In Debian, these values are taken from the .changes file, but that is
         not a requirement for Britney.
     * ${STATE_DIR}/age-policy-dates: File containing the age of all source
       packages.
       - The policy will automatically update this file.
    Config:
     * DEFAULT_URGENCY: Name of the urgency used for packages without an urgency
       (or for unknown urgencies).  Will also be used to set the "minimum"
       aging requirements for packages not in the target suite.
     * MINDAYS_<URGENCY>: The age-requirements in days for packages with the
       given urgency.
       - Commonly used urgencies are: low, medium, high, emergency, critical
    Hints:
     * urgent <source>/<version>: Disregard the age requirements for a given
       source/version.
     * age-days X <source>/<version>: Set the age requirements for a given
       source/version to X days.  Note that X can exceed the highest
       age-requirement normally given.

    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        """Set up the policy from the configuration; no files are read here.

        :raises ValueError: If a MINDAYS_* option is not a non-negative
          integer.
        """
        super().__init__("age", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE})
        self._min_days = self._generate_mindays_table()
        # Placeholder; the real value is set by initialise().
        self._min_days_default = 0
        # britney's "day" begins at 7pm (we want aging to occur in the 22:00Z run and we run Britney 2-4 times a day)
        # NB: _date_now is used in tests
        time_now = time.time()
        if hasattr(self.options, "fake_runtime"):
            time_now = int(self.options.fake_runtime)
            self.logger.info("overriding runtime with fake_runtime %d", time_now)

        self._date_now = int(((time_now / (60 * 60)) - 19) / 24)
        # source name -> (version, day number on which that version was first seen)
        self._dates: dict[str, tuple[str, int]] = {}
        # source name -> urgency name
        self._urgencies: dict[str, str] = {}
        self._default_urgency: str = self.options.default_urgency
        self._penalty_immune_urgencies: frozenset[str] = frozenset()
        if hasattr(self.options, "no_penalties"):
            self._penalty_immune_urgencies = frozenset(
                x.strip() for x in self.options.no_penalties.split()
            )
        self._bounty_min_age: int | None = None  # initialised later

    def _generate_mindays_table(self) -> dict[str, int]:
        """Collect the ``mindays_<urgency>`` options into an urgency -> days table.

        :raises ValueError: If a value is not a non-negative integer.
        """
        prefix = "mindays_"
        mindays: dict[str, int] = {}
        for k in dir(self.options):
            if not k.startswith(prefix):
                continue
            v = getattr(self.options, k)
            try:
                as_days = int(v)
            except (TypeError, ValueError) as err:
                raise ValueError(
                    "Unable to parse "
                    + k
                    + " as a number of days. Must be 0 or a positive integer"
                ) from err
            if as_days < 0:
                raise ValueError(
                    "The value of " + k + " must be zero or a positive integer"
                )
            # Everything after the prefix is the urgency name; splitting on
            # "_" would truncate urgency names that contain an underscore.
            mindays[k.removeprefix(prefix)] = as_days
        return mindays

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the "age-days" and "urgent" hint types."""
        hint_parser.register_hint_type(
            HintType(
                "age-days",
                simple_policy_hint_parser_function(AgeDayHint, int),
                min_args=2,
            )
        )
        hint_parser.register_hint_type(HintType("urgent"))

    def initialise(self, britney: "Britney") -> None:
        """Resolve the default urgency, read the state files and BOUNTY_MIN_AGE.

        :raises ValueError: If there is no MINDAYS_* option for the default
          urgency, or BOUNTY_MIN_AGE is neither a number nor a known urgency.
        """
        super().initialise(britney)
        if self._default_urgency not in self._min_days:  # pragma: no cover
            raise ValueError(
                "Missing age-requirement for default urgency (MINDAYS_%s)"
                % self._default_urgency
            )
        # This must be known before the urgencies file is read: unknown
        # urgencies in that file are rated with the default age requirement.
        self._min_days_default = self._min_days[self._default_urgency]
        self._read_dates_file()
        self._read_urgencies_file()
        try:
            self._bounty_min_age = int(self.options.bounty_min_age)
        except ValueError:
            # Not a number, so it has to be the name of an urgency.
            if self.options.bounty_min_age in self._min_days:
                self._bounty_min_age = self._min_days[self.options.bounty_min_age]
            else:  # pragma: no cover
                raise ValueError(
                    "Please fix BOUNTY_MIN_AGE in the britney configuration"
                )
        except AttributeError:
            # The option wasn't defined in the configuration
            self._bounty_min_age = 0

    def save_state(self, britney: "Britney") -> None:
        """Write the dates file."""
        super().save_state(britney)
        self._write_dates_file()

    def apply_src_policy_impl(
        self,
        age_info: dict[str, Any],
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check whether the source has been in the source suite for long enough.

        The required age starts from the age requirement of the package's
        urgency, is adjusted by the bounties and penalties on the excuse
        (never dropping below BOUNTY_MIN_AGE or the urgency's own
        requirement, whichever is lower) and can be replaced by "age-days"
        hints.  An "urgent" hint lets a too-young package pass.

        :param age_info: Receives the details of the decision.

        :param source_data_tdist: The source in the target suite, or None
          if it is not present there.

        :param source_data_srcdist: The source in the source suite.

        :param excuse: The excuse to annotate.

        :return: PASS, PASS_HINTED (too young but "urgent" hinted) or
          REJECTED_TEMPORARILY (too young).
        """
        # retrieve the urgency for the upload, ignoring it if this is a NEW package
        # (not present in the target suite)
        source_name = excuse.item.package
        version = source_data_srcdist.version
        urgency = self._urgencies.get(source_name, self._default_urgency)

        if urgency not in self._min_days:
            age_info["unknown-urgency"] = urgency
            urgency = self._default_urgency

        if not source_data_tdist:
            if self._min_days[urgency] < self._min_days_default:
                age_info["urgency-reduced"] = {
                    "from": urgency,
                    "to": self._default_urgency,
                }
                urgency = self._default_urgency

        # (Re)start the clock for sources and versions not seen before.
        if source_name not in self._dates or self._dates[source_name][0] != version:
            self._dates[source_name] = (version, self._date_now)

        days_old = self._date_now - self._dates[source_name][1]
        min_days = self._min_days[urgency]
        for bounty in excuse.bounty:
            if excuse.bounty[bounty]:
                self.logger.info(
                    "Applying bounty for %s granted by %s: %d days",
                    source_name,
                    bounty,
                    excuse.bounty[bounty],
                )
                excuse.addinfo(
                    "Required age reduced by %d days because of %s"
                    % (excuse.bounty[bounty], bounty)
                )
                assert excuse.bounty[bounty] > 0, "negative bounties shouldn't happen"
                min_days -= excuse.bounty[bounty]
        if urgency not in self._penalty_immune_urgencies:
            for penalty in excuse.penalty:
                if excuse.penalty[penalty]:
                    self.logger.info(
                        "Applying penalty for %s given by %s: %d days",
                        source_name,
                        penalty,
                        excuse.penalty[penalty],
                    )
                    excuse.addinfo(
                        "Required age increased by %d days because of %s"
                        % (excuse.penalty[penalty], penalty)
                    )
                    assert (
                        excuse.penalty[penalty] > 0
                    ), "negative penalties should be handled earlier"
                    min_days += excuse.penalty[penalty]

        assert self._bounty_min_age is not None
        # the age in BOUNTY_MIN_AGE can be higher than the one associated with
        # the real urgency, so don't forget to take it into account
        bounty_min_age = min(self._bounty_min_age, self._min_days[urgency])
        if min_days < bounty_min_age:
            min_days = bounty_min_age
            excuse.addinfo(
                "Required age is not allowed to drop below %d days" % min_days
            )

        age_info["current-age"] = days_old

        assert self.hints is not None
        for age_days_hint in cast(
            "list[AgeDayHint]",
            self.hints.search("age-days", package=source_name, version=version),
        ):
            new_req = age_days_hint.days
            age_info["age-requirement-reduced"] = {
                "new-requirement": new_req,
                "changed-by": age_days_hint.user,
            }
            # Remember the requirement as it was before the first hint.
            if "original-age-requirement" not in age_info:
                age_info["original-age-requirement"] = min_days
            min_days = new_req

        age_info["age-requirement"] = min_days
        res = PolicyVerdict.PASS

        if days_old < min_days:
            urgent_hints = self.hints.search(
                "urgent", package=source_name, version=version
            )
            if urgent_hints:
                age_info["age-requirement-reduced"] = {
                    "new-requirement": 0,
                    "changed-by": urgent_hints[0].user,
                }
                res = PolicyVerdict.PASS_HINTED
            else:
                res = PolicyVerdict.REJECTED_TEMPORARILY

        # update excuse
        age_hint = age_info.get("age-requirement-reduced", None)
        age_min_req = age_info["age-requirement"]
        if age_hint:
            new_req = age_hint["new-requirement"]
            who = age_hint["changed-by"]
            if new_req:
                # "age-requirement" already holds the hinted value, so the
                # pre-hint requirement has to come from the saved original;
                # otherwise the message would read "from N days to N".
                overridden_req = age_info.get("original-age-requirement", age_min_req)
                excuse.addinfo(
                    "Overriding age needed from %d days to %d by %s"
                    % (overridden_req, new_req, who)
                )
                age_min_req = new_req
            else:
                excuse.addinfo("Too young, but urgency pushed by %s" % who)
                age_min_req = 0
        excuse.setdaysold(age_info["current-age"], age_min_req)

        if age_min_req == 0:
            excuse.addinfo("%d days old" % days_old)
        elif days_old < age_min_req:
            excuse.add_verdict_info(
                res, "Too young, only %d of %d days old" % (days_old, age_min_req)
            )
        else:
            excuse.addinfo("%d days old (needed %d days)" % (days_old, age_min_req))

        return res

    def _read_dates_file(self) -> None:
        """Parse the dates file

        ${STATE_DIR}/age-policy-dates is preferred; the legacy "Dates"
        file in the target suite directory is used when only that one
        exists (or when no state directory is configured).  A missing
        age-policy-dates file is created empty.

        :raises RuntimeError: If no state directory is configured and
          there is no legacy file either.
        """
        dates = self._dates
        fallback_filename = os.path.join(self.suite_info.target_suite.path, "Dates")
        using_new_name = False
        try:
            filename = os.path.join(self.state_dir, "age-policy-dates")
            if not os.path.exists(filename) and os.path.exists(fallback_filename):
                filename = fallback_filename
            else:
                using_new_name = True
        except AttributeError:
            # The options have no state_dir.
            if os.path.exists(fallback_filename):
                filename = fallback_filename
            else:
                raise RuntimeError("Please set STATE_DIR in the britney configuration")

        try:
            with open(filename, encoding="utf-8") as fd:
                for line in fd:
                    if line.startswith("#"):
                        # Ignore comment lines (mostly used for tests)
                        continue
                    # <source> <version> <date>
                    ln = line.split()
                    if len(ln) != 3:  # pragma: no cover
                        continue
                    try:
                        dates[ln[0]] = (ln[1], int(ln[2]))
                    except ValueError:  # pragma: no cover
                        # Deliberately skip lines whose date is not a number.
                        pass
        except FileNotFoundError:
            if not using_new_name:
                # If we using the legacy name, then just give up
                raise
            self.logger.info("%s does not appear to exist.  Creating it", filename)
            with open(filename, mode="x", encoding="utf-8"):
                pass

    def _read_urgencies_file(self) -> None:
        """Parse the urgencies file, keeping the most urgent entry per source.

        An entry is only used if its version is newer than the version in
        the target suite and not newer than the version in the primary
        source suite.  Unknown urgencies are rated with the age
        requirement of the default urgency.
        """
        urgencies = self._urgencies
        min_days_default = self._min_days_default
        fallback_filename = os.path.join(self.suite_info.target_suite.path, "Urgency")
        try:
            filename = os.path.join(self.state_dir, "age-policy-urgencies")
            if not os.path.exists(filename) and os.path.exists(fallback_filename):
                filename = fallback_filename
        except AttributeError:
            # The options have no state_dir.
            filename = fallback_filename

        sources_s = self.suite_info.primary_source_suite.sources
        sources_t = self.suite_info.target_suite.sources

        with open(filename, errors="surrogateescape", encoding="ascii") as fd:
            for line in fd:
                if line.startswith("#"):
                    # Ignore comment lines (mostly used for tests)
                    continue
                # <source> <version> <urgency>
                ln = line.split()
                if len(ln) != 3:
                    continue

                # read the minimum days associated with the urgencies
                urgency_old = urgencies.get(ln[0], None)
                mindays_old = self._min_days.get(urgency_old, 1000)  # type: ignore[arg-type]
                mindays_new = self._min_days.get(ln[2], min_days_default)

                # if the new urgency is lower (so the min days are higher), do nothing
                if mindays_old <= mindays_new:
                    continue

                # if the package exists in the target suite and it is more recent, do nothing
                tsrcv = sources_t.get(ln[0], None)
                if tsrcv and apt_pkg.version_compare(tsrcv.version, ln[1]) >= 0:
                    continue

                # if the package doesn't exist in the primary source suite or it is older, do nothing
                usrcv = sources_s.get(ln[0], None)
                if not usrcv or apt_pkg.version_compare(usrcv.version, ln[1]) < 0:
                    continue

                # update the urgency for the package
                urgencies[ln[0]] = ln[2]

    def _write_dates_file(self) -> None:
        """Write the dates file via a temporary file and a rename.

        When a state directory is configured, a legacy "Dates" file in the
        target suite directory is removed afterwards.
        """
        dates = self._dates
        try:
            directory = self.state_dir
            basename = "age-policy-dates"
            old_file = os.path.join(self.suite_info.target_suite.path, "Dates")
        except AttributeError:
            # The options have no state_dir: keep using the legacy location.
            directory = self.suite_info.target_suite.path
            basename = "Dates"
            old_file = None
        filename = os.path.join(directory, basename)
        filename_tmp = os.path.join(directory, "%s_new" % basename)
        with open(filename_tmp, "w", encoding="utf-8") as fd:
            for pkg in sorted(dates):
                version, date = dates[pkg]
                fd.write("%s %s %d\n" % (pkg, version, date))
        os.rename(filename_tmp, filename)
        if old_file is not None and os.path.exists(old_file):
            self.logger.info("Removing old age-policy-dates file %s", old_file)
            os.unlink(old_file)

758 

759 

760class RCBugPolicy(AbstractBasePolicy): 

761 """RC bug regression policy for source migrations 

762 

763 The RCBugPolicy will read provided list of RC bugs and block any 

764 source upload that would introduce a *new* RC bug in the target 

765 suite. 

766 

767 The RCBugPolicy's decision is influenced by the following: 

768 

769 State files: 

770 * ${STATE_DIR}/rc-bugs-${SUITE_NAME}: File containing RC bugs for packages in 

771 the given suite (one for both primary source suite and the target sutie is 

772 needed). 

773 - These files need to be updated externally. 

774 """ 

775 

776 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

777 super().__init__( 

778 "rc-bugs", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE} 

779 ) 

780 self._bugs_source: dict[str, set[str]] | None = None 

781 self._bugs_target: dict[str, set[str]] | None = None 

782 

783 def register_hints(self, hint_parser: HintParser) -> None: 

784 f = simple_policy_hint_parser_function( 

785 IgnoreRCBugHint, lambda x: frozenset(x.split(",")) 

786 ) 

787 hint_parser.register_hint_type(HintType("ignore-rc-bugs", f, min_args=2)) 

788 

789 def initialise(self, britney: "Britney") -> None: 

790 super().initialise(britney) 

791 source_suite = self.suite_info.primary_source_suite 

792 target_suite = self.suite_info.target_suite 

793 fallback_unstable = os.path.join(source_suite.path, "BugsV") 

794 fallback_testing = os.path.join(target_suite.path, "BugsV") 

795 try: 

796 filename_unstable = os.path.join( 

797 self.state_dir, "rc-bugs-%s" % source_suite.name 

798 ) 

799 filename_testing = os.path.join( 

800 self.state_dir, "rc-bugs-%s" % target_suite.name 

801 ) 

802 if ( 802 ↛ 808line 802 didn't jump to line 808

803 not os.path.exists(filename_unstable) 

804 and not os.path.exists(filename_testing) 

805 and os.path.exists(fallback_unstable) 

806 and os.path.exists(fallback_testing) 

807 ): 

808 filename_unstable = fallback_unstable 

809 filename_testing = fallback_testing 

810 except AttributeError: 

811 filename_unstable = fallback_unstable 

812 filename_testing = fallback_testing 

813 self._bugs_source = self._read_bugs(filename_unstable) 

814 self._bugs_target = self._read_bugs(filename_testing) 

815 

def apply_src_policy_impl(
    self,
    rcbugs_info: dict[str, Any],
    source_data_tdist: SourcePackage | None,
    source_data_srcdist: SourcePackage,
    excuse: "Excuse",
) -> PolicyVerdict:
    """Compare the RC bugs of a source in the source and target suites.

    The bugs recorded for ``src:<source>`` and for every binary of the
    source are collected for each suite, bugs covered by an
    ``ignore-rc-bugs`` hint are dropped, and the outcome is stored in
    *rcbugs_info* (``shared-bugs``, ``unique-source-bugs``,
    ``unique-target-bugs`` and, when a hint was applied, ``ignored-bugs``)
    as well as on *excuse*.

    :param rcbugs_info: dictionary receiving the details of this policy run
    :param source_data_tdist: the source in the target suite, or None when
        the source is not present there
    :param source_data_srcdist: the source in the source suite
    :param excuse: the excuse of the item being checked
    :return: REJECTED_PERMANENTLY when some bug only affects the source
        suite, otherwise PASS_HINTED when a hint was applied, else PASS
    """

    def bug_links(bugs: list[str]) -> str:
        # HTML links to the BTS, shared by both excuse messages below
        return ", ".join(
            f'<a href="https://bugs.debian.org/{quote(bug)}">#{bug}</a>'
            for bug in bugs
        )

    def is_binary_in(suite: Suite) -> bool:
        # any() stops at the first hit instead of materialising a set of
        # every binary name of the suite on each call
        return any(
            pkg_id.package_name == source_name
            for pkg_id in suite.all_binaries_in_suite
        )

    assert self._bugs_source is not None  # for type checking
    assert self._bugs_target is not None  # for type checking
    bugs_source = self._bugs_source
    bugs_target = self._bugs_target

    bugs_t: set[str] = set()
    bugs_s: set[str] = set()
    source_name = excuse.item.package
    binaries_s = {x[0] for x in source_data_srcdist.binaries}
    if source_data_tdist is None:
        binaries_t = set()
    else:
        binaries_t = {x[0] for x in source_data_tdist.binaries}

    # bugs filed against the source itself
    src_key = f"src:{source_name}"
    if source_data_tdist:
        bugs_t.update(bugs_target.get(src_key, ()))
    bugs_s.update(bugs_source.get(src_key, ()))

    # bugs filed against the binaries of the source
    for pkg in binaries_s:
        bugs_s.update(bugs_source.get(pkg, ()))
    for pkg in binaries_t:
        bugs_t.update(bugs_target.get(pkg, ()))

    # The bts seems to support filing source bugs against a binary of the
    # same name if that binary isn't built by any source. An example is bug
    # 820347 against Package: juce (in the live-2016-04-11 test). Add those
    # bugs too.
    if (
        source_name not in binaries_s
        and source_name not in binaries_t
        and not is_binary_in(self.suite_info.primary_source_suite)
        and not is_binary_in(self.suite_info.target_suite)
    ):
        bugs_s.update(bugs_source.get(source_name, ()))
        bugs_t.update(bugs_target.get(source_name, ()))

    # If a package is not in the target suite, it has no RC bugs per
    # definition. Unfortunately, it seems that the live-data is
    # not always accurate (e.g. live-2011-12-13 suggests that
    # obdgpslogger had the same bug in testing and unstable,
    # but obdgpslogger was not in testing at that time).
    # - For the curious, obdgpslogger was removed on that day
    #   and the BTS probably had not caught up with that fact.
    #   (https://tracker.debian.org/news/415935)
    assert not bugs_t or source_data_tdist, (
        "%s had bugs in the target suite but is not present" % source_name
    )

    verdict = PolicyVerdict.PASS

    assert self.hints is not None
    for ignore_hint in cast(
        list[IgnoreRCBugHint],
        self.hints.search(
            "ignore-rc-bugs",
            package=source_name,
            version=source_data_srcdist.version,
        ),
    ):
        ignored_bugs = ignore_hint.ignored_rcbugs

        # Only handle one hint for now
        if "ignored-bugs" in rcbugs_info:
            self.logger.info(
                "Ignoring ignore-rc-bugs hint from %s on %s due to another hint from %s",
                ignore_hint.user,
                source_name,
                rcbugs_info["ignored-bugs"]["issued-by"],
            )
            continue
        if ignored_bugs.isdisjoint(bugs_s):
            self.logger.info(
                "Ignoring ignore-rc-bugs hint from %s on %s as none of %s affect the package",
                ignore_hint.user,
                source_name,
                str(ignored_bugs),
            )
            continue
        bugs_s -= ignored_bugs
        bugs_t -= ignored_bugs
        rcbugs_info["ignored-bugs"] = {
            "bugs": sorted(ignored_bugs),
            "issued-by": ignore_hint.user,
        }
        verdict = PolicyVerdict.PASS_HINTED

    rcbugs_info["shared-bugs"] = sorted(bugs_s & bugs_t)
    rcbugs_info["unique-source-bugs"] = sorted(bugs_s - bugs_t)
    rcbugs_info["unique-target-bugs"] = sorted(bugs_t - bugs_s)

    # update excuse
    new_bugs = rcbugs_info["unique-source-bugs"]
    old_bugs = rcbugs_info["unique-target-bugs"]
    excuse.setbugs(old_bugs, new_bugs)

    if new_bugs:
        # bugs that only affect the source suite version block the migration
        verdict = PolicyVerdict.REJECTED_PERMANENTLY
        excuse.add_verdict_info(
            verdict,
            "Updating %s would introduce bugs in %s: %s"
            % (
                source_name,
                self.suite_info.target_suite.name,
                bug_links(new_bugs),
            ),
        )

    if old_bugs:
        excuse.addinfo(
            "Updating %s will fix bugs in %s: %s"
            % (
                source_name,
                self.suite_info.target_suite.name,
                bug_links(old_bugs),
            )
        )

    return verdict

963 

def _read_bugs(self, filename: str) -> dict[str, set[str]]:
    """Load the release critical bug summary stored in *filename*

    Every row of the file is expected to look like:

       <package-name> <bug number>[,<bug number>...]

    Rows that do not consist of exactly two whitespace separated fields
    are reported with a warning and ignored.

    The method returns a dictionary mapping each package name found in the
    file to the set of bug numbers (as strings) listed for it. Rows that
    repeat a package name are merged.
    """
    self.logger.info("Loading RC bugs data from %s", filename)
    rc_bugs: dict[str, set[str]] = {}
    with open(filename, encoding="ascii") as fd:
        for row in fd:
            fields = row.split()
            if len(fields) != 2:  # pragma: no cover
                self.logger.warning("Malformed line found in line %s", row)
                continue
            package, bug_numbers = fields
            rc_bugs.setdefault(package, set()).update(bug_numbers.split(","))
    return rc_bugs

987 

988 

989class PiupartsPolicy(AbstractBasePolicy): 

990 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

991 super().__init__( 

992 "piuparts", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE} 

993 ) 

994 self._piuparts_source: dict[str, tuple[str, str]] | None = None 

995 self._piuparts_target: dict[str, tuple[str, str]] | None = None 

996 

997 def register_hints(self, hint_parser: HintParser) -> None: 

998 hint_parser.register_hint_type(HintType("ignore-piuparts")) 

999 

1000 def initialise(self, britney: "Britney") -> None: 

1001 super().initialise(britney) 

1002 source_suite = self.suite_info.primary_source_suite 

1003 target_suite = self.suite_info.target_suite 

1004 try: 

1005 filename_unstable = os.path.join( 

1006 self.state_dir, "piuparts-summary-%s.json" % source_suite.name 

1007 ) 

1008 filename_testing = os.path.join( 

1009 self.state_dir, "piuparts-summary-%s.json" % target_suite.name 

1010 ) 

1011 except AttributeError as e: # pragma: no cover 

1012 raise RuntimeError( 

1013 "Please set STATE_DIR in the britney configuration" 

1014 ) from e 

1015 self._piuparts_source = self._read_piuparts_summary( 

1016 filename_unstable, keep_url=True 

1017 ) 

1018 self._piuparts_target = self._read_piuparts_summary( 

1019 filename_testing, keep_url=False 

1020 ) 

1021 

1022 def apply_src_policy_impl( 

1023 self, 

1024 piuparts_info: dict[str, Any], 

1025 source_data_tdist: SourcePackage | None, 

1026 source_data_srcdist: SourcePackage, 

1027 excuse: "Excuse", 

1028 ) -> PolicyVerdict: 

1029 assert self._piuparts_source is not None # for type checking 

1030 assert self._piuparts_target is not None # for type checking 

1031 source_name = excuse.item.package 

1032 

1033 if source_name in self._piuparts_target: 

1034 testing_state = self._piuparts_target[source_name][0] 

1035 else: 

1036 testing_state = "X" 

1037 url: str | None 

1038 if source_name in self._piuparts_source: 

1039 unstable_state, url = self._piuparts_source[source_name] 

1040 else: 

1041 unstable_state = "X" 

1042 url = None 

1043 url_html = "(no link yet)" 

1044 if url is not None: 

1045 url_html = '<a href="{0}">{0}</a>'.format(url) 

1046 

1047 match unstable_state: 

1048 case "P": 

1049 # Not a regression 

1050 msg = f"Piuparts tested OK - {url_html}" 

1051 result = PolicyVerdict.PASS 

1052 piuparts_info["test-results"] = "pass" 

1053 case "F" if testing_state != "F": 

1054 piuparts_info["test-results"] = "regression" 

1055 msg = f"Piuparts regression - {url_html}" 

1056 result = PolicyVerdict.REJECTED_PERMANENTLY 

1057 case "F": 

1058 piuparts_info["test-results"] = "failed" 

1059 msg = f"Piuparts failure (not a regression) - {url_html}" 

1060 result = PolicyVerdict.PASS 

1061 case "W": 

1062 msg = f"Piuparts check waiting for test results - {url_html}" 

1063 result = PolicyVerdict.REJECTED_TEMPORARILY 

1064 piuparts_info["test-results"] = "waiting-for-test-results" 

1065 case _: 

1066 msg = f"Piuparts can't test {source_name} (not a blocker) - {url_html}" 

1067 piuparts_info["test-results"] = "cannot-be-tested" 

1068 result = PolicyVerdict.PASS 

1069 

1070 if url is not None: 

1071 piuparts_info["piuparts-test-url"] = url 

1072 if result.is_rejected: 

1073 excuse.add_verdict_info(result, msg) 

1074 else: 

1075 excuse.addinfo(msg) 

1076 

1077 if result.is_rejected: 

1078 assert self.hints is not None 

1079 for ignore_hint in self.hints.search( 

1080 "ignore-piuparts", 

1081 package=source_name, 

1082 version=source_data_srcdist.version, 

1083 ): 

1084 piuparts_info["ignored-piuparts"] = {"issued-by": ignore_hint.user} 

1085 result = PolicyVerdict.PASS_HINTED 

1086 excuse.addinfo( 

1087 f"Piuparts issue ignored as requested by {ignore_hint.user}" 

1088 ) 

1089 break 

1090 

1091 return result 

1092 

1093 def _read_piuparts_summary( 

1094 self, filename: str, keep_url: bool = True 

1095 ) -> dict[str, tuple[str, str]]: 

1096 summary: dict[str, tuple[str, str]] = {} 

1097 self.logger.info("Loading piuparts report from %s", filename) 

1098 with open(filename) as fd: 1098 ↛ exitline 1098 didn't return from function '_read_piuparts_summary' because the return on line 1100 wasn't executed

1099 if os.fstat(fd.fileno()).st_size < 1: 1099 ↛ 1100line 1099 didn't jump to line 1100 because the condition on line 1099 was never true

1100 return summary 

1101 data = json.load(fd) 

1102 try: 

1103 if ( 

1104 data["_id"] != "Piuparts Package Test Results Summary" 

1105 or data["_version"] != "1.0" 

1106 ): # pragma: no cover 

1107 raise ValueError( 

1108 f"Piuparts results in {filename} does not have the correct ID or version" 

1109 ) 

1110 except KeyError as e: # pragma: no cover 

1111 raise ValueError( 

1112 f"Piuparts results in {filename} is missing id or version field" 

1113 ) from e 

1114 for source, suite_data in data["packages"].items(): 

1115 if len(suite_data) != 1: # pragma: no cover 

1116 raise ValueError( 

1117 f"Piuparts results in {filename}, the source {source} does not have " 

1118 "exactly one result set" 

1119 ) 

1120 item = next(iter(suite_data.values())) 

1121 state, _, url = item 

1122 if not keep_url: 

1123 url = None 

1124 summary[source] = (state, url) 

1125 

1126 return summary 

1127 

1128 

class DependsPolicy(AbstractBasePolicy):
    """Policy checking the dependencies of an item's binaries per architecture.

    Binaries that are known to be broken lead to a permanent rejection,
    unless they are allowed to be uninstallable. For the other binaries, a
    dependency that is neither satisfied in the target suite nor by a binary
    of the same item is recorded on the excuse.
    """

    pkg_universe: "BinaryPackageUniverse"
    broken_packages: frozenset["BinaryPackageId"]
    all_binaries: dict["BinaryPackageId", "BinaryPackage"]
    allow_uninst: dict[str, set[str | None]]

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "depends",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        # filled in from the options by initialise()
        self.nobreakall_arches = None
        self.new_arches = None
        self.break_arches = None

    def initialise(self, britney: "Britney") -> None:
        super().initialise(britney)
        self.pkg_universe = britney.pkg_universe
        self.broken_packages = self.pkg_universe.broken_packages
        self.all_binaries = britney.all_binaries
        self.nobreakall_arches = self.options.nobreakall_arches
        self.new_arches = self.options.new_arches
        self.break_arches = self.options.break_arches
        self.allow_uninst = britney.allow_uninst

    def apply_srcarch_policy_impl(
        self,
        deps_info: dict[str, Any],
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check the dependencies of the item's binaries on *arch*.

        :param deps_info: dictionary receiving the ``autopkgtest_run_anyways``
            and ``arch_all_not_installable`` architecture lists
        :param arch: the architecture to check
        :param source_data_tdist: the source in the target suite (unused)
        :param source_data_srcdist: the source in the source suite
        :param excuse: the excuse of the item being checked
        :return: REJECTED_PERMANENTLY if a binary that must stay installable
            has an unsatisfiable dependency, PASS otherwise
        """
        verdict = PolicyVerdict.PASS

        assert self.break_arches is not None
        assert self.new_arches is not None
        if arch in self.break_arches or arch in self.new_arches:
            # we don't check these in the policy (TODO - for now?)
            return verdict
        assert self.nobreakall_arches is not None

        item = excuse.item
        source_suite = item.suite
        target_suite = self.suite_info.target_suite

        packages_s_a = source_suite.binaries[arch]
        packages_t_a = target_suite.binaries[arch]

        my_bins = sorted(filter_out_faux(excuse.packages[arch]))
        # membership is tested for every alternative of every dependency, so
        # use a set for that and keep the sorted list for stable iteration
        my_bins_set = frozenset(my_bins)

        arch_all_installable: set[bool] = set()
        arch_arch_installable: set[bool] = set()
        consider_it_regression = True

        for pkg_id in my_bins:
            pkg_name = pkg_id.package_name
            binary_u = packages_s_a[pkg_name]
            pkg_arch = binary_u.architecture

            # in some cases, we want to track the uninstallability of a
            # package (because the autopkgtest policy uses this), but we still
            # want to allow the package to be uninstallable
            skip_dep_check = False

            if binary_u.source_version != source_data_srcdist.version:
                # don't check cruft in unstable
                continue

            if item.architecture != "source" and pkg_arch == "all":
                # we don't care about the existing arch: all binaries when
                # checking a binNMU item, because the arch: all binaries won't
                # migrate anyway
                skip_dep_check = True

            if pkg_arch == "all" and arch not in self.nobreakall_arches:
                skip_dep_check = True

            if pkg_name in self.allow_uninst[arch]:
                # this binary is allowed to become uninstallable, so we don't
                # need to check anything
                skip_dep_check = True

            if pkg_name in packages_t_a:
                oldbin = packages_t_a[pkg_name]
                if not target_suite.is_installable(oldbin.pkg_id):
                    # as the current binary in testing is already
                    # uninstallable, the newer version is allowed to be
                    # uninstallable as well, so we don't need to check
                    # anything
                    skip_dep_check = True
                    consider_it_regression = False

            if pkg_id in self.broken_packages:
                if pkg_arch == "all":
                    arch_all_installable.add(False)
                else:
                    arch_arch_installable.add(False)
                # dependencies can't be satisfied by all the known binaries -
                # this certainly won't work...
                excuse.add_unsatisfiable_on_arch(arch)
                if skip_dep_check:
                    # ...but if the binary is allowed to become uninstallable,
                    # we don't care
                    # we still want the binary to be listed as uninstallable,
                    continue
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                if pkg_name.endswith("-faux-build-depends"):
                    name = pkg_name.replace("-faux-build-depends", "")
                    excuse.add_verdict_info(
                        verdict,
                        f"src:{name} has unsatisfiable build dependency",
                    )
                else:
                    excuse.add_verdict_info(
                        verdict, f"{pkg_name}/{arch} has unsatisfiable dependency"
                    )
                excuse.addreason("depends")
            else:
                if pkg_arch == "all":
                    arch_all_installable.add(True)
                else:
                    arch_arch_installable.add(True)

            if skip_dep_check:
                continue

            for dep in self.pkg_universe.dependencies_of(pkg_id):
                # dep is a collection of packages, each of which satisfy the
                # dependency
                if not dep:
                    continue

                # the dependency is fine if one alternative is in testing or
                # is a binary from the same item (ok if the item migrates);
                # stop looking at the first alternative that qualifies
                is_ok = any(
                    target_suite.is_pkg_in_the_suite(alternative)
                    or alternative in my_bins_set
                    for alternative in dep
                )
                if not is_ok:
                    # no alternative qualified, so all of them are candidates
                    spec = DependencySpec(DependencyType.DEPENDS, arch)
                    excuse.add_package_depends(spec, set(dep))

        # The autopkgtest policy needs delicate trade offs for
        # non-installability. The current choice (considering source
        # migration and only binaries built by the version of the
        # source):
        #
        # * Run autopkgtest if all arch:$arch binaries are installable
        #   (but some or all arch:all binaries are not)
        #
        # * Don't schedule nor wait for not installable arch:all only package
        #   on ! NOBREAKALL_ARCHES
        #
        # * Run autopkgtest if installability isn't a regression (there are (or
        #   rather, should) not be a lot of packages in this state, and most
        #   likely they'll just fail quickly)
        #
        # * Don't schedule, but wait otherwise
        if arch_arch_installable == {True} and False in arch_all_installable:
            deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)
        elif (
            arch not in self.nobreakall_arches
            and arch_arch_installable == set()
            and False in arch_all_installable
        ):
            deps_info.setdefault("arch_all_not_installable", []).append(arch)
        elif not consider_it_regression:
            deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)

        return verdict

1312 

1313 

@unique
class BuildDepResult(IntEnum):
    """Outcome of checking a build dependency relation.

    The values are ordered from best to worst, so results can be compared
    with ``<``.
    """

    OK = 1  # relation is satisfied in target
    DEPENDS = 2  # relation can be satisfied by other packages in source
    FAILED = 3  # relation cannot be satisfied

1322 

1323 

class BuildDependsPolicy(AbstractBasePolicy):
    """Policy checking the Build-Depends(-Arch|-Indep) of a source.

    A relation that is not satisfied in the target suite but can be
    satisfied by packages of the source suite is recorded as a dependency on
    the excuse; a relation no package can satisfy rejects the item.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "build-depends",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
        )
        # architectures tried first for Build-Depends-Indep, see initialise()
        self._all_buildarch: list[str] = []

        parse_option(options, "all_buildarch")

    def initialise(self, britney: "Britney") -> None:
        super().initialise(britney)
        if self.options.all_buildarch:
            self._all_buildarch = SuiteContentLoader.config_str_as_list(
                self.options.all_buildarch, []
            )

    def apply_src_policy_impl(
        self,
        build_deps_info: dict[str, Any],
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
        get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
    ) -> PolicyVerdict:
        """Check the arch and indep build dependency fields of the source.

        Fields that are empty or absent are skipped.

        :return: the worst verdict of the checked fields, PASS if none
        """
        verdict = PolicyVerdict.PASS

        # analyze the dependency fields (if present)
        for deps, dep_type in (
            (source_data_srcdist.build_deps_arch, DependencyType.BUILD_DEPENDS),
            (
                source_data_srcdist.build_deps_indep,
                DependencyType.BUILD_DEPENDS_INDEP,
            ),
        ):
            if not deps:
                continue
            v = self._check_build_deps(
                deps,
                dep_type,
                build_deps_info,
                source_data_tdist,
                source_data_srcdist,
                excuse,
                get_dependency_solvers=get_dependency_solvers,
            )
            verdict = PolicyVerdict.worst_of(verdict, v)

        return verdict

    def _get_check_archs(
        self, archs: Container[str], dep_type: DependencyType
    ) -> list[str]:
        """Return the ordered architectures on which to check *dep_type*.

        For BUILD_DEPENDS these are the configured architectures that are in
        *archs*. For any other type, all configured architectures are
        returned, ordered by preference. Architectures listed in the
        ``outofsync_arches`` option are always left out.
        """
        oos = self.options.outofsync_arches
        architectures = self.options.architectures

        if dep_type == DependencyType.BUILD_DEPENDS:
            return [arch for arch in architectures if arch in archs and arch not in oos]

        # first try the all buildarch
        checkarchs = list(self._all_buildarch)
        # then try the architectures where this source has arch specific
        # binaries (in the order of the architecture config file)
        checkarchs.extend(
            arch for arch in architectures if arch in archs and arch not in checkarchs
        )
        # then try all other architectures
        checkarchs.extend(arch for arch in architectures if arch not in checkarchs)

        # and drop OUTOFSYNC_ARCHES
        return [arch for arch in checkarchs if arch not in oos]

    def _add_info_for_arch(
        self,
        arch: str,
        excuses_info: dict[str, list[str]],
        blockers: dict[str, set[BinaryPackageId]],
        results: dict[str, BuildDepResult],
        dep_type: DependencyType,
        target_suite: TargetSuite,
        source_suite: Suite,
        excuse: "Excuse",
        verdict: PolicyVerdict,
    ) -> PolicyVerdict:
        """Report the build dependency results of *arch* on *excuse*.

        :param excuses_info: per architecture messages to add to the excuse
        :param blockers: per architecture packages the item depends on
        :param results: per architecture result of the check
        :param verdict: the verdict so far
        :return: *verdict*, worsened to REJECTED_PERMANENTLY if the result
            of *arch* is FAILED
        """
        # for the solving packages, update the excuse to add the dependencies
        # (the break_arches test does not depend on the package: do it once)
        if arch in blockers and arch not in self.options.break_arches:
            for p in blockers[arch]:
                spec = DependencySpec(dep_type, arch)
                excuse.add_package_depends(spec, {p})

        if results.get(arch) == BuildDepResult.FAILED:
            verdict = PolicyVerdict.worst_of(
                verdict, PolicyVerdict.REJECTED_PERMANENTLY
            )

        # .get() so that a defaultdict argument does not grow a new key
        for excuse_text in excuses_info.get(arch, ()):
            if verdict.is_rejected:
                excuse.add_verdict_info(verdict, excuse_text)
            else:
                excuse.addinfo(excuse_text)

        return verdict

    def _check_build_deps(
        self,
        deps: str,
        dep_type: DependencyType,
        build_deps_info: dict[str, Any],
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
        get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
    ) -> PolicyVerdict:
        """Check one build dependency field of the source.

        Every relation of *deps* is checked on each relevant architecture.
        For BUILD_DEPENDS_INDEP, and for sources without architecture
        specific binaries, a single architecture with the best result is
        sufficient and only that one is reported; otherwise all checked
        architectures are reported.

        :param deps: the comma separated content of the field
        :param dep_type: the type of the field being checked
        :param build_deps_info: dictionary receiving the architecture that
            was reported on and the unsatisfiable relations, if any
        :return: the verdict for this field
        """
        verdict = PolicyVerdict.PASS
        any_arch_ok = dep_type == DependencyType.BUILD_DEPENDS_INDEP

        britney = self.britney

        # local copies for better performance
        parse_src_depends = apt_pkg.parse_src_depends

        source_name = excuse.item.package
        source_suite = excuse.item.suite
        target_suite = self.suite_info.target_suite
        binaries_s = source_suite.binaries
        provides_s = source_suite.provides_table
        binaries_t = target_suite.binaries
        provides_t = target_suite.provides_table
        unsat_bd: dict[str, list[str]] = {}
        relevant_archs: set[str] = {
            binary.architecture
            for binary in filter_out_faux(source_data_srcdist.binaries)
            if britney.all_binaries[binary].architecture != "all"
        }

        excuses_info: dict[str, list[str]] = defaultdict(list)
        blockers: dict[str, set[BinaryPackageId]] = defaultdict(set)
        arch_results: dict[str, BuildDepResult] = {}
        result_archs: dict[BuildDepResult, list[str]] = defaultdict(list)
        bestresult = BuildDepResult.FAILED
        check_archs = self._get_check_archs(relevant_archs, dep_type)
        if not check_archs:
            # when the arch list is empty, we check the b-d on any arch, instead of all archs
            # this happens for Build-Depends on a source package that only produces arch: all binaries
            any_arch_ok = True
            check_archs = self._get_check_archs(
                self.options.architectures, DependencyType.BUILD_DEPENDS_INDEP
            )

        for arch in check_archs:
            # retrieve the binary package from the specified suite and arch
            binaries_s_a = binaries_s[arch]
            provides_s_a = provides_s[arch]
            binaries_t_a = binaries_t[arch]
            provides_t_a = provides_t[arch]
            arch_results[arch] = BuildDepResult.OK
            # for every dependency block (formed as conjunction of disjunction)
            for block_txt in deps.split(","):
                block_list = parse_src_depends(block_txt, False, arch)
                # Unlike regular dependencies, some clauses of the Build-Depends(-Arch|-Indep) can be
                # filtered out by (e.g.) architecture restrictions.  We need to cope with this while
                # keeping block_txt and block aligned.
                if not block_list:
                    # Relation is not relevant for this architecture.
                    continue
                block = block_list[0]
                # if the block is satisfied in the target suite, then skip the block
                if get_dependency_solvers(
                    block, binaries_t_a, provides_t_a, build_depends=True
                ):
                    # Satisfied in the target suite; all ok.
                    continue

                # check if the block can be satisfied in the source suite, and list the solving packages
                packages = get_dependency_solvers(
                    block, binaries_s_a, provides_s_a, build_depends=True
                )

                # if the dependency can be satisfied by the same source package, skip the block:
                # obviously both binary packages will enter the target suite together
                if any(p.source == source_name for p in packages):
                    continue

                # if no package can satisfy the dependency, add this information to the excuse
                if not packages:
                    relation = block_txt.strip()
                    excuses_info[arch].append(
                        "%s unsatisfiable %s on %s: %s"
                        % (source_name, dep_type, arch, relation)
                    )
                    unsat_bd.setdefault(arch, []).append(relation)
                    arch_results[arch] = BuildDepResult.FAILED
                    continue

                blockers[arch].update(p.pkg_id for p in packages)
                if arch_results[arch] < BuildDepResult.DEPENDS:
                    arch_results[arch] = BuildDepResult.DEPENDS

            if any_arch_ok:
                if arch_results[arch] < bestresult:
                    bestresult = arch_results[arch]
                result_archs[arch_results[arch]].append(arch)
                if bestresult == BuildDepResult.OK:
                    # we found an architecture where the b-deps-indep are
                    # satisfied in the target suite, so we can stop
                    break

        if any_arch_ok:
            # report only on the first architecture that had the best result
            # NOTE(review): if check_archs is empty (e.g. every architecture
            # is out of sync) no result was recorded and this lookup raises
            # IndexError - confirm whether that configuration can occur
            arch = result_archs[bestresult][0]
            excuse.add_detailed_info(f"Checking {dep_type.get_description()} on {arch}")
            key = "check-%s-on-arch" % dep_type.get_reason()
            build_deps_info[key] = arch
            report_archs = [arch]
        else:
            report_archs = check_archs

        for arch in report_archs:
            verdict = self._add_info_for_arch(
                arch,
                excuses_info,
                blockers,
                arch_results,
                dep_type,
                target_suite,
                source_suite,
                excuse,
                verdict,
            )

        if unsat_bd:
            build_deps_info["unsatisfiable-arch-build-depends"] = unsat_bd

        return verdict

1586 

1587 

class BuiltUsingPolicy(AbstractBasePolicy):
    """Built-Using policy

    Binaries that incorporate (part of) another source package must list these
    sources under 'Built-Using'.

    This policy checks if the corresponding sources are available in the
    target suite. If they are not, but they are candidates for migration, a
    dependency is added.

    If the binary incorporates a newer version of a source, that is not (yet)
    a candidate, we don't want to accept that binary. A rebuild later in the
    primary suite wouldn't fix the issue, because that would incorporate the
    newer version again.

    If the binary incorporates an older version of the source, a newer version
    will be accepted as a replacement. We assume that this can be fixed by
    rebuilding the binary at some point during the development cycle.

    Requiring exact version of the source would not be useful in practice. A
    newer upload of that source wouldn't be blocked by this policy, so the
    built-using would be outdated anyway.

    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "built-using",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )

    def initialise(self, britney: "Britney") -> None:
        super().initialise(britney)

    def apply_srcarch_policy_impl(
        self,
        build_deps_info: dict[str, Any],
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check the Built-Using entries of the item's binaries on *arch*.

        An entry is satisfied by a source of the same name with at least the
        listed version in the target suite, in the suite of the item or, for
        an item from an additional source suite, in the primary source
        suite. In the last two cases a dependency on that source is recorded
        on the excuse, unless *arch* is one of the break architectures.

        :return: REJECTED_PERMANENTLY if an entry cannot be satisfied on an
            architecture that is not a break architecture, PASS otherwise
        """
        verdict = PolicyVerdict.PASS

        item_suite = excuse.item.suite
        target_suite = self.suite_info.target_suite
        suite_binaries = item_suite.binaries

        def satisfied_in(
            suite: Suite, pkg_name: str, bu_source: str, bu_version: str
        ) -> bool:
            # True if suite has bu_source in at least bu_version; in that
            # case the excuse is updated to reflect the dependency
            if bu_source not in suite.sources:
                return False
            candidate_version = suite.sources[bu_source].version
            if apt_pkg.version_compare(candidate_version, bu_version) < 0:
                return False
            dep = PackageId(bu_source, candidate_version, "source")
            if arch in self.options.break_arches:
                excuse.add_detailed_info(
                    "Ignoring Built-Using for %s/%s on %s"
                    % (pkg_name, arch, dep.uvname)
                )
            else:
                spec = DependencySpec(DependencyType.BUILT_USING, arch)
                excuse.add_package_depends(spec, {dep})
                excuse.add_detailed_info(
                    f"{pkg_name}/{arch} has Built-Using on {dep.uvname}"
                )
            return True

        arch_pkg_ids = sorted(
            pkg_id
            for pkg_id in filter_out_faux(source_data_srcdist.binaries)
            if pkg_id.architecture == arch
        )
        for pkg_id in arch_pkg_ids:
            pkg_name = pkg_id.package_name

            # retrieve the corresponding binary package of the item's suite
            binary_s = suite_binaries[arch][pkg_name]

            for bu in binary_s.builtusing:
                bu_source, bu_version = bu[0], bu[1]

                # first the target suite, then the suite of the item, then
                # (for additional source suites) the primary source suite
                if (
                    (
                        bu_source in target_suite.sources
                        and apt_pkg.version_compare(
                            target_suite.sources[bu_source].version, bu_version
                        )
                        >= 0
                    )
                    or satisfied_in(item_suite, pkg_name, bu_source, bu_version)
                    or (
                        item_suite.suite_class.is_additional_source
                        and satisfied_in(
                            self.suite_info.primary_source_suite,
                            pkg_name,
                            bu_source,
                            bu_version,
                        )
                    )
                ):
                    continue

                if arch in self.options.break_arches:
                    excuse.add_detailed_info(
                        "Ignoring unsatisfiable Built-Using for %s/%s on %s %s"
                        % (pkg_name, arch, bu_source, bu_version)
                    )
                    continue

                verdict = PolicyVerdict.worst_of(
                    verdict, PolicyVerdict.REJECTED_PERMANENTLY
                )
                excuse.add_verdict_info(
                    verdict,
                    "%s/%s has unsatisfiable Built-Using on %s %s"
                    % (pkg_name, arch, bu_source, bu_version),
                )

        return verdict

1709 

1710 

1711class BlockPolicy(AbstractBasePolicy): 

1712 BLOCK_HINT_REGEX = re.compile("^(un)?(block-?.*)$") 

1713 

def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
    """Set up the "block" policy for the primary and additional source suites."""
    super().__init__(
        "block",
        options,
        suite_info,
        {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
    )
    # "block-all" hints, keyed by the package field of the hint
    self._blockall: dict[str | None, Hint] = {}
    # declared here so the attribute exists even before initialise() fills
    # it in
    self._key_packages: list[str] = []

1722 

def initialise(self, britney: "Britney") -> None:
    """Collect the "block-all" hints and, if needed, the key packages.

    The list of key packages is only read when there is a "block-all" hint
    for "key".
    """
    super().initialise(britney)
    assert self.hints is not None
    block_all_hints = self.hints.search(type="block-all")
    self._blockall.update((hint.package, hint) for hint in block_all_hints)

    self._key_packages = []
    if "key" in self._blockall:
        self._key_packages = self._read_key_packages()

1732 

def _read_key_packages(self) -> list[str]:
    """Read the list of key packages

    The file contains data in the yaml format :

    - reason: <something>
      source: <package>

    The method returns a list of all key packages; an empty file gives an
    empty list. If the file does not exist, an error is logged and the
    program exits with status 1.
    """
    filename = os.path.join(self.state_dir, "key_packages.yaml")
    self.logger.info("Loading key packages from %s", filename)
    # open directly rather than testing for existence first, so the file
    # cannot disappear between the check and the read
    try:
        with open(filename, encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except FileNotFoundError:
        self.logger.error(
            "Britney was asked to block key packages, "
            "but no key_packages.yaml file was found."
        )
        sys.exit(1)

    # safe_load() returns None for an empty document
    return [item["source"] for item in data or []]

1757 

1758 def register_hints(self, hint_parser: HintParser) -> None: 

1759 # block related hints are currently defined in hint.py 

1760 pass 

1761 

1762 def _check_blocked( 

1763 self, arch: str, version: str, excuse: "Excuse" 

1764 ) -> PolicyVerdict: 

1765 verdict = PolicyVerdict.PASS 

1766 blocked = {} 

1767 unblocked = {} 

1768 block_info = {} 

1769 source_suite = excuse.item.suite 

1770 suite_name = source_suite.name 

1771 src = excuse.item.package 

1772 is_primary = source_suite.suite_class == SuiteClass.PRIMARY_SOURCE_SUITE 

1773 

1774 tooltip = ( 

1775 "please contact %s-release if update is needed" % self.options.distribution 

1776 ) 

1777 

1778 assert self.hints is not None 

1779 shints = self.hints.search(package=src) 

1780 mismatches = False 

1781 r = self.BLOCK_HINT_REGEX 

1782 for hint in shints: 

1783 m = r.match(hint.type) 

1784 if m: 

1785 if m.group(1) == "un": 

1786 assert hint.suite is not None 

1787 if ( 

1788 hint.version != version 

1789 or hint.suite.name != suite_name 

1790 or (hint.architecture != arch and hint.architecture != "source") 

1791 ): 

1792 self.logger.info( 

1793 "hint mismatch: %s %s %s", version, arch, suite_name 

1794 ) 

1795 mismatches = True 

1796 else: 

1797 unblocked[m.group(2)] = hint.user 

1798 excuse.add_hint(hint) 

1799 else: 

1800 # block(-*) hint: only accepts a source, so this will 

1801 # always match 

1802 blocked[m.group(2)] = hint.user 

1803 excuse.add_hint(hint) 

1804 

1805 if "block" not in blocked and is_primary: 

1806 # if there is a specific block hint for this package, we don't 

1807 # check for the general hints 

1808 

1809 if self.options.distribution == "debian": 1809 ↛ 1816line 1809 didn't jump to line 1816 because the condition on line 1809 was always true

1810 url = "https://release.debian.org/testing/freeze_policy.html" 

1811 tooltip = ( 

1812 'Follow the <a href="%s">freeze policy</a> when applying for an unblock' 

1813 % url 

1814 ) 

1815 

1816 if "source" in self._blockall: 

1817 blocked["block"] = self._blockall["source"].user 

1818 excuse.add_hint(self._blockall["source"]) 

1819 elif ( 

1820 "new-source" in self._blockall 

1821 and src not in self.suite_info.target_suite.sources 

1822 ): 

1823 blocked["block"] = self._blockall["new-source"].user 

1824 excuse.add_hint(self._blockall["new-source"]) 

1825 # no tooltip: new sources will probably not be accepted anyway 

1826 block_info["block"] = "blocked by {}: is not in {}".format( 

1827 self._blockall["new-source"].user, 

1828 self.suite_info.target_suite.name, 

1829 ) 

1830 elif "key" in self._blockall and src in self._key_packages: 

1831 blocked["block"] = self._blockall["key"].user 

1832 excuse.add_hint(self._blockall["key"]) 

1833 block_info["block"] = "blocked by {}: is a key package ({})".format( 

1834 self._blockall["key"].user, 

1835 tooltip, 

1836 ) 

1837 elif "no-autopkgtest" in self._blockall: 

1838 if excuse.autopkgtest_results == {"PASS"}: 

1839 if not blocked: 1839 ↛ 1865line 1839 didn't jump to line 1865 because the condition on line 1839 was always true

1840 excuse.addinfo("not blocked: has successful autopkgtest") 

1841 else: 

1842 blocked["block"] = self._blockall["no-autopkgtest"].user 

1843 excuse.add_hint(self._blockall["no-autopkgtest"]) 

1844 if not excuse.autopkgtest_results: 

1845 block_info["block"] = ( 

1846 "blocked by %s: does not have autopkgtest (%s)" 

1847 % ( 

1848 self._blockall["no-autopkgtest"].user, 

1849 tooltip, 

1850 ) 

1851 ) 

1852 else: 

1853 block_info["block"] = ( 

1854 "blocked by %s: autopkgtest not fully successful (%s)" 

1855 % ( 

1856 self._blockall["no-autopkgtest"].user, 

1857 tooltip, 

1858 ) 

1859 ) 

1860 

1861 elif not is_primary: 

1862 blocked["block"] = suite_name 

1863 excuse.needs_approval = True 

1864 

1865 for block_cmd in blocked: 

1866 unblock_cmd = "un" + block_cmd 

1867 if block_cmd in unblocked: 

1868 if is_primary or block_cmd == "block-udeb": 

1869 excuse.addinfo( 

1870 "Ignoring %s request by %s, due to %s request by %s" 

1871 % ( 

1872 block_cmd, 

1873 blocked[block_cmd], 

1874 unblock_cmd, 

1875 unblocked[block_cmd], 

1876 ) 

1877 ) 

1878 else: 

1879 excuse.addinfo("Approved by %s" % (unblocked[block_cmd])) 

1880 else: 

1881 verdict = PolicyVerdict.REJECTED_NEEDS_APPROVAL 

1882 if is_primary or block_cmd == "block-udeb": 

1883 # redirect people to d-i RM for udeb things: 

1884 if block_cmd == "block-udeb": 

1885 tooltip = "please contact the d-i release manager if an update is needed" 

1886 if block_cmd in block_info: 

1887 info = block_info[block_cmd] 

1888 else: 

1889 info = ( 

1890 "Not touching package due to {} request by {} ({})".format( 

1891 block_cmd, 

1892 blocked[block_cmd], 

1893 tooltip, 

1894 ) 

1895 ) 

1896 excuse.add_verdict_info(verdict, info) 

1897 else: 

1898 excuse.add_verdict_info(verdict, "NEEDS APPROVAL BY RM") 

1899 excuse.addreason("block") 

1900 if mismatches: 

1901 excuse.add_detailed_info( 

1902 "Some hints for %s do not match this item" % src 

1903 ) 

1904 return verdict 

1905 

1906 def apply_src_policy_impl( 

1907 self, 

1908 block_info: dict[str, Any], 

1909 source_data_tdist: SourcePackage | None, 

1910 source_data_srcdist: SourcePackage, 

1911 excuse: "Excuse", 

1912 ) -> PolicyVerdict: 

1913 return self._check_blocked("source", source_data_srcdist.version, excuse) 

1914 

1915 def apply_srcarch_policy_impl( 

1916 self, 

1917 block_info: dict[str, Any], 

1918 arch: str, 

1919 source_data_tdist: SourcePackage | None, 

1920 source_data_srcdist: SourcePackage, 

1921 excuse: "Excuse", 

1922 ) -> PolicyVerdict: 

1923 return self._check_blocked(arch, source_data_srcdist.version, excuse) 

1924 

1925 

class BuiltOnBuilddPolicy(AbstractBasePolicy):
    """Policy that rejects binaries which were not uploaded by a buildd.

    The uploader of every binary is looked up in ``signers.json`` in the
    state directory. Binaries outside the "main" component are let through,
    and arch:all binaries can be allowed with an
    ``allow-archall-maintainer-upload`` hint.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "builtonbuildd",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        self._builtonbuildd: dict[str, Any] = {
            "signerinfo": None,
        }

    def register_hints(self, hint_parser: HintParser) -> None:
        hint_parser.register_hint_type(
            HintType(
                "allow-archall-maintainer-upload",
                versioned=HintAnnotate.FORBIDDEN,
            )
        )

    def initialise(self, britney: "Britney") -> None:
        """Load the signer information from the state directory.

        :raises RuntimeError: if no state directory is configured
        """
        super().initialise(britney)
        try:
            filename_signerinfo = os.path.join(self.state_dir, "signers.json")
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e
        self._builtonbuildd["signerinfo"] = self._read_signerinfo(filename_signerinfo)

    def apply_srcarch_policy_impl(
        self,
        buildd_info: dict[str, Any],
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check who uploaded the binaries of this item on ``arch``.

        :param buildd_info: policy info for the excuse; the uploader found
            for each binary architecture is recorded under ``"signed-by"``
        :param arch: architecture whose binaries are checked
        :param source_data_tdist: source in the target suite (unused)
        :param source_data_srcdist: source in the source suite
        :param excuse: excuse that receives the info lines
        :return: the worst verdict over all binaries that were checked
        """
        verdict = PolicyVerdict.PASS
        signers = self._builtonbuildd["signerinfo"]
        signed_by = buildd_info.setdefault("signed-by", {})

        item = excuse.item
        source_suite = item.suite

        # horrible hard-coding, but currently, we don't keep track of the
        # component when loading the packages files
        component = "main"
        # we use the source component, because a binary in contrib can
        # belong to a source in main
        section = source_data_srcdist.section
        if "/" in section:
            component = section.split("/")[0]

        packages_s_a = source_suite.binaries[arch]
        assert self.hints is not None

        for pkg_id in sorted(
            x
            for x in filter_out_faux(source_data_srcdist.binaries)
            if x.architecture == arch
        ):
            pkg_name = pkg_id.package_name
            binary_u = packages_s_a[pkg_name]
            pkg_arch = binary_u.architecture

            if binary_u.source_version != source_data_srcdist.version:
                continue

            if item.architecture != "source" and pkg_arch == "all":
                # we don't care about the existing arch: all binaries when
                # checking a binNMU item, because the arch: all binaries won't
                # migrate anyway
                continue

            uid = None
            uidinfo = ""
            buildd_ok = False
            failure_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            try:
                signer = signers[pkg_name][pkg_id.version][pkg_arch]
                if signer["buildd"]:
                    buildd_ok = True
                uid = signer["uid"]
                uidinfo = f"arch {pkg_arch} binaries uploaded by {uid}"
            except KeyError:
                self.logger.info(
                    "signer info for %s %s (%s) on %s not found ",
                    pkg_name,
                    binary_u.version,
                    pkg_arch,
                    arch,
                )
                uidinfo = "upload info for arch %s binaries not found" % (pkg_arch)
                # the info might still show up later, so this rejection is
                # not necessarily permanent
                failure_verdict = PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT

            # info lines are only added for the first binary of a given
            # architecture
            first_for_arch = pkg_arch not in signed_by

            if not buildd_ok:
                if component != "main":
                    if first_for_arch:
                        excuse.add_detailed_info(
                            f"{uidinfo}, but package in {component}"
                        )
                    buildd_ok = True
                elif pkg_arch == "all":
                    allow_hints = self.hints.search(
                        "allow-archall-maintainer-upload", package=item.package
                    )
                    if allow_hints:
                        buildd_ok = True
                        verdict = PolicyVerdict.worst_of(
                            verdict, PolicyVerdict.PASS_HINTED
                        )
                        if first_for_arch:
                            excuse.addinfo(
                                "%s, but whitelisted by %s"
                                % (uidinfo, allow_hints[0].user)
                            )

            if not buildd_ok:
                # keep the worst verdict seen so far: a rejection found for
                # an earlier binary must not be replaced by a milder one
                verdict = PolicyVerdict.worst_of(verdict, failure_verdict)
                if first_for_arch:
                    if pkg_arch == "all":
                        uidinfo += (
                            ", a new source-only upload is needed to allow migration"
                        )
                    excuse.add_verdict_info(
                        verdict, "Not built on buildd: %s" % (uidinfo)
                    )

            if not first_for_arch and signed_by[pkg_arch] != uid:
                self.logger.info(
                    "signer mismatch for %s (%s %s) on %s: %s, while %s already listed",
                    pkg_name,
                    binary_u.source,
                    binary_u.source_version,
                    pkg_arch,
                    uid,
                    signed_by[pkg_arch],
                )

            signed_by[pkg_arch] = uid

        return verdict

    def _read_signerinfo(self, filename: str) -> dict[str, Any]:
        """Return the parsed JSON content of ``filename``.

        An empty file yields an empty dictionary.
        """
        self.logger.info("Loading signer info from %s", filename)
        with open(filename) as fd:
            if os.fstat(fd.fileno()).st_size < 1:
                return {}
            signerinfo: dict[str, Any] = json.load(fd)

        return signerinfo

2084 

2085 

class ImplicitDependencyPolicy(AbstractBasePolicy):
    """Implicit Dependency policy

    Migrating a new version of pkg-a may make another package, pkg-b,
    uninstallable. If a newer version of pkg-b (or its removal) repairs that,
    pkg-a can only migrate together with pkg-b: pkg-a has an 'implicit
    dependency' on pkg-b.

    This policy looks for a few common instances of that situation and
    records them in the excuses. When another item can repair the
    uninstallability, a dependency on that item is added. When no newer item
    can repair it, the excuse is blocked.

    The migration step checks the installability of every package anyway, so
    not every corner case has to be covered here. What does matter is that
    no excuse gets blocked without need.

    Cases this policy is meant to detect:

    * pkg-a is upgraded from 1.0-1 to 2.0-1, while
      pkg-b has "Depends: pkg-a (<< 2.0)"
      Typical when pkg-b depends strictly on pkg-a because it relies on an
      unstable internal interface (examples are glibc, binutils,
      python3-defaults, ...)

    * pkg-a is upgraded from 1.0-1 to 2.0-1, and
      pkg-a 1.0-1 has "Provides: provides-1",
      pkg-a 2.0-1 has "Provides: provides-2",
      pkg-b has "Depends: provides-1"
      Typical when the interface of pkg-a changes between versions and a
      virtual package identifies the interface version (e.g. perl-api-x.y)

    """

    _pkg_universe: "BinaryPackageUniverse"
    _all_binaries: dict["BinaryPackageId", "BinaryPackage"]
    _allow_uninst: dict[str, set[str | None]]
    _nobreakall_arches: list[str]

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        applicable_suites = {
            SuiteClass.PRIMARY_SOURCE_SUITE,
            SuiteClass.ADDITIONAL_SOURCE_SUITE,
        }
        super().__init__(
            "implicit-deps",
            options,
            suite_info,
            applicable_suites,
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )

    def initialise(self, britney: "Britney") -> None:
        """Cache the britney state and the options this policy consults."""
        super().initialise(britney)

        # state owned by britney
        self._pkg_universe = britney.pkg_universe
        self._all_binaries = britney.all_binaries
        self._smooth_updates = britney.options.smooth_updates
        self._allow_uninst = britney.allow_uninst

        # architecture lists from the policy options
        opts = self.options
        self._nobreakall_arches = opts.nobreakall_arches
        self._new_arches = opts.new_arches
        self._break_arches = opts.break_arches
        self._outofsync_arches = opts.outofsync_arches

    def can_be_removed(self, pkg: BinaryPackage) -> bool:
        """Tell whether ``pkg`` is a candidate for removal from the target suite."""
        suites = self.suite_info
        testing = suites.target_suite
        source_name = pkg.source

        # TODO these conditions shouldn't be hardcoded here
        # ideally, we would be able to look up excuses to see if the removal
        # is in there, but in the current flow, this policy is called before
        # all possible excuses exist, so there is no list for us to check

        if source_name not in suites.primary_source_suite.sources:
            # source for pkg not in unstable: candidate for removal
            return True

        version_in_testing = testing.sources[source_name].version
        assert self.hints is not None
        if self.hints.search(
            "remove", package=source_name, version=version_in_testing
        ):
            # removal hint for the source in testing: candidate for removal
            return True

        if testing.is_cruft(pkg):
            # if pkg is cruft in testing, removal will be tried
            return True

        # the case where the newer version of the source no longer includes
        # the binary (or includes a cruft version of the binary) will be
        # handled separately (in that case there might be an implicit
        # dependency on the newer source)
        return False

    def should_skip_rdep(
        self, pkg: BinaryPackage, source_name: str, myarch: str
    ) -> bool:
        """Tell whether the reverse dependency ``pkg`` needs no further check.

        The checks are evaluated in order; the first one that applies ends
        the evaluation.
        """
        testing = self.suite_info.target_suite
        return (
            # it is not in the target suite, migration cannot break anything
            not testing.is_pkg_in_the_suite(pkg.pkg_id)
            # if it is built from the same source, it will be upgraded with
            # the source
            or pkg.source == source_name
            # could potentially be removed, so if that happens, it won't be
            # broken
            or self.can_be_removed(pkg)
            # arch all on non nobreakarch is allowed to become uninstallable
            or (pkg.architecture == "all" and myarch not in self._nobreakall_arches)
            # there is a hint to allow this binary to become uninstallable
            or pkg.pkg_id.package_name in self._allow_uninst[myarch]
            # it is already uninstallable in the target suite, migration
            # cannot break anything
            or not testing.is_installable(pkg.pkg_id)
        )

    def breaks_installability(
        self,
        pkg_id_t: BinaryPackageId,
        pkg_id_s: BinaryPackageId | None,
        pkg_to_check: BinaryPackageId,
    ) -> bool:
        """
        Tell whether replacing pkg_id_t by pkg_id_s makes pkg_to_check
        uninstallable.

        Pass None as pkg_id_s to ask about the removal of pkg_id_t.
        """
        universe = self._pkg_universe
        conflicting = universe.negative_dependencies_of(pkg_to_check)
        upgraded_name = pkg_id_t.package_name

        def still_satisfies(candidate: BinaryPackageId) -> bool:
            if candidate in conflicting:
                # An alternative that conflicts with pkg_to_check cannot be
                # used to satisfy the dependency. This commonly happens when
                # breaks are added to pkg_id_s.
                return False
            # Either a different binary satisfies the clause, or the
            # candidate is pkg_id_s itself. Once pkg_id_s is in the target
            # suite no other version of that binary (pkg_id_t included) will
            # be there, so those versions do not count.
            return candidate.package_name != upgraded_name or candidate == pkg_id_s

        for clause in universe.dependencies_of(pkg_to_check):
            # a clause without pkg_id_t as alternative cannot be broken by
            # upgrading pkg_id_t
            if pkg_id_t in clause and not any(map(still_satisfies, clause)):
                return True
        return False

    def _fixing_versions(
        self,
        pkg_id_t: BinaryPackageId,
        pkg_id_s: BinaryPackageId | None,
        rdep: BinaryPackage,
    ) -> set[PackageId]:
        """Collect the newer versions of ``rdep`` that survive the upgrade."""
        fixing: set[PackageId] = set()
        for npkg, _suite in find_newer_binaries(
            self.suite_info, rdep, add_source_for_dropped_bin=True
        ):
            if npkg.architecture == "source":
                # When a newer version of the source package doesn't have
                # the binary, we get the source as 'newer version'. In
                # this case, the binary will not be uninstallable if the
                # newer source migrates, because it is no longer there.
                fixing.add(npkg)
                continue
            assert isinstance(npkg, BinaryPackageId)
            if not self.breaks_installability(pkg_id_t, pkg_id_s, npkg):
                fixing.add(npkg)
        return fixing

    def check_upgrade(
        self,
        pkg_id_t: BinaryPackageId,
        pkg_id_s: BinaryPackageId | None,
        source_name: str,
        myarch: str,
        broken_binaries: set[str],
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check the reverse dependencies of pkg_id_t against the upgrade.

        Reverse dependencies that break and cannot be repaired by a newer
        version are added to ``broken_binaries`` and reported on ``excuse``;
        repairable ones result in an implicit dependency on the excuse.
        """
        verdict = PolicyVerdict.PASS
        target_name_of = self.suite_info
        faux_suffix = "-faux-build-depends"

        # check all rdeps of the package in testing
        for rdep_pkg in sorted(self._pkg_universe.reverse_dependencies_of(pkg_id_t)):
            rdep_p = self._all_binaries[rdep_pkg]

            # cases where the rdep won't become uninstallable, or where we
            # don't care if it does
            if self.should_skip_rdep(rdep_p, source_name, myarch):
                continue

            # no breakage of rdep_pkg means no implicit dependency
            if not self.breaks_installability(pkg_id_t, pkg_id_s, rdep_pkg):
                continue

            # The upgrade breaks the installability of the rdep. If a newer
            # version of the rdep solves that, there is an implicit
            # dependency. If not, the upgrade will fail.
            good_newer_versions = self._fixing_versions(pkg_id_t, pkg_id_s, rdep_p)
            if good_newer_versions:
                spec = DependencySpec(DependencyType.IMPLICIT_DEPENDENCY, myarch)
                excuse.add_package_depends(spec, good_newer_versions)
                continue

            # no good newer versions: no possible solution
            broken_binaries.add(rdep_pkg.name)
            if pkg_id_s:
                action = "migrating {} to {}".format(
                    pkg_id_s.name,
                    target_name_of.target_suite.name,
                )
            else:
                action = "removing {} from {}".format(
                    pkg_id_t.name,
                    target_name_of.target_suite.name,
                )
            rdep_name = rdep_pkg[0]
            if rdep_name.endswith(faux_suffix):
                name = rdep_name.replace(faux_suffix, "")
                info = f'{action} makes Build-Depends of src:<a href="#{name}">{name}</a> uninstallable'
            else:
                info = '{0} makes <a href="#{1}">{1}</a> uninstallable'.format(
                    action, rdep_pkg.name
                )
            verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(verdict, info)

        return verdict

    def apply_srcarch_policy_impl(
        self,
        implicit_dep_info: dict[str, Any],
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Look for implicit dependencies of this item on ``arch``.

        The names of binaries that would become uninstallable are merged
        into ``implicit_dep_info["implicit-deps"]["broken-binaries"]``.
        """
        verdict = PolicyVerdict.PASS

        if not source_data_tdist:
            # this item is not currently in testing: no implicit dependency
            return verdict

        if excuse.hasreason("missingbuild"):
            # if the build is missing, the policy would treat this as if the
            # binaries would be removed, which would give incorrect (and
            # confusing) info
            info = "missing build, not checking implicit dependencies on %s" % (arch)
            excuse.add_detailed_info(info)
            return verdict

        source_suite = excuse.item.suite
        source_name = excuse.item.package
        target_suite = self.suite_info.target_suite
        all_binaries = self._all_binaries

        def is_relevant(bin_id: BinaryPackageId) -> bool:
            bin_arch = bin_id.architecture
            return (
                (arch == "source" or bin_arch == arch)
                and bin_id.package_name in target_suite.binaries[bin_arch]
                and bin_arch not in self._new_arches
                and bin_arch not in self._break_arches
                and bin_arch not in self._outofsync_arches
            )

        # we check all binaries for this excuse that are currently in testing
        relevant_binaries = list(filter(is_relevant, source_data_tdist.binaries))

        broken_binaries: set[str] = set()

        assert self.hints is not None
        for pkg_id_t in sorted(relevant_binaries):
            mypkg = pkg_id_t.package_name
            myarch = pkg_id_t.architecture
            binaries_t_a = target_suite.binaries[myarch]
            binaries_s_a = source_suite.binaries[myarch]

            if target_suite.is_cruft(all_binaries[pkg_id_t]):
                # this binary is cruft in testing: it will stay around as long
                # as necessary to satisfy dependencies, so we don't need to
                # care
                continue

            pkg_id_s: Optional["BinaryPackageId"] = None
            if mypkg in binaries_s_a:
                mybin = binaries_s_a[mypkg]
                new_id = mybin.pkg_id
                if mybin.source != source_name:
                    # hijack: this is too complicated to check, so we ignore
                    # it (the migration code will check the installability
                    # later anyway)
                    pkg_id_s = new_id
                elif mybin.source_version == source_data_srcdist.version:
                    if pkg_id_t == new_id:
                        # same binary (probably arch: all from a binNMU):
                        # 'upgrading' doesn't change anything, for this
                        # binary, so it won't break anything
                        continue
                    pkg_id_s = new_id
                # otherwise: cruft in source suite, pretend the binary
                # doesn't exist (pkg_id_s stays None)

            if not pkg_id_s:
                old_bin = binaries_t_a[mypkg]
                if is_smooth_update_allowed(old_bin, self._smooth_updates, self.hints):
                    # the binary isn't in the new version (or is cruft
                    # there), and smooth updates are allowed: the binary can
                    # stay around if that is necessary to satisfy
                    # dependencies, so we don't need to check it
                    continue

                if (
                    source_data_tdist.version == source_data_srcdist.version
                    and source_suite.suite_class == SuiteClass.ADDITIONAL_SOURCE_SUITE
                    and old_bin.architecture == "all"
                ):
                    # we're very probably migrating a binNMU built in tpu
                    # where the arch:all binaries were not copied to it as
                    # that's not needed. This policy could needlessly block.
                    continue

            v = self.check_upgrade(
                pkg_id_t, pkg_id_s, source_name, myarch, broken_binaries, excuse
            )
            verdict = PolicyVerdict.worst_of(verdict, v)

        # each arch is processed separately, so if we already have info from
        # other archs, we need to merge the info from this arch
        if "implicit-deps" in implicit_dep_info:
            broken_old = set(implicit_dep_info["implicit-deps"]["broken-binaries"])
        else:
            implicit_dep_info["implicit-deps"] = {}
            broken_old = set()

        implicit_dep_info["implicit-deps"]["broken-binaries"] = sorted(
            broken_old | broken_binaries
        )

        return verdict

2455 

2456 

class ReverseRemovalPolicy(AbstractBasePolicy):
    """Policy that blocks sources depending on something with a remove hint.

    For every package named in a ``remove`` hint the reverse dependency tree
    of its binaries is computed. Sources of binaries in that tree that are
    not in the target suite are rejected, unless an
    ``ignore-reverse-remove`` hint exists for them.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        applicable_suites = {
            SuiteClass.PRIMARY_SOURCE_SUITE,
            SuiteClass.ADDITIONAL_SOURCE_SUITE,
        }
        super().__init__("reverseremoval", options, suite_info, applicable_suites)

    def register_hints(self, hint_parser: HintParser) -> None:
        hint_parser.register_hint_type(HintType("ignore-reverse-remove"))

    def initialise(self, britney: "Britney") -> None:
        """Pre-compute which sources are affected by the remove hints."""
        super().initialise(britney)

        universe = britney.pkg_universe
        source_suites = britney.suite_info.source_suites
        target_suite = britney.suite_info.target_suite

        # Build set of the sources of reverse (Build-) Depends
        assert self.hints is not None
        remove_hints = self.hints.search("remove")

        # binary -> names of the removed packages it (transitively) needs
        affected_bins: dict[BinaryPackageId, set[str]] = defaultdict(set)
        for hint in remove_hints:
            for item in hint.packages:
                # I think we don't need to look at the target suite
                for src_suite in source_suites:
                    try:
                        # Explicitly not running filter_out_faux here
                        tree = set(src_suite.sources[item.uvname].binaries)
                    except KeyError:
                        continue
                    # grows `tree` in place with the reverse dependencies
                    compute_reverse_tree(universe, tree)
                    for bin_id in tree:
                        affected_bins[bin_id].add(item.uvname)

        # "source/version" -> names of the removed packages
        blocked_sources: dict[str, set[str]] = defaultdict(set)
        for bin_id, reasons in affected_bins.items():
            # If the pkg is in the target suite, there's nothing this
            # policy wants to do.
            if target_suite.is_pkg_in_the_suite(bin_id):
                continue
            binary = britney.all_binaries[bin_id]
            source_key = binary.source + "/" + binary.source_version
            blocked_sources[source_key].update(reasons)
        self._block_src_for_rm_hint = blocked_sources

    def apply_src_policy_impl(
        self,
        rev_remove_info: dict[str, Any],
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Reject the item if it depends on a package with a remove hint."""
        item = excuse.item
        if item.name not in self._block_src_for_rm_hint:
            return PolicyVerdict.PASS

        reason = ", ".join(sorted(self._block_src_for_rm_hint[item.name]))
        assert self.hints is not None
        ignore_hints = self.hints.search(
            "ignore-reverse-remove", package=item.uvname, version=item.version
        )
        excuse.addreason("reverseremoval")

        if not ignore_hints:
            excuse.addinfo("Remove hint for (transitive) dependency: %s" % reason)
            return PolicyVerdict.REJECTED_PERMANENTLY

        excuse.addreason("ignore-reverse-remove")
        excuse.addinfo(
            "Should block migration because of remove hint for %s, but forced by %s"
            % (reason, ignore_hints[0].user)
        )
        return PolicyVerdict.PASS_HINTED

2534 

2535 

2536class ReproduciblePolicy(AbstractBasePolicy): 

def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
    """Set up the policy and the defaults of its configuration options."""
    super().__init__(
        "reproducible",
        options,
        suite_info,
        {SuiteClass.PRIMARY_SOURCE_SUITE},
        ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
    )
    # reproducibility records; populated by initialise()
    self._reproducible: dict[str, Any] = {}

    # Default values for this policy's options
    for int_option in ("repro_success_bounty", "repro_regression_penalty"):
        parse_option(options, int_option, default=0, to_int=True)
    for plain_option in (
        "repro_log_url",
        "repro_url",
        "repro_retry_url",
        "repro_components",
    ):
        parse_option(options, plain_option)

2554 

def register_hints(self, hint_parser: HintParser) -> None:
    """Register the two ignore hints, both optionally versioned/architectured."""
    for hint_name in ("ignore-reproducible-src", "ignore-reproducible"):
        hint_type = HintType(
            hint_name,
            versioned=HintAnnotate.OPTIONAL,
            architectured=HintAnnotate.OPTIONAL,
        )
        hint_parser.register_hint_type(hint_type)

2570 

def initialise(self, britney: "Britney") -> None:
    """Load the reproducibility reports from the state directory.

    Every file in the state directory whose name starts with
    ``reproducible-`` is read as JSON. Records whose component is one of
    the configured REPRO_COMPONENTS are stored in ``self._reproducible``,
    keyed by architecture and then by ``(name, version)``.

    :raises RuntimeError: if STATE_DIR or REPRO_COMPONENTS is not configured
    """
    super().initialise(britney)
    summary = self._reproducible

    # explicit exceptions rather than asserts, so that the checks are also
    # done when python runs with -O
    if not hasattr(self, "state_dir"):
        raise RuntimeError("Please set STATE_DIR in the britney configuration")
    if not self.options.repro_components:
        raise RuntimeError("Please set REPRO_COMPONENTS in the britney configuration")

    # The option is a whitespace separated list. Compare whole component
    # names: a substring test on the raw string would also accept e.g.
    # "non-free" when only "non-free-firmware" is configured.
    components = set(self.options.repro_components.split())

    # sorted, so the result does not depend on the directory listing order
    # when several reports contain the same record
    for file in sorted(os.listdir(self.state_dir)):
        if not file.startswith("reproducible-"):
            continue
        filename = os.path.join(self.state_dir, file)

        self.logger.info("Loading reproducibility report from %s", filename)
        with open(filename) as fd:
            if os.fstat(fd.fileno()).st_size < 1:
                # empty report: nothing to load
                continue
            data = json.load(fd)

        for result in data["records"]:
            if result["component"] in components:
                summary.setdefault(result["architecture"], {})[
                    (result["name"], result["version"])
                ] = result

2597 

def _create_link_to_log(self, arch: str, failed_bpids: set["BinaryPackageId"]) -> str:
    """Format the names of the failed binaries, linked to the build log.

    :param arch: architecture the binaries were checked on
    :param failed_bpids: ids of the binaries to list
    :return: ``": "`` followed by the comma separated, sorted package
        names; when a log URL is configured and there is at least one
        binary, the names are wrapped in a link to the log
    :raises KeyError: if the last binary has no record for ``arch`` nor for
        ``"all"``
    """
    ordered = sorted(failed_bpids)
    link = ", ".join(bpid[0] for bpid in ordered)

    if not self.options.repro_log_url or not ordered:
        # no URL template, or no binary to take the build id from
        return ": " + link

    # log should be the same for all binaries, using the last bpid
    name, version = ordered[-1][0], ordered[-1][1]
    try:
        log = self._reproducible[arch][(name, version)]["build_id"]
    except KeyError:
        # fall back to the records stored under "all"
        log = self._reproducible["all"][(name, version)]["build_id"]
    url_log = self.options.repro_log_url.format(
        package=quote(name), arch=arch, log=log
    )
    return f': <a href="{url_log}">{link}</a>'

2616 

def apply_srcarch_policy_impl(
    self,
    policy_info: dict[str, Any],
    arch: str,
    source_data_tdist: SourcePackage | None,
    source_data_srcdist: SourcePackage,
    excuse: "Excuse",
) -> PolicyVerdict:
    """Judge the reproducibility of a source package on one architecture.

    The binaries of the source-suite package (for ``arch`` and arch:all)
    are looked up in the loaded reproducibility records:

    * a missing record, or a status of ``None``/``FAIL``/``UNKNOWN``,
      means results are still awaited (temporary rejection);
    * ``BAD`` binaries are compared with the same-named binaries of the
      target-suite package to tell "new", "not a regression" and
      "regression" apart (permanent rejection, or temporary while the
      reference results are unknown);
    * otherwise the package is reproducible and eligible for the bounty.

    A rejection can be lifted by an ``ignore-reproducible-src`` hint for
    the source, or by ``ignore-reproducible`` hints covering every binary
    checked.  If ``repro_regression_penalty`` is set, a remaining
    rejection is turned into a pass (with a penalty if it is positive).

    :param policy_info: updated in place with the ``status-url``,
        ``state`` and ``hints`` entries.
    :param arch: the architecture being judged.
    :param source_data_tdist: the package in the target suite, or ``None``
        if the source is new there.
    :param source_data_srcdist: the package in the source suite.
    :param excuse: receives the info/verdict messages, bounty and penalty.
    :return: the verdict of this policy for ``arch``.
    """
    verdict = PolicyVerdict.PASS
    eligible_for_bounty = False

    # we don't want to apply this policy (yet) on binNMUs
    if excuse.item.architecture != "source":
        return verdict

    # we're not supposed to judge on this arch
    if arch not in self.options.repro_arches:
        return verdict

    # bail out if this arch has no packages for this source (not built
    # here)
    if arch not in excuse.packages:
        return verdict

    # horrible hard-coding, but currently, we don't keep track of the
    # component when loading the packages files
    component = "main"
    if "/" in (section := source_data_srcdist.section):
        component = section.split("/")[0]

    if (
        self.options.repro_components
        and component not in self.options.repro_components.split()
    ):
        return verdict

    source_name = excuse.item.package

    if self.options.repro_url:
        url = self.options.repro_url.format(package=quote(source_name), arch=arch)
        url_html = ' - <a href="%s">info</a>' % url
        if self.options.repro_retry_url:
            url_html += (
                ' <a href="%s">♻ </a>'
                % self.options.repro_retry_url.format(
                    package=quote(source_name), arch=arch
                )
            )
        # When run on multiple archs, the last one "wins"
        policy_info["status-url"] = url
    else:
        url_html = ""

    # Merge the arch specific records with the arch:all ones; work on a
    # copy so the loaded data is left untouched.
    try:
        repro = self._reproducible[arch].copy()
    except KeyError:
        verdict = PolicyVerdict.REJECTED_TEMPORARILY
        msg = f"No reproducibility data available at all for {arch}"
        excuse.add_verdict_info(verdict, msg)
        return verdict
    try:
        repro |= self._reproducible["all"]
    except KeyError:
        verdict = PolicyVerdict.REJECTED_TEMPORARILY
        msg = "No reproducibility data available at all for arch:all"
        excuse.add_verdict_info(verdict, msg)
        return verdict

    # skip/delay policy until both arch:arch and arch:all builds are done.
    # Both names must be tested explicitly: `(arch or "all") in ...` only
    # ever looks at `arch`, as a non-empty string is always truthy.
    if arch in excuse.missing_builds or "all" in excuse.missing_builds:
        self.logger.debug(
            f"{excuse.name} not built for {arch} or all, skipping reproducible policy",
        )
        verdict = PolicyVerdict.REJECTED_TEMPORARILY
        excuse.add_verdict_info(
            verdict,
            f"Reproducibility check deferred on {arch}: missing builds",
        )
        return verdict

    valid_states = ("BAD", "FAIL", "GOOD", "UNKNOWN", None)

    source_suite_state = "not-unknown"
    failed_bpids: set[BinaryPackageId] = set()
    # The states should either be GOOD/BAD for all binaries, UNKNOWN for all
    # binaries, or missing for all binaries, but let's not assume that.
    for bpid in filter_out_faux(source_data_srcdist.binaries):
        if bpid[2] not in ("all", arch):
            continue
        try:
            state = repro[(bpid[0], bpid[1])]["status"]
        except KeyError:
            self.logger.debug(f"No repro data found for {bpid}")
            source_suite_state = "unknown"
            break
        assert state in valid_states, f"Unexpected reproducible state {state}"
        self.logger.debug(f"repro data for {bpid}: {state}")
        if state == "BAD":
            failed_bpids.add(bpid)
            # not changing source_suite_state here on purpose
        elif state is None or state in ("FAIL", "UNKNOWN"):
            source_suite_state = "unknown"

    if source_suite_state == "not-unknown":
        source_suite_state = "known"

    excuse_info = []
    if source_suite_state == "unknown":
        verdict = PolicyVerdict.REJECTED_TEMPORARILY
        excuse_info.append(
            f"Reproducibility check waiting for results on {arch}{url_html}"
        )
        policy_info.setdefault("state", {}).setdefault(arch, "unavailable")
    elif failed_bpids:
        # failing binaries that were already BAD in the target suite
        ignored_bpids: set[BinaryPackageId] = set()
        if source_data_tdist is None:
            target_suite_state = "new"
        else:
            target_suite_state = "reproducible"
            for bpid in failed_bpids:
                pkg_name = bpid[0]
                for bpid_t in filter_out_faux(source_data_tdist.binaries):
                    if bpid_t[2] not in ("all", arch):
                        continue
                    if pkg_name != bpid_t[0]:
                        continue
                    try:
                        state = repro[(pkg_name, bpid_t[1])]["status"]
                    except KeyError:
                        self.logger.debug(
                            f"No testing repro data found for {bpid_t}"
                        )
                        # This shouldn't happen as for the past migration
                        # to have been allowed, there should be data.
                        target_suite_state = "unknown"
                        break
                    assert (
                        state in valid_states
                    ), f"Unexpected reproducible state {state}"
                    self.logger.debug(f"testing repro data for {bpid_t}: {state}")
                    if state == "BAD":
                        ignored_bpids.add(bpid)
                    elif state is None or state in ("FAIL", "UNKNOWN"):
                        target_suite_state = "unknown"

        # Reminder: code here is part of the non-reproducible source-suite branch
        if target_suite_state == "new":
            verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse_info.append(
                f"New but not reproducible on {arch}{url_html}"
                f"{self._create_link_to_log(arch, failed_bpids)}"
            )
            policy_info.setdefault("state", {}).setdefault(
                arch, "new but not reproducible"
            )
        elif target_suite_state == "unknown":
            # Shouldn't happen after initial bootstrap once blocking
            verdict = PolicyVerdict.REJECTED_TEMPORARILY
            excuse_info.append(
                f"Reproducibility check failed and now waiting for reference "
                f"results on {arch}{url_html}"
            )
            policy_info.setdefault("state", {}).setdefault(
                arch, "waiting for reference"
            )
        elif len(failed_bpids - ignored_bpids) == 0:
            verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse_info.append(
                f"Not reproducible on {arch} (not a regression)"
                f"{self._create_link_to_log(arch, failed_bpids)}"
            )
            policy_info.setdefault("state", {}).setdefault(arch, "not reproducible")
        else:
            verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse_info.append(
                f"Reproducibility regression on {arch}"
                f"{self._create_link_to_log(arch, failed_bpids - ignored_bpids)}"
            )
            policy_info.setdefault("state", {}).setdefault(arch, "regression")

    # non-reproducible source-suite cases are handled above, so here we
    # handle the last of the source-suite cases
    else:
        excuse_info.append(f"Reproducible on {arch}{url_html}")
        policy_info.setdefault("state", {}).setdefault(arch, "reproducible")
        eligible_for_bounty = True

    # First chance to lift a rejection: a hint on the source package,
    # either for all architectures ("source") or for this one.
    if verdict.is_rejected:
        assert self.hints is not None
        for hint_arch in ("source", arch):
            ignore_hints = self.hints.search(
                "ignore-reproducible-src",
                package=source_name,
                version=source_data_srcdist.version,
                architecture=hint_arch,
            )
            # one hint is enough, take the first one encountered
            if ignore_hints:
                verdict = PolicyVerdict.PASS_HINTED
                policy_info.setdefault("hints", {}).setdefault(arch, []).append(
                    ignore_hints[0].user + ": " + str(ignore_hints[0])
                )
                if hint_arch == arch:
                    on_arch = f" on {arch}"
                else:
                    on_arch = ""
                excuse_info.append(
                    f"Reproducibility issues ignored for src:{ignore_hints[0].package}"
                    f"{on_arch} as requested by {ignore_hints[0].user}"
                )
                break

    # Second chance: hints on the individual binaries, which only count if
    # every binary that is checked has one.
    if verdict.is_rejected:
        all_hints = []
        all_hinted = True
        any_hinted = False
        missed_bpids = set()

        if source_suite_state == "known":
            check_bpids = failed_bpids
        else:
            # Let's not wait for results if all binaries have a hint
            # NOTE(review): unlike the loops above, this is not limited to
            # the binaries of `arch` and arch:all - confirm this is intended
            check_bpids = filter_out_faux(source_data_srcdist.binaries)

        for bpid in check_bpids:
            bpid_hints = self.hints.search(
                "ignore-reproducible",
                package=bpid[0],
                version=bpid[1],
                architecture=bpid[2],
            )
            if bpid_hints:
                # one hint per binary is enough
                all_hints.append(bpid_hints[0])
                any_hinted = True
                self.logger.debug(f"repro: hint found for {source_name}: {bpid}")
            else:
                missed_bpids.add(bpid)
                all_hinted = False

        if all_hinted:
            verdict = PolicyVerdict.PASS_HINTED
            for hint in all_hints:
                policy_info.setdefault("hints", {}).setdefault(arch, []).append(
                    hint.user + ": " + str(hint)
                )
                # TODO: we're going to print this for arch:all binaries on each arch
                excuse_info.append(
                    f"Reproducibility issues ignored for {hint.package} on {arch} as "
                    f"requested by {hint.user}"
                )
        elif any_hinted:
            self.logger.info(
                f"repro: binary hints for {source_name} ignored as they don't "
                f"cover these binaries {missed_bpids}"
            )

    if self.options.repro_success_bounty and eligible_for_bounty:
        excuse.add_bounty("reproducibility", self.options.repro_success_bounty)

    if verdict.is_rejected and self.options.repro_regression_penalty:
        # With a non-zero penalty, we shouldn't block on this policy
        verdict = PolicyVerdict.PASS
        if self.options.repro_regression_penalty > 0:
            excuse.add_penalty(
                "reproducibility", self.options.repro_regression_penalty
            )

    # Only report the messages as verdict info if we still reject
    for msg in excuse_info:
        if verdict.is_rejected:
            excuse.add_verdict_info(verdict, msg)
        else:
            excuse.addinfo(msg)

    return verdict