Coverage for britney2/hints.py: 93%

192 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-10-17 17:32 +0000

1# Copyright (C) 2013 Adam D. Barratt <adsb@debian.org> 

2 

3# This program is free software; you can redistribute it and/or modify 

4# it under the terms of the GNU General Public License as published by 

5# the Free Software Foundation; either version 2 of the License, or 

6# (at your option) any later version. 

7 

8# This program is distributed in the hope that it will be useful, 

9# but WITHOUT ANY WARRANTY; without even the implied warranty of 

10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

11# GNU General Public License for more details. 

12 

13import logging 

14from collections.abc import Callable, Iterable 

15from enum import Enum, unique 

16from itertools import chain 

17from typing import TYPE_CHECKING, Any, Optional, Protocol, Union 

18 

19if TYPE_CHECKING: 19 ↛ 20line 19 didn't jump to line 20 because the condition on line 19 was never true

20 from . import Suite 

21 from .migrationitem import MigrationItem, MigrationItemFactory 

22 

23 

24class MalformedHintException(Exception): 

25 pass 

26 

27 

28@unique 

29class HintAnnotate(Enum): 

30 REQUIRED = 1 

31 FORBIDDEN = 2 

32 OPTIONAL = 3 

33 

34 

35class HintCollection: 

36 def __init__(self) -> None: 

37 self._hints: list["Hint"] = [] 

38 

39 @property 

40 def is_empty(self) -> bool: 

41 return not self._hints 

42 

43 def __getitem__(self, type: str | None = None) -> list["Hint"]: 

44 return self.search(type) 

45 

46 def search( 

47 self, 

48 type: str | None = None, 

49 onlyactive: bool = True, 

50 package: str | None = None, 

51 version: str | None = None, 

52 architecture: str | None = None, 

53 suite: Optional["Suite"] = None, 

54 removal: bool | None = None, 

55 ) -> list["Hint"]: 

56 return [ 

57 hint 

58 for hint in self._hints 

59 if (type is None or type == hint.type) 

60 and (hint.active or not onlyactive) 

61 and (package is None or package == hint.packages[0].package) 

62 and ( 

63 version is None 

64 or hint.packages[0].version is None 

65 or version == hint.packages[0].version 

66 ) 

67 and ( 

68 architecture is None 

69 or hint.packages[0].architecture == "source" 

70 or architecture == hint.packages[0].architecture 

71 ) 

72 and (suite is None or suite == hint.packages[0].suite) 

73 and (removal is None or removal == hint.packages[0].is_removal) 

74 ] 

75 

76 def add_hint(self, hint: "Hint") -> None: 

77 self._hints.append(hint) 

78 

79 

80class PolicyHintParserProto(Protocol): 

81 def __call__( 81 ↛ exitline 81 didn't jump to the function exit

82 self, 

83 mi_factory: "MigrationItemFactory", 

84 hints: HintCollection, 

85 who: str, 

86 hint_type: "HintType", 

87 /, 

88 *args: str, 

89 ) -> None: ... 

90 

91 

92def split_into_one_hint_per_package( 

93 mi_factory: "MigrationItemFactory", 

94 hints: HintCollection, 

95 who: str, 

96 hint_type: "HintType", 

97 /, 

98 *args: str, 

99) -> None: 

100 for item in mi_factory.parse_items(*args): 

101 hints.add_hint(Hint(who, hint_type, [item])) 

102 

103 

104def single_hint_taking_list_of_packages( 

105 mi_factory: "MigrationItemFactory", 

106 hints: HintCollection, 

107 who: str, 

108 hint_type: "HintType", 

109 /, 

110 *args: str, 

111) -> None: 

112 hints.add_hint(Hint(who, hint_type, mi_factory.parse_items(*args))) 

113 

114 

115class HintType: 

116 def __init__( 

117 self, 

118 hint_name: str, 

119 parser_function: PolicyHintParserProto = split_into_one_hint_per_package, 

120 min_args: int = 1, 

121 versioned: HintAnnotate = HintAnnotate.REQUIRED, 

122 architectured: HintAnnotate = HintAnnotate.FORBIDDEN, 

123 ) -> None: 

124 assert min_args > 0 or hint_name == "remark", "min_args must be at least 1" 

125 self._hint_name = hint_name 

126 self._min_args = min_args 

127 self._parser_function = parser_function 

128 self._versioned = versioned 

129 self._architectured = architectured 

130 

131 @property 

132 def hint_name(self) -> str: 

133 return self._hint_name 

134 

135 @property 

136 def min_args(self) -> int: 

137 return self._min_args 

138 

139 @property 

140 def parser_function(self) -> PolicyHintParserProto: 

141 return self._parser_function 

142 

143 @property 

144 def versioned(self) -> HintAnnotate: 

145 return self._versioned 

146 

147 @property 

148 def architectured(self) -> HintAnnotate: 

149 return self._architectured 

150 

151 

152class Hint: 

