Coverage for britney2/hints.py: 89%

145 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-03-23 07:34 +0000

1# -*- coding: utf-8 -*- 

2 

3# Copyright (C) 2013 Adam D. Barratt <adsb@debian.org> 

4 

5# This program is free software; you can redistribute it and/or modify 

6# it under the terms of the GNU General Public License as published by 

7# the Free Software Foundation; either version 2 of the License, or 

8# (at your option) any later version. 

9 

10# This program is distributed in the hope that it will be useful, 

11# but WITHOUT ANY WARRANTY; without even the implied warranty of 

12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

13# GNU General Public License for more details. 

14 

15from collections.abc import Callable, Iterable 

16import logging 

17from itertools import chain 

18from typing import TYPE_CHECKING, Any, Optional, Protocol, Union 

19 

20if TYPE_CHECKING: 20 ↛ 21line 20 didn't jump to line 21, because the condition on line 20 was never true

21 from . import Suite 

22 from .migrationitem import MigrationItem, MigrationItemFactory 

23 

24 

25class MalformedHintException(Exception): 

26 pass 

27 

28 

29class HintCollection(object): 

30 def __init__(self) -> None: 

31 self._hints: list["Hint"] = [] 

32 

33 @property 

34 def is_empty(self) -> bool: 

35 return not self._hints 

36 

37 def __getitem__(self, type: Optional[str] = None) -> list["Hint"]: 

38 return self.search(type) 

39 

40 def search( 

41 self, 

42 type: Optional[str] = None, 

43 onlyactive: bool = True, 

44 package: Optional[str] = None, 

45 version: Optional[str] = None, 

46 architecture: Optional[str] = None, 

47 suite: Optional["Suite"] = None, 

48 removal: Optional[bool] = None, 

49 ) -> list["Hint"]: 

50 return [ 

51 hint 

52 for hint in self._hints 

53 if (type is None or type == hint.type) 

54 and (hint.active or not onlyactive) 

55 and (package is None or package == hint.packages[0].package) 

56 and (version is None or version == hint.packages[0].version) 

57 and (architecture is None or architecture == hint.packages[0].architecture) 

58 and (suite is None or suite == hint.packages[0].suite) 

59 and (removal is None or removal == hint.packages[0].is_removal) 

60 ] 

61 

62 def add_hint(self, hint: "Hint") -> None: 

63 self._hints.append(hint) 

64 

65 

66class Hint(object): 

67 NO_VERSION = [ 

68 "block", 

69 "block-all", 

70 "block-udeb", 

71 "allow-archall-maintainer-upload", 

72 "allow-uninst", 

73 ] 

74 

75 def __init__( 

76 self, user: str, hint_type: str, packages: list["MigrationItem"] 

77 ) -> None: 

78 self._user = user 

79 self._active = True 

80 self._type = hint_type 

81 self._packages = packages 

82 

83 self.check() 

84 

85 def check(self) -> None: 

86 for package in self.packages: 

87 # TODO check if hint is allowed to specify architecture 

88 if self.type in self.__class__.NO_VERSION: 

89 if package.version is not None: 

90 raise MalformedHintException( 

91 '"%s" needs unversioned packages, got "%s"' 

92 % (self.type, package) 

93 ) 

94 else: 

95 if package.version is None: 95 ↛ 96line 95 didn't jump to line 96, because the condition on line 95 was never true

96 raise MalformedHintException( 

97 '"%s" needs versioned packages, got "%s"' % (self.type, package) 

98 ) 

99 

100 def set_active(self, active: bool) -> None: 

101 self._active = active 

102 

103 def __str__(self) -> str: 

104 if self.type in self.__class__.NO_VERSION: 

105 return "%s %s" % (self._type, " ".join(x.uvname for x in self._packages)) 

106 else: 

107 return "%s %s" % (self._type, " ".join(x.name for x in self._packages)) 

108 

109 def __eq__(self, other: Any) -> bool: 

110 if self.type != other.type: 110 ↛ 111line 110 didn't jump to line 111, because the condition on line 110 was never true

111 return False 

112 else: 

113 # we can't use sets, because unversioned items cannot be hashed 

114 return sorted(self.packages) == sorted(other.packages) 

115 

116 @property 

117 def type(self) -> str: 

