Compare commits

..

5 Commits

25 changed files with 180 additions and 726 deletions

View File

@ -1,29 +0,0 @@
ahriman.core.archive package
============================
Submodules
----------
ahriman.core.archive.archive\_tree module
-----------------------------------------
.. automodule:: ahriman.core.archive.archive_tree
:members:
:no-undoc-members:
:show-inheritance:
ahriman.core.archive.archive\_trigger module
--------------------------------------------
.. automodule:: ahriman.core.archive.archive_trigger
:members:
:no-undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: ahriman.core.archive
:members:
:no-undoc-members:
:show-inheritance:

View File

@ -8,7 +8,6 @@ Subpackages
:maxdepth: 4 :maxdepth: 4
ahriman.core.alpm ahriman.core.alpm
ahriman.core.archive
ahriman.core.auth ahriman.core.auth
ahriman.core.build_tools ahriman.core.build_tools
ahriman.core.configuration ahriman.core.configuration

View File

@ -1,6 +1,5 @@
[build] [build]
; List of well-known triggers. Used only for configuration purposes. ; List of well-known triggers. Used only for configuration purposes.
triggers_known[] = ahriman.core.archive.ArchiveTrigger
triggers_known[] = ahriman.core.distributed.WorkerLoaderTrigger triggers_known[] = ahriman.core.distributed.WorkerLoaderTrigger
triggers_known[] = ahriman.core.distributed.WorkerTrigger triggers_known[] = ahriman.core.distributed.WorkerTrigger
triggers_known[] = ahriman.core.support.KeyringTrigger triggers_known[] = ahriman.core.support.KeyringTrigger

View File

@ -66,7 +66,7 @@ class Status(Handler):
Status.check_status(args.exit_code, packages) Status.check_status(args.exit_code, packages)
comparator: Callable[[tuple[Package, BuildStatus]], Comparable] = lambda item: item[0].base comparator: Callable[[tuple[Package, BuildStatus]], Comparable] = lambda item: item[0].base
filter_fn: Callable[[tuple[Package, BuildStatus]], bool] = \ filter_fn: Callable[[tuple[Package, BuildStatus]], bool] =\
lambda item: args.status is None or item[1].status == args.status lambda item: args.status is None or item[1].status == args.status
for package, package_status in sorted(filter(filter_fn, packages), key=comparator): for package, package_status in sorted(filter(filter_fn, packages), key=comparator):
PackagePrinter(package, package_status)(verbose=args.info) PackagePrinter(package, package_status)(verbose=args.info)

View File

@ -52,7 +52,7 @@ class Validate(Handler):
""" """
from ahriman.core.configuration.validator import Validator from ahriman.core.configuration.validator import Validator
schema = Validate.schema(configuration) schema = Validate.schema(repository_id, configuration)
validator = Validator(configuration=configuration, schema=schema) validator = Validator(configuration=configuration, schema=schema)
if validator.validate(configuration.dump()): if validator.validate(configuration.dump()):
@ -83,11 +83,12 @@ class Validate(Handler):
return parser return parser
@staticmethod @staticmethod
def schema(configuration: Configuration) -> ConfigurationSchema: def schema(repository_id: RepositoryId, configuration: Configuration) -> ConfigurationSchema:
""" """
get schema with triggers get schema with triggers
Args: Args:
repository_id(RepositoryId): repository unique identifier
configuration(Configuration): configuration instance configuration(Configuration): configuration instance
Returns: Returns:
@ -106,12 +107,12 @@ class Validate(Handler):
continue continue
# default settings if any # default settings if any
for schema_name, schema in trigger_class.configuration_schema(None).items(): for schema_name, schema in trigger_class.configuration_schema(repository_id, None).items():
erased = Validate.schema_erase_required(copy.deepcopy(schema)) erased = Validate.schema_erase_required(copy.deepcopy(schema))
root[schema_name] = Validate.schema_merge(root.get(schema_name, {}), erased) root[schema_name] = Validate.schema_merge(root.get(schema_name, {}), erased)
# settings according to enabled triggers # settings according to enabled triggers
for schema_name, schema in trigger_class.configuration_schema(configuration).items(): for schema_name, schema in trigger_class.configuration_schema(repository_id, configuration).items():
root[schema_name] = Validate.schema_merge(root.get(schema_name, {}), copy.deepcopy(schema)) root[schema_name] = Validate.schema_merge(root.get(schema_name, {}), copy.deepcopy(schema))
return root return root

View File

