Coverage for britney2/__init__.py: 95%

181 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-10-17 17:32 +0000

1import logging 

2from collections import namedtuple 

3from collections.abc import Iterable, Iterator 

4from enum import Enum, unique 

5from typing import TYPE_CHECKING, Any, NamedTuple, Optional 

6 

7if TYPE_CHECKING: 7 ↛ 8line 7 didn't jump to line 8 because the condition on line 7 was never true

8 from .installability.tester import InstallabilityTester 

9 

10 

class DependencyType(Enum):
    """Kinds of dependency relation between packages.

    Every member's value is a 3-tuple ``(label, reason, description)``; the
    methods below expose the individual parts of that tuple.
    """

    DEPENDS = ("Depends", "depends", "dependency")
    # BUILD_DEPENDS also covers BUILD_DEPENDS_ARCH.
    BUILD_DEPENDS = ("Build-Depends(-Arch)", "build-depends", "build-dependency")
    BUILD_DEPENDS_INDEP = (
        "Build-Depends-Indep",
        "build-depends-indep",
        "build-dependency (indep)",
    )
    BUILT_USING = ("Built-Using", "built-using", "built-using")
    # Pseudo dependency where Breaks/Conflicts effectively become an inverted
    # dependency.  E.g. "p Depends on q" plus "q/2 breaks p/1" implies that
    # p/2 must migrate before q/2 can migrate (or they go at the same time).
    #  - this can also happen with version ranges
    IMPLICIT_DEPENDENCY = (
        "Implicit dependency",
        "implicit-dependency",
        "implicit-dependency",
    )

    def __str__(self) -> str:
        """Return the label (first element of the member's value)."""
        label, _reason, _description = self.value
        return label

    def get_reason(self) -> str:
        """Return the reason string (second element of the member's value)."""
        _label, reason, _description = self.value
        return reason

    def get_description(self) -> str:
        """Return the description (third element of the member's value)."""
        _label, _reason, description = self.value
        return description

39 

40 

@unique
class SuiteClass(Enum):
    """Role a suite plays: the target suite or one of the source suites.

    Each value is a pair of booleans.  Only the first element (the
    "is a source suite" flag) is read by the properties of this class.
    """

    TARGET_SUITE = (False, False)
    PRIMARY_SOURCE_SUITE = (True, True)
    ADDITIONAL_SOURCE_SUITE = (True, False)

    @property
    def is_source(self) -> bool:
        """True for both kinds of source suite."""
        source_flag, _ = self.value
        return source_flag

    @property
    def is_target(self) -> bool:
        """True exactly when the suite is not a source suite."""
        return not self.value[0]

    @property
    def is_primary_source(self) -> bool:
        """True only for PRIMARY_SOURCE_SUITE."""
        return type(self).PRIMARY_SOURCE_SUITE is self

    @property
    def is_additional_source(self) -> bool:
        """True only for ADDITIONAL_SOURCE_SUITE."""
        return type(self).ADDITIONAL_SOURCE_SUITE is self

62 

63 

