Compare commits

..

17 Commits

Author SHA1 Message Date
29e9ed62d6 drop excess REQUIRES_REPOSITORY 2026-01-02 14:49:14 +02:00
0a8d34217f support requires repostory flag 2026-01-02 14:40:23 +02:00
0b69a1bb77 gpg loader fix 2026-01-02 14:40:23 +02:00
ca18ac936d regenerate docs 2026-01-02 14:40:23 +02:00
9f25e7a6ef add archive trigger 2026-01-02 14:40:23 +02:00
8a1722a1ea add archive trigger 2026-01-02 14:40:23 +02:00
0afec3fa6e lookup through archive packages before build 2026-01-02 14:40:23 +02:00
f63b61f413 use generic packages tree for all repos 2026-01-02 14:40:23 +02:00
4574ee7685 implement atomic_move method, move files only with lock 2026-01-02 14:40:23 +02:00
19eb0e19e9 write tests to support new changes 2026-01-02 14:40:23 +02:00
c366c4289c store built packages in archive tree instead of repository 2026-01-02 14:40:23 +02:00
46af782db2 build: drop shtab wrapper after their release 2025-11-18 20:47:20 +02:00
6443e02352 type: use as keyword in case match 2025-10-26 09:36:54 +02:00
999ad39d6f feat: add trigger loader guard 2025-09-17 14:45:09 +03:00
dfab5f56b2 feat: use atexit instead of del for triggers 2025-08-11 14:53:10 +03:00
10798b9ba3 fix: correctly process trigger repo specific settings in validator (see #154) 2025-08-01 16:53:15 +03:00
358e3dc4d2 feat: expose repository name and architecure in configuration if available
In some cases there are reference to current repository settings. In
order to handle it correctly two ro options have been added

Related to #154
2025-07-31 14:14:22 +03:00
31 changed files with 844 additions and 187 deletions

View File

@ -0,0 +1,29 @@
ahriman.core.archive package
============================
Submodules
----------
ahriman.core.archive.archive\_tree module
-----------------------------------------
.. automodule:: ahriman.core.archive.archive_tree
:members:
:no-undoc-members:
:show-inheritance:
ahriman.core.archive.archive\_trigger module
--------------------------------------------
.. automodule:: ahriman.core.archive.archive_trigger
:members:
:no-undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: ahriman.core.archive
:members:
:no-undoc-members:
:show-inheritance:

View File

@ -8,6 +8,7 @@ Subpackages
:maxdepth: 4
ahriman.core.alpm
ahriman.core.archive
ahriman.core.auth
ahriman.core.build_tools
ahriman.core.configuration

View File

@ -146,6 +146,8 @@ Build related configuration. Group name can refer to architecture, e.g. ``build:
Base repository settings.
* ``architecture`` - repository architecture, string. This field is read-only and generated automatically from run options if possible.
* ``name`` - repository name, string. This field is read-only and generated automatically from run options if possible.
* ``root`` - root path for application, string, required.
``sign:*`` groups

View File

@ -1,5 +1,6 @@
[build]
; List of well-known triggers. Used only for configuration purposes.
triggers_known[] = ahriman.core.archive.ArchiveTrigger
triggers_known[] = ahriman.core.distributed.WorkerLoaderTrigger
triggers_known[] = ahriman.core.distributed.WorkerTrigger
triggers_known[] = ahriman.core.support.KeyringTrigger

View File

@ -66,7 +66,7 @@ class Status(Handler):
Status.check_status(args.exit_code, packages)
comparator: Callable[[tuple[Package, BuildStatus]], Comparable] = lambda item: item[0].base
filter_fn: Callable[[tuple[Package, BuildStatus]], bool] =\
filter_fn: Callable[[tuple[Package, BuildStatus]], bool] = \
lambda item: args.status is None or item[1].status == args.status
for package, package_status in sorted(filter(filter_fn, packages), key=comparator):
PackagePrinter(package, package_status)(verbose=args.info)

View File

@ -52,7 +52,7 @@ class Validate(Handler):
"""
from ahriman.core.configuration.validator import Validator
schema = Validate.schema(repository_id, configuration)
schema = Validate.schema(configuration)
validator = Validator(configuration=configuration, schema=schema)
if validator.validate(configuration.dump()):
@ -83,12 +83,11 @@ class Validate(Handler):
return parser
@staticmethod
def schema(repository_id: RepositoryId, configuration: Configuration) -> ConfigurationSchema:
def schema(configuration: Configuration) -> ConfigurationSchema:
"""
get schema with triggers
Args:
repository_id(RepositoryId): repository unique identifier
configuration(Configuration): configuration instance
Returns:
@ -107,12 +106,12 @@ class Validate(Handler):
continue
# default settings if any
for schema_name, schema in trigger_class.configuration_schema(repository_id, None).items():
for schema_name, schema in trigger_class.configuration_schema(None).items():
erased = Validate.schema_erase_required(copy.deepcopy(schema))
root[schema_name] = Validate.schema_merge(root.get(schema_name, {}), erased)
# settings according to enabled triggers
for schema_name, schema in trigger_class.configuration_schema(repository_id, configuration).items():
for schema_name, schema in trigger_class.configuration_schema(configuration).items():
root[schema_name] = Validate.schema_merge(root.get(schema_name, {}), copy.deepcopy(schema))
return root

View File

@ -88,22 +88,24 @@ class Repo(LazyLogging):
check_output("repo-add", *self.sign_args, str(self.repo_path),
cwd=self.root, logger=self.logger, user=self.uid)
def remove(self, package: str, filename: Path) -> None:
def remove(self, package_name: str | None, filename: Path) -> None:
"""
remove package from repository
Args:
package(str): package name to remove
package_name(str | None): package name to remove. If none set, it will be guessed from filename
filename(Path): package filename to remove
"""
package_name = package_name or filename.name.rsplit("-", maxsplit=3)[0]
# remove package and signature (if any) from filesystem
for full_path in self.root.glob(f"**/{filename.name}*"):
full_path.unlink()
# remove package from registry
check_output(
"repo-remove", *self.sign_args, str(self.repo_path), package,
exception=BuildError.from_process(package),
"repo-remove", *self.sign_args, str(self.repo_path), package_name,
exception=BuildError.from_process(package_name),
cwd=self.root,
logger=self.logger,
user=self.uid,

View File

@ -0,0 +1,20 @@
#
# Copyright (c) 2021-2025 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from ahriman.core.archive.archive_trigger import ArchiveTrigger

View File

@ -0,0 +1,130 @@
#
# Copyright (c) 2021-2025 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import datetime
from pathlib import Path
from ahriman.core.alpm.repo import Repo
from ahriman.core.log import LazyLogging
from ahriman.core.utils import utcnow, walk
from ahriman.models.package import Package
from ahriman.models.repository_paths import RepositoryPaths
class ArchiveTree(LazyLogging):
"""
wrapper around archive tree
Attributes:
paths(RepositoryPaths): repository paths instance
repository_id(RepositoryId): repository unique identifier
sign_args(list[str]): additional args which have to be used to sign repository archive
"""
def __init__(self, repository_path: RepositoryPaths, sign_args: list[str]) -> None:
"""
Args:
repository_path(RepositoryPaths): repository paths instance
sign_args(list[str]): additional args which have to be used to sign repository archive
"""
self.paths = repository_path
self.repository_id = repository_path.repository_id
self.sign_args = sign_args
def repository_for(self, date: datetime.date | None = None) -> Path:
"""
get full path to repository at the specified date
Args:
date(datetime.date | None, optional): date to generate path. If none supplied then today will be used
(Default value = None)
Returns:
Path: path to the repository root
"""
date = date or utcnow().date()
return (
self.paths.archive
/ "repos"
/ date.strftime("%Y")
/ date.strftime("%m")
/ date.strftime("%d")
/ self.repository_id.name
/ self.repository_id.architecture
)
def symlinks_create(self, packages: list[Package]) -> None:
"""
create symlinks for the specified packages in today's repository
Args:
packages(list[Package]): list of packages to be updated
"""
root = self.repository_for()
repo = Repo(self.repository_id.name, self.paths, self.sign_args, root)
for package in packages:
archive = self.paths.archive_for(package.base)
for package_name, single in package.packages.items():
if single.filename is None:
self.logger.warning("received empty package filename for %s", package_name)
continue
has_file = False
for file in archive.glob(f"{single.filename}*"):
symlink = root / file.name
if symlink.exists():
continue # symlink is already created, skip processing
has_file = True
symlink.symlink_to(file.relative_to(symlink.parent, walk_up=True))
if has_file:
repo.add(root / single.filename)
def symlinks_fix(self) -> None:
"""
remove broken symlinks across repositories for all dates
"""
for path in walk(self.paths.archive / "repos"):
root = path.parent
*_, name, architecture = root.parts
if self.repository_id.name != name or self.repository_id.architecture != architecture:
continue # we only process same name repositories
if not path.is_symlink():
continue # find symlinks only
if path.exists():
continue # filter out not broken symlinks
Repo(self.repository_id.name, self.paths, self.sign_args, root).remove(None, path)
def tree_create(self) -> None:
"""
create repository tree for current repository
"""
root = self.repository_for()
if root.exists():
return
with self.paths.preserve_owner(self.paths.archive):
root.mkdir(0o755, parents=True)
# init empty repository here
Repo(self.repository_id.name, self.paths, self.sign_args, root).init()

View File

@ -0,0 +1,69 @@
#
# Copyright (c) 2021-2025 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from ahriman.core.archive.archive_tree import ArchiveTree
from ahriman.core.configuration import Configuration
from ahriman.core.sign.gpg import GPG
from ahriman.core.triggers import Trigger
from ahriman.models.package import Package
from ahriman.models.repository_id import RepositoryId
from ahriman.models.result import Result
class ArchiveTrigger(Trigger):
"""
archive repository extension
Attributes:
paths(RepositoryPaths): repository paths instance
tree(ArchiveTree): archive tree wrapper
"""
def __init__(self, repository_id: RepositoryId, configuration: Configuration) -> None:
"""
Args:
repository_id(RepositoryId): repository unique identifier
configuration(Configuration): configuration instance
"""
Trigger.__init__(self, repository_id, configuration)
self.paths = configuration.repository_paths
self.tree = ArchiveTree(self.paths, GPG(configuration).repository_sign_args)
def on_result(self, result: Result, packages: list[Package]) -> None:
"""
run trigger
Args:
result(Result): build result
packages(list[Package]): list of all available packages
"""
self.tree.symlinks_create(packages)
def on_start(self) -> None:
"""
trigger action which will be called at the start of the application
"""
self.tree.tree_create()
def on_stop(self) -> None:
"""
trigger action which will be called before the stop of the application
"""
self.tree.symlinks_fix()

View File

@ -43,7 +43,6 @@ class Configuration(configparser.RawConfigParser):
SYSTEM_CONFIGURATION_PATH(Path): (class attribute) default system configuration path distributed by package
includes(list[Path]): list of includes which were read
path(Path | None): path to root configuration file
repository_id(RepositoryId | None): repository unique identifier
Examples:
Configuration class provides additional method in order to handle application configuration. Since this class is
@ -94,7 +93,7 @@ class Configuration(configparser.RawConfigParser):
},
)
self.repository_id: RepositoryId | None = None
self._repository_id: RepositoryId | None = None
self.path: Path | None = None
self.includes: list[Path] = []
@ -129,6 +128,32 @@ class Configuration(configparser.RawConfigParser):
"""
return self.getpath("settings", "logging")
@property
def repository_id(self) -> RepositoryId | None:
"""
repository identifier
Returns:
RepositoryId: repository unique identifier
"""
return self._repository_id
@repository_id.setter
def repository_id(self, repository_id: RepositoryId | None) -> None:
"""
setter for repository identifier
Args:
repository_id(RepositoryId | None): repository unique identifier
"""
self._repository_id = repository_id
if repository_id is None or repository_id.is_empty:
self.remove_option("repository", "name")
self.remove_option("repository", "architecture")
else:
self.set_option("repository", "name", repository_id.name)
self.set_option("repository", "architecture", repository_id.architecture)
@property
def repository_name(self) -> str:
"""

