From aad607eaefc0f5a18ee919631ebd80db7f7fe6de Mon Sep 17 00:00:00 2001 From: Evgenii Alekseev Date: Wed, 3 Jan 2024 02:25:24 +0200 Subject: [PATCH] feat: add workers autodicsovery feature (#121) * add workers autodicsovery feature * suppress erros while retrieving worker list * update recipes * fix tests and update docs * filter health checks * ping based workers --- docs/ahriman.core.distributed.rst | 45 ++++++ docs/ahriman.core.rst | 1 + docs/ahriman.web.schemas.rst | 8 ++ docs/ahriman.web.views.v1.distributed.rst | 21 +++ docs/ahriman.web.views.v1.rst | 1 + docs/architecture.rst | 1 + docs/configuration.rst | 13 +- docs/faq.rst | 5 + docs/triggers.rst | 10 ++ package/share/ahriman/settings/ahriman.ini | 2 +- package/share/man/man1/ahriman.1 | 2 +- recipes/distributed/README.md | 2 +- recipes/distributed/compose.yml | 4 + recipes/distributed/service.ini | 2 +- recipes/distributed/worker.ini | 5 +- src/ahriman/application/handlers/web.py | 6 +- src/ahriman/core/distributed/__init__.py | 22 +++ .../core/distributed/distributed_system.py | 130 ++++++++++++++++++ .../core/distributed/worker_loader_trigger.py | 40 ++++++ .../core/distributed/worker_trigger.py | 70 ++++++++++ src/ahriman/core/distributed/workers_cache.py | 73 ++++++++++ src/ahriman/core/http/sync_http_client.py | 4 +- .../core/log/filtered_access_logger.py | 35 ++++- src/ahriman/core/status/web_client.py | 1 - src/ahriman/core/triggers/trigger_loader.py | 2 +- src/ahriman/models/worker.py | 12 ++ src/ahriman/web/keys.py | 3 + src/ahriman/web/schemas/__init__.py | 3 +- src/ahriman/web/schemas/worker_schema.py | 35 +++++ src/ahriman/web/views/base.py | 13 +- .../web/views/v1/distributed/__init__.py | 19 +++ .../web/views/v1/distributed/workers.py | 126 +++++++++++++++++ src/ahriman/web/web.py | 11 +- .../handlers/test_handler_validate.py | 1 + .../application/handlers/test_handler_web.py | 3 + tests/ahriman/core/distributed/conftest.py | 35 +++++ .../distributed/test_distributed_system.py | 82 +++++++++++ .../distributed/test_worker_loader_trigger.py | 47 +++++++ .../core/distributed/test_worker_trigger.py | 52 +++++++ .../core/distributed/test_workers_cache.py | 43 ++++++ .../core/log/test_filtered_access_logger.py | 38 +++++ tests/ahriman/core/status/test_web_client.py | 17 +-- tests/ahriman/models/test_worker.py | 14 ++ .../ahriman/web/schemas/test_worker_schema.py | 1 + tests/ahriman/web/views/test_view_base.py | 9 +- .../test_view_v1_distributed_workers.py | 80 +++++++++++ tests/testresources/core/ahriman.ini | 7 +- 47 files changed, 1126 insertions(+), 30 deletions(-) create mode 100644 docs/ahriman.core.distributed.rst create mode 100644 docs/ahriman.web.views.v1.distributed.rst create mode 100644 src/ahriman/core/distributed/__init__.py create mode 100644 src/ahriman/core/distributed/distributed_system.py create mode 100644 src/ahriman/core/distributed/worker_loader_trigger.py create mode 100644 src/ahriman/core/distributed/worker_trigger.py create mode 100644 src/ahriman/core/distributed/workers_cache.py create mode 100644 src/ahriman/web/schemas/worker_schema.py create mode 100644 src/ahriman/web/views/v1/distributed/__init__.py create mode 100644 src/ahriman/web/views/v1/distributed/workers.py create mode 100644 tests/ahriman/core/distributed/conftest.py create mode 100644 tests/ahriman/core/distributed/test_distributed_system.py create mode 100644 tests/ahriman/core/distributed/test_worker_loader_trigger.py create mode 100644 tests/ahriman/core/distributed/test_worker_trigger.py create mode 100644 tests/ahriman/core/distributed/test_workers_cache.py create mode 100644 tests/ahriman/web/schemas/test_worker_schema.py create mode 100644 tests/ahriman/web/views/v1/distributed/test_view_v1_distributed_workers.py diff --git a/docs/ahriman.core.distributed.rst b/docs/ahriman.core.distributed.rst new file mode 100644 index 00000000..b22cf82e --- /dev/null +++ b/docs/ahriman.core.distributed.rst @@ -0,0 +1,45 @@ +ahriman.core.distributed package +================================ + +Submodules +---------- + +ahriman.core.distributed.distributed\_system module +--------------------------------------------------- + +.. automodule:: ahriman.core.distributed.distributed_system + :members: + :no-undoc-members: + :show-inheritance: + +ahriman.core.distributed.worker\_loader\_trigger module +------------------------------------------------------- + +.. automodule:: ahriman.core.distributed.worker_loader_trigger + :members: + :no-undoc-members: + :show-inheritance: + +ahriman.core.distributed.worker\_trigger module +----------------------------------------------- + +.. automodule:: ahriman.core.distributed.worker_trigger + :members: + :no-undoc-members: + :show-inheritance: + +ahriman.core.distributed.workers\_cache module +---------------------------------------------- + +.. automodule:: ahriman.core.distributed.workers_cache + :members: + :no-undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: ahriman.core.distributed + :members: + :no-undoc-members: + :show-inheritance: diff --git a/docs/ahriman.core.rst b/docs/ahriman.core.rst index 8c3c99eb..31ca571f 100644 --- a/docs/ahriman.core.rst +++ b/docs/ahriman.core.rst @@ -12,6 +12,7 @@ Subpackages ahriman.core.build_tools ahriman.core.configuration ahriman.core.database + ahriman.core.distributed ahriman.core.formatters ahriman.core.gitremote ahriman.core.http diff --git a/docs/ahriman.web.schemas.rst b/docs/ahriman.web.schemas.rst index a3b9fcd6..ef02c15a 100644 --- a/docs/ahriman.web.schemas.rst +++ b/docs/ahriman.web.schemas.rst @@ -260,6 +260,14 @@ ahriman.web.schemas.versioned\_log\_schema module :no-undoc-members: :show-inheritance: +ahriman.web.schemas.worker\_schema module +----------------------------------------- + +.. automodule:: ahriman.web.schemas.worker_schema + :members: + :no-undoc-members: + :show-inheritance: + Module contents --------------- diff --git a/docs/ahriman.web.views.v1.distributed.rst b/docs/ahriman.web.views.v1.distributed.rst new file mode 100644 index 00000000..eed37ee1 --- /dev/null +++ b/docs/ahriman.web.views.v1.distributed.rst @@ -0,0 +1,21 @@ +ahriman.web.views.v1.distributed package +======================================== + +Submodules +---------- + +ahriman.web.views.v1.distributed.workers module +----------------------------------------------- + +.. automodule:: ahriman.web.views.v1.distributed.workers + :members: + :no-undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: ahriman.web.views.v1.distributed + :members: + :no-undoc-members: + :show-inheritance: diff --git a/docs/ahriman.web.views.v1.rst b/docs/ahriman.web.views.v1.rst index 64b29734..27c53c84 100644 --- a/docs/ahriman.web.views.v1.rst +++ b/docs/ahriman.web.views.v1.rst @@ -7,6 +7,7 @@ Subpackages .. toctree:: :maxdepth: 4 + ahriman.web.views.v1.distributed ahriman.web.views.v1.service ahriman.web.views.v1.status ahriman.web.views.v1.user diff --git a/docs/architecture.rst b/docs/architecture.rst index cece384a..67d8af34 100644 --- a/docs/architecture.rst +++ b/docs/architecture.rst @@ -37,6 +37,7 @@ This package contains everything required for the most of application actions an * ``ahriman.core.build_tools`` is a package which provides wrapper for ``devtools`` commands. * ``ahriman.core.configuration`` contains extension for standard ``configparser`` library and some validation related classes. * ``ahriman.core.database`` is everything for database, including data and schema migrations. +* ``ahriman.core.distributed`` package with triggers and helpers for distributed build system. * ``ahriman.core.formatters`` package provides ``Printer`` sub-classes for printing data (e.g. package properties) to stdout which are used by some handlers. * ``ahriman.core.gitremote`` is a package with remote PKGBUILD triggers. Should not be called directly. * ``ahriman.core.http`` package provides HTTP clients which can be used later by other classes. diff --git a/docs/configuration.rst b/docs/configuration.rst index 67fbe86f..fbe99e66 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -86,7 +86,7 @@ Build related configuration. Group name can refer to architecture, e.g. ``build: * ``triggers`` - list of ``ahriman.core.triggers.Trigger`` class implementation (e.g. ``ahriman.core.report.ReportTrigger ahriman.core.upload.UploadTrigger``) which will be loaded and run at the end of processing, space separated list of strings, optional. You can also specify triggers by their paths, e.g. ``/usr/lib/python3.10/site-packages/ahriman/core/report/report.py.ReportTrigger``. Triggers are run in the order of definition. * ``triggers_known`` - optional list of ``ahriman.core.triggers.Trigger`` class implementations which are not run automatically and used only for trigger discovery and configuration validation. * ``vcs_allowed_age`` - maximal age in seconds of the VCS packages before their version will be updated with its remote source, integer, optional, default is 7 days. -* ``workers`` - list of worker nodes addresses used for build process, space separated list of strings, optional. Each worker address must be valid and reachable url, e.g. ``https://10.0.0.1:8080``. If none set, the build process will be run on the current node. +* ``workers`` - list of worker nodes addresses used for build process, space separated list of strings, optional. Each worker address must be valid and reachable url, e.g. ``https://10.0.0.1:8080``. If none set, the build process will be run on the current node. There is also special trigger which loads this value based on the list of the discovered nodes. ``repository`` group -------------------- @@ -351,4 +351,13 @@ Requires ``boto3`` library to be installed. Section name must be either ``s3`` ( * ``chunk_size`` - chunk size for calculating entity tags, integer, optional, default 8 * 1024 * 1024. * ``object_path`` - path prefix for stored objects, string, optional. If none set, the prefix as in repository tree will be used. * ``region`` - bucket region (e.g. ``eu-central-1``), string, required. -* ``secret_key`` - AWS secret access key, string, required. \ No newline at end of file +* ``secret_key`` - AWS secret access key, string, required. + +``worker`` group +---------------- + +This section controls settings for ``ahriman.core.distributed.WorkerTrigger`` plugin. + +* ``address`` - address of the instance, string, required. Must be reachable for the master instance. +* ``identifier`` - unique identifier of the instance, string, optional. +* ``time_to_live`` - amount of time which remote worker will be considered alive in seconds, integer, optional, default is ``60``. The ping interval will be set automatically equal this value divided by 4. diff --git a/docs/faq.rst b/docs/faq.rst index 3534e3b5..448138f9 100644 --- a/docs/faq.rst +++ b/docs/faq.rst @@ -1168,6 +1168,11 @@ Addition of new package, package removal, repository update In all scenarios, update process must be run only on ``master`` node. Unlike the manually distributed packages described above, automatic update must be enabled only for ``master`` node. +Automatic worker nodes discovery +"""""""""""""""""""""""""""""""" + +Instead of setting ``build.workers`` option it is also possible to configure services to load worker list dynamically. To do so, the ``ahriman.core.distributed.WorkerLoaderTrigger`` and ``ahriman.core.distributed.WorkerTrigger`` must be used for ``master`` and ``worker`` nodes repsectively. See recipes for more details. + Known limitations """"""""""""""""" diff --git a/docs/triggers.rst b/docs/triggers.rst index 9f0f3bf3..e1c3d814 100644 --- a/docs/triggers.rst +++ b/docs/triggers.rst @@ -14,6 +14,16 @@ Built-in triggers For the configuration details and settings explanation kindly refer to the :doc:`documentation `. +``ahriman.core.distributed.WorkerLoaderTrigger`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Special trigger to be used to load workers from database on the start of the application rather than configuration. If the option is already set, it will skip processing. + +``ahriman.core.distributed.WorkerTrigger`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Another trigger for the distributed system, which registers itself as remote worker, calling remote service periodically. + ``ahriman.core.gitremote.RemotePullTrigger`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/package/share/ahriman/settings/ahriman.ini b/package/share/ahriman/settings/ahriman.ini index 6bf2d950..a6fe74f9 100644 --- a/package/share/ahriman/settings/ahriman.ini +++ b/package/share/ahriman/settings/ahriman.ini @@ -24,7 +24,7 @@ ignore_packages = makechrootpkg_flags = makepkg_flags = --nocolor --ignorearch triggers = ahriman.core.gitremote.RemotePullTrigger ahriman.core.report.ReportTrigger ahriman.core.upload.UploadTrigger ahriman.core.gitremote.RemotePushTrigger -triggers_known = ahriman.core.gitremote.RemotePullTrigger ahriman.core.gitremote.RemotePushTrigger ahriman.core.report.ReportTrigger ahriman.core.upload.UploadTrigger ahriman.core.support.KeyringTrigger ahriman.core.support.MirrorlistTrigger +triggers_known = ahriman.core.distributed.WorkerLoaderTrigger ahriman.core.distributed.WorkerRegisterTrigger ahriman.core.distributed.WorkerTrigger ahriman.core.distributed.WorkerUnregisterTrigger ahriman.core.gitremote.RemotePullTrigger ahriman.core.gitremote.RemotePushTrigger ahriman.core.report.ReportTrigger ahriman.core.upload.UploadTrigger ahriman.core.support.KeyringTrigger ahriman.core.support.MirrorlistTrigger vcs_allowed_age = 604800 [repository] diff --git a/package/share/man/man1/ahriman.1 b/package/share/man/man1/ahriman.1 index 50bb83fb..7d8a86b4 100644 --- a/package/share/man/man1/ahriman.1 +++ b/package/share/man/man1/ahriman.1 @@ -1,4 +1,4 @@ -.TH AHRIMAN "1" "2023\-12\-27" "ahriman" "Generated Python Manual" +.TH AHRIMAN "1" "2024\-01\-02" "ahriman" "Generated Python Manual" .SH NAME ahriman .SH SYNOPSIS diff --git a/recipes/distributed/README.md b/recipes/distributed/README.md index 52b3fd00..235d60b2 100644 --- a/recipes/distributed/README.md +++ b/recipes/distributed/README.md @@ -8,4 +8,4 @@ 6. All updates from worker instances are uploaded to the web service. 7. Repository is available at `http://localhost:8080/repo`. -Note, in this configuration, workers are spawned in replicated mode, thus the backend accesses them in round-robin-like manner. +In this example, worker list is automatically defined based on the addresses they reported. diff --git a/recipes/distributed/compose.yml b/recipes/distributed/compose.yml index db863ace..e510f023 100644 --- a/recipes/distributed/compose.yml +++ b/recipes/distributed/compose.yml @@ -77,6 +77,10 @@ services: interval: 10s start_period: 30s + depends_on: + backend: + condition: service_healthy + command: web configs: diff --git a/recipes/distributed/service.ini b/recipes/distributed/service.ini index 98685076..e03123f3 100644 --- a/recipes/distributed/service.ini +++ b/recipes/distributed/service.ini @@ -2,7 +2,7 @@ target = configuration [build] -workers = http://worker:8080 http://worker:8080 +triggers = ahriman.core.distributed.WorkerLoaderTrigger ahriman.core.gitremote.RemotePullTrigger ahriman.core.report.ReportTrigger ahriman.core.upload.UploadTrigger ahriman.core.gitremote.RemotePushTrigger [status] username = demo diff --git a/recipes/distributed/worker.ini b/recipes/distributed/worker.ini index 653e1a4f..c04d5f6c 100644 --- a/recipes/distributed/worker.ini +++ b/recipes/distributed/worker.ini @@ -2,7 +2,7 @@ target = configuration [build] -triggers = ahriman.core.upload.UploadTrigger ahriman.core.report.ReportTrigger +triggers = ahriman.core.distributed.WorkerTrigger ahriman.core.upload.UploadTrigger ahriman.core.report.ReportTrigger [status] address = http://backend:8080 @@ -20,3 +20,6 @@ wait_timeout = 0 target = remote-service [remote-service] + +[worker] +address = http://$HOSTNAME:8080 diff --git a/src/ahriman/application/handlers/web.py b/src/ahriman/application/handlers/web.py index 7baaaf70..39098d55 100644 --- a/src/ahriman/application/handlers/web.py +++ b/src/ahriman/application/handlers/web.py @@ -24,6 +24,7 @@ from collections.abc import Generator from ahriman.application.handlers import Handler from ahriman.core.configuration import Configuration from ahriman.core.spawn import Spawn +from ahriman.core.triggers import TriggerLoader from ahriman.models.repository_id import RepositoryId @@ -53,13 +54,16 @@ class Web(Handler): spawner = Spawn(args.parser(), list(spawner_args)) spawner.start() + triggers = TriggerLoader.load(repository_id, configuration) + triggers.on_start() + dummy_args = argparse.Namespace( architecture=None, configuration=args.configuration, repository=None, repository_id=None, ) - repositories = cls.repositories_extract(dummy_args) + repositories = Web.repositories_extract(dummy_args) application = setup_server(configuration, spawner, repositories) run_server(application) diff --git a/src/ahriman/core/distributed/__init__.py b/src/ahriman/core/distributed/__init__.py new file mode 100644 index 00000000..15a7251e --- /dev/null +++ b/src/ahriman/core/distributed/__init__.py @@ -0,0 +1,22 @@ +# +# Copyright (c) 2021-2023 ahriman team. +# +# This file is part of ahriman +# (see https://github.com/arcan1s/ahriman). +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +from ahriman.core.distributed.worker_loader_trigger import WorkerLoaderTrigger +from ahriman.core.distributed.worker_trigger import WorkerTrigger +from ahriman.core.distributed.workers_cache import WorkersCache diff --git a/src/ahriman/core/distributed/distributed_system.py b/src/ahriman/core/distributed/distributed_system.py new file mode 100644 index 00000000..a85a09ff --- /dev/null +++ b/src/ahriman/core/distributed/distributed_system.py @@ -0,0 +1,130 @@ +# +# Copyright (c) 2021-2023 ahriman team. +# +# This file is part of ahriman +# (see https://github.com/arcan1s/ahriman). +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +import contextlib + +from functools import cached_property + +from ahriman.core.configuration import Configuration +from ahriman.core.configuration.schema import ConfigurationSchema +from ahriman.core.status.web_client import WebClient +from ahriman.core.triggers import Trigger +from ahriman.models.repository_id import RepositoryId +from ahriman.models.worker import Worker + + +class DistributedSystem(Trigger, WebClient): + """ + simple class to (un)register itself as a distributed worker + """ + + CONFIGURATION_SCHEMA: ConfigurationSchema = { + "worker": { + "type": "dict", + "schema": { + "address": { + "type": "string", + "required": True, + "empty": False, + "is_url": [], + }, + "identifier": { + "type": "string", + "empty": False, + }, + "time_to_live": { + "type": "integer", + "coerce": "integer", + "min": 0, + }, + }, + }, + } + + def __init__(self, repository_id: RepositoryId, configuration: Configuration) -> None: + """ + default constructor + + Args: + repository_id(RepositoryId): repository unique identifier + configuration(Configuration): configuration instance + """ + Trigger.__init__(self, repository_id, configuration) + WebClient.__init__(self, repository_id, configuration) + + @cached_property + def worker(self) -> Worker: + """ + load and set worker. Lazy property loaded because it is not always required + + Returns: + Worker: unique self worker identifier + """ + section = next(iter(self.configuration_sections(self.configuration))) + + address = self.configuration.get(section, "address") + identifier = self.configuration.get(section, "identifier", fallback="") + return Worker(address, identifier=identifier) + + @classmethod + def configuration_sections(cls, configuration: Configuration) -> list[str]: + """ + extract configuration sections from configuration + + Args: + configuration(Configuration): configuration instance + + Returns: + list[str]: read configuration sections belong to this trigger + """ + return list(cls.CONFIGURATION_SCHEMA.keys()) + + def _workers_url(self) -> str: + """ + workers url generator + + Returns: + str: full url of web service for workers + """ + return f"{self.address}/api/v1/distributed" + + def register(self) -> None: + """ + register itself in remote system + """ + with contextlib.suppress(Exception): + self.make_request("POST", self._workers_url(), json=self.worker.view()) + + def workers(self) -> list[Worker]: + """ + retrieve list of available remote workers + + Returns: + list[Worker]: currently registered workers + """ + with contextlib.suppress(Exception): + response = self.make_request("GET", self._workers_url()) + response_json = response.json() + + return [ + Worker(worker["address"], identifier=worker["identifier"]) + for worker in response_json + ] + + return [] diff --git a/src/ahriman/core/distributed/worker_loader_trigger.py b/src/ahriman/core/distributed/worker_loader_trigger.py new file mode 100644 index 00000000..cd30095c --- /dev/null +++ b/src/ahriman/core/distributed/worker_loader_trigger.py @@ -0,0 +1,40 @@ +# +# Copyright (c) 2021-2023 ahriman team. +# +# This file is part of ahriman +# (see https://github.com/arcan1s/ahriman). +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +from ahriman.core.distributed.distributed_system import DistributedSystem + + +class WorkerLoaderTrigger(DistributedSystem): + """ + remote worker processor trigger (server side) + """ + + def on_start(self) -> None: + """ + trigger action which will be called at the start of the application + """ + if self.configuration.has_option("build", "workers"): + return # there is manually set option + + workers = [worker.address for worker in self.workers()] + if not workers: + return + + self.logger.info("load workers %s", workers) + self.configuration.set_option("build", "workers", " ".join(workers)) diff --git a/src/ahriman/core/distributed/worker_trigger.py b/src/ahriman/core/distributed/worker_trigger.py new file mode 100644 index 00000000..8f0898e2 --- /dev/null +++ b/src/ahriman/core/distributed/worker_trigger.py @@ -0,0 +1,70 @@ +# +# Copyright (c) 2021-2023 ahriman team. +# +# This file is part of ahriman +# (see https://github.com/arcan1s/ahriman). +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +from threading import Timer + +from ahriman.core.configuration import Configuration +from ahriman.core.distributed.distributed_system import DistributedSystem +from ahriman.models.repository_id import RepositoryId + + +class WorkerTrigger(DistributedSystem): + """ + remote worker processor trigger (client side) + + Attributes: + ping_interval(float): interval to call remote service in seconds, defined as ``worker.time_to_live / 4`` + timer(Timer): timer object + """ + + def __init__(self, repository_id: RepositoryId, configuration: Configuration) -> None: + """ + default constructor + + Args: + repository_id(RepositoryId): repository unique identifier + configuration(Configuration): configuration instance + """ + DistributedSystem.__init__(self, repository_id, configuration) + + section = next(iter(self.configuration_sections(configuration))) + self.ping_interval = configuration.getint(section, "time_to_live", fallback=60) / 4.0 + self.timer = Timer(self.ping_interval, self.ping) + + def on_start(self) -> None: + """ + trigger action which will be called at the start of the application + """ + self.logger.info("registering instance %s at %s", self.worker, self.address) + self.timer.start() + + def on_stop(self) -> None: + """ + trigger action which will be called before the stop of the application + """ + self.logger.info("removing instance %s at %s", self.worker, self.address) + self.timer.cancel() + + def ping(self) -> None: + """ + register itself as alive worker and update the timer + """ + self.register() + self.timer = Timer(self.ping_interval, self.ping) + self.timer.start() diff --git a/src/ahriman/core/distributed/workers_cache.py b/src/ahriman/core/distributed/workers_cache.py new file mode 100644 index 00000000..7a981fea --- /dev/null +++ b/src/ahriman/core/distributed/workers_cache.py @@ -0,0 +1,73 @@ +# +# Copyright (c) 2021-2023 ahriman team. +# +# This file is part of ahriman +# (see https://github.com/arcan1s/ahriman). +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +import time + +from ahriman.core.configuration import Configuration +from ahriman.core.log import LazyLogging +from ahriman.models.worker import Worker + + +class WorkersCache(LazyLogging): + """ + cached storage for healthy workers + + Attributes: + time_to_live(int): maximal amount of time in seconds to keep worker alive + """ + + def __init__(self, configuration: Configuration) -> None: + """ + default constructor + + Args: + configuration(Configuration): configuration instance + """ + self.time_to_live = configuration.getint("worker", "time_to_live", fallback=60) + self._workers: dict[str, tuple[Worker, float]] = {} + + @property + def workers(self) -> list[Worker]: + """ + extract currently healthy workers + + Returns: + list[Worker]: list of currently registered workers which have been seen not earlier than :attr:`time_to_live` + """ + valid_from = time.monotonic() - self.time_to_live + return [ + worker + for worker, last_seen in self._workers.values() + if last_seen > valid_from + ] + + def workers_remove(self) -> None: + """ + remove all workers from the cache + """ + self._workers = {} + + def workers_update(self, worker: Worker) -> None: + """ + register or update remote worker + + Args: + worker(Worker): worker to register + """ + self._workers[worker.identifier] = (worker, time.monotonic()) diff --git a/src/ahriman/core/http/sync_http_client.py b/src/ahriman/core/http/sync_http_client.py index 6b670c31..5f919e49 100644 --- a/src/ahriman/core/http/sync_http_client.py +++ b/src/ahriman/core/http/sync_http_client.py @@ -47,8 +47,8 @@ class SyncHttpClient(LazyLogging): default constructor Args: - configuration(Configuration | None): configuration instance (Default value = None) - section(str, optional): settings section name (Default value = None) + configuration(Configuration | None, optional): configuration instance (Default value = None) + section(str | None, optional): settings section name (Default value = None) suppress_errors(bool, optional): suppress logging of request errors (Default value = False) """ if configuration is None: diff --git a/src/ahriman/core/log/filtered_access_logger.py b/src/ahriman/core/log/filtered_access_logger.py index 77bf3224..57c76111 100644 --- a/src/ahriman/core/log/filtered_access_logger.py +++ b/src/ahriman/core/log/filtered_access_logger.py @@ -27,13 +27,44 @@ class FilteredAccessLogger(AccessLogger): access logger implementation with log filter enabled Attributes: + DISTRIBUTED_PATH_REGEX(str): (class attribute) regex used for distributed system uri + HEALTH_PATH_REGEX(re.Pattern): (class attribute) regex for health check endpoint LOG_PATH_REGEX(re.Pattern): (class attribute) regex for logs uri + PROCESS_PATH_REGEX(re.Pattern): (class attribute) regex for process uri """ + DISTRIBUTED_PATH_REGEX = "/api/v1/distributed" + HEALTH_PATH_REGEX = "/api/v1/info" LOG_PATH_REGEX = re.compile(r"^/api/v1/packages/[^/]+/logs$") # technically process id is uuid, but we might change it later PROCESS_PATH_REGEX = re.compile(r"^/api/v1/service/process/[^/]+$") + @staticmethod + def is_distributed_post(request: BaseRequest) -> bool: + """ + check if the request is for distributed services ping + + Args: + request(BaseRequest): http reqeust descriptor + + Returns: + bool: True in case if request is distributed service ping endpoint and False otherwise + """ + return request.method == "POST" and FilteredAccessLogger.DISTRIBUTED_PATH_REGEX == request.path + + @staticmethod + def is_info_get(request: BaseRequest) -> bool: + """ + check if the request is for health check + + Args: + request(BaseRequest): http reqeust descriptor + + Returns: + bool: True in case if request is health check and false otherwise + """ + return request.method == "GET" and FilteredAccessLogger.HEALTH_PATH_REGEX == request.path + @staticmethod def is_logs_post(request: BaseRequest) -> bool: """ @@ -69,7 +100,9 @@ class FilteredAccessLogger(AccessLogger): response(StreamResponse): streaming response object time(float): log record timestamp """ - if self.is_logs_post(request) \ + if self.is_distributed_post(request) \ + or self.is_info_get(request) \ + or self.is_logs_post(request) \ or self.is_process_get(request): return AccessLogger.log(self, request, response, time) diff --git a/src/ahriman/core/status/web_client.py b/src/ahriman/core/status/web_client.py index e32214ba..a56e6cde 100644 --- a/src/ahriman/core/status/web_client.py +++ b/src/ahriman/core/status/web_client.py @@ -118,7 +118,6 @@ class WebClient(Client, SyncAhrimanClient): Returns: str: full url of web service for specific package base """ - # in case if unix socket is used we need to normalize url suffix = f"/{package_base}" if package_base else "" return f"{self.address}/api/v1/packages{suffix}" diff --git a/src/ahriman/core/triggers/trigger_loader.py b/src/ahriman/core/triggers/trigger_loader.py index bece4270..89d47932 100644 --- a/src/ahriman/core/triggers/trigger_loader.py +++ b/src/ahriman/core/triggers/trigger_loader.py @@ -262,6 +262,6 @@ class TriggerLoader(LazyLogging): run triggers before the application exit """ self.logger.debug("executing triggers on stop") - for trigger in self.triggers: + for trigger in reversed(self.triggers): with self.__execute_trigger(trigger): trigger.on_stop() diff --git a/src/ahriman/models/worker.py b/src/ahriman/models/worker.py index 72215ff1..3eaf3ff9 100644 --- a/src/ahriman/models/worker.py +++ b/src/ahriman/models/worker.py @@ -18,8 +18,11 @@ # along with this program. If not, see . # from dataclasses import dataclass, field +from typing import Any from urllib.parse import urlparse +from ahriman.core.util import dataclass_view + @dataclass(frozen=True) class Worker: @@ -39,3 +42,12 @@ class Worker: update identifier based on settings """ object.__setattr__(self, "identifier", self.identifier or urlparse(self.address).netloc) + + def view(self) -> dict[str, Any]: + """ + generate json patch view + + Returns: + dict[str, Any]: json-friendly dictionary + """ + return dataclass_view(self) diff --git a/src/ahriman/web/keys.py b/src/ahriman/web/keys.py index 04e5a323..06402568 100644 --- a/src/ahriman/web/keys.py +++ b/src/ahriman/web/keys.py @@ -21,6 +21,7 @@ from aiohttp.web import AppKey from ahriman.core.auth import Auth from ahriman.core.configuration import Configuration +from ahriman.core.distributed import WorkersCache from ahriman.core.spawn import Spawn from ahriman.core.status.watcher import Watcher from ahriman.models.repository_id import RepositoryId @@ -31,6 +32,7 @@ __all__ = [ "ConfigurationKey", "SpawnKey", "WatcherKey", + "WorkersKey", ] @@ -38,3 +40,4 @@ AuthKey = AppKey("validator", Auth) ConfigurationKey = AppKey("configuration", Configuration) SpawnKey = AppKey("spawn", Spawn) WatcherKey = AppKey("watcher", dict[RepositoryId, Watcher]) +WorkersKey = AppKey("workers", WorkersCache) diff --git a/src/ahriman/web/schemas/__init__.py b/src/ahriman/web/schemas/__init__.py index 998d6ee6..ebae2233 100644 --- a/src/ahriman/web/schemas/__init__.py +++ b/src/ahriman/web/schemas/__init__.py @@ -47,5 +47,6 @@ from ahriman.web.schemas.remote_schema import RemoteSchema from ahriman.web.schemas.repository_id_schema import RepositoryIdSchema from ahriman.web.schemas.search_schema import SearchSchema from ahriman.web.schemas.status_schema import StatusSchema -from ahriman.web.schemas.versioned_log_schema import VersionedLogSchema from ahriman.web.schemas.update_flags_schema import UpdateFlagsSchema +from ahriman.web.schemas.versioned_log_schema import VersionedLogSchema +from ahriman.web.schemas.worker_schema import WorkerSchema diff --git a/src/ahriman/web/schemas/worker_schema.py b/src/ahriman/web/schemas/worker_schema.py new file mode 100644 index 00000000..3f4c31ac --- /dev/null +++ b/src/ahriman/web/schemas/worker_schema.py @@ -0,0 +1,35 @@ +# +# Copyright (c) 2021-2023 ahriman team. +# +# This file is part of ahriman +# (see https://github.com/arcan1s/ahriman). +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +from marshmallow import Schema, fields + + +class WorkerSchema(Schema): + """ + request and response schema for workers + """ + + address = fields.String(required=True, metadata={ + "description": "Worker address", + "example": "http://localhost:8081", + }) + identifier = fields.String(required=True, metadata={ + "description": "Worker unique identifier", + "example": "42f03a62-48f7-46b7-af40-dacc720e92fa", + }) diff --git a/src/ahriman/web/views/base.py b/src/ahriman/web/views/base.py index 87905731..72b733a9 100644 --- a/src/ahriman/web/views/base.py +++ b/src/ahriman/web/views/base.py @@ -24,12 +24,13 @@ from typing import TypeVar from ahriman.core.auth import Auth from ahriman.core.configuration import Configuration +from ahriman.core.distributed import WorkersCache from ahriman.core.sign.gpg import GPG from ahriman.core.spawn import Spawn from ahriman.core.status.watcher import Watcher from ahriman.models.repository_id import RepositoryId from ahriman.models.user_access import UserAccess -from ahriman.web.keys import AuthKey, ConfigurationKey, SpawnKey, WatcherKey +from ahriman.web.keys import AuthKey, ConfigurationKey, SpawnKey, WatcherKey, WorkersKey T = TypeVar("T", str, list[str]) @@ -97,6 +98,16 @@ class BaseView(View, CorsViewMixin): """ return self.request.app[AuthKey] + @property + def workers(self) -> WorkersCache: + """ + get workers cache instance + + Returns: + WorkersCache: workers service + """ + return self.request.app[WorkersKey] + @classmethod async def get_permission(cls, request: Request) -> UserAccess: """ diff --git a/src/ahriman/web/views/v1/distributed/__init__.py b/src/ahriman/web/views/v1/distributed/__init__.py new file mode 100644 index 00000000..8fc622e9 --- /dev/null +++ b/src/ahriman/web/views/v1/distributed/__init__.py @@ -0,0 +1,19 @@ +# +# Copyright (c) 2021-2023 ahriman team. +# +# This file is part of ahriman +# (see https://github.com/arcan1s/ahriman). +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# diff --git a/src/ahriman/web/views/v1/distributed/workers.py b/src/ahriman/web/views/v1/distributed/workers.py new file mode 100644 index 00000000..a5171db2 --- /dev/null +++ b/src/ahriman/web/views/v1/distributed/workers.py @@ -0,0 +1,126 @@ +# +# Copyright (c) 2021-2023 ahriman team. +# +# This file is part of ahriman +# (see https://github.com/arcan1s/ahriman). +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +import aiohttp_apispec # type: ignore[import-untyped] + +from collections.abc import Callable +from aiohttp.web import HTTPBadRequest, HTTPNoContent, Response, json_response + +from ahriman.models.user_access import UserAccess +from ahriman.models.worker import Worker +from ahriman.web.schemas import AuthSchema, ErrorSchema, WorkerSchema +from ahriman.web.views.base import BaseView + + +class WorkersView(BaseView): + """ + distributed workers view + + Attributes: + DELETE_PERMISSION(UserAccess): (class attribute) delete permissions of self + GET_PERMISSION(UserAccess): (class attribute) get permissions of self + POST_PERMISSION(UserAccess): (class attribute) post permissions of self + """ + + DELETE_PERMISSION = GET_PERMISSION = POST_PERMISSION = UserAccess.Full + ROUTES = ["/api/v1/distributed"] + + @aiohttp_apispec.docs( + tags=["Distributed"], + summary="Unregister all workers", + description="Unregister and remove all known workers from the service", + responses={ + 204: {"description": "Success response"}, + 401: {"description": "Authorization required", "schema": ErrorSchema}, + 403: {"description": "Access is forbidden", "schema": ErrorSchema}, + 500: {"description": "Internal server error", "schema": ErrorSchema}, + }, + security=[{"token": [DELETE_PERMISSION]}], + ) + @aiohttp_apispec.cookies_schema(AuthSchema) + async def delete(self) -> None: + """ + unregister worker + + Raises: + HTTPNoContent: on success response + """ + self.workers.workers_remove() + + raise HTTPNoContent + + @aiohttp_apispec.docs( + tags=["Distributed"], + summary="Get workers", + description="Retrieve registered workers", + responses={ + 200: {"description": "Success response", "schema": WorkerSchema(many=True)}, + 401: {"description": "Authorization required", "schema": ErrorSchema}, + 403: {"description": "Access is forbidden", "schema": ErrorSchema}, + 500: {"description": "Internal server error", "schema": ErrorSchema}, + }, + security=[{"token": [GET_PERMISSION]}], + ) + @aiohttp_apispec.cookies_schema(AuthSchema) + async def get(self) -> Response: + """ + get workers list + + Returns: + Response: 200 with workers list on success + """ + workers = self.workers.workers + + comparator: Callable[[Worker], str] = lambda item: item.identifier + response = [worker.view() for worker in sorted(workers, key=comparator)] + + return json_response(response) + + @aiohttp_apispec.docs( + tags=["Distributed"], + summary="Register worker", + description="Register or update remote worker", + responses={ + 204: {"description": "Success response"}, + 400: {"description": "Bad data is supplied", "schema": ErrorSchema}, + 401: {"description": "Authorization required", "schema": ErrorSchema}, + 403: {"description": "Access is forbidden", "schema": ErrorSchema}, + 500: {"description": "Internal server error", "schema": ErrorSchema}, + }, + security=[{"token": [POST_PERMISSION]}], + ) + @aiohttp_apispec.cookies_schema(AuthSchema) + @aiohttp_apispec.json_schema(WorkerSchema) + async def post(self) -> None: + """ + register remote worker + + Raises: + HTTPBadRequest: if bad data is supplied + HTTPNoContent: in case of success response + """ + try: + data = await self.request.json() + worker = Worker(data["address"], identifier=data["identifier"]) + except Exception as ex: + raise HTTPBadRequest(reason=str(ex)) + + self.workers.workers_update(worker) + + raise HTTPNoContent diff --git a/src/ahriman/web/web.py b/src/ahriman/web/web.py index 64a48f44..c4809730 100644 --- a/src/ahriman/web/web.py +++ b/src/ahriman/web/web.py @@ -27,6 +27,7 @@ from aiohttp.web import Application, normalize_path_middleware, run_app from ahriman.core.auth import Auth from ahriman.core.configuration import Configuration from ahriman.core.database import SQLite +from ahriman.core.distributed import WorkersCache from ahriman.core.exceptions import InitializeError from ahriman.core.log.filtered_access_logger import FilteredAccessLogger from ahriman.core.spawn import Spawn @@ -34,7 +35,7 @@ from ahriman.core.status.watcher import Watcher from ahriman.models.repository_id import RepositoryId from ahriman.web.apispec import setup_apispec from ahriman.web.cors import setup_cors -from ahriman.web.keys import AuthKey, ConfigurationKey, SpawnKey, WatcherKey +from ahriman.web.keys import AuthKey, ConfigurationKey, SpawnKey, WatcherKey, WorkersKey from ahriman.web.middlewares.exception_handler import exception_handler from ahriman.web.routes import setup_routes @@ -159,7 +160,8 @@ def setup_server(configuration: Configuration, spawner: Spawn, repositories: lis application.logger.info("setup configuration") application[ConfigurationKey] = configuration - application.logger.info("setup watchers") + application.logger.info("setup services") + # package cache if not repositories: raise InitializeError("No repositories configured, exiting") database = SQLite.load(configuration) @@ -168,8 +170,9 @@ def setup_server(configuration: Configuration, spawner: Spawn, repositories: lis application.logger.info("load repository %s", repository_id) watchers[repository_id] = Watcher(repository_id, database) application[WatcherKey] = watchers - - application.logger.info("setup process spawner") + # workers cache + application[WorkersKey] = WorkersCache(configuration) + # process spawner application[SpawnKey] = spawner application.logger.info("setup authorization") diff --git a/tests/ahriman/application/handlers/test_handler_validate.py b/tests/ahriman/application/handlers/test_handler_validate.py index 73af3fed..2f4a4e2f 100644 --- a/tests/ahriman/application/handlers/test_handler_validate.py +++ b/tests/ahriman/application/handlers/test_handler_validate.py @@ -79,6 +79,7 @@ def test_schema(configuration: Configuration) -> None: assert schema.pop("s3") assert schema.pop("telegram") assert schema.pop("upload") + assert schema.pop("worker") assert schema == CONFIGURATION_SCHEMA diff --git a/tests/ahriman/application/handlers/test_handler_web.py b/tests/ahriman/application/handlers/test_handler_web.py index 79376ee6..fe2861c2 100644 --- a/tests/ahriman/application/handlers/test_handler_web.py +++ b/tests/ahriman/application/handlers/test_handler_web.py @@ -39,6 +39,7 @@ def test_run(args: argparse.Namespace, configuration: Configuration, repository: setup_mock = mocker.patch("ahriman.web.web.setup_server") run_mock = mocker.patch("ahriman.web.web.run_server") start_mock = mocker.patch("ahriman.core.spawn.Spawn.start") + trigger_mock = mocker.patch("ahriman.core.triggers.TriggerLoader.load") stop_mock = mocker.patch("ahriman.core.spawn.Spawn.stop") join_mock = mocker.patch("ahriman.core.spawn.Spawn.join") _, repository_id = configuration.check_loaded() @@ -48,6 +49,8 @@ def test_run(args: argparse.Namespace, configuration: Configuration, repository: setup_mock.assert_called_once_with(configuration, pytest.helpers.anyvar(int), [repository_id]) run_mock.assert_called_once_with(pytest.helpers.anyvar(int)) start_mock.assert_called_once_with() + trigger_mock.assert_called_once_with(repository_id, configuration) + trigger_mock().on_start.assert_called_once_with() stop_mock.assert_called_once_with() join_mock.assert_called_once_with() diff --git a/tests/ahriman/core/distributed/conftest.py b/tests/ahriman/core/distributed/conftest.py new file mode 100644 index 00000000..d40a7476 --- /dev/null +++ b/tests/ahriman/core/distributed/conftest.py @@ -0,0 +1,35 @@ +import pytest + +from ahriman.core.configuration import Configuration +from ahriman.core.distributed import WorkersCache +from ahriman.core.distributed.distributed_system import DistributedSystem + + +@pytest.fixture +def distributed_system(configuration: Configuration) -> DistributedSystem: + """ + distributed system fixture + + Args: + configuration(Configuration): configuration fixture + + Returns: + DistributedSystem: distributed system test instance + """ + configuration.set_option("status", "address", "http://localhost:8081") + _, repository_id = configuration.check_loaded() + return DistributedSystem(repository_id, configuration) + + +@pytest.fixture +def workers_cache(configuration: Configuration) -> WorkersCache: + """ + workers cache fixture + + Args: + configuration(Configuration): configuration fixture + + Returns: + WorkersCache: workers cache test instance + """ + return WorkersCache(configuration) diff --git a/tests/ahriman/core/distributed/test_distributed_system.py b/tests/ahriman/core/distributed/test_distributed_system.py new file mode 100644 index 00000000..8965b0f5 --- /dev/null +++ b/tests/ahriman/core/distributed/test_distributed_system.py @@ -0,0 +1,82 @@ +import json +import requests + +from pytest_mock import MockerFixture + +from ahriman.core.configuration import Configuration +from ahriman.core.distributed.distributed_system import DistributedSystem +from ahriman.models.worker import Worker + + +def test_configuration_sections(configuration: Configuration) -> None: + """ + must correctly parse target list + """ + assert DistributedSystem.configuration_sections(configuration) == ["worker"] + + +def test_workers_url(distributed_system: DistributedSystem) -> None: + """ + must generate workers url correctly + """ + assert distributed_system._workers_url().startswith(distributed_system.address) + assert distributed_system._workers_url().endswith("/api/v1/distributed") + + +def test_register(distributed_system: DistributedSystem, mocker: MockerFixture) -> None: + """ + must register service + """ + run_mock = mocker.patch("ahriman.core.distributed.distributed_system.DistributedSystem.make_request") + distributed_system.register() + run_mock.assert_called_once_with("POST", f"{distributed_system.address}/api/v1/distributed", + json=distributed_system.worker.view()) + + +def test_register_failed(distributed_system: DistributedSystem, mocker: MockerFixture) -> None: + """ + must suppress any exception happened during worker registration + """ + mocker.patch("requests.Session.request", side_effect=Exception()) + distributed_system.register() + + +def test_register_failed_http_error(distributed_system: DistributedSystem, mocker: MockerFixture) -> None: + """ + must suppress HTTP exception happened during worker registration + """ + mocker.patch("requests.Session.request", side_effect=requests.HTTPError()) + distributed_system.register() + + +def test_workers(distributed_system: DistributedSystem, mocker: MockerFixture) -> None: + """ + must return available remote workers + """ + worker = Worker("remote") + response_obj = requests.Response() + response_obj._content = json.dumps([worker.view()]).encode("utf8") + response_obj.status_code = 200 + + requests_mock = mocker.patch("ahriman.core.status.web_client.WebClient.make_request", + return_value=response_obj) + + result = distributed_system.workers() + requests_mock.assert_called_once_with("GET", distributed_system._workers_url()) + assert result == [worker] + + +def test_workers_failed(distributed_system: DistributedSystem, mocker: MockerFixture) -> None: + """ + must suppress any exception happened during worker extraction + """ + mocker.patch("requests.Session.request", side_effect=Exception()) + distributed_system.workers() + + +def test_workers_failed_http_error(distributed_system: DistributedSystem, mocker: MockerFixture) -> None: + """ + must suppress HTTP exception happened during worker extraction + """ + mocker.patch("requests.Session.request", side_effect=requests.HTTPError()) + distributed_system.workers() diff --git a/tests/ahriman/core/distributed/test_worker_loader_trigger.py b/tests/ahriman/core/distributed/test_worker_loader_trigger.py new file mode 100644 index 00000000..b5b261a9 --- /dev/null +++ b/tests/ahriman/core/distributed/test_worker_loader_trigger.py @@ -0,0 +1,47 @@ +from pytest_mock import MockerFixture + +from ahriman.core.configuration import Configuration +from ahriman.core.distributed import WorkerLoaderTrigger +from ahriman.models.worker import Worker + + +def test_on_start(configuration: Configuration, mocker: MockerFixture) -> None: + """ + must load workers from remote + """ + worker = Worker("address") + configuration.set_option("status", "address", "http://localhost:8081") + run_mock = mocker.patch("ahriman.core.distributed.WorkerLoaderTrigger.workers", return_value=[worker]) + _, repository_id = configuration.check_loaded() + + trigger = WorkerLoaderTrigger(repository_id, configuration) + trigger.on_start() + run_mock.assert_called_once_with() + assert configuration.getlist("build", "workers") == [worker.address] + + +def test_on_start_skip(configuration: Configuration, mocker: MockerFixture) -> None: + """ + must skip loading if option is already set + """ + configuration.set_option("status", "address", "http://localhost:8081") + configuration.set_option("build", "workers", "address") + run_mock = mocker.patch("ahriman.core.distributed.WorkerLoaderTrigger.workers") + _, repository_id = configuration.check_loaded() + + trigger = WorkerLoaderTrigger(repository_id, configuration) + trigger.on_start() + run_mock.assert_not_called() + + +def test_on_start_empty_list(configuration: Configuration, mocker: MockerFixture) -> None: + """ + must do not set anything if workers are not available + """ + configuration.set_option("status", "address", "http://localhost:8081") + mocker.patch("ahriman.core.distributed.WorkerLoaderTrigger.workers", return_value=[]) + _, repository_id = configuration.check_loaded() + + trigger = WorkerLoaderTrigger(repository_id, configuration) + trigger.on_start() + assert not configuration.has_option("build", "workers") diff --git a/tests/ahriman/core/distributed/test_worker_trigger.py b/tests/ahriman/core/distributed/test_worker_trigger.py new file mode 100644 index 00000000..b0eb4418 --- /dev/null +++ b/tests/ahriman/core/distributed/test_worker_trigger.py @@ -0,0 +1,52 @@ +from pytest_mock import MockerFixture + +from ahriman.core.configuration import Configuration +from ahriman.core.distributed import WorkerTrigger + + +def test_on_start(configuration: Configuration, mocker: MockerFixture) -> None: + """ + must register itself as worker + """ + configuration.set_option("status", "address", "http://localhost:8081") + run_mock = mocker.patch("threading.Timer.start") + _, repository_id = configuration.check_loaded() + + WorkerTrigger(repository_id, configuration).on_start() + run_mock.assert_called_once_with() + + +def test_on_stop(configuration: Configuration, mocker: MockerFixture) -> None: + """ + must unregister itself as worker + """ + configuration.set_option("status", "address", "http://localhost:8081") + run_mock = mocker.patch("threading.Timer.cancel") + _, repository_id = configuration.check_loaded() + + WorkerTrigger(repository_id, configuration).on_stop() + run_mock.assert_called_once_with() + + +def test_on_stop_empty_timer(configuration: Configuration) -> None: + """ + must do not fail if no timer was started + """ + configuration.set_option("status", "address", "http://localhost:8081") + _, repository_id = configuration.check_loaded() + + WorkerTrigger(repository_id, configuration).on_stop() + + +def test_ping(configuration: Configuration, mocker: MockerFixture) -> None: + """ + must correctly process timer action + """ + configuration.set_option("status", "address", "http://localhost:8081") + run_mock = mocker.patch("ahriman.core.distributed.WorkerTrigger.register") + timer_mock = mocker.patch("threading.Timer.start") + _, repository_id = configuration.check_loaded() + + WorkerTrigger(repository_id, configuration).ping() + run_mock.assert_called_once_with() + timer_mock.assert_called_once_with() diff --git a/tests/ahriman/core/distributed/test_workers_cache.py b/tests/ahriman/core/distributed/test_workers_cache.py new file mode 100644 index 00000000..77b8ea78 --- /dev/null +++ b/tests/ahriman/core/distributed/test_workers_cache.py @@ -0,0 +1,43 @@ +import time + +from ahriman.core.distributed import WorkersCache +from ahriman.models.worker import Worker + + +def test_workers(workers_cache: WorkersCache) -> None: + """ + must return alive workers + """ + workers_cache._workers = { + str(index): (Worker(f"address{index}"), index) + for index in range(2) + } + workers_cache.time_to_live = time.monotonic() + + assert workers_cache.workers == [Worker("address1")] + + +def test_workers_remove(workers_cache: WorkersCache) -> None: + """ + must remove all workers + """ + workers_cache.workers_update(Worker("address")) + assert workers_cache.workers + + workers_cache.workers_remove() + assert not workers_cache.workers + + +def test_workers_update(workers_cache: WorkersCache) -> None: + """ + must update worker + """ + worker = Worker("address") + + workers_cache.workers_update(worker) + assert workers_cache.workers == [worker] + _, first_last_seen = workers_cache._workers[worker.identifier] + + workers_cache.workers_update(worker) + _, second_last_seen = workers_cache._workers[worker.identifier] + assert first_last_seen < second_last_seen diff --git a/tests/ahriman/core/log/test_filtered_access_logger.py b/tests/ahriman/core/log/test_filtered_access_logger.py index d532485c..f2b397e8 100644 --- a/tests/ahriman/core/log/test_filtered_access_logger.py +++ b/tests/ahriman/core/log/test_filtered_access_logger.py @@ -4,6 +4,44 @@ from unittest.mock import MagicMock from ahriman.core.log.filtered_access_logger import FilteredAccessLogger +def test_is_distributed_post() -> None: + """ + must correctly define distributed services ping request + """ + request = MagicMock() + + request.method = "POST" + request.path = "/api/v1/distributed" + assert FilteredAccessLogger.is_distributed_post(request) + + request.method = "GET" + request.path = "/api/v1/distributed" + assert not FilteredAccessLogger.is_distributed_post(request) + + request.method = "POST" + request.path = "/api/v1/distributed/path" + assert not FilteredAccessLogger.is_distributed_post(request) + + +def test_is_info_get() -> None: + """ + must correctly define health check request + """ + request = MagicMock() + + request.method = "GET" + request.path = "/api/v1/info" + assert FilteredAccessLogger.is_info_get(request) + + request.method = "POST" + request.path = "/api/v1/info" + assert not FilteredAccessLogger.is_info_get(request) + + request.method = "GET" + request.path = "/api/v1/infos" + assert not FilteredAccessLogger.is_info_get(request) + + def test_is_logs_post() -> None: """ must correctly define if request belongs to logs posting diff --git a/tests/ahriman/core/status/test_web_client.py b/tests/ahriman/core/status/test_web_client.py index 47eaaa16..2ec25c38 100644 --- a/tests/ahriman/core/status/test_web_client.py +++ b/tests/ahriman/core/status/test_web_client.py @@ -12,6 +12,7 @@ from ahriman.models.changes import Changes from ahriman.models.internal_status import InternalStatus from ahriman.models.log_record_id import LogRecordId from ahriman.models.package import Package +from ahriman.models.worker import Worker def test_parse_address(configuration: Configuration) -> None: @@ -32,14 +33,6 @@ def test_parse_address(configuration: Configuration) -> None: assert WebClient.parse_address(configuration) == ("status", "http://localhost:8082") -def test_status_url(web_client: WebClient) -> None: - """ - must generate package status url correctly - """ - assert web_client._status_url().startswith(web_client.address) - assert web_client._status_url().endswith("/api/v1/status") - - def test_changes_url(web_client: WebClient, package_ahriman: Package) -> None: """ must generate changes url correctly @@ -67,6 +60,14 @@ def test_package_url(web_client: WebClient, package_ahriman: Package) -> None: assert web_client._package_url(package_ahriman.base).endswith(f"/api/v1/packages/{package_ahriman.base}") +def test_status_url(web_client: WebClient) -> None: + """ + must generate package status url correctly + """ + assert web_client._status_url().startswith(web_client.address) + assert web_client._status_url().endswith("/api/v1/status") + + def test_package_add(web_client: WebClient, package_ahriman: Package, mocker: MockerFixture) -> None: """ must process package addition diff --git a/tests/ahriman/models/test_worker.py b/tests/ahriman/models/test_worker.py index b6e5e72e..1200bf50 100644 --- a/tests/ahriman/models/test_worker.py +++ b/tests/ahriman/models/test_worker.py @@ -8,3 +8,17 @@ def test_post_init() -> None: assert Worker("http://localhost:8080").identifier == "localhost:8080" assert Worker("remote").identifier == "" # not a valid url assert Worker("remote", identifier="id").identifier == "id" + + +def test_view() -> None: + """ + must generate json view + """ + worker = Worker("address") + assert worker.view() == {"address": worker.address, "identifier": worker.identifier} + + worker = Worker("http://localhost:8080") + assert worker.view() == {"address": worker.address, "identifier": worker.identifier} + + worker = Worker("http://localhost:8080", identifier="abc") + assert worker.view() == {"address": worker.address, "identifier": worker.identifier} diff --git a/tests/ahriman/web/schemas/test_worker_schema.py b/tests/ahriman/web/schemas/test_worker_schema.py new file mode 100644 index 00000000..1982fb6b --- /dev/null +++ b/tests/ahriman/web/schemas/test_worker_schema.py @@ -0,0 +1 @@ +# schema testing goes in view class tests diff --git a/tests/ahriman/web/views/test_view_base.py b/tests/ahriman/web/views/test_view_base.py index fce32fbc..acf691c6 100644 --- a/tests/ahriman/web/views/test_view_base.py +++ b/tests/ahriman/web/views/test_view_base.py @@ -50,11 +50,18 @@ def test_spawn(base: BaseView) -> None: def test_validator(base: BaseView) -> None: """ - must return service + must return validator service """ assert base.validator +def test_workers(base: BaseView) -> None: + """ + must return worker service + """ + assert base.workers + + async def test_get_permission(base: BaseView) -> None: """ must search for permission attribute in class diff --git a/tests/ahriman/web/views/v1/distributed/test_view_v1_distributed_workers.py b/tests/ahriman/web/views/v1/distributed/test_view_v1_distributed_workers.py new file mode 100644 index 00000000..5365f212 --- /dev/null +++ b/tests/ahriman/web/views/v1/distributed/test_view_v1_distributed_workers.py @@ -0,0 +1,80 @@ +import pytest + +from aiohttp.test_utils import TestClient + +from ahriman.models.user_access import UserAccess +from ahriman.models.worker import Worker +from ahriman.web.views.v1.distributed.workers import WorkersView + + +async def test_get_permission() -> None: + """ + must return correct permission for the request + """ + for method in ("DELETE", "GET", "POST"): + request = pytest.helpers.request("", "", method) + assert await WorkersView.get_permission(request) == UserAccess.Full + + +def test_routes() -> None: + """ + must return correct routes + """ + assert WorkersView.ROUTES == ["/api/v1/distributed"] + + +async def test_delete(client: TestClient) -> None: + """ + must delete all workers + """ + await client.post("/api/v1/distributed", json={"address": "address1", "identifier": "1"}) + await client.post("/api/v1/distributed", json={"address": "address2", "identifier": "2"}) + + response = await client.delete("/api/v1/distributed") + assert response.status == 204 + + response = await client.get("/api/v1/distributed") + json = await response.json() + assert not json + + +async def test_get(client: TestClient) -> None: + """ + must return all workers + """ + await client.post("/api/v1/distributed", json={"address": "address1", "identifier": "1"}) + await client.post("/api/v1/distributed", json={"address": "address2", "identifier": "2"}) + response_schema = pytest.helpers.schema_response(WorkersView.get) + + response = await client.get("/api/v1/distributed") + assert response.ok + json = await response.json() + assert not response_schema.validate(json, many=True) + + workers = [Worker(item["address"], identifier=item["identifier"]) for item in json] + assert workers == [Worker("address1", identifier="1"), Worker("address2", identifier="2")] + + +async def test_post(client: TestClient) -> None: + """ + must update worker + """ + worker = Worker("address1", identifier="1") + request_schema = pytest.helpers.schema_request(WorkersView.post) + + payload = worker.view() + assert not request_schema.validate(payload) + + response = await client.post("/api/v1/distributed", json=payload) + assert response.status == 204 + + +async def test_post_exception(client: TestClient) -> None: + """ + must raise exception on invalid payload + """ + response_schema = pytest.helpers.schema_response(WorkersView.post, code=400) + + response = await client.post("/api/v1/distributed", json={}) + assert response.status == 400 + assert not response_schema.validate(await response.json()) diff --git a/tests/testresources/core/ahriman.ini b/tests/testresources/core/ahriman.ini index 21aba0e9..3685f3ca 100644 --- a/tests/testresources/core/ahriman.ini +++ b/tests/testresources/core/ahriman.ini @@ -25,7 +25,7 @@ ignore_packages = makechrootpkg_flags = makepkg_flags = --skippgpcheck triggers = ahriman.core.report.ReportTrigger ahriman.core.upload.UploadTrigger -triggers_known = ahriman.core.gitremote.RemotePullTrigger ahriman.core.gitremote.RemotePushTrigger ahriman.core.report.ReportTrigger ahriman.core.upload.UploadTrigger ahriman.core.support.KeyringTrigger ahriman.core.support.MirrorlistTrigger +triggers_known = ahriman.core.distributed.WorkerLoaderTrigger ahriman.core.distributed.WorkerRegisterTrigger ahriman.core.distributed.WorkerTrigger ahriman.core.distributed.WorkerUnregisterTrigger ahriman.core.gitremote.RemotePullTrigger ahriman.core.gitremote.RemotePushTrigger ahriman.core.report.ReportTrigger ahriman.core.upload.UploadTrigger ahriman.core.support.KeyringTrigger ahriman.core.support.MirrorlistTrigger [repository] name = aur-clone @@ -115,4 +115,7 @@ username = arcan1s enable_archive_upload = yes host = 127.0.0.1 static_path = ../web/templates/static -templates = ../web/templates \ No newline at end of file +templates = ../web/templates + +[worker] +address = http://localhost:8081