mirror of
https://github.com/arcan1s/ahriman.git
synced 2025-04-24 07:17:17 +00:00
feat: add workers autodicsovery feature (#121)
* add workers autodicsovery feature * suppress erros while retrieving worker list * update recipes * fix tests and update docs * filter health checks * ping based workers
This commit is contained in:
parent
3347212bf2
commit
aad607eaef
45
docs/ahriman.core.distributed.rst
Normal file
45
docs/ahriman.core.distributed.rst
Normal file
@ -0,0 +1,45 @@
|
||||
ahriman.core.distributed package
|
||||
================================
|
||||
|
||||
Submodules
|
||||
----------
|
||||
|
||||
ahriman.core.distributed.distributed\_system module
|
||||
---------------------------------------------------
|
||||
|
||||
.. automodule:: ahriman.core.distributed.distributed_system
|
||||
:members:
|
||||
:no-undoc-members:
|
||||
:show-inheritance:
|
||||
|
||||
ahriman.core.distributed.worker\_loader\_trigger module
|
||||
-------------------------------------------------------
|
||||
|
||||
.. automodule:: ahriman.core.distributed.worker_loader_trigger
|
||||
:members:
|
||||
:no-undoc-members:
|
||||
:show-inheritance:
|
||||
|
||||
ahriman.core.distributed.worker\_trigger module
|
||||
-----------------------------------------------
|
||||
|
||||
.. automodule:: ahriman.core.distributed.worker_trigger
|
||||
:members:
|
||||
:no-undoc-members:
|
||||
:show-inheritance:
|
||||
|
||||
ahriman.core.distributed.workers\_cache module
|
||||
----------------------------------------------
|
||||
|
||||
.. automodule:: ahriman.core.distributed.workers_cache
|
||||
:members:
|
||||
:no-undoc-members:
|
||||
:show-inheritance:
|
||||
|
||||
Module contents
|
||||
---------------
|
||||
|
||||
.. automodule:: ahriman.core.distributed
|
||||
:members:
|
||||
:no-undoc-members:
|
||||
:show-inheritance:
|
@ -12,6 +12,7 @@ Subpackages
|
||||
ahriman.core.build_tools
|
||||
ahriman.core.configuration
|
||||
ahriman.core.database
|
||||
ahriman.core.distributed
|
||||
ahriman.core.formatters
|
||||
ahriman.core.gitremote
|
||||
ahriman.core.http
|
||||
|
@ -260,6 +260,14 @@ ahriman.web.schemas.versioned\_log\_schema module
|
||||
:no-undoc-members:
|
||||
:show-inheritance:
|
||||
|
||||
ahriman.web.schemas.worker\_schema module
|
||||
-----------------------------------------
|
||||
|
||||
.. automodule:: ahriman.web.schemas.worker_schema
|
||||
:members:
|
||||
:no-undoc-members:
|
||||
:show-inheritance:
|
||||
|
||||
Module contents
|
||||
---------------
|
||||
|
||||
|
21
docs/ahriman.web.views.v1.distributed.rst
Normal file
21
docs/ahriman.web.views.v1.distributed.rst
Normal file
@ -0,0 +1,21 @@
|
||||
ahriman.web.views.v1.distributed package
|
||||
========================================
|
||||
|
||||
Submodules
|
||||
----------
|
||||
|
||||
ahriman.web.views.v1.distributed.workers module
|
||||
-----------------------------------------------
|
||||
|
||||
.. automodule:: ahriman.web.views.v1.distributed.workers
|
||||
:members:
|
||||
:no-undoc-members:
|
||||
:show-inheritance:
|
||||
|
||||
Module contents
|
||||
---------------
|
||||
|
||||
.. automodule:: ahriman.web.views.v1.distributed
|
||||
:members:
|
||||
:no-undoc-members:
|
||||
:show-inheritance:
|
@ -7,6 +7,7 @@ Subpackages
|
||||
.. toctree::
|
||||
:maxdepth: 4
|
||||
|
||||
ahriman.web.views.v1.distributed
|
||||
ahriman.web.views.v1.service
|
||||
ahriman.web.views.v1.status
|
||||
ahriman.web.views.v1.user
|
||||
|
@ -37,6 +37,7 @@ This package contains everything required for the most of application actions an
|
||||
* ``ahriman.core.build_tools`` is a package which provides wrapper for ``devtools`` commands.
|
||||
* ``ahriman.core.configuration`` contains extension for standard ``configparser`` library and some validation related classes.
|
||||
* ``ahriman.core.database`` is everything for database, including data and schema migrations.
|
||||
* ``ahriman.core.distributed`` package with triggers and helpers for distributed build system.
|
||||
* ``ahriman.core.formatters`` package provides ``Printer`` sub-classes for printing data (e.g. package properties) to stdout which are used by some handlers.
|
||||
* ``ahriman.core.gitremote`` is a package with remote PKGBUILD triggers. Should not be called directly.
|
||||
* ``ahriman.core.http`` package provides HTTP clients which can be used later by other classes.
|
||||
|
@ -86,7 +86,7 @@ Build related configuration. Group name can refer to architecture, e.g. ``build:
|
||||
* ``triggers`` - list of ``ahriman.core.triggers.Trigger`` class implementation (e.g. ``ahriman.core.report.ReportTrigger ahriman.core.upload.UploadTrigger``) which will be loaded and run at the end of processing, space separated list of strings, optional. You can also specify triggers by their paths, e.g. ``/usr/lib/python3.10/site-packages/ahriman/core/report/report.py.ReportTrigger``. Triggers are run in the order of definition.
|
||||
* ``triggers_known`` - optional list of ``ahriman.core.triggers.Trigger`` class implementations which are not run automatically and used only for trigger discovery and configuration validation.
|
||||
* ``vcs_allowed_age`` - maximal age in seconds of the VCS packages before their version will be updated with its remote source, integer, optional, default is 7 days.
|
||||
* ``workers`` - list of worker nodes addresses used for build process, space separated list of strings, optional. Each worker address must be valid and reachable url, e.g. ``https://10.0.0.1:8080``. If none set, the build process will be run on the current node.
|
||||
* ``workers`` - list of worker nodes addresses used for build process, space separated list of strings, optional. Each worker address must be valid and reachable url, e.g. ``https://10.0.0.1:8080``. If none set, the build process will be run on the current node. There is also special trigger which loads this value based on the list of the discovered nodes.
|
||||
|
||||
``repository`` group
|
||||
--------------------
|
||||
@ -352,3 +352,12 @@ Requires ``boto3`` library to be installed. Section name must be either ``s3`` (
|
||||
* ``object_path`` - path prefix for stored objects, string, optional. If none set, the prefix as in repository tree will be used.
|
||||
* ``region`` - bucket region (e.g. ``eu-central-1``), string, required.
|
||||
* ``secret_key`` - AWS secret access key, string, required.
|
||||
|
||||
``worker`` group
|
||||
----------------
|
||||
|
||||
This section controls settings for ``ahriman.core.distributed.WorkerTrigger`` plugin.
|
||||
|
||||
* ``address`` - address of the instance, string, required. Must be reachable for the master instance.
|
||||
* ``identifier`` - unique identifier of the instance, string, optional.
|
||||
* ``time_to_live`` - amount of time which remote worker will be considered alive in seconds, integer, optional, default is ``60``. The ping interval will be set automatically equal this value divided by 4.
|
||||
|
@ -1168,6 +1168,11 @@ Addition of new package, package removal, repository update
|
||||
|
||||
In all scenarios, update process must be run only on ``master`` node. Unlike the manually distributed packages described above, automatic update must be enabled only for ``master`` node.
|
||||
|
||||
Automatic worker nodes discovery
|
||||
""""""""""""""""""""""""""""""""
|
||||
|
||||
Instead of setting ``build.workers`` option it is also possible to configure services to load worker list dynamically. To do so, the ``ahriman.core.distributed.WorkerLoaderTrigger`` and ``ahriman.core.distributed.WorkerTrigger`` must be used for ``master`` and ``worker`` nodes repsectively. See recipes for more details.
|
||||
|
||||
Known limitations
|
||||
"""""""""""""""""
|
||||
|
||||
|
@ -14,6 +14,16 @@ Built-in triggers
|
||||
|
||||
For the configuration details and settings explanation kindly refer to the :doc:`documentation <configuration>`.
|
||||
|
||||
``ahriman.core.distributed.WorkerLoaderTrigger``
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Special trigger to be used to load workers from database on the start of the application rather than configuration. If the option is already set, it will skip processing.
|
||||
|
||||
``ahriman.core.distributed.WorkerTrigger``
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Another trigger for the distributed system, which registers itself as remote worker, calling remote service periodically.
|
||||
|
||||
``ahriman.core.gitremote.RemotePullTrigger``
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
|
@ -24,7 +24,7 @@ ignore_packages =
|
||||
makechrootpkg_flags =
|
||||
makepkg_flags = --nocolor --ignorearch
|
||||
triggers = ahriman.core.gitremote.RemotePullTrigger ahriman.core.report.ReportTrigger ahriman.core.upload.UploadTrigger ahriman.core.gitremote.RemotePushTrigger
|
||||
triggers_known = ahriman.core.gitremote.RemotePullTrigger ahriman.core.gitremote.RemotePushTrigger ahriman.core.report.ReportTrigger ahriman.core.upload.UploadTrigger ahriman.core.support.KeyringTrigger ahriman.core.support.MirrorlistTrigger
|
||||
triggers_known = ahriman.core.distributed.WorkerLoaderTrigger ahriman.core.distributed.WorkerRegisterTrigger ahriman.core.distributed.WorkerTrigger ahriman.core.distributed.WorkerUnregisterTrigger ahriman.core.gitremote.RemotePullTrigger ahriman.core.gitremote.RemotePushTrigger ahriman.core.report.ReportTrigger ahriman.core.upload.UploadTrigger ahriman.core.support.KeyringTrigger ahriman.core.support.MirrorlistTrigger
|
||||
vcs_allowed_age = 604800
|
||||
|
||||
[repository]
|
||||
|
@ -1,4 +1,4 @@
|
||||
.TH AHRIMAN "1" "2023\-12\-27" "ahriman" "Generated Python Manual"
|
||||
.TH AHRIMAN "1" "2024\-01\-02" "ahriman" "Generated Python Manual"
|
||||
.SH NAME
|
||||
ahriman
|
||||
.SH SYNOPSIS
|
||||
|
@ -8,4 +8,4 @@
|
||||
6. All updates from worker instances are uploaded to the web service.
|
||||
7. Repository is available at `http://localhost:8080/repo`.
|
||||
|
||||
Note, in this configuration, workers are spawned in replicated mode, thus the backend accesses them in round-robin-like manner.
|
||||
In this example, worker list is automatically defined based on the addresses they reported.
|
||||
|
@ -77,6 +77,10 @@ services:
|
||||
interval: 10s
|
||||
start_period: 30s
|
||||
|
||||
depends_on:
|
||||
backend:
|
||||
condition: service_healthy
|
||||
|
||||
command: web
|
||||
|
||||
configs:
|
||||
|
@ -2,7 +2,7 @@
|
||||
target = configuration
|
||||
|
||||
[build]
|
||||
workers = http://worker:8080 http://worker:8080
|
||||
triggers = ahriman.core.distributed.WorkerLoaderTrigger ahriman.core.gitremote.RemotePullTrigger ahriman.core.report.ReportTrigger ahriman.core.upload.UploadTrigger ahriman.core.gitremote.RemotePushTrigger
|
||||
|
||||
[status]
|
||||
username = demo
|
||||
|
@ -2,7 +2,7 @@
|
||||
target = configuration
|
||||
|
||||
[build]
|
||||
triggers = ahriman.core.upload.UploadTrigger ahriman.core.report.ReportTrigger
|
||||
triggers = ahriman.core.distributed.WorkerTrigger ahriman.core.upload.UploadTrigger ahriman.core.report.ReportTrigger
|
||||
|
||||
[status]
|
||||
address = http://backend:8080
|
||||
@ -20,3 +20,6 @@ wait_timeout = 0
|
||||
target = remote-service
|
||||
|
||||
[remote-service]
|
||||
|
||||
[worker]
|
||||
address = http://$HOSTNAME:8080
|
||||
|
@ -24,6 +24,7 @@ from collections.abc import Generator
|
||||
from ahriman.application.handlers import Handler
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.spawn import Spawn
|
||||
from ahriman.core.triggers import TriggerLoader
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
|
||||
|
||||
@ -53,13 +54,16 @@ class Web(Handler):
|
||||
spawner = Spawn(args.parser(), list(spawner_args))
|
||||
spawner.start()
|
||||
|
||||
triggers = TriggerLoader.load(repository_id, configuration)
|
||||
triggers.on_start()
|
||||
|
||||
dummy_args = argparse.Namespace(
|
||||
architecture=None,
|
||||
configuration=args.configuration,
|
||||
repository=None,
|
||||
repository_id=None,
|
||||
)
|
||||
repositories = cls.repositories_extract(dummy_args)
|
||||
repositories = Web.repositories_extract(dummy_args)
|
||||
application = setup_server(configuration, spawner, repositories)
|
||||
run_server(application)
|
||||
|
||||
|
22
src/ahriman/core/distributed/__init__.py
Normal file
22
src/ahriman/core/distributed/__init__.py
Normal file
@ -0,0 +1,22 @@
|
||||
#
|
||||
# Copyright (c) 2021-2023 ahriman team.
|
||||
#
|
||||
# This file is part of ahriman
|
||||
# (see https://github.com/arcan1s/ahriman).
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
from ahriman.core.distributed.worker_loader_trigger import WorkerLoaderTrigger
|
||||
from ahriman.core.distributed.worker_trigger import WorkerTrigger
|
||||
from ahriman.core.distributed.workers_cache import WorkersCache
|
130
src/ahriman/core/distributed/distributed_system.py
Normal file
130
src/ahriman/core/distributed/distributed_system.py
Normal file
@ -0,0 +1,130 @@
|
||||
#
|
||||
# Copyright (c) 2021-2023 ahriman team.
|
||||
#
|
||||
# This file is part of ahriman
|
||||
# (see https://github.com/arcan1s/ahriman).
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
import contextlib
|
||||
|
||||
from functools import cached_property
|
||||
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.configuration.schema import ConfigurationSchema
|
||||
from ahriman.core.status.web_client import WebClient
|
||||
from ahriman.core.triggers import Trigger
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
from ahriman.models.worker import Worker
|
||||
|
||||
|
||||
class DistributedSystem(Trigger, WebClient):
|
||||
"""
|
||||
simple class to (un)register itself as a distributed worker
|
||||
"""
|
||||
|
||||
CONFIGURATION_SCHEMA: ConfigurationSchema = {
|
||||
"worker": {
|
||||
"type": "dict",
|
||||
"schema": {
|
||||
"address": {
|
||||
"type": "string",
|
||||
"required": True,
|
||||
"empty": False,
|
||||
"is_url": [],
|
||||
},
|
||||
"identifier": {
|
||||
"type": "string",
|
||||
"empty": False,
|
||||
},
|
||||
"time_to_live": {
|
||||
"type": "integer",
|
||||
"coerce": "integer",
|
||||
"min": 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
def __init__(self, repository_id: RepositoryId, configuration: Configuration) -> None:
|
||||
"""
|
||||
default constructor
|
||||
|
||||
Args:
|
||||
repository_id(RepositoryId): repository unique identifier
|
||||
configuration(Configuration): configuration instance
|
||||
"""
|
||||
Trigger.__init__(self, repository_id, configuration)
|
||||
WebClient.__init__(self, repository_id, configuration)
|
||||
|
||||
@cached_property
|
||||
def worker(self) -> Worker:
|
||||
"""
|
||||
load and set worker. Lazy property loaded because it is not always required
|
||||
|
||||
Returns:
|
||||
Worker: unique self worker identifier
|
||||
"""
|
||||
section = next(iter(self.configuration_sections(self.configuration)))
|
||||
|
||||
address = self.configuration.get(section, "address")
|
||||
identifier = self.configuration.get(section, "identifier", fallback="")
|
||||
return Worker(address, identifier=identifier)
|
||||
|
||||
@classmethod
|
||||
def configuration_sections(cls, configuration: Configuration) -> list[str]:
|
||||
"""
|
||||
extract configuration sections from configuration
|
||||
|
||||
Args:
|
||||
configuration(Configuration): configuration instance
|
||||
|
||||
Returns:
|
||||
list[str]: read configuration sections belong to this trigger
|
||||
"""
|
||||
return list(cls.CONFIGURATION_SCHEMA.keys())
|
||||
|
||||
def _workers_url(self) -> str:
|
||||
"""
|
||||
workers url generator
|
||||
|
||||
Returns:
|
||||
str: full url of web service for workers
|
||||
"""
|
||||
return f"{self.address}/api/v1/distributed"
|
||||
|
||||
def register(self) -> None:
|
||||
"""
|
||||
register itself in remote system
|
||||
"""
|
||||
with contextlib.suppress(Exception):
|
||||
self.make_request("POST", self._workers_url(), json=self.worker.view())
|
||||
|
||||
def workers(self) -> list[Worker]:
|
||||
"""
|
||||
retrieve list of available remote workers
|
||||
|
||||
Returns:
|
||||
list[Worker]: currently registered workers
|
||||
"""
|
||||
with contextlib.suppress(Exception):
|
||||
response = self.make_request("GET", self._workers_url())
|
||||
response_json = response.json()
|
||||
|
||||
return [
|
||||
Worker(worker["address"], identifier=worker["identifier"])
|
||||
for worker in response_json
|
||||
]
|
||||
|
||||
return []
|
40
src/ahriman/core/distributed/worker_loader_trigger.py
Normal file
40
src/ahriman/core/distributed/worker_loader_trigger.py
Normal file
@ -0,0 +1,40 @@
|
||||
#
|
||||
# Copyright (c) 2021-2023 ahriman team.
|
||||
#
|
||||
# This file is part of ahriman
|
||||
# (see https://github.com/arcan1s/ahriman).
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
from ahriman.core.distributed.distributed_system import DistributedSystem
|
||||
|
||||
|
||||
class WorkerLoaderTrigger(DistributedSystem):
|
||||
"""
|
||||
remote worker processor trigger (server side)
|
||||
"""
|
||||
|
||||
def on_start(self) -> None:
|
||||
"""
|
||||
trigger action which will be called at the start of the application
|
||||
"""
|
||||
if self.configuration.has_option("build", "workers"):
|
||||
return # there is manually set option
|
||||
|
||||
workers = [worker.address for worker in self.workers()]
|
||||
if not workers:
|
||||
return
|
||||
|
||||
self.logger.info("load workers %s", workers)
|
||||
self.configuration.set_option("build", "workers", " ".join(workers))
|
70
src/ahriman/core/distributed/worker_trigger.py
Normal file
70
src/ahriman/core/distributed/worker_trigger.py
Normal file
@ -0,0 +1,70 @@
|
||||
#
|
||||
# Copyright (c) 2021-2023 ahriman team.
|
||||
#
|
||||
# This file is part of ahriman
|
||||
# (see https://github.com/arcan1s/ahriman).
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
from threading import Timer
|
||||
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.distributed.distributed_system import DistributedSystem
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
|
||||
|
||||
class WorkerTrigger(DistributedSystem):
|
||||
"""
|
||||
remote worker processor trigger (client side)
|
||||
|
||||
Attributes:
|
||||
ping_interval(float): interval to call remote service in seconds, defined as ``worker.time_to_live / 4``
|
||||
timer(Timer): timer object
|
||||
"""
|
||||
|
||||
def __init__(self, repository_id: RepositoryId, configuration: Configuration) -> None:
|
||||
"""
|
||||
default constructor
|
||||
|
||||
Args:
|
||||
repository_id(RepositoryId): repository unique identifier
|
||||
configuration(Configuration): configuration instance
|
||||
"""
|
||||
DistributedSystem.__init__(self, repository_id, configuration)
|
||||
|
||||
section = next(iter(self.configuration_sections(configuration)))
|
||||
self.ping_interval = configuration.getint(section, "time_to_live", fallback=60) / 4.0
|
||||
self.timer = Timer(self.ping_interval, self.ping)
|
||||
|
||||
def on_start(self) -> None:
|
||||
"""
|
||||
trigger action which will be called at the start of the application
|
||||
"""
|
||||
self.logger.info("registering instance %s at %s", self.worker, self.address)
|
||||
self.timer.start()
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""
|
||||
trigger action which will be called before the stop of the application
|
||||
"""
|
||||
self.logger.info("removing instance %s at %s", self.worker, self.address)
|
||||
self.timer.cancel()
|
||||
|
||||
def ping(self) -> None:
|
||||
"""
|
||||
register itself as alive worker and update the timer
|
||||
"""
|
||||
self.register()
|
||||
self.timer = Timer(self.ping_interval, self.ping)
|
||||
self.timer.start()
|
73
src/ahriman/core/distributed/workers_cache.py
Normal file
73
src/ahriman/core/distributed/workers_cache.py
Normal file
@ -0,0 +1,73 @@
|
||||
#
|
||||
# Copyright (c) 2021-2023 ahriman team.
|
||||
#
|
||||
# This file is part of ahriman
|
||||
# (see https://github.com/arcan1s/ahriman).
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
import time
|
||||
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.log import LazyLogging
|
||||
from ahriman.models.worker import Worker
|
||||
|
||||
|
||||
class WorkersCache(LazyLogging):
|
||||
"""
|
||||
cached storage for healthy workers
|
||||
|
||||
Attributes:
|
||||
time_to_live(int): maximal amount of time in seconds to keep worker alive
|
||||
"""
|
||||
|
||||
def __init__(self, configuration: Configuration) -> None:
|
||||
"""
|
||||
default constructor
|
||||
|
||||
Args:
|
||||
configuration(Configuration): configuration instance
|
||||
"""
|
||||
self.time_to_live = configuration.getint("worker", "time_to_live", fallback=60)
|
||||
self._workers: dict[str, tuple[Worker, float]] = {}
|
||||
|
||||
@property
|
||||
def workers(self) -> list[Worker]:
|
||||
"""
|
||||
extract currently healthy workers
|
||||
|
||||
Returns:
|
||||
list[Worker]: list of currently registered workers which have been seen not earlier than :attr:`time_to_live`
|
||||
"""
|
||||
valid_from = time.monotonic() - self.time_to_live
|
||||
return [
|
||||
worker
|
||||
for worker, last_seen in self._workers.values()
|
||||
if last_seen > valid_from
|
||||
]
|
||||
|
||||
def workers_remove(self) -> None:
|
||||
"""
|
||||
remove all workers from the cache
|
||||
"""
|
||||
self._workers = {}
|
||||
|
||||
def workers_update(self, worker: Worker) -> None:
|
||||
"""
|
||||
register or update remote worker
|
||||
|
||||
Args:
|
||||
worker(Worker): worker to register
|
||||
"""
|
||||
self._workers[worker.identifier] = (worker, time.monotonic())
|
@ -47,8 +47,8 @@ class SyncHttpClient(LazyLogging):
|
||||
default constructor
|
||||
|
||||
Args:
|
||||
configuration(Configuration | None): configuration instance (Default value = None)
|
||||
section(str, optional): settings section name (Default value = None)
|
||||
configuration(Configuration | None, optional): configuration instance (Default value = None)
|
||||
section(str | None, optional): settings section name (Default value = None)
|
||||
suppress_errors(bool, optional): suppress logging of request errors (Default value = False)
|
||||
"""
|
||||
if configuration is None:
|
||||
|
@ -27,13 +27,44 @@ class FilteredAccessLogger(AccessLogger):
|
||||
access logger implementation with log filter enabled
|
||||
|
||||
Attributes:
|
||||
DISTRIBUTED_PATH_REGEX(str): (class attribute) regex used for distributed system uri
|
||||
HEALTH_PATH_REGEX(re.Pattern): (class attribute) regex for health check endpoint
|
||||
LOG_PATH_REGEX(re.Pattern): (class attribute) regex for logs uri
|
||||
PROCESS_PATH_REGEX(re.Pattern): (class attribute) regex for process uri
|
||||
"""
|
||||
|
||||
DISTRIBUTED_PATH_REGEX = "/api/v1/distributed"
|
||||
HEALTH_PATH_REGEX = "/api/v1/info"
|
||||
LOG_PATH_REGEX = re.compile(r"^/api/v1/packages/[^/]+/logs$")
|
||||
# technically process id is uuid, but we might change it later
|
||||
PROCESS_PATH_REGEX = re.compile(r"^/api/v1/service/process/[^/]+$")
|
||||
|
||||
@staticmethod
|
||||
def is_distributed_post(request: BaseRequest) -> bool:
|
||||
"""
|
||||
check if the request is for distributed services ping
|
||||
|
||||
Args:
|
||||
request(BaseRequest): http reqeust descriptor
|
||||
|
||||
Returns:
|
||||
bool: True in case if request is distributed service ping endpoint and False otherwise
|
||||
"""
|
||||
return request.method == "POST" and FilteredAccessLogger.DISTRIBUTED_PATH_REGEX == request.path
|
||||
|
||||
@staticmethod
|
||||
def is_info_get(request: BaseRequest) -> bool:
|
||||
"""
|
||||
check if the request is for health check
|
||||
|
||||
Args:
|
||||
request(BaseRequest): http reqeust descriptor
|
||||
|
||||
Returns:
|
||||
bool: True in case if request is health check and false otherwise
|
||||
"""
|
||||
return request.method == "GET" and FilteredAccessLogger.HEALTH_PATH_REGEX == request.path
|
||||
|
||||
@staticmethod
|
||||
def is_logs_post(request: BaseRequest) -> bool:
|
||||
"""
|
||||
@ -69,7 +100,9 @@ class FilteredAccessLogger(AccessLogger):
|
||||
response(StreamResponse): streaming response object
|
||||
time(float): log record timestamp
|
||||
"""
|
||||
if self.is_logs_post(request) \
|
||||
if self.is_distributed_post(request) \
|
||||
or self.is_info_get(request) \
|
||||
or self.is_logs_post(request) \
|
||||
or self.is_process_get(request):
|
||||
return
|
||||
AccessLogger.log(self, request, response, time)
|
||||
|
@ -118,7 +118,6 @@ class WebClient(Client, SyncAhrimanClient):
|
||||
Returns:
|
||||
str: full url of web service for specific package base
|
||||
"""
|
||||
# in case if unix socket is used we need to normalize url
|
||||
suffix = f"/{package_base}" if package_base else ""
|
||||
return f"{self.address}/api/v1/packages{suffix}"
|
||||
|
||||
|
@ -262,6 +262,6 @@ class TriggerLoader(LazyLogging):
|
||||
run triggers before the application exit
|
||||
"""
|
||||
self.logger.debug("executing triggers on stop")
|
||||
for trigger in self.triggers:
|
||||
for trigger in reversed(self.triggers):
|
||||
with self.__execute_trigger(trigger):
|
||||
trigger.on_stop()
|
||||
|
@ -18,8 +18,11 @@
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from ahriman.core.util import dataclass_view
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Worker:
|
||||
@ -39,3 +42,12 @@ class Worker:
|
||||
update identifier based on settings
|
||||
"""
|
||||
object.__setattr__(self, "identifier", self.identifier or urlparse(self.address).netloc)
|
||||
|
||||
def view(self) -> dict[str, Any]:
|
||||
"""
|
||||
generate json patch view
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: json-friendly dictionary
|
||||
"""
|
||||
return dataclass_view(self)
|
||||
|
@ -21,6 +21,7 @@ from aiohttp.web import AppKey
|
||||
|
||||
from ahriman.core.auth import Auth
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.distributed import WorkersCache
|
||||
from ahriman.core.spawn import Spawn
|
||||
from ahriman.core.status.watcher import Watcher
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
@ -31,6 +32,7 @@ __all__ = [
|
||||
"ConfigurationKey",
|
||||
"SpawnKey",
|
||||
"WatcherKey",
|
||||
"WorkersKey",
|
||||
]
|
||||
|
||||
|
||||
@ -38,3 +40,4 @@ AuthKey = AppKey("validator", Auth)
|
||||
ConfigurationKey = AppKey("configuration", Configuration)
|
||||
SpawnKey = AppKey("spawn", Spawn)
|
||||
WatcherKey = AppKey("watcher", dict[RepositoryId, Watcher])
|
||||
WorkersKey = AppKey("workers", WorkersCache)
|
||||
|
@ -47,5 +47,6 @@ from ahriman.web.schemas.remote_schema import RemoteSchema
|
||||
from ahriman.web.schemas.repository_id_schema import RepositoryIdSchema
|
||||
from ahriman.web.schemas.search_schema import SearchSchema
|
||||
from ahriman.web.schemas.status_schema import StatusSchema
|
||||
from ahriman.web.schemas.versioned_log_schema import VersionedLogSchema
|
||||
from ahriman.web.schemas.update_flags_schema import UpdateFlagsSchema
|
||||
from ahriman.web.schemas.versioned_log_schema import VersionedLogSchema
|
||||
from ahriman.web.schemas.worker_schema import WorkerSchema
|
||||
|
35
src/ahriman/web/schemas/worker_schema.py
Normal file
35
src/ahriman/web/schemas/worker_schema.py
Normal file
@ -0,0 +1,35 @@
|
||||
#
|
||||
# Copyright (c) 2021-2023 ahriman team.
|
||||
#
|
||||
# This file is part of ahriman
|
||||
# (see https://github.com/arcan1s/ahriman).
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
from marshmallow import Schema, fields
|
||||
|
||||
|
||||
class WorkerSchema(Schema):
|
||||
"""
|
||||
request and response schema for workers
|
||||
"""
|
||||
|
||||
address = fields.String(required=True, metadata={
|
||||
"description": "Worker address",
|
||||
"example": "http://localhost:8081",
|
||||
})
|
||||
identifier = fields.String(required=True, metadata={
|
||||
"description": "Worker unique identifier",
|
||||
"example": "42f03a62-48f7-46b7-af40-dacc720e92fa",
|
||||
})
|
@ -24,12 +24,13 @@ from typing import TypeVar
|
||||
|
||||
from ahriman.core.auth import Auth
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.distributed import WorkersCache
|
||||
from ahriman.core.sign.gpg import GPG
|
||||
from ahriman.core.spawn import Spawn
|
||||
from ahriman.core.status.watcher import Watcher
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
from ahriman.models.user_access import UserAccess
|
||||
from ahriman.web.keys import AuthKey, ConfigurationKey, SpawnKey, WatcherKey
|
||||
from ahriman.web.keys import AuthKey, ConfigurationKey, SpawnKey, WatcherKey, WorkersKey
|
||||
|
||||
|
||||
T = TypeVar("T", str, list[str])
|
||||
@ -97,6 +98,16 @@ class BaseView(View, CorsViewMixin):
|
||||
"""
|
||||
return self.request.app[AuthKey]
|
||||
|
||||
@property
|
||||
def workers(self) -> WorkersCache:
|
||||
"""
|
||||
get workers cache instance
|
||||
|
||||
Returns:
|
||||
WorkersCache: workers service
|
||||
"""
|
||||
return self.request.app[WorkersKey]
|
||||
|
||||
@classmethod
|
||||
async def get_permission(cls, request: Request) -> UserAccess:
|
||||
"""
|
||||
|
19
src/ahriman/web/views/v1/distributed/__init__.py
Normal file
19
src/ahriman/web/views/v1/distributed/__init__.py
Normal file
@ -0,0 +1,19 @@
|
||||
#
|
||||
# Copyright (c) 2021-2023 ahriman team.
|
||||
#
|
||||
# This file is part of ahriman
|
||||
# (see https://github.com/arcan1s/ahriman).
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
126
src/ahriman/web/views/v1/distributed/workers.py
Normal file
126
src/ahriman/web/views/v1/distributed/workers.py
Normal file
@ -0,0 +1,126 @@
|
||||
#
|
||||
# Copyright (c) 2021-2023 ahriman team.
|
||||
#
|
||||
# This file is part of ahriman
|
||||
# (see https://github.com/arcan1s/ahriman).
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
import aiohttp_apispec # type: ignore[import-untyped]
|
||||
|
||||
from collections.abc import Callable
|
||||
from aiohttp.web import HTTPBadRequest, HTTPNoContent, Response, json_response
|
||||
|
||||
from ahriman.models.user_access import UserAccess
|
||||
from ahriman.models.worker import Worker
|
||||
from ahriman.web.schemas import AuthSchema, ErrorSchema, WorkerSchema
|
||||
from ahriman.web.views.base import BaseView
|
||||
|
||||
|
||||
class WorkersView(BaseView):
|
||||
"""
|
||||
distributed workers view
|
||||
|
||||
Attributes:
|
||||
DELETE_PERMISSION(UserAccess): (class attribute) delete permissions of self
|
||||
GET_PERMISSION(UserAccess): (class attribute) get permissions of self
|
||||
POST_PERMISSION(UserAccess): (class attribute) post permissions of self
|
||||
"""
|
||||
|
||||
DELETE_PERMISSION = GET_PERMISSION = POST_PERMISSION = UserAccess.Full
|
||||
ROUTES = ["/api/v1/distributed"]
|
||||
|
||||
@aiohttp_apispec.docs(
|
||||
tags=["Distributed"],
|
||||
summary="Unregister all workers",
|
||||
description="Unregister and remove all known workers from the service",
|
||||
responses={
|
||||
204: {"description": "Success response"},
|
||||
401: {"description": "Authorization required", "schema": ErrorSchema},
|
||||
403: {"description": "Access is forbidden", "schema": ErrorSchema},
|
||||
500: {"description": "Internal server error", "schema": ErrorSchema},
|
||||
},
|
||||
security=[{"token": [DELETE_PERMISSION]}],
|
||||
)
|
||||
@aiohttp_apispec.cookies_schema(AuthSchema)
|
||||
async def delete(self) -> None:
|
||||
"""
|
||||
unregister worker
|
||||
|
||||
Raises:
|
||||
HTTPNoContent: on success response
|
||||
"""
|
||||
self.workers.workers_remove()
|
||||
|
||||
raise HTTPNoContent
|
||||
|
||||
@aiohttp_apispec.docs(
|
||||
tags=["Distributed"],
|
||||
summary="Get workers",
|
||||
description="Retrieve registered workers",
|
||||
responses={
|
||||
200: {"description": "Success response", "schema": WorkerSchema(many=True)},
|
||||
401: {"description": "Authorization required", "schema": ErrorSchema},
|
||||
403: {"description": "Access is forbidden", "schema": ErrorSchema},
|
||||
500: {"description": "Internal server error", "schema": ErrorSchema},
|
||||
},
|
||||
security=[{"token": [GET_PERMISSION]}],
|
||||
)
|
||||
@aiohttp_apispec.cookies_schema(AuthSchema)
|
||||
async def get(self) -> Response:
|
||||
"""
|
||||
get workers list
|
||||
|
||||
Returns:
|
||||
Response: 200 with workers list on success
|
||||
"""
|
||||
workers = self.workers.workers
|
||||
|
||||
comparator: Callable[[Worker], str] = lambda item: item.identifier
|
||||
response = [worker.view() for worker in sorted(workers, key=comparator)]
|
||||
|
||||
return json_response(response)
|
||||
|
||||
@aiohttp_apispec.docs(
|
||||
tags=["Distributed"],
|
||||
summary="Register worker",
|
||||
description="Register or update remote worker",
|
||||
responses={
|
||||
204: {"description": "Success response"},
|
||||
400: {"description": "Bad data is supplied", "schema": ErrorSchema},
|
||||
401: {"description": "Authorization required", "schema": ErrorSchema},
|
||||
403: {"description": "Access is forbidden", "schema": ErrorSchema},
|
||||
500: {"description": "Internal server error", "schema": ErrorSchema},
|
||||
},
|
||||
security=[{"token": [POST_PERMISSION]}],
|
||||
)
|
||||
@aiohttp_apispec.cookies_schema(AuthSchema)
|
||||
@aiohttp_apispec.json_schema(WorkerSchema)
|
||||
async def post(self) -> None:
|
||||
"""
|
||||
register remote worker
|
||||
|
||||
Raises:
|
||||
HTTPBadRequest: if bad data is supplied
|
||||
HTTPNoContent: in case of success response
|
||||
"""
|
||||
try:
|
||||
data = await self.request.json()
|
||||
worker = Worker(data["address"], identifier=data["identifier"])
|
||||
except Exception as ex:
|
||||
raise HTTPBadRequest(reason=str(ex))
|
||||
|
||||
self.workers.workers_update(worker)
|
||||
|
||||
raise HTTPNoContent
|
@ -27,6 +27,7 @@ from aiohttp.web import Application, normalize_path_middleware, run_app
|
||||
from ahriman.core.auth import Auth
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.database import SQLite
|
||||
from ahriman.core.distributed import WorkersCache
|
||||
from ahriman.core.exceptions import InitializeError
|
||||
from ahriman.core.log.filtered_access_logger import FilteredAccessLogger
|
||||
from ahriman.core.spawn import Spawn
|
||||
@ -34,7 +35,7 @@ from ahriman.core.status.watcher import Watcher
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
from ahriman.web.apispec import setup_apispec
|
||||
from ahriman.web.cors import setup_cors
|
||||
from ahriman.web.keys import AuthKey, ConfigurationKey, SpawnKey, WatcherKey
|
||||
from ahriman.web.keys import AuthKey, ConfigurationKey, SpawnKey, WatcherKey, WorkersKey
|
||||
from ahriman.web.middlewares.exception_handler import exception_handler
|
||||
from ahriman.web.routes import setup_routes
|
||||
|
||||
@ -159,7 +160,8 @@ def setup_server(configuration: Configuration, spawner: Spawn, repositories: lis
|
||||
application.logger.info("setup configuration")
|
||||
application[ConfigurationKey] = configuration
|
||||
|
||||
application.logger.info("setup watchers")
|
||||
application.logger.info("setup services")
|
||||
# package cache
|
||||
if not repositories:
|
||||
raise InitializeError("No repositories configured, exiting")
|
||||
database = SQLite.load(configuration)
|
||||
@ -168,8 +170,9 @@ def setup_server(configuration: Configuration, spawner: Spawn, repositories: lis
|
||||
application.logger.info("load repository %s", repository_id)
|
||||
watchers[repository_id] = Watcher(repository_id, database)
|
||||
application[WatcherKey] = watchers
|
||||
|
||||
application.logger.info("setup process spawner")
|
||||
# workers cache
|
||||
application[WorkersKey] = WorkersCache(configuration)
|
||||
# process spawner
|
||||
application[SpawnKey] = spawner
|
||||
|
||||
application.logger.info("setup authorization")
|
||||
|
@ -79,6 +79,7 @@ def test_schema(configuration: Configuration) -> None:
|
||||
assert schema.pop("s3")
|
||||
assert schema.pop("telegram")
|
||||
assert schema.pop("upload")
|
||||
assert schema.pop("worker")
|
||||
|
||||
assert schema == CONFIGURATION_SCHEMA
|
||||
|
||||
|
@ -39,6 +39,7 @@ def test_run(args: argparse.Namespace, configuration: Configuration, repository:
|
||||
setup_mock = mocker.patch("ahriman.web.web.setup_server")
|
||||
run_mock = mocker.patch("ahriman.web.web.run_server")
|
||||
start_mock = mocker.patch("ahriman.core.spawn.Spawn.start")
|
||||
trigger_mock = mocker.patch("ahriman.core.triggers.TriggerLoader.load")
|
||||
stop_mock = mocker.patch("ahriman.core.spawn.Spawn.stop")
|
||||
join_mock = mocker.patch("ahriman.core.spawn.Spawn.join")
|
||||
_, repository_id = configuration.check_loaded()
|
||||
@ -48,6 +49,8 @@ def test_run(args: argparse.Namespace, configuration: Configuration, repository:
|
||||
setup_mock.assert_called_once_with(configuration, pytest.helpers.anyvar(int), [repository_id])
|
||||
run_mock.assert_called_once_with(pytest.helpers.anyvar(int))
|
||||
start_mock.assert_called_once_with()
|
||||
trigger_mock.assert_called_once_with(repository_id, configuration)
|
||||
trigger_mock().on_start.assert_called_once_with()
|
||||
stop_mock.assert_called_once_with()
|
||||
join_mock.assert_called_once_with()
|
||||
|
||||
|
35
tests/ahriman/core/distributed/conftest.py
Normal file
35
tests/ahriman/core/distributed/conftest.py
Normal file
@ -0,0 +1,35 @@
|
||||
import pytest
|
||||
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.distributed import WorkersCache
|
||||
from ahriman.core.distributed.distributed_system import DistributedSystem
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def distributed_system(configuration: Configuration) -> DistributedSystem:
|
||||
"""
|
||||
distributed system fixture
|
||||
|
||||
Args:
|
||||
configuration(Configuration): configuration fixture
|
||||
|
||||
Returns:
|
||||
DistributedSystem: distributed system test instance
|
||||
"""
|
||||
configuration.set_option("status", "address", "http://localhost:8081")
|
||||
_, repository_id = configuration.check_loaded()
|
||||
return DistributedSystem(repository_id, configuration)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def workers_cache(configuration: Configuration) -> WorkersCache:
|
||||
"""
|
||||
workers cache fixture
|
||||
|
||||
Args:
|
||||
configuration(Configuration): configuration fixture
|
||||
|
||||
Returns:
|
||||
WorkersCache: workers cache test instance
|
||||
"""
|
||||
return WorkersCache(configuration)
|
82
tests/ahriman/core/distributed/test_distributed_system.py
Normal file
82
tests/ahriman/core/distributed/test_distributed_system.py
Normal file
@ -0,0 +1,82 @@
|
||||
import json
|
||||
import requests
|
||||
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.distributed.distributed_system import DistributedSystem
|
||||
from ahriman.models.worker import Worker
|
||||
|
||||
|
||||
def test_configuration_sections(configuration: Configuration) -> None:
|
||||
"""
|
||||
must correctly parse target list
|
||||
"""
|
||||
assert DistributedSystem.configuration_sections(configuration) == ["worker"]
|
||||
|
||||
|
||||
def test_workers_url(distributed_system: DistributedSystem) -> None:
|
||||
"""
|
||||
must generate workers url correctly
|
||||
"""
|
||||
assert distributed_system._workers_url().startswith(distributed_system.address)
|
||||
assert distributed_system._workers_url().endswith("/api/v1/distributed")
|
||||
|
||||
|
||||
def test_register(distributed_system: DistributedSystem, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must register service
|
||||
"""
|
||||
run_mock = mocker.patch("ahriman.core.distributed.distributed_system.DistributedSystem.make_request")
|
||||
distributed_system.register()
|
||||
run_mock.assert_called_once_with("POST", f"{distributed_system.address}/api/v1/distributed",
|
||||
json=distributed_system.worker.view())
|
||||
|
||||
|
||||
def test_register_failed(distributed_system: DistributedSystem, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must suppress any exception happened during worker registration
|
||||
"""
|
||||
mocker.patch("requests.Session.request", side_effect=Exception())
|
||||
distributed_system.register()
|
||||
|
||||
|
||||
def test_register_failed_http_error(distributed_system: DistributedSystem, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must suppress HTTP exception happened during worker registration
|
||||
"""
|
||||
mocker.patch("requests.Session.request", side_effect=requests.HTTPError())
|
||||
distributed_system.register()
|
||||
|
||||
|
||||
def test_workers(distributed_system: DistributedSystem, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must return available remote workers
|
||||
"""
|
||||
worker = Worker("remote")
|
||||
response_obj = requests.Response()
|
||||
response_obj._content = json.dumps([worker.view()]).encode("utf8")
|
||||
response_obj.status_code = 200
|
||||
|
||||
requests_mock = mocker.patch("ahriman.core.status.web_client.WebClient.make_request",
|
||||
return_value=response_obj)
|
||||
|
||||
result = distributed_system.workers()
|
||||
requests_mock.assert_called_once_with("GET", distributed_system._workers_url())
|
||||
assert result == [worker]
|
||||
|
||||
|
||||
def test_workers_failed(distributed_system: DistributedSystem, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must suppress any exception happened during worker extraction
|
||||
"""
|
||||
mocker.patch("requests.Session.request", side_effect=Exception())
|
||||
distributed_system.workers()
|
||||
|
||||
|
||||
def test_workers_failed_http_error(distributed_system: DistributedSystem, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must suppress HTTP exception happened during worker extraction
|
||||
"""
|
||||
mocker.patch("requests.Session.request", side_effect=requests.HTTPError())
|
||||
distributed_system.workers()
|
47
tests/ahriman/core/distributed/test_worker_loader_trigger.py
Normal file
47
tests/ahriman/core/distributed/test_worker_loader_trigger.py
Normal file
@ -0,0 +1,47 @@
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.distributed import WorkerLoaderTrigger
|
||||
from ahriman.models.worker import Worker
|
||||
|
||||
|
||||
def test_on_start(configuration: Configuration, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must load workers from remote
|
||||
"""
|
||||
worker = Worker("address")
|
||||
configuration.set_option("status", "address", "http://localhost:8081")
|
||||
run_mock = mocker.patch("ahriman.core.distributed.WorkerLoaderTrigger.workers", return_value=[worker])
|
||||
_, repository_id = configuration.check_loaded()
|
||||
|
||||
trigger = WorkerLoaderTrigger(repository_id, configuration)
|
||||
trigger.on_start()
|
||||
run_mock.assert_called_once_with()
|
||||
assert configuration.getlist("build", "workers") == [worker.address]
|
||||
|
||||
|
||||
def test_on_start_skip(configuration: Configuration, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must skip loading if option is already set
|
||||
"""
|
||||
configuration.set_option("status", "address", "http://localhost:8081")
|
||||
configuration.set_option("build", "workers", "address")
|
||||
run_mock = mocker.patch("ahriman.core.distributed.WorkerLoaderTrigger.workers")
|
||||
_, repository_id = configuration.check_loaded()
|
||||
|
||||
trigger = WorkerLoaderTrigger(repository_id, configuration)
|
||||
trigger.on_start()
|
||||
run_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_on_start_empty_list(configuration: Configuration, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must do not set anything if workers are not available
|
||||
"""
|
||||
configuration.set_option("status", "address", "http://localhost:8081")
|
||||
mocker.patch("ahriman.core.distributed.WorkerLoaderTrigger.workers", return_value=[])
|
||||
_, repository_id = configuration.check_loaded()
|
||||
|
||||
trigger = WorkerLoaderTrigger(repository_id, configuration)
|
||||
trigger.on_start()
|
||||
assert not configuration.has_option("build", "workers")
|
52
tests/ahriman/core/distributed/test_worker_trigger.py
Normal file
52
tests/ahriman/core/distributed/test_worker_trigger.py
Normal file
@ -0,0 +1,52 @@
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.distributed import WorkerTrigger
|
||||
|
||||
|
||||
def test_on_start(configuration: Configuration, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must register itself as worker
|
||||
"""
|
||||
configuration.set_option("status", "address", "http://localhost:8081")
|
||||
run_mock = mocker.patch("threading.Timer.start")
|
||||
_, repository_id = configuration.check_loaded()
|
||||
|
||||
WorkerTrigger(repository_id, configuration).on_start()
|
||||
run_mock.assert_called_once_with()
|
||||
|
||||
|
||||
def test_on_stop(configuration: Configuration, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must unregister itself as worker
|
||||
"""
|
||||
configuration.set_option("status", "address", "http://localhost:8081")
|
||||
run_mock = mocker.patch("threading.Timer.cancel")
|
||||
_, repository_id = configuration.check_loaded()
|
||||
|
||||
WorkerTrigger(repository_id, configuration).on_stop()
|
||||
run_mock.assert_called_once_with()
|
||||
|
||||
|
||||
def test_on_stop_empty_timer(configuration: Configuration) -> None:
|
||||
"""
|
||||
must do not fail if no timer was started
|
||||
"""
|
||||
configuration.set_option("status", "address", "http://localhost:8081")
|
||||
_, repository_id = configuration.check_loaded()
|
||||
|
||||
WorkerTrigger(repository_id, configuration).on_stop()
|
||||
|
||||
|
||||
def test_ping(configuration: Configuration, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must correctly process timer action
|
||||
"""
|
||||
configuration.set_option("status", "address", "http://localhost:8081")
|
||||
run_mock = mocker.patch("ahriman.core.distributed.WorkerTrigger.register")
|
||||
timer_mock = mocker.patch("threading.Timer.start")
|
||||
_, repository_id = configuration.check_loaded()
|
||||
|
||||
WorkerTrigger(repository_id, configuration).ping()
|
||||
run_mock.assert_called_once_with()
|
||||
timer_mock.assert_called_once_with()
|
43
tests/ahriman/core/distributed/test_workers_cache.py
Normal file
43
tests/ahriman/core/distributed/test_workers_cache.py
Normal file
@ -0,0 +1,43 @@
|
||||
import time
|
||||
|
||||
from ahriman.core.distributed import WorkersCache
|
||||
from ahriman.models.worker import Worker
|
||||
|
||||
|
||||
def test_workers(workers_cache: WorkersCache) -> None:
|
||||
"""
|
||||
must return alive workers
|
||||
"""
|
||||
workers_cache._workers = {
|
||||
str(index): (Worker(f"address{index}"), index)
|
||||
for index in range(2)
|
||||
}
|
||||
workers_cache.time_to_live = time.monotonic()
|
||||
|
||||
assert workers_cache.workers == [Worker("address1")]
|
||||
|
||||
|
||||
def test_workers_remove(workers_cache: WorkersCache) -> None:
|
||||
"""
|
||||
must remove all workers
|
||||
"""
|
||||
workers_cache.workers_update(Worker("address"))
|
||||
assert workers_cache.workers
|
||||
|
||||
workers_cache.workers_remove()
|
||||
assert not workers_cache.workers
|
||||
|
||||
|
||||
def test_workers_update(workers_cache: WorkersCache) -> None:
|
||||
"""
|
||||
must update worker
|
||||
"""
|
||||
worker = Worker("address")
|
||||
|
||||
workers_cache.workers_update(worker)
|
||||
assert workers_cache.workers == [worker]
|
||||
_, first_last_seen = workers_cache._workers[worker.identifier]
|
||||
|
||||
workers_cache.workers_update(worker)
|
||||
_, second_last_seen = workers_cache._workers[worker.identifier]
|
||||
assert first_last_seen < second_last_seen
|
@ -4,6 +4,44 @@ from unittest.mock import MagicMock
|
||||
from ahriman.core.log.filtered_access_logger import FilteredAccessLogger
|
||||
|
||||
|
||||
def test_is_distributed_post() -> None:
|
||||
"""
|
||||
must correctly define distributed services ping request
|
||||
"""
|
||||
request = MagicMock()
|
||||
|
||||
request.method = "POST"
|
||||
request.path = "/api/v1/distributed"
|
||||
assert FilteredAccessLogger.is_distributed_post(request)
|
||||
|
||||
request.method = "GET"
|
||||
request.path = "/api/v1/distributed"
|
||||
assert not FilteredAccessLogger.is_distributed_post(request)
|
||||
|
||||
request.method = "POST"
|
||||
request.path = "/api/v1/distributed/path"
|
||||
assert not FilteredAccessLogger.is_distributed_post(request)
|
||||
|
||||
|
||||
def test_is_info_get() -> None:
|
||||
"""
|
||||
must correctly define health check request
|
||||
"""
|
||||
request = MagicMock()
|
||||
|
||||
request.method = "GET"
|
||||
request.path = "/api/v1/info"
|
||||
assert FilteredAccessLogger.is_info_get(request)
|
||||
|
||||
request.method = "POST"
|
||||
request.path = "/api/v1/info"
|
||||
assert not FilteredAccessLogger.is_info_get(request)
|
||||
|
||||
request.method = "GET"
|
||||
request.path = "/api/v1/infos"
|
||||
assert not FilteredAccessLogger.is_info_get(request)
|
||||
|
||||
|
||||
def test_is_logs_post() -> None:
|
||||
"""
|
||||
must correctly define if request belongs to logs posting
|
||||
|
@ -12,6 +12,7 @@ from ahriman.models.changes import Changes
|
||||
from ahriman.models.internal_status import InternalStatus
|
||||
from ahriman.models.log_record_id import LogRecordId
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.worker import Worker
|
||||
|
||||
|
||||
def test_parse_address(configuration: Configuration) -> None:
|
||||
@ -32,14 +33,6 @@ def test_parse_address(configuration: Configuration) -> None:
|
||||
assert WebClient.parse_address(configuration) == ("status", "http://localhost:8082")
|
||||
|
||||
|
||||
def test_status_url(web_client: WebClient) -> None:
|
||||
"""
|
||||
must generate package status url correctly
|
||||
"""
|
||||
assert web_client._status_url().startswith(web_client.address)
|
||||
assert web_client._status_url().endswith("/api/v1/status")
|
||||
|
||||
|
||||
def test_changes_url(web_client: WebClient, package_ahriman: Package) -> None:
|
||||
"""
|
||||
must generate changes url correctly
|
||||
@ -67,6 +60,14 @@ def test_package_url(web_client: WebClient, package_ahriman: Package) -> None:
|
||||
assert web_client._package_url(package_ahriman.base).endswith(f"/api/v1/packages/{package_ahriman.base}")
|
||||
|
||||
|
||||
def test_status_url(web_client: WebClient) -> None:
|
||||
"""
|
||||
must generate package status url correctly
|
||||
"""
|
||||
assert web_client._status_url().startswith(web_client.address)
|
||||
assert web_client._status_url().endswith("/api/v1/status")
|
||||
|
||||
|
||||
def test_package_add(web_client: WebClient, package_ahriman: Package, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must process package addition
|
||||
|
@ -8,3 +8,17 @@ def test_post_init() -> None:
|
||||
assert Worker("http://localhost:8080").identifier == "localhost:8080"
|
||||
assert Worker("remote").identifier == "" # not a valid url
|
||||
assert Worker("remote", identifier="id").identifier == "id"
|
||||
|
||||
|
||||
def test_view() -> None:
|
||||
"""
|
||||
must generate json view
|
||||
"""
|
||||
worker = Worker("address")
|
||||
assert worker.view() == {"address": worker.address, "identifier": worker.identifier}
|
||||
|
||||
worker = Worker("http://localhost:8080")
|
||||
assert worker.view() == {"address": worker.address, "identifier": worker.identifier}
|
||||
|
||||
worker = Worker("http://localhost:8080", identifier="abc")
|
||||
assert worker.view() == {"address": worker.address, "identifier": worker.identifier}
|
||||
|
1
tests/ahriman/web/schemas/test_worker_schema.py
Normal file
1
tests/ahriman/web/schemas/test_worker_schema.py
Normal file
@ -0,0 +1 @@
|
||||
# schema testing goes in view class tests
|
@ -50,11 +50,18 @@ def test_spawn(base: BaseView) -> None:
|
||||
|
||||
def test_validator(base: BaseView) -> None:
|
||||
"""
|
||||
must return service
|
||||
must return validator service
|
||||
"""
|
||||
assert base.validator
|
||||
|
||||
|
||||
def test_workers(base: BaseView) -> None:
|
||||
"""
|
||||
must return worker service
|
||||
"""
|
||||
assert base.workers
|
||||
|
||||
|
||||
async def test_get_permission(base: BaseView) -> None:
|
||||
"""
|
||||
must search for permission attribute in class
|
||||
|
@ -0,0 +1,80 @@
|
||||
import pytest
|
||||
|
||||
from aiohttp.test_utils import TestClient
|
||||
|
||||
from ahriman.models.user_access import UserAccess
|
||||
from ahriman.models.worker import Worker
|
||||
from ahriman.web.views.v1.distributed.workers import WorkersView
|
||||
|
||||
|
||||
async def test_get_permission() -> None:
|
||||
"""
|
||||
must return correct permission for the request
|
||||
"""
|
||||
for method in ("DELETE", "GET", "POST"):
|
||||
request = pytest.helpers.request("", "", method)
|
||||
assert await WorkersView.get_permission(request) == UserAccess.Full
|
||||
|
||||
|
||||
def test_routes() -> None:
|
||||
"""
|
||||
must return correct routes
|
||||
"""
|
||||
assert WorkersView.ROUTES == ["/api/v1/distributed"]
|
||||
|
||||
|
||||
async def test_delete(client: TestClient) -> None:
|
||||
"""
|
||||
must delete all workers
|
||||
"""
|
||||
await client.post("/api/v1/distributed", json={"address": "address1", "identifier": "1"})
|
||||
await client.post("/api/v1/distributed", json={"address": "address2", "identifier": "2"})
|
||||
|
||||
response = await client.delete("/api/v1/distributed")
|
||||
assert response.status == 204
|
||||
|
||||
response = await client.get("/api/v1/distributed")
|
||||
json = await response.json()
|
||||
assert not json
|
||||
|
||||
|
||||
async def test_get(client: TestClient) -> None:
|
||||
"""
|
||||
must return all workers
|
||||
"""
|
||||
await client.post("/api/v1/distributed", json={"address": "address1", "identifier": "1"})
|
||||
await client.post("/api/v1/distributed", json={"address": "address2", "identifier": "2"})
|
||||
response_schema = pytest.helpers.schema_response(WorkersView.get)
|
||||
|
||||
response = await client.get("/api/v1/distributed")
|
||||
assert response.ok
|
||||
json = await response.json()
|
||||
assert not response_schema.validate(json, many=True)
|
||||
|
||||
workers = [Worker(item["address"], identifier=item["identifier"]) for item in json]
|
||||
assert workers == [Worker("address1", identifier="1"), Worker("address2", identifier="2")]
|
||||
|
||||
|
||||
async def test_post(client: TestClient) -> None:
|
||||
"""
|
||||
must update worker
|
||||
"""
|
||||
worker = Worker("address1", identifier="1")
|
||||
request_schema = pytest.helpers.schema_request(WorkersView.post)
|
||||
|
||||
payload = worker.view()
|
||||
assert not request_schema.validate(payload)
|
||||
|
||||
response = await client.post("/api/v1/distributed", json=payload)
|
||||
assert response.status == 204
|
||||
|
||||
|
||||
async def test_post_exception(client: TestClient) -> None:
|
||||
"""
|
||||
must raise exception on invalid payload
|
||||
"""
|
||||
response_schema = pytest.helpers.schema_response(WorkersView.post, code=400)
|
||||
|
||||
response = await client.post("/api/v1/distributed", json={})
|
||||
assert response.status == 400
|
||||
assert not response_schema.validate(await response.json())
|
@ -25,7 +25,7 @@ ignore_packages =
|
||||
makechrootpkg_flags =
|
||||
makepkg_flags = --skippgpcheck
|
||||
triggers = ahriman.core.report.ReportTrigger ahriman.core.upload.UploadTrigger
|
||||
triggers_known = ahriman.core.gitremote.RemotePullTrigger ahriman.core.gitremote.RemotePushTrigger ahriman.core.report.ReportTrigger ahriman.core.upload.UploadTrigger ahriman.core.support.KeyringTrigger ahriman.core.support.MirrorlistTrigger
|
||||
triggers_known = ahriman.core.distributed.WorkerLoaderTrigger ahriman.core.distributed.WorkerRegisterTrigger ahriman.core.distributed.WorkerTrigger ahriman.core.distributed.WorkerUnregisterTrigger ahriman.core.gitremote.RemotePullTrigger ahriman.core.gitremote.RemotePushTrigger ahriman.core.report.ReportTrigger ahriman.core.upload.UploadTrigger ahriman.core.support.KeyringTrigger ahriman.core.support.MirrorlistTrigger
|
||||
|
||||
[repository]
|
||||
name = aur-clone
|
||||
@ -116,3 +116,6 @@ enable_archive_upload = yes
|
||||
host = 127.0.0.1
|
||||
static_path = ../web/templates/static
|
||||
templates = ../web/templates
|
||||
|
||||
[worker]
|
||||
address = http://localhost:8081
|
||||
|
Loading…
Reference in New Issue
Block a user