Coverage for britney2/hints.py: 93%

195 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-06-17 09:00 +0000

1# Copyright (C) 2013 Adam D. Barratt <adsb@debian.org> 

2 

3# This program is free software; you can redistribute it and/or modify 

4# it under the terms of the GNU General Public License as published by 

5# the Free Software Foundation; either version 2 of the License, or 

6# (at your option) any later version. 

7 

8# This program is distributed in the hope that it will be useful, 

9# but WITHOUT ANY WARRANTY; without even the implied warranty of 

10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

11# GNU General Public License for more details. 

12 

13import logging 

14from collections.abc import Generator, Iterable 

15from dataclasses import dataclass 

16from enum import Enum, unique 

17from itertools import chain 

18from typing import TYPE_CHECKING, Any, Optional, Protocol 

19 

20if TYPE_CHECKING: 20 ↛ 21line 20 didn't jump to line 21 because the condition on line 20 was never true

21 from . import Suite 

22 from .migrationitem import MigrationItem, MigrationItemFactory 

23 

24 

25class MalformedHintException(Exception): 

26 pass 

27 

28 

29@unique 

30class HintAnnotate(Enum): 

31 REQUIRED = 1 

32 FORBIDDEN = 2 

33 OPTIONAL = 3 

34 

35 

36class HintCollection: 

37 def __init__(self) -> None: 

38 self._hints: dict[str, list["Hint"]] = {} 

39 

40 @property 

41 def is_empty(self) -> bool: 

42 return not self._hints 

43 

44 def __getitem__(self, type: str) -> list["Hint"]: 

45 """Get hints per type""" 

46 return self._hints[type] if type in self._hints else [] 

47 

48 def _search( 

49 self, 

50 type: str | None = None, 

51 *, 

52 package: str | None = None, 

53 version: str | None = None, 

54 architecture: str | None = None, 

55 suite: Optional["Suite"] = None, 

56 removal: bool | None = None, 

57 ) -> Generator["Hint", None, None]: 

58 for itype in (type,) if type is not None else self._hints.keys(): 

59 if itype not in self._hints: 

60 continue 

61 

62 for hint in self._hints[itype]: 

63 if ( 

64 hint.active 

65 and (package is None or package == hint.packages[0].package) 

66 and ( 

67 version is None 

68 or hint.packages[0].version is None 

69 or version == hint.packages[0].version 

70 ) 

71 and ( 

72 architecture is None 

73 or hint.packages[0].architecture == "source" 

74 or architecture == hint.packages[0].architecture 

75 ) 

76 and (suite is None or suite == hint.packages[0].suite) 

77 and (removal is None or removal == hint.packages[0].is_removal) 

78 ): 

79 yield hint 

80 

81 def search( 

82 self, 

83 type: str | None = None, 

84 *, 

85 package: str | None = None, 

86 version: str | None = None, 

87 architecture: str | None = None, 

88 suite: Optional["Suite"] = None, 

89 removal: bool | None = None, 

90 ) -> list["Hint"]: 

91 return [ 

92 hint 

93 for hint in self._search( 

94 type, 

95 package=package, 

96 version=version, 

97 architecture=architecture, 

98 suite=suite, 

99 removal=removal, 

100 ) 

101 ] 

102 

103 def search_first( 

104 self, 

105 type: str | None = None, 

106 *, 

107 package: str | None = None, 

108 version: str | None = None, 

109 architecture: str | None = None, 

110 suite: Optional["Suite"] = None, 

111 removal: bool | None = None, 

112 ) -> "Hint | None": 

113 for hint in self._search( 

114 type, 

115 package=package, 

116 version=version, 

117 architecture=architecture, 

118 suite=suite, 

119 removal=removal, 

120 ): 

121 return hint 

122 else: 

123 return None 

124 

125 def has_hint( 

126 self, 

127 type: str | None = None, 

128 *, 

129 package: str | None = None, 

130 version: str | None = None, 

131 architecture: str | None = None, 

132 suite: Optional["Suite"] = None, 

133 removal: bool | None = None, 

134 ) -> bool: 