@ -88,24 +88,22 @@ class Repo(LazyLogging):
check_output("repo-add", *self.sign_args, str(self.repo_path), check_output("repo-add", *self.sign_args, str(self.repo_path),
cwd=self.root, logger=self.logger, user=self.uid) cwd=self.root, logger=self.logger, user=self.uid)
def remove(self, package_name: str | None, filename: Path) -> None: def remove(self, package: str, filename: Path) -> None:
""" """
remove package from repository remove package from repository
Args: Args:
package_name(str | None): package name to remove. If none set, it will be guessed from filename package(str): package name to remove
filename(Path): package filename to remove filename(Path): package filename to remove
""" """
package_name = package_name or filename.name.rsplit("-", maxsplit=3)[0]
# remove package and signature (if any) from filesystem # remove package and signature (if any) from filesystem
for full_path in self.root.glob(f"**/{filename.name}*"): for full_path in self.root.glob(f"**/{filename.name}*"):
full_path.unlink() full_path.unlink()
# remove package from registry # remove package from registry
check_output( check_output(
"repo-remove", *self.sign_args, str(self.repo_path), package_name, "repo-remove", *self.sign_args, str(self.repo_path), package,
exception=BuildError.from_process(package_name), exception=BuildError.from_process(package),
cwd=self.root, cwd=self.root,
logger=self.logger, logger=self.logger,
user=self.uid, user=self.uid,

View File

@ -1,20 +0,0 @@
#
# Copyright (c) 2021-2025 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from ahriman.core.archive.archive_trigger import ArchiveTrigger

View File

@ -1,130 +0,0 @@
#
# Copyright (c) 2021-2025 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import datetime
from pathlib import Path
from ahriman.core.alpm.repo import Repo
from ahriman.core.log import LazyLogging
from ahriman.core.utils import utcnow, walk
from ahriman.models.package import Package
from ahriman.models.repository_paths import RepositoryPaths
class ArchiveTree(LazyLogging):
"""
wrapper around archive tree
Attributes:
paths(RepositoryPaths): repository paths instance
repository_id(RepositoryId): repository unique identifier
sign_args(list[str]): additional args which have to be used to sign repository archive
"""
def __init__(self, repository_path: RepositoryPaths, sign_args: list[str]) -> None:
"""
Args:
repository_path(RepositoryPaths): repository paths instance
sign_args(list[str]): additional args which have to be used to sign repository archive
"""
self.paths = repository_path
self.repository_id = repository_path.repository_id
self.sign_args = sign_args
def repository_for(self, date: datetime.date | None = None) -> Path:
"""
get full path to repository at the specified date
Args:
date(datetime.date | None, optional): date to generate path. If none supplied then today will be used
(Default value = None)
Returns:
Path: path to the repository root
"""
date = date or utcnow().date()
return (
self.paths.archive
/ "repos"
/ date.strftime("%Y")
/ date.strftime("%m")
/ date.strftime("%d")
/ self.repository_id.name
/ self.repository_id.architecture
)
def symlinks_create(self, packages: list[Package]) -> None:
"""
create symlinks for the specified packages in today's repository
Args:
packages(list[Package]): list of packages to be updated
"""
root = self.repository_for()
repo = Repo(self.repository_id.name, self.paths, self.sign_args, root)
for package in packages:
archive = self.paths.archive_for(package.base)
for package_name, single in package.packages.items():
if single.filename is None:
self.logger.warning("received empty package filename for %s", package_name)
continue
has_file = False
for file in archive.glob(f"{single.filename}*"):
symlink = root / file.name
if symlink.exists():
continue # symlink is already created, skip processing
has_file = True
symlink.symlink_to(file.relative_to(symlink.parent, walk_up=True))
if has_file:
repo.add(root / single.filename)
def symlinks_fix(self) -> None:
"""
remove broken symlinks across repositories for all dates
"""
for path in walk(self.paths.archive / "repos"):
root = path.parent
*_, name, architecture = root.parts
if self.repository_id.name != name or self.repository_id.architecture != architecture:
continue # we only process same name repositories
if not path.is_symlink():
continue # find symlinks only
if path.exists():
continue # filter out not broken symlinks
Repo(self.repository_id.name, self.paths, self.sign_args, root).remove(None, path)
def tree_create(self) -> None:
"""
create repository tree for current repository
"""
root = self.repository_for()
if root.exists():
return
with self.paths.preserve_owner(self.paths.archive):
root.mkdir(0o755, parents=True)
# init empty repository here
Repo(self.repository_id.name, self.paths, self.sign_args, root).init()

View File

@ -1,69 +0,0 @@
#
# Copyright (c) 2021-2025 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from ahriman.core.archive.archive_tree import ArchiveTree
from ahriman.core.configuration import Configuration
from ahriman.core.sign.gpg import GPG
from ahriman.core.triggers import Trigger
from ahriman.models.package import Package
from ahriman.models.repository_id import RepositoryId
from ahriman.models.result import Result
class ArchiveTrigger(Trigger):
"""
archive repository extension
Attributes:
paths(RepositoryPaths): repository paths instance
tree(ArchiveTree): archive tree wrapper
"""
def __init__(self, repository_id: RepositoryId, configuration: Configuration) -> None:
"""
Args:
repository_id(RepositoryId): repository unique identifier
configuration(Configuration): configuration instance
"""
Trigger.__init__(self, repository_id, configuration)
self.paths = configuration.repository_paths
self.tree = ArchiveTree(self.paths, GPG(configuration).repository_sign_args)
def on_result(self, result: Result, packages: list[Package]) -> None:
"""
run trigger
Args:
result(Result): build result
packages(list[Package]): list of all available packages
"""
self.tree.symlinks_create(packages)
def on_start(self) -> None:
"""
trigger action which will be called at the start of the application
"""
self.tree.tree_create()
def on_stop(self) -> None:
"""
trigger action which will be called before the stop of the application
"""
self.tree.symlinks_fix()

View File

@ -57,7 +57,7 @@ class ConfigurationMultiDict(dict[str, Any]):
OptionError: if the key already exists in the dictionary, but not a single value list or a string OptionError: if the key already exists in the dictionary, but not a single value list or a string
""" """
match self.get(key): match self.get(key):
case [current_value] | (str() as current_value): case [current_value] | str(current_value):
value = f"{current_value} {value}" value = f"{current_value} {value}"
case None: case None:
pass pass

View File

@ -52,24 +52,16 @@ class Executor(PackageInfo, Cleaner):
Path: list of built packages and signatures if available, empty list otherwise Path: list of built packages and signatures if available, empty list otherwise
""" """
archive = self.paths.archive_for(package.base) archive = self.paths.archive_for(package.base)
for path in filter(package_like, archive.iterdir()):
# find all packages which have same version built = Package.from_archive(path, self.pacman)
same_version = [ # check if there is an archive with exact same version
built if built.version != package.version:
for path in filter(package_like, archive.iterdir()) continue
if (built := Package.from_archive(path, self.pacman)).version == package.version for single in built.packages.values():
] # we allow packages with either same architecture or any
# no packages of the same version found if single.architecture not in ("any", self.architecture):
if not same_version: continue
return yield from archive.glob(f"{single.filename}*")
packages = [single for built in same_version for single in built.packages.values()]
# all packages must be either any or same architecture
if not all(single.architecture in ("any", self.architecture) for single in packages):
return
for single in packages:
yield from archive.glob(f"{single.filename}*")
def _archive_rename(self, description: PackageDescription, package_base: str) -> None: def _archive_rename(self, description: PackageDescription, package_base: str) -> None:
""" """
@ -80,7 +72,7 @@ class Executor(PackageInfo, Cleaner):
package_base(str): package base name package_base(str): package base name
""" """
if description.filename is None: if description.filename is None:
self.logger.warning("received empty package filename for base %s", package_base) self.logger.warning("received empty package name for base %s", package_base)
return # suppress type checking, it never can be none actually return # suppress type checking, it never can be none actually
if (safe := safe_filename(description.filename)) != description.filename: if (safe := safe_filename(description.filename)) != description.filename:
@ -109,7 +101,6 @@ class Executor(PackageInfo, Cleaner):
loaded_package = Package.from_build(path, self.architecture, None) loaded_package = Package.from_build(path, self.architecture, None)
if prebuilt := list(self._archive_lookup(loaded_package)): if prebuilt := list(self._archive_lookup(loaded_package)):
self.logger.info("using prebuilt packages for %s-%s", loaded_package.base, loaded_package.version)
built = [] built = []
for artefact in prebuilt: for artefact in prebuilt:
with filelock(artefact): with filelock(artefact):
@ -161,7 +152,7 @@ class Executor(PackageInfo, Cleaner):
packager_key(str | None): packager key identifier packager_key(str | None): packager key identifier
""" """
if filename is None: if filename is None:
self.logger.warning("received empty package filename for base %s", package_base) self.logger.warning("received empty package name for base %s", package_base)
return # suppress type checking, it never can be none actually return # suppress type checking, it never can be none actually
# in theory, it might be NOT packages directory, but we suppose it is # in theory, it might be NOT packages directory, but we suppose it is

View File

@ -36,7 +36,6 @@ class Trigger(LazyLogging):
CONFIGURATION_SCHEMA(ConfigurationSchema): (class attribute) configuration schema template CONFIGURATION_SCHEMA(ConfigurationSchema): (class attribute) configuration schema template
CONFIGURATION_SCHEMA_FALLBACK(str | None): (class attribute) optional fallback option for defining CONFIGURATION_SCHEMA_FALLBACK(str | None): (class attribute) optional fallback option for defining
configuration schema type used configuration schema type used
REQUIRES_REPOSITORY(bool): (class attribute) either trigger requires loaded repository or not
configuration(Configuration): configuration instance configuration(Configuration): configuration instance
repository_id(RepositoryId): repository unique identifier repository_id(RepositoryId): repository unique identifier
@ -60,7 +59,6 @@ class Trigger(LazyLogging):
CONFIGURATION_SCHEMA: ClassVar[ConfigurationSchema] = {} CONFIGURATION_SCHEMA: ClassVar[ConfigurationSchema] = {}
CONFIGURATION_SCHEMA_FALLBACK: ClassVar[str | None] = None CONFIGURATION_SCHEMA_FALLBACK: ClassVar[str | None] = None
REQUIRES_REPOSITORY: ClassVar[bool] = True
def __init__(self, repository_id: RepositoryId, configuration: Configuration) -> None: def __init__(self, repository_id: RepositoryId, configuration: Configuration) -> None:
""" """
@ -81,18 +79,9 @@ class Trigger(LazyLogging):
""" """
return self.repository_id.architecture return self.repository_id.architecture
@property
def is_allowed_to_run(self) -> bool:
"""
whether trigger allowed to run or not
Returns:
bool: ``True`` in case if trigger allowed to run and ``False`` otherwise
"""
return not (self.REQUIRES_REPOSITORY and self.repository_id.is_empty)
@classmethod @classmethod
def configuration_schema(cls, configuration: Configuration | None) -> ConfigurationSchema: def configuration_schema(cls, repository_id: RepositoryId,
configuration: Configuration | None) -> ConfigurationSchema:
""" """
configuration schema based on supplied service configuration configuration schema based on supplied service configuration
@ -100,6 +89,7 @@ class Trigger(LazyLogging):
Schema must be in cerberus format, for details and examples you can check built-in triggers. Schema must be in cerberus format, for details and examples you can check built-in triggers.
Args: Args:
repository_id(str): repository unique identifier
configuration(Configuration | None): configuration instance. If set to None, the default schema configuration(Configuration | None): configuration instance. If set to None, the default schema
should be returned should be returned
@ -111,15 +101,13 @@ class Trigger(LazyLogging):
result: ConfigurationSchema = {} result: ConfigurationSchema = {}
for target in cls.configuration_sections(configuration): for target in cls.configuration_sections(configuration):
for section in configuration.sections(): if not configuration.has_section(target):
if not (section == target or section.startswith(f"{target}:")): continue
# either repository specific or exact name section, schema_name = configuration.gettype(
continue target, repository_id, fallback=cls.CONFIGURATION_SCHEMA_FALLBACK)
schema_name = configuration.get(section, "type", fallback=section) if schema_name not in cls.CONFIGURATION_SCHEMA:
continue
if schema_name not in cls.CONFIGURATION_SCHEMA: result[section] = cls.CONFIGURATION_SCHEMA[schema_name]
continue
result[section] = cls.CONFIGURATION_SCHEMA[schema_name]
return result return result

View File

@ -17,7 +17,6 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
# #
import atexit
import contextlib import contextlib
import os import os
@ -61,8 +60,17 @@ class TriggerLoader(LazyLogging):
def __init__(self) -> None: def __init__(self) -> None:
"""""" """"""
self._on_stop_requested = False
self.triggers: list[Trigger] = [] self.triggers: list[Trigger] = []
def __del__(self) -> None:
"""
custom destructor object which calls on_stop in case if it was requested
"""
if not self._on_stop_requested:
return
self.on_stop()
@classmethod @classmethod
def load(cls, repository_id: RepositoryId, configuration: Configuration) -> Self: def load(cls, repository_id: RepositoryId, configuration: Configuration) -> Self:
""" """
@ -77,9 +85,8 @@ class TriggerLoader(LazyLogging):
""" """
instance = cls() instance = cls()
instance.triggers = [ instance.triggers = [
trigger instance.load_trigger(trigger, repository_id, configuration)
for trigger_name in instance.selected_triggers(configuration) for trigger in instance.selected_triggers(configuration)
if (trigger := instance.load_trigger(trigger_name, repository_id, configuration)).is_allowed_to_run
] ]
return instance return instance
@ -243,11 +250,10 @@ class TriggerLoader(LazyLogging):
run triggers on load run triggers on load
""" """
self.logger.debug("executing triggers on start") self.logger.debug("executing triggers on start")
self._on_stop_requested = True
for trigger in self.triggers: for trigger in self.triggers:
with self.__execute_trigger(trigger): with self.__execute_trigger(trigger):
trigger.on_start() trigger.on_start()
# register on_stop call
atexit.register(self.on_stop)
def on_stop(self) -> None: def on_stop(self) -> None:
""" """

View File

@ -37,7 +37,6 @@ SUBPACKAGES = {
"ahriman-triggers": [ "ahriman-triggers": [
prefix / "share" / "ahriman" / "settings" / "ahriman.ini.d" / "00-triggers.ini", prefix / "share" / "ahriman" / "settings" / "ahriman.ini.d" / "00-triggers.ini",
site_packages / "ahriman" / "application" / "handlers" / "triggers_support.py", site_packages / "ahriman" / "application" / "handlers" / "triggers_support.py",
site_packages / "ahriman" / "core" / "archive",
site_packages / "ahriman" / "core" / "distributed", site_packages / "ahriman" / "core" / "distributed",
site_packages / "ahriman" / "core" / "support", site_packages / "ahriman" / "core" / "support",
], ],

View File

@ -2,7 +2,6 @@ import argparse
import json import json
import pytest import pytest
from pathlib import Path
from pytest_mock import MockerFixture from pytest_mock import MockerFixture
from ahriman.application.handlers.validate import Validate from ahriman.application.handlers.validate import Validate
@ -54,50 +53,12 @@ def test_run_skip(args: argparse.Namespace, configuration: Configuration, mocker
print_mock.assert_not_called() print_mock.assert_not_called()
def test_run_default(args: argparse.Namespace, configuration: Configuration) -> None:
"""
must run on default configuration without errors
"""
args.exit_code = True
_, repository_id = configuration.check_loaded()
default = Configuration.from_path(Configuration.SYSTEM_CONFIGURATION_PATH, repository_id)
# copy autogenerated values
for section, key in (("build", "build_command"), ("repository", "root")):
value = configuration.get(section, key)
default.set_option(section, key, value)
Validate.run(args, repository_id, default, report=False)
def test_run_repo_specific_triggers(args: argparse.Namespace, configuration: Configuration,
resource_path_root: Path) -> None:
"""
must correctly insert repo specific triggers
"""
args.exit_code = True
_, repository_id = configuration.check_loaded()
# remove unused sections
for section in ("archive", "customs3", "github:x86_64", "logs-rotation", "mirrorlist"):
configuration.remove_section(section)
configuration.set_option("report", "target", "test")
for section in ("test", "test:i686", "test:another-repo:x86_64"):
configuration.set_option(section, "type", "html")
configuration.set_option(section, "link_path", "http://link_path")
configuration.set_option(section, "path", "path")
configuration.set_option(section, "template", "template")
configuration.set_option(section, "templates", str(resource_path_root))
Validate.run(args, repository_id, configuration, report=False)
def test_schema(configuration: Configuration) -> None: def test_schema(configuration: Configuration) -> None:
""" """
must generate full schema correctly must generate full schema correctly
""" """
schema = Validate.schema(configuration) _, repository_id = configuration.check_loaded()
schema = Validate.schema(repository_id, configuration)
# defaults # defaults
assert schema.pop("console") assert schema.pop("console")
@ -130,7 +91,9 @@ def test_schema_invalid_trigger(configuration: Configuration) -> None:
""" """
configuration.set_option("build", "triggers", "some.invalid.trigger.path.Trigger") configuration.set_option("build", "triggers", "some.invalid.trigger.path.Trigger")
configuration.remove_option("build", "triggers_known") configuration.remove_option("build", "triggers_known")
assert Validate.schema(configuration) == CONFIGURATION_SCHEMA _, repository_id = configuration.check_loaded()
assert Validate.schema(repository_id, configuration) == CONFIGURATION_SCHEMA
def test_schema_erase_required() -> None: def test_schema_erase_required() -> None:

View File

@ -4,7 +4,6 @@ from pathlib import Path
from pytest_mock import MockerFixture from pytest_mock import MockerFixture
from ahriman.core.alpm.repo import Repo from ahriman.core.alpm.repo import Repo
from ahriman.models.package import Package
from ahriman.models.repository_paths import RepositoryPaths from ahriman.models.repository_paths import RepositoryPaths
@ -57,37 +56,21 @@ def test_repo_init(repo: Repo, mocker: MockerFixture) -> None:
assert check_output_mock.call_args[0][0] == "repo-add" assert check_output_mock.call_args[0][0] == "repo-add"
def test_repo_remove(repo: Repo, package_ahriman: Package, mocker: MockerFixture) -> None: def test_repo_remove(repo: Repo, mocker: MockerFixture) -> None:
""" """
must call repo-remove on package removal must call repo-remove on package addition
""" """
filepath = package_ahriman.packages[package_ahriman.base].filepath
mocker.patch("pathlib.Path.glob", return_value=[]) mocker.patch("pathlib.Path.glob", return_value=[])
check_output_mock = mocker.patch("ahriman.core.alpm.repo.check_output") check_output_mock = mocker.patch("ahriman.core.alpm.repo.check_output")
repo.remove(package_ahriman.base, filepath) repo.remove("package", Path("package.pkg.tar.xz"))
check_output_mock.assert_called_once() # it will be checked later check_output_mock.assert_called_once() # it will be checked later
assert check_output_mock.call_args[0][0] == "repo-remove" assert check_output_mock.call_args[0][0] == "repo-remove"
assert package_ahriman.base in check_output_mock.call_args[0]
def test_repo_remove_guess_package(repo: Repo, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must call repo-remove on package removal if no package name set
"""
filepath = package_ahriman.packages[package_ahriman.base].filepath
mocker.patch("pathlib.Path.glob", return_value=[])
check_output_mock = mocker.patch("ahriman.core.alpm.repo.check_output")
repo.remove(None, filepath)
check_output_mock.assert_called_once() # it will be checked later
assert check_output_mock.call_args[0][0] == "repo-remove"
assert package_ahriman.base in check_output_mock.call_args[0]
def test_repo_remove_fail_no_file(repo: Repo, mocker: MockerFixture) -> None: def test_repo_remove_fail_no_file(repo: Repo, mocker: MockerFixture) -> None:
""" """
must fail removal on missing file must fail on missing file
""" """
mocker.patch("pathlib.Path.glob", return_value=[Path("package.pkg.tar.xz")]) mocker.patch("pathlib.Path.glob", return_value=[Path("package.pkg.tar.xz")])
mocker.patch("pathlib.Path.unlink", side_effect=FileNotFoundError) mocker.patch("pathlib.Path.unlink", side_effect=FileNotFoundError)

View File

@ -1,34 +0,0 @@
import pytest
from ahriman.core.archive import ArchiveTrigger
from ahriman.core.archive.archive_tree import ArchiveTree
from ahriman.core.configuration import Configuration
@pytest.fixture
def archive_tree(configuration: Configuration) -> ArchiveTree:
"""
archive tree fixture
Args:
configuration(Configuration): configuration fixture
Returns:
ArchiveTree: archive tree test instance
"""
return ArchiveTree(configuration.repository_paths, [])
@pytest.fixture
def archive_trigger(configuration: Configuration) -> ArchiveTrigger:
"""
archive trigger fixture
Args:
configuration(Configuration): configuration fixture
Returns:
ArchiveTrigger: archive trigger test instance
"""
_, repository_id = configuration.check_loaded()
return ArchiveTrigger(repository_id, configuration)

View File

@ -1,135 +0,0 @@
from pathlib import Path
from pytest_mock import MockerFixture
from ahriman.core.archive.archive_tree import ArchiveTree
from ahriman.core.utils import utcnow
from ahriman.models.package import Package
def test_repository_for(archive_tree: ArchiveTree) -> None:
"""
must correctly generate path to repository
"""
path = archive_tree.repository_for()
assert path.is_relative_to(archive_tree.paths.archive / "repos")
assert (archive_tree.repository_id.name, archive_tree.repository_id.architecture) == path.parts[-2:]
assert set(map("{:02d}".format, utcnow().timetuple()[:3])).issubset(path.parts)
def test_symlinks_create(archive_tree: ArchiveTree, package_ahriman: Package, package_python_schedule: Package,
mocker: MockerFixture) -> None:
"""
must create symlinks
"""
_original_exists = Path.exists
def exists_mock(path: Path) -> bool:
if path.name in (package.filename for package in package_python_schedule.packages.values()):
return True
return _original_exists(path)
symlinks_mock = mocker.patch("pathlib.Path.symlink_to")
add_mock = mocker.patch("ahriman.core.alpm.repo.Repo.add")
mocker.patch("pathlib.Path.glob", autospec=True, side_effect=lambda path, name: [path / name[:-1]])
mocker.patch("pathlib.Path.exists", autospec=True, side_effect=exists_mock)
archive_tree.symlinks_create([package_ahriman, package_python_schedule])
symlinks_mock.assert_called_once_with(
Path("..") /
".." /
".." /
".." /
".." /
".." /
archive_tree.paths.archive_for(package_ahriman.base)
.relative_to(archive_tree.paths.root)
.relative_to("archive") /
package_ahriman.packages[package_ahriman.base].filename
)
add_mock.assert_called_once_with(
archive_tree.repository_for() / package_ahriman.packages[package_ahriman.base].filename
)
def test_symlinks_create_empty_filename(archive_tree: ArchiveTree, package_ahriman: Package,
mocker: MockerFixture) -> None:
"""
must skip symlinks creation if filename is not set
"""
package_ahriman.packages[package_ahriman.base].filename = None
symlinks_mock = mocker.patch("pathlib.Path.symlink_to")
archive_tree.symlinks_create([package_ahriman])
symlinks_mock.assert_not_called()
def test_symlinks_fix(archive_tree: ArchiveTree, mocker: MockerFixture) -> None:
"""
must fix broken symlinks
"""
_original_exists = Path.exists
def exists_mock(path: Path) -> bool:
if path.name == "symlink":
return True
return _original_exists(path)
mocker.patch("pathlib.Path.is_symlink", side_effect=[True, True, False])
mocker.patch("pathlib.Path.exists", autospec=True, side_effect=exists_mock)
walk_mock = mocker.patch("ahriman.core.archive.archive_tree.walk", return_value=[
archive_tree.repository_for() / filename
for filename in ("symlink", "broken_symlink", "file")
])
remove_mock = mocker.patch("ahriman.core.alpm.repo.Repo.remove")
archive_tree.symlinks_fix()
walk_mock.assert_called_once_with(archive_tree.paths.archive / "repos")
remove_mock.assert_called_once_with(None, archive_tree.repository_for() / "broken_symlink")
def test_symlinks_fix_foreign_repository(archive_tree: ArchiveTree, mocker: MockerFixture) -> None:
"""
must skip symlinks check if repository name or architecture doesn't match
"""
_original_exists = Path.exists
def exists_mock(path: Path) -> bool:
if path.name == "symlink":
return True
return _original_exists(path)
mocker.patch("pathlib.Path.is_symlink", side_effect=[True, True, False])
mocker.patch("pathlib.Path.exists", autospec=True, side_effect=exists_mock)
mocker.patch("ahriman.core.archive.archive_tree.walk", return_value=[
archive_tree.repository_for().with_name("i686") / filename
for filename in ("symlink", "broken_symlink", "file")
])
remove_mock = mocker.patch("ahriman.core.alpm.repo.Repo.remove")
archive_tree.symlinks_fix()
remove_mock.assert_not_called()
def test_tree_create(archive_tree: ArchiveTree, mocker: MockerFixture) -> None:
"""
must create repository root if not exists
"""
owner_guard_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
mkdir_mock = mocker.patch("pathlib.Path.mkdir")
init_mock = mocker.patch("ahriman.core.alpm.repo.Repo.init")
archive_tree.tree_create()
owner_guard_mock.assert_called_once_with(archive_tree.paths.archive)
mkdir_mock.assert_called_once_with(0o755, parents=True)
init_mock.assert_called_once_with()
def test_tree_create_exists(archive_tree: ArchiveTree, mocker: MockerFixture) -> None:
"""
must skip directory creation if already exists
"""
mocker.patch("pathlib.Path.exists", return_value=True)
mkdir_mock = mocker.patch("pathlib.Path.mkdir")
archive_tree.tree_create()
mkdir_mock.assert_not_called()

View File

@ -1,32 +0,0 @@
from pytest_mock import MockerFixture
from ahriman.core.archive import ArchiveTrigger
from ahriman.models.package import Package
from ahriman.models.result import Result
def test_on_result(archive_trigger: ArchiveTrigger, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must create symlinks for actual repository
"""
symlinks_mock = mocker.patch("ahriman.core.archive.archive_tree.ArchiveTree.symlinks_create")
archive_trigger.on_result(Result(), [package_ahriman])
symlinks_mock.assert_called_once_with([package_ahriman])
def test_on_start(archive_trigger: ArchiveTrigger, mocker: MockerFixture) -> None:
"""
must create repository tree on load
"""
tree_mock = mocker.patch("ahriman.core.archive.archive_tree.ArchiveTree.tree_create")
archive_trigger.on_start()
tree_mock.assert_called_once_with()
def test_on_stop(archive_trigger: ArchiveTrigger, mocker: MockerFixture) -> None:
"""
must create repository tree on load
"""
symlinks_mock = mocker.patch("ahriman.core.archive.archive_tree.ArchiveTree.symlinks_fix")
archive_trigger.on_stop()
symlinks_mock.assert_called_once_with()

View File

@ -1,6 +1,5 @@
import pytest import pytest
from dataclasses import replace
from pathlib import Path from pathlib import Path
from pytest_mock import MockerFixture from pytest_mock import MockerFixture
from typing import Any from typing import Any
@ -14,57 +13,6 @@ from ahriman.models.packagers import Packagers
from ahriman.models.user import User from ahriman.models.user import User
def test_archive_lookup(executor: Executor, package_ahriman: Package, package_python_schedule: Package,
mocker: MockerFixture) -> None:
"""
must existing packages which match the version
"""
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
mocker.patch("pathlib.Path.iterdir", return_value=[
Path("1.pkg.tar.zst"),
Path("2.pkg.tar.zst"),
Path("3.pkg.tar.zst"),
])
mocker.patch("ahriman.models.package.Package.from_archive", side_effect=[
package_ahriman,
package_python_schedule,
replace(package_ahriman, version="1"),
])
glob_mock = mocker.patch("pathlib.Path.glob", return_value=[Path("1.pkg.tar.xz")])
assert list(executor._archive_lookup(package_ahriman)) == [Path("1.pkg.tar.xz")]
glob_mock.assert_called_once_with(f"{package_ahriman.packages[package_ahriman.base].filename}*")
def test_archive_lookup_version_mismatch(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must return nothing if no packages found with the same version
"""
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
mocker.patch("pathlib.Path.iterdir", return_value=[
Path("1.pkg.tar.zst"),
])
mocker.patch("ahriman.models.package.Package.from_archive", return_value=replace(package_ahriman, version="1"))
assert list(executor._archive_lookup(package_ahriman)) == []
def test_archive_lookup_architecture_mismatch(executor: Executor, package_ahriman: Package,
mocker: MockerFixture) -> None:
"""
must return nothing if architecture doesn't match
"""
package_ahriman.packages[package_ahriman.base].architecture = "x86_64"
mocker.patch("ahriman.core.repository.executor.Executor.architecture", return_value="i686")
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
mocker.patch("pathlib.Path.iterdir", return_value=[
Path("1.pkg.tar.zst"),
])
mocker.patch("ahriman.models.package.Package.from_archive", return_value=package_ahriman)
assert list(executor._archive_lookup(package_ahriman)) == []
def test_archive_rename(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None: def test_archive_rename(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None:
""" """
must correctly remove package archive must correctly remove package archive
@ -97,39 +45,16 @@ def test_package_build(executor: Executor, package_ahriman: Package, mocker: Moc
mocker.patch("ahriman.core.build_tools.task.Task.build", return_value=[Path(package_ahriman.base)]) mocker.patch("ahriman.core.build_tools.task.Task.build", return_value=[Path(package_ahriman.base)])
status_client_mock = mocker.patch("ahriman.core.status.Client.set_building") status_client_mock = mocker.patch("ahriman.core.status.Client.set_building")
init_mock = mocker.patch("ahriman.core.build_tools.task.Task.init", return_value="sha") init_mock = mocker.patch("ahriman.core.build_tools.task.Task.init", return_value="sha")
package_mock = mocker.patch("ahriman.models.package.Package.from_build", return_value=package_ahriman)
lookup_mock = mocker.patch("ahriman.core.repository.executor.Executor._archive_lookup", return_value=[])
with_packages_mock = mocker.patch("ahriman.models.package.Package.with_packages") with_packages_mock = mocker.patch("ahriman.models.package.Package.with_packages")
rename_mock = mocker.patch("ahriman.core.repository.executor.atomic_move") rename_mock = mocker.patch("ahriman.core.repository.executor.atomic_move")
assert executor._package_build(package_ahriman, Path("local"), "packager", None) == "sha" assert executor._package_build(package_ahriman, Path("local"), "packager", None) == "sha"
status_client_mock.assert_called_once_with(package_ahriman.base) status_client_mock.assert_called_once_with(package_ahriman.base)
init_mock.assert_called_once_with(pytest.helpers.anyvar(int), pytest.helpers.anyvar(int), None) init_mock.assert_called_once_with(pytest.helpers.anyvar(int), pytest.helpers.anyvar(int), None)
package_mock.assert_called_once_with(Path("local"), executor.architecture, None)
lookup_mock.assert_called_once_with(package_ahriman)
with_packages_mock.assert_called_once_with([Path(package_ahriman.base)], executor.pacman) with_packages_mock.assert_called_once_with([Path(package_ahriman.base)], executor.pacman)
rename_mock.assert_called_once_with(Path(package_ahriman.base), executor.paths.packages / package_ahriman.base) rename_mock.assert_called_once_with(Path(package_ahriman.base), executor.paths.packages / package_ahriman.base)
def test_package_build_copy(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must copy package from archive if there are already built ones
"""
path = package_ahriman.packages[package_ahriman.base].filepath
mocker.patch("ahriman.core.build_tools.task.Task.build", return_value=[Path(package_ahriman.base)])
mocker.patch("ahriman.core.build_tools.task.Task.init")
mocker.patch("ahriman.models.package.Package.from_build", return_value=package_ahriman)
mocker.patch("ahriman.core.repository.executor.Executor._archive_lookup", return_value=[path])
mocker.patch("ahriman.core.repository.executor.atomic_move")
mocker.patch("ahriman.models.package.Package.with_packages")
copy_mock = mocker.patch("shutil.copy")
rename_mock = mocker.patch("ahriman.core.repository.executor.atomic_move")
executor._package_build(package_ahriman, Path("local"), "packager", None)
copy_mock.assert_called_once_with(path, Path("local"))
rename_mock.assert_called_once_with(Path("local") / path, executor.paths.packages / path)
def test_package_remove(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None: def test_package_remove(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None:
""" """
must run remove for packages must run remove for packages
@ -186,7 +111,9 @@ def test_package_update(executor: Executor, package_ahriman: Package, user: User
Path("..") / Path("..") /
".." / ".." /
".." / ".." /
executor.paths.archive_for(package_ahriman.base).relative_to(executor.paths.root) / executor.paths.archive_for(
package_ahriman.base).relative_to(
executor.paths.root) /
filepath) filepath)
# must add package # must add package
repo_add_mock.assert_called_once_with(executor.paths.repository / filepath) repo_add_mock.assert_called_once_with(executor.paths.repository / filepath)

View File

@ -17,7 +17,6 @@ from ahriman.core.utils import (
dataclass_view, dataclass_view,
enum_values, enum_values,
extract_user, extract_user,
filelock,
filter_json, filter_json,
full_version, full_version,
minmax, minmax,
@ -44,12 +43,47 @@ def test_atomic_move(mocker: MockerFixture) -> None:
""" """
must move file with locking must move file with locking
""" """
filelock_mock = mocker.patch("ahriman.core.utils.filelock") lock_mock = mocker.patch("fcntl.flock")
open_mock = mocker.patch("pathlib.Path.open", autospec=True)
move_mock = mocker.patch("shutil.move") move_mock = mocker.patch("shutil.move")
unlink_mock = mocker.patch("pathlib.Path.unlink")
atomic_move(Path("source"), Path("destination")) atomic_move(Path("source"), Path("destination"))
filelock_mock.assert_called_once_with(Path("destination")) open_mock.assert_called_once_with(Path(".destination"), "ab")
lock_mock.assert_has_calls([
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_EX),
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_UN),
])
move_mock.assert_called_once_with(Path("source"), Path("destination")) move_mock.assert_called_once_with(Path("source"), Path("destination"))
unlink_mock.assert_called_once_with(missing_ok=True)
def test_atomic_move_remove_lock(mocker: MockerFixture) -> None:
"""
must remove lock file in case of exception
"""
mocker.patch("pathlib.Path.open", side_effect=Exception)
unlink_mock = mocker.patch("pathlib.Path.unlink")
with pytest.raises(Exception):
atomic_move(Path("source"), Path("destination"))
unlink_mock.assert_called_once_with(missing_ok=True)
def test_atomic_move_unlock(mocker: MockerFixture) -> None:
"""
must unlock file in case of exception
"""
mocker.patch("pathlib.Path.open")
mocker.patch("shutil.move", side_effect=Exception)
lock_mock = mocker.patch("fcntl.flock")
with pytest.raises(Exception):
atomic_move(Path("source"), Path("destination"))
lock_mock.assert_has_calls([
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_EX),
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_UN),
])
def test_check_output(mocker: MockerFixture) -> None: def test_check_output(mocker: MockerFixture) -> None:
@ -271,53 +305,6 @@ def test_extract_user() -> None:
assert extract_user() == "doas" assert extract_user() == "doas"
def test_filelock(mocker: MockerFixture) -> None:
"""
must perform file locking
"""
lock_mock = mocker.patch("fcntl.flock")
open_mock = mocker.patch("pathlib.Path.open", autospec=True)
unlink_mock = mocker.patch("pathlib.Path.unlink")
with filelock(Path("local")):
pass
open_mock.assert_called_once_with(Path(".local"), "ab")
lock_mock.assert_has_calls([
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_EX),
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_UN),
])
unlink_mock.assert_called_once_with(missing_ok=True)
def test_filelock_remove_lock(mocker: MockerFixture) -> None:
"""
must remove lock file in case of exception
"""
mocker.patch("pathlib.Path.open", side_effect=Exception)
unlink_mock = mocker.patch("pathlib.Path.unlink")
with pytest.raises(Exception):
with filelock(Path("local")):
pass
unlink_mock.assert_called_once_with(missing_ok=True)
def test_filelock_unlock(mocker: MockerFixture) -> None:
"""
must unlock file in case of exception
"""
mocker.patch("pathlib.Path.open")
lock_mock = mocker.patch("fcntl.flock")
with pytest.raises(Exception):
with filelock(Path("local")):
raise Exception
lock_mock.assert_has_calls([
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_EX),
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_UN),
])
def test_filter_json(package_ahriman: Package) -> None: def test_filter_json(package_ahriman: Package) -> None:
""" """
must filter fields by known list must filter fields by known list

