mirror of
https://github.com/arcan1s/ahriman.git
synced 2026-04-07 19:03:38 +00:00
Compare commits
2 Commits
872a119bea
...
51431791df
| Author | SHA1 | Date | |
|---|---|---|---|
| 51431791df | |||
| d26525d1d3 |
@@ -20,7 +20,7 @@
|
|||||||
import argparse
|
import argparse
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from collections.abc import Callable, Iterable
|
from collections.abc import Callable
|
||||||
from multiprocessing import Pool
|
from multiprocessing import Pool
|
||||||
from typing import ClassVar, TypeVar
|
from typing import ClassVar, TypeVar
|
||||||
|
|
||||||
@@ -28,9 +28,9 @@ from ahriman.application.lock import Lock
|
|||||||
from ahriman.core.configuration import Configuration
|
from ahriman.core.configuration import Configuration
|
||||||
from ahriman.core.exceptions import ExitCode, MissingArchitectureError, MultipleArchitecturesError
|
from ahriman.core.exceptions import ExitCode, MissingArchitectureError, MultipleArchitecturesError
|
||||||
from ahriman.core.log.log_loader import LogLoader
|
from ahriman.core.log.log_loader import LogLoader
|
||||||
|
from ahriman.core.repository import Explorer
|
||||||
from ahriman.core.types import ExplicitBool
|
from ahriman.core.types import ExplicitBool
|
||||||
from ahriman.models.repository_id import RepositoryId
|
from ahriman.models.repository_id import RepositoryId
|
||||||
from ahriman.models.repository_paths import RepositoryPaths
|
|
||||||
|
|
||||||
|
|
||||||
# this workaround is for several things
|
# this workaround is for several things
|
||||||
@@ -169,11 +169,6 @@ class Handler:
|
|||||||
Raises:
|
Raises:
|
||||||
MissingArchitectureError: if no architecture set and automatic detection is not allowed or failed
|
MissingArchitectureError: if no architecture set and automatic detection is not allowed or failed
|
||||||
"""
|
"""
|
||||||
configuration = Configuration()
|
|
||||||
configuration.load(args.configuration)
|
|
||||||
# pylint, wtf???
|
|
||||||
root = configuration.getpath("repository", "root") # pylint: disable=assignment-from-no-return
|
|
||||||
|
|
||||||
# preparse systemd repository-id argument
|
# preparse systemd repository-id argument
|
||||||
# we are using unescaped values, so / is not allowed here, because it is impossible to separate if from dashes
|
# we are using unescaped values, so / is not allowed here, because it is impossible to separate if from dashes
|
||||||
if args.repository_id is not None:
|
if args.repository_id is not None:
|
||||||
@@ -184,27 +179,10 @@ class Handler:
|
|||||||
if repository_parts:
|
if repository_parts:
|
||||||
args.repository = "-".join(repository_parts) # replace slash with dash
|
args.repository = "-".join(repository_parts) # replace slash with dash
|
||||||
|
|
||||||
# extract repository names first
|
configuration = Configuration()
|
||||||
if (from_args := args.repository) is not None:
|
configuration.load(args.configuration)
|
||||||
repositories: Iterable[str] = [from_args]
|
repositories = Explorer.repositories_extract(configuration, args.repository, args.architecture)
|
||||||
elif from_filesystem := RepositoryPaths.known_repositories(root):
|
|
||||||
repositories = from_filesystem
|
|
||||||
else: # try to read configuration now
|
|
||||||
repositories = [configuration.get("repository", "name")]
|
|
||||||
|
|
||||||
# extract architecture names
|
if not repositories:
|
||||||
if (architecture := args.architecture) is not None:
|
|
||||||
parsed = set(
|
|
||||||
RepositoryId(architecture, repository)
|
|
||||||
for repository in repositories
|
|
||||||
)
|
|
||||||
else: # try to read from file system
|
|
||||||
parsed = set(
|
|
||||||
RepositoryId(architecture, repository)
|
|
||||||
for repository in repositories
|
|
||||||
for architecture in RepositoryPaths.known_architectures(root, repository)
|
|
||||||
)
|
|
||||||
|
|
||||||
if not parsed:
|
|
||||||
raise MissingArchitectureError(args.command)
|
raise MissingArchitectureError(args.command)
|
||||||
return sorted(parsed)
|
return sorted(repositories)
|
||||||
|
|||||||
@@ -17,14 +17,12 @@
|
|||||||
# You should have received a copy of the GNU General Public License
|
# You should have received a copy of the GNU General Public License
|
||||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
#
|
#
|
||||||
import argparse
|
|
||||||
|
|
||||||
from dataclasses import replace
|
from dataclasses import replace
|
||||||
from sqlite3 import Connection
|
from sqlite3 import Connection
|
||||||
|
|
||||||
from ahriman.application.handlers.handler import Handler
|
|
||||||
from ahriman.core.alpm.pacman import Pacman
|
from ahriman.core.alpm.pacman import Pacman
|
||||||
from ahriman.core.configuration import Configuration
|
from ahriman.core.configuration import Configuration
|
||||||
|
from ahriman.core.repository import Explorer
|
||||||
from ahriman.core.sign.gpg import GPG
|
from ahriman.core.sign.gpg import GPG
|
||||||
from ahriman.core.utils import atomic_move, package_like, symlink_relative
|
from ahriman.core.utils import atomic_move, package_like, symlink_relative
|
||||||
from ahriman.models.package import Package
|
from ahriman.models.package import Package
|
||||||
@@ -45,10 +43,7 @@ def migrate_data(connection: Connection, configuration: Configuration) -> None:
|
|||||||
"""
|
"""
|
||||||
del connection
|
del connection
|
||||||
|
|
||||||
config_path, _ = configuration.check_loaded()
|
for repository_id in Explorer.repositories_extract(configuration):
|
||||||
args = argparse.Namespace(configuration=config_path, architecture=None, repository=None, repository_id=None)
|
|
||||||
|
|
||||||
for repository_id in Handler.repositories_extract(args):
|
|
||||||
paths = replace(configuration.repository_paths, repository_id=repository_id)
|
paths = replace(configuration.repository_paths, repository_id=repository_id)
|
||||||
pacman = Pacman(repository_id, configuration, refresh_database=PacmanSynchronization.Disabled)
|
pacman = Pacman(repository_id, configuration, refresh_database=PacmanSynchronization.Disabled)
|
||||||
|
|
||||||
|
|||||||
@@ -17,4 +17,5 @@
|
|||||||
# You should have received a copy of the GNU General Public License
|
# You should have received a copy of the GNU General Public License
|
||||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
#
|
#
|
||||||
|
from ahriman.core.repository.explorer import Explorer
|
||||||
from ahriman.core.repository.repository import Repository
|
from ahriman.core.repository.repository import Repository
|
||||||
|
|||||||
@@ -20,7 +20,6 @@
|
|||||||
import shutil
|
import shutil
|
||||||
|
|
||||||
from collections.abc import Iterable
|
from collections.abc import Iterable
|
||||||
from filelock import FileLock
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from tempfile import TemporaryDirectory
|
from tempfile import TemporaryDirectory
|
||||||
|
|
||||||
@@ -28,7 +27,7 @@ from ahriman.core.build_tools.package_archive import PackageArchive
|
|||||||
from ahriman.core.build_tools.task import Task
|
from ahriman.core.build_tools.task import Task
|
||||||
from ahriman.core.repository.cleaner import Cleaner
|
from ahriman.core.repository.cleaner import Cleaner
|
||||||
from ahriman.core.repository.package_info import PackageInfo
|
from ahriman.core.repository.package_info import PackageInfo
|
||||||
from ahriman.core.utils import atomic_move, list_flatmap, package_like, safe_filename, symlink_relative
|
from ahriman.core.utils import atomic_move, filelock, list_flatmap, package_like, safe_filename, symlink_relative
|
||||||
from ahriman.models.changes import Changes
|
from ahriman.models.changes import Changes
|
||||||
from ahriman.models.event import EventType
|
from ahriman.models.event import EventType
|
||||||
from ahriman.models.package import Package
|
from ahriman.models.package import Package
|
||||||
@@ -112,7 +111,7 @@ class Executor(PackageInfo, Cleaner):
|
|||||||
self.logger.info("using prebuilt packages for %s-%s", loaded_package.base, loaded_package.version)
|
self.logger.info("using prebuilt packages for %s-%s", loaded_package.base, loaded_package.version)
|
||||||
built = []
|
built = []
|
||||||
for artifact in prebuilt:
|
for artifact in prebuilt:
|
||||||
with FileLock(artifact.with_name(f".{artifact.name}.lock")):
|
with filelock(artifact):
|
||||||
shutil.copy(artifact, path)
|
shutil.copy(artifact, path)
|
||||||
built.append(path / artifact.name)
|
built.append(path / artifact.name)
|
||||||
else:
|
else:
|
||||||
|
|||||||
70
src/ahriman/core/repository/explorer.py
Normal file
70
src/ahriman/core/repository/explorer.py
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
#
|
||||||
|
# Copyright (c) 2021-2026 ahriman team.
|
||||||
|
#
|
||||||
|
# This file is part of ahriman
|
||||||
|
# (see https://github.com/arcan1s/ahriman).
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
from collections.abc import Iterable
|
||||||
|
|
||||||
|
from ahriman.core.configuration import Configuration
|
||||||
|
from ahriman.models.repository_id import RepositoryId
|
||||||
|
from ahriman.models.repository_paths import RepositoryPaths
|
||||||
|
|
||||||
|
|
||||||
|
class Explorer:
|
||||||
|
"""
|
||||||
|
helper to read filesystem and find created repositories
|
||||||
|
"""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def repositories_extract(configuration: Configuration, repository: str | None = None,
|
||||||
|
architecture: str | None = None) -> list[RepositoryId]:
|
||||||
|
"""
|
||||||
|
get known architectures
|
||||||
|
|
||||||
|
Args:
|
||||||
|
configuration(Configuration): configuration instance
|
||||||
|
repository(str | None, optional): predefined repository name if available (Default value = None)
|
||||||
|
architecture(str | None, optional): predefined repository architecture if available (Default value = None)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list[RepositoryId]: list of repository names and architectures for which tree is created
|
||||||
|
"""
|
||||||
|
# pylint, wtf???
|
||||||
|
root = configuration.getpath("repository", "root") # pylint: disable=assignment-from-no-return
|
||||||
|
|
||||||
|
# extract repository names first
|
||||||
|
if repository is not None:
|
||||||
|
repositories: Iterable[str] = [repository]
|
||||||
|
elif from_filesystem := RepositoryPaths.known_repositories(root):
|
||||||
|
repositories = from_filesystem
|
||||||
|
else: # try to read configuration now
|
||||||
|
repositories = [configuration.get("repository", "name")]
|
||||||
|
|
||||||
|
# extract architecture names
|
||||||
|
if architecture is not None:
|
||||||
|
parsed = set(
|
||||||
|
RepositoryId(architecture, repository)
|
||||||
|
for repository in repositories
|
||||||
|
)
|
||||||
|
else: # try to read from file system
|
||||||
|
parsed = set(
|
||||||
|
RepositoryId(architecture, repository)
|
||||||
|
for repository in repositories
|
||||||
|
for architecture in RepositoryPaths.known_architectures(root, repository)
|
||||||
|
)
|
||||||
|
|
||||||
|
return sorted(parsed)
|
||||||
@@ -18,6 +18,7 @@
|
|||||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
#
|
#
|
||||||
# pylint: disable=too-many-lines
|
# pylint: disable=too-many-lines
|
||||||
|
import contextlib
|
||||||
import datetime
|
import datetime
|
||||||
import io
|
import io
|
||||||
import itertools
|
import itertools
|
||||||
@@ -47,6 +48,7 @@ __all__ = [
|
|||||||
"dataclass_view",
|
"dataclass_view",
|
||||||
"enum_values",
|
"enum_values",
|
||||||
"extract_user",
|
"extract_user",
|
||||||
|
"filelock",
|
||||||
"filter_json",
|
"filter_json",
|
||||||
"full_version",
|
"full_version",
|
||||||
"list_flatmap",
|
"list_flatmap",
|
||||||
@@ -87,7 +89,7 @@ def atomic_move(src: Path, dst: Path) -> None:
|
|||||||
|
|
||||||
>>> atomic_move(src, dst)
|
>>> atomic_move(src, dst)
|
||||||
"""
|
"""
|
||||||
with FileLock(dst.with_name(f".{dst.name}.lock")):
|
with filelock(dst):
|
||||||
shutil.move(src, dst)
|
shutil.move(src, dst)
|
||||||
|
|
||||||
|
|
||||||
@@ -262,6 +264,25 @@ def extract_user() -> str | None:
|
|||||||
return os.getenv("SUDO_USER") or os.getenv("DOAS_USER") or os.getenv("USER")
|
return os.getenv("SUDO_USER") or os.getenv("DOAS_USER") or os.getenv("USER")
|
||||||
|
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def filelock(path: Path) -> Iterator[FileLock]:
|
||||||
|
"""
|
||||||
|
wrapper around :class:`filelock.FileLock`, which also removes locks afterward
|
||||||
|
|
||||||
|
Args:
|
||||||
|
path(Path): path to lock on. The lock file will be created as ``.{path.name}.lock``
|
||||||
|
|
||||||
|
Yields:
|
||||||
|
FileLock: acquired file lock instance
|
||||||
|
"""
|
||||||
|
lock_path = path.with_name(f".{path.name}.lock")
|
||||||
|
try:
|
||||||
|
with FileLock(lock_path) as lock:
|
||||||
|
yield lock
|
||||||
|
finally:
|
||||||
|
lock_path.unlink(missing_ok=True)
|
||||||
|
|
||||||
|
|
||||||
def filter_json(source: dict[str, Any], known_fields: Iterable[str]) -> dict[str, Any]:
|
def filter_json(source: dict[str, Any], known_fields: Iterable[str]) -> dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
filter json object by fields used for json-to-object conversion
|
filter json object by fields used for json-to-object conversion
|
||||||
|
|||||||
@@ -145,63 +145,11 @@ def test_repositories_extract(args: argparse.Namespace, configuration: Configura
|
|||||||
args.configuration = configuration.path
|
args.configuration = configuration.path
|
||||||
args.repository = "repo"
|
args.repository = "repo"
|
||||||
mocker.patch("ahriman.core.configuration.Configuration.load", new=lambda self, _: self.copy_from(configuration))
|
mocker.patch("ahriman.core.configuration.Configuration.load", new=lambda self, _: self.copy_from(configuration))
|
||||||
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures")
|
extract_mock = mocker.patch("ahriman.core.repository.Explorer.repositories_extract",
|
||||||
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories")
|
return_value=[RepositoryId("arch", "repo")])
|
||||||
|
|
||||||
assert Handler.repositories_extract(args) == [RepositoryId("arch", "repo")]
|
assert Handler.repositories_extract(args) == [RepositoryId("arch", "repo")]
|
||||||
known_architectures_mock.assert_not_called()
|
extract_mock.assert_called_once_with(pytest.helpers.anyvar(Configuration, True), args.repository, args.architecture)
|
||||||
known_repositories_mock.assert_not_called()
|
|
||||||
|
|
||||||
|
|
||||||
def test_repositories_extract_repository(args: argparse.Namespace, configuration: Configuration,
|
|
||||||
mocker: MockerFixture) -> None:
|
|
||||||
"""
|
|
||||||
must generate list of available repositories based on flags and tree
|
|
||||||
"""
|
|
||||||
args.architecture = "arch"
|
|
||||||
args.configuration = configuration.path
|
|
||||||
mocker.patch("ahriman.core.configuration.Configuration.load", new=lambda self, _: self.copy_from(configuration))
|
|
||||||
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures")
|
|
||||||
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories",
|
|
||||||
return_value={"repo"})
|
|
||||||
|
|
||||||
assert Handler.repositories_extract(args) == [RepositoryId("arch", "repo")]
|
|
||||||
known_architectures_mock.assert_not_called()
|
|
||||||
known_repositories_mock.assert_called_once_with(configuration.repository_paths.root)
|
|
||||||
|
|
||||||
|
|
||||||
def test_repositories_extract_repository_legacy(args: argparse.Namespace, configuration: Configuration,
|
|
||||||
mocker: MockerFixture) -> None:
|
|
||||||
"""
|
|
||||||
must generate list of available repositories based on flags and tree (legacy mode)
|
|
||||||
"""
|
|
||||||
args.architecture = "arch"
|
|
||||||
args.configuration = configuration.path
|
|
||||||
mocker.patch("ahriman.core.configuration.Configuration.load", new=lambda self, _: self.copy_from(configuration))
|
|
||||||
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures")
|
|
||||||
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories",
|
|
||||||
return_value=set())
|
|
||||||
|
|
||||||
assert Handler.repositories_extract(args) == [RepositoryId("arch", "aur")]
|
|
||||||
known_architectures_mock.assert_not_called()
|
|
||||||
known_repositories_mock.assert_called_once_with(configuration.repository_paths.root)
|
|
||||||
|
|
||||||
|
|
||||||
def test_repositories_extract_architecture(args: argparse.Namespace, configuration: Configuration,
|
|
||||||
mocker: MockerFixture) -> None:
|
|
||||||
"""
|
|
||||||
must read repository name from config
|
|
||||||
"""
|
|
||||||
args.configuration = configuration.path
|
|
||||||
args.repository = "repo"
|
|
||||||
mocker.patch("ahriman.core.configuration.Configuration.load", new=lambda self, _: self.copy_from(configuration))
|
|
||||||
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures",
|
|
||||||
return_value={"arch"})
|
|
||||||
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories")
|
|
||||||
|
|
||||||
assert Handler.repositories_extract(args) == [RepositoryId("arch", "repo")]
|
|
||||||
known_architectures_mock.assert_called_once_with(configuration.repository_paths.root, "repo")
|
|
||||||
known_repositories_mock.assert_not_called()
|
|
||||||
|
|
||||||
|
|
||||||
def test_repositories_extract_empty(args: argparse.Namespace, configuration: Configuration,
|
def test_repositories_extract_empty(args: argparse.Namespace, configuration: Configuration,
|
||||||
@@ -212,8 +160,7 @@ def test_repositories_extract_empty(args: argparse.Namespace, configuration: Con
|
|||||||
args.command = "config"
|
args.command = "config"
|
||||||
args.configuration = configuration.path
|
args.configuration = configuration.path
|
||||||
mocker.patch("ahriman.core.configuration.Configuration.load", new=lambda self, _: self.copy_from(configuration))
|
mocker.patch("ahriman.core.configuration.Configuration.load", new=lambda self, _: self.copy_from(configuration))
|
||||||
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures", return_value=set())
|
mocker.patch("ahriman.core.repository.Explorer.repositories_extract", return_value=[])
|
||||||
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories", return_value=set())
|
|
||||||
|
|
||||||
with pytest.raises(MissingArchitectureError):
|
with pytest.raises(MissingArchitectureError):
|
||||||
Handler.repositories_extract(args)
|
Handler.repositories_extract(args)
|
||||||
@@ -227,12 +174,11 @@ def test_repositories_extract_systemd(args: argparse.Namespace, configuration: C
|
|||||||
args.configuration = configuration.path
|
args.configuration = configuration.path
|
||||||
args.repository_id = "i686/some/repo/name"
|
args.repository_id = "i686/some/repo/name"
|
||||||
mocker.patch("ahriman.core.configuration.Configuration.load", new=lambda self, _: self.copy_from(configuration))
|
mocker.patch("ahriman.core.configuration.Configuration.load", new=lambda self, _: self.copy_from(configuration))
|
||||||
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures")
|
extract_mock = mocker.patch("ahriman.core.repository.Explorer.repositories_extract",
|
||||||
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories")
|
return_value=[RepositoryId("i686", "some-repo-name")])
|
||||||
|
|
||||||
assert Handler.repositories_extract(args) == [RepositoryId("i686", "some-repo-name")]
|
assert Handler.repositories_extract(args) == [RepositoryId("i686", "some-repo-name")]
|
||||||
known_architectures_mock.assert_not_called()
|
extract_mock.assert_called_once_with(pytest.helpers.anyvar(Configuration, True), "some-repo-name", "i686")
|
||||||
known_repositories_mock.assert_not_called()
|
|
||||||
|
|
||||||
|
|
||||||
def test_repositories_extract_systemd_with_dash(args: argparse.Namespace, configuration: Configuration,
|
def test_repositories_extract_systemd_with_dash(args: argparse.Namespace, configuration: Configuration,
|
||||||
@@ -243,12 +189,11 @@ def test_repositories_extract_systemd_with_dash(args: argparse.Namespace, config
|
|||||||
args.configuration = configuration.path
|
args.configuration = configuration.path
|
||||||
args.repository_id = "i686-some-repo-name"
|
args.repository_id = "i686-some-repo-name"
|
||||||
mocker.patch("ahriman.core.configuration.Configuration.load", new=lambda self, _: self.copy_from(configuration))
|
mocker.patch("ahriman.core.configuration.Configuration.load", new=lambda self, _: self.copy_from(configuration))
|
||||||
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures")
|
extract_mock = mocker.patch("ahriman.core.repository.Explorer.repositories_extract",
|
||||||
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories")
|
return_value=[RepositoryId("i686", "some-repo-name")])
|
||||||
|
|
||||||
assert Handler.repositories_extract(args) == [RepositoryId("i686", "some-repo-name")]
|
assert Handler.repositories_extract(args) == [RepositoryId("i686", "some-repo-name")]
|
||||||
known_architectures_mock.assert_not_called()
|
extract_mock.assert_called_once_with(pytest.helpers.anyvar(Configuration, True), "some-repo-name", "i686")
|
||||||
known_repositories_mock.assert_not_called()
|
|
||||||
|
|
||||||
|
|
||||||
def test_repositories_extract_systemd_legacy(args: argparse.Namespace, configuration: Configuration,
|
def test_repositories_extract_systemd_legacy(args: argparse.Namespace, configuration: Configuration,
|
||||||
@@ -259,10 +204,8 @@ def test_repositories_extract_systemd_legacy(args: argparse.Namespace, configura
|
|||||||
args.configuration = configuration.path
|
args.configuration = configuration.path
|
||||||
args.repository_id = "i686"
|
args.repository_id = "i686"
|
||||||
mocker.patch("ahriman.core.configuration.Configuration.load", new=lambda self, _: self.copy_from(configuration))
|
mocker.patch("ahriman.core.configuration.Configuration.load", new=lambda self, _: self.copy_from(configuration))
|
||||||
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures")
|
extract_mock = mocker.patch("ahriman.core.repository.Explorer.repositories_extract",
|
||||||
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories",
|
return_value=[RepositoryId("i686", "aur")])
|
||||||
return_value=set())
|
|
||||||
|
|
||||||
assert Handler.repositories_extract(args) == [RepositoryId("i686", "aur")]
|
assert Handler.repositories_extract(args) == [RepositoryId("i686", "aur")]
|
||||||
known_architectures_mock.assert_not_called()
|
extract_mock.assert_called_once_with(pytest.helpers.anyvar(Configuration, True), None, "i686")
|
||||||
known_repositories_mock.assert_called_once_with(configuration.repository_paths.root)
|
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ def test_migrate_data(connection: Connection, configuration: Configuration, mock
|
|||||||
repository_id,
|
repository_id,
|
||||||
replace(repository_id, architecture="i686"),
|
replace(repository_id, architecture="i686"),
|
||||||
]
|
]
|
||||||
mocker.patch("ahriman.application.handlers.handler.Handler.repositories_extract", return_value=repositories)
|
mocker.patch("ahriman.core.repository.Explorer.repositories_extract", return_value=repositories)
|
||||||
migration_mock = mocker.patch("ahriman.core.database.migrations.m016_archive.move_packages")
|
migration_mock = mocker.patch("ahriman.core.database.migrations.m016_archive.move_packages")
|
||||||
|
|
||||||
migrate_data(connection, configuration)
|
migrate_data(connection, configuration)
|
||||||
|
|||||||
56
tests/ahriman/core/repository/test_explorer.py
Normal file
56
tests/ahriman/core/repository/test_explorer.py
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
from pytest_mock import MockerFixture
|
||||||
|
|
||||||
|
from ahriman.core.configuration import Configuration
|
||||||
|
from ahriman.core.repository import Explorer
|
||||||
|
from ahriman.models.repository_id import RepositoryId
|
||||||
|
|
||||||
|
|
||||||
|
def test_repositories_extract(configuration: Configuration, mocker: MockerFixture) -> None:
|
||||||
|
"""
|
||||||
|
must generate list of available repositories based on arguments
|
||||||
|
"""
|
||||||
|
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures")
|
||||||
|
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories")
|
||||||
|
|
||||||
|
assert Explorer.repositories_extract(configuration, "repo", "arch") == [RepositoryId("arch", "repo")]
|
||||||
|
known_architectures_mock.assert_not_called()
|
||||||
|
known_repositories_mock.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
def test_repositories_extract_repository(configuration: Configuration, mocker: MockerFixture) -> None:
|
||||||
|
"""
|
||||||
|
must generate list of available repositories based on arguments and tree
|
||||||
|
"""
|
||||||
|
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures")
|
||||||
|
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories",
|
||||||
|
return_value={"repo"})
|
||||||
|
|
||||||
|
assert Explorer.repositories_extract(configuration, architecture="arch") == [RepositoryId("arch", "repo")]
|
||||||
|
known_architectures_mock.assert_not_called()
|
||||||
|
known_repositories_mock.assert_called_once_with(configuration.repository_paths.root)
|
||||||
|
|
||||||
|
|
||||||
|
def test_repositories_extract_repository_legacy(configuration: Configuration, mocker: MockerFixture) -> None:
|
||||||
|
"""
|
||||||
|
must generate list of available repositories based on arguments and tree (legacy mode)
|
||||||
|
"""
|
||||||
|
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures")
|
||||||
|
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories",
|
||||||
|
return_value=set())
|
||||||
|
|
||||||
|
assert Explorer.repositories_extract(configuration, architecture="arch") == [RepositoryId("arch", "aur")]
|
||||||
|
known_architectures_mock.assert_not_called()
|
||||||
|
known_repositories_mock.assert_called_once_with(configuration.repository_paths.root)
|
||||||
|
|
||||||
|
|
||||||
|
def test_repositories_extract_architecture(configuration: Configuration, mocker: MockerFixture) -> None:
|
||||||
|
"""
|
||||||
|
must read repository name from config
|
||||||
|
"""
|
||||||
|
known_architectures_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_architectures",
|
||||||
|
return_value={"arch"})
|
||||||
|
known_repositories_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.known_repositories")
|
||||||
|
|
||||||
|
assert Explorer.repositories_extract(configuration, repository="repo") == [RepositoryId("arch", "repo")]
|
||||||
|
known_architectures_mock.assert_called_once_with(configuration.repository_paths.root, "repo")
|
||||||
|
known_repositories_mock.assert_not_called()
|
||||||
@@ -20,11 +20,11 @@ def test_atomic_move(mocker: MockerFixture) -> None:
|
|||||||
"""
|
"""
|
||||||
must move file with locking
|
must move file with locking
|
||||||
"""
|
"""
|
||||||
filelock_mock = mocker.patch("ahriman.core.utils.FileLock")
|
filelock_mock = mocker.patch("ahriman.core.utils.filelock")
|
||||||
move_mock = mocker.patch("shutil.move")
|
move_mock = mocker.patch("shutil.move")
|
||||||
|
|
||||||
atomic_move(Path("source"), Path("destination"))
|
atomic_move(Path("source"), Path("destination"))
|
||||||
filelock_mock.assert_called_once_with(Path(".destination.lock"))
|
filelock_mock.assert_called_once_with(Path("destination"))
|
||||||
move_mock.assert_called_once_with(Path("source"), Path("destination"))
|
move_mock.assert_called_once_with(Path("source"), Path("destination"))
|
||||||
|
|
||||||
|
|
||||||
@@ -247,6 +247,30 @@ def test_extract_user() -> None:
|
|||||||
assert extract_user() == "doas"
|
assert extract_user() == "doas"
|
||||||
|
|
||||||
|
|
||||||
|
def test_filelock(tmp_path: Path) -> None:
|
||||||
|
"""
|
||||||
|
must acquire lock and remove lock file after
|
||||||
|
"""
|
||||||
|
local = tmp_path / "local"
|
||||||
|
lock = local.with_name(f".{local.name}.lock")
|
||||||
|
|
||||||
|
with filelock(local):
|
||||||
|
assert lock.exists()
|
||||||
|
assert not lock.exists()
|
||||||
|
|
||||||
|
|
||||||
|
def test_filelock_cleanup_on_missing(tmp_path: Path) -> None:
|
||||||
|
"""
|
||||||
|
must not fail if lock file is already removed
|
||||||
|
"""
|
||||||
|
local = tmp_path / "local"
|
||||||
|
lock = local.with_name(f".{local.name}.lock")
|
||||||
|
|
||||||
|
with filelock(local):
|
||||||
|
lock.unlink(missing_ok=True)
|
||||||
|
assert not lock.exists()
|
||||||
|
|
||||||
|
|
||||||
def test_filter_json(package_ahriman: Package) -> None:
|
def test_filter_json(package_ahriman: Package) -> None:
|
||||||
"""
|
"""
|
||||||
must filter fields by known list
|
must filter fields by known list
|
||||||
|
|||||||
Reference in New Issue
Block a user