135 return ( 

136 self.search_first( 

137 type, 

138 package=package, 

139 version=version, 

140 architecture=architecture, 

141 suite=suite, 

142 removal=removal, 

143 ) 

144 is not None 

145 ) 

146 

147 def add_hint(self, hint: "Hint") -> None: 

148 if hint.type not in self._hints: 

149 self._hints[hint.type] = [hint] 

150 else: 

151 self._hints[hint.type].append(hint) 

152 

153 def remove_inactive_hints(self) -> None: 

154 for hint_type, hints in self._hints.items(): 

155 self._hints[hint_type] = [hint for hint in hints if hint.active] 

156 

157 

158class PolicyHintParserProto(Protocol): 

159 def __call__( 159 ↛ exitline 159 didn't jump to the function exit

160 self, 

161 mi_factory: "MigrationItemFactory", 

162 hints: HintCollection, 

163 who: str, 

164 hint_type: "HintType", 

165 /, 

166 *args: str, 

167 ) -> None: ... 

168 

169 

170def split_into_one_hint_per_package( 

171 mi_factory: "MigrationItemFactory", 

172 hints: HintCollection, 

173 who: str, 

174 hint_type: "HintType", 

175 /, 

176 *args: str, 

177) -> None: 

178 for item in mi_factory.parse_items(*args): 

179 hints.add_hint(Hint(who, hint_type, [item])) 

180 

181 

182def single_hint_taking_list_of_packages( 

183 mi_factory: "MigrationItemFactory", 

184 hints: HintCollection, 

185 who: str, 

186 hint_type: "HintType", 

187 /, 

188 *args: str, 

189) -> None: 

190 hints.add_hint(Hint(who, hint_type, mi_factory.parse_items(*args))) 

191 

192 

193@dataclass(frozen=True, slots=True) 

194class HintType: 

195 hint_name: str 

196 parser_function: PolicyHintParserProto = split_into_one_hint_per_package 

197 min_args: int = 1 

198 versioned: HintAnnotate = HintAnnotate.REQUIRED 

199 architectured: HintAnnotate = HintAnnotate.FORBIDDEN 

200 

201 def __post_init__( 

202 self, 

203 ) -> None: 

204 assert ( 

205 self.min_args > 0 or self.hint_name == "remark" 

206 ), "min_args must be at least 1" 

207 

208 

209class Hint: 

210 def __init__( 

211 self, 

212 user: str, 

213 hint_type: HintType, 

214 packages: list["MigrationItem"], 

215 ) -> None: 

216 self._user = user 

217 self._active = True 

218 self._type = hint_type 

219 self._packages = packages 

220 

221 self.check() 

222 

223 def check(self) -> None: 

224 for package in self.packages: 

225 if ( 

226 self._type.versioned is HintAnnotate.FORBIDDEN 

227 and package.version is not None 

228 ): 

229 raise MalformedHintException( 

230 f'"{self.type}" needs unversioned packages, got "{package}"' 

231 ) 

232 elif ( 

233 self._type.versioned is HintAnnotate.REQUIRED 

234 and package.version is None 

235 ): 

236 raise MalformedHintException( 

237 f'"{self.type}" needs versioned packages, got "{package}"' 

238 ) 

239 if ( 

240 self._type.architectured is HintAnnotate.REQUIRED 

241 and package.architecture == "source" 

242 ): 

243 raise MalformedHintException( 

244 f'"{self.type}" needs to be architecture specific, got {package}' 

245 ) 

246 elif ( 

247 self._type.architectured is HintAnnotate.FORBIDDEN 

248 and package.architecture != "source" 

249 ): 

250 raise MalformedHintException( 

251 f'"{self.type}" must not be architecture specific, got {package}' 

252 ) 

253 

254 def set_active(self, active: bool) -> None: 

255 self._active = active 

256 

257 def __str__(self) -> str: 

258 if self._type.versioned is HintAnnotate.FORBIDDEN: 258 ↛ 259line 258 didn't jump to line 259 because the condition on line 258 was never true

259 return " ".join(chain((self.type,), (x.uvname for x in self._packages))) 

260 else: 

261 return " ".join(chain((self.type,), (x.name for x in self._packages))) 

262 

263 def __eq__(self, other: Any) -> bool: 

264 if self.type != other.type: 264 ↛ 265line 264 didn't jump to line 265 because the condition on line 264 was never true

265 return False 

266 else: 

267 # we can't use sets, because unversioned items cannot be hashed 

268 return sorted(self.packages) == sorted(other.packages) 

269 

270 @property 

271 def type(self) -> str: 

272 return self._type.hint_name 

273 

274 @property 

275 def packages(self) -> list["MigrationItem"]: 

276 return self._packages 

277 

278 @property 

279 def active(self) -> bool: 

280 return self._active 

281 

282 @property 

283 def user(self) -> str: 

284 return self._user 

285 

286 @property 

287 def package(self) -> str | None: 

288 if self.packages: 288 ↛ 292line 288 didn't jump to line 292 because the condition on line 288 was always true

289 assert len(self.packages) == 1, self.packages 

290 return self.packages[0].package 

291 else: 

292 return None 

293 

294 @property 

295 def version(self) -> str | None: 

296 if self.packages: 296 ↛ 300line 296 didn't jump to line 300 because the condition on line 296 was always true

297 assert len(self.packages) == 1, self.packages 

298 return self.packages[0].version 

299 else: 

300 return None 

301 

302 @property 

303 def architecture(self) -> str | None: 

304 if self.packages: 304 ↛ 308line 304 didn't jump to line 308 because the condition on line 304 was always true

305 assert len(self.packages) == 1, self.packages 

306 return self.packages[0].architecture 

307 else: 

308 return None 

309 

310 @property 

311 def suite(self) -> Optional["Suite"]: 

312 if self.packages: 312 ↛ 316line 312 didn't jump to line 316 because the condition on line 312 was always true

313 assert len(self.packages) == 1, self.packages 

314 return self.packages[0].suite 

315 else: 

316 return None 

317 

318 

319class HintParser: 

320 def __init__(self, mi_factory: "MigrationItemFactory") -> None: 

321 logger_name = ".".join((self.__class__.__module__, self.__class__.__name__)) 

322 self.logger = logging.getLogger(logger_name) 

323 self.hints = HintCollection() 

324 self.mi_factory = mi_factory 

325 

326 self._aliases: dict[str, str] = { 

327 "approve": "unblock", 

328 } 

329 

330 FORBIDDEN = HintAnnotate.FORBIDDEN 

331 OPTIONAL = HintAnnotate.OPTIONAL 

332 self._hint_set: set[HintType] = set() 

333 # Migration grouping hints 

334 self.register_hint_type(HintType("remark", lambda *x: None, 0)) 334 ↛ exitline 334 didn't run the lambda on line 334

335 # Easy needs at least 2 to make sense 

336 self.register_hint_type( 

337 HintType( 

338 "easy", single_hint_taking_list_of_packages, 2, architectured=OPTIONAL 

339 ) 

340 ) 

341 self.register_hint_type( 

342 HintType( 

343 "force-hint", 

344 single_hint_taking_list_of_packages, 

345 architectured=OPTIONAL, 

346 ) 

347 ) 

348 self.register_hint_type( 

349 HintType( 

350 "hint", single_hint_taking_list_of_packages, architectured=OPTIONAL 

351 ) 

352 ) 

353 # Block / freeze related hints 

354 self.register_hint_type(HintType("block", versioned=FORBIDDEN)) 

355 self.register_hint_type(HintType("block-all", versioned=FORBIDDEN)) 

356 self.register_hint_type(HintType("block-udeb", versioned=FORBIDDEN)) 

357 self.register_hint_type(HintType("unblock", architectured=OPTIONAL)) 

358 self.register_hint_type(HintType("unblock-udeb", architectured=OPTIONAL)) 

359 # Other 

360 self.register_hint_type(HintType("remove")) 

361 self.register_hint_type(HintType("force", architectured=OPTIONAL)) 

362 self.register_hint_type( 

363 HintType("allow-uninst", versioned=FORBIDDEN, architectured=OPTIONAL) 

364 ) 

365 self.register_hint_type(HintType("allow-smooth-update")) 

366 

367 @property 

368 def registered_hint_names(self) -> set[str]: 

369 """A set of all known hint names (and aliases thereof)""" 

370 return set(chain((x.hint_name for x in self._hint_set), self._aliases.keys())) 

371 

372 @property 

373 def registered_hints(self) -> set[HintType]: 

374 """A set of all known hints""" 

375 return self._hint_set 

376 

377 def register_hint_type( 

378 self, 

379 hint_type: HintType, 

380 *, 

381 aliases: Iterable[str] | None = None, 

382 ) -> None: 

383 """Register a new hint that is supported by the parser 

384 

385 This registers a new hint that can be parsed by the hint parser. All hints are single words with a 

386 space-separated list of arguments (on a single line). The hint parser will do some basic processing, 

387 the permission checking and minor validation on the hint before passing it on to the parser function 

388 given. 

389 

390 The parser_function will receive the following arguments: 

391 * A hint collection 

392 * Identifier of the entity providing the hint 

393 * The hint_name (aliases will be mapped to the hint_name) 

394 * Zero or more string arguments for the hint (so the function needs to use *args) 

395 

396 The parser_function will then have to process the arguments and call the hint collection's "add_hint" 

397 as needed. Example implementations include "split_into_one_hint_per_package", which is used by almost 

398 all policy hints. 

399 

400 :param hint_type: The hint 

401 :param aliases: An optional iterable of aliases to the hint (use only for backwards compatibility) 

402 """ 

403 hint_name = hint_type.hint_name 

404 assert ( 

405 hint_name not in self._aliases 

406 ), f"The hint type {hint_name} is already registered as an alias of {self._aliases[hint_name]}" 

407 assert ( 

408 hint_name not in self.registered_hint_names 

409 ), f"The hint type {hint_name} is already registered" 

410 self._hint_set.add(hint_type) 

411 if aliases: 

412 for alias in aliases: 

413 self._aliases[alias] = hint_name 

414 

415 def parse_hints( 

416 self, 

417 who: str, 

418 permitted_hints: str | list[str], 

419 filename: str, 

420 lines: Iterable[str], 

421 ) -> None: 

422 reg_hints = self.registered_hints 

423 reg_hint_names = self.registered_hint_names 

424 hints = self.hints 

425 aliases = self._aliases 

426 mi_factory = self.mi_factory 

427 for line_no, line in enumerate(lines, 1): 

428 line = line.strip() 

429 if line == "" or line.startswith("#"): 

430 continue 

431 ln = line.split() 

432 hint_name = ln.pop(0) 

433 if hint_name in aliases: 

434 hint_name = aliases[hint_name] 

435 if hint_name == "finished": 

436 break 

437 if ( 

438 hint_name not in reg_hint_names 

439 ): # this includes aliases, but those are checked before 

440 self.logger.warning( 

441 "Unknown hint found in %s (line %d): '%s'", filename, line_no, line 

442 ) 

443 continue 

444 if hint_name not in permitted_hints and "ALL" not in permitted_hints: 

445 reason = "The hint is not a part of the permitted hints for " + who 

446 self.logger.info( 

447 'Ignoring "%s" hint from %s found in %s (line %d): %s', 

448 hint_name, 

449 who, 

450 filename, 

451 line_no, 

452 reason, 

453 ) 

454 continue 

455 hint_type = [x for x in reg_hints if x.hint_name == hint_name][0] 

456 min_args = hint_type.min_args 

457 if len(ln) < min_args: 

458 self.logger.warning( 

459 "Malformed hint found in %s (line %d): Needs at least %d argument(s), got %d", 

460 filename, 

461 line_no, 

462 min_args, 

463 len(ln), 

464 ) 

465 continue 

466 try: 

467 hint_type.parser_function(mi_factory, hints, who, hint_type, *ln) 

468 except MalformedHintException as e: 

469 self.logger.warning( 

470 'Malformed hint found in %s (line %d): "%s"', 

471 filename, 

472 line_no, 

473 e.args[0], 

474 ) 

475 continue