class Suite:
    """A named suite holding source packages and per-architecture binaries.

    ``binaries`` maps architecture -> package name -> BinaryPackage.  A flat
    view keyed by package id is derived from it on demand and cached (see
    ``all_binaries_in_suite``).
    """

    def __init__(
        self,
        suite_class: SuiteClass,
        name: str,
        path: str,
        suite_short_name: str | None = None,
    ) -> None:
        """Create an empty suite.

        :param suite_class: the role of this suite
        :param name: suite name; also stored as ``codename``
        :param path: stored verbatim as ``path``
        :param suite_short_name: optional short name; a missing or empty
          value is stored as ""
        """
        self.suite_class = suite_class
        self.name = name
        self.codename = name
        self.path = path
        self.suite_short_name = suite_short_name or ""
        self.sources: dict[str, SourcePackage] = {}
        self._binaries: dict[str, dict[str, BinaryPackage]] = {}
        self.provides_table: dict[str, dict[str, set[tuple[str, str]]]] = {}
        # Lazily built flat index; None means "needs (re)building".
        self._all_binaries_in_suite: dict[BinaryPackageId, BinaryPackage] | None = None

    @property
    def excuses_suffix(self) -> str:
        """The suite's short name ("" when none was given)."""
        return self.suite_short_name

    @property
    def binaries(self) -> dict[str, dict[str, "BinaryPackage"]]:
        """The architecture -> package name -> BinaryPackage mapping."""
        # TODO some callers modify this structure, which doesn't invalidate
        # the self._all_binaries_in_suite cache
        return self._binaries

    @binaries.setter
    def binaries(self, binaries: dict[str, dict[str, "BinaryPackage"]]) -> None:
        # Replacing the mapping drops the derived flat index as well.
        self._all_binaries_in_suite = None
        self._binaries = binaries

    @property
    def all_binaries_in_suite(self) -> dict["BinaryPackageId", "BinaryPackage"]:
        """Flat mapping of package id -> BinaryPackage over all architectures.

        The mapping is cached.  It is rebuilt whenever the cached value is
        falsy, i.e. both when it was invalidated (None) and when it is empty.
        """
        index = self._all_binaries_in_suite
        if not index:
            index = {}
            for arch_packages in self._binaries.values():
                for package in arch_packages.values():
                    index[package.pkg_id] = package
            self._all_binaries_in_suite = index
        return index

    def any_of_these_are_in_the_suite(self, pkgs: Iterable["BinaryPackageId"]) -> bool:
        """Test if at least one package of a given set is in the suite

        :return: True if any of the packages in pkgs are currently in the suite
        """
        known_ids = self.all_binaries_in_suite.keys()
        return not known_ids.isdisjoint(pkgs)

    def is_pkg_in_the_suite(self, pkg_id: "BinaryPackageId") -> bool:
        """Test if the given package is in the suite

        :return: True if the pkg is currently in the suite
        """
        return pkg_id in self.all_binaries_in_suite

    def which_of_these_are_in_the_suite(
        self, pkgs: Iterable["BinaryPackageId"]
    ) -> Iterator["BinaryPackageId"]:
        """Iterate over the given packages that are in the suite

        :return: An iterator of the package ids from pkgs that are in the
          suite, in the order they appear in pkgs
        """
        for candidate in pkgs:
            if candidate in self.all_binaries_in_suite:
                yield candidate

    def is_cruft(self, pkg: "BinaryPackage") -> bool:
        """Check if the package is cruft in the suite

        A package counts as cruft when its ``source_version`` differs from
        the version of its source package as recorded in this suite.

        :param pkg: which BinaryPackage to check
          Note that this package is assumed to be in the suite
        :raises KeyError: if the package's source is not in ``sources``
        """
        return pkg.source_version != self.sources[pkg.source].version

136 

137 