View File

@ -57,7 +57,7 @@ class ConfigurationMultiDict(dict[str, Any]):
OptionError: if the key already exists in the dictionary, but not a single value list or a string
"""
match self.get(key):
case [current_value] | str(current_value):
case [current_value] | (str() as current_value):
value = f"{current_value} {value}"
case None:
pass

View File

@ -249,6 +249,10 @@ CONFIGURATION_SCHEMA: ConfigurationSchema = {
"repository": {
"type": "dict",
"schema": {
"architecture": {
"type": "string",
"empty": False,
},
"name": {
"type": "string",
"empty": False,

View File

@ -17,7 +17,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from collections.abc import Iterable
import shutil
from collections.abc import Generator, Iterable
from pathlib import Path
from tempfile import TemporaryDirectory
@ -25,7 +27,7 @@ from ahriman.core.build_tools.package_archive import PackageArchive
from ahriman.core.build_tools.task import Task
from ahriman.core.repository.cleaner import Cleaner
from ahriman.core.repository.package_info import PackageInfo
from ahriman.core.utils import atomic_move, safe_filename
from ahriman.core.utils import atomic_move, filelock, package_like, safe_filename
from ahriman.models.changes import Changes
from ahriman.models.event import EventType
from ahriman.models.package import Package
@ -39,6 +41,36 @@ class Executor(PackageInfo, Cleaner):
trait for common repository update processes
"""
def _archive_lookup(self, package: Package) -> Generator[Path, None, None]:
"""
check if there is a rebuilt package already
Args:
package(Package): package to check
Yields:
Path: list of built packages and signatures if available, empty list otherwise
"""
archive = self.paths.archive_for(package.base)
# find all packages which have same version
same_version = [
built
for path in filter(package_like, archive.iterdir())
if (built := Package.from_archive(path, self.pacman)).version == package.version
]
# no packages of the same version found
if not same_version:
return
packages = [single for built in same_version for single in built.packages.values()]
# all packages must be either any or same architecture
if not all(single.architecture in ("any", self.architecture) for single in packages):
return
for single in packages:
yield from archive.glob(f"{single.filename}*")
def _archive_rename(self, description: PackageDescription, package_base: str) -> None:
"""
rename package archive removing special symbols
@ -48,7 +80,7 @@ class Executor(PackageInfo, Cleaner):
package_base(str): package base name
"""
if description.filename is None:
self.logger.warning("received empty package name for base %s", package_base)
self.logger.warning("received empty package filename for base %s", package_base)
return # suppress type checking, it never can be none actually
if (safe := safe_filename(description.filename)) != description.filename:
@ -74,7 +106,17 @@ class Executor(PackageInfo, Cleaner):
task = Task(package, self.configuration, self.architecture, self.paths)
patches = self.reporter.package_patches_get(package.base, None)
commit_sha = task.init(path, patches, local_version)
built = task.build(path, PACKAGER=packager)
loaded_package = Package.from_build(path, self.architecture, None)
if prebuilt := list(self._archive_lookup(loaded_package)):
self.logger.info("using prebuilt packages for %s-%s", loaded_package.base, loaded_package.version)
built = []
for artefact in prebuilt:
with filelock(artefact):
shutil.copy(artefact, path)
built.append(path / artefact.name)
else:
built = task.build(path, PACKAGER=packager)
package.with_packages(built, self.pacman)
for src in built:
@ -119,7 +161,7 @@ class Executor(PackageInfo, Cleaner):
packager_key(str | None): packager key identifier
"""
if filename is None:
self.logger.warning("received empty package name for base %s", package_base)
self.logger.warning("received empty package filename for base %s", package_base)
return # suppress type checking, it never can be none actually
# in theory, it might be NOT packages directory, but we suppose it is

View File

@ -36,6 +36,7 @@ class Trigger(LazyLogging):
CONFIGURATION_SCHEMA(ConfigurationSchema): (class attribute) configuration schema template
CONFIGURATION_SCHEMA_FALLBACK(str | None): (class attribute) optional fallback option for defining
configuration schema type used
REQUIRES_REPOSITORY(bool): (class attribute) either trigger requires loaded repository or not
configuration(Configuration): configuration instance
repository_id(RepositoryId): repository unique identifier
@ -59,6 +60,7 @@ class Trigger(LazyLogging):
CONFIGURATION_SCHEMA: ClassVar[ConfigurationSchema] = {}
CONFIGURATION_SCHEMA_FALLBACK: ClassVar[str | None] = None
REQUIRES_REPOSITORY: ClassVar[bool] = True
def __init__(self, repository_id: RepositoryId, configuration: Configuration) -> None:
"""
@ -79,9 +81,18 @@ class Trigger(LazyLogging):
"""
return self.repository_id.architecture
@property
def is_allowed_to_run(self) -> bool:
"""
whether trigger allowed to run or not
Returns:
bool: ``True`` in case if trigger allowed to run and ``False`` otherwise
"""
return not (self.REQUIRES_REPOSITORY and self.repository_id.is_empty)
@classmethod
def configuration_schema(cls, repository_id: RepositoryId,
configuration: Configuration | None) -> ConfigurationSchema:
def configuration_schema(cls, configuration: Configuration | None) -> ConfigurationSchema:
"""
configuration schema based on supplied service configuration
@ -89,7 +100,6 @@ class Trigger(LazyLogging):
Schema must be in cerberus format, for details and examples you can check built-in triggers.
Args:
repository_id(str): repository unique identifier
configuration(Configuration | None): configuration instance. If set to None, the default schema
should be returned
@ -101,13 +111,15 @@ class Trigger(LazyLogging):
result: ConfigurationSchema = {}
for target in cls.configuration_sections(configuration):
if not configuration.has_section(target):
continue
section, schema_name = configuration.gettype(
target, repository_id, fallback=cls.CONFIGURATION_SCHEMA_FALLBACK)
if schema_name not in cls.CONFIGURATION_SCHEMA:
continue
result[section] = cls.CONFIGURATION_SCHEMA[schema_name]
for section in configuration.sections():
if not (section == target or section.startswith(f"{target}:")):
# either repository specific or exact name
continue
schema_name = configuration.get(section, "type", fallback=section)
if schema_name not in cls.CONFIGURATION_SCHEMA:
continue
result[section] = cls.CONFIGURATION_SCHEMA[schema_name]
return result

View File

@ -17,6 +17,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import atexit
import contextlib
import os
@ -60,17 +61,8 @@ class TriggerLoader(LazyLogging):
def __init__(self) -> None:
""""""
self._on_stop_requested = False
self.triggers: list[Trigger] = []
def __del__(self) -> None:
"""
custom destructor object which calls on_stop in case if it was requested
"""
if not self._on_stop_requested:
return
self.on_stop()
@classmethod
def load(cls, repository_id: RepositoryId, configuration: Configuration) -> Self:
"""
@ -85,8 +77,9 @@ class TriggerLoader(LazyLogging):
"""
instance = cls()
instance.triggers = [
instance.load_trigger(trigger, repository_id, configuration)
for trigger in instance.selected_triggers(configuration)
trigger
for trigger_name in instance.selected_triggers(configuration)
if (trigger := instance.load_trigger(trigger_name, repository_id, configuration)).is_allowed_to_run
]
return instance
@ -250,10 +243,11 @@ class TriggerLoader(LazyLogging):
run triggers on load
"""
self.logger.debug("executing triggers on start")
self._on_stop_requested = True
for trigger in self.triggers:
with self.__execute_trigger(trigger):
trigger.on_start()
# register on_stop call
atexit.register(self.on_stop)
def on_stop(self) -> None:
"""

