Compare commits

..

4 Commits

13 changed files with 379 additions and 93 deletions

View File

@ -1,5 +1,6 @@
[build] [build]
; List of well-known triggers. Used only for configuration purposes. ; List of well-known triggers. Used only for configuration purposes.
triggers_known[] = ahriman.core.archive.ArchiveTrigger
triggers_known[] = ahriman.core.distributed.WorkerLoaderTrigger triggers_known[] = ahriman.core.distributed.WorkerLoaderTrigger
triggers_known[] = ahriman.core.distributed.WorkerTrigger triggers_known[] = ahriman.core.distributed.WorkerTrigger
triggers_known[] = ahriman.core.support.KeyringTrigger triggers_known[] = ahriman.core.support.KeyringTrigger

View File

@ -66,7 +66,7 @@ class Status(Handler):
Status.check_status(args.exit_code, packages) Status.check_status(args.exit_code, packages)
comparator: Callable[[tuple[Package, BuildStatus]], Comparable] = lambda item: item[0].base comparator: Callable[[tuple[Package, BuildStatus]], Comparable] = lambda item: item[0].base
filter_fn: Callable[[tuple[Package, BuildStatus]], bool] =\ filter_fn: Callable[[tuple[Package, BuildStatus]], bool] = \
lambda item: args.status is None or item[1].status == args.status lambda item: args.status is None or item[1].status == args.status
for package, package_status in sorted(filter(filter_fn, packages), key=comparator): for package, package_status in sorted(filter(filter_fn, packages), key=comparator):
PackagePrinter(package, package_status)(verbose=args.info) PackagePrinter(package, package_status)(verbose=args.info)

View File

