Coverage for britney2/hints.py: 91%
138 statements
« prev ^ index » next coverage.py v6.5.0, created at 2024-04-18 20:48 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2024-04-18 20:48 +0000
# -*- coding: utf-8 -*-

# Copyright (C) 2013 Adam D. Barratt <adsb@debian.org>

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
15import logging
17from itertools import chain
class MalformedHintException(Exception):
    """Signals that a hint is not well-formed for its hint type."""
class HintCollection(object):
    """A list of hints that can be filtered by type, state and first package."""

    def __init__(self):
        # Hints are stored in insertion order; search() preserves that order.
        self._hints = []

    @property
    def is_empty(self):
        """Whether the collection holds no hints at all (active or not)."""
        if self._hints:
            return False
        return True

    def __getitem__(self, type=None):
        """Shorthand for ``search(type)``: the active hints of that type."""
        return self.search(type)

    def search(self, type=None, onlyactive=True, package=None,
               version=None, architecture=None, suite=None, removal=None):
        """Return the hints matching every given criterion, in insertion order.

        A criterion left as None is not applied.  The package-related
        criteria are compared against the FIRST package of each hint only,
        and a hint's package list is only touched when such a criterion is
        actually given.

        :param type: Only return hints of this hint type
        :param onlyactive: When true (the default), skip inactive hints
        :param package: Required ``package`` of the hint's first package
        :param version: Required ``version`` of the hint's first package
        :param architecture: Required ``architecture`` of the hint's first package
        :param suite: Required ``suite`` of the hint's first package
        :param removal: Required ``is_removal`` value of the hint's first package
        :return: A new list with the matching hints
        """
        matches = []
        for hint in self._hints:
            if type is not None and not (type == hint.type):
                continue
            if onlyactive and not hint.active:
                continue
            if package is not None and not (package == hint.packages[0].package):
                continue
            if version is not None and not (version == hint.packages[0].version):
                continue
            if architecture is not None and not (architecture == hint.packages[0].architecture):
                continue
            if suite is not None and not (suite == hint.packages[0].suite):
                continue
            if removal is not None and not (removal == hint.packages[0].is_removal):
                continue
            matches.append(hint)
        return matches

    def add_hint(self, hint):
        """Append ``hint`` to the collection."""
        self._hints.append(hint)
class Hint(object):
    """A hint of a given type, issued by a user, for a list of packages.

    The hint is validated on construction (see check()) and starts out
    active.  The ``package``, ``version``, ``architecture`` and ``suite``
    properties are conveniences for hints that carry exactly one package.
    """

    # Hint types whose packages must be given WITHOUT a version.  Every other
    # hint type requires a version on each of its packages.
    NO_VERSION = ['block', 'block-all', 'block-udeb', 'allow-archall-maintainer-upload', 'allow-uninst']

    # Defining __eq__ already makes instances unhashable; state it explicitly.
    __hash__ = None

    def __init__(self, user, hint_type, packages):
        """Create and validate a hint.

        :param user: Identifier of whoever provided the hint
        :param hint_type: The name of the hint type (e.g. "unblock")
        :param packages: List of items the hint applies to; each needs at
          least a ``version`` attribute for validation
        :raises MalformedHintException: if check() rejects the packages
        """
        self._user = user
        self._active = True
        self._type = hint_type
        self._packages = packages

        self.check()

    def check(self):
        """Verify that the packages are (un)versioned as the hint type requires.

        :raises MalformedHintException: if a package has a version although
          the hint type is listed in NO_VERSION, or lacks one although it is not
        """
        needs_unversioned = self.type in self.__class__.NO_VERSION
        for package in self.packages:
            # TODO check if hint is allowed to specify architecture
            if needs_unversioned:
                if package.version is not None:
                    raise MalformedHintException("\"%s\" needs unversioned packages, got \"%s\"" % (self.type, package))
            elif package.version is None:
                raise MalformedHintException("\"%s\" needs versioned packages, got \"%s\"" % (self.type, package))

    def set_active(self, active):
        """Mark the hint as active or inactive."""
        self._active = active

    def __str__(self):
        """Render the hint as "<type> <pkg> <pkg> ...".

        Packages are rendered via ``uvname`` for hint types in NO_VERSION and
        via ``name`` otherwise.
        """
        if self.type in self.__class__.NO_VERSION:
            return '%s %s' % (self._type, ' '.join(x.uvname for x in self._packages))
        else:
            return '%s %s' % (self._type, ' '.join(x.name for x in self._packages))

    def __eq__(self, other):
        """Two hints are equal when they have the same type and packages.

        The order of the packages is irrelevant.  Comparing against anything
        that is not a Hint returns NotImplemented, so ``hint == None`` is
        simply False instead of raising AttributeError.
        """
        if not isinstance(other, Hint):
            return NotImplemented
        if self.type != other.type:
            return False
        # we can't use sets, because unversioned items cannot be hashed
        return sorted(self.packages) == sorted(other.packages)

    @property
    def type(self):
        """The hint type name."""
        return self._type

    @property
    def packages(self):
        """The list of packages the hint applies to."""
        return self._packages

    @property
    def active(self):
        """Whether the hint is currently active."""
        return self._active

    @property
    def user(self):
        """Identifier of whoever provided the hint."""
        return self._user

    def _single_package(self):
        """Return the hint's only package, or None if it has no packages.

        Asserts that the hint does not carry more than one package.
        """
        packages = self.packages
        if not packages:
            return None
        assert len(packages) == 1, packages
        return packages[0]

    @property
    def package(self):
        """``package`` of the hint's only package (None without packages)."""
        item = self._single_package()
        return None if item is None else item.package

    @property
    def version(self):
        """``version`` of the hint's only package (None without packages)."""
        item = self._single_package()
        return None if item is None else item.version

    @property
    def architecture(self):
        """``architecture`` of the hint's only package (None without packages)."""
        item = self._single_package()
        return None if item is None else item.architecture

    @property
    def suite(self):
        """``suite`` of the hint's only package (None without packages)."""
        item = self._single_package()
        return None if item is None else item.suite
