refactor: drop some methods from package class into separated wrappers

This commit is contained in:
2026-02-11 02:53:46 +02:00
parent 389bad6725
commit 6a2454548d
17 changed files with 489 additions and 412 deletions

View File

@@ -0,0 +1,117 @@
#
# Copyright (c) 2021-2026 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from pyalpm import vercmp # type: ignore[import-not-found]
from ahriman.core.build_tools.task import Task
from ahriman.core.configuration import Configuration
from ahriman.core.log import LazyLogging
from ahriman.core.utils import full_version, utcnow
from ahriman.models.package import Package
from ahriman.models.pkgbuild import Pkgbuild
class PackageVersion(LazyLogging):
"""
package version extractor and helper
Attributes:
package(Package): package definitions
"""
def __init__(self, package: Package) -> None:
"""
Args:
package(Package): package definitions
"""
self.package = package
def actual_version(self, configuration: Configuration) -> str:
"""
additional method to handle VCS package versions
Args:
configuration(Configuration): configuration instance
Returns:
str: package version if package is not VCS and current version according to VCS otherwise
"""
if not self.package.is_vcs:
return self.package.version
_, repository_id = configuration.check_loaded()
paths = configuration.repository_paths
task = Task(self.package, configuration, repository_id.architecture, paths)
try:
# create fresh chroot environment, fetch sources and - automagically - update PKGBUILD
task.init(paths.cache_for(self.package.base), [], None)
pkgbuild = Pkgbuild.from_file(paths.cache_for(self.package.base) / "PKGBUILD")
return full_version(pkgbuild.get("epoch"), pkgbuild["pkgver"], pkgbuild["pkgrel"])
except Exception:
self.logger.exception("cannot determine version of VCS package")
finally:
# clear log files generated by devtools
for log_file in paths.cache_for(self.package.base).glob("*.log"):
log_file.unlink()
return self.package.version
def is_newer_than(self, timestamp: float | int) -> bool:
"""
check if package was built after the specified timestamp
Args:
timestamp(float | int): timestamp to check build date against
Returns:
bool: ``True`` in case if package was built after the specified date and ``False`` otherwise.
In case if build date is not set by any of packages, it returns False
"""
return any(
package.build_date > timestamp
for package in self.package.packages.values()
if package.build_date is not None
)
def is_outdated(self, remote: Package, configuration: Configuration, *,
calculate_version: bool = True) -> bool:
"""
check if package is out-of-dated
Args:
remote(Package): package properties from remote source
configuration(Configuration): configuration instance
calculate_version(bool, optional): expand version to actual value (by calculating git versions)
(Default value = True)
Returns:
bool: ``True`` if the package is out-of-dated and ``False`` otherwise
"""
vcs_allowed_age = configuration.getint("build", "vcs_allowed_age", fallback=0)
min_vcs_build_date = utcnow().timestamp() - vcs_allowed_age
if calculate_version and not self.is_newer_than(min_vcs_build_date):
remote_version = PackageVersion(remote).actual_version(configuration)
else:
remote_version = remote.version
result: int = vercmp(self.package.version, remote_version)
return result < 0

View File