View File

@ -18,6 +18,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# pylint: disable=too-many-lines
import contextlib
import datetime
import fcntl
import io
@ -47,6 +48,7 @@ __all__ = [
"dataclass_view",
"enum_values",
"extract_user",
"filelock",
"filter_json",
"full_version",
"minmax",
@ -83,17 +85,8 @@ def atomic_move(src: Path, dst: Path) -> None:
>>> atomic_move(src, dst)
"""
lock_path = dst.with_name(f".{dst.name}")
try:
with lock_path.open("ab") as lock_file:
fd = lock_file.fileno()
try:
fcntl.flock(fd, fcntl.LOCK_EX) # lock file and wait lock is until available
shutil.move(src, dst)
finally:
fcntl.flock(fd, fcntl.LOCK_UN) # unlock file first
finally:
lock_path.unlink(missing_ok=True) # remove lock file at the end
with filelock(dst):
shutil.move(src, dst)
# pylint: disable=too-many-locals
@ -264,6 +257,27 @@ def extract_user() -> str | None:
return os.getenv("SUDO_USER") or os.getenv("DOAS_USER") or os.getenv("USER")
@contextlib.contextmanager
def filelock(path: Path) -> Generator[None, None, None]:
"""
lock on file passed as argument
Args:
path(Path): path object on which lock must be performed
"""
lock_path = path.with_name(f".{path.name}")
try:
with lock_path.open("ab") as lock_file:
fd = lock_file.fileno()
try:
fcntl.flock(fd, fcntl.LOCK_EX) # lock file and wait lock is until available
yield
finally:
fcntl.flock(fd, fcntl.LOCK_UN) # unlock file first
finally:
lock_path.unlink(missing_ok=True) # remove lock file at the end
def filter_json(source: dict[str, Any], known_fields: Iterable[str]) -> dict[str, Any]:
"""
filter json object by fields used for json-to-object conversion

View File

@ -37,6 +37,7 @@ SUBPACKAGES = {
"ahriman-triggers": [
prefix / "share" / "ahriman" / "settings" / "ahriman.ini.d" / "00-triggers.ini",
site_packages / "ahriman" / "application" / "handlers" / "triggers_support.py",
site_packages / "ahriman" / "core" / "archive",
site_packages / "ahriman" / "core" / "distributed",
site_packages / "ahriman" / "core" / "support",
],

View File

@ -69,7 +69,6 @@ def test_move_tree(mocker: MockerFixture) -> None:
TreeMigrate.tree_move(from_paths, to_paths)
rename_mock.assert_has_calls([
MockCall(from_paths.archive, to_paths.archive),
MockCall(from_paths.packages, to_paths.packages),
MockCall(from_paths.pacman, to_paths.pacman),
MockCall(from_paths.repository, to_paths.repository),

View File

@ -2,6 +2,7 @@ import argparse
import json
import pytest
from pathlib import Path
from pytest_mock import MockerFixture
from ahriman.application.handlers.validate import Validate
@ -53,12 +54,50 @@ def test_run_skip(args: argparse.Namespace, configuration: Configuration, mocker
print_mock.assert_not_called()
def test_run_default(args: argparse.Namespace, configuration: Configuration) -> None:
"""
must run on default configuration without errors
"""
args.exit_code = True
_, repository_id = configuration.check_loaded()
default = Configuration.from_path(Configuration.SYSTEM_CONFIGURATION_PATH, repository_id)
# copy autogenerated values
for section, key in (("build", "build_command"), ("repository", "root")):
value = configuration.get(section, key)
default.set_option(section, key, value)
Validate.run(args, repository_id, default, report=False)
def test_run_repo_specific_triggers(args: argparse.Namespace, configuration: Configuration,
resource_path_root: Path) -> None:
"""
must correctly insert repo specific triggers
"""
args.exit_code = True
_, repository_id = configuration.check_loaded()
# remove unused sections
for section in ("archive", "customs3", "github:x86_64", "logs-rotation", "mirrorlist"):
configuration.remove_section(section)
configuration.set_option("report", "target", "test")
for section in ("test", "test:i686", "test:another-repo:x86_64"):
configuration.set_option(section, "type", "html")
configuration.set_option(section, "link_path", "http://link_path")
configuration.set_option(section, "path", "path")
configuration.set_option(section, "template", "template")
configuration.set_option(section, "templates", str(resource_path_root))
Validate.run(args, repository_id, configuration, report=False)
def test_schema(configuration: Configuration) -> None:
"""
must generate full schema correctly
"""
_, repository_id = configuration.check_loaded()
schema = Validate.schema(repository_id, configuration)
schema = Validate.schema(configuration)
# defaults
assert schema.pop("console")
@ -91,9 +130,7 @@ def test_schema_invalid_trigger(configuration: Configuration) -> None:
"""
configuration.set_option("build", "triggers", "some.invalid.trigger.path.Trigger")
configuration.remove_option("build", "triggers_known")
_, repository_id = configuration.check_loaded()
assert Validate.schema(repository_id, configuration) == CONFIGURATION_SCHEMA
assert Validate.schema(configuration) == CONFIGURATION_SCHEMA
def test_schema_erase_required() -> None:

View File

@ -4,6 +4,7 @@ from pathlib import Path
from pytest_mock import MockerFixture
from ahriman.core.alpm.repo import Repo
from ahriman.models.package import Package
from ahriman.models.repository_paths import RepositoryPaths
@ -56,21 +57,37 @@ def test_repo_init(repo: Repo, mocker: MockerFixture) -> None:
assert check_output_mock.call_args[0][0] == "repo-add"
def test_repo_remove(repo: Repo, mocker: MockerFixture) -> None:
def test_repo_remove(repo: Repo, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must call repo-remove on package addition
must call repo-remove on package removal
"""
filepath = package_ahriman.packages[package_ahriman.base].filepath
mocker.patch("pathlib.Path.glob", return_value=[])
check_output_mock = mocker.patch("ahriman.core.alpm.repo.check_output")
repo.remove("package", Path("package.pkg.tar.xz"))
repo.remove(package_ahriman.base, filepath)
check_output_mock.assert_called_once() # it will be checked later
assert check_output_mock.call_args[0][0] == "repo-remove"
assert package_ahriman.base in check_output_mock.call_args[0]
def test_repo_remove_guess_package(repo: Repo, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must call repo-remove on package removal if no package name set
"""
filepath = package_ahriman.packages[package_ahriman.base].filepath
mocker.patch("pathlib.Path.glob", return_value=[])
check_output_mock = mocker.patch("ahriman.core.alpm.repo.check_output")
repo.remove(None, filepath)
check_output_mock.assert_called_once() # it will be checked later
assert check_output_mock.call_args[0][0] == "repo-remove"
assert package_ahriman.base in check_output_mock.call_args[0]
def test_repo_remove_fail_no_file(repo: Repo, mocker: MockerFixture) -> None:
"""
must fail on missing file
must fail removal on missing file
"""
mocker.patch("pathlib.Path.glob", return_value=[Path("package.pkg.tar.xz")])
mocker.patch("pathlib.Path.unlink", side_effect=FileNotFoundError)

View File

@ -0,0 +1,34 @@
import pytest
from ahriman.core.archive import ArchiveTrigger
from ahriman.core.archive.archive_tree import ArchiveTree
from ahriman.core.configuration import Configuration
@pytest.fixture
def archive_tree(configuration: Configuration) -> ArchiveTree:
"""
archive tree fixture
Args:
configuration(Configuration): configuration fixture
Returns:
ArchiveTree: archive tree test instance
"""
return ArchiveTree(configuration.repository_paths, [])
@pytest.fixture
def archive_trigger(configuration: Configuration) -> ArchiveTrigger:
"""
archive trigger fixture
Args:
configuration(Configuration): configuration fixture
Returns:
ArchiveTrigger: archive trigger test instance
"""
_, repository_id = configuration.check_loaded()
return ArchiveTrigger(repository_id, configuration)

View File

@ -0,0 +1,135 @@
from pathlib import Path
from pytest_mock import MockerFixture
from ahriman.core.archive.archive_tree import ArchiveTree
from ahriman.core.utils import utcnow
from ahriman.models.package import Package
def test_repository_for(archive_tree: ArchiveTree) -> None:
"""
must correctly generate path to repository
"""
path = archive_tree.repository_for()
assert path.is_relative_to(archive_tree.paths.archive / "repos")
assert (archive_tree.repository_id.name, archive_tree.repository_id.architecture) == path.parts[-2:]
assert set(map("{:02d}".format, utcnow().timetuple()[:3])).issubset(path.parts)
def test_symlinks_create(archive_tree: ArchiveTree, package_ahriman: Package, package_python_schedule: Package,
mocker: MockerFixture) -> None:
"""
must create symlinks
"""
_original_exists = Path.exists
def exists_mock(path: Path) -> bool:
if path.name in (package.filename for package in package_python_schedule.packages.values()):
return True
return _original_exists(path)
symlinks_mock = mocker.patch("pathlib.Path.symlink_to")
add_mock = mocker.patch("ahriman.core.alpm.repo.Repo.add")
mocker.patch("pathlib.Path.glob", autospec=True, side_effect=lambda path, name: [path / name[:-1]])
mocker.patch("pathlib.Path.exists", autospec=True, side_effect=exists_mock)
archive_tree.symlinks_create([package_ahriman, package_python_schedule])
symlinks_mock.assert_called_once_with(
Path("..") /
".." /
".." /
".." /
".." /
".." /
archive_tree.paths.archive_for(package_ahriman.base)
.relative_to(archive_tree.paths.root)
.relative_to("archive") /
package_ahriman.packages[package_ahriman.base].filename
)
add_mock.assert_called_once_with(
archive_tree.repository_for() / package_ahriman.packages[package_ahriman.base].filename
)
def test_symlinks_create_empty_filename(archive_tree: ArchiveTree, package_ahriman: Package,
mocker: MockerFixture) -> None:
"""
must skip symlinks creation if filename is not set
"""
package_ahriman.packages[package_ahriman.base].filename = None
symlinks_mock = mocker.patch("pathlib.Path.symlink_to")
archive_tree.symlinks_create([package_ahriman])
symlinks_mock.assert_not_called()
def test_symlinks_fix(archive_tree: ArchiveTree, mocker: MockerFixture) -> None:
"""
must fix broken symlinks
"""
_original_exists = Path.exists
def exists_mock(path: Path) -> bool:
if path.name == "symlink":
return True
return _original_exists(path)
mocker.patch("pathlib.Path.is_symlink", side_effect=[True, True, False])
mocker.patch("pathlib.Path.exists", autospec=True, side_effect=exists_mock)
walk_mock = mocker.patch("ahriman.core.archive.archive_tree.walk", return_value=[
archive_tree.repository_for() / filename
for filename in ("symlink", "broken_symlink", "file")
])
remove_mock = mocker.patch("ahriman.core.alpm.repo.Repo.remove")
archive_tree.symlinks_fix()
walk_mock.assert_called_once_with(archive_tree.paths.archive / "repos")
remove_mock.assert_called_once_with(None, archive_tree.repository_for() / "broken_symlink")
def test_symlinks_fix_foreign_repository(archive_tree: ArchiveTree, mocker: MockerFixture) -> None:
"""
must skip symlinks check if repository name or architecture doesn't match
"""
_original_exists = Path.exists
def exists_mock(path: Path) -> bool:
if path.name == "symlink":
return True
return _original_exists(path)
mocker.patch("pathlib.Path.is_symlink", side_effect=[True, True, False])
mocker.patch("pathlib.Path.exists", autospec=True, side_effect=exists_mock)
mocker.patch("ahriman.core.archive.archive_tree.walk", return_value=[
archive_tree.repository_for().with_name("i686") / filename
for filename in ("symlink", "broken_symlink", "file")
])
remove_mock = mocker.patch("ahriman.core.alpm.repo.Repo.remove")
archive_tree.symlinks_fix()
remove_mock.assert_not_called()
def test_tree_create(archive_tree: ArchiveTree, mocker: MockerFixture) -> None:
"""
must create repository root if not exists
"""
owner_guard_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
mkdir_mock = mocker.patch("pathlib.Path.mkdir")
init_mock = mocker.patch("ahriman.core.alpm.repo.Repo.init")
archive_tree.tree_create()
owner_guard_mock.assert_called_once_with(archive_tree.paths.archive)
mkdir_mock.assert_called_once_with(0o755, parents=True)
init_mock.assert_called_once_with()
def test_tree_create_exists(archive_tree: ArchiveTree, mocker: MockerFixture) -> None:
"""
must skip directory creation if already exists
"""
mocker.patch("pathlib.Path.exists", return_value=True)
mkdir_mock = mocker.patch("pathlib.Path.mkdir")
archive_tree.tree_create()
mkdir_mock.assert_not_called()

View File

@ -0,0 +1,32 @@
from pytest_mock import MockerFixture
from ahriman.core.archive import ArchiveTrigger
from ahriman.models.package import Package
from ahriman.models.result import Result
def test_on_result(archive_trigger: ArchiveTrigger, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must create symlinks for actual repository
"""
symlinks_mock = mocker.patch("ahriman.core.archive.archive_tree.ArchiveTree.symlinks_create")
archive_trigger.on_result(Result(), [package_ahriman])
symlinks_mock.assert_called_once_with([package_ahriman])
def test_on_start(archive_trigger: ArchiveTrigger, mocker: MockerFixture) -> None:
"""
must create repository tree on load
"""
tree_mock = mocker.patch("ahriman.core.archive.archive_tree.ArchiveTree.tree_create")
archive_trigger.on_start()
tree_mock.assert_called_once_with()
def test_on_stop(archive_trigger: ArchiveTrigger, mocker: MockerFixture) -> None:
"""
must create repository tree on load
"""
symlinks_mock = mocker.patch("ahriman.core.archive.archive_tree.ArchiveTree.symlinks_fix")
archive_trigger.on_stop()
symlinks_mock.assert_called_once_with()

View File

@ -20,6 +20,40 @@ def test_architecture(configuration: Configuration) -> None:
assert configuration.architecture == "x86_64"
def test_repository_id(configuration: Configuration, repository_id: RepositoryId) -> None:
"""
must return repository identifier
"""
assert configuration.repository_id == repository_id
assert configuration.get("repository", "name") == repository_id.name
assert configuration.get("repository", "architecture") == repository_id.architecture
def test_repository_id_erase(configuration: Configuration) -> None:
"""
must remove repository identifier properties if empty identifier supplied
"""
configuration.repository_id = None
assert configuration.get("repository", "name", fallback=None) is None
assert configuration.get("repository", "architecture", fallback=None) is None
configuration.repository_id = RepositoryId("", "")
assert configuration.get("repository", "name", fallback=None) is None
assert configuration.get("repository", "architecture", fallback=None) is None
def test_repository_id_update(configuration: Configuration, repository_id: RepositoryId) -> None:
"""
must update repository identifier and related configuration options
"""
repository_id = RepositoryId("i686", repository_id.name)
configuration.repository_id = repository_id
assert configuration.repository_id == repository_id
assert configuration.get("repository", "name") == repository_id.name
assert configuration.get("repository", "architecture") == repository_id.architecture
def test_repository_name(configuration: Configuration) -> None:
"""
must return valid repository name

View File

@ -1,5 +1,6 @@
import pytest
from dataclasses import replace
from pathlib import Path
from pytest_mock import MockerFixture
from typing import Any
@ -13,6 +14,57 @@ from ahriman.models.packagers import Packagers
from ahriman.models.user import User
def test_archive_lookup(executor: Executor, package_ahriman: Package, package_python_schedule: Package,
mocker: MockerFixture) -> None:
"""
must existing packages which match the version
"""
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
mocker.patch("pathlib.Path.iterdir", return_value=[
Path("1.pkg.tar.zst"),
Path("2.pkg.tar.zst"),
Path("3.pkg.tar.zst"),
])
mocker.patch("ahriman.models.package.Package.from_archive", side_effect=[
package_ahriman,
package_python_schedule,
replace(package_ahriman, version="1"),
])
glob_mock = mocker.patch("pathlib.Path.glob", return_value=[Path("1.pkg.tar.xz")])
assert list(executor._archive_lookup(package_ahriman)) == [Path("1.pkg.tar.xz")]
glob_mock.assert_called_once_with(f"{package_ahriman.packages[package_ahriman.base].filename}*")
def test_archive_lookup_version_mismatch(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must return nothing if no packages found with the same version
"""
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
mocker.patch("pathlib.Path.iterdir", return_value=[
Path("1.pkg.tar.zst"),
])
mocker.patch("ahriman.models.package.Package.from_archive", return_value=replace(package_ahriman, version="1"))
assert list(executor._archive_lookup(package_ahriman)) == []
def test_archive_lookup_architecture_mismatch(executor: Executor, package_ahriman: Package,
mocker: MockerFixture) -> None:
"""
must return nothing if architecture doesn't match
"""
package_ahriman.packages[package_ahriman.base].architecture = "x86_64"
mocker.patch("ahriman.core.repository.executor.Executor.architecture", return_value="i686")
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
mocker.patch("pathlib.Path.iterdir", return_value=[
Path("1.pkg.tar.zst"),
])
mocker.patch("ahriman.models.package.Package.from_archive", return_value=package_ahriman)
assert list(executor._archive_lookup(package_ahriman)) == []
def test_archive_rename(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must correctly remove package archive
@ -45,16 +97,39 @@ def test_package_build(executor: Executor, package_ahriman: Package, mocker: Moc
mocker.patch("ahriman.core.build_tools.task.Task.build", return_value=[Path(package_ahriman.base)])
status_client_mock = mocker.patch("ahriman.core.status.Client.set_building")
init_mock = mocker.patch("ahriman.core.build_tools.task.Task.init", return_value="sha")
package_mock = mocker.patch("ahriman.models.package.Package.from_build", return_value=package_ahriman)
lookup_mock = mocker.patch("ahriman.core.repository.executor.Executor._archive_lookup", return_value=[])
with_packages_mock = mocker.patch("ahriman.models.package.Package.with_packages")
rename_mock = mocker.patch("ahriman.core.repository.executor.atomic_move")
assert executor._package_build(package_ahriman, Path("local"), "packager", None) == "sha"
status_client_mock.assert_called_once_with(package_ahriman.base)
init_mock.assert_called_once_with(pytest.helpers.anyvar(int), pytest.helpers.anyvar(int), None)
package_mock.assert_called_once_with(Path("local"), executor.architecture, None)
lookup_mock.assert_called_once_with(package_ahriman)
with_packages_mock.assert_called_once_with([Path(package_ahriman.base)], executor.pacman)
rename_mock.assert_called_once_with(Path(package_ahriman.base), executor.paths.packages / package_ahriman.base)
def test_package_build_copy(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must copy package from archive if there are already built ones
"""
path = package_ahriman.packages[package_ahriman.base].filepath
mocker.patch("ahriman.core.build_tools.task.Task.build", return_value=[Path(package_ahriman.base)])
mocker.patch("ahriman.core.build_tools.task.Task.init")
mocker.patch("ahriman.models.package.Package.from_build", return_value=package_ahriman)
mocker.patch("ahriman.core.repository.executor.Executor._archive_lookup", return_value=[path])
mocker.patch("ahriman.core.repository.executor.atomic_move")
mocker.patch("ahriman.models.package.Package.with_packages")
copy_mock = mocker.patch("shutil.copy")
rename_mock = mocker.patch("ahriman.core.repository.executor.atomic_move")
executor._package_build(package_ahriman, Path("local"), "packager", None)
copy_mock.assert_called_once_with(path, Path("local"))
rename_mock.assert_called_once_with(Path("local") / path, executor.paths.packages / path)
def test_package_remove(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must run remove for packages
@ -111,9 +186,7 @@ def test_package_update(executor: Executor, package_ahriman: Package, user: User
Path("..") /
".." /
".." /
executor.paths.archive_for(
package_ahriman.base).relative_to(
executor.paths.root) /
executor.paths.archive_for(package_ahriman.base).relative_to(executor.paths.root) /
filepath)
# must add package
repo_add_mock.assert_called_once_with(executor.paths.repository / filepath)

View File

@ -17,6 +17,7 @@ from ahriman.core.utils import (
dataclass_view,
enum_values,
extract_user,
filelock,
filter_json,
full_version,
minmax,
@ -43,47 +44,12 @@ def test_atomic_move(mocker: MockerFixture) -> None:
"""
must move file with locking
"""
lock_mock = mocker.patch("fcntl.flock")
open_mock = mocker.patch("pathlib.Path.open", autospec=True)
filelock_mock = mocker.patch("ahriman.core.utils.filelock")
move_mock = mocker.patch("shutil.move")
unlink_mock = mocker.patch("pathlib.Path.unlink")
atomic_move(Path("source"), Path("destination"))
open_mock.assert_called_once_with(Path(".destination"), "ab")
lock_mock.assert_has_calls([
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_EX),
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_UN),
])
filelock_mock.assert_called_once_with(Path("destination"))
move_mock.assert_called_once_with(Path("source"), Path("destination"))
unlink_mock.assert_called_once_with(missing_ok=True)
def test_atomic_move_remove_lock(mocker: MockerFixture) -> None:
"""
must remove lock file in case of exception
"""
mocker.patch("pathlib.Path.open", side_effect=Exception)
unlink_mock = mocker.patch("pathlib.Path.unlink")
with pytest.raises(Exception):
atomic_move(Path("source"), Path("destination"))
unlink_mock.assert_called_once_with(missing_ok=True)
def test_atomic_move_unlock(mocker: MockerFixture) -> None:
"""
must unlock file in case of exception
"""
mocker.patch("pathlib.Path.open")
mocker.patch("shutil.move", side_effect=Exception)
lock_mock = mocker.patch("fcntl.flock")
with pytest.raises(Exception):
atomic_move(Path("source"), Path("destination"))
lock_mock.assert_has_calls([
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_EX),
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_UN),
])
def test_check_output(mocker: MockerFixture) -> None:
@ -305,6 +271,53 @@ def test_extract_user() -> None:
assert extract_user() == "doas"
def test_filelock(mocker: MockerFixture) -> None:
"""
must perform file locking
"""
lock_mock = mocker.patch("fcntl.flock")
open_mock = mocker.patch("pathlib.Path.open", autospec=True)
unlink_mock = mocker.patch("pathlib.Path.unlink")
with filelock(Path("local")):
pass
open_mock.assert_called_once_with(Path(".local"), "ab")
lock_mock.assert_has_calls([
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_EX),
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_UN),
])
unlink_mock.assert_called_once_with(missing_ok=True)
def test_filelock_remove_lock(mocker: MockerFixture) -> None:
"""
must remove lock file in case of exception
"""
mocker.patch("pathlib.Path.open", side_effect=Exception)
unlink_mock = mocker.patch("pathlib.Path.unlink")
with pytest.raises(Exception):
with filelock(Path("local")):
pass
unlink_mock.assert_called_once_with(missing_ok=True)
def test_filelock_unlock(mocker: MockerFixture) -> None:
"""
must unlock file in case of exception
"""
mocker.patch("pathlib.Path.open")
lock_mock = mocker.patch("fcntl.flock")
with pytest.raises(Exception):
with filelock(Path("local")):
raise Exception
lock_mock.assert_has_calls([
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_EX),
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_UN),
])
def test_filter_json(package_ahriman: Package) -> None:
"""
must filter fields by known list

View File

@ -3,6 +3,7 @@ from unittest.mock import MagicMock
from ahriman.core.configuration import Configuration
from ahriman.core.report import ReportTrigger
from ahriman.core.triggers import Trigger
from ahriman.models.repository_id import RepositoryId
from ahriman.models.result import Result
@ -13,16 +14,28 @@ def test_architecture(trigger: Trigger) -> None:
assert trigger.architecture == trigger.repository_id.architecture
def test_is_allowed_to_run(trigger: Trigger) -> None:
"""
must return flag correctly
"""
assert trigger.is_allowed_to_run
trigger.repository_id = RepositoryId("", "")
assert not trigger.is_allowed_to_run
trigger.REQUIRES_REPOSITORY = False
assert trigger.is_allowed_to_run
def test_configuration_schema(configuration: Configuration) -> None:
"""
must return used configuration schema
"""
section = "console"
configuration.set_option("report", "target", section)
_, repository_id = configuration.check_loaded()
expected = {section: ReportTrigger.CONFIGURATION_SCHEMA[section]}
assert ReportTrigger.configuration_schema(repository_id, configuration) == expected
assert ReportTrigger.configuration_schema(configuration) == expected
def test_configuration_schema_no_section(configuration: Configuration) -> None:
@ -31,9 +44,7 @@ def test_configuration_schema_no_section(configuration: Configuration) -> None:
"""
section = "abracadabra"
configuration.set_option("report", "target", section)
_, repository_id = configuration.check_loaded()
assert ReportTrigger.configuration_schema(repository_id, configuration) == {}
assert ReportTrigger.configuration_schema(configuration) == {}
def test_configuration_schema_no_schema(configuration: Configuration) -> None:
@ -43,17 +54,15 @@ def test_configuration_schema_no_schema(configuration: Configuration) -> None:
section = "abracadabra"
configuration.set_option("report", "target", section)
configuration.set_option(section, "key", "value")
_, repository_id = configuration.check_loaded()
assert ReportTrigger.configuration_schema(repository_id, configuration) == {}
assert ReportTrigger.configuration_schema(configuration) == {}
def test_configuration_schema_empty(configuration: Configuration) -> None:
"""
must return default schema if no configuration set
"""
_, repository_id = configuration.check_loaded()
assert ReportTrigger.configuration_schema(repository_id, None) == ReportTrigger.CONFIGURATION_SCHEMA
assert ReportTrigger.configuration_schema(None) == ReportTrigger.CONFIGURATION_SCHEMA
def test_configuration_schema_variables() -> None:

View File

@ -153,38 +153,12 @@ def test_on_start(trigger_loader: TriggerLoader, mocker: MockerFixture) -> None:
"""
upload_mock = mocker.patch("ahriman.core.upload.UploadTrigger.on_start")
report_mock = mocker.patch("ahriman.core.report.ReportTrigger.on_start")
atexit_mock = mocker.patch("atexit.register")
trigger_loader.on_start()
assert trigger_loader._on_stop_requested
report_mock.assert_called_once_with()
upload_mock.assert_called_once_with()
def test_on_stop_with_on_start(configuration: Configuration, mocker: MockerFixture) -> None:
"""
must call on_stop on exit if on_start was called
"""
mocker.patch("ahriman.core.upload.UploadTrigger.on_start")
mocker.patch("ahriman.core.report.ReportTrigger.on_start")
on_stop_mock = mocker.patch("ahriman.core.triggers.trigger_loader.TriggerLoader.on_stop")
_, repository_id = configuration.check_loaded()
trigger_loader = TriggerLoader.load(repository_id, configuration)
trigger_loader.on_start()
del trigger_loader
on_stop_mock.assert_called_once_with()
def test_on_stop_without_on_start(configuration: Configuration, mocker: MockerFixture) -> None:
"""
must call not on_stop on exit if on_start wasn't called
"""
on_stop_mock = mocker.patch("ahriman.core.triggers.trigger_loader.TriggerLoader.on_stop")
_, repository_id = configuration.check_loaded()
trigger_loader = TriggerLoader.load(repository_id, configuration)
del trigger_loader
on_stop_mock.assert_not_called()
atexit_mock.assert_called_once_with(trigger_loader.on_stop)
def test_on_stop(trigger_loader: TriggerLoader, mocker: MockerFixture) -> None:

View File

@ -140,8 +140,6 @@ dynamic_version = "{[project]name}.__version__"
extras = [
{ replace = "ref", of = ["project", "extras"], extend = true },
]
# TODO: steamline shlex usage after https://github.com/iterative/shtab/pull/192 merge
handle_redirect = true
pip_pre = true
set_env.PYTHONPATH = "src"
set_env.SPHINX_APIDOC_OPTIONS = "members,no-undoc-members,show-inheritance"
@ -149,18 +147,14 @@ commands = [
[
"shtab",
{ replace = "ref", of = ["flags", "shtab"], extend = true },
"--shell",
"bash",
">",
"package/share/bash-completion/completions/_ahriman",
"--shell", "bash",
"--output", "package/share/bash-completion/completions/_ahriman",
],
[
"shtab",
{ replace = "ref", of = ["flags", "shtab"], extend = true },
"--shell",
"zsh",
">",
"package/share/zsh/site-functions/_ahriman",
"--shell", "zsh",
"--output", "package/share/zsh/site-functions/_ahriman",
],
[
"argparse-manpage",

View File

@ -18,11 +18,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import importlib
import shlex
import sys
from tox.config.sets import EnvConfigSet
from tox.config.types import Command
from tox.plugin import impl
from tox.session.state import State
from tox.tox_env.api import ToxEnv
@ -56,35 +54,6 @@ def _extract_version(env_conf: EnvConfigSet, python_path: str | None = None) ->
return {"VERSION": version}
def _wrap_commands(env_conf: EnvConfigSet, shell: str = "bash") -> None:
"""
wrap commands into shell if there is redirect
Args:
env_conf(EnvConfigSet): the core configuration object
shell(str, optional): shell command to use (Default value = "bash")
"""
if not env_conf["handle_redirect"]:
return
# append shell just in case
env_conf["allowlist_externals"].append(shell)
for command in env_conf["commands"]:
if len(command.args) < 3: # command itself, redirect and output
continue
redirect, output = command.args[-2:]
if redirect not in (">", "2>", "&>"):
continue
command.args = [
shell,
"-c",
f"{Command(command.args[:-2]).shell} {redirect} {shlex.quote(output)}",
]
@impl
def tox_add_env_config(env_conf: EnvConfigSet, state: State) -> None:
"""
@ -103,12 +72,6 @@ def tox_add_env_config(env_conf: EnvConfigSet, state: State) -> None:
default="",
desc="import path for the version variable",
)
env_conf.add_config(
keys=["handle_redirect"],
of_type=bool,
default=False,
desc="wrap commands to handle redirects if any",
)
@impl
@ -124,5 +87,3 @@ def tox_before_run_commands(tox_env: ToxEnv) -> None:
python_path = set_env.load("PYTHONPATH") if "PYTHONPATH" in set_env else None
set_env.update(_extract_version(env_conf, python_path))
_wrap_commands(env_conf)