@ -88,22 +88,24 @@ class Repo(LazyLogging):
check_output("repo-add", *self.sign_args, str(self.repo_path), check_output("repo-add", *self.sign_args, str(self.repo_path),
cwd=self.root, logger=self.logger, user=self.uid) cwd=self.root, logger=self.logger, user=self.uid)
def remove(self, package: str, filename: Path) -> None: def remove(self, package_name: str | None, filename: Path) -> None:
""" """
remove package from repository remove package from repository
Args: Args:
package(str): package name to remove package_name(str | None): package name to remove. If none set, it will be guessed from filename
filename(Path): package filename to remove filename(Path): package filename to remove
""" """
package_name = package_name or filename.name.rsplit("-", maxsplit=3)[0]
# remove package and signature (if any) from filesystem # remove package and signature (if any) from filesystem
for full_path in self.root.glob(f"**/{filename.name}*"): for full_path in self.root.glob(f"**/{filename.name}*"):
full_path.unlink() full_path.unlink()
# remove package from registry # remove package from registry
check_output( check_output(
"repo-remove", *self.sign_args, str(self.repo_path), package, "repo-remove", *self.sign_args, str(self.repo_path), package_name,
exception=BuildError.from_process(package), exception=BuildError.from_process(package_name),
cwd=self.root, cwd=self.root,
logger=self.logger, logger=self.logger,
user=self.uid, user=self.uid,

View File

@ -0,0 +1,20 @@
#
# Copyright (c) 2021-2025 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from ahriman.core.archive.archive_trigger import ArchiveTrigger

View File

@ -0,0 +1,127 @@
#
# Copyright (c) 2021-2025 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import datetime
from pathlib import Path
from ahriman.core.alpm.repo import Repo
from ahriman.core.log import LazyLogging
from ahriman.core.utils import utcnow
from ahriman.models.package import Package
from ahriman.models.repository_paths import RepositoryPaths
class ArchiveTree(LazyLogging):
"""
wrapper around archive tree
Attributes:
paths(RepositoryPaths): repository paths instance
repository_id(RepositoryId): repository unique identifier
sign_args(list[str]): additional args which have to be used to sign repository archive
"""
def __init__(self, repository_path: RepositoryPaths, sign_args: list[str]) -> None:
"""
Args:
repository_path(RepositoryPaths): repository paths instance
sign_args(list[str]): additional args which have to be used to sign repository archive
"""
self.paths = repository_path
self.repository_id = repository_path.repository_id
self.sign_args = sign_args
def repository_for(self, date: datetime.date | None = None) -> Path:
"""
get full path to repository at the specified date
Args:
date(datetime.date | None, optional): date to generate path. If none supplied then today will be used
Returns:
Path: path to the repository root
"""
date = date or utcnow().today()
return (
self.paths.archive
/ "repos"
/ date.strftime("%Y")
/ date.strftime("%m")
/ date.strftime("%d")
/ self.repository_id.name
/ self.repository_id.architecture
)
def symlinks_create(self, packages: list[Package]) -> None:
"""
create symlinks for the specified packages in today's repository
Args:
packages(list[Package]): list of packages to be updated
"""
root = self.repository_for()
repo = Repo(self.repository_id.name, self.paths, self.sign_args, root)
for package in packages:
archive = self.paths.archive_for(package.base)
for package_name, single in package.packages.items():
if single.filename is None:
self.logger.warning("received empty package filename for %s", package_name)
continue
has_file = False
for file in archive.glob(f"{single.filename}*"):
if not (symlink := root / file.name).exists():
has_file |= True
symlink.symlink_to(file.relative_to(symlink.parent, walk_up=True))
if has_file:
repo.add(root / single.filename)
def symlinks_fix(self) -> None:
"""
remove broken symlinks across all repositories
"""
for root, _, files in self.paths.archive.walk():
*_, name, architecture = root.parts
if self.repository_id.name != name or self.repository_id.architecture != architecture:
continue # we only process same name repositories
for file in files:
path = root / file
if not path.is_symlink():
continue # find symlinks only
if path.exists():
continue # filter out not broken symlinks
repo = Repo(self.repository_id.name, self.paths, self.sign_args, root)
repo.remove(None, path)
def tree_create(self) -> None:
"""
create repository tree for current repository
"""
path = self.repository_for()
if path.exists():
return
with self.paths.preserve_owner(self.paths.archive):
path.mkdir(0o755, parents=True)

View File

@ -0,0 +1,71 @@
#
# Copyright (c) 2021-2025 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from ahriman.core import context
from ahriman.core.archive.archive_tree import ArchiveTree
from ahriman.core.configuration import Configuration
from ahriman.core.sign.gpg import GPG
from ahriman.core.triggers import Trigger
from ahriman.models.package import Package
from ahriman.models.repository_id import RepositoryId
from ahriman.models.result import Result
class ArchiveTrigger(Trigger):
"""
archive repository extension
Attributes:
paths(RepositoryPaths): repository paths instance
"""
def __init__(self, repository_id: RepositoryId, configuration: Configuration) -> None:
"""
Args:
repository_id(RepositoryId): repository unique identifier
configuration(Configuration): configuration instance
"""
Trigger.__init__(self, repository_id, configuration)
self.paths = configuration.repository_paths
ctx = context.get()
self.tree = ArchiveTree(self.paths, ctx.get(GPG).repository_sign_args)
def on_result(self, result: Result, packages: list[Package]) -> None:
"""
run trigger
Args:
result(Result): build result
packages(list[Package]): list of all available packages
"""
self.tree.symlinks_create(packages)
def on_start(self) -> None:
"""
trigger action which will be called at the start of the application
"""
self.tree.tree_create()
def on_stop(self) -> None:
"""
trigger action which will be called before the stop of the application
"""
self.tree.symlinks_fix()

View File

@ -52,16 +52,24 @@ class Executor(PackageInfo, Cleaner):
Path: list of built packages and signatures if available, empty list otherwise Path: list of built packages and signatures if available, empty list otherwise
""" """
archive = self.paths.archive_for(package.base) archive = self.paths.archive_for(package.base)
for path in filter(package_like, archive.iterdir()):
built = Package.from_archive(path, self.pacman) # find all packages which have same version
# check if there is an archive with exact same version same_version = [
if built.version != package.version: built
continue for path in filter(package_like, archive.iterdir())
for single in built.packages.values(): if (built := Package.from_archive(path, self.pacman)).version == package.version
# we allow packages with either same architecture or any ]
if single.architecture not in ("any", self.architecture): # no packages of the same version found
continue if not same_version:
yield from archive.glob(f"{single.filename}*") return
packages = [single for built in same_version for single in built.packages.values()]
# all packages must be either any or same architecture
if not all(single.architecture in ("any", self.architecture) for single in packages):
return
for single in packages:
yield from archive.glob(f"{single.filename}*")
def _archive_rename(self, description: PackageDescription, package_base: str) -> None: def _archive_rename(self, description: PackageDescription, package_base: str) -> None:
""" """
@ -72,7 +80,7 @@ class Executor(PackageInfo, Cleaner):
package_base(str): package base name package_base(str): package base name
""" """
if description.filename is None: if description.filename is None:
self.logger.warning("received empty package name for base %s", package_base) self.logger.warning("received empty package filename for base %s", package_base)
return # suppress type checking, it never can be none actually return # suppress type checking, it never can be none actually
if (safe := safe_filename(description.filename)) != description.filename: if (safe := safe_filename(description.filename)) != description.filename:
@ -101,6 +109,7 @@ class Executor(PackageInfo, Cleaner):
loaded_package = Package.from_build(path, self.architecture, None) loaded_package = Package.from_build(path, self.architecture, None)
if prebuilt := list(self._archive_lookup(loaded_package)): if prebuilt := list(self._archive_lookup(loaded_package)):
self.logger.info("using prebuilt packages for %s-%s", loaded_package.base, loaded_package.version)
built = [] built = []
for artefact in prebuilt: for artefact in prebuilt:
with filelock(artefact): with filelock(artefact):
@ -152,7 +161,7 @@ class Executor(PackageInfo, Cleaner):
packager_key(str | None): packager key identifier packager_key(str | None): packager key identifier
""" """
if filename is None: if filename is None:
self.logger.warning("received empty package name for base %s", package_base) self.logger.warning("received empty package filename for base %s", package_base)
return # suppress type checking, it never can be none actually return # suppress type checking, it never can be none actually
# in theory, it might be NOT packages directory, but we suppose it is # in theory, it might be NOT packages directory, but we suppose it is

View File

@ -17,6 +17,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
# #
import atexit
import contextlib import contextlib
import os import os
@ -60,17 +61,8 @@ class TriggerLoader(LazyLogging):
def __init__(self) -> None: def __init__(self) -> None:
"""""" """"""
self._on_stop_requested = False
self.triggers: list[Trigger] = [] self.triggers: list[Trigger] = []
def __del__(self) -> None:
"""
custom destructor object which calls on_stop in case if it was requested
"""
if not self._on_stop_requested:
return
self.on_stop()
@classmethod @classmethod
def load(cls, repository_id: RepositoryId, configuration: Configuration) -> Self: def load(cls, repository_id: RepositoryId, configuration: Configuration) -> Self:
""" """
@ -250,10 +242,11 @@ class TriggerLoader(LazyLogging):
run triggers on load run triggers on load
""" """
self.logger.debug("executing triggers on start") self.logger.debug("executing triggers on start")
self._on_stop_requested = True
for trigger in self.triggers: for trigger in self.triggers:
with self.__execute_trigger(trigger): with self.__execute_trigger(trigger):
trigger.on_start() trigger.on_start()
# register on_stop call
atexit.register(self.on_stop)
def on_stop(self) -> None: def on_stop(self) -> None:
""" """

View File

@ -37,6 +37,7 @@ SUBPACKAGES = {
"ahriman-triggers": [ "ahriman-triggers": [
prefix / "share" / "ahriman" / "settings" / "ahriman.ini.d" / "00-triggers.ini", prefix / "share" / "ahriman" / "settings" / "ahriman.ini.d" / "00-triggers.ini",
site_packages / "ahriman" / "application" / "handlers" / "triggers_support.py", site_packages / "ahriman" / "application" / "handlers" / "triggers_support.py",
site_packages / "ahriman" / "core" / "archive",
site_packages / "ahriman" / "core" / "distributed", site_packages / "ahriman" / "core" / "distributed",
site_packages / "ahriman" / "core" / "support", site_packages / "ahriman" / "core" / "support",
], ],

View File

@ -79,7 +79,7 @@ def test_run_repo_specific_triggers(args: argparse.Namespace, configuration: Con
_, repository_id = configuration.check_loaded() _, repository_id = configuration.check_loaded()
# remove unused sections # remove unused sections
for section in ("customs3", "github:x86_64", "logs-rotation", "mirrorlist"): for section in ("archive", "customs3", "github:x86_64", "logs-rotation", "mirrorlist"):
configuration.remove_section(section) configuration.remove_section(section)
configuration.set_option("report", "target", "test") configuration.set_option("report", "target", "test")

View File

@ -1,5 +1,6 @@
import pytest import pytest
from dataclasses import replace
from pathlib import Path from pathlib import Path
from pytest_mock import MockerFixture from pytest_mock import MockerFixture
from typing import Any from typing import Any
@ -13,6 +14,57 @@ from ahriman.models.packagers import Packagers
from ahriman.models.user import User from ahriman.models.user import User
def test_archive_lookup(executor: Executor, package_ahriman: Package, package_python_schedule: Package,
mocker: MockerFixture) -> None:
"""
must existing packages which match the version
"""
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
mocker.patch("pathlib.Path.iterdir", return_value=[
Path("1.pkg.tar.zst"),
Path("2.pkg.tar.zst"),
Path("3.pkg.tar.zst"),
])
mocker.patch("ahriman.models.package.Package.from_archive", side_effect=[
package_ahriman,
package_python_schedule,
replace(package_ahriman, version="1"),
])
glob_mock = mocker.patch("pathlib.Path.glob", return_value=[Path("1.pkg.tar.xz")])
assert list(executor._archive_lookup(package_ahriman)) == [Path("1.pkg.tar.xz")]
glob_mock.assert_called_once_with(f"{package_ahriman.packages[package_ahriman.base].filename}*")
def test_archive_lookup_version_mismatch(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must return nothing if no packages found with the same version
"""
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
mocker.patch("pathlib.Path.iterdir", return_value=[
Path("1.pkg.tar.zst"),
])
mocker.patch("ahriman.models.package.Package.from_archive", return_value=replace(package_ahriman, version="1"))
assert list(executor._archive_lookup(package_ahriman)) == []
def test_archive_lookup_architecture_mismatch(executor: Executor, package_ahriman: Package,
mocker: MockerFixture) -> None:
"""
must return nothing if architecture doesn't match
"""
package_ahriman.packages[package_ahriman.base].architecture = "x86_64"
mocker.patch("ahriman.core.repository.executor.Executor.architecture", return_value="i686")
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.preserve_owner")
mocker.patch("pathlib.Path.iterdir", return_value=[
Path("1.pkg.tar.zst"),
])
mocker.patch("ahriman.models.package.Package.from_archive", return_value=package_ahriman)
assert list(executor._archive_lookup(package_ahriman)) == []
def test_archive_rename(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None: def test_archive_rename(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None:
""" """
must correctly remove package archive must correctly remove package archive
@ -45,16 +97,39 @@ def test_package_build(executor: Executor, package_ahriman: Package, mocker: Moc
mocker.patch("ahriman.core.build_tools.task.Task.build", return_value=[Path(package_ahriman.base)]) mocker.patch("ahriman.core.build_tools.task.Task.build", return_value=[Path(package_ahriman.base)])
status_client_mock = mocker.patch("ahriman.core.status.Client.set_building") status_client_mock = mocker.patch("ahriman.core.status.Client.set_building")
init_mock = mocker.patch("ahriman.core.build_tools.task.Task.init", return_value="sha") init_mock = mocker.patch("ahriman.core.build_tools.task.Task.init", return_value="sha")
package_mock = mocker.patch("ahriman.models.package.Package.from_build", return_value=package_ahriman)
lookup_mock = mocker.patch("ahriman.core.repository.executor.Executor._archive_lookup", return_value=[])
with_packages_mock = mocker.patch("ahriman.models.package.Package.with_packages") with_packages_mock = mocker.patch("ahriman.models.package.Package.with_packages")
rename_mock = mocker.patch("ahriman.core.repository.executor.atomic_move") rename_mock = mocker.patch("ahriman.core.repository.executor.atomic_move")
assert executor._package_build(package_ahriman, Path("local"), "packager", None) == "sha" assert executor._package_build(package_ahriman, Path("local"), "packager", None) == "sha"
status_client_mock.assert_called_once_with(package_ahriman.base) status_client_mock.assert_called_once_with(package_ahriman.base)
init_mock.assert_called_once_with(pytest.helpers.anyvar(int), pytest.helpers.anyvar(int), None) init_mock.assert_called_once_with(pytest.helpers.anyvar(int), pytest.helpers.anyvar(int), None)
package_mock.assert_called_once_with(Path("local"), executor.architecture, None)
lookup_mock.assert_called_once_with(package_ahriman)
with_packages_mock.assert_called_once_with([Path(package_ahriman.base)], executor.pacman) with_packages_mock.assert_called_once_with([Path(package_ahriman.base)], executor.pacman)
rename_mock.assert_called_once_with(Path(package_ahriman.base), executor.paths.packages / package_ahriman.base) rename_mock.assert_called_once_with(Path(package_ahriman.base), executor.paths.packages / package_ahriman.base)
def test_package_build_copy(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must copy package from archive if there are already built ones
"""
path = package_ahriman.packages[package_ahriman.base].filepath
mocker.patch("ahriman.core.build_tools.task.Task.build", return_value=[Path(package_ahriman.base)])
mocker.patch("ahriman.core.build_tools.task.Task.init")
mocker.patch("ahriman.models.package.Package.from_build", return_value=package_ahriman)
mocker.patch("ahriman.core.repository.executor.Executor._archive_lookup", return_value=[path])
mocker.patch("ahriman.core.repository.executor.atomic_move")
mocker.patch("ahriman.models.package.Package.with_packages")
copy_mock = mocker.patch("shutil.copy")
rename_mock = mocker.patch("ahriman.core.repository.executor.atomic_move")
executor._package_build(package_ahriman, Path("local"), "packager", None)
copy_mock.assert_called_once_with(path, Path("local"))
rename_mock.assert_called_once_with(Path("local") / path, executor.paths.packages / path)
def test_package_remove(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None: def test_package_remove(executor: Executor, package_ahriman: Package, mocker: MockerFixture) -> None:
""" """
must run remove for packages must run remove for packages

View File

@ -17,6 +17,7 @@ from ahriman.core.utils import (
dataclass_view, dataclass_view,
enum_values, enum_values,
extract_user, extract_user,
filelock,
filter_json, filter_json,
full_version, full_version,
minmax, minmax,
@ -43,47 +44,12 @@ def test_atomic_move(mocker: MockerFixture) -> None:
""" """
must move file with locking must move file with locking
""" """
lock_mock = mocker.patch("fcntl.flock") filelock_mock = mocker.patch("ahriman.core.utils.filelock")
open_mock = mocker.patch("pathlib.Path.open", autospec=True)
move_mock = mocker.patch("shutil.move") move_mock = mocker.patch("shutil.move")
unlink_mock = mocker.patch("pathlib.Path.unlink")
atomic_move(Path("source"), Path("destination")) atomic_move(Path("source"), Path("destination"))
open_mock.assert_called_once_with(Path(".destination"), "ab") filelock_mock.assert_called_once_with(Path("destination"))
lock_mock.assert_has_calls([
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_EX),
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_UN),
])
move_mock.assert_called_once_with(Path("source"), Path("destination")) move_mock.assert_called_once_with(Path("source"), Path("destination"))
unlink_mock.assert_called_once_with(missing_ok=True)
def test_atomic_move_remove_lock(mocker: MockerFixture) -> None:
"""
must remove lock file in case of exception
"""
mocker.patch("pathlib.Path.open", side_effect=Exception)
unlink_mock = mocker.patch("pathlib.Path.unlink")
with pytest.raises(Exception):
atomic_move(Path("source"), Path("destination"))
unlink_mock.assert_called_once_with(missing_ok=True)
def test_atomic_move_unlock(mocker: MockerFixture) -> None:
"""
must unlock file in case of exception
"""
mocker.patch("pathlib.Path.open")
mocker.patch("shutil.move", side_effect=Exception)
lock_mock = mocker.patch("fcntl.flock")
with pytest.raises(Exception):
atomic_move(Path("source"), Path("destination"))
lock_mock.assert_has_calls([
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_EX),
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_UN),
])
def test_check_output(mocker: MockerFixture) -> None: def test_check_output(mocker: MockerFixture) -> None:
@ -305,6 +271,53 @@ def test_extract_user() -> None:
assert extract_user() == "doas" assert extract_user() == "doas"
def test_filelock(mocker: MockerFixture) -> None:
"""
must perform file locking
"""
lock_mock = mocker.patch("fcntl.flock")
open_mock = mocker.patch("pathlib.Path.open", autospec=True)
unlink_mock = mocker.patch("pathlib.Path.unlink")
with filelock(Path("local")):
pass
open_mock.assert_called_once_with(Path(".local"), "ab")
lock_mock.assert_has_calls([
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_EX),
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_UN),
])
unlink_mock.assert_called_once_with(missing_ok=True)
def test_filelock_remove_lock(mocker: MockerFixture) -> None:
"""
must remove lock file in case of exception
"""
mocker.patch("pathlib.Path.open", side_effect=Exception)
unlink_mock = mocker.patch("pathlib.Path.unlink")
with pytest.raises(Exception):
with filelock(Path("local")):
pass
unlink_mock.assert_called_once_with(missing_ok=True)
def test_filelock_unlock(mocker: MockerFixture) -> None:
"""
must unlock file in case of exception
"""
mocker.patch("pathlib.Path.open")
lock_mock = mocker.patch("fcntl.flock")
with pytest.raises(Exception):
with filelock(Path("local")):
raise Exception
lock_mock.assert_has_calls([
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_EX),
MockCall(pytest.helpers.anyvar(int), fcntl.LOCK_UN),
])
def test_filter_json(package_ahriman: Package) -> None: def test_filter_json(package_ahriman: Package) -> None:
""" """
must filter fields by known list must filter fields by known list