def split_into_one_hint_per_package(mi_factory, hints, who, hint_name, *args):
    """Add one separate single-package hint per item parsed from ``args``.

    :param mi_factory: Object whose ``parse_items(*args)`` yields the items
    :param hints: The hint collection the new hints are added to
    :param who: Identifier of the entity providing the hint
    :param hint_name: The hint type given to every created hint
    :param args: The string arguments of the hint
    """
    parsed_items = mi_factory.parse_items(*args)
    for migration_item in parsed_items:
        per_package_hint = Hint(who, hint_name, [migration_item])
        hints.add_hint(per_package_hint)
def single_hint_taking_list_of_packages(mi_factory, hints, who, hint_type, *args):
    """Add a single hint that covers all items parsed from ``args``.

    :param mi_factory: Object whose ``parse_items(*args)`` produces the items
    :param hints: The hint collection the new hint is added to
    :param who: Identifier of the entity providing the hint
    :param hint_type: The hint type of the created hint
    :param args: The string arguments of the hint
    """
    all_items = mi_factory.parse_items(*args)
    combined_hint = Hint(who, hint_type, all_items)
    hints.add_hint(combined_hint)
class HintParser(object):
    """Parses lines of hints and stores the resulting hints in ``self.hints``.

    Each known hint name maps to a minimum argument count and a parser
    function; aliases map an alternative name onto a known hint name.
    """

    def __init__(self, mi_factory):
        """Set up the parser with the built-in hint types.

        :param mi_factory: Passed as first argument to every parser function
          (the built-in parser functions call its ``parse_items`` method)
        """
        logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
        self.logger = logging.getLogger(logger_name)
        self.hints = HintCollection()
        self.mi_factory = mi_factory
        # hint name -> (minimum number of arguments, parser function)
        self._hint_table = {
            'remark': (0, lambda *x: None),

            # Migration grouping hints
            'easy': (2, single_hint_taking_list_of_packages),  # Easy needs at least 2 to make sense
            'force-hint': (1, single_hint_taking_list_of_packages),
            'hint': (1, single_hint_taking_list_of_packages),

            # Block / freeze related hints
            'block': (1, split_into_one_hint_per_package),
            'block-all': (1, split_into_one_hint_per_package),
            'block-udeb': (1, split_into_one_hint_per_package),
            'unblock': (1, split_into_one_hint_per_package),
            'unblock-udeb': (1, split_into_one_hint_per_package),

            # Other
            'remove': (1, split_into_one_hint_per_package),
            'force': (1, split_into_one_hint_per_package),
            'allow-uninst': (1, split_into_one_hint_per_package),
            'allow-smooth-update': (1, split_into_one_hint_per_package),
        }
        # alias -> hint name it stands for
        self._aliases = {
            'approve': 'unblock',
        }

    @property
    def registered_hints(self):
        """A set of all known hints (and aliases thereof)"""
        return set(chain(self._hint_table, self._aliases))

    def register_hint_type(self, hint_name: str, parser_function, *, min_args=1, aliases=None) -> None:
        """Register a new hint that is supported by the parser

        This registers a new hint that can be parsed by the hint parser.  All hints are single words with a
        space-separated list of arguments (on a single line).  The hint parser will do some basic processing,
        the permission checking and minor validation on the hint before passing it on to the parser function
        given.

        The parser_function will receive the following arguments:
          * The migration item factory the parser was created with
          * A hint collection
          * Identifier of the entity providing the hint
          * The hint_name (aliases will be mapped to the hint_name)
          * Zero or more string arguments for the hint (so the function needs to use *args)

        The parser_function will then have to process the arguments and call the hint collection's "add_hint"
        as needed.  Example implementations include "split_into_one_hint_per_package", which is used by almost
        all policy hints.

        Nothing is registered when a ValueError is raised.

        :param hint_name: The name of the hint
        :param parser_function: A function to add the hint
        :param min_args: An optional positive integer (non-zero) denoting the minimum number of arguments
          the hint takes.
        :param aliases: An optional iterable of aliases to the hint (use only for backwards compatibility)
        :raises ValueError: if min_args is below 1, or if hint_name or one of the aliases is already
          registered as a hint type or as an alias
        """
        if min_args < 1:
            raise ValueError("min_args must be at least 1")
        if hint_name in self._hint_table:
            raise ValueError("The hint type %s is already registered" % hint_name)
        if hint_name in self._aliases:
            raise ValueError("The hint type %s is already registered as an alias of %s" % (
                hint_name, self._aliases[hint_name]))

        # Materialise once so that a one-shot iterable survives validation.
        alias_names = list(aliases) if aliases else []
        # Aliases are resolved before the hint table is consulted, so an alias
        # that reuses an existing name would silently take over that hint.
        for alias in alias_names:
            if alias in self._hint_table:
                raise ValueError("The alias %s is already registered as a hint type" % alias)
            if alias in self._aliases:
                raise ValueError("The alias %s is already registered as an alias of %s" % (
                    alias, self._aliases[alias]))

        self._hint_table[hint_name] = (min_args, parser_function)
        for alias in alias_names:
            self._aliases[alias] = hint_name

    def parse_hints(self, who, permitted_hints, filename, lines):
        """Parse hint lines and add the resulting hints to ``self.hints``.

        Blank lines and lines starting with "#" are skipped.  Parsing stops
        at a line whose first word is "finished".  Unknown hints, hints that
        are not permitted, hints with too few arguments and hints rejected
        with MalformedHintException are logged and skipped.

        :param who: Identifier of the entity providing the hints
        :param permitted_hints: Container of hint names ``who`` may use; if it
          contains 'ALL', every hint is permitted
        :param filename: Name of the source of the lines (used in log messages)
        :param lines: Iterable of hint lines
        """
        hint_table = self._hint_table
        hints = self.hints
        aliases = self._aliases
        mi_factory = self.mi_factory
        for line_no, line in enumerate(lines, start=1):
            line = line.strip()
            if line == "" or line.startswith('#'):
                continue
            ln = line.split()
            hint_name = ln[0]
            if hint_name in aliases:
                # Hand the canonical name (not the alias) to the parser function
                hint_name = aliases[hint_name]
                ln[0] = hint_name
            if hint_name == 'finished':
                break
            if hint_name not in hint_table:
                self.logger.warning("Unknown hint found in %s (line %d): '%s'", filename, line_no, line)
                continue
            if hint_name not in permitted_hints and 'ALL' not in permitted_hints:
                reason = 'The hint is not a part of the permitted hints for ' + who
                self.logger.info("Ignoring \"%s\" hint from %s found in %s (line %d): %s",
                                 hint_name, who, filename, line_no, reason)
                continue
            min_args, hint_parser_impl = hint_table[hint_name]
            if len(ln) - 1 < min_args:
                self.logger.warning("Malformed hint found in %s (line %d): Needs at least %d argument(s), got %d",
                                    filename, line_no, min_args, len(ln) - 1)
                continue
            try:
                hint_parser_impl(mi_factory, hints, who, *ln)
            except MalformedHintException as e:
                # Format the exception itself: same text as its sole argument,
                # but safe if it was raised without arguments.
                self.logger.warning("Malformed hint found in %s (line %d): \"%s\"", filename, line_no, e)
                continue