Compare commits

...

2 Commits

Author SHA1 Message Date
bfb51434a0 initial impl 2026-03-18 16:53:25 +02:00
a04b6c3b9c refactor: move package archive lockup to package info trait 2026-03-15 20:23:20 +02:00
9 changed files with 308 additions and 154 deletions

View File

@@ -162,6 +162,10 @@ class ApplicationRepository(ApplicationProperties):
self.on_result(build_result) self.on_result(build_result)
result.merge(build_result) result.merge(build_result)
# filter packages which were prebuilt
succeeded = {package.base for package in build_result.success}
updates = filter(lambda package: package.base not in succeeded, updates)
builder = Updater.load(self.repository_id, self.configuration, self.repository) builder = Updater.load(self.repository_id, self.configuration, self.repository)
# ok so for now we split all packages into chunks and process each chunk accordingly # ok so for now we split all packages into chunks and process each chunk accordingly

View File

@@ -21,10 +21,10 @@ import argparse
from ahriman.application.application import Application from ahriman.application.application import Application
from ahriman.application.handlers.handler import Handler, SubParserAction from ahriman.application.handlers.handler import Handler, SubParserAction
from ahriman.application.handlers.update import Update
from ahriman.core.configuration import Configuration from ahriman.core.configuration import Configuration
from ahriman.core.utils import enum_values, extract_user from ahriman.core.utils import enum_values, extract_user
from ahriman.models.package_source import PackageSource from ahriman.models.package_source import PackageSource
from ahriman.models.packagers import Packagers
from ahriman.models.pkgbuild_patch import PkgbuildPatch from ahriman.models.pkgbuild_patch import PkgbuildPatch
from ahriman.models.repository_id import RepositoryId from ahriman.models.repository_id import RepositoryId
@@ -48,26 +48,7 @@ class Add(Handler):
""" """
application = Application(repository_id, configuration, report=report, refresh_pacman_database=args.refresh) application = Application(repository_id, configuration, report=report, refresh_pacman_database=args.refresh)
application.on_start() application.on_start()
Add.perform_action(application, args)
application.add(args.package, args.source, args.username)
patches = [PkgbuildPatch.from_env(patch) for patch in args.variable] if args.variable is not None else []
for package in args.package: # for each requested package insert patch
for patch in patches:
application.reporter.package_patches_update(package, patch)
if not args.now:
return
packages = application.updates(args.package, aur=False, local=False, manual=True, vcs=False, check_files=False)
if args.changes: # generate changes if requested
application.changes(packages)
packages = application.with_dependencies(packages, process_dependencies=args.dependencies)
packagers = Packagers(args.username, {package.base: package.packager for package in packages})
application.print_updates(packages, log_fn=application.logger.info)
result = application.update(packages, packagers, bump_pkgrel=args.increment)
Add.check_status(args.exit_code, not result.is_empty)
@staticmethod @staticmethod
def _set_package_add_parser(root: SubParserAction) -> argparse.ArgumentParser: def _set_package_add_parser(root: SubParserAction) -> argparse.ArgumentParser:
@@ -103,14 +84,34 @@ class Add(Handler):
parser.add_argument("--increment", help="increment package release (pkgrel) version on duplicate", parser.add_argument("--increment", help="increment package release (pkgrel) version on duplicate",
action=argparse.BooleanOptionalAction, default=True) action=argparse.BooleanOptionalAction, default=True)
parser.add_argument("-n", "--now", help="run update function after", action="store_true") parser.add_argument("-n", "--now", help="run update function after", action="store_true")
parser.add_argument("-y", "--refresh", help="download fresh package databases from the mirror before actions, "
"-yy to force refresh even if up to date",
action="count", default=False)
parser.add_argument("-s", "--source", help="explicitly specify the package source for this command", parser.add_argument("-s", "--source", help="explicitly specify the package source for this command",
type=PackageSource, choices=enum_values(PackageSource), default=PackageSource.Auto) type=PackageSource, choices=enum_values(PackageSource), default=PackageSource.Auto)
parser.add_argument("-u", "--username", help="build as user", default=extract_user()) parser.add_argument("-u", "--username", help="build as user", default=extract_user())
parser.add_argument("-v", "--variable", help="apply specified makepkg variables to the next build", parser.add_argument("-v", "--variable", help="apply specified makepkg variables to the next build",
action="append") action="append")
parser.add_argument("-y", "--refresh", help="download fresh package databases from the mirror before actions, "
"-yy to force refresh even if up to date",
action="count", default=False)
parser.set_defaults(aur=False, check_files=False, dry_run=False, local=False, manual=True, vcs=False)
return parser return parser
@staticmethod
def perform_action(application: Application, args: argparse.Namespace) -> None:
"""
perform add action
Args:
application(Application): application instance
args(argparse.Namespace): command line args
"""
application.add(args.package, args.source, args.username)
patches = [PkgbuildPatch.from_env(patch) for patch in args.variable] if args.variable is not None else []
for package in args.package: # for each requested package insert patch
for patch in patches:
application.reporter.package_patches_update(package, patch)
if not args.now:
return
Update.perform_action(application, args)
arguments = [_set_package_add_parser] arguments = [_set_package_add_parser]