118 return self._type 

119 

120 @property 

121 def packages(self) -> list["MigrationItem"]: 

122 return self._packages 

123 

124 @property 

125 def active(self) -> bool: 

126 return self._active 

127 

128 @property 

129 def user(self) -> str: 

130 return self._user 

131 

132 @property 

133 def package(self) -> Optional[str]: 

134 if self.packages: 134 ↛ 138line 134 didn't jump to line 138, because the condition on line 134 was never false

135 assert len(self.packages) == 1, self.packages 

136 return self.packages[0].package 

137 else: 

138 return None 

139 

140 @property 

141 def version(self) -> Optional[str]: 

142 if self.packages: 142 ↛ 146line 142 didn't jump to line 146, because the condition on line 142 was never false

143 assert len(self.packages) == 1, self.packages 

144 return self.packages[0].version 

145 else: 

146 return None 

147 

148 @property 

149 def architecture(self) -> Optional[str]: 

150 if self.packages: 150 ↛ 154line 150 didn't jump to line 154, because the condition on line 150 was never false

151 assert len(self.packages) == 1, self.packages 

152 return self.packages[0].architecture 

153 else: 

154 return None 

155 

156 @property 

157 def suite(self) -> Optional["Suite"]: 

158 if self.packages: 158 ↛ 162line 158 didn't jump to line 162, because the condition on line 158 was never false

159 assert len(self.packages) == 1, self.packages 

160 return self.packages[0].suite 

161 else: 

162 return None 

163 

164 

165class PolicyHintParserProto(Protocol): 

166 def __call__( 166 ↛ exitline 166 didn't jump to the function exit

167 self, 

168 mi_factory: "MigrationItemFactory", 

169 hints: HintCollection, 

170 who: str, 

171 hint_name: str, 

172 /, 

173 *args: str, 

174 ) -> None: ... 

175 

176 

177def split_into_one_hint_per_package( 

178 mi_factory: "MigrationItemFactory", 

179 hints: HintCollection, 

180 who: str, 

181 hint_name: str, 

182 /, 

183 *args: str, 

184) -> None: 

185 for item in mi_factory.parse_items(*args): 

186 hints.add_hint(Hint(who, hint_name, [item])) 

187 

188 

189def single_hint_taking_list_of_packages( 

190 mi_factory: "MigrationItemFactory", 

191 hints: HintCollection, 

192 who: str, 

193 hint_type: str, 

194 /, 

195 *args: str, 

196) -> None: 

197 hints.add_hint(Hint(who, hint_type, mi_factory.parse_items(*args))) 

198 

199 

200class HintParser(object): 

201 def __init__(self, mi_factory: "MigrationItemFactory") -> None: 

202 logger_name = ".".join((self.__class__.__module__, self.__class__.__name__)) 

203 self.logger = logging.getLogger(logger_name) 

204 self.hints = HintCollection() 

205 self.mi_factory = mi_factory 

206 self._hint_table: dict[ 206 ↛ exitline 206 didn't jump to the function exit

207 str, 

208 tuple[int, PolicyHintParserProto], 

209 ] = { 

210 "remark": (0, lambda *x: None), 

211 # Migration grouping hints 

212 "easy": ( 

213 2, 

214 single_hint_taking_list_of_packages, 

215 ), # Easy needs at least 2 to make sense 

216 "force-hint": (1, single_hint_taking_list_of_packages), 

217 "hint": (1, single_hint_taking_list_of_packages), 

218 # Block / freeze related hints 

219 "block": (1, split_into_one_hint_per_package), 

220 "block-all": (1, split_into_one_hint_per_package), 

221 "block-udeb": (1, split_into_one_hint_per_package), 

222 "unblock": (1, split_into_one_hint_per_package), 

223 "unblock-udeb": (1, split_into_one_hint_per_package), 

224 # Other 

225 "remove": (1, split_into_one_hint_per_package), 

226 "force": (1, split_into_one_hint_per_package), 

227 "allow-uninst": (1, split_into_one_hint_per_package), 

228 "allow-smooth-update": (1, split_into_one_hint_per_package), 

229 } 

230 self._aliases: dict[str, str] = { 

231 "approve": "unblock", 

232 } 

233 

234 @property 

235 def registered_hints(self) -> set[str]: 