View File

@ -153,38 +153,12 @@ def test_on_start(trigger_loader: TriggerLoader, mocker: MockerFixture) -> None:
""" """
upload_mock = mocker.patch("ahriman.core.upload.UploadTrigger.on_start") upload_mock = mocker.patch("ahriman.core.upload.UploadTrigger.on_start")
report_mock = mocker.patch("ahriman.core.report.ReportTrigger.on_start") report_mock = mocker.patch("ahriman.core.report.ReportTrigger.on_start")
atexit_mock = mocker.patch("atexit.register")
trigger_loader.on_start() trigger_loader.on_start()
assert trigger_loader._on_stop_requested
report_mock.assert_called_once_with() report_mock.assert_called_once_with()
upload_mock.assert_called_once_with() upload_mock.assert_called_once_with()
atexit_mock.assert_called_once_with(trigger_loader.on_stop)
def test_on_stop_with_on_start(configuration: Configuration, mocker: MockerFixture) -> None:
"""
must call on_stop on exit if on_start was called
"""
mocker.patch("ahriman.core.upload.UploadTrigger.on_start")
mocker.patch("ahriman.core.report.ReportTrigger.on_start")
on_stop_mock = mocker.patch("ahriman.core.triggers.trigger_loader.TriggerLoader.on_stop")
_, repository_id = configuration.check_loaded()
trigger_loader = TriggerLoader.load(repository_id, configuration)
trigger_loader.on_start()
del trigger_loader
on_stop_mock.assert_called_once_with()
def test_on_stop_without_on_start(configuration: Configuration, mocker: MockerFixture) -> None:
"""
must call not on_stop on exit if on_start wasn't called
"""
on_stop_mock = mocker.patch("ahriman.core.triggers.trigger_loader.TriggerLoader.on_stop")
_, repository_id = configuration.check_loaded()
trigger_loader = TriggerLoader.load(repository_id, configuration)
del trigger_loader
on_stop_mock.assert_not_called()
def test_on_stop(trigger_loader: TriggerLoader, mocker: MockerFixture) -> None: def test_on_stop(trigger_loader: TriggerLoader, mocker: MockerFixture) -> None: