mirror of
https://github.com/arcan1s/ahriman.git
synced 2026-04-07 02:53:38 +00:00
Compare commits
9 Commits
bb31919858
...
872a119bea
| Author | SHA1 | Date | |
|---|---|---|---|
| 872a119bea | |||
| 110ba26bd8 | |||
| 94c6764617 | |||
| 7fedfce4f5 | |||
| 123118d3c9 | |||
| 9e02d7bee8 | |||
| 231d3b47da | |||
| 2a137b42f5 | |||
| 257829d15c |
2
.github/workflows/setup.sh
vendored
2
.github/workflows/setup.sh
vendored
@@ -10,7 +10,7 @@ echo -e '[arcanisrepo]\nServer = https://repo.arcanis.me/$arch\nSigLevel = Never
|
||||
# refresh the image
|
||||
pacman -Syyu --noconfirm
|
||||
# main dependencies
|
||||
pacman -S --noconfirm devtools git pyalpm python-bcrypt python-inflection python-pyelftools python-requests python-systemd sudo
|
||||
pacman -S --noconfirm devtools git pyalpm python-bcrypt python-filelock python-inflection python-pyelftools python-requests python-systemd sudo
|
||||
# make dependencies
|
||||
pacman -S --noconfirm --asdeps base-devel python-build python-flit python-installer python-tox python-wheel
|
||||
# optional dependencies
|
||||
|
||||
@@ -24,7 +24,8 @@ RUN pacman -S --noconfirm --asdeps \
|
||||
devtools \
|
||||
git \
|
||||
pyalpm \
|
||||
python-bcrypt \
|
||||
python-bcrypt \
|
||||
python-filelock \
|
||||
python-inflection \
|
||||
python-pyelftools \
|
||||
python-requests \
|
||||
|
||||
@@ -8,7 +8,7 @@ pkgdesc="ArcH linux ReposItory MANager"
|
||||
arch=('any')
|
||||
url="https://ahriman.readthedocs.io/"
|
||||
license=('GPL-3.0-or-later')
|
||||
depends=('devtools>=1:1.0.0' 'git' 'pyalpm' 'python-bcrypt' 'python-inflection' 'python-pyelftools' 'python-requests')
|
||||
depends=('devtools>=1:1.0.0' 'git' 'pyalpm' 'python-bcrypt' 'python-filelock' 'python-inflection' 'python-pyelftools' 'python-requests')
|
||||
makedepends=('python-build' 'python-flit' 'python-installer' 'python-wheel')
|
||||
source=("https://github.com/arcan1s/ahriman/releases/download/$pkgver/$pkgbase-$pkgver.tar.gz"
|
||||
"$pkgbase.sysusers"
|
||||
|
||||
@@ -18,6 +18,7 @@ authors = [
|
||||
|
||||
dependencies = [
|
||||
"bcrypt",
|
||||
"filelock",
|
||||
"inflection",
|
||||
"pyelftools",
|
||||
"requests",
|
||||
|
||||
@@ -50,7 +50,7 @@ class TreeMigrate(Handler):
|
||||
target_tree.tree_create()
|
||||
# perform migration
|
||||
TreeMigrate.tree_move(current_tree, target_tree)
|
||||
TreeMigrate.fix_symlinks(target_tree)
|
||||
TreeMigrate.symlinks_fix(target_tree)
|
||||
|
||||
@staticmethod
|
||||
def _set_service_tree_migrate_parser(root: SubParserAction) -> argparse.ArgumentParser:
|
||||
@@ -69,7 +69,7 @@ class TreeMigrate(Handler):
|
||||
return parser
|
||||
|
||||
@staticmethod
|
||||
def fix_symlinks(paths: RepositoryPaths) -> None:
|
||||
def symlinks_fix(paths: RepositoryPaths) -> None:
|
||||
"""
|
||||
fix package archive symlinks
|
||||
|
||||
|
||||
@@ -59,22 +59,15 @@ class Repo(LazyLogging):
|
||||
"""
|
||||
return self.root / f"{self.name}.db.tar.gz"
|
||||
|
||||
def add(self, path: Path, *, remove: bool = True) -> None:
|
||||
def add(self, path: Path) -> None:
|
||||
"""
|
||||
add new package to repository
|
||||
|
||||
Args:
|
||||
path(Path): path to archive to add
|
||||
remove(bool, optional): whether to remove old packages or not (Default value = True)
|
||||
"""
|
||||
command = ["repo-add", *self.sign_args]
|
||||
if remove:
|
||||
command.extend(["--remove"])
|
||||
command.extend([str(self.repo_path), str(path)])
|
||||
|
||||
# add to repository
|
||||
check_output(
|
||||
*command,
|
||||
"repo-add", *self.sign_args, "--remove", str(self.repo_path), str(path),
|
||||
exception=BuildError.from_process(path.name),
|
||||
cwd=self.root,
|
||||
logger=self.logger,
|
||||
@@ -97,7 +90,7 @@ class Repo(LazyLogging):
|
||||
filename(Path): package filename to remove
|
||||
"""
|
||||
# remove package and signature (if any) from filesystem
|
||||
for full_path in self.root.glob(f"**/{filename.name}*"):
|
||||
for full_path in self.root.glob(f"{filename.name}*"):
|
||||
full_path.unlink()
|
||||
|
||||
# remove package from registry
|
||||
|
||||
@@ -24,7 +24,7 @@ from pathlib import Path
|
||||
|
||||
from ahriman.core.alpm.repo import Repo
|
||||
from ahriman.core.log import LazyLogging
|
||||
from ahriman.core.utils import symlink_relative, utcnow, walk
|
||||
from ahriman.core.utils import package_like, symlink_relative, utcnow, walk
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.package_description import PackageDescription
|
||||
from ahriman.models.repository_paths import RepositoryPaths
|
||||
@@ -97,8 +97,9 @@ class ArchiveTree(LazyLogging):
|
||||
parents = [repository] + list(repository.parents[:-1])
|
||||
for parent in parents:
|
||||
path = root / parent
|
||||
if not list(path.iterdir()):
|
||||
path.rmdir()
|
||||
if list(path.iterdir()):
|
||||
continue # directory is not empty
|
||||
path.rmdir()
|
||||
|
||||
def repository_for(self, date: datetime.date | None = None) -> Path:
|
||||
"""
|
||||
@@ -156,6 +157,8 @@ class ArchiveTree(LazyLogging):
|
||||
if self.repository_id.name != name or self.repository_id.architecture != architecture:
|
||||
continue # we only process same name repositories
|
||||
|
||||
if not package_like(path):
|
||||
continue
|
||||
if not path.is_symlink():
|
||||
continue # find symlinks only
|
||||
if path.exists():
|
||||
|
||||
@@ -79,8 +79,8 @@ def move_packages(repository_paths: RepositoryPaths, pacman: Pacman) -> None:
|
||||
artifacts.append(signature)
|
||||
|
||||
for source in artifacts:
|
||||
target = repository_paths.ensure_exists(repository_paths.archive_for(package.base)) / source.name
|
||||
# move package to the archive directory
|
||||
target = repository_paths.archive_for(package.base) / source.name
|
||||
atomic_move(source, target)
|
||||
# create symlink to the archive
|
||||
symlink_relative(source, target)
|
||||
|
||||
@@ -19,7 +19,8 @@
|
||||
#
|
||||
import shutil
|
||||
|
||||
from collections.abc import Iterable, Iterator
|
||||
from collections.abc import Iterable
|
||||
from filelock import FileLock
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
@@ -27,7 +28,7 @@ from ahriman.core.build_tools.package_archive import PackageArchive
|
||||
from ahriman.core.build_tools.task import Task
|
||||
from ahriman.core.repository.cleaner import Cleaner
|
||||
from ahriman.core.repository.package_info import PackageInfo
|
||||
from ahriman.core.utils import atomic_move, filelock, package_like, safe_filename, symlink_relative
|
||||
from ahriman.core.utils import atomic_move, list_flatmap, package_like, safe_filename, symlink_relative
|
||||
from ahriman.models.changes import Changes
|
||||
from ahriman.models.event import EventType
|
||||
from ahriman.models.package import Package
|
||||
@@ -41,35 +42,34 @@ class Executor(PackageInfo, Cleaner):
|
||||
trait for common repository update processes
|
||||
"""
|
||||
|
||||
def _archive_lookup(self, package: Package) -> Iterator[Path]:
|
||||
def _archive_lookup(self, package: Package) -> list[Path]:
|
||||
"""
|
||||
check if there is a rebuilt package already
|
||||
|
||||
Args:
|
||||
package(Package): package to check
|
||||
|
||||
Yields:
|
||||
Path: list of built packages and signatures if available, empty list otherwise
|
||||
Returns:
|
||||
list[Path]: list of built packages and signatures if available, empty list otherwise
|
||||
"""
|
||||
archive = self.paths.archive_for(package.base)
|
||||
if not archive.is_dir():
|
||||
return []
|
||||
|
||||
# find all packages which have same version
|
||||
same_version = [
|
||||
built
|
||||
for path in filter(package_like, archive.iterdir())
|
||||
if (built := Package.from_archive(path, self.pacman)).version == package.version
|
||||
]
|
||||
# no packages of the same version found
|
||||
if not same_version:
|
||||
return
|
||||
for path in filter(package_like, archive.iterdir()):
|
||||
# check if package version is the same
|
||||
built = Package.from_archive(path, self.pacman)
|
||||
if built.version != package.version:
|
||||
continue
|
||||
|
||||
packages = [single for built in same_version for single in built.packages.values()]
|
||||
# all packages must be either any or same architecture
|
||||
if not all(single.architecture in ("any", self.architecture) for single in packages):
|
||||
return
|
||||
packages = built.packages.values()
|
||||
# all packages must be either any or same architecture
|
||||
if not all(single.architecture in ("any", self.architecture) for single in packages):
|
||||
continue
|
||||
|
||||
for single in packages:
|
||||
yield from archive.glob(f"{single.filename}*")
|
||||
return list_flatmap(packages, lambda single: archive.glob(f"{single.filename}*"))
|
||||
|
||||
return []
|
||||
|
||||
def _archive_rename(self, description: PackageDescription, package_base: str) -> None:
|
||||
"""
|
||||
@@ -112,7 +112,7 @@ class Executor(PackageInfo, Cleaner):
|
||||
self.logger.info("using prebuilt packages for %s-%s", loaded_package.base, loaded_package.version)
|
||||
built = []
|
||||
for artifact in prebuilt:
|
||||
with filelock(artifact):
|
||||
with FileLock(artifact.with_name(f".{artifact.name}.lock")):
|
||||
shutil.copy(artifact, path)
|
||||
built.append(path / artifact.name)
|
||||
else:
|
||||
@@ -169,7 +169,7 @@ class Executor(PackageInfo, Cleaner):
|
||||
files = self.sign.process_sign_package(full_path, packager_key)
|
||||
|
||||
for src in files:
|
||||
dst = self.paths.archive_for(package_base) / src.name
|
||||
dst = self.paths.ensure_exists(self.paths.archive_for(package_base)) / src.name
|
||||
atomic_move(src, dst) # move package to archive directory
|
||||
if not (symlink := self.paths.repository / dst.name).exists():
|
||||
symlink_relative(symlink, dst) # create link to archive
|
||||
|
||||
@@ -18,9 +18,7 @@
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
# pylint: disable=too-many-lines
|
||||
import contextlib
|
||||
import datetime
|
||||
import fcntl
|
||||
import io
|
||||
import itertools
|
||||
import logging
|
||||
@@ -33,6 +31,7 @@ import subprocess
|
||||
from collections.abc import Callable, Iterable, Iterator, Mapping
|
||||
from dataclasses import asdict
|
||||
from enum import Enum
|
||||
from filelock import FileLock
|
||||
from pathlib import Path
|
||||
from pwd import getpwuid
|
||||
from typing import Any, IO, TypeVar
|
||||
@@ -48,7 +47,6 @@ __all__ = [
|
||||
"dataclass_view",
|
||||
"enum_values",
|
||||
"extract_user",
|
||||
"filelock",
|
||||
"filter_json",
|
||||
"full_version",
|
||||
"list_flatmap",
|
||||
@@ -89,7 +87,7 @@ def atomic_move(src: Path, dst: Path) -> None:
|
||||
|
||||
>>> atomic_move(src, dst)
|
||||
"""
|
||||
with filelock(dst):
|
||||
with FileLock(dst.with_name(f".{dst.name}.lock")):
|
||||
shutil.move(src, dst)
|
||||
|
||||
|
||||
@@ -264,29 +262,6 @@ def extract_user() -> str | None:
|
||||
return os.getenv("SUDO_USER") or os.getenv("DOAS_USER") or os.getenv("USER")
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def filelock(path: Path) -> Iterator[None]:
|
||||
"""
|
||||
lock on file passed as argument
|
||||
|
||||
Args:
|
||||
path(Path): path object on which lock must be performed
|
||||
"""
|
||||
lock_path = path.with_name(f".{path.name}")
|
||||
try:
|
||||
with lock_path.open("ab") as lock_file:
|
||||
fd = lock_file.fileno()
|
||||
try:
|
||||
fcntl.flock(fd, fcntl.LOCK_EX) # lock file and wait lock is until available
|
||||
yield
|
||||
finally:
|
||||
fcntl.flock(fd, fcntl.LOCK_UN) # unlock file first
|
||||
finally:
|
||||
# remove lock file at the end
|
||||
# there might be a race condition here, but we don't care about this case
|
||||
lock_path.unlink(missing_ok=True)
|
||||
|
||||
|
||||
def filter_json(source: dict[str, Any], known_fields: Iterable[str]) -> dict[str, Any]:
|
||||
"""
|
||||
filter json object by fields used for json-to-object conversion
|
||||
@@ -327,7 +302,7 @@ def full_version(epoch: str | int | None, pkgver: str, pkgrel: str) -> str:
|
||||
return f"{prefix}{pkgver}-{pkgrel}"
|
||||
|
||||
|
||||
def list_flatmap(source: Iterable[T], extractor: Callable[[T], list[R]]) -> list[R]:
|
||||
def list_flatmap(source: Iterable[T], extractor: Callable[[T], Iterable[R]]) -> list[R]:
|
||||
"""
|
||||
extract elements from list of lists, flatten them and apply ``extractor``
|
||||
|
||||
|
||||
@@ -228,12 +228,7 @@ class RepositoryPaths(LazyLogging):
|
||||
Returns:
|
||||
Path: path to archive directory for package base
|
||||
"""
|
||||
directory = self.archive / "packages" / package_base[0] / package_base
|
||||
if not directory.is_dir(): # create if not exists
|
||||
with self.preserve_owner():
|
||||
directory.mkdir(mode=0o755, parents=True)
|
||||
|
||||
return directory
|
||||
return self.archive / "packages" / package_base[0] / package_base
|
||||
|
||||
def cache_for(self, package_base: str) -> Path:
|
||||
"""
|
||||
@@ -247,6 +242,27 @@ class RepositoryPaths(LazyLogging):
|
||||
"""
|
||||
return self.cache / package_base
|
||||
|
||||
def ensure_exists(self, directory: Path) -> Path:
|
||||
"""
|
||||
get path based on ``directory`` callable provided and ensure it exists
|
||||
|
||||
Args:
|
||||
directory(Path): path to directory to check
|
||||
|
||||
Returns:
|
||||
Path: original path based on extractor provided. Directory will always exist
|
||||
|
||||
Examples:
|
||||
This method calls directory accessor and then checks if there is a directory and - otherwise - creates it::
|
||||
|
||||
>>> paths.ensure_exists(paths.archive_for(package_base))
|
||||
"""
|
||||
if not directory.is_dir():
|
||||
with self.preserve_owner():
|
||||
directory.mkdir(mode=0o755, parents=True)
|
||||
|
||||
return directory
|
||||
|
||||
@contextlib.contextmanager
|
||||
def preserve_owner(self) -> Iterator[None]:
|
||||
"""
|
||||
@@ -303,13 +319,12 @@ class RepositoryPaths(LazyLogging):
|
||||
if self.repository_id.is_empty:
|
||||
return # do not even try to create tree in case if no repository id set
|
||||
|
||||
with self.preserve_owner():
|
||||
for directory in (
|
||||
self.archive,
|
||||
self.cache,
|
||||
self.chroot,
|
||||
self.packages,
|
||||
self.pacman,
|
||||
self.repository,
|
||||
):
|
||||
directory.mkdir(mode=0o755, parents=True, exist_ok=True)
|
||||
for directory in (
|
||||
self.archive,
|
||||
self.cache,
|
||||
self.chroot,
|
||||
self.packages,
|
||||
self.pacman,
|
||||
self.repository,
|
||||
):
|
||||
self.ensure_exists(directory)
|
||||
|
||||
@@ -17,7 +17,7 @@ def test_run(args: argparse.Namespace, configuration: Configuration, mocker: Moc
|
||||
"""
|
||||
tree_create_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.tree_create")
|
||||
application_mock = mocker.patch("ahriman.application.handlers.tree_migrate.TreeMigrate.tree_move")
|
||||
symlinks_mock = mocker.patch("ahriman.application.handlers.tree_migrate.TreeMigrate.fix_symlinks")
|
||||
symlinks_mock = mocker.patch("ahriman.application.handlers.tree_migrate.TreeMigrate.symlinks_fix")
|
||||
_, repository_id = configuration.check_loaded()
|
||||
old_paths = configuration.repository_paths
|
||||
new_paths = RepositoryPaths(old_paths.root, old_paths.repository_id, _force_current_tree=True)
|
||||
@@ -28,11 +28,10 @@ def test_run(args: argparse.Namespace, configuration: Configuration, mocker: Moc
|
||||
symlinks_mock.assert_called_once_with(new_paths)
|
||||
|
||||
|
||||
def test_fix_symlinks(repository_paths: RepositoryPaths, package_ahriman: Package, mocker: MockerFixture) -> None:
|
||||
def test_symlinks_fix(repository_paths: RepositoryPaths, package_ahriman: Package, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must replace symlinks during migration
|
||||
"""
|
||||
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
|
||||
mocker.patch("ahriman.application.handlers.tree_migrate.walk", side_effect=[
|
||||
[
|
||||
repository_paths.archive_for(package_ahriman.base) / "file",
|
||||
@@ -47,7 +46,7 @@ def test_fix_symlinks(repository_paths: RepositoryPaths, package_ahriman: Packag
|
||||
unlink_mock = mocker.patch("pathlib.Path.unlink")
|
||||
symlink_mock = mocker.patch("pathlib.Path.symlink_to")
|
||||
|
||||
TreeMigrate.fix_symlinks(repository_paths)
|
||||
TreeMigrate.symlinks_fix(repository_paths)
|
||||
unlink_mock.assert_called_once_with()
|
||||
symlink_mock.assert_called_once_with(
|
||||
Path("..") /
|
||||
|
||||
@@ -35,17 +35,6 @@ def test_repo_add(repo: Repo, mocker: MockerFixture) -> None:
|
||||
assert "--remove" in check_output_mock.call_args[0]
|
||||
|
||||
|
||||
def test_repo_add_no_remove(repo: Repo, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must call repo-add without remove flag
|
||||
"""
|
||||
check_output_mock = mocker.patch("ahriman.core.alpm.repo.check_output")
|
||||
|
||||
repo.add(Path("path"), remove=False)
|
||||
check_output_mock.assert_called_once() # it will be checked later
|
||||
assert "--remove" not in check_output_mock.call_args[0]
|
||||
|
||||
|
||||
def test_repo_init(repo: Repo, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must call repo-add with empty package list on repo initializing
|
||||
|
||||
@@ -109,6 +109,7 @@ def test_symlinks_fix(archive_tree: ArchiveTree, mocker: MockerFixture) -> None:
|
||||
archive_tree.repository_for() / filename
|
||||
for filename in (
|
||||
"symlink-1.0.0-1-x86_64.pkg.tar.zst",
|
||||
"symlink-1.0.0-1-x86_64.pkg.tar.zst.sig",
|
||||
"broken_symlink-1.0.0-1-x86_64.pkg.tar.zst",
|
||||
"file-1.0.0-1-x86_64.pkg.tar.zst",
|
||||
)
|
||||
|
||||
@@ -19,7 +19,7 @@ def test_archive_lookup(executor: Executor, package_ahriman: Package, package_py
|
||||
"""
|
||||
must existing packages which match the version
|
||||
"""
|
||||
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
|
||||
mocker.patch("pathlib.Path.is_dir", return_value=True)
|
||||
mocker.patch("pathlib.Path.iterdir", return_value=[
|
||||
Path("1.pkg.tar.zst"),
|
||||
Path("2.pkg.tar.zst"),
|
||||
@@ -40,7 +40,7 @@ def test_archive_lookup_version_mismatch(executor: Executor, package_ahriman: Pa
|
||||
"""
|
||||
must return nothing if no packages found with the same version
|
||||
"""
|
||||
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
|
||||
mocker.patch("pathlib.Path.is_dir", return_value=True)
|
||||
mocker.patch("pathlib.Path.iterdir", return_value=[
|
||||
Path("1.pkg.tar.zst"),
|
||||
])
|
||||
@@ -55,8 +55,8 @@ def test_archive_lookup_architecture_mismatch(executor: Executor, package_ahrima
|
||||
must return nothing if architecture doesn't match
|
||||
"""
|
||||
package_ahriman.packages[package_ahriman.base].architecture = "x86_64"
|
||||
mocker.patch("pathlib.Path.is_dir", return_value=True)
|
||||
mocker.patch("ahriman.core.repository.executor.Executor.architecture", return_value="i686")
|
||||
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
|
||||
mocker.patch("pathlib.Path.iterdir", return_value=[
|
||||
Path("1.pkg.tar.zst"),
|
||||
])
|
||||
@@ -65,6 +65,17 @@ def test_archive_lookup_architecture_mismatch(executor: Executor, package_ahrima
|
||||
assert list(executor._archive_lookup(package_ahriman)) == []
|
||||
|
||||
|
||||
def test_archive_lookup_no_archive_directory(
|
||||
executor: Executor,
|
||||
package_ahriman: Package,
|
||||
mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must return nothing if no archive directory found
|
||||
"""
|
||||
mocker.patch("pathlib.Path.is_dir", return_value=False)
|
||||
assert list(executor._archive_lookup(package_ahriman)) == []
|
||||
|
||||
|
||||
def test_archive_rename(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must correctly remove package archive
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import datetime
|
||||
import fcntl
|
||||
import logging
|
||||
import os
|
||||
import pytest
|
||||
@@ -21,11 +20,11 @@ def test_atomic_move(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must move file with locking
|
||||
"""
|
||||
filelock_mock = mocker.patch("ahriman.core.utils.filelock")
|
||||
filelock_mock = mocker.patch("ahriman.core.utils.FileLock")
|
||||
move_mock = mocker.patch("shutil.move")
|
||||
|
||||
atomic_move(Path("source"), Path("destination"))
|
||||
filelock_mock.assert_called_once_with(Path("destination"))
|
||||
filelock_mock.assert_called_once_with(Path(".destination.lock"))
|
||||
move_mock.assert_called_once_with(Path("source"), Path("destination"))
|
||||
|
||||
|
||||
@@ -248,53 +247,6 @@ def test_extract_user() -> None:
|
||||
assert extract_user() == "doas"
|
||||
|
||||
|
||||
def test_filelock(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must perform file locking
|
||||
"""
|
||||
lock_mock = mocker.patch("fcntl.flock")
|
||||
open_mock = mocker.patch("pathlib.Path.open", autospec=True)
|
||||
unlink_mock = mocker.patch("pathlib.Path.unlink")
|
||||
|
||||
with filelock(Path("local")):
|
||||
pass
|
||||
open_mock.assert_called_once_with(Path(".local"), "ab")
|
||||
lock_mock.assert_has_calls([
|
||||
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_EX),
|
||||
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_UN),
|
||||
])
|
||||
unlink_mock.assert_called_once_with(missing_ok=True)
|
||||
|
||||
|
||||
def test_filelock_remove_lock(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must remove lock file in case of exception
|
||||
"""
|
||||
mocker.patch("pathlib.Path.open", side_effect=Exception)
|
||||
unlink_mock = mocker.patch("pathlib.Path.unlink")
|
||||
|
||||
with pytest.raises(Exception):
|
||||
with filelock(Path("local")):
|
||||
pass
|
||||
unlink_mock.assert_called_once_with(missing_ok=True)
|
||||
|
||||
|
||||
def test_filelock_unlock(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must unlock file in case of exception
|
||||
"""
|
||||
mocker.patch("pathlib.Path.open")
|
||||
lock_mock = mocker.patch("fcntl.flock")
|
||||
|
||||
with pytest.raises(Exception):
|
||||
with filelock(Path("local")):
|
||||
raise Exception
|
||||
lock_mock.assert_has_calls([
|
||||
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_EX),
|
||||
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_UN),
|
||||
])
|
||||
|
||||
|
||||
def test_filter_json(package_ahriman: Package) -> None:
|
||||
"""
|
||||
must filter fields by known list
|
||||
|
||||
@@ -185,28 +185,14 @@ def test_known_repositories_empty(repository_paths: RepositoryPaths, mocker: Moc
|
||||
iterdir_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_archive_for(repository_paths: RepositoryPaths, package_ahriman: Package, mocker: MockerFixture) -> None:
|
||||
def test_archive_for(repository_paths: RepositoryPaths, package_ahriman: Package) -> None:
|
||||
"""
|
||||
must correctly define archive path
|
||||
"""
|
||||
mocker.patch("pathlib.Path.is_dir", return_value=True)
|
||||
path = repository_paths.archive_for(package_ahriman.base)
|
||||
assert path == repository_paths.archive / "packages" / "a" / package_ahriman.base
|
||||
|
||||
|
||||
def test_archive_for_create_tree(repository_paths: RepositoryPaths, package_ahriman: Package,
|
||||
mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must create archive directory if it doesn't exist
|
||||
"""
|
||||
owner_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
|
||||
mkdir_mock = mocker.patch("pathlib.Path.mkdir")
|
||||
|
||||
repository_paths.archive_for(package_ahriman.base)
|
||||
owner_mock.assert_called_once_with()
|
||||
mkdir_mock.assert_called_once_with(mode=0o755, parents=True)
|
||||
|
||||
|
||||
def test_cache_for(repository_paths: RepositoryPaths, package_ahriman: Package) -> None:
|
||||
"""
|
||||
must return correct path for cache directory
|
||||
@@ -216,6 +202,29 @@ def test_cache_for(repository_paths: RepositoryPaths, package_ahriman: Package)
|
||||
assert path.parent == repository_paths.cache
|
||||
|
||||
|
||||
def test_ensure_exists(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must create directory if it doesn't exist
|
||||
"""
|
||||
owner_guard_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
|
||||
mkdir_mock = mocker.patch("pathlib.Path.mkdir")
|
||||
|
||||
repository_paths.ensure_exists(repository_paths.archive)
|
||||
owner_guard_mock.assert_called_once_with()
|
||||
mkdir_mock.assert_called_once_with(mode=0o755, parents=True)
|
||||
|
||||
|
||||
def test_ensure_exists_skip(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must do not create directory if it already exists
|
||||
"""
|
||||
mocker.patch("pathlib.Path.is_dir", return_value=True)
|
||||
mkdir_mock = mocker.patch("pathlib.Path.mkdir")
|
||||
|
||||
repository_paths.ensure_exists(repository_paths.archive)
|
||||
mkdir_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_preserve_owner(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must preserve file owner during operations
|
||||
@@ -305,8 +314,12 @@ def test_tree_create(repository_paths: RepositoryPaths, mocker: MockerFixture) -
|
||||
owner_guard_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
|
||||
|
||||
repository_paths.tree_create()
|
||||
mkdir_mock.assert_has_calls([MockCall(mode=0o755, parents=True, exist_ok=True) for _ in paths], any_order=True)
|
||||
owner_guard_mock.assert_called_once_with()
|
||||
mkdir_mock.assert_has_calls([MockCall(mode=0o755, parents=True) for _ in paths], any_order=True)
|
||||
owner_guard_mock.assert_has_calls([
|
||||
MockCall(),
|
||||
MockCall().__enter__(),
|
||||
MockCall().__exit__(None, None, None)
|
||||
] * len(paths))
|
||||
|
||||
|
||||
def test_tree_create_skip(mocker: MockerFixture) -> None:
|
||||
|
||||
Reference in New Issue
Block a user