153 def __init__( 

154 self, 

155 user: str, 

156 hint_type: HintType, 

157 packages: list["MigrationItem"], 

158 ) -> None: 

159 self._user = user 

160 self._active = True 

161 self._type = hint_type 

162 self._packages = packages 

163 

164 self.check() 

165 

166 def check(self) -> None: 

167 for package in self.packages: 

168 if ( 

169 self._type.versioned == HintAnnotate.FORBIDDEN 

170 and package.version is not None 

171 ): 

172 raise MalformedHintException( 

173 f'"{self.type}" needs unversioned packages, got "{package}"' 

174 ) 

175 elif ( 

176 self._type.versioned == HintAnnotate.REQUIRED 

177 and package.version is None 

178 ): 

179 raise MalformedHintException( 

180 f'"{self.type}" needs versioned packages, got "{package}"' 

181 ) 

182 if ( 

183 self._type.architectured == HintAnnotate.REQUIRED 

184 and package.architecture == "source" 

185 ): 

186 raise MalformedHintException( 

187 f'"{self.type}" needs to be architecture specific, got {package}' 

188 ) 

189 elif ( 

190 self._type.architectured == HintAnnotate.FORBIDDEN 

191 and package.architecture != "source" 

192 ): 

193 raise MalformedHintException( 

194 f'"{self.type}" must not be architecture specific, got {package}' 

195 ) 

196 

197 def set_active(self, active: bool) -> None: 

198 self._active = active 

199 

200 def __str__(self) -> str: 

201 if self._type.versioned == HintAnnotate.FORBIDDEN: 201 ↛ 202line 201 didn't jump to line 202 because the condition on line 201 was never true

202 return "{} {}".format(self.type, " ".join(x.uvname for x in self._packages)) 

203 else: 

204 return "{} {}".format(self.type, " ".join(x.name for x in self._packages)) 

205 

206 def __eq__(self, other: Any) -> bool: 

207 if self.type != other.type: 207 ↛ 208line 207 didn't jump to line 208 because the condition on line 207 was never true

208 return False 

209 else: 

210 # we can't use sets, because unversioned items cannot be hashed 

211 return sorted(self.packages) == sorted(other.packages) 

212 

213 @property 

214 def type(self) -> str: 

215 return self._type.hint_name 

216 

217 @property 

218 def packages(self) -> list["MigrationItem"]: 

219 return self._packages 

220 

221 @property 

222 def active(self) -> bool: 

223 return self._active 

224 

225 @property 

226 def user(self) -> str: 

227 return self._user 

228 

229 @property 

230 def package(self) -> str | None: 

231 if self.packages: 231 ↛ 235line 231 didn't jump to line 235 because the condition on line 231 was always true

232 assert len(self.packages) == 1, self.packages 

233 return self.packages[0].package 

234 else: 

235 return None 

236 

237 @property 

238 def version(self) -> str | None: 

239 if self.packages: 239 ↛ 243line 239 didn't jump to line 243 because the condition on line 239 was always true

240 assert len(self.packages) == 1, self.packages 

241 return self.packages[0].version 

242 else: 

243 return None 

244 

245 @property 

246 def architecture(self) -> str | None: 

247 if self.packages: 247 ↛ 251line 247 didn't jump to line 251 because the condition on line 247 was always true

248 assert len(self.packages) == 1, self.packages 

249 return self.packages[0].architecture 

250 else: 

251 return None 

252 

253 @property 

254 def suite(self) -> Optional["Suite"]: 

255 if self.packages: 255 ↛ 259line 255 didn't jump to line 259 because the condition on line 255 was always true

256 assert len(self.packages) == 1, self.packages 

257 return self.packages[0].suite 

258 else: 

259 return None 

260 

261 

262class HintParser: 

263 def __init__(self, mi_factory: "MigrationItemFactory") -> None: 

264 logger_name = ".".join((self.__class__.__module__, self.__class__.__name__)) 

265 self.logger = logging.getLogger(logger_name) 

266 self.hints = HintCollection() 

267 self.mi_factory = mi_factory 

268 

269 self._aliases: dict[str, str] = { 

270 "approve": "unblock", 

271 } 

272 

273 FORBIDDEN = HintAnnotate.FORBIDDEN 

274 OPTIONAL = HintAnnotate.OPTIONAL 

275 self._hint_set: set[HintType] = set() 

276 # Migration grouping hints 

277 self.register_hint_type(HintType("remark", lambda *x: None, 0)) 277 ↛ exitline 277 didn't run the lambda on line 277

278 # Easy needs at least 2 to make sense 

279 self.register_hint_type( 

280 HintType( 

281 "easy", single_hint_taking_list_of_packages, 2, architectured=OPTIONAL 

282 ) 

283 ) 

284 self.register_hint_type( 

285 HintType( 

286 "force-hint", 

287 single_hint_taking_list_of_packages, 

288 architectured=OPTIONAL, 

289 ) 

290 ) 

