diff --git a/src/ahriman/application/ahriman.py b/src/ahriman/application/ahriman.py
index 24ace99d..5bddaf9e 100644
--- a/src/ahriman/application/ahriman.py
+++ b/src/ahriman/application/ahriman.py
@@ -135,6 +135,8 @@ def _parser() -> argparse.ArgumentParser:
_set_service_setup_parser(subparsers)
_set_service_shell_parser(subparsers)
_set_service_tree_migrate_parser(subparsers)
+ _set_service_worker_register_parser(subparsers)
+ _set_service_worker_unregister_parser(subparsers)
_set_user_add_parser(subparsers)
_set_user_list_parser(subparsers)
_set_user_remove_parser(subparsers)
@@ -1052,6 +1054,40 @@ def _set_service_tree_migrate_parser(root: SubParserAction) -> argparse.Argument
return parser
+def _set_service_worker_register_parser(root: SubParserAction) -> argparse.ArgumentParser:
+ """
+ add parser for remote worker registration subcommand
+
+ Args:
+ root(SubParserAction): subparsers for the commands
+
+ Returns:
+ argparse.ArgumentParser: created argument parser
+ """
+ parser = root.add_parser("service-worker-register", help="register itself as worker",
+ description="call remote service registering itself as available worker",
+ formatter_class=_formatter)
+ parser.set_defaults(handler=handlers.Triggers, trigger=["ahriman.core.distributed.WorkerRegisterTrigger"])
+ return parser
+
+
+def _set_service_worker_unregister_parser(root: SubParserAction) -> argparse.ArgumentParser:
+ """
+ add parser for remote worker removal subcommand
+
+ Args:
+ root(SubParserAction): subparsers for the commands
+
+ Returns:
+ argparse.ArgumentParser: created argument parser
+ """
+ parser = root.add_parser("service-worker-unregister", help="unregister itself as worker",
+ description="call remote service removing itself from list of available workers",
+ formatter_class=_formatter)
+ parser.set_defaults(handler=handlers.Triggers, trigger=["ahriman.core.distributed.WorkerUnregisterTrigger"])
+ return parser
+
+
def _set_user_add_parser(root: SubParserAction) -> argparse.ArgumentParser:
"""
add parser for create user subcommand
diff --git a/src/ahriman/application/handlers/web.py b/src/ahriman/application/handlers/web.py
index 7baaaf70..39098d55 100644
--- a/src/ahriman/application/handlers/web.py
+++ b/src/ahriman/application/handlers/web.py
@@ -24,6 +24,7 @@ from collections.abc import Generator
from ahriman.application.handlers import Handler
from ahriman.core.configuration import Configuration
from ahriman.core.spawn import Spawn
+from ahriman.core.triggers import TriggerLoader
from ahriman.models.repository_id import RepositoryId
@@ -53,13 +54,16 @@ class Web(Handler):
spawner = Spawn(args.parser(), list(spawner_args))
spawner.start()
+ triggers = TriggerLoader.load(repository_id, configuration)
+ triggers.on_start()
+
dummy_args = argparse.Namespace(
architecture=None,
configuration=args.configuration,
repository=None,
repository_id=None,
)
- repositories = cls.repositories_extract(dummy_args)
+ repositories = Web.repositories_extract(dummy_args)
application = setup_server(configuration, spawner, repositories)
run_server(application)
diff --git a/src/ahriman/core/database/migrations/m013_workers.py b/src/ahriman/core/database/migrations/m013_workers.py
new file mode 100644
index 00000000..a5a1098c
--- /dev/null
+++ b/src/ahriman/core/database/migrations/m013_workers.py
@@ -0,0 +1,31 @@
+#
+# Copyright (c) 2021-2023 ahriman team.
+#
+# This file is part of ahriman
+# (see https://github.com/arcan1s/ahriman).
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+__all__ = ["steps"]
+
+
+steps = [
+ """
+ create table workers (
+ identifier text not null,
+ address text not null,
+ unique (identifier)
+ )
+ """
+]
diff --git a/src/ahriman/core/database/operations/__init__.py b/src/ahriman/core/database/operations/__init__.py
index 13ca0404..0fa35c84 100644
--- a/src/ahriman/core/database/operations/__init__.py
+++ b/src/ahriman/core/database/operations/__init__.py
@@ -25,3 +25,4 @@ from ahriman.core.database.operations.changes_operations import ChangesOperation
from ahriman.core.database.operations.logs_operations import LogsOperations
from ahriman.core.database.operations.package_operations import PackageOperations
from ahriman.core.database.operations.patch_operations import PatchOperations
+from ahriman.core.database.operations.workers_operations import WorkersOperations
diff --git a/src/ahriman/core/database/operations/workers_operations.py b/src/ahriman/core/database/operations/workers_operations.py
new file mode 100644
index 00000000..8703515f
--- /dev/null
+++ b/src/ahriman/core/database/operations/workers_operations.py
@@ -0,0 +1,83 @@
+#
+# Copyright (c) 2021-2023 ahriman team.
+#
+# This file is part of ahriman
+# (see https://github.com/arcan1s/ahriman).
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+from sqlite3 import Connection
+
+from ahriman.core.database.operations import Operations
+from ahriman.models.worker import Worker
+
+
+class WorkersOperations(Operations):
+ """
+ operations for remote workers
+ """
+
+ def workers_get(self) -> list[Worker]:
+ """
+ retrieve registered workers
+
+ Returns:
+ list[Worker]: list of available workers
+ """
+ def run(connection: Connection) -> list[Worker]:
+ return [
+ Worker(row["address"], identifier=row["identifier"])
+ for row in connection.execute("""select * from workers""")
+ ]
+
+ return self.with_connection(run)
+
+ def workers_insert(self, worker: Worker) -> None:
+ """
+ insert or update worker in database
+
+ Args:
+ worker(Worker): remote worker descriptor
+ """
+ def run(connection: Connection) -> None:
+ connection.execute(
+ """
+ insert into workers
+ (identifier, address)
+ values
+ (:identifier, :address)
+ on conflict (identifier) do update set
+ address = :address
+ """,
+ worker.view()
+ )
+
+ return self.with_connection(run, commit=True)
+
+ def workers_remove(self, identifier: str | None = None) -> None:
+ """
+ unregister remote worker
+
+ Args:
+ identifier(str | None, optional): remote worker identifier. If none set it will clear all workers
+ (Default value = None)
+ """
+ def run(connection: Connection) -> None:
+ connection.execute(
+ """
+ delete from workers where (:identifier is null or identifier = :identifier)
+ """,
+ {"identifier": identifier})
+
+ return self.with_connection(run, commit=True)
diff --git a/src/ahriman/core/database/sqlite.py b/src/ahriman/core/database/sqlite.py
index 663729ec..2a7c992f 100644
--- a/src/ahriman/core/database/sqlite.py
+++ b/src/ahriman/core/database/sqlite.py
@@ -26,11 +26,12 @@ from typing import Self
from ahriman.core.configuration import Configuration
from ahriman.core.database.migrations import Migrations
from ahriman.core.database.operations import AuthOperations, BuildOperations, ChangesOperations, LogsOperations, \
- PackageOperations, PatchOperations
+ PackageOperations, PatchOperations, WorkersOperations
# pylint: disable=too-many-ancestors
-class SQLite(AuthOperations, BuildOperations, ChangesOperations, LogsOperations, PackageOperations, PatchOperations):
+class SQLite(AuthOperations, BuildOperations, ChangesOperations, LogsOperations, PackageOperations, PatchOperations,
+ WorkersOperations):
"""
wrapper for sqlite3 database
diff --git a/src/ahriman/core/distributed/__init__.py b/src/ahriman/core/distributed/__init__.py
new file mode 100644
index 00000000..15de7396
--- /dev/null
+++ b/src/ahriman/core/distributed/__init__.py
@@ -0,0 +1,23 @@
+#
+# Copyright (c) 2021-2023 ahriman team.
+#
+# This file is part of ahriman
+# (see https://github.com/arcan1s/ahriman).
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+from ahriman.core.distributed.worker_loader_trigger import WorkerLoaderTrigger
+from ahriman.core.distributed.worker_register_trigger import WorkerRegisterTrigger
+from ahriman.core.distributed.worker_trigger import WorkerTrigger
+from ahriman.core.distributed.worker_unregister_trigger import WorkerUnregisterTrigger
diff --git a/src/ahriman/core/distributed/distributed_system.py b/src/ahriman/core/distributed/distributed_system.py
new file mode 100644
index 00000000..7ed059e0
--- /dev/null
+++ b/src/ahriman/core/distributed/distributed_system.py
@@ -0,0 +1,170 @@
+#
+# Copyright (c) 2021-2023 ahriman team.
+#
+# This file is part of ahriman
+# (see https://github.com/arcan1s/ahriman).
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+import tempfile
+import uuid
+
+from pathlib import Path
+
+from ahriman.core.configuration import Configuration
+from ahriman.core.configuration.schema import ConfigurationSchema
+from ahriman.core.status.web_client import WebClient
+from ahriman.core.triggers import Trigger
+from ahriman.models.repository_id import RepositoryId
+from ahriman.models.worker import Worker
+
+
+class DistributedSystem(Trigger, WebClient):
+ """
+ simple class to (un)register itself as a distributed worker
+
+ Attributes:
+ identifier_path(Path): path to cached worker identifier
+ worker(Worker): unique self identifier
+ """
+
+ CONFIGURATION_SCHEMA: ConfigurationSchema = {
+ "worker": {
+ "type": "dict",
+ "schema": {
+ "address": {
+ "type": "string",
+ "required": True,
+ "empty": False,
+ "is_url": [],
+ },
+ "identifier": {
+ "type": "string",
+ "empty": False,
+ },
+ "identifier_path": {
+ "type": "path",
+ "coerce": "absolute_path",
+ },
+ },
+ },
+ }
+
+ def __init__(self, repository_id: RepositoryId, configuration: Configuration) -> None:
+ """
+ default constructor
+
+ Args:
+ repository_id(RepositoryId): repository unique identifier
+ configuration(Configuration): configuration instance
+ """
+ Trigger.__init__(self, repository_id, configuration)
+ WebClient.__init__(self, repository_id, configuration)
+
+ section = next(iter(self.configuration_sections(configuration)))
+ self.identifier_path = configuration.getpath(
+ section, "identifier_path", fallback=Path(tempfile.gettempdir()) / "ahriman-worker-identifier")
+ self._owe_identifier = False
+
+ identifier = self.load_identifier(configuration, section)
+ self.worker = Worker(configuration.get(section, "address"), identifier=identifier)
+
+ @classmethod
+ def configuration_sections(cls, configuration: Configuration) -> list[str]:
+ """
+ extract configuration sections from configuration
+
+ Args:
+ configuration(Configuration): configuration instance
+
+ Returns:
+ list[str]: read configuration sections belong to this trigger
+ """
+ return list(cls.CONFIGURATION_SCHEMA.keys())
+
+ def _workers_url(self, identifier: str = "") -> str:
+ """
+ workers url generator
+
+ Args:
+ identifier(str, optional): worker identifier (Default value = "")
+
+ Returns:
+ str: full url of web service for specific worker
+ """
+ suffix = f"/{identifier}" if identifier else ""
+ return f"{self.address}/api/v1/distributed{suffix}"
+
+ def load_identifier(self, configuration: Configuration, section: str) -> str:
+ """
+ load identifier from filesystem if available or from configuration otherwise. If cache file is available,
+ the method will read from it. Otherwise, it will try to read it from configuration. And, finally, if no
+ identifier set, it will generate uuid
+
+ Args:
+ configuration(Configuration): configuration instance
+ section(str): settings section name
+
+ Returns:
+ str: unique worker identifier
+ """
+ if self.identifier_path.is_file(): # load cached value
+ return self.identifier_path.read_text(encoding="utf8")
+ return configuration.get(section, "identifier", fallback=str(uuid.uuid4()))
+
+ def register(self, force: bool = False) -> None:
+ """
+ register itself in remote system
+
+ Args:
+ force(bool, optional): register worker even if it has been already registered before (Default value = False)
+ """
+ if self.identifier_path.is_file() and not force:
+ return # there is already registered identifier
+
+ self.make_request("POST", self._workers_url(), json=self.worker.view())
+ # save identifier
+ self.identifier_path.write_text(self.worker.identifier, encoding="utf8")
+ self._owe_identifier = True
+ self.logger.info("registered instance %s at %s", self.worker, self.address)
+
+ def unregister(self, force: bool = False) -> None:
+ """
+ unregister itself in remote system
+
+ Args:
+ force(bool, optional): unregister worker even if it has been registered in another process
+ (Default value = False)
+ """
+ if not self._owe_identifier and not force:
+ return # we do not owe this identifier
+
+ self.make_request("DELETE", self._workers_url(self.worker.identifier))
+ self.identifier_path.unlink(missing_ok=True)
+ self.logger.info("unregistered instance %s at %s", self.worker, self.address)
+
+ def workers(self) -> list[Worker]:
+ """
+ retrieve list of available remote workers
+
+ Returns:
+ list[Worker]: currently registered workers
+ """
+ response = self.make_request("GET", self._workers_url())
+ response_json = response.json()
+
+ return [
+ Worker(worker["address"], identifier=worker["identifier"])
+ for worker in response_json
+ ]
diff --git a/src/ahriman/core/distributed/worker_loader_trigger.py b/src/ahriman/core/distributed/worker_loader_trigger.py
new file mode 100644
index 00000000..f04fcd01
--- /dev/null
+++ b/src/ahriman/core/distributed/worker_loader_trigger.py
@@ -0,0 +1,37 @@
+#
+# Copyright (c) 2021-2023 ahriman team.
+#
+# This file is part of ahriman
+# (see https://github.com/arcan1s/ahriman).
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+from ahriman.core.distributed.distributed_system import DistributedSystem
+
+
+class WorkerLoaderTrigger(DistributedSystem):
+ """
+ remote worker processor trigger (server side)
+ """
+
+ def on_start(self) -> None:
+ """
+ trigger action which will be called at the start of the application
+ """
+ if self.configuration.has_option("build", "workers"):
+ return # there is manually set option
+
+ workers = [worker.address for worker in self.workers()]
+ self.logger.info("load workers %s", workers)
+ self.configuration.set_option("build", "workers", " ".join(workers))
diff --git a/src/ahriman/core/distributed/worker_register_trigger.py b/src/ahriman/core/distributed/worker_register_trigger.py
new file mode 100644
index 00000000..03d94b37
--- /dev/null
+++ b/src/ahriman/core/distributed/worker_register_trigger.py
@@ -0,0 +1,32 @@
+#
+# Copyright (c) 2021-2023 ahriman team.
+#
+# This file is part of ahriman
+# (see https://github.com/arcan1s/ahriman).
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+from ahriman.core.distributed.distributed_system import DistributedSystem
+
+
+class WorkerRegisterTrigger(DistributedSystem):
+ """
+ remote worker registration trigger
+ """
+
+ def on_start(self) -> None:
+ """
+ trigger action which will be called at the start of the application
+ """
+ self.register(force=True)
diff --git a/src/ahriman/core/distributed/worker_trigger.py b/src/ahriman/core/distributed/worker_trigger.py
new file mode 100644
index 00000000..87905b5c
--- /dev/null
+++ b/src/ahriman/core/distributed/worker_trigger.py
@@ -0,0 +1,38 @@
+#
+# Copyright (c) 2021-2023 ahriman team.
+#
+# This file is part of ahriman
+# (see https://github.com/arcan1s/ahriman).
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+from ahriman.core.distributed.distributed_system import DistributedSystem
+
+
+class WorkerTrigger(DistributedSystem):
+ """
+ remote worker processor trigger (client side)
+ """
+
+ def on_start(self) -> None:
+ """
+ trigger action which will be called at the start of the application
+ """
+ self.register()
+
+ def on_stop(self) -> None:
+ """
+ trigger action which will be called before the stop of the application
+ """
+ self.unregister()
diff --git a/src/ahriman/core/distributed/worker_unregister_trigger.py b/src/ahriman/core/distributed/worker_unregister_trigger.py
new file mode 100644
index 00000000..33cac3fc
--- /dev/null
+++ b/src/ahriman/core/distributed/worker_unregister_trigger.py
@@ -0,0 +1,32 @@
+#
+# Copyright (c) 2021-2023 ahriman team.
+#
+# This file is part of ahriman
+# (see https://github.com/arcan1s/ahriman).
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+from ahriman.core.distributed.distributed_system import DistributedSystem
+
+
+class WorkerUnregisterTrigger(DistributedSystem):
+ """
+ remote worker registration trigger
+ """
+
+ def on_start(self) -> None:
+ """
+ trigger action which will be called at the start of the application
+ """
+ self.unregister(force=True)
diff --git a/src/ahriman/core/http/sync_http_client.py b/src/ahriman/core/http/sync_http_client.py
index 6b670c31..5f919e49 100644
--- a/src/ahriman/core/http/sync_http_client.py
+++ b/src/ahriman/core/http/sync_http_client.py
@@ -47,8 +47,8 @@ class SyncHttpClient(LazyLogging):
default constructor
Args:
- configuration(Configuration | None): configuration instance (Default value = None)
- section(str, optional): settings section name (Default value = None)
+ configuration(Configuration | None, optional): configuration instance (Default value = None)
+ section(str | None, optional): settings section name (Default value = None)
suppress_errors(bool, optional): suppress logging of request errors (Default value = False)
"""
if configuration is None:
diff --git a/src/ahriman/core/status/watcher.py b/src/ahriman/core/status/watcher.py
index aada9e29..81bc5903 100644
--- a/src/ahriman/core/status/watcher.py
+++ b/src/ahriman/core/status/watcher.py
@@ -26,6 +26,7 @@ from ahriman.models.log_record_id import LogRecordId
from ahriman.models.package import Package
from ahriman.models.pkgbuild_patch import PkgbuildPatch
from ahriman.models.repository_id import RepositoryId
+from ahriman.models.worker import Worker
class Watcher(LazyLogging):
@@ -223,3 +224,30 @@ class Watcher(LazyLogging):
status(BuildStatusEnum): new service status
"""
self.status = BuildStatus(status)
+
+ def workers_get(self) -> list[Worker]:
+ """
+ retrieve registered remote workers
+
+ Returns:
+ list[Worker]: list of currently available workers
+ """
+ return self.database.workers_get()
+
+ def workers_remove(self, identifier: str | None = None) -> None:
+ """
+ unregister remote worker
+
+ Args:
+ identifier(str | None, optional): remote worker identifier if any (Default value = None)
+ """
+ self.database.workers_remove(identifier)
+
+ def workers_update(self, worker: Worker) -> None:
+ """
+ register or update remote worker
+
+ Args:
+ worker(Worker): worker to register
+ """
+ self.database.workers_insert(worker)
diff --git a/src/ahriman/core/status/web_client.py b/src/ahriman/core/status/web_client.py
index e32214ba..a56e6cde 100644
--- a/src/ahriman/core/status/web_client.py
+++ b/src/ahriman/core/status/web_client.py
@@ -118,7 +118,6 @@ class WebClient(Client, SyncAhrimanClient):
Returns:
str: full url of web service for specific package base
"""
- # in case if unix socket is used we need to normalize url
suffix = f"/{package_base}" if package_base else ""
return f"{self.address}/api/v1/packages{suffix}"
diff --git a/src/ahriman/models/worker.py b/src/ahriman/models/worker.py
index 72215ff1..3eaf3ff9 100644
--- a/src/ahriman/models/worker.py
+++ b/src/ahriman/models/worker.py
@@ -18,8 +18,11 @@
# along with this program. If not, see .
#
from dataclasses import dataclass, field
+from typing import Any
from urllib.parse import urlparse
+from ahriman.core.util import dataclass_view
+
@dataclass(frozen=True)
class Worker:
@@ -39,3 +42,12 @@ class Worker:
update identifier based on settings
"""
object.__setattr__(self, "identifier", self.identifier or urlparse(self.address).netloc)
+
+ def view(self) -> dict[str, Any]:
+ """
+ generate json patch view
+
+ Returns:
+ dict[str, Any]: json-friendly dictionary
+ """
+ return dataclass_view(self)
diff --git a/src/ahriman/web/schemas/__init__.py b/src/ahriman/web/schemas/__init__.py
index 998d6ee6..de61f3cb 100644
--- a/src/ahriman/web/schemas/__init__.py
+++ b/src/ahriman/web/schemas/__init__.py
@@ -47,5 +47,7 @@ from ahriman.web.schemas.remote_schema import RemoteSchema
from ahriman.web.schemas.repository_id_schema import RepositoryIdSchema
from ahriman.web.schemas.search_schema import SearchSchema
from ahriman.web.schemas.status_schema import StatusSchema
-from ahriman.web.schemas.versioned_log_schema import VersionedLogSchema
from ahriman.web.schemas.update_flags_schema import UpdateFlagsSchema
+from ahriman.web.schemas.versioned_log_schema import VersionedLogSchema
+from ahriman.web.schemas.worker_id_schema import WorkerIdSchema
+from ahriman.web.schemas.worker_schema import WorkerSchema
diff --git a/src/ahriman/web/schemas/worker_id_schema.py b/src/ahriman/web/schemas/worker_id_schema.py
new file mode 100644
index 00000000..fb14b7c7
--- /dev/null
+++ b/src/ahriman/web/schemas/worker_id_schema.py
@@ -0,0 +1,31 @@
+#
+# Copyright (c) 2021-2023 ahriman team.
+#
+# This file is part of ahriman
+# (see https://github.com/arcan1s/ahriman).
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+from marshmallow import Schema, fields
+
+
+class WorkerIdSchema(Schema):
+ """
+ request and response schema for workers
+ """
+
+ identifier = fields.String(required=True, metadata={
+ "description": "Worker unique identifier",
+ "example": "42f03a62-48f7-46b7-af40-dacc720e92fa",
+ })
diff --git a/src/ahriman/web/schemas/worker_schema.py b/src/ahriman/web/schemas/worker_schema.py
new file mode 100644
index 00000000..1051f99d
--- /dev/null
+++ b/src/ahriman/web/schemas/worker_schema.py
@@ -0,0 +1,33 @@
+#
+# Copyright (c) 2021-2023 ahriman team.
+#
+# This file is part of ahriman
+# (see https://github.com/arcan1s/ahriman).
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+from marshmallow import fields
+
+from ahriman.web.schemas.worker_id_schema import WorkerIdSchema
+
+
+class WorkerSchema(WorkerIdSchema):
+ """
+ request and response schema for workers
+ """
+
+ address = fields.String(required=True, metadata={
+ "description": "Worker address",
+ "example": "http://localhost:8081",
+ })
diff --git a/src/ahriman/web/views/v1/distributed/__init__.py b/src/ahriman/web/views/v1/distributed/__init__.py
new file mode 100644
index 00000000..8fc622e9
--- /dev/null
+++ b/src/ahriman/web/views/v1/distributed/__init__.py
@@ -0,0 +1,19 @@
+#
+# Copyright (c) 2021-2023 ahriman team.
+#
+# This file is part of ahriman
+# (see https://github.com/arcan1s/ahriman).
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
diff --git a/src/ahriman/web/views/v1/distributed/worker.py b/src/ahriman/web/views/v1/distributed/worker.py
new file mode 100644
index 00000000..fa3a23ff
--- /dev/null
+++ b/src/ahriman/web/views/v1/distributed/worker.py
@@ -0,0 +1,99 @@
+#
+# Copyright (c) 2021-2023 ahriman team.
+#
+# This file is part of ahriman
+# (see https://github.com/arcan1s/ahriman).
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+import aiohttp_apispec # type: ignore[import-untyped]
+
+from aiohttp.web import HTTPNoContent, HTTPNotFound, Response, json_response
+
+from ahriman.models.user_access import UserAccess
+from ahriman.web.schemas import AuthSchema, ErrorSchema, WorkerIdSchema, WorkerSchema
+from ahriman.web.views.base import BaseView
+
+
+class WorkerView(BaseView):
+ """
+ distributed worker view
+
+ Attributes:
+ DELETE_PERMISSION(UserAccess): (class attribute) delete permissions of self
+ GET_PERMISSION(UserAccess): (class attribute) get permissions of self
+ """
+
+ DELETE_PERMISSION = GET_PERMISSION = UserAccess.Full
+ ROUTES = ["/api/v1/distributed/{identifier}"]
+
+ @aiohttp_apispec.docs(
+ tags=["Distributed"],
+ summary="Unregister worker",
+ description="Unregister worker and remove it from the service",
+ responses={
+ 204: {"description": "Success response"},
+ 401: {"description": "Authorization required", "schema": ErrorSchema},
+ 403: {"description": "Access is forbidden", "schema": ErrorSchema},
+ 500: {"description": "Internal server error", "schema": ErrorSchema},
+ },
+ security=[{"token": [DELETE_PERMISSION]}],
+ )
+ @aiohttp_apispec.cookies_schema(AuthSchema)
+ @aiohttp_apispec.match_info_schema(WorkerIdSchema)
+ async def delete(self) -> None:
+ """
+ unregister worker
+
+ Raises:
+ HTTPNoContent: on success response
+ """
+ identifier = self.request.match_info["identifier"]
+ self.service().workers_remove(identifier)
+
+ raise HTTPNoContent
+
+ @aiohttp_apispec.docs(
+ tags=["Distributed"],
+ summary="Get worker",
+ description="Retrieve registered worker by its identifier",
+ responses={
+ 200: {"description": "Success response", "schema": WorkerSchema(many=True)},
+ 401: {"description": "Authorization required", "schema": ErrorSchema},
+ 403: {"description": "Access is forbidden", "schema": ErrorSchema},
+ 404: {"description": "Worker is unknown", "schema": ErrorSchema},
+ 500: {"description": "Internal server error", "schema": ErrorSchema},
+ },
+ security=[{"token": [GET_PERMISSION]}],
+ )
+ @aiohttp_apispec.cookies_schema(AuthSchema)
+ @aiohttp_apispec.match_info_schema(WorkerIdSchema)
+ async def get(self) -> Response:
+ """
+ get worker by identifier
+
+ Returns:
+ Response: 200 with workers list on success
+
+ Raises:
+ HTTPNotFound: if no worker was found
+ """
+ identifier = self.request.match_info["identifier"]
+
+ try:
+ worker = next(worker for worker in self.service().workers_get() if worker.identifier == identifier)
+ except StopIteration:
+ raise HTTPNotFound(reason=f"Worker {identifier} not found")
+
+ return json_response([worker.view()])
diff --git a/src/ahriman/web/views/v1/distributed/workers.py b/src/ahriman/web/views/v1/distributed/workers.py
new file mode 100644
index 00000000..fe5b5f0a
--- /dev/null
+++ b/src/ahriman/web/views/v1/distributed/workers.py
@@ -0,0 +1,126 @@
+#
+# Copyright (c) 2021-2023 ahriman team.
+#
+# This file is part of ahriman
+# (see https://github.com/arcan1s/ahriman).
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+import aiohttp_apispec # type: ignore[import-untyped]
+
+from collections.abc import Callable
+from aiohttp.web import HTTPBadRequest, HTTPNoContent, Response, json_response
+
+from ahriman.models.user_access import UserAccess
+from ahriman.models.worker import Worker
+from ahriman.web.schemas import AuthSchema, ErrorSchema, WorkerSchema
+from ahriman.web.views.base import BaseView
+
+
+class WorkersView(BaseView):
+ """
+ distributed workers view
+
+ Attributes:
+ DELETE_PERMISSION(UserAccess): (class attribute) delete permissions of self
+ GET_PERMISSION(UserAccess): (class attribute) get permissions of self
+ POST_PERMISSION(UserAccess): (class attribute) post permissions of self
+ """
+
+ DELETE_PERMISSION = GET_PERMISSION = POST_PERMISSION = UserAccess.Full
+ ROUTES = ["/api/v1/distributed"]
+
+ @aiohttp_apispec.docs(
+ tags=["Distributed"],
+ summary="Unregister all workers",
+ description="Unregister and remove all known workers from the service",
+ responses={
+ 204: {"description": "Success response"},
+ 401: {"description": "Authorization required", "schema": ErrorSchema},
+ 403: {"description": "Access is forbidden", "schema": ErrorSchema},
+ 500: {"description": "Internal server error", "schema": ErrorSchema},
+ },
+ security=[{"token": [DELETE_PERMISSION]}],
+ )
+ @aiohttp_apispec.cookies_schema(AuthSchema)
+ async def delete(self) -> None:
+ """
+ unregister worker
+
+ Raises:
+ HTTPNoContent: on success response
+ """
+ self.service().workers_remove()
+
+ raise HTTPNoContent
+
+ @aiohttp_apispec.docs(
+ tags=["Distributed"],
+ summary="Get workers",
+ description="Retrieve registered workers",
+ responses={
+ 200: {"description": "Success response", "schema": WorkerSchema(many=True)},
+ 401: {"description": "Authorization required", "schema": ErrorSchema},
+ 403: {"description": "Access is forbidden", "schema": ErrorSchema},
+ 500: {"description": "Internal server error", "schema": ErrorSchema},
+ },
+ security=[{"token": [GET_PERMISSION]}],
+ )
+ @aiohttp_apispec.cookies_schema(AuthSchema)
+ async def get(self) -> Response:
+ """
+ get workers list
+
+ Returns:
+ Response: 200 with workers list on success
+ """
+ workers = self.service().workers_get()
+
+ comparator: Callable[[Worker], str] = lambda item: item.identifier
+ response = [worker.view() for worker in sorted(workers, key=comparator)]
+
+ return json_response(response)
+
+ @aiohttp_apispec.docs(
+ tags=["Distributed"],
+ summary="Register worker",
+ description="Register or update remote worker",
+ responses={
+ 204: {"description": "Success response"},
+ 400: {"description": "Bad data is supplied", "schema": ErrorSchema},
+ 401: {"description": "Authorization required", "schema": ErrorSchema},
+ 403: {"description": "Access is forbidden", "schema": ErrorSchema},
+ 500: {"description": "Internal server error", "schema": ErrorSchema},
+ },
+ security=[{"token": [POST_PERMISSION]}],
+ )
+ @aiohttp_apispec.cookies_schema(AuthSchema)
+ @aiohttp_apispec.json_schema(WorkerSchema)
+ async def post(self) -> None:
+ """
+ register remote worker
+
+ Raises:
+ HTTPBadRequest: if bad data is supplied
+ HTTPNoContent: in case of success response
+ """
+ try:
+ data = await self.request.json()
+ worker = Worker(data["address"], identifier=data["identifier"])
+ except Exception as ex:
+ raise HTTPBadRequest(reason=str(ex))
+
+ self.service().workers_update(worker)
+
+ raise HTTPNoContent
diff --git a/tests/ahriman/application/handlers/test_handler_web.py b/tests/ahriman/application/handlers/test_handler_web.py
index 79376ee6..fe2861c2 100644
--- a/tests/ahriman/application/handlers/test_handler_web.py
+++ b/tests/ahriman/application/handlers/test_handler_web.py
@@ -39,6 +39,7 @@ def test_run(args: argparse.Namespace, configuration: Configuration, repository:
setup_mock = mocker.patch("ahriman.web.web.setup_server")
run_mock = mocker.patch("ahriman.web.web.run_server")
start_mock = mocker.patch("ahriman.core.spawn.Spawn.start")
+ trigger_mock = mocker.patch("ahriman.core.triggers.TriggerLoader.load")
stop_mock = mocker.patch("ahriman.core.spawn.Spawn.stop")
join_mock = mocker.patch("ahriman.core.spawn.Spawn.join")
_, repository_id = configuration.check_loaded()
@@ -48,6 +49,8 @@ def test_run(args: argparse.Namespace, configuration: Configuration, repository:
setup_mock.assert_called_once_with(configuration, pytest.helpers.anyvar(int), [repository_id])
run_mock.assert_called_once_with(pytest.helpers.anyvar(int))
start_mock.assert_called_once_with()
+ trigger_mock.assert_called_once_with(repository_id, configuration)
+ trigger_mock().on_start.assert_called_once_with()
stop_mock.assert_called_once_with()
join_mock.assert_called_once_with()
diff --git a/tests/ahriman/application/test_ahriman.py b/tests/ahriman/application/test_ahriman.py
index d4121150..428137be 100644
--- a/tests/ahriman/application/test_ahriman.py
+++ b/tests/ahriman/application/test_ahriman.py
@@ -1320,6 +1320,80 @@ def test_subparsers_service_tree_migrate(parser: argparse.ArgumentParser) -> Non
assert not args.report
+def test_subparsers_service_worker_register(parser: argparse.ArgumentParser) -> None:
+ """
+ service-worker-register command must imply trigger
+ """
+ args = parser.parse_args(["service-worker-register"])
+ assert args.trigger == ["ahriman.core.distributed.WorkerRegisterTrigger"]
+
+
+def test_subparsers_service_worker_register_option_architecture(parser: argparse.ArgumentParser) -> None:
+ """
+ service-worker-register command must correctly parse architecture list
+ """
+ args = parser.parse_args(["service-worker-register"])
+ assert args.architecture is None
+ args = parser.parse_args(["-a", "x86_64", "service-worker-register"])
+ assert args.architecture == "x86_64"
+
+
+def test_subparsers_service_worker_register_option_repository(parser: argparse.ArgumentParser) -> None:
+ """
+ service-worker-register command must correctly parse repository list
+ """
+ args = parser.parse_args(["service-worker-register"])
+ assert args.repository is None
+ args = parser.parse_args(["-r", "repo", "service-worker-register"])
+ assert args.repository == "repo"
+
+
+def test_subparsers_service_worker_register_repo_triggers(parser: argparse.ArgumentParser) -> None:
+ """
+ service-worker-register must have same keys as repo-triggers
+ """
+ args = parser.parse_args(["service-worker-register"])
+ reference_args = parser.parse_args(["repo-triggers"])
+ assert dir(args) == dir(reference_args)
+
+
+def test_subparsers_service_worker_unregister(parser: argparse.ArgumentParser) -> None:
+ """
+ service-worker-unregister command must imply trigger
+ """
+ args = parser.parse_args(["service-worker-unregister"])
+ assert args.trigger == ["ahriman.core.distributed.WorkerUnregisterTrigger"]
+
+
+def test_subparsers_service_worker_unregister_option_architecture(parser: argparse.ArgumentParser) -> None:
+ """
+ service-worker-unregister command must correctly parse architecture list
+ """
+ args = parser.parse_args(["service-worker-unregister"])
+ assert args.architecture is None
+ args = parser.parse_args(["-a", "x86_64", "service-worker-unregister"])
+ assert args.architecture == "x86_64"
+
+
+def test_subparsers_service_worker_unregister_option_repository(parser: argparse.ArgumentParser) -> None:
+ """
+ service-worker-unregister command must correctly parse repository list
+ """
+ args = parser.parse_args(["service-worker-unregister"])
+ assert args.repository is None
+ args = parser.parse_args(["-r", "repo", "service-worker-unregister"])
+ assert args.repository == "repo"
+
+
+def test_subparsers_service_worker_unregister_repo_triggers(parser: argparse.ArgumentParser) -> None:
+ """
+ service-worker-unregister must have same keys as repo-triggers
+ """
+ args = parser.parse_args(["service-worker-unregister"])
+ reference_args = parser.parse_args(["repo-triggers"])
+ assert dir(args) == dir(reference_args)
+
+
def test_subparsers_user_add(parser: argparse.ArgumentParser) -> None:
"""
user-add command must imply action, architecture, exit code, lock, quiet, report and repository
diff --git a/tests/ahriman/core/database/migrations/test_m013_workers.py b/tests/ahriman/core/database/migrations/test_m013_workers.py
new file mode 100644
index 00000000..ed1a3ab6
--- /dev/null
+++ b/tests/ahriman/core/database/migrations/test_m013_workers.py
@@ -0,0 +1,8 @@
+from ahriman.core.database.migrations.m013_workers import steps
+
+
+def test_migration_workers() -> None:
+ """
+ migration must not be empty
+ """
+ assert steps
diff --git a/tests/ahriman/core/database/operations/test_workers_operations.py b/tests/ahriman/core/database/operations/test_workers_operations.py
new file mode 100644
index 00000000..0835a5c9
--- /dev/null
+++ b/tests/ahriman/core/database/operations/test_workers_operations.py
@@ -0,0 +1,46 @@
+from ahriman.core.database import SQLite
+from ahriman.models.worker import Worker
+
+
+def test_workers_get_insert(database: SQLite) -> None:
+ """
+ must insert workers to database
+ """
+ database.workers_insert(Worker("address1", identifier="1"))
+ database.workers_insert(Worker("address2", identifier="2"))
+ assert database.workers_get() == [
+ Worker("address1", identifier="1"), Worker("address2", identifier="2")
+ ]
+
+
+def test_workers_insert_remove(database: SQLite) -> None:
+ """
+ must remove worker from database
+ """
+ database.workers_insert(Worker("address1", identifier="1"))
+ database.workers_insert(Worker("address2", identifier="2"))
+ database.workers_remove("1")
+
+ assert database.workers_get() == [Worker("address2", identifier="2")]
+
+
+def test_workers_insert_remove_all(database: SQLite) -> None:
+ """
+ must remove all workers
+ """
+ database.workers_insert(Worker("address1", identifier="1"))
+ database.workers_insert(Worker("address2", identifier="2"))
+ database.workers_remove()
+
+ assert database.workers_get() == []
+
+
+def test_workers_insert_insert(database: SQLite) -> None:
+ """
+ must update worker in database
+ """
+ database.workers_insert(Worker("address1", identifier="1"))
+ assert database.workers_get() == [Worker("address1", identifier="1")]
+
+ database.workers_insert(Worker("address2", identifier="1"))
+ assert database.workers_get() == [Worker("address2", identifier="1")]
diff --git a/tests/ahriman/core/distributed/conftest.py b/tests/ahriman/core/distributed/conftest.py
new file mode 100644
index 00000000..9f7ddb87
--- /dev/null
+++ b/tests/ahriman/core/distributed/conftest.py
@@ -0,0 +1,20 @@
+import pytest
+
+from ahriman.core.configuration import Configuration
+from ahriman.core.distributed.distributed_system import DistributedSystem
+
+
+@pytest.fixture
+def distributed_system(configuration: Configuration) -> DistributedSystem:
+ """
+ distributed system fixture
+
+ Args:
+ configuration(Configuration): configuration fixture
+
+ Returns:
+ DistributedSystem: distributed system test instance
+ """
+ configuration.set_option("status", "address", "http://localhost:8081")
+ _, repository_id = configuration.check_loaded()
+ return DistributedSystem(repository_id, configuration)
diff --git a/tests/ahriman/core/distributed/test_distributed_system.py b/tests/ahriman/core/distributed/test_distributed_system.py
new file mode 100644
index 00000000..d5c8e9f1
--- /dev/null
+++ b/tests/ahriman/core/distributed/test_distributed_system.py
@@ -0,0 +1,176 @@
+import json
+import requests
+
+from pytest_mock import MockerFixture
+
+from ahriman.core.configuration import Configuration
+from ahriman.core.distributed.distributed_system import DistributedSystem
+from ahriman.models.worker import Worker
+
+
+def test_identifier_path(configuration: Configuration) -> None:
+ """
+ must correctly set default identifier path
+ """
+ configuration.set_option("status", "address", "http://localhost:8081")
+ _, repository_id = configuration.check_loaded()
+ assert DistributedSystem(repository_id, configuration).identifier_path
+
+
+def test_configuration_sections(configuration: Configuration) -> None:
+ """
+ must correctly parse target list
+ """
+ assert DistributedSystem.configuration_sections(configuration) == ["worker"]
+
+
+def test_workers_url(distributed_system: DistributedSystem) -> None:
+ """
+ must generate workers url correctly
+ """
+ assert distributed_system._workers_url().startswith(distributed_system.address)
+ assert distributed_system._workers_url().endswith("/api/v1/distributed")
+
+ assert distributed_system._workers_url("id").startswith(distributed_system.address)
+ assert distributed_system._workers_url("id").endswith("/api/v1/distributed/id")
+
+
+def test_load_identifier(configuration: Configuration, mocker: MockerFixture) -> None:
+ """
+ must generate identifier
+ """
+ mocker.patch("pathlib.Path.is_file", return_value=False)
+ configuration.set_option("status", "address", "http://localhost:8081")
+ _, repository_id = configuration.check_loaded()
+ system = DistributedSystem(repository_id, configuration)
+
+ assert system.load_identifier(configuration, "worker")
+
+
+def test_load_identifier_configuration(configuration: Configuration, mocker: MockerFixture) -> None:
+ """
+ must load identifier from configuration
+ """
+ identifier = "id"
+ mocker.patch("pathlib.Path.is_file", return_value=False)
+ configuration.set_option("worker", "identifier", identifier)
+ configuration.set_option("status", "address", "http://localhost:8081")
+ _, repository_id = configuration.check_loaded()
+ system = DistributedSystem(repository_id, configuration)
+
+ assert system.worker.identifier == identifier
+
+
+def test_load_identifier_filesystem(configuration: Configuration, mocker: MockerFixture) -> None:
+ """
+ must load identifier from filesystem
+ """
+ identifier = "id"
+ mocker.patch("pathlib.Path.is_file", return_value=True)
+ read_mock = mocker.patch("pathlib.Path.read_text", return_value=identifier)
+ configuration.set_option("status", "address", "http://localhost:8081")
+ _, repository_id = configuration.check_loaded()
+ system = DistributedSystem(repository_id, configuration)
+
+ assert system.worker.identifier == identifier
+ read_mock.assert_called_once_with(encoding="utf8")
+
+
+def test_register(distributed_system: DistributedSystem, mocker: MockerFixture) -> None:
+ """
+ must register service
+ """
+ mocker.patch("pathlib.Path.is_file", return_value=False)
+ run_mock = mocker.patch("ahriman.core.distributed.distributed_system.DistributedSystem.make_request")
+ write_mock = mocker.patch("pathlib.Path.write_text")
+
+ distributed_system.register()
+ run_mock.assert_called_once_with("POST", f"{distributed_system.address}/api/v1/distributed",
+ json=distributed_system.worker.view())
+ write_mock.assert_called_once_with(distributed_system.worker.identifier, encoding="utf8")
+ assert distributed_system._owe_identifier
+
+
+def test_register_skip(distributed_system: DistributedSystem, mocker: MockerFixture) -> None:
+ """
+ must skip service registration if it doesn't owe the identifier
+ """
+ mocker.patch("pathlib.Path.is_file", return_value=True)
+ run_mock = mocker.patch("ahriman.core.distributed.distributed_system.DistributedSystem.make_request")
+ write_mock = mocker.patch("pathlib.Path.write_text")
+
+ distributed_system.register()
+ run_mock.assert_not_called()
+ write_mock.assert_not_called()
+ assert not distributed_system._owe_identifier
+
+
+def test_register_force(distributed_system: DistributedSystem, mocker: MockerFixture) -> None:
+ """
+ must register service even if it doesn't owe the identifier if force is supplied
+ """
+ mocker.patch("pathlib.Path.is_file", return_value=True)
+ run_mock = mocker.patch("ahriman.core.distributed.distributed_system.DistributedSystem.make_request")
+ write_mock = mocker.patch("pathlib.Path.write_text")
+
+ distributed_system.register(force=True)
+ run_mock.assert_called_once_with("POST", f"{distributed_system.address}/api/v1/distributed",
+ json=distributed_system.worker.view())
+ write_mock.assert_called_once_with(distributed_system.worker.identifier, encoding="utf8")
+ assert distributed_system._owe_identifier
+
+
+def test_unregister(distributed_system: DistributedSystem, mocker: MockerFixture) -> None:
+ """
+ must unregister service
+ """
+ run_mock = mocker.patch("ahriman.core.distributed.distributed_system.DistributedSystem.make_request")
+ remove_mock = mocker.patch("pathlib.Path.unlink")
+ distributed_system._owe_identifier = True
+
+ distributed_system.unregister()
+ run_mock.assert_called_once_with(
+ "DELETE", f"{distributed_system.address}/api/v1/distributed/{distributed_system.worker.identifier}")
+ remove_mock.assert_called_once_with(missing_ok=True)
+
+
+def test_unregister_skip(distributed_system: DistributedSystem, mocker: MockerFixture) -> None:
+ """
+ must skip service removal if it doesn't owe the identifier
+ """
+ run_mock = mocker.patch("ahriman.core.distributed.distributed_system.DistributedSystem.make_request")
+ remove_mock = mocker.patch("pathlib.Path.unlink")
+
+ distributed_system.unregister()
+ run_mock.assert_not_called()
+ remove_mock.assert_not_called()
+
+
+def test_unregister_force(distributed_system: DistributedSystem, mocker: MockerFixture) -> None:
+ """
+ must remove service even if it doesn't owe the identifier if force is supplied
+ """
+ run_mock = mocker.patch("ahriman.core.distributed.distributed_system.DistributedSystem.make_request")
+ remove_mock = mocker.patch("pathlib.Path.unlink")
+
+ distributed_system.unregister(force=True)
+ run_mock.assert_called_once_with(
+ "DELETE", f"{distributed_system.address}/api/v1/distributed/{distributed_system.worker.identifier}")
+ remove_mock.assert_called_once_with(missing_ok=True)
+
+
+def test_workers_get(distributed_system: DistributedSystem, mocker: MockerFixture) -> None:
+ """
+ must return available remote workers
+ """
+ worker = Worker("remote")
+ response_obj = requests.Response()
+ response_obj._content = json.dumps([worker.view()]).encode("utf8")
+ response_obj.status_code = 200
+
+ requests_mock = mocker.patch("ahriman.core.status.web_client.WebClient.make_request",
+ return_value=response_obj)
+
+ result = distributed_system.workers()
+ requests_mock.assert_called_once_with("GET", distributed_system._workers_url())
+ assert result == [worker]
diff --git a/tests/ahriman/core/distributed/test_worker_loader_trigger.py b/tests/ahriman/core/distributed/test_worker_loader_trigger.py
new file mode 100644
index 00000000..7a1200da
--- /dev/null
+++ b/tests/ahriman/core/distributed/test_worker_loader_trigger.py
@@ -0,0 +1,34 @@
+from pytest_mock import MockerFixture
+
+from ahriman.core.configuration import Configuration
+from ahriman.core.distributed import WorkerLoaderTrigger
+from ahriman.models.worker import Worker
+
+
+def test_on_start(configuration: Configuration, mocker: MockerFixture) -> None:
+ """
+ must load workers from remote
+ """
+ worker = Worker("address")
+ configuration.set_option("status", "address", "http://localhost:8081")
+ run_mock = mocker.patch("ahriman.core.distributed.WorkerLoaderTrigger.workers", return_value=[worker])
+ _, repository_id = configuration.check_loaded()
+
+ trigger = WorkerLoaderTrigger(repository_id, configuration)
+ trigger.on_start()
+ run_mock.assert_called_once_with()
+ assert configuration.getlist("build", "workers") == [worker.address]
+
+
+def test_on_start_skip(configuration: Configuration, mocker: MockerFixture) -> None:
+ """
+ must skip loading if option is already set
+ """
+ configuration.set_option("status", "address", "http://localhost:8081")
+ configuration.set_option("build", "workers", "address")
+ run_mock = mocker.patch("ahriman.core.distributed.WorkerLoaderTrigger.workers")
+ _, repository_id = configuration.check_loaded()
+
+ trigger = WorkerLoaderTrigger(repository_id, configuration)
+ trigger.on_start()
+ run_mock.assert_not_called()
diff --git a/tests/ahriman/core/distributed/test_worker_register_trigger.py b/tests/ahriman/core/distributed/test_worker_register_trigger.py
new file mode 100644
index 00000000..c8f1beb8
--- /dev/null
+++ b/tests/ahriman/core/distributed/test_worker_register_trigger.py
@@ -0,0 +1,17 @@
+from pytest_mock import MockerFixture
+
+from ahriman.core.configuration import Configuration
+from ahriman.core.distributed import WorkerRegisterTrigger
+
+
+def test_on_start(configuration: Configuration, mocker: MockerFixture) -> None:
+ """
+ must register itself as worker
+ """
+ configuration.set_option("status", "address", "http://localhost:8081")
+ run_mock = mocker.patch("ahriman.core.distributed.WorkerRegisterTrigger.register")
+ _, repository_id = configuration.check_loaded()
+
+ trigger = WorkerRegisterTrigger(repository_id, configuration)
+ trigger.on_start()
+ run_mock.assert_called_once_with(force=True)
diff --git a/tests/ahriman/core/distributed/test_worker_trigger.py b/tests/ahriman/core/distributed/test_worker_trigger.py
new file mode 100644
index 00000000..bdbd904b
--- /dev/null
+++ b/tests/ahriman/core/distributed/test_worker_trigger.py
@@ -0,0 +1,30 @@
+from pytest_mock import MockerFixture
+
+from ahriman.core.configuration import Configuration
+from ahriman.core.distributed import WorkerTrigger
+
+
+def test_on_start(configuration: Configuration, mocker: MockerFixture) -> None:
+ """
+ must register itself as worker
+ """
+ configuration.set_option("status", "address", "http://localhost:8081")
+ run_mock = mocker.patch("ahriman.core.distributed.WorkerTrigger.register")
+ _, repository_id = configuration.check_loaded()
+
+ trigger = WorkerTrigger(repository_id, configuration)
+ trigger.on_start()
+ run_mock.assert_called_once_with()
+
+
+def test_on_stop(configuration: Configuration, mocker: MockerFixture) -> None:
+ """
+ must unregister itself as worker
+ """
+ configuration.set_option("status", "address", "http://localhost:8081")
+ run_mock = mocker.patch("ahriman.core.distributed.WorkerTrigger.unregister")
+ _, repository_id = configuration.check_loaded()
+
+ trigger = WorkerTrigger(repository_id, configuration)
+ trigger.on_stop()
+ run_mock.assert_called_once_with()
diff --git a/tests/ahriman/core/distributed/test_worker_unregister_trigger.py b/tests/ahriman/core/distributed/test_worker_unregister_trigger.py
new file mode 100644
index 00000000..3a35f359
--- /dev/null
+++ b/tests/ahriman/core/distributed/test_worker_unregister_trigger.py
@@ -0,0 +1,17 @@
+from pytest_mock import MockerFixture
+
+from ahriman.core.configuration import Configuration
+from ahriman.core.distributed import WorkerUnregisterTrigger
+
+
+def test_on_start(configuration: Configuration, mocker: MockerFixture) -> None:
+ """
+ must unregister itself as worker
+ """
+ configuration.set_option("status", "address", "http://localhost:8081")
+ run_mock = mocker.patch("ahriman.core.distributed.WorkerUnregisterTrigger.unregister")
+ _, repository_id = configuration.check_loaded()
+
+ trigger = WorkerUnregisterTrigger(repository_id, configuration)
+ trigger.on_start()
+ run_mock.assert_called_once_with(force=True)
diff --git a/tests/ahriman/core/status/test_watcher.py b/tests/ahriman/core/status/test_watcher.py
index ba36cfcb..fa86f939 100644
--- a/tests/ahriman/core/status/test_watcher.py
+++ b/tests/ahriman/core/status/test_watcher.py
@@ -10,6 +10,7 @@ from ahriman.models.changes import Changes
from ahriman.models.log_record_id import LogRecordId
from ahriman.models.package import Package
from ahriman.models.pkgbuild_patch import PkgbuildPatch
+from ahriman.models.worker import Worker
def test_load(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
@@ -227,3 +228,40 @@ def test_status_update(watcher: Watcher) -> None:
"""
watcher.status_update(BuildStatusEnum.Success)
assert watcher.status.status == BuildStatusEnum.Success
+
+
+def test_workers_get(watcher: Watcher, mocker: MockerFixture) -> None:
+ """
+ must retrieve workers
+ """
+ worker = Worker("remote")
+ worker_mock = mocker.patch("ahriman.core.database.SQLite.workers_get", return_value=[worker])
+
+ assert watcher.workers_get() == [worker]
+ worker_mock.assert_called_once_with()
+
+
+def test_workers_remove(watcher: Watcher, mocker: MockerFixture) -> None:
+ """
+ must remove workers
+ """
+ identifier = "identifier"
+ worker_mock = mocker.patch("ahriman.core.database.SQLite.workers_remove")
+
+ watcher.workers_remove(identifier)
+ watcher.workers_remove()
+ worker_mock.assert_has_calls([
+ MockCall(identifier),
+ MockCall(None),
+ ])
+
+
+def test_workers_update(watcher: Watcher, mocker: MockerFixture) -> None:
+ """
+ must update workers
+ """
+ worker = Worker("remote")
+ worker_mock = mocker.patch("ahriman.core.database.SQLite.workers_insert")
+
+ watcher.workers_update(worker)
+ worker_mock.assert_called_once_with(worker)
diff --git a/tests/ahriman/core/status/test_web_client.py b/tests/ahriman/core/status/test_web_client.py
index 47eaaa16..2ec25c38 100644
--- a/tests/ahriman/core/status/test_web_client.py
+++ b/tests/ahriman/core/status/test_web_client.py
@@ -12,6 +12,7 @@ from ahriman.models.changes import Changes
from ahriman.models.internal_status import InternalStatus
from ahriman.models.log_record_id import LogRecordId
from ahriman.models.package import Package
+from ahriman.models.worker import Worker
def test_parse_address(configuration: Configuration) -> None:
@@ -32,14 +33,6 @@ def test_parse_address(configuration: Configuration) -> None:
assert WebClient.parse_address(configuration) == ("status", "http://localhost:8082")
-def test_status_url(web_client: WebClient) -> None:
- """
- must generate package status url correctly
- """
- assert web_client._status_url().startswith(web_client.address)
- assert web_client._status_url().endswith("/api/v1/status")
-
-
def test_changes_url(web_client: WebClient, package_ahriman: Package) -> None:
"""
must generate changes url correctly
@@ -67,6 +60,14 @@ def test_package_url(web_client: WebClient, package_ahriman: Package) -> None:
assert web_client._package_url(package_ahriman.base).endswith(f"/api/v1/packages/{package_ahriman.base}")
+def test_status_url(web_client: WebClient) -> None:
+ """
+ must generate package status url correctly
+ """
+ assert web_client._status_url().startswith(web_client.address)
+ assert web_client._status_url().endswith("/api/v1/status")
+
+
def test_package_add(web_client: WebClient, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must process package addition
diff --git a/tests/ahriman/models/test_worker.py b/tests/ahriman/models/test_worker.py
index b6e5e72e..1200bf50 100644
--- a/tests/ahriman/models/test_worker.py
+++ b/tests/ahriman/models/test_worker.py
@@ -8,3 +8,17 @@ def test_post_init() -> None:
assert Worker("http://localhost:8080").identifier == "localhost:8080"
assert Worker("remote").identifier == "" # not a valid url
assert Worker("remote", identifier="id").identifier == "id"
+
+
+def test_view() -> None:
+ """
+ must generate json view
+ """
+ worker = Worker("address")
+ assert worker.view() == {"address": worker.address, "identifier": worker.identifier}
+
+ worker = Worker("http://localhost:8080")
+ assert worker.view() == {"address": worker.address, "identifier": worker.identifier}
+
+ worker = Worker("http://localhost:8080", identifier="abc")
+ assert worker.view() == {"address": worker.address, "identifier": worker.identifier}
diff --git a/tests/ahriman/web/schemas/test_worker_id_schema.py b/tests/ahriman/web/schemas/test_worker_id_schema.py
new file mode 100644
index 00000000..1982fb6b
--- /dev/null
+++ b/tests/ahriman/web/schemas/test_worker_id_schema.py
@@ -0,0 +1 @@
+# schema testing goes in view class tests
diff --git a/tests/ahriman/web/schemas/test_worker_schema.py b/tests/ahriman/web/schemas/test_worker_schema.py
new file mode 100644
index 00000000..1982fb6b
--- /dev/null
+++ b/tests/ahriman/web/schemas/test_worker_schema.py
@@ -0,0 +1 @@
+# schema testing goes in view class tests
diff --git a/tests/ahriman/web/views/v1/distributed/test_view_v1_distributed_worker.py b/tests/ahriman/web/views/v1/distributed/test_view_v1_distributed_worker.py
new file mode 100644
index 00000000..d3e2134d
--- /dev/null
+++ b/tests/ahriman/web/views/v1/distributed/test_view_v1_distributed_worker.py
@@ -0,0 +1,70 @@
+import pytest
+
+from aiohttp.test_utils import TestClient
+
+from ahriman.models.user_access import UserAccess
+from ahriman.models.worker import Worker
+from ahriman.web.views.v1.distributed.worker import WorkerView
+
+
+async def test_get_permission() -> None:
+ """
+ must return correct permission for the request
+ """
+ for method in ("DELETE", "GET"):
+ request = pytest.helpers.request("", "", method)
+ assert await WorkerView.get_permission(request) == UserAccess.Full
+
+
+def test_routes() -> None:
+ """
+ must return correct routes
+ """
+ assert WorkerView.ROUTES == ["/api/v1/distributed/{identifier}"]
+
+
+async def test_delete(client: TestClient) -> None:
+ """
+ must delete single worker
+ """
+ await client.post("/api/v1/distributed", json={"address": "address1", "identifier": "1"})
+ await client.post("/api/v1/distributed", json={"address": "address2", "identifier": "2"})
+
+ response = await client.delete("/api/v1/distributed/1")
+ assert response.status == 204
+
+ response = await client.get("/api/v1/distributed/1")
+ assert response.status == 404
+
+ response = await client.get("/api/v1/distributed/2")
+ assert response.ok
+
+
+async def test_get(client: TestClient) -> None:
+ """
+ must return specific worker
+ """
+ worker = Worker("address1", identifier="1")
+
+ await client.post("/api/v1/distributed", json=worker.view())
+ await client.post("/api/v1/distributed", json={"address": "address2", "identifier": "2"})
+ response_schema = pytest.helpers.schema_response(WorkerView.get)
+
+ response = await client.get(f"/api/v1/distributed/{worker.identifier}")
+ assert response.ok
+ json = await response.json()
+ assert not response_schema.validate(json, many=True)
+
+ workers = [Worker(item["address"], identifier=item["identifier"]) for item in json]
+ assert workers == [worker]
+
+
+async def test_get_not_found(client: TestClient) -> None:
+ """
+ must return Not Found for unknown package
+ """
+ response_schema = pytest.helpers.schema_response(WorkerView.get, code=404)
+
+ response = await client.get("/api/v1/distributed/1")
+ assert response.status == 404
+ assert not response_schema.validate(await response.json())
diff --git a/tests/ahriman/web/views/v1/distributed/test_view_v1_distributed_workers.py b/tests/ahriman/web/views/v1/distributed/test_view_v1_distributed_workers.py
new file mode 100644
index 00000000..397e37b2
--- /dev/null
+++ b/tests/ahriman/web/views/v1/distributed/test_view_v1_distributed_workers.py
@@ -0,0 +1,83 @@
+import pytest
+
+from aiohttp.test_utils import TestClient
+
+from ahriman.models.user_access import UserAccess
+from ahriman.models.worker import Worker
+from ahriman.web.views.v1.distributed.workers import WorkersView
+
+
+async def test_get_permission() -> None:
+ """
+ must return correct permission for the request
+ """
+ for method in ("DELETE", "GET", "POST"):
+ request = pytest.helpers.request("", "", method)
+ assert await WorkersView.get_permission(request) == UserAccess.Full
+
+
+def test_routes() -> None:
+ """
+ must return correct routes
+ """
+ assert WorkersView.ROUTES == ["/api/v1/distributed"]
+
+
+async def test_delete(client: TestClient) -> None:
+ """
+ must delete all workers
+ """
+ await client.post("/api/v1/distributed", json={"address": "address1", "identifier": "1"})
+ await client.post("/api/v1/distributed", json={"address": "address2", "identifier": "2"})
+
+ response = await client.delete("/api/v1/distributed")
+ assert response.status == 204
+
+ response = await client.get("/api/v1/distributed")
+ json = await response.json()
+ assert not json
+
+
+async def test_get(client: TestClient) -> None:
+ """
+ must return all workers
+ """
+ await client.post("/api/v1/distributed", json={"address": "address1", "identifier": "1"})
+ await client.post("/api/v1/distributed", json={"address": "address2", "identifier": "2"})
+ response_schema = pytest.helpers.schema_response(WorkersView.get)
+
+ response = await client.get("/api/v1/distributed")
+ assert response.ok
+ json = await response.json()
+ assert not response_schema.validate(json, many=True)
+
+ workers = [Worker(item["address"], identifier=item["identifier"]) for item in json]
+ assert workers == [Worker("address1", identifier="1"), Worker("address2", identifier="2")]
+
+
+async def test_post(client: TestClient) -> None:
+ """
+ must update worker
+ """
+ worker = Worker("address1", identifier="1")
+ request_schema = pytest.helpers.schema_request(WorkersView.post)
+
+ payload = worker.view()
+ assert not request_schema.validate(payload)
+
+ response = await client.post("/api/v1/distributed", json=payload)
+ assert response.status == 204
+
+ response = await client.get(f"/api/v1/distributed/{worker.identifier}")
+ assert response.ok
+
+
+async def test_post_exception(client: TestClient) -> None:
+ """
+ must raise exception on invalid payload
+ """
+ response_schema = pytest.helpers.schema_response(WorkersView.post, code=400)
+
+ response = await client.post("/api/v1/distributed", json={})
+ assert response.status == 400
+ assert not response_schema.validate(await response.json())
diff --git a/tests/testresources/core/ahriman.ini b/tests/testresources/core/ahriman.ini
index 21aba0e9..904cac17 100644
--- a/tests/testresources/core/ahriman.ini
+++ b/tests/testresources/core/ahriman.ini
@@ -115,4 +115,7 @@ username = arcan1s
enable_archive_upload = yes
host = 127.0.0.1
static_path = ../web/templates/static
-templates = ../web/templates
\ No newline at end of file
+templates = ../web/templates
+
+[worker]
+address = http://localhost:8081