View File

@ -3,7 +3,6 @@ from unittest.mock import MagicMock
from ahriman.core.configuration import Configuration from ahriman.core.configuration import Configuration
from ahriman.core.report import ReportTrigger from ahriman.core.report import ReportTrigger
from ahriman.core.triggers import Trigger from ahriman.core.triggers import Trigger
from ahriman.models.repository_id import RepositoryId
from ahriman.models.result import Result from ahriman.models.result import Result
@ -14,28 +13,16 @@ def test_architecture(trigger: Trigger) -> None:
assert trigger.architecture == trigger.repository_id.architecture assert trigger.architecture == trigger.repository_id.architecture
def test_is_allowed_to_run(trigger: Trigger) -> None:
"""
must return flag correctly
"""
assert trigger.is_allowed_to_run
trigger.repository_id = RepositoryId("", "")
assert not trigger.is_allowed_to_run
trigger.REQUIRES_REPOSITORY = False
assert trigger.is_allowed_to_run
def test_configuration_schema(configuration: Configuration) -> None: def test_configuration_schema(configuration: Configuration) -> None:
""" """
must return used configuration schema must return used configuration schema
""" """
section = "console" section = "console"
configuration.set_option("report", "target", section) configuration.set_option("report", "target", section)
_, repository_id = configuration.check_loaded()
expected = {section: ReportTrigger.CONFIGURATION_SCHEMA[section]} expected = {section: ReportTrigger.CONFIGURATION_SCHEMA[section]}
assert ReportTrigger.configuration_schema(configuration) == expected assert ReportTrigger.configuration_schema(repository_id, configuration) == expected
def test_configuration_schema_no_section(configuration: Configuration) -> None: def test_configuration_schema_no_section(configuration: Configuration) -> None:
@ -44,7 +31,9 @@ def test_configuration_schema_no_section(configuration: Configuration) -> None:
""" """
section = "abracadabra" section = "abracadabra"
configuration.set_option("report", "target", section) configuration.set_option("report", "target", section)
assert ReportTrigger.configuration_schema(configuration) == {} _, repository_id = configuration.check_loaded()
assert ReportTrigger.configuration_schema(repository_id, configuration) == {}
def test_configuration_schema_no_schema(configuration: Configuration) -> None: def test_configuration_schema_no_schema(configuration: Configuration) -> None:
@ -54,15 +43,17 @@ def test_configuration_schema_no_schema(configuration: Configuration) -> None:
section = "abracadabra" section = "abracadabra"
configuration.set_option("report", "target", section) configuration.set_option("report", "target", section)
configuration.set_option(section, "key", "value") configuration.set_option(section, "key", "value")
_, repository_id = configuration.check_loaded()
assert ReportTrigger.configuration_schema(configuration) == {} assert ReportTrigger.configuration_schema(repository_id, configuration) == {}
def test_configuration_schema_empty(configuration: Configuration) -> None: def test_configuration_schema_empty(configuration: Configuration) -> None:
""" """
must return default schema if no configuration set must return default schema if no configuration set
""" """
assert ReportTrigger.configuration_schema(None) == ReportTrigger.CONFIGURATION_SCHEMA _, repository_id = configuration.check_loaded()
assert ReportTrigger.configuration_schema(repository_id, None) == ReportTrigger.CONFIGURATION_SCHEMA
def test_configuration_schema_variables() -> None: def test_configuration_schema_variables() -> None:

View File

@ -153,12 +153,38 @@ def test_on_start(trigger_loader: TriggerLoader, mocker: MockerFixture) -> None:
""" """
upload_mock = mocker.patch("ahriman.core.upload.UploadTrigger.on_start") upload_mock = mocker.patch("ahriman.core.upload.UploadTrigger.on_start")
report_mock = mocker.patch("ahriman.core.report.ReportTrigger.on_start") report_mock = mocker.patch("ahriman.core.report.ReportTrigger.on_start")
atexit_mock = mocker.patch("atexit.register")
trigger_loader.on_start() trigger_loader.on_start()
assert trigger_loader._on_stop_requested
report_mock.assert_called_once_with() report_mock.assert_called_once_with()
upload_mock.assert_called_once_with() upload_mock.assert_called_once_with()
atexit_mock.assert_called_once_with(trigger_loader.on_stop)
def test_on_stop_with_on_start(configuration: Configuration, mocker: MockerFixture) -> None:
"""
must call on_stop on exit if on_start was called
"""
mocker.patch("ahriman.core.upload.UploadTrigger.on_start")
mocker.patch("ahriman.core.report.ReportTrigger.on_start")
on_stop_mock = mocker.patch("ahriman.core.triggers.trigger_loader.TriggerLoader.on_stop")
_, repository_id = configuration.check_loaded()
trigger_loader = TriggerLoader.load(repository_id, configuration)
trigger_loader.on_start()
del trigger_loader
on_stop_mock.assert_called_once_with()
def test_on_stop_without_on_start(configuration: Configuration, mocker: MockerFixture) -> None:
"""
must call not on_stop on exit if on_start wasn't called
"""
on_stop_mock = mocker.patch("ahriman.core.triggers.trigger_loader.TriggerLoader.on_stop")
_, repository_id = configuration.check_loaded()
trigger_loader = TriggerLoader.load(repository_id, configuration)
del trigger_loader
on_stop_mock.assert_not_called()
def test_on_stop(trigger_loader: TriggerLoader, mocker: MockerFixture) -> None: def test_on_stop(trigger_loader: TriggerLoader, mocker: MockerFixture) -> None:

View File

@ -140,6 +140,8 @@ dynamic_version = "{[project]name}.__version__"
extras = [ extras = [
{ replace = "ref", of = ["project", "extras"], extend = true }, { replace = "ref", of = ["project", "extras"], extend = true },
] ]
# TODO: steamline shlex usage after https://github.com/iterative/shtab/pull/192 merge
handle_redirect = true
pip_pre = true pip_pre = true
set_env.PYTHONPATH = "src" set_env.PYTHONPATH = "src"
set_env.SPHINX_APIDOC_OPTIONS = "members,no-undoc-members,show-inheritance" set_env.SPHINX_APIDOC_OPTIONS = "members,no-undoc-members,show-inheritance"
@ -147,14 +149,18 @@ commands = [
[ [
"shtab", "shtab",
{ replace = "ref", of = ["flags", "shtab"], extend = true }, { replace = "ref", of = ["flags", "shtab"], extend = true },
"--shell", "bash", "--shell",
"--output", "package/share/bash-completion/completions/_ahriman", "bash",
">",
"package/share/bash-completion/completions/_ahriman",
], ],
[ [
"shtab", "shtab",
{ replace = "ref", of = ["flags", "shtab"], extend = true }, { replace = "ref", of = ["flags", "shtab"], extend = true },
"--shell", "zsh", "--shell",
"--output", "package/share/zsh/site-functions/_ahriman", "zsh",
">",
"package/share/zsh/site-functions/_ahriman",
], ],
[ [
"argparse-manpage", "argparse-manpage",

View File

@ -18,9 +18,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
# #
import importlib import importlib
import shlex
import sys import sys
from tox.config.sets import EnvConfigSet from tox.config.sets import EnvConfigSet
from tox.config.types import Command
from tox.plugin import impl from tox.plugin import impl
from tox.session.state import State from tox.session.state import State
from tox.tox_env.api import ToxEnv from tox.tox_env.api import ToxEnv
@ -54,6 +56,35 @@ def _extract_version(env_conf: EnvConfigSet, python_path: str | None = None) ->
return {"VERSION": version} return {"VERSION": version}
def _wrap_commands(env_conf: EnvConfigSet, shell: str = "bash") -> None:
"""
wrap commands into shell if there is redirect
Args:
env_conf(EnvConfigSet): the core configuration object
shell(str, optional): shell command to use (Default value = "bash")
"""
if not env_conf["handle_redirect"]:
return
# append shell just in case
env_conf["allowlist_externals"].append(shell)
for command in env_conf["commands"]:
if len(command.args) < 3: # command itself, redirect and output
continue
redirect, output = command.args[-2:]
if redirect not in (">", "2>", "&>"):
continue
command.args = [
shell,
"-c",
f"{Command(command.args[:-2]).shell} {redirect} {shlex.quote(output)}",
]
@impl @impl
def tox_add_env_config(env_conf: EnvConfigSet, state: State) -> None: def tox_add_env_config(env_conf: EnvConfigSet, state: State) -> None:
""" """
@ -72,6 +103,12 @@ def tox_add_env_config(env_conf: EnvConfigSet, state: State) -> None:
default="", default="",
desc="import path for the version variable", desc="import path for the version variable",
) )
env_conf.add_config(
keys=["handle_redirect"],
of_type=bool,
default=False,
desc="wrap commands to handle redirects if any",
)
@impl @impl
@ -87,3 +124,5 @@ def tox_before_run_commands(tox_env: ToxEnv) -> None:
python_path = set_env.load("PYTHONPATH") if "PYTHONPATH" in set_env else None python_path = set_env.load("PYTHONPATH") if "PYTHONPATH" in set_env else None
set_env.update(_extract_version(env_conf, python_path)) set_env.update(_extract_version(env_conf, python_path))
_wrap_commands(env_conf)