236 """A set of all known hints (and aliases thereof)""" 

237 return set(chain(self._hint_table.keys(), self._aliases.keys())) 

238 

239 def register_hint_type( 

240 self, 

241 hint_name: str, 

242 parser_function: PolicyHintParserProto, 

243 *, 

244 min_args: int = 1, 

245 aliases: Optional[Iterable[str]] = None, 

246 ) -> None: 

247 """Register a new hint that is supported by the parser 

248 

249 This registers a new hint that can be parsed by the hint parser. All hints are single words with a 

250 space-separated list of arguments (on a single line). The hint parser will do some basic processing, 

251 the permission checking and minor validation on the hint before passing it on to the parser function 

252 given. 

253 

254 The parser_function will receive the following arguments: 

255 * A hint collection 

256 * Identifier of the entity providing the hint 

257 * The hint_name (aliases will be mapped to the hint_name) 

258 * Zero or more string arguments for the hint (so the function needs to use *args) 

259 

260 The parser_function will then have to process the arguments and call the hint collection's "add_hint" 

261 as needed. Example implementations include "split_into_one_hint_per_package", which is used by almost 

262 all policy hints. 

263 

264 :param hint_name: The name of the hint 

265 :param parser_function: A function to add the hint 

266 :param min_args: An optional positive integer (non-zero) denoting the number of arguments the hint takes. 

267 :param aliases: An optional iterable of aliases to the hint (use only for backwards compatibility) 

268 """ 

269 if min_args < 1: 269 ↛ 270line 269 didn't jump to line 270, because the condition on line 269 was never true

270 raise ValueError("min_args must be at least 1") 

271 if hint_name in self._hint_table: 271 ↛ 272line 271 didn't jump to line 272, because the condition on line 271 was never true

272 raise ValueError("The hint type %s is already registered" % hint_name) 

273 if hint_name in self._aliases: 273 ↛ 274line 273 didn't jump to line 274, because the condition on line 273 was never true

274 raise ValueError( 

275 "The hint type %s is already registered as an alias of %s" 

276 % (hint_name, self._aliases[hint_name]) 

277 ) 

278 self._hint_table[hint_name] = (min_args, parser_function) 

279 if aliases: 

280 for alias in aliases: 

281 self._aliases[alias] = hint_name 

282 

283 def parse_hints( 

284 self, 

285 who: str, 

286 permitted_hints: Union[str, list[str]], 

287 filename: str, 

288 lines: Iterable[str], 

289 ) -> None: 

290 hint_table = self._hint_table 

291 line_no = 0 

292 hints = self.hints 

293 aliases = self._aliases 

294 mi_factory = self.mi_factory 

295 for line in lines: 

296 line = line.strip() 

297 line_no += 1 

298 if line == "" or line.startswith("#"): 

299 continue 

300 ln = line.split() 

301 hint_name = ln[0] 

302 if hint_name in aliases: 

303 hint_name = aliases[hint_name] 

304 ln[0] = hint_name 

305 if hint_name == "finished": 

306 break 

307 if hint_name not in hint_table: 

308 self.logger.warning( 

309 "Unknown hint found in %s (line %d): '%s'", filename, line_no, line 

310 ) 

311 continue 

312 if hint_name not in permitted_hints and "ALL" not in permitted_hints: 

313 reason = "The hint is not a part of the permitted hints for " + who 

314 self.logger.info( 

315 'Ignoring "%s" hint from %s found in %s (line %d): %s', 

316 hint_name, 

317 who, 

318 filename, 

319 line_no, 

320 reason, 

321 ) 

322 continue 

323 min_args, hint_parser_impl = hint_table[hint_name] 

324 if len(ln) - 1 < min_args: 

325 self.logger.warning( 

326 "Malformed hint found in %s (line %d): Needs at least %d argument(s), got %d", 

327 filename, 

328 line_no, 

329 min_args, 

330 len(ln) - 1, 

331 ) 

332 continue 

333 try: 

334 hint_parser_impl(mi_factory, hints, who, *ln) 

335 except MalformedHintException as e: 

336 self.logger.warning( 

337 'Malformed hint found in %s (line %d): "%s"', 

338 filename, 

339 line_no, 

340 e.args[0], 

341 ) 

342 continue