Coverage for britney2/hints.py: 89%
145 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-03-23 07:34 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-03-23 07:34 +0000
1# -*- coding: utf-8 -*-
3# Copyright (C) 2013 Adam D. Barratt <adsb@debian.org>
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU General Public License for more details.
15from collections.abc import Callable, Iterable
16import logging
17from itertools import chain
18from typing import TYPE_CHECKING, Any, Optional, Protocol, Union
20if TYPE_CHECKING: 20 ↛ 21line 20 didn't jump to line 21, because the condition on line 20 was never true
21 from . import Suite
22 from .migrationitem import MigrationItem, MigrationItemFactory
25class MalformedHintException(Exception):
26 pass
29class HintCollection(object):
30 def __init__(self) -> None:
31 self._hints: list["Hint"] = []
33 @property
34 def is_empty(self) -> bool:
35 return not self._hints
37 def __getitem__(self, type: Optional[str] = None) -> list["Hint"]:
38 return self.search(type)
40 def search(
41 self,
42 type: Optional[str] = None,
43 onlyactive: bool = True,
44 package: Optional[str] = None,
45 version: Optional[str] = None,
46 architecture: Optional[str] = None,
47 suite: Optional["Suite"] = None,
48 removal: Optional[bool] = None,
49 ) -> list["Hint"]:
50 return [
51 hint
52 for hint in self._hints
53 if (type is None or type == hint.type)
54 and (hint.active or not onlyactive)
55 and (package is None or package == hint.packages[0].package)
56 and (version is None or version == hint.packages[0].version)
57 and (architecture is None or architecture == hint.packages[0].architecture)
58 and (suite is None or suite == hint.packages[0].suite)
59 and (removal is None or removal == hint.packages[0].is_removal)
60 ]
62 def add_hint(self, hint: "Hint") -> None:
63 self._hints.append(hint)
66class Hint(object):
67 NO_VERSION = [
68 "block",
69 "block-all",
70 "block-udeb",
71 "allow-archall-maintainer-upload",
72 "allow-uninst",
73 ]
75 def __init__(
76 self, user: str, hint_type: str, packages: list["MigrationItem"]
77 ) -> None:
78 self._user = user
79 self._active = True
80 self._type = hint_type
81 self._packages = packages
83 self.check()
85 def check(self) -> None:
86 for package in self.packages:
87 # TODO check if hint is allowed to specify architecture
88 if self.type in self.__class__.NO_VERSION:
89 if package.version is not None:
90 raise MalformedHintException(
91 '"%s" needs unversioned packages, got "%s"'
92 % (self.type, package)
93 )
94 else:
95 if package.version is None: 95 ↛ 96line 95 didn't jump to line 96, because the condition on line 95 was never true
96 raise MalformedHintException(
97 '"%s" needs versioned packages, got "%s"' % (self.type, package)
98 )
100 def set_active(self, active: bool) -> None:
101 self._active = active
103 def __str__(self) -> str:
104 if self.type in self.__class__.NO_VERSION:
105 return "%s %s" % (self._type, " ".join(x.uvname for x in self._packages))
106 else:
107 return "%s %s" % (self._type, " ".join(x.name for x in self._packages))
109 def __eq__(self, other: Any) -> bool:
110 if self.type != other.type: 110 ↛ 111line 110 didn't jump to line 111, because the condition on line 110 was never true
111 return False
112 else:
113 # we can't use sets, because unversioned items cannot be hashed
114 return sorted(self.packages) == sorted(other.packages)
116 @property
117 def type(self) -> str:
118 return self._type
120 @property
121 def packages(self) -> list["MigrationItem"]:
122 return self._packages
124 @property
125 def active(self) -> bool:
126 return self._active
128 @property
129 def user(self) -> str:
130 return self._user
132 @property
133 def package(self) -> Optional[str]:
134 if self.packages: 134 ↛ 138line 134 didn't jump to line 138, because the condition on line 134 was never false
135 assert len(self.packages) == 1, self.packages
136 return self.packages[0].package
137 else:
138 return None
140 @property
141 def version(self) -> Optional[str]:
142 if self.packages: 142 ↛ 146line 142 didn't jump to line 146, because the condition on line 142 was never false
143 assert len(self.packages) == 1, self.packages
144 return self.packages[0].version
145 else:
146 return None
148 @property
149 def architecture(self) -> Optional[str]:
150 if self.packages: 150 ↛ 154line 150 didn't jump to line 154, because the condition on line 150 was never false
151 assert len(self.packages) == 1, self.packages
152 return self.packages[0].architecture
153 else:
154 return None
156 @property
157 def suite(self) -> Optional["Suite"]:
158 if self.packages: 158 ↛ 162line 158 didn't jump to line 162, because the condition on line 158 was never false
159 assert len(self.packages) == 1, self.packages
160 return self.packages[0].suite
161 else:
162 return None
165class PolicyHintParserProto(Protocol):
166 def __call__( 166 ↛ exitline 166 didn't jump to the function exit
167 self,
168 mi_factory: "MigrationItemFactory",
169 hints: HintCollection,
170 who: str,
171 hint_name: str,
172 /,
173 *args: str,
174 ) -> None: ...
177def split_into_one_hint_per_package(
178 mi_factory: "MigrationItemFactory",
179 hints: HintCollection,
180 who: str,
181 hint_name: str,
182 /,
183 *args: str,
184) -> None:
185 for item in mi_factory.parse_items(*args):
186 hints.add_hint(Hint(who, hint_name, [item]))
189def single_hint_taking_list_of_packages(
190 mi_factory: "MigrationItemFactory",
191 hints: HintCollection,
192 who: str,
193 hint_type: str,
194 /,
195 *args: str,
196) -> None:
197 hints.add_hint(Hint(who, hint_type, mi_factory.parse_items(*args)))
200class HintParser(object):
201 def __init__(self, mi_factory: "MigrationItemFactory") -> None:
202 logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
203 self.logger = logging.getLogger(logger_name)
204 self.hints = HintCollection()
205 self.mi_factory = mi_factory
206 self._hint_table: dict[ 206 ↛ exitline 206 didn't jump to the function exit
207 str,
208 tuple[int, PolicyHintParserProto],
209 ] = {
210 "remark": (0, lambda *x: None),
211 # Migration grouping hints
212 "easy": (
213 2,
214 single_hint_taking_list_of_packages,
215 ), # Easy needs at least 2 to make sense
216 "force-hint": (1, single_hint_taking_list_of_packages),
217 "hint": (1, single_hint_taking_list_of_packages),
218 # Block / freeze related hints
219 "block": (1, split_into_one_hint_per_package),
220 "block-all": (1, split_into_one_hint_per_package),
221 "block-udeb": (1, split_into_one_hint_per_package),
222 "unblock": (1, split_into_one_hint_per_package),
223 "unblock-udeb": (1, split_into_one_hint_per_package),
224 # Other
225 "remove": (1, split_into_one_hint_per_package),
226 "force": (1, split_into_one_hint_per_package),
227 "allow-uninst": (1, split_into_one_hint_per_package),
228 "allow-smooth-update": (1, split_into_one_hint_per_package),
229 }
230 self._aliases: dict[str, str] = {
231 "approve": "unblock",
232 }
234 @property
235 def registered_hints(self) -> set[str]:
236 """A set of all known hints (and aliases thereof)"""
237 return set(chain(self._hint_table.keys(), self._aliases.keys()))
239 def register_hint_type(
240 self,
241 hint_name: str,
242 parser_function: PolicyHintParserProto,
243 *,
244 min_args: int = 1,
245 aliases: Optional[Iterable[str]] = None,
246 ) -> None:
247 """Register a new hint that is supported by the parser
249 This registers a new hint that can be parsed by the hint parser. All hints are single words with a
250 space-separated list of arguments (on a single line). The hint parser will do some basic processing,
251 the permission checking and minor validation on the hint before passing it on to the parser function
252 given.
254 The parser_function will receive the following arguments:
255 * A hint collection
256 * Identifier of the entity providing the hint
257 * The hint_name (aliases will be mapped to the hint_name)
258 * Zero or more string arguments for the hint (so the function needs to use *args)
260 The parser_function will then have to process the arguments and call the hint collection's "add_hint"
261 as needed. Example implementations include "split_into_one_hint_per_package", which is used by almost
262 all policy hints.
264 :param hint_name: The name of the hint
265 :param parser_function: A function to add the hint
266 :param min_args: An optional positive integer (non-zero) denoting the number of arguments the hint takes.
267 :param aliases: An optional iterable of aliases to the hint (use only for backwards compatibility)
268 """
269 if min_args < 1: 269 ↛ 270line 269 didn't jump to line 270, because the condition on line 269 was never true
270 raise ValueError("min_args must be at least 1")
271 if hint_name in self._hint_table: 271 ↛ 272line 271 didn't jump to line 272, because the condition on line 271 was never true
272 raise ValueError("The hint type %s is already registered" % hint_name)
273 if hint_name in self._aliases: 273 ↛ 274line 273 didn't jump to line 274, because the condition on line 273 was never true
274 raise ValueError(
275 "The hint type %s is already registered as an alias of %s"
276 % (hint_name, self._aliases[hint_name])
277 )
278 self._hint_table[hint_name] = (min_args, parser_function)
279 if aliases:
280 for alias in aliases:
281 self._aliases[alias] = hint_name
283 def parse_hints(
284 self,
285 who: str,
286 permitted_hints: Union[str, list[str]],
287 filename: str,
288 lines: Iterable[str],
289 ) -> None:
290 hint_table = self._hint_table
291 line_no = 0
292 hints = self.hints
293 aliases = self._aliases
294 mi_factory = self.mi_factory
295 for line in lines:
296 line = line.strip()
297 line_no += 1
298 if line == "" or line.startswith("#"):
299 continue
300 ln = line.split()
301 hint_name = ln[0]
302 if hint_name in aliases:
303 hint_name = aliases[hint_name]
304 ln[0] = hint_name
305 if hint_name == "finished":
306 break
307 if hint_name not in hint_table:
308 self.logger.warning(
309 "Unknown hint found in %s (line %d): '%s'", filename, line_no, line
310 )
311 continue
312 if hint_name not in permitted_hints and "ALL" not in permitted_hints:
313 reason = "The hint is not a part of the permitted hints for " + who
314 self.logger.info(
315 'Ignoring "%s" hint from %s found in %s (line %d): %s',
316 hint_name,
317 who,
318 filename,
319 line_no,
320 reason,
321 )
322 continue
323 min_args, hint_parser_impl = hint_table[hint_name]
324 if len(ln) - 1 < min_args:
325 self.logger.warning(
326 "Malformed hint found in %s (line %d): Needs at least %d argument(s), got %d",
327 filename,
328 line_no,
329 min_args,
330 len(ln) - 1,
331 )
332 continue
333 try:
334 hint_parser_impl(mi_factory, hints, who, *ln)
335 except MalformedHintException as e:
336 self.logger.warning(
337 'Malformed hint found in %s (line %d): "%s"',
338 filename,
339 line_no,
340 e.args[0],
341 )
342 continue