mirror of
https://github.com/arcan1s/ahriman.git
synced 2026-04-08 19:23:38 +00:00
remove side effect from getter
This commit is contained in:
@@ -79,8 +79,8 @@ def move_packages(repository_paths: RepositoryPaths, pacman: Pacman) -> None:
|
|||||||
artifacts.append(signature)
|
artifacts.append(signature)
|
||||||
|
|
||||||
for source in artifacts:
|
for source in artifacts:
|
||||||
|
target = repository_paths.ensure_exists(repository_paths.archive_for, package.base) / source.name
|
||||||
# move package to the archive directory
|
# move package to the archive directory
|
||||||
target = repository_paths.archive_for(package.base) / source.name
|
|
||||||
atomic_move(source, target)
|
atomic_move(source, target)
|
||||||
# create symlink to the archive
|
# create symlink to the archive
|
||||||
symlink_relative(source, target)
|
symlink_relative(source, target)
|
||||||
|
|||||||
@@ -52,6 +52,8 @@ class Executor(PackageInfo, Cleaner):
|
|||||||
Path: list of built packages and signatures if available, empty list otherwise
|
Path: list of built packages and signatures if available, empty list otherwise
|
||||||
"""
|
"""
|
||||||
archive = self.paths.archive_for(package.base)
|
archive = self.paths.archive_for(package.base)
|
||||||
|
if not archive.is_dir():
|
||||||
|
return
|
||||||
|
|
||||||
# find all packages which have same version
|
# find all packages which have same version
|
||||||
same_version = [
|
same_version = [
|
||||||
@@ -169,7 +171,7 @@ class Executor(PackageInfo, Cleaner):
|
|||||||
files = self.sign.process_sign_package(full_path, packager_key)
|
files = self.sign.process_sign_package(full_path, packager_key)
|
||||||
|
|
||||||
for src in files:
|
for src in files:
|
||||||
dst = self.paths.archive_for(package_base) / src.name
|
dst = self.paths.ensure_exists(self.paths.archive_for, package_base) / src.name
|
||||||
atomic_move(src, dst) # move package to archive directory
|
atomic_move(src, dst) # move package to archive directory
|
||||||
if not (symlink := self.paths.repository / dst.name).exists():
|
if not (symlink := self.paths.repository / dst.name).exists():
|
||||||
symlink_relative(symlink, dst) # create link to archive
|
symlink_relative(symlink, dst) # create link to archive
|
||||||
|
|||||||
@@ -21,17 +21,21 @@ import contextlib
|
|||||||
import os
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
|
|
||||||
from collections.abc import Iterator
|
from collections.abc import Callable, Iterator
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from functools import cached_property
|
from functools import cached_property
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from pwd import getpwuid
|
from pwd import getpwuid
|
||||||
|
from typing import ParamSpec
|
||||||
|
|
||||||
from ahriman.core.log import LazyLogging
|
from ahriman.core.log import LazyLogging
|
||||||
from ahriman.core.utils import owner
|
from ahriman.core.utils import owner
|
||||||
from ahriman.models.repository_id import RepositoryId
|
from ahriman.models.repository_id import RepositoryId
|
||||||
|
|
||||||
|
|
||||||
|
Params = ParamSpec("Params")
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
class RepositoryPaths(LazyLogging):
|
class RepositoryPaths(LazyLogging):
|
||||||
"""
|
"""
|
||||||
@@ -228,12 +232,7 @@ class RepositoryPaths(LazyLogging):
|
|||||||
Returns:
|
Returns:
|
||||||
Path: path to archive directory for package base
|
Path: path to archive directory for package base
|
||||||
"""
|
"""
|
||||||
directory = self.archive / "packages" / package_base[0] / package_base
|
return self.archive / "packages" / package_base[0] / package_base
|
||||||
if not directory.is_dir(): # create if not exists
|
|
||||||
with self.preserve_owner():
|
|
||||||
directory.mkdir(mode=0o755, parents=True)
|
|
||||||
|
|
||||||
return directory
|
|
||||||
|
|
||||||
def cache_for(self, package_base: str) -> Path:
|
def cache_for(self, package_base: str) -> Path:
|
||||||
"""
|
"""
|
||||||
@@ -247,6 +246,33 @@ class RepositoryPaths(LazyLogging):
|
|||||||
"""
|
"""
|
||||||
return self.cache / package_base
|
return self.cache / package_base
|
||||||
|
|
||||||
|
def ensure_exists(self, directory: Callable[Params, Path] | Path, *args: Params.args,
|
||||||
|
**kwargs: Params.kwargs) -> Path:
|
||||||
|
"""
|
||||||
|
get path based on ``directory`` callable provided and ensure it exists
|
||||||
|
|
||||||
|
Args:
|
||||||
|
directory(Callable[Params, Path] | Path): either directory extractor or path to directory to check
|
||||||
|
args(Params.args): positional arguments for directory call
|
||||||
|
kwargs(Params.kwargs): keyword arguments for directory call
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Path: original path based on extractor provided. Directory will always exist
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
This method calls directory accessor and then checks if there is a directory and - otherwise - creates it::
|
||||||
|
|
||||||
|
>>> paths.ensure_exists(paths.archive_for, package_base)
|
||||||
|
"""
|
||||||
|
if callable(directory):
|
||||||
|
directory = directory(*args, **kwargs)
|
||||||
|
|
||||||
|
if not directory.is_dir():
|
||||||
|
with self.preserve_owner():
|
||||||
|
directory.mkdir(mode=0o755, parents=True)
|
||||||
|
|
||||||
|
return directory
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def preserve_owner(self) -> Iterator[None]:
|
def preserve_owner(self) -> Iterator[None]:
|
||||||
"""
|
"""
|
||||||
@@ -303,13 +329,12 @@ class RepositoryPaths(LazyLogging):
|
|||||||
if self.repository_id.is_empty:
|
if self.repository_id.is_empty:
|
||||||
return # do not even try to create tree in case if no repository id set
|
return # do not even try to create tree in case if no repository id set
|
||||||
|
|
||||||
with self.preserve_owner():
|
for directory in (
|
||||||
for directory in (
|
self.archive,
|
||||||
self.archive,
|
self.cache,
|
||||||
self.cache,
|
self.chroot,
|
||||||
self.chroot,
|
self.packages,
|
||||||
self.packages,
|
self.pacman,
|
||||||
self.pacman,
|
self.repository,
|
||||||
self.repository,
|
):
|
||||||
):
|
self.ensure_exists(directory)
|
||||||
directory.mkdir(mode=0o755, parents=True, exist_ok=True)
|
|
||||||
|
|||||||
@@ -32,7 +32,6 @@ def test_fix_symlinks(repository_paths: RepositoryPaths, package_ahriman: Packag
|
|||||||
"""
|
"""
|
||||||
must replace symlinks during migration
|
must replace symlinks during migration
|
||||||
"""
|
"""
|
||||||
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
|
|
||||||
mocker.patch("ahriman.application.handlers.tree_migrate.walk", side_effect=[
|
mocker.patch("ahriman.application.handlers.tree_migrate.walk", side_effect=[
|
||||||
[
|
[
|
||||||
repository_paths.archive_for(package_ahriman.base) / "file",
|
repository_paths.archive_for(package_ahriman.base) / "file",
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ def test_archive_lookup(executor: Executor, package_ahriman: Package, package_py
|
|||||||
"""
|
"""
|
||||||
must existing packages which match the version
|
must existing packages which match the version
|
||||||
"""
|
"""
|
||||||
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
|
mocker.patch("pathlib.Path.is_dir", return_value=True)
|
||||||
mocker.patch("pathlib.Path.iterdir", return_value=[
|
mocker.patch("pathlib.Path.iterdir", return_value=[
|
||||||
Path("1.pkg.tar.zst"),
|
Path("1.pkg.tar.zst"),
|
||||||
Path("2.pkg.tar.zst"),
|
Path("2.pkg.tar.zst"),
|
||||||
@@ -40,7 +40,7 @@ def test_archive_lookup_version_mismatch(executor: Executor, package_ahriman: Pa
|
|||||||
"""
|
"""
|
||||||
must return nothing if no packages found with the same version
|
must return nothing if no packages found with the same version
|
||||||
"""
|
"""
|
||||||
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
|
mocker.patch("pathlib.Path.is_dir", return_value=True)
|
||||||
mocker.patch("pathlib.Path.iterdir", return_value=[
|
mocker.patch("pathlib.Path.iterdir", return_value=[
|
||||||
Path("1.pkg.tar.zst"),
|
Path("1.pkg.tar.zst"),
|
||||||
])
|
])
|
||||||
@@ -55,8 +55,8 @@ def test_archive_lookup_architecture_mismatch(executor: Executor, package_ahrima
|
|||||||
must return nothing if architecture doesn't match
|
must return nothing if architecture doesn't match
|
||||||
"""
|
"""
|
||||||
package_ahriman.packages[package_ahriman.base].architecture = "x86_64"
|
package_ahriman.packages[package_ahriman.base].architecture = "x86_64"
|
||||||
|
mocker.patch("pathlib.Path.is_dir", return_value=True)
|
||||||
mocker.patch("ahriman.core.repository.executor.Executor.architecture", return_value="i686")
|
mocker.patch("ahriman.core.repository.executor.Executor.architecture", return_value="i686")
|
||||||
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
|
|
||||||
mocker.patch("pathlib.Path.iterdir", return_value=[
|
mocker.patch("pathlib.Path.iterdir", return_value=[
|
||||||
Path("1.pkg.tar.zst"),
|
Path("1.pkg.tar.zst"),
|
||||||
])
|
])
|
||||||
@@ -65,6 +65,17 @@ def test_archive_lookup_architecture_mismatch(executor: Executor, package_ahrima
|
|||||||
assert list(executor._archive_lookup(package_ahriman)) == []
|
assert list(executor._archive_lookup(package_ahriman)) == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_archive_lookup_no_archive_directory(
|
||||||
|
executor: Executor,
|
||||||
|
package_ahriman: Package,
|
||||||
|
mocker: MockerFixture) -> None:
|
||||||
|
"""
|
||||||
|
must return nothing if no archive directory found
|
||||||
|
"""
|
||||||
|
mocker.patch("pathlib.Path.is_dir", return_value=False)
|
||||||
|
assert list(executor._archive_lookup(package_ahriman)) == []
|
||||||
|
|
||||||
|
|
||||||
def test_archive_rename(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None:
|
def test_archive_rename(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None:
|
||||||
"""
|
"""
|
||||||
must correctly remove package archive
|
must correctly remove package archive
|
||||||
|
|||||||
@@ -185,28 +185,14 @@ def test_known_repositories_empty(repository_paths: RepositoryPaths, mocker: Moc
|
|||||||
iterdir_mock.assert_not_called()
|
iterdir_mock.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
def test_archive_for(repository_paths: RepositoryPaths, package_ahriman: Package, mocker: MockerFixture) -> None:
|
def test_archive_for(repository_paths: RepositoryPaths, package_ahriman: Package) -> None:
|
||||||
"""
|
"""
|
||||||
must correctly define archive path
|
must correctly define archive path
|
||||||
"""
|
"""
|
||||||
mocker.patch("pathlib.Path.is_dir", return_value=True)
|
|
||||||
path = repository_paths.archive_for(package_ahriman.base)
|
path = repository_paths.archive_for(package_ahriman.base)
|
||||||
assert path == repository_paths.archive / "packages" / "a" / package_ahriman.base
|
assert path == repository_paths.archive / "packages" / "a" / package_ahriman.base
|
||||||
|
|
||||||
|
|
||||||
def test_archive_for_create_tree(repository_paths: RepositoryPaths, package_ahriman: Package,
|
|
||||||
mocker: MockerFixture) -> None:
|
|
||||||
"""
|
|
||||||
must create archive directory if it doesn't exist
|
|
||||||
"""
|
|
||||||
owner_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
|
|
||||||
mkdir_mock = mocker.patch("pathlib.Path.mkdir")
|
|
||||||
|
|
||||||
repository_paths.archive_for(package_ahriman.base)
|
|
||||||
owner_mock.assert_called_once_with()
|
|
||||||
mkdir_mock.assert_called_once_with(mode=0o755, parents=True)
|
|
||||||
|
|
||||||
|
|
||||||
def test_cache_for(repository_paths: RepositoryPaths, package_ahriman: Package) -> None:
|
def test_cache_for(repository_paths: RepositoryPaths, package_ahriman: Package) -> None:
|
||||||
"""
|
"""
|
||||||
must return correct path for cache directory
|
must return correct path for cache directory
|
||||||
@@ -216,6 +202,29 @@ def test_cache_for(repository_paths: RepositoryPaths, package_ahriman: Package)
|
|||||||
assert path.parent == repository_paths.cache
|
assert path.parent == repository_paths.cache
|
||||||
|
|
||||||
|
|
||||||
|
def test_ensure_exists(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||||
|
"""
|
||||||
|
must create directory if it doesn't exist
|
||||||
|
"""
|
||||||
|
owner_guard_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
|
||||||
|
mkdir_mock = mocker.patch("pathlib.Path.mkdir")
|
||||||
|
|
||||||
|
repository_paths.ensure_exists(repository_paths.archive)
|
||||||
|
owner_guard_mock.assert_called_once_with()
|
||||||
|
mkdir_mock.assert_called_once_with(mode=0o755, parents=True)
|
||||||
|
|
||||||
|
|
||||||
|
def test_ensure_exists_skip(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||||
|
"""
|
||||||
|
must do not create directory if it already exists
|
||||||
|
"""
|
||||||
|
mocker.patch("pathlib.Path.is_dir", return_value=True)
|
||||||
|
mkdir_mock = mocker.patch("pathlib.Path.mkdir")
|
||||||
|
|
||||||
|
repository_paths.ensure_exists(repository_paths.archive)
|
||||||
|
mkdir_mock.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
def test_preserve_owner(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
def test_preserve_owner(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
||||||
"""
|
"""
|
||||||
must preserve file owner during operations
|
must preserve file owner during operations
|
||||||
@@ -305,8 +314,12 @@ def test_tree_create(repository_paths: RepositoryPaths, mocker: MockerFixture) -
|
|||||||
owner_guard_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
|
owner_guard_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
|
||||||
|
|
||||||
repository_paths.tree_create()
|
repository_paths.tree_create()
|
||||||
mkdir_mock.assert_has_calls([MockCall(mode=0o755, parents=True, exist_ok=True) for _ in paths], any_order=True)
|
mkdir_mock.assert_has_calls([MockCall(mode=0o755, parents=True) for _ in paths], any_order=True)
|
||||||
owner_guard_mock.assert_called_once_with()
|
owner_guard_mock.assert_has_calls([
|
||||||
|
MockCall(),
|
||||||
|
MockCall().__enter__(),
|
||||||
|
MockCall().__exit__(None, None, None)
|
||||||
|
] * len(paths))
|
||||||
|
|
||||||
|
|
||||||
def test_tree_create_skip(mocker: MockerFixture) -> None:
|
def test_tree_create_skip(mocker: MockerFixture) -> None:
|
||||||
|
|||||||
Reference in New Issue
Block a user