class TargetSuite(Suite):
    """The suite packages are added to and removed from.

    Installability questions and binary additions/removals are delegated to
    ``inst_tester``.  That attribute is only declared here; it is not
    assigned in ``__init__`` and must be set before those methods are used.
    """

    inst_tester: "InstallabilityTester"

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialise the suite (arguments are forwarded to ``Suite``)."""
        super().__init__(*args, **kwargs)
        # Logger named "<module>.<class name>" of the concrete class.
        logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
        self._logger = logging.getLogger(logger_name)

    def is_installable(self, pkg_id: "BinaryPackageId") -> bool:
        """Determine whether the given package can be installed in the suite

        :param pkg_id: A BinaryPackageId
        :return: True if the pkg is currently installable in the suite
        """
        return self.inst_tester.is_installable(pkg_id)

    def add_binary(self, pkg_id: "BinaryPackageId") -> None:
        """Add a binary package to the suite

        :param pkg_id: The id of the package
        :raises KeyError: if the package is not known
        """

        # TODO The calling code currently manually updates the contents of
        # target_suite.binaries when this is called. It would probably make
        # more sense to do that here instead
        self.inst_tester.add_binary(pkg_id)
        # The flat package-id index is stale now; rebuild it on next access.
        self._all_binaries_in_suite = None

    def remove_binary(self, pkg_id: "BinaryPackageId") -> None:
        """Remove a binary from the suite

        :param pkg_id: The id of the package
        :raises KeyError: if the package is not known
        """

        # TODO The calling code currently manually updates the contents of
        # target_suite.binaries when this is called. It would probably make
        # more sense to do that here instead
        self.inst_tester.remove_binary(pkg_id)
        # The flat package-id index is stale now; rebuild it on next access.
        self._all_binaries_in_suite = None

    def check_suite_source_pkg_consistency(self, comment: str) -> None:
        """Verify that ``sources`` and ``binaries`` agree with each other.

        Two directions are checked: every binary's source must be present in
        ``sources``, and every binary listed by a source must be present in
        ``binaries`` for its architecture.  Each violation is logged as an
        error; all violations are collected before failing.

        :param comment: free-form text included in every log message
        :raises AssertionError: if at least one inconsistency was found
        """
        sources_t = self.sources
        binaries_t = self.binaries
        logger = self._logger
        issues_found = False

        logger.info("check_target_suite_source_pkg_consistency %s", comment)

        for arch_binaries in binaries_t.values():
            for pkg_name, pkg in arch_binaries.items():
                src = pkg.source

                if src not in sources_t:  # pragma: no cover
                    issues_found = True
                    logger.error(
                        "inconsistency found (%s): src %s not in target, target has pkg %s with source %s",
                        comment,
                        src,
                        pkg_name,
                        src,
                    )

        for src, source_data in sources_t.items():
            for pkg_id in source_data.binaries:
                binary, _, parch = pkg_id
                # An architecture with no entry at all is reported as an
                # inconsistency too, rather than escaping as a bare KeyError.
                if binary not in binaries_t.get(parch, ()):  # pragma: no cover
                    issues_found = True
                    logger.error(
                        "inconsistency found (%s): binary %s from source %s not in binaries_t[%s]",
                        comment,
                        binary,
                        src,
                        parch,
                    )

        if issues_found:  # pragma: no cover
            raise AssertionError("inconsistencies found in target suite")

213 

214 

class Suites:
    """Container for the target suite together with its source suites.

    Suites can be looked up by name via ``suites[name]`` / ``name in suites``
    and by name or short name via ``by_name_or_alias``.
    """

    def __init__(self, target_suite: TargetSuite, source_suites: list[Suite]) -> None:
        """Register the target suite followed by the source suites.

        :param target_suite: the target suite
        :param source_suites: the source suites; the first one is treated as
          the primary source suite
        """
        self._suites: dict[str, Suite] = {}
        self._by_name_or_alias: dict[str, Suite] = {}
        self.target_suite = target_suite
        self.source_suites = source_suites
        # The target goes first, so on a key collision a source suite that is
        # registered later replaces the earlier entry.
        for suite in (target_suite, *source_suites):
            self._suites[suite.name] = suite
            self._by_name_or_alias[suite.name] = suite
            alias = suite.suite_short_name
            if alias:
                self._by_name_or_alias[alias] = suite

    @property
    def primary_source_suite(self) -> Suite:
        """The first source suite."""
        return self.source_suites[0]

    @property
    def by_name_or_alias(self) -> dict[str, Suite]:
        """Mapping of every suite name and non-empty short name to its suite."""
        return self._by_name_or_alias

    @property
    def additional_source_suites(self) -> list[Suite]:
        """All source suites except the first one."""
        return self.source_suites[1:]

    def __getitem__(self, item: str) -> Suite:
        """Look a suite up by its name (short names are not accepted here)."""
        return self._suites[item]

    def __len__(self) -> int:
        """Number of source suites plus one for the target suite."""
        return 1 + len(self.source_suites)

    def __contains__(self, item: str) -> bool:
        """Whether a suite with that name is registered."""
        return item in self._suites

    def __iter__(self) -> Iterator[Suite]:
        """Yield the source suites in order, then the target suite."""
        # Sources first (as we will rely on this for loading data in the old live-data tests)
        for source_suite in self.source_suites:
            yield source_suite
        yield self.target_suite

256 

257 

258class SourcePackage: 

259 __slots__ = [ 

260 "source", 

261 "version", 

262 "section", 

263 "binaries", 

264 "maintainer", 

265 "is_fakesrc", 

266 "build_deps_arch", 

267 "build_deps_indep", 

268 "testsuite", 

269 "testsuite_triggers", 

270 ] 

271 

272 def __init__( 

273 self, 

274 source: str, 

275 version: str, 

276 section: str, 

277 binaries: set["BinaryPackageId"], 

278 maintainer: str | None, 

279 is_fakesrc: bool, 

280 build_deps_arch: str | None, 

281 build_deps_indep: str | None, 

282 testsuite: list[str], 

283 testsuite_triggers: list[str], 

284 ) -> None: 

285 self.source = source 

286 self.version = version 

287 self.section = section 

288 self.binaries = binaries 

289 self.maintainer = maintainer 

290 self.is_fakesrc = is_fakesrc 

291 self.build_deps_arch = build_deps_arch 

292 self.build_deps_indep = build_deps_indep 

293 self.testsuite = testsuite 

294 self.testsuite_triggers = testsuite_triggers 

295 

296 def __getitem__(self, item: int) -> Any: 

297 return getattr(self, self.__slots__[item]) 

298 

299 

class PackageId(
    namedtuple(
        "PackageId",
        ["package_name", "version", "architecture"],
    )
):
    """Represent a source or binary package"""

    def __init__(self, package_name: str, version: str, architecture: str) -> None:
        """Reject the architecture "all".

        The tuple fields are already populated when this runs (namedtuple
        sets them in ``__new__``), so the check reads them from ``self``.
        """
        assert (
            self.architecture != "all"
        ), f"all not allowed for PackageId ({self.name})"

    def __repr__(self) -> str:
        return f"PID({self.name})"

    @property
    def name(self) -> str:
        """``name/version`` for sources, ``name/version/architecture`` otherwise."""
        versioned = f"{self.package_name}/{self.version}"
        if self.architecture == "source":
            return versioned
        return f"{versioned}/{self.architecture}"

    @property
    def uvname(self) -> str:
        """Unversioned name: ``name`` for sources, ``name/architecture`` otherwise."""
        if self.architecture != "source":
            return f"{self.package_name}/{self.architecture}"
        return f"{self.package_name}"

333 

334 

class BinaryPackageId(PackageId):
    """Represent a binary package"""

    def __init__(self, package_name: str, version: str, architecture: str) -> None:
        """Reject the architecture "source", then run the PackageId checks."""
        is_source = self.architecture == "source"
        assert not is_source, f"Source not allowed for BinaryPackageId ({self.name})"
        super().__init__(package_name, version, architecture)

    def __repr__(self) -> str:
        return f"BPID({self.name})"

346 

347 

class BinaryPackage(NamedTuple):
    """Immutable record describing one binary package.

    Instances are plain tuples, so fields can be read by name or by position;
    the declaration order below is therefore part of the interface.
    """

    version: str
    section: str
    # Name of the source package; used as the key into a suite's ``sources``.
    source: str
    # Compared against that source's ``version`` to decide whether the binary
    # is cruft.
    source_version: str
    architecture: str
    multi_arch: str | None
    # NOTE(review): presumably the raw, unparsed relationship fields - confirm.
    depends: str | None
    conflicts: str | None
    # NOTE(review): meaning of the three tuple elements is not visible here.
    provides: list[tuple[str, str, str]]
    is_essential: bool
    # Key under which the package appears in a suite's flat binaries index.
    pkg_id: BinaryPackageId
    # NOTE(review): meaning of the two tuple elements is not visible here.
    builtusing: list[tuple[str, str]]