View File

@@ -0,0 +1,132 @@
#
# Copyright (c) 2021-2026 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import argparse
from pathlib import Path
from ahriman.application.application import Application
from ahriman.application.handlers.add import Add
from ahriman.application.handlers.handler import Handler, SubParserAction
from ahriman.core.configuration import Configuration
from ahriman.core.exceptions import UnknownPackageError
from ahriman.core.utils import extract_user
from ahriman.models.package import Package
from ahriman.models.package_source import PackageSource
from ahriman.models.repository_id import RepositoryId
class Rollback(Handler):
"""
package rollback handler
"""
@classmethod
def run(cls, args: argparse.Namespace, repository_id: RepositoryId, configuration: Configuration, *,
report: bool) -> None:
"""
callback for command line
Args:
args(argparse.Namespace): command line args
repository_id(RepositoryId): repository unique identifier
configuration(Configuration): configuration instance
report(bool): force enable or disable reporting
"""
application = Application(repository_id, configuration, report=report)
application.on_start()
package = Rollback.package_load(application, args.package, args.version)
artifacts = Rollback.package_artifacts(application, package)
args.package = [str(artifact) for artifact in artifacts]
Add.perform_action(application, args)
if args.hold:
application.reporter.package_hold_update(package.base, enabled=True)
@staticmethod
def _set_package_archives_parser(root: SubParserAction) -> argparse.ArgumentParser:
"""
add parser for package archives subcommand
Args:
root(SubParserAction): subparsers for the commands
Returns:
argparse.ArgumentParser: created argument parser
"""
parser = root.add_parser("package-rollback", help="rollback package",
description="rollback package to specified version from archives")
parser.add_argument("package", help="package base")
parser.add_argument("version", help="package version")
parser.add_argument("--hold", help="hold package afterwards",
action=argparse.BooleanOptionalAction, default=True)
parser.add_argument("-u", "--username", help="build as user", default=extract_user())
parser.set_defaults(aur=False, changes=False, check_files=False, dependencies=False, dry_run=False,
exit_code=False, increment=False, now=True, local=False, manual=False, refresh=False,
source=PackageSource.Archive, variable=None, vcs=False)
return parser
@staticmethod
def package_artifacts(application: Application, package: Package) -> list[Path]:
"""
look for package artifacts and returns paths to them if any
Args:
application(Application): application instance
package(Package): package descriptor
Returns:
list[Path]: paths to found artifacts
Raises:
UnknownPackageError: if artifacts do not exist
"""
# lookup for built artifacts
artifacts = application.repository.package_archives_lookup(package)
if not artifacts:
raise UnknownPackageError(package.base) from None
return artifacts
@staticmethod
def package_load(application: Application, package_base: str, version: str) -> Package:
"""
load package from given arguments
Args:
application(Application): application instance
package_base(str): package base
version(str): package version
Returns:
Package: loaded package
Raises:
UnknownPackageError: if package does not exist
"""
try:
package, _ = next(iter(application.reporter.package_get(package_base)))
package.version = version
return package
except StopIteration:
raise UnknownPackageError(package_base) from None
arguments = [_set_package_archives_parser]

View File

