mirror of
https://github.com/arcan1s/ahriman.git
synced 2026-04-07 02:53:38 +00:00
Compare commits
2 Commits
872a119bea
...
51431791df
| Author | SHA1 | Date | |
|---|---|---|---|
| 51431791df | |||
| d26525d1d3 |
@@ -20,7 +20,7 @@
|
||||
import argparse
|
||||
import logging
|
||||
|
||||
from collections.abc import Callable, Iterable
|
||||
from collections.abc import Callable
|
||||
from multiprocessing import Pool
|
||||
from typing import ClassVar, TypeVar
|
||||
|
||||
@@ -28,9 +28,9 @@ from ahriman.application.lock import Lock
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.exceptions import ExitCode, MissingArchitectureError, MultipleArchitecturesError
|
||||
from ahriman.core.log.log_loader import LogLoader
|
||||
from ahriman.core.repository import Explorer
|
||||
from ahriman.core.types import ExplicitBool
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
from ahriman.models.repository_paths import RepositoryPaths
|
||||
|
||||
|
||||
# this workaround is for several things
|
||||
@@ -169,11 +169,6 @@ class Handler:
|
||||
Raises:
|
||||
MissingArchitectureError: if no architecture set and automatic detection is not allowed or failed
|
||||
"""
|
||||
configuration = Configuration()
|
||||
configuration.load(args.configuration)
|
||||
# pylint, wtf???
|
||||
root = configuration.getpath("repository", "root") # pylint: disable=assignment-from-no-return
|
||||
|
||||
# preparse systemd repository-id argument
|
||||
# we are using unescaped values, so / is not allowed here, because it is impossible to separate if from dashes
|
||||
if args.repository_id is not None:
|
||||
@@ -184,27 +179,10 @@ class Handler:
|
||||
if repository_parts:
|
||||
args.repository = "-".join(repository_parts) # replace slash with dash
|
||||
|
||||
# extract repository names first
|
||||
if (from_args := args.repository) is not None:
|
||||
repositories: Iterable[str] = [from_args]
|
||||
elif from_filesystem := RepositoryPaths.known_repositories(root):
|
||||
repositories = from_filesystem
|
||||
else: # try to read configuration now
|
||||
repositories = [configuration.get("repository", "name")]
|
||||
configuration = Configuration()
|
||||
configuration.load(args.configuration)
|
||||
repositories = Explorer.repositories_extract(configuration, args.repository, args.architecture)
|
||||
|
||||
# extract architecture names
|
||||
if (architecture := args.architecture) is not None:
|
||||
parsed = set(
|
||||
RepositoryId(architecture, repository)
|
||||
for repository in repositories
|
||||
)
|
||||
else: # try to read from file system
|
||||
parsed = set(
|
||||
RepositoryId(architecture, repository)
|
||||
for repository in repositories
|
||||
for architecture in RepositoryPaths.known_architectures(root, repository)
|
||||
)
|
||||
|
||||
if not parsed:
|
||||
if not repositories:
|
||||
raise MissingArchitectureError(args.command)
|
||||
return sorted(parsed)
|
||||
return sorted(repositories)
|
||||
|
||||
@@ -17,14 +17,12 @@
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
import argparse
|
||||
|
||||
from dataclasses import replace
|
||||
from sqlite3 import Connection
|
||||
|
||||
from ahriman.application.handlers.handler import Handler
|
||||
from ahriman.core.alpm.pacman import Pacman
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.repository import Explorer
|
||||
from ahriman.core.sign.gpg import GPG
|
||||
from ahriman.core.utils import atomic_move, package_like, symlink_relative
|
||||
from ahriman.models.package import Package
|
||||
@@ -45,10 +43,7 @@ def migrate_data(connection: Connection, configuration: Configuration) -> None:
|
||||
"""
|
||||
del connection
|
||||
|
||||
config_path, _ = configuration.check_loaded()
|
||||
args = argparse.Namespace(configuration=config_path, architecture=None, repository=None, repository_id=None)
|
||||
|
||||
for repository_id in Handler.repositories_extract(args):
|
||||
for repository_id in Explorer.repositories_extract(configuration):
|
||||
paths = replace(configuration.repository_paths, repository_id=repository_id)
|
||||
pacman = Pacman(repository_id, configuration, refresh_database=PacmanSynchronization.Disabled)
|
||||
|
||||
|
||||
@@ -17,4 +17,5 @@
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
from ahriman.core.repository.explorer import Explorer
|
||||
from ahriman.core.repository.repository import Repository
|
||||
|
||||
@@ -20,7 +20,6 @@
|
||||
import shutil
|
||||
|
||||
from collections.abc import Iterable
|
||||
from filelock import FileLock
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
@@ -28,7 +27,7 @@ from ahriman.core.build_tools.package_archive import PackageArchive
|
||||
from ahriman.core.build_tools.task import Task
|
||||
from ahriman.core.repository.cleaner import Cleaner
|
||||
from ahriman.core.repository.package_info import PackageInfo
|
||||
from ahriman.core.utils import atomic_move, list_flatmap, package_like, safe_filename, symlink_relative
|
||||
from ahriman.core.utils import atomic_move, filelock, list_flatmap, package_like, safe_filename, symlink_relative
|
||||
from ahriman.models.changes import Changes
|
||||
from ahriman.models.event import EventType
|
||||
from ahriman.models.package import Package
|
||||
@@ -112,7 +111,7 @@ class Executor(PackageInfo, Cleaner):
|
||||
self.logger.info("using prebuilt packages for %s-%s", loaded_package.base, loaded_package.version)
|
||||
built = []
|
||||
for artifact in prebuilt:
|
||||
with FileLock(artifact.with_name(f".{artifact.name}.lock")):
|
||||
with filelock(artifact):
|
||||
shutil.copy(artifact, path)
|
||||
built.append(path / artifact.name)
|
||||
else:
|
||||
|
||||
70
src/ahriman/core/repository/explorer.py
Normal file
70
src/ahriman/core/repository/explorer.py
Normal file
@@ -0,0 +1,70 @@
|
||||
#
|
||||
# Copyright (c) 2021-2026 ahriman team.
|
||||
#
|
||||
# This file is part of ahriman
|
||||
# (see https://github.com/arcan1s/ahriman).
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
from collections.abc import Iterable
|
||||
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
from ahriman.models.repository_paths import RepositoryPaths
|
||||
|
||||
|
||||
class Explorer:
|
||||
"""
|
||||
helper to read filesystem and find created repositories
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def repositories_extract(configuration: Configuration, repository: str | None = None,
|
||||
architecture: str | None = None) -> list[RepositoryId]:
|
||||
"""
|
||||
get known architectures
|
||||
|
||||
Args:
|
||||
configuration(Configuration): configuration instance
|
||||
repository(str | None, optional): predefined repository name if available (Default value = None)
|
||||
architecture(str | None, optional): predefined repository architecture if available (Default value = None)
|
||||
|
||||
Returns:
|
||||
list[RepositoryId]: list of repository names and architectures for which tree is created
|
||||
"""
|
||||
# pylint, wtf???
|
||||
root = configuration.getpath("repository", "root") # pylint: disable=assignment-from-no-return
|
||||
|
||||
# extract repository names first
|
||||
if repository is not None:
|
||||
repositories: Iterable[str] = [repository]
|
||||
elif from_filesystem := RepositoryPaths.known_repositories(root):
|
||||
repositories = from_filesystem
|
||||
else: # try to read configuration now
|
||||
repositories = [configuration.get("repository", "name")]
|
||||
|
||||
# extract architecture names
|
||||
if architecture is not None:
|
||||
parsed = set(
|
||||
RepositoryId(architecture, repository)
|
||||
for repository in repositories
|
||||
)
|
||||
else: # try to read from file system
|
||||
parsed = set(
|
||||
RepositoryId(architecture, repository)
|
||||
for repository in repositories
|
||||
for architecture in RepositoryPaths.known_architectures(root, repository)
|
||||
)
|
||||
|
||||
return sorted(parsed)
|
||||
@@ -18,6 +18,7 @@
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
# pylint: disable=too-many-lines
|
||||
import contextlib
|
||||
import datetime
|
||||
import io
|
||||
import itertools
|
||||
@@ -47,6 +48,7 @@ __all__ = [
|
||||
"dataclass_view",
|
||||
"enum_values",
|
||||
"extract_user",
|
||||
"filelock",
|
||||
"filter_json",
|
||||
"full_version",
|
||||
"list_flatmap",
|
||||
@@ -87,7 +89,7 @@ def atomic_move(src: Path, dst: Path) -> None:
|
||||
|
||||
>>> atomic_move(src, dst)
|
||||
"""
|
||||
with FileLock(dst.with_name(f".{dst.name}.lock")):
|
||||
with filelock(dst):
|
||||
shutil.move(src, dst)
|
||||
|
||||
|
||||
@@ -262,6 +264,25 @@ def extract_user() -> str | None:
|
||||
return os.getenv("SUDO_USER") or os.getenv("DOAS_USER") or os.getenv("USER")
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def filelock(path: Path) -> Iterator[FileLock]:
|
||||
"""
|
||||
wrapper around :class:`filelock.FileLock`, which also removes locks afterward
|
||||
|
||||
Args:
|
||||
path(Path): path to lock on. The lock file will be created as ``.{path.name}.lock``
|
||||
|
||||
Yields:
|
||||
FileLock: acquired file lock instance
|
||||
"""
|
||||
lock_path = path.with_name(f".{path.name}.lock")
|
||||
try:
|
||||
with FileLock(lock_path) as lock:
|
||||
yield lock
|
||||
finally:
|
||||
lock_path.unlink(missing_ok=True)
|
||||
|
||||
|
||||
def filter_json(source: dict[str, Any], known_fields: Iterable[str]) -> dict[str, Any]:
|
||||
"""
|
||||
filter json object by fields used for json-to-object conversion
|
||||
|
||||
@@ -145,63 +145,11 @@ def test_repositories_extract(args: argparse.Namespace, configuration: Configura
|
||||
args.configuration = configuration.path
|
||||
args.repository = "repo"
|
||||
mocker.patch("ahriman.core.configuration.Configuration.load", new=lambda self, _: self.copy_from(configuration))
|
||||
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures")
|
||||
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories")
|
||||
extract_mock = mocker.patch("ahriman.core.repository.Explorer.repositories_extract",
|
||||
return_value=[RepositoryId("arch", "repo")])
|
||||
|
||||
assert Handler.repositories_extract(args) == [RepositoryId("arch", "repo")]
|
||||
known_architectures_mock.assert_not_called()
|
||||
known_repositories_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_repositories_extract_repository(args: argparse.Namespace, configuration: Configuration,
|
||||
mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must generate list of available repositories based on flags and tree
|
||||
"""
|
||||
args.architecture = "arch"
|
||||
args.configuration = configuration.path
|
||||
mocker.patch("ahriman.core.configuration.Configuration.load", new=lambda self, _: self.copy_from(configuration))
|
||||
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures")
|
||||
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories",
|
||||
return_value={"repo"})
|
||||
|
||||
assert Handler.repositories_extract(args) == [RepositoryId("arch", "repo")]
|
||||
known_architectures_mock.assert_not_called()
|
||||
known_repositories_mock.assert_called_once_with(configuration.repository_paths.root)
|
||||
|
||||
|
||||
def test_repositories_extract_repository_legacy(args: argparse.Namespace, configuration: Configuration,
|
||||
mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must generate list of available repositories based on flags and tree (legacy mode)
|
||||
"""
|
||||
args.architecture = "arch"
|
||||
args.configuration = configuration.path
|
||||
mocker.patch("ahriman.core.configuration.Configuration.load", new=lambda self, _: self.copy_from(configuration))
|
||||
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures")
|
||||
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories",
|
||||
return_value=set())
|
||||
|
||||
assert Handler.repositories_extract(args) == [RepositoryId("arch", "aur")]
|
||||
known_architectures_mock.assert_not_called()
|
||||
known_repositories_mock.assert_called_once_with(configuration.repository_paths.root)
|
||||
|
||||
|
||||
def test_repositories_extract_architecture(args: argparse.Namespace, configuration: Configuration,
|
||||
mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must read repository name from config
|
||||
"""
|
||||
args.configuration = configuration.path
|
||||
args.repository = "repo"
|
||||
mocker.patch("ahriman.core.configuration.Configuration.load", new=lambda self, _: self.copy_from(configuration))
|
||||
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures",
|
||||
return_value={"arch"})
|
||||
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories")
|
||||
|
||||
assert Handler.repositories_extract(args) == [RepositoryId("arch", "repo")]
|
||||
known_architectures_mock.assert_called_once_with(configuration.repository_paths.root, "repo")
|
||||
known_repositories_mock.assert_not_called()
|
||||
extract_mock.assert_called_once_with(pytest.helpers.anyvar(Configuration, True), args.repository, args.architecture)
|
||||
|
||||
|
||||
def test_repositories_extract_empty(args: argparse.Namespace, configuration: Configuration,
|
||||
@@ -212,8 +160,7 @@ def test_repositories_extract_empty(args: argparse.Namespace, configuration: Con
|
||||
args.command = "config"
|
||||
args.configuration = configuration.path
|
||||
mocker.patch("ahriman.core.configuration.Configuration.load", new=lambda self, _: self.copy_from(configuration))
|
||||
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures", return_value=set())
|
||||
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories", return_value=set())
|
||||
mocker.patch("ahriman.core.repository.Explorer.repositories_extract", return_value=[])
|
||||
|
||||
with pytest.raises(MissingArchitectureError):
|
||||
Handler.repositories_extract(args)
|
||||
@@ -227,12 +174,11 @@ def test_repositories_extract_systemd(args: argparse.Namespace, configuration: C
|
||||
args.configuration = configuration.path
|
||||
args.repository_id = "i686/some/repo/name"
|
||||
mocker.patch("ahriman.core.configuration.Configuration.load", new=lambda self, _: self.copy_from(configuration))
|
||||
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures")
|
||||
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories")
|
||||
extract_mock = mocker.patch("ahriman.core.repository.Explorer.repositories_extract",
|
||||
return_value=[RepositoryId("i686", "some-repo-name")])
|
||||
|
||||
assert Handler.repositories_extract(args) == [RepositoryId("i686", "some-repo-name")]
|
||||
known_architectures_mock.assert_not_called()
|
||||
known_repositories_mock.assert_not_called()
|
||||
extract_mock.assert_called_once_with(pytest.helpers.anyvar(Configuration, True), "some-repo-name", "i686")
|
||||
|
||||
|
||||
def test_repositories_extract_systemd_with_dash(args: argparse.Namespace, configuration: Configuration,
|
||||
@@ -243,12 +189,11 @@ def test_repositories_extract_systemd_with_dash(args: argparse.Namespace, config
|
||||
args.configuration = configuration.path
|
||||
args.repository_id = "i686-some-repo-name"
|
||||
mocker.patch("ahriman.core.configuration.Configuration.load", new=lambda self, _: self.copy_from(configuration))
|
||||
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures")
|
||||
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories")
|
||||
extract_mock = mocker.patch("ahriman.core.repository.Explorer.repositories_extract",
|
||||
return_value=[RepositoryId("i686", "some-repo-name")])
|
||||
|
||||
assert Handler.repositories_extract(args) == [RepositoryId("i686", "some-repo-name")]
|
||||
known_architectures_mock.assert_not_called()
|
||||
known_repositories_mock.assert_not_called()
|
||||
extract_mock.assert_called_once_with(pytest.helpers.anyvar(Configuration, True), "some-repo-name", "i686")
|
||||
|
||||
|
||||
def test_repositories_extract_systemd_legacy(args: argparse.Namespace, configuration: Configuration,
|
||||
@@ -259,10 +204,8 @@ def test_repositories_extract_systemd_legacy(args: argparse.Namespace, configura
|
||||
args.configuration = configuration.path
|
||||
args.repository_id = "i686"
|
||||
mocker.patch("ahriman.core.configuration.Configuration.load", new=lambda self, _: self.copy_from(configuration))
|
||||
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures")
|
||||
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories",
|
||||
return_value=set())
|
||||
extract_mock = mocker.patch("ahriman.core.repository.Explorer.repositories_extract",
|
||||
return_value=[RepositoryId("i686", "aur")])
|
||||
|
||||
assert Handler.repositories_extract(args) == [RepositoryId("i686", "aur")]
|
||||
known_architectures_mock.assert_not_called()
|
||||
known_repositories_mock.assert_called_once_with(configuration.repository_paths.root)
|
||||
extract_mock.assert_called_once_with(pytest.helpers.anyvar(Configuration, True), None, "i686")
|
||||
|
||||
@@ -23,7 +23,7 @@ def test_migrate_data(connection: Connection, configuration: Configuration, mock
|
||||
repository_id,
|
||||
replace(repository_id, architecture="i686"),
|
||||
]
|
||||
mocker.patch("ahriman.application.handlers.handler.Handler.repositories_extract", return_value=repositories)
|
||||
mocker.patch("ahriman.core.repository.Explorer.repositories_extract", return_value=repositories)
|
||||
migration_mock = mocker.patch("ahriman.core.database.migrations.m016_archive.move_packages")
|
||||
|
||||
migrate_data(connection, configuration)
|
||||
|
||||
56
tests/ahriman/core/repository/test_explorer.py
Normal file
56
tests/ahriman/core/repository/test_explorer.py
Normal file
@@ -0,0 +1,56 @@
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.repository import Explorer
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
|
||||
|
||||
def test_repositories_extract(configuration: Configuration, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must generate list of available repositories based on arguments
|
||||
"""
|
||||
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures")
|
||||
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories")
|
||||
|
||||
assert Explorer.repositories_extract(configuration, "repo", "arch") == [RepositoryId("arch", "repo")]
|
||||
known_architectures_mock.assert_not_called()
|
||||
known_repositories_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_repositories_extract_repository(configuration: Configuration, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must generate list of available repositories based on arguments and tree
|
||||
"""
|
||||
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures")
|
||||
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories",
|
||||
return_value={"repo"})
|
||||
|
||||
assert Explorer.repositories_extract(configuration, architecture="arch") == [RepositoryId("arch", "repo")]
|
||||
known_architectures_mock.assert_not_called()
|
||||
known_repositories_mock.assert_called_once_with(configuration.repository_paths.root)
|
||||
|
||||
|
||||
def test_repositories_extract_repository_legacy(configuration: Configuration, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must generate list of available repositories based on arguments and tree (legacy mode)
|
||||
"""
|
||||
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures")
|
||||
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories",
|
||||
return_value=set())
|
||||
|
||||
assert Explorer.repositories_extract(configuration, architecture="arch") == [RepositoryId("arch", "aur")]
|
||||
known_architectures_mock.assert_not_called()
|
||||
known_repositories_mock.assert_called_once_with(configuration.repository_paths.root)
|
||||
|
||||
|
||||
def test_repositories_extract_architecture(configuration: Configuration, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must read repository name from config
|
||||
"""
|
||||
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures",
|
||||
return_value={"arch"})
|
||||
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories")
|
||||
|
||||
assert Explorer.repositories_extract(configuration, repository="repo") == [RepositoryId("arch", "repo")]
|
||||
known_architectures_mock.assert_called_once_with(configuration.repository_paths.root, "repo")
|
||||
known_repositories_mock.assert_not_called()
|
||||
@@ -20,11 +20,11 @@ def test_atomic_move(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must move file with locking
|
||||
"""
|
||||
filelock_mock = mocker.patch("ahriman.core.utils.FileLock")
|
||||
filelock_mock = mocker.patch("ahriman.core.utils.filelock")
|
||||
move_mock = mocker.patch("shutil.move")
|
||||
|
||||
atomic_move(Path("source"), Path("destination"))
|
||||
filelock_mock.assert_called_once_with(Path(".destination.lock"))
|
||||
filelock_mock.assert_called_once_with(Path("destination"))
|
||||
move_mock.assert_called_once_with(Path("source"), Path("destination"))
|
||||
|
||||
|
||||
@@ -247,6 +247,30 @@ def test_extract_user() -> None:
|
||||
assert extract_user() == "doas"
|
||||
|
||||
|
||||
def test_filelock(tmp_path: Path) -> None:
|
||||
"""
|
||||
must acquire lock and remove lock file after
|
||||
"""
|
||||
local = tmp_path / "local"
|
||||
lock = local.with_name(f".{local.name}.lock")
|
||||
|
||||
with filelock(local):
|
||||
assert lock.exists()
|
||||
assert not lock.exists()
|
||||
|
||||
|
||||
def test_filelock_cleanup_on_missing(tmp_path: Path) -> None:
|
||||
"""
|
||||
must not fail if lock file is already removed
|
||||
"""
|
||||
local = tmp_path / "local"
|
||||
lock = local.with_name(f".{local.name}.lock")
|
||||
|
||||
with filelock(local):
|
||||
lock.unlink(missing_ok=True)
|
||||
assert not lock.exists()
|
||||
|
||||
|
||||
def test_filter_json(package_ahriman: Package) -> None:
|
||||
"""
|
||||
must filter fields by known list
|
||||
|
||||
Reference in New Issue
Block a user