@@ -27,6 +27,7 @@ from ahriman.core.exceptions import CalledProcessError
from ahriman.core.log import LazyLogging
from ahriman.core.utils import check_output, utcnow, walk
from ahriman.models.package import Package
from ahriman.models.pkgbuild import Pkgbuild
from ahriman.models.pkgbuild_patch import PkgbuildPatch
from ahriman.models.remote_source import RemoteSource
from ahriman.models.repository_paths import RepositoryPaths
@@ -81,7 +82,7 @@ class Sources(LazyLogging):
Returns:
list[PkgbuildPatch]: generated patch for PKGBUILD architectures if required
"""
architectures = Package.supported_architectures(sources_dir)
architectures = Pkgbuild.supported_architectures(sources_dir)
if "any" in architectures: # makepkg does not like when there is any other arch except for any
return []
architectures.add(architecture)
@@ -161,7 +162,7 @@ class Sources(LazyLogging):
cwd=sources_dir, logger=instance.logger)
# extract local files...
files = ["PKGBUILD", ".SRCINFO"] + [str(path) for path in Package.local_files(sources_dir)]
files = ["PKGBUILD", ".SRCINFO"] + [str(path) for path in Pkgbuild.local_files(sources_dir)]
instance.add(sources_dir, *files)
# ...and commit them
instance.commit(sources_dir)

View File

@@ -17,10 +17,13 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import copy
from collections.abc import Iterable
from pathlib import Path
from tempfile import TemporaryDirectory
from ahriman.core.build_tools.package_version import PackageVersion
from ahriman.core.build_tools.sources import Sources
from ahriman.core.repository.repository_properties import RepositoryProperties
from ahriman.core.utils import package_like
@@ -33,6 +36,40 @@ class PackageInfo(RepositoryProperties):
handler for the package information
"""
def full_depends(self, package: Package, packages: Iterable[Package]) -> list[str]:
"""
generate full dependencies list including transitive dependencies
Args:
package(Package): package to check dependencies for
packages(Iterable[Package]): repository package list
Returns:
list[str]: all dependencies of the package
"""
dependencies = {}
# load own package dependencies
for package_base in packages:
for name, repo_package in package_base.packages.items():
dependencies[name] = repo_package.depends
for provides in repo_package.provides:
dependencies[provides] = repo_package.depends
# load repository dependencies
for database in self.pacman.handle.get_syncdbs():
for pacman_package in database.pkgcache:
dependencies[pacman_package.name] = pacman_package.depends
for provides in pacman_package.provides:
dependencies[provides] = pacman_package.depends
result = set(package.depends)
current_depends: set[str] = set()
while result != current_depends:
current_depends = copy.deepcopy(result)
for package_name in current_depends:
result.update(dependencies.get(package_name, []))
return sorted(result)
def load_archives(self, packages: Iterable[Path]) -> list[Package]:
"""
load packages from list of archives
@@ -58,7 +95,7 @@ class PackageInfo(RepositoryProperties):
# force version to max of them
self.logger.warning("version of %s differs, found %s and %s",
current.base, current.version, local.version)
if current.is_outdated(local, self.configuration, calculate_version=False):
if PackageVersion(current).is_outdated(local, self.configuration, calculate_version=False):
current.version = local.version
current.packages.update(local.packages)
except Exception:
@@ -130,5 +167,5 @@ class PackageInfo(RepositoryProperties):
return [
package
for package in packages
if depends_on.intersection(package.full_depends(self.pacman, packages))
if depends_on.intersection(self.full_depends(package, packages))
]

View File

@@ -19,6 +19,7 @@
#
from collections.abc import Iterable
from ahriman.core.build_tools.package_version import PackageVersion
from ahriman.core.build_tools.sources import Sources
from ahriman.core.exceptions import UnknownPackageError
from ahriman.core.repository.cleaner import Cleaner
@@ -68,7 +69,7 @@ class UpdateHandler(PackageInfo, Cleaner):
try:
remote = load_remote(local)
if local.is_outdated(remote, self.configuration, calculate_version=vcs):
if PackageVersion(local).is_outdated(remote, self.configuration, calculate_version=vcs):
self.reporter.set_pending(local.base)
self.event(local.base, EventType.PackageOutdated, "Remote version is newer than local")
result.append(remote)
@@ -157,7 +158,7 @@ class UpdateHandler(PackageInfo, Cleaner):
if local.remote.is_remote:
continue # avoid checking AUR packages
if local.is_outdated(remote, self.configuration, calculate_version=vcs):
if PackageVersion(local).is_outdated(remote, self.configuration, calculate_version=vcs):
self.reporter.set_pending(local.base)
self.event(local.base, EventType.PackageOutdated, "Locally pulled sources are outdated")
result.append(remote)

View File

@@ -35,6 +35,7 @@ from pwd import getpwuid
from typing import Any, IO, TypeVar
from ahriman.core.exceptions import CalledProcessError, OptionError, UnsafeRunError
from ahriman.core.types import Comparable
__all__ = [
@@ -45,6 +46,7 @@ __all__ = [
"extract_user",
"filter_json",
"full_version",
"list_flatmap",
"minmax",
"owner",
"package_like",
@@ -62,6 +64,7 @@ __all__ = [
]
R = TypeVar("R", bound=Comparable)
T = TypeVar("T")
@@ -276,6 +279,24 @@ def full_version(epoch: str | int | None, pkgver: str, pkgrel: str) -> str:
return f"{prefix}{pkgver}-{pkgrel}"
def list_flatmap(source: Iterable[T], extractor: Callable[[T], list[R]]) -> list[R]:
"""
extract elements from list of lists, flatten them and apply ``extractor``
Args:
source(Iterable[T]): source list
extractor(Callable[[T], list[R]): property extractor
Returns:
list[T]: combined list of unique entries in properties list
"""
def generator() -> Iterator[R]:
for inner in source:
yield from extractor(inner)
return sorted(set(generator()))
def minmax(source: Iterable[T], *, key: Callable[[T], Any] | None = None) -> tuple[T, T]:
"""
get min and max value from iterable

View File

@@ -17,23 +17,18 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# pylint: disable=too-many-lines,too-many-public-methods
from __future__ import annotations
import copy
from collections.abc import Callable, Iterable, Iterator
from collections.abc import Iterable
from dataclasses import dataclass
from pathlib import Path
from pyalpm import vercmp # type: ignore[import-not-found]
from typing import Any, Self
from urllib.parse import urlparse
from ahriman.core.alpm.pacman import Pacman
from ahriman.core.alpm.remote import AUR, Official, OfficialSyncdb
from ahriman.core.configuration import Configuration
from ahriman.core.log import LazyLogging
from ahriman.core.utils import dataclass_view, full_version, parse_version, srcinfo_property_list, utcnow
from ahriman.core.utils import dataclass_view, full_version, list_flatmap, parse_version, srcinfo_property_list
from ahriman.models.package_description import PackageDescription
from ahriman.models.package_source import PackageSource
from ahriman.models.pkgbuild import Pkgbuild
@@ -89,7 +84,7 @@ class Package(LazyLogging):
Returns:
list[str]: sum of dependencies per each package
"""
return self._package_list_property(lambda package: package.depends)
return list_flatmap(self.packages.values(), lambda package: package.depends)
@property
def depends_build(self) -> set[str]:
@@ -109,7 +104,7 @@ class Package(LazyLogging):
Returns:
list[str]: sum of test dependencies per each package
"""
return self._package_list_property(lambda package: package.check_depends)
return list_flatmap(self.packages.values(), lambda package: package.check_depends)
@property
def depends_make(self) -> list[str]:
@@ -119,7 +114,7 @@ class Package(LazyLogging):
Returns:
list[str]: sum of make dependencies per each package
"""
return self._package_list_property(lambda package: package.make_depends)
return list_flatmap(self.packages.values(), lambda package: package.make_depends)
@property
def depends_opt(self) -> list[str]:
@@ -129,7 +124,7 @@ class Package(LazyLogging):
Returns:
list[str]: sum of optional dependencies per each package
"""
return self._package_list_property(lambda package: package.opt_depends)
return list_flatmap(self.packages.values(), lambda package: package.opt_depends)
@property
def groups(self) -> list[str]:
@@ -139,7 +134,7 @@ class Package(LazyLogging):
Returns:
list[str]: sum of groups per each package
"""
return self._package_list_property(lambda package: package.groups)
return list_flatmap(self.packages.values(), lambda package: package.groups)
@property
def is_single_package(self) -> bool:
@@ -174,7 +169,7 @@ class Package(LazyLogging):
Returns:
list[str]: sum of licenses per each package
"""
return self._package_list_property(lambda package: package.licenses)
return list_flatmap(self.packages.values(), lambda package: package.licenses)
@property
def packages_full(self) -> list[str]:
@@ -345,184 +340,6 @@ class Package(LazyLogging):
packager=packager,
)
@staticmethod
def local_files(path: Path) -> Iterator[Path]:
"""
extract list of local files
Args:
path(Path): path to package sources directory
Yields:
Path: list of paths of files which belong to the package and distributed together with this tarball.
All paths are relative to the ``path``
Raises:
PackageInfoError: if there are parsing errors
"""
pkgbuild = Pkgbuild.from_file(path / "PKGBUILD")
# we could use arch property, but for consistency it is better to call special method
architectures = Package.supported_architectures(path)
for architecture in architectures:
for source in srcinfo_property_list("source", pkgbuild, {}, architecture=architecture):
if "::" in source:
_, source = source.split("::", maxsplit=1) # in case if filename is specified, remove it
if urlparse(source).scheme:
# basically file schema should use absolute path which is impossible if we are distributing
# files together with PKGBUILD. In this case we are going to skip it also
continue
yield Path(source)
if (install := pkgbuild.get("install")) is not None:
yield Path(install)
@staticmethod
def supported_architectures(path: Path) -> set[str]:
"""
load supported architectures from package sources
Args:
path(Path): path to package sources directory
Returns:
set[str]: list of package supported architectures
"""
pkgbuild = Pkgbuild.from_file(path / "PKGBUILD")
return set(pkgbuild.get("arch", []))
def _package_list_property(self, extractor: Callable[[PackageDescription], list[str]]) -> list[str]:
"""
extract list property from single packages and combine them into one list
Notes:
Basically this method is generic for type of ``list[T]``, but there is no trait ``Comparable`` in default
packages, thus we limit this method only to new types
Args:
extractor(Callable[[PackageDescription], list[str]): package property extractor
Returns:
list[str]: combined list of unique entries in properties list
"""
def generator() -> Iterator[str]:
for package in self.packages.values():
yield from extractor(package)
return sorted(set(generator()))
def actual_version(self, configuration: Configuration) -> str:
"""
additional method to handle VCS package versions
Args:
configuration(Configuration): configuration instance
Returns:
str: package version if package is not VCS and current version according to VCS otherwise
"""
if not self.is_vcs:
return self.version
from ahriman.core.build_tools.task import Task
_, repository_id = configuration.check_loaded()
paths = configuration.repository_paths
task = Task(self, configuration, repository_id.architecture, paths)
try:
# create fresh chroot environment, fetch sources and - automagically - update PKGBUILD
task.init(paths.cache_for(self.base), [], None)
pkgbuild = Pkgbuild.from_file(paths.cache_for(self.base) / "PKGBUILD")
return full_version(pkgbuild.get("epoch"), pkgbuild["pkgver"], pkgbuild["pkgrel"])
except Exception:
self.logger.exception("cannot determine version of VCS package")
finally:
# clear log files generated by devtools
for log_file in paths.cache_for(self.base).glob("*.log"):
log_file.unlink()
return self.version
def full_depends(self, pacman: Pacman, packages: Iterable[Package]) -> list[str]:
"""
generate full dependencies list including transitive dependencies
Args:
pacman(Pacman): alpm wrapper instance
packages(Iterable[Package]): repository package list
Returns:
list[str]: all dependencies of the package
"""
dependencies = {}
# load own package dependencies
for package_base in packages:
for name, repo_package in package_base.packages.items():
dependencies[name] = repo_package.depends
for provides in repo_package.provides:
dependencies[provides] = repo_package.depends
# load repository dependencies
for database in pacman.handle.get_syncdbs():
for pacman_package in database.pkgcache:
dependencies[pacman_package.name] = pacman_package.depends
for provides in pacman_package.provides:
dependencies[provides] = pacman_package.depends
result = set(self.depends)
current_depends: set[str] = set()
while result != current_depends:
current_depends = copy.deepcopy(result)
for package in current_depends:
result.update(dependencies.get(package, []))
return sorted(result)
def is_newer_than(self, timestamp: float | int) -> bool:
"""
check if package was built after the specified timestamp
Args:
timestamp(float | int): timestamp to check build date against
Returns:
bool: ``True`` in case if package was built after the specified date and ``False`` otherwise.
In case if build date is not set by any of packages, it returns False
"""
return any(
package.build_date > timestamp
for package in self.packages.values()
if package.build_date is not None
)
def is_outdated(self, remote: Package, configuration: Configuration, *,
calculate_version: bool = True) -> bool:
"""
check if package is out-of-dated
Args:
remote(Package): package properties from remote source
configuration(Configuration): configuration instance
calculate_version(bool, optional): expand version to actual value (by calculating git versions)
(Default value = True)
Returns:
bool: ``True`` if the package is out-of-dated and ``False`` otherwise
"""
vcs_allowed_age = configuration.getint("build", "vcs_allowed_age", fallback=0)
min_vcs_build_date = utcnow().timestamp() - vcs_allowed_age
if calculate_version and not self.is_newer_than(min_vcs_build_date):
remote_version = remote.actual_version(configuration)
else:
remote_version = remote.version
result: int = vercmp(self.version, remote_version)
return result < 0
def next_pkgrel(self, local_version: str | None) -> str | None:
"""
generate next pkgrel variable. The package release will be incremented if ``local_version`` is more or equal to

View File

@@ -22,8 +22,10 @@ from dataclasses import dataclass
from io import StringIO
from pathlib import Path
from typing import Any, ClassVar, IO, Self
from urllib.parse import urlparse
from ahriman.core.alpm.pkgbuild_parser import PkgbuildParser, PkgbuildToken
from ahriman.core.utils import srcinfo_property_list
from ahriman.models.pkgbuild_patch import PkgbuildPatch
@@ -103,6 +105,54 @@ class Pkgbuild(Mapping[str, Any]):
return cls({key: value for key, value in fields.items() if key})
@staticmethod
def local_files(path: Path) -> Iterator[Path]:
"""
extract list of local files
Args:
path(Path): path to package sources directory
Yields:
Path: list of paths of files which belong to the package and distributed together with this tarball.
All paths are relative to the ``path``
Raises:
PackageInfoError: if there are parsing errors
"""
pkgbuild = Pkgbuild.from_file(path / "PKGBUILD")
# we could use arch property, but for consistency it is better to call special method
architectures = Pkgbuild.supported_architectures(path)
for architecture in architectures:
for source in srcinfo_property_list("source", pkgbuild, {}, architecture=architecture):
if "::" in source:
_, source = source.split("::", maxsplit=1) # in case if filename is specified, remove it
if urlparse(source).scheme:
# basically file schema should use absolute path which is impossible if we are distributing
# files together with PKGBUILD. In this case we are going to skip it also
continue
yield Path(source)
if (install := pkgbuild.get("install")) is not None:
yield Path(install)
@staticmethod
def supported_architectures(path: Path) -> set[str]:
"""
load supported architectures from package sources
Args:
path(Path): path to package sources directory
Returns:
set[str]: list of package supported architectures
"""
pkgbuild = Pkgbuild.from_file(path / "PKGBUILD")
return set(pkgbuild.get("arch", []))
def packages(self) -> dict[str, Self]:
"""
extract properties from internal package functions