291 self.register_hint_type( 

292 HintType( 

293 "hint", single_hint_taking_list_of_packages, architectured=OPTIONAL 

294 ) 

295 ) 

296 # Block / freeze related hints 

297 self.register_hint_type(HintType("block", versioned=FORBIDDEN)) 

298 self.register_hint_type(HintType("block-all", versioned=FORBIDDEN)) 

299 self.register_hint_type(HintType("block-udeb", versioned=FORBIDDEN)) 

300 self.register_hint_type(HintType("unblock", architectured=OPTIONAL)) 

301 self.register_hint_type(HintType("unblock-udeb", architectured=OPTIONAL)) 

302 # Other 

303 self.register_hint_type(HintType("remove")) 

304 self.register_hint_type(HintType("force", architectured=OPTIONAL)) 

305 self.register_hint_type( 

306 HintType("allow-uninst", versioned=FORBIDDEN, architectured=OPTIONAL) 

307 ) 

308 self.register_hint_type(HintType("allow-smooth-update")) 

309 

310 @property 

311 def registered_hint_names(self) -> set[str]: 

312 """A set of all known hint names (and aliases thereof)""" 

313 return set(chain((x.hint_name for x in self._hint_set), self._aliases.keys())) 

314 

315 @property 

316 def registered_hints(self) -> set[HintType]: 

317 """A set of all known hints""" 

318 return self._hint_set 

319 

320 def register_hint_type( 

321 self, 

322 hint_type: HintType, 

323 *, 

324 aliases: Iterable[str] | None = None, 

325 ) -> None: 

326 """Register a new hint that is supported by the parser 

327 

328 This registers a new hint that can be parsed by the hint parser. All hints are single words with a 

329 space-separated list of arguments (on a single line). The hint parser will do some basic processing, 

330 the permission checking and minor validation on the hint before passing it on to the parser function 

331 given. 

332 

333 The parser_function will receive the following arguments: 

334 * A hint collection 

335 * Identifier of the entity providing the hint 

336 * The hint_name (aliases will be mapped to the hint_name) 

337 * Zero or more string arguments for the hint (so the function needs to use *args) 

338 

339 The parser_function will then have to process the arguments and call the hint collection's "add_hint" 

340 as needed. Example implementations include "split_into_one_hint_per_package", which is used by almost 

341 all policy hints. 

342 

343 :param hint_type: The hint 

344 :param aliases: An optional iterable of aliases to the hint (use only for backwards compatibility) 

345 """ 

346 hint_name = hint_type.hint_name 

347 assert ( 

348 hint_name not in self._aliases 

349 ), f"The hint type {hint_name} is already registered as an alias of {self._aliases[hint_name]}" 

350 assert ( 

351 hint_name not in self.registered_hint_names 

352 ), f"The hint type {hint_name} is already registered" 

353 self._hint_set.add(hint_type) 

354 if aliases: 

355 for alias in aliases: 

356 self._aliases[alias] = hint_name 

357 

358 def parse_hints( 

359 self, 

360 who: str, 

361 permitted_hints: str | list[str], 

362 filename: str, 

363 lines: Iterable[str], 

364 ) -> None: 

365 reg_hints = self.registered_hints 

366 reg_hint_names = self.registered_hint_names 

367 line_no = 0 

368 hints = self.hints 

369 aliases = self._aliases 

370 mi_factory = self.mi_factory 

371 for line in lines: 

372 line = line.strip() 

373 line_no += 1 

374 if line == "" or line.startswith("#"): 

375 continue 

376 ln = line.split() 

377 hint_name = ln.pop(0) 

378 if hint_name in aliases: 

379 hint_name = aliases[hint_name] 

380 if hint_name == "finished": 

381 break 

382 if ( 

383 hint_name not in reg_hint_names 

384 ): # this includes aliases, but those are checked before 

385 self.logger.warning( 

386 "Unknown hint found in %s (line %d): '%s'", filename, line_no, line 

387 ) 

388 continue 

389 if hint_name not in permitted_hints and "ALL" not in permitted_hints: 

390 reason = "The hint is not a part of the permitted hints for " + who 

391 self.logger.info( 

392 'Ignoring "%s" hint from %s found in %s (line %d): %s', 

393 hint_name, 

394 who, 

395 filename, 

396 line_no, 

397 reason, 

398 ) 

399 continue 

400 hint_type = [x for x in reg_hints if x.hint_name == hint_name][0] 

401 min_args = hint_type.min_args 

402 if len(ln) < min_args: 

403 self.logger.warning( 

404 "Malformed hint found in %s (line %d): Needs at least %d argument(s), got %d", 

405 filename, 

406 line_no, 

407 min_args, 

408 len(ln), 

409 ) 

410 continue 

411 try: 

412 hint_type.parser_function(mi_factory, hints, who, hint_type, *ln) 

413 except MalformedHintException as e: 

414 self.logger.warning( 

415 'Malformed hint found in %s (line %d): "%s"', 

416 filename, 

417 line_no, 

418 e.args[0], 

419 ) 

420 continue