@@ -48,22 +48,7 @@ class Update(Handler):
""" """
application = Application(repository_id, configuration, report=report, refresh_pacman_database=args.refresh) application = Application(repository_id, configuration, report=report, refresh_pacman_database=args.refresh)
application.on_start() application.on_start()
Update.perform_action(application, args)
packages = application.updates(args.package, aur=args.aur, local=args.local, manual=args.manual, vcs=args.vcs,
check_files=args.check_files)
if args.changes: # generate changes if requested
application.changes(packages)
if args.dry_run: # exit from application if no build requested
Update.check_status(args.exit_code, packages) # status code check
return
packages = application.with_dependencies(packages, process_dependencies=args.dependencies)
packagers = Packagers(args.username, {package.base: package.packager for package in packages})
application.print_updates(packages, log_fn=application.logger.info)
result = application.update(packages, packagers, bump_pkgrel=args.increment)
Update.check_status(args.exit_code, not result.is_empty)
@staticmethod @staticmethod
def _set_repo_check_parser(root: SubParserAction) -> argparse.ArgumentParser: def _set_repo_check_parser(root: SubParserAction) -> argparse.ArgumentParser:
@@ -153,6 +138,31 @@ class Update(Handler):
return print(line) if dry_run else application.logger.info(line) # pylint: disable=bad-builtin return print(line) if dry_run else application.logger.info(line) # pylint: disable=bad-builtin
return inner return inner
@staticmethod
def perform_action(application: Application, args: argparse.Namespace) -> None:
"""
perform update action
Args:
application(Application): application instance
args(argparse.Namespace): command line args
"""
packages = application.updates(args.package, aur=args.aur, local=args.local, manual=args.manual, vcs=args.vcs,
check_files=args.check_files)
if args.changes: # generate changes if requested
application.changes(packages)
if args.dry_run: # exit from application if no build requested
Update.check_status(args.exit_code, packages) # status code check
return
packages = application.with_dependencies(packages, process_dependencies=args.dependencies)
packagers = Packagers(args.username, {package.base: package.packager for package in packages})
application.print_updates(packages, log_fn=application.logger.info)
result = application.update(packages, packagers, bump_pkgrel=args.increment)
Update.check_status(args.exit_code, not result.is_empty)
arguments = [ arguments = [
_set_repo_check_parser, _set_repo_check_parser,
_set_repo_update_parser, _set_repo_update_parser,

View File

@@ -27,7 +27,7 @@ from ahriman.core.build_tools.package_archive import PackageArchive
from ahriman.core.build_tools.task import Task from ahriman.core.build_tools.task import Task
from ahriman.core.repository.cleaner import Cleaner from ahriman.core.repository.cleaner import Cleaner
from ahriman.core.repository.package_info import PackageInfo from ahriman.core.repository.package_info import PackageInfo
from ahriman.core.utils import atomic_move, filelock, list_flatmap, package_like, safe_filename, symlink_relative from ahriman.core.utils import atomic_move, filelock, safe_filename, symlink_relative
from ahriman.models.changes import Changes from ahriman.models.changes import Changes
from ahriman.models.event import EventType from ahriman.models.event import EventType
from ahriman.models.package import Package from ahriman.models.package import Package
@@ -41,34 +41,6 @@ class Executor(PackageInfo, Cleaner):
trait for common repository update processes trait for common repository update processes
""" """
def _archive_lookup(self, package: Package) -> list[Path]:
"""
check if there is a rebuilt package already
Args:
package(Package): package to check
Returns:
list[Path]: list of built packages and signatures if available, empty list otherwise
"""
archive = self.paths.archive_for(package.base)
if not archive.is_dir():
return []
for path in filter(package_like, archive.iterdir()):
# check if package version is the same
built = Package.from_archive(path)
if built.version != package.version:
continue
# all packages must be either any or same architecture
if not built.supports_architecture(self.repository_id.architecture):
continue
return list_flatmap(built.packages.values(), lambda single: archive.glob(f"{single.filename}*"))
return []
def _archive_rename(self, description: PackageDescription, package_base: str) -> None: def _archive_rename(self, description: PackageDescription, package_base: str) -> None:
""" """
rename package archive removing special symbols rename package archive removing special symbols
@@ -106,7 +78,7 @@ class Executor(PackageInfo, Cleaner):
commit_sha = task.init(path, patches, local_version) commit_sha = task.init(path, patches, local_version)
loaded_package = Package.from_build(path, self.repository_id.architecture, None) loaded_package = Package.from_build(path, self.repository_id.architecture, None)
if prebuilt := list(self._archive_lookup(loaded_package)): if prebuilt := self.package_archives_lookup(loaded_package):
self.logger.info("using prebuilt packages for %s-%s", loaded_package.base, loaded_package.version) self.logger.info("using prebuilt packages for %s-%s", loaded_package.base, loaded_package.version)
built = [] built = []
for artifact in prebuilt: for artifact in prebuilt:

View File

@@ -25,15 +25,15 @@ from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from ahriman.core.alpm.pacman import Pacman from ahriman.core.alpm.pacman import Pacman
from ahriman.core.build_tools.package_version import PackageVersion
from ahriman.core.build_tools.sources import Sources from ahriman.core.build_tools.sources import Sources
from ahriman.core.configuration import Configuration from ahriman.core.configuration import Configuration
from ahriman.core.log import LazyLogging from ahriman.core.log import LazyLogging
from ahriman.core.status import Client from ahriman.core.status import Client
from ahriman.core.utils import package_like from ahriman.core.utils import list_flatmap, package_like
from ahriman.models.changes import Changes from ahriman.models.changes import Changes
from ahriman.models.package import Package from ahriman.models.package import Package
from ahriman.models.repository_id import RepositoryId from ahriman.models.repository_id import RepositoryId
from ahriman.models.repository_paths import RepositoryPaths
class PackageInfo(LazyLogging): class PackageInfo(LazyLogging):
@@ -43,12 +43,14 @@ class PackageInfo(LazyLogging):
Attributes: Attributes:
configuration(Configuration): configuration instance configuration(Configuration): configuration instance
pacman(Pacman): alpm wrapper instance pacman(Pacman): alpm wrapper instance
paths(RepositoryPaths): repository paths instance
reporter(Client): build status reporter instance reporter(Client): build status reporter instance
repository_id(RepositoryId): repository unique identifier repository_id(RepositoryId): repository unique identifier
""" """
configuration: Configuration configuration: Configuration
pacman: Pacman pacman: Pacman
paths: RepositoryPaths
reporter: Client reporter: Client
repository_id: RepositoryId repository_id: RepositoryId
@@ -86,19 +88,21 @@ class PackageInfo(LazyLogging):
return sorted(result) return sorted(result)
def load_archives(self, packages: Iterable[Path]) -> list[Package]: def load_archives(self, packages: Iterable[Path], *, newest_only: bool = True) -> list[Package]:
""" """
load packages from list of archives load packages from list of archives
Args: Args:
packages(Iterable[Path]): paths to package archives packages(Iterable[Path]): paths to package archives
newest_only(bool, optional): filter packages with the same base, keeping only fresh packages installed
(Default value = True)
Returns: Returns:
list[Package]: list of read packages list[Package]: list of read packages
""" """
sources = {package.base: package.remote for package, _, in self.reporter.package_get(None)} sources = {package.base: package.remote for package, _, in self.reporter.package_get(None)}
result: dict[str, Package] = {} result: dict[str, dict[str, Package]] = {}
# we are iterating over bases, not single packages # we are iterating over bases, not single packages
for full_path in packages: for full_path in packages:
try: try:
@@ -106,17 +110,23 @@ class PackageInfo(LazyLogging):
if (source := sources.get(local.base)) is not None: # update source with remote if (source := sources.get(local.base)) is not None: # update source with remote
local.remote = source local.remote = source
current = result.setdefault(local.base, local) loaded_versions = result.setdefault(local.base, {})
if current.version != local.version: current = loaded_versions.setdefault(local.version, local)
# force version to max of them
self.logger.warning("version of %s differs, found %s and %s",
current.base, current.version, local.version)
if PackageVersion(current).is_outdated(local, self.configuration, calculate_version=False):
current.version = local.version
current.packages.update(local.packages) current.packages.update(local.packages)
except Exception: except Exception:
self.logger.exception("could not load package from %s", full_path) self.logger.exception("could not load package from %s", full_path)
return list(result.values())
if newest_only:
comparator: Callable[[Package, Package], int] = lambda left, right: left.vercmp(right.version)
for package_base, versions in result.items():
newest = max(versions.values(), key=cmp_to_key(comparator))
result[package_base] = {newest.version: newest}
return [
package
for versions in result.values()
for package in versions.values()
]
def package_archives(self, package_base: str) -> list[Package]: def package_archives(self, package_base: str) -> list[Package]:
""" """
@@ -130,11 +140,9 @@ class PackageInfo(LazyLogging):
Returns: Returns:
list[Package]: list of packages belonging to this base, sorted by version by ascension list[Package]: list of packages belonging to this base, sorted by version by ascension
""" """
paths = self.configuration.repository_paths
packages: dict[tuple[str, str], Package] = {} packages: dict[tuple[str, str], Package] = {}
# we can't use here load_archives, because it ignores versions # we can't use here load_archives, because it ignores versions
for full_path in filter(package_like, paths.archive_for(package_base).iterdir()): for full_path in filter(package_like, self.paths.archive_for(package_base).iterdir()):
local = Package.from_archive(full_path) local = Package.from_archive(full_path)
if not local.supports_architecture(self.repository_id.architecture): if not local.supports_architecture(self.repository_id.architecture):
continue continue
@@ -143,6 +151,33 @@ class PackageInfo(LazyLogging):
comparator: Callable[[Package, Package], int] = lambda left, right: left.vercmp(right.version) comparator: Callable[[Package, Package], int] = lambda left, right: left.vercmp(right.version)
return sorted(packages.values(), key=cmp_to_key(comparator)) return sorted(packages.values(), key=cmp_to_key(comparator))
def package_archives_lookup(self, package: Package) -> list[Path]:
"""
check if there is a rebuilt package already
Args:
package(Package): package to check
Returns:
list[Path]: list of built packages and signatures if available, empty list otherwise
"""
archive = self.paths.archive_for(package.base)
if not archive.is_dir():
return []
for built in self.load_archives(filter(package_like, archive.iterdir()), newest_only=False):
# check if package version is the same
if built.version != package.version:
continue
# all packages must be either any or same architecture
if not built.supports_architecture(self.repository_id.architecture):
continue
return list_flatmap(built.packages.values(), lambda single: archive.glob(f"{single.filename}*"))
return []
def package_changes(self, package: Package, last_commit_sha: str) -> Changes | None: def package_changes(self, package: Package, last_commit_sha: str) -> Changes | None:
""" """
extract package change for the package since last commit if available extract package change for the package since last commit if available
@@ -157,7 +192,7 @@ class PackageInfo(LazyLogging):
with TemporaryDirectory(ignore_cleanup_errors=True) as dir_name: with TemporaryDirectory(ignore_cleanup_errors=True) as dir_name:
dir_path = Path(dir_name) dir_path = Path(dir_name)
patches = self.reporter.package_patches_get(package.base, None) patches = self.reporter.package_patches_get(package.base, None)
current_commit_sha = Sources.load(dir_path, package, patches, self.configuration.repository_paths) current_commit_sha = Sources.load(dir_path, package, patches, self.paths)
if current_commit_sha != last_commit_sha: if current_commit_sha != last_commit_sha:
return Sources.changes(dir_path, last_commit_sha) return Sources.changes(dir_path, last_commit_sha)
@@ -173,7 +208,7 @@ class PackageInfo(LazyLogging):
Returns: Returns:
list[Package]: list of packages properties list[Package]: list of packages properties
""" """
packages = self.load_archives(filter(package_like, self.configuration.repository_paths.repository.iterdir())) packages = self.load_archives(filter(package_like, self.paths.repository.iterdir()))
if filter_packages: if filter_packages:
packages = [package for package in packages if package.base in filter_packages] packages = [package for package in packages if package.base in filter_packages]
@@ -186,7 +221,7 @@ class PackageInfo(LazyLogging):
Returns: Returns:
list[Path]: list of filenames from the directory list[Path]: list of filenames from the directory
""" """
return list(filter(package_like, self.configuration.repository_paths.packages.iterdir())) return list(filter(package_like, self.paths.packages.iterdir()))
def packages_depend_on(self, packages: list[Package], depends_on: Iterable[str] | None) -> list[Package]: def packages_depend_on(self, packages: list[Package], depends_on: Iterable[str] | None) -> list[Package]:
""" """

View File

@@ -103,6 +103,8 @@ def _create_watcher(path: Path, repository_id: RepositoryId) -> Watcher:
# load package info wrapper # load package info wrapper
package_info = PackageInfo() package_info = PackageInfo()
package_info.configuration = configuration package_info.configuration = configuration
package_info.paths = configuration.repository_paths
package_info.reporter = client
package_info.repository_id = repository_id package_info.repository_id = repository_id
return Watcher(client, package_info) return Watcher(client, package_info)

View File

@@ -1,6 +1,5 @@
import pytest import pytest
from dataclasses import replace
from pathlib import Path from pathlib import Path
from pytest_mock import MockerFixture from pytest_mock import MockerFixture
from typing import Any from typing import Any
@@ -11,72 +10,9 @@ from ahriman.models.changes import Changes
from ahriman.models.dependencies import Dependencies from ahriman.models.dependencies import Dependencies
from ahriman.models.package import Package from ahriman.models.package import Package
from ahriman.models.packagers import Packagers from ahriman.models.packagers import Packagers
from ahriman.models.repository_id import RepositoryId
from ahriman.models.user import User from ahriman.models.user import User
def test_archive_lookup(executor: Executor, package_ahriman: Package, package_python_schedule: Package,
mocker: MockerFixture) -> None:
"""
must existing packages which match the version
"""
mocker.patch("pathlib.Path.is_dir", return_value=True)
mocker.patch("pathlib.Path.iterdir", return_value=[
Path("1.pkg.tar.zst"),
Path("2.pkg.tar.zst"),
Path("3.pkg.tar.zst"),
])
mocker.patch("ahriman.models.package.Package.from_archive", side_effect=[
package_ahriman,
package_python_schedule,
replace(package_ahriman, version="1"),
])
glob_mock = mocker.patch("pathlib.Path.glob", return_value=[Path("1.pkg.tar.xz")])
assert list(executor._archive_lookup(package_ahriman)) == [Path("1.pkg.tar.xz")]
glob_mock.assert_called_once_with(f"{package_ahriman.packages[package_ahriman.base].filename}*")
def test_archive_lookup_version_mismatch(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must return nothing if no packages found with the same version
"""
mocker.patch("pathlib.Path.is_dir", return_value=True)
mocker.patch("pathlib.Path.iterdir", return_value=[
Path("1.pkg.tar.zst"),
])
mocker.patch("ahriman.models.package.Package.from_archive", return_value=replace(package_ahriman, version="1"))
assert list(executor._archive_lookup(package_ahriman)) == []
def test_archive_lookup_architecture_mismatch(executor: Executor, package_ahriman: Package,
mocker: MockerFixture) -> None:
"""
must return nothing if architecture doesn't match
"""
package_ahriman.packages[package_ahriman.base].architecture = "x86_64"
mocker.patch("pathlib.Path.is_dir", return_value=True)
executor.repository_id = RepositoryId("i686", executor.repository_id.name)
mocker.patch("pathlib.Path.iterdir", return_value=[
Path("1.pkg.tar.zst"),
])
mocker.patch("ahriman.models.package.Package.from_archive", return_value=package_ahriman)
assert list(executor._archive_lookup(package_ahriman)) == []
def test_archive_lookup_no_archive_directory(
executor: Executor,
package_ahriman: Package,
mocker: MockerFixture) -> None:
"""
must return nothing if no archive directory found
"""
mocker.patch("pathlib.Path.is_dir", return_value=False)
assert list(executor._archive_lookup(package_ahriman)) == []
def test_archive_rename(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None: def test_archive_rename(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None:
""" """
must correctly remove package archive must correctly remove package archive
@@ -110,7 +46,7 @@ def test_package_build(executor: Executor, package_ahriman: Package, mocker: Moc
status_client_mock = mocker.patch("ahriman.core.status.Client.set_building") status_client_mock = mocker.patch("ahriman.core.status.Client.set_building")
init_mock = mocker.patch("ahriman.core.build_tools.task.Task.init", return_value="sha") init_mock = mocker.patch("ahriman.core.build_tools.task.Task.init", return_value="sha")
package_mock = mocker.patch("ahriman.models.package.Package.from_build", return_value=package_ahriman) package_mock = mocker.patch("ahriman.models.package.Package.from_build", return_value=package_ahriman)
lookup_mock = mocker.patch("ahriman.core.repository.executor.Executor._archive_lookup", return_value=[]) lookup_mock = mocker.patch("ahriman.core.repository.executor.Executor.package_archives_lookup", return_value=[])
with_packages_mock = mocker.patch("ahriman.models.package.Package.with_packages") with_packages_mock = mocker.patch("ahriman.models.package.Package.with_packages")
rename_mock = mocker.patch("ahriman.core.repository.executor.atomic_move") rename_mock = mocker.patch("ahriman.core.repository.executor.atomic_move")
@@ -131,7 +67,7 @@ def test_package_build_copy(executor: Executor, package_ahriman: Package, mocker
mocker.patch("ahriman.core.build_tools.task.Task.build", return_value=[Path(package_ahriman.base)]) mocker.patch("ahriman.core.build_tools.task.Task.build", return_value=[Path(package_ahriman.base)])
mocker.patch("ahriman.core.build_tools.task.Task.init") mocker.patch("ahriman.core.build_tools.task.Task.init")
mocker.patch("ahriman.models.package.Package.from_build", return_value=package_ahriman) mocker.patch("ahriman.models.package.Package.from_build", return_value=package_ahriman)
mocker.patch("ahriman.core.repository.executor.Executor._archive_lookup", return_value=[path]) mocker.patch("ahriman.core.repository.executor.Executor.package_archives_lookup", return_value=[path])
mocker.patch("ahriman.core.repository.executor.atomic_move") mocker.patch("ahriman.core.repository.executor.atomic_move")
mocker.patch("ahriman.models.package.Package.with_packages") mocker.patch("ahriman.models.package.Package.with_packages")
copy_mock = mocker.patch("shutil.copy") copy_mock = mocker.patch("shutil.copy")

View File

@@ -8,6 +8,7 @@ from unittest.mock import MagicMock
from ahriman.core.repository import Repository from ahriman.core.repository import Repository
from ahriman.models.changes import Changes from ahriman.models.changes import Changes
from ahriman.models.package import Package from ahriman.models.package import Package
from ahriman.models.repository_id import RepositoryId
def test_full_depends(repository: Repository, package_ahriman: Package, package_python_schedule: Package, def test_full_depends(repository: Repository, package_ahriman: Package, package_python_schedule: Package,
@@ -120,6 +121,67 @@ def test_package_archives_architecture_mismatch(repository: Repository, package_
assert len(result) == 0 assert len(result) == 0
def test_package_archives_lookup(repository: Repository, package_ahriman: Package, package_python_schedule: Package,
mocker: MockerFixture) -> None:
"""
must existing packages which match the version
"""
mocker.patch("pathlib.Path.is_dir", return_value=True)
mocker.patch("pathlib.Path.iterdir", return_value=[
Path("1.pkg.tar.zst"),
Path("2.pkg.tar.zst"),
Path("3.pkg.tar.zst"),
])
mocker.patch("ahriman.models.package.Package.from_archive", side_effect=[
package_ahriman,
package_python_schedule,
replace(package_ahriman, version="1"),
])
glob_mock = mocker.patch("pathlib.Path.glob", return_value=[Path("1.pkg.tar.xz")])
assert repository.package_archives_lookup(package_ahriman) == [Path("1.pkg.tar.xz")]
glob_mock.assert_called_once_with(f"{package_ahriman.packages[package_ahriman.base].filename}*")
def test_package_archives_lookup_version_mismatch(repository: Repository, package_ahriman: Package,
mocker: MockerFixture) -> None:
"""
must return nothing if no packages found with the same version
"""
mocker.patch("pathlib.Path.is_dir", return_value=True)
mocker.patch("pathlib.Path.iterdir", return_value=[
Path("1.pkg.tar.zst"),
])
mocker.patch("ahriman.models.package.Package.from_archive", return_value=replace(package_ahriman, version="1"))
assert repository.package_archives_lookup(package_ahriman) == []
def test_package_archives_lookup_architecture_mismatch(repository: Repository, package_ahriman: Package,
mocker: MockerFixture) -> None:
"""
must return nothing if architecture doesn't match
"""
package_ahriman.packages[package_ahriman.base].architecture = "x86_64"
mocker.patch("pathlib.Path.is_dir", return_value=True)
repository.repository_id = RepositoryId("i686", repository.repository_id.name)
mocker.patch("pathlib.Path.iterdir", return_value=[
Path("1.pkg.tar.zst"),
])
mocker.patch("ahriman.models.package.Package.from_archive", return_value=package_ahriman)
assert repository.package_archives_lookup(package_ahriman) == []
def test_package_archives_lookup_no_archive_directory(repository: Repository, package_ahriman: Package,
mocker: MockerFixture) -> None:
"""
must return nothing if no archive directory found
"""
mocker.patch("pathlib.Path.is_dir", return_value=False)
assert repository.package_archives_lookup(package_ahriman) == []
def test_package_changes(repository: Repository, package_ahriman: Package, mocker: MockerFixture) -> None: def test_package_changes(repository: Repository, package_ahriman: Package, mocker: MockerFixture) -> None:
""" """
must load package changes must load package changes