From d2aed850b63e9b302a8c536296b4a7cceb49d01b Mon Sep 17 00:00:00 2001 From: Evgenii Alekseev Date: Tue, 24 Mar 2026 02:23:14 +0200 Subject: [PATCH] event bus implementation --- docker/Dockerfile | 17 +- package/archlinux/PKGBUILD | 2 +- pyproject.toml | 1 + src/ahriman/core/status/event_bus.py | 101 +++++++ src/ahriman/core/status/watcher.py | 264 ++++++++++++++---- src/ahriman/models/event.py | 8 + .../web/views/v1/auditlog/event_bus.py | 70 +++++ src/ahriman/web/views/v1/auditlog/events.py | 4 +- src/ahriman/web/views/v1/packages/archives.py | 2 +- src/ahriman/web/views/v1/packages/changes.py | 4 +- .../web/views/v1/packages/dependencies.py | 4 +- src/ahriman/web/views/v1/packages/hold.py | 2 +- src/ahriman/web/views/v1/packages/logs.py | 8 +- src/ahriman/web/views/v1/packages/package.py | 8 +- src/ahriman/web/views/v1/packages/packages.py | 4 +- src/ahriman/web/views/v1/packages/patch.py | 4 +- src/ahriman/web/views/v1/packages/patches.py | 4 +- src/ahriman/web/views/v1/service/logs.py | 2 +- src/ahriman/web/views/v1/status/status.py | 4 +- src/ahriman/web/views/v2/packages/logs.py | 2 +- src/ahriman/web/web.py | 9 +- tests/ahriman/conftest.py | 4 +- tests/ahriman/core/status/test_watcher.py | 78 +++--- 23 files changed, 459 insertions(+), 147 deletions(-) create mode 100644 src/ahriman/core/status/event_bus.py create mode 100644 src/ahriman/web/views/v1/auditlog/event_bus.py diff --git a/docker/Dockerfile b/docker/Dockerfile index 061f9ff9..16dfdc23 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -29,17 +29,16 @@ RUN pacman -S --noconfirm --asdeps \ python-filelock \ python-inflection \ python-pyelftools \ - python-requests \ - && \ - pacman -S --noconfirm --asdeps \ + python-requests +RUN pacman -S --noconfirm --asdeps \ base-devel \ python-build \ python-flit \ python-installer \ + python-setuptools \ python-tox \ - python-wheel \ - && \ - pacman -S --noconfirm --asdeps \ + python-wheel +RUN pacman -S --noconfirm --asdeps \ git \ python-aiohttp \ python-aiohttp-openmetrics \ @@ -48,9 +47,8 @@ RUN pacman -S --noconfirm --asdeps \ python-cryptography \ python-jinja \ python-systemd \ - rsync \ - && \ - runuser -u build -- install-aur-package \ + rsync +RUN runuser -u build -- install-aur-package \ python-aioauth-client \ python-sphinx-typlog-theme \ python-webargs \ @@ -59,6 +57,7 @@ RUN pacman -S --noconfirm --asdeps \ python-aiohttp-jinja2 \ python-aiohttp-session \ python-aiohttp-security \ + python-aiohttp-sse-git \ python-requests-unixsocket2 # install ahriman diff --git a/package/archlinux/PKGBUILD b/package/archlinux/PKGBUILD index 84e5e3c6..322ec56e 100644 --- a/package/archlinux/PKGBUILD +++ b/package/archlinux/PKGBUILD @@ -77,7 +77,7 @@ package_ahriman-triggers() { package_ahriman-web() { pkgname='ahriman-web' pkgdesc="ArcH linux ReposItory MANager, web server" - depends=("$pkgbase-core=$pkgver" 'python-aiohttp-cors' 'python-aiohttp-jinja2') + depends=("$pkgbase-core=$pkgver" 'python-aiohttp-cors' 'python-aiohttp-jinja2' 'python-aiohttp-sse-git') optdepends=('python-aioauth-client: OAuth2 authorization support' 'python-aiohttp-apispec>=3.0.0: autogenerated API documentation' 'python-aiohttp-openmetrics: HTTP metrics support' diff --git a/pyproject.toml b/pyproject.toml index aae0ba62..9b706873 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,6 +58,7 @@ web = [ "aiohttp", "aiohttp_cors", "aiohttp_jinja2", + "aiohttp_sse", ] web-auth = [ "ahriman[web]", diff --git a/src/ahriman/core/status/event_bus.py b/src/ahriman/core/status/event_bus.py new file mode 100644 index 00000000..c28f3e8c --- /dev/null +++ b/src/ahriman/core/status/event_bus.py @@ -0,0 +1,101 @@ +# +# Copyright (c) 2021-2026 ahriman team. +# +# This file is part of ahriman +# (see https://github.com/arcan1s/ahriman). +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +import uuid + +from asyncio import Lock, Queue, QueueFull +from typing import Any + +from ahriman.core.log import LazyLogging +from ahriman.models.event import EventType + + +SSEvent = tuple[str, dict[str, Any]] + + +class EventBus(LazyLogging): + """ + event bus implementation + + Attributes: + max_size(int): maximum size of queue + """ + + def __init__(self, max_size: int) -> None: + """ + Args: + max_size(int): maximum size of queue + """ + self.max_size = max_size + + self._lock = Lock() + self._subscribers: dict[str, Queue[SSEvent | None]] = {} + + async def broadcast(self, event_type: EventType, object_id: str | None, **kwargs: Any) -> None: + """ + broadcast event to all subscribers + + Args: + event_type(EventType): event type + object_id(str | None): object identifier (e.g. package base) + **kwargs(Any): additional event data + """ + event: dict[str, Any] = {"object_id": object_id} + event.update(kwargs) + + async with self._lock: + for subscriber_id, queue in self._subscribers.items(): + try: + queue.put_nowait((event_type, event)) + except QueueFull: + self.logger.warning("discard message to slow subscriber %s", subscriber_id) + + async def shutdown(self) -> None: + """ + gracefully shutdown all subscribers + """ + async with self._lock: + for queue in self._subscribers.values(): + queue.put_nowait(None) + queue.shutdown() + + async def subscribe(self) -> tuple[str, Queue[SSEvent | None]]: + """ + register new subscriber to ``repository_id`` + + Returns: + tuple[str, Queue[SSEvent | None]]: subscriber identifier and associated queue + """ + subscriber_id = str(uuid.uuid4()) + queue: Queue[SSEvent | None] = Queue(self.max_size) + + async with self._lock: + self._subscribers[subscriber_id] = queue + + return subscriber_id, queue + + async def unsubscribe(self, subscriber_id: str) -> None: + """ + unsubscribe from events + + Args: + subscriber_id(str): subscriber unique identifier + """ + async with self._lock: + self._subscribers.pop(subscriber_id, None) diff --git a/src/ahriman/core/status/watcher.py b/src/ahriman/core/status/watcher.py index 12863a6a..bd4a4088 100644 --- a/src/ahriman/core/status/watcher.py +++ b/src/ahriman/core/status/watcher.py @@ -17,15 +17,16 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . # -from collections.abc import Callable +# pylint: disable=too-many-public-methods +from asyncio import Lock from dataclasses import replace -from threading import Lock -from typing import Any, Self +from typing import Self from ahriman.core.exceptions import UnknownPackageError from ahriman.core.log import LazyLogging from ahriman.core.repository.package_info import PackageInfo from ahriman.core.status import Client +from ahriman.core.status.event_bus import EventBus from ahriman.models.build_status import BuildStatus, BuildStatusEnum from ahriman.models.changes import Changes from ahriman.models.dependencies import Dependencies @@ -41,51 +42,74 @@ class Watcher(LazyLogging): Attributes: client(Client): reporter instance + event_bus(EventBus): event bus instance package_info(PackageInfo): package info instance status(BuildStatus): daemon status """ - def __init__(self, client: Client, package_info: PackageInfo) -> None: + def __init__(self, client: Client, package_info: PackageInfo, event_bus: EventBus) -> None: """ Args: client(Client): reporter instance package_info(PackageInfo): package info instance + event_bus(EventBus): event bus instance """ self.client = client self.package_info = package_info + self.event_bus = event_bus self._lock = Lock() self._known: dict[str, tuple[Package, BuildStatus]] = {} self.status = BuildStatus() - @property - def packages(self) -> list[tuple[Package, BuildStatus]]: + async def event_add(self, event: Event) -> None: """ - get current known packages list + create new event + + Args: + event(Event): audit log event + """ + self.client.event_add(event) + + async def event_get(self, event: str | EventType | None, object_id: str | None, + from_date: int | float | None = None, to_date: int | float | None = None, + limit: int = -1, offset: int = 0) -> list[Event]: + """ + retrieve list of events + + Args: + event(str | EventType | None): filter by event type + object_id(str | None): filter by event object + from_date(int | float | None, optional): minimal creation date, inclusive (Default value = None) + to_date(int | float | None, optional): maximal creation date, exclusive (Default value = None) + limit(int, optional): limit records to the specified count, -1 means unlimited (Default value = -1) + offset(int, optional): records offset (Default value = 0) Returns: - list[tuple[Package, BuildStatus]]: list of packages together with their statuses + list[Event]: list of audit log events """ - with self._lock: - return list(self._known.values()) + return self.client.event_get(event, object_id, from_date, to_date, limit, offset) - event_add: Callable[[Event], None] - - event_get: Callable[[str | EventType | None, str | None, int | None, int | None, int, int], list[Event]] - - def load(self) -> None: + async def load(self) -> None: """ load packages from local database """ - with self._lock: + async with self._lock: self._known = { package.base: (package, status) for package, status in self.client.package_get(None) } - logs_rotate: Callable[[int], None] + async def logs_rotate(self, keep_last_records: int) -> None: + """ + remove older logs from storage - def package_archives(self, package_base: str) -> list[Package]: + Args: + keep_last_records(int): number of last records to keep + """ + self.client.logs_rotate(keep_last_records) + + async def package_archives(self, package_base: str) -> list[Package]: """ get known package archives @@ -97,15 +121,51 @@ class Watcher(LazyLogging): """ return self.package_info.package_archives(package_base) - package_changes_get: Callable[[str], Changes] + async def package_changes_get(self, package_base: str) -> Changes: + """ + get package changes - package_changes_update: Callable[[str, Changes], None] + Args: + package_base(str): package base to retrieve - package_dependencies_get: Callable[[str], Dependencies] + Returns: + Changes: package changes if available and empty object otherwise + """ + return self.client.package_changes_get(package_base) - package_dependencies_update: Callable[[str, Dependencies], None] + async def package_changes_update(self, package_base: str, changes: Changes) -> None: + """ + update package changes - def package_get(self, package_base: str) -> tuple[Package, BuildStatus]: + Args: + package_base(str): package base to update + changes(Changes): changes descriptor + """ + self.client.package_changes_update(package_base, changes) + + async def package_dependencies_get(self, package_base: str) -> Dependencies: + """ + get package dependencies + + Args: + package_base(str): package base to retrieve + + Returns: + list[Dependencies]: package implicit dependencies if available + """ + return self.client.package_dependencies_get(package_base) + + async def package_dependencies_update(self, package_base: str, dependencies: Dependencies) -> None: + """ + update package dependencies + + Args: + package_base(str): package base to update + dependencies(Dependencies): dependencies descriptor + """ + self.client.package_dependencies_update(package_base, dependencies) + + async def package_get(self, package_base: str) -> tuple[Package, BuildStatus]: """ get current package base build status @@ -119,18 +179,12 @@ class Watcher(LazyLogging): UnknownPackageError: if no package found """ try: - with self._lock: + async with self._lock: return self._known[package_base] except KeyError: raise UnknownPackageError(package_base) from None - package_logs_add: Callable[[LogRecord], None] - - package_logs_get: Callable[[str, str | None, str | None, int, int], list[LogRecord]] - - package_logs_remove: Callable[[str, str | None], None] - - def package_hold_update(self, package_base: str, *, enabled: bool) -> None: + async def package_hold_update(self, package_base: str, *, enabled: bool) -> None: """ update package hold status @@ -138,29 +192,104 @@ class Watcher(LazyLogging): package_base(str): package base name enabled(bool): new hold status """ - package, status = self.package_get(package_base) - with self._lock: + package, status = await self.package_get(package_base) + async with self._lock: self._known[package_base] = (package, replace(status, is_held=enabled)) self.client.package_hold_update(package_base, enabled=enabled) - package_patches_get: Callable[[str, str | None], list[PkgbuildPatch]] + await self.event_bus.broadcast(EventType.PackageHeld, package_base, is_held=enabled) - package_patches_remove: Callable[[str, str], None] + async def package_logs_add(self, log_record: LogRecord) -> None: + """ + post log record - package_patches_update: Callable[[str, PkgbuildPatch], None] + Args: + log_record(LogRecord): log record + """ + self.client.package_logs_add(log_record) - def package_remove(self, package_base: str) -> None: + await self.event_bus.broadcast( + EventType.BuildLog, + log_record.log_record_id.package_base, + created=log_record.created, + message=log_record.message, + version=log_record.log_record_id.version, + ) + + async def package_logs_get(self, package_base: str, version: str | None = None, process_id: str | None = None, + limit: int = -1, offset: int = 0) -> list[LogRecord]: + """ + get package logs + + Args: + package_base(str): package base + version(str | None, optional): package version to search (Default value = None) + process_id(str | None, optional): process identifier to search (Default value = None) + limit(int, optional): limit records to the specified count, -1 means unlimited (Default value = -1) + offset(int, optional): records offset (Default value = 0) + + Returns: + list[LogRecord]: package logs + """ + return self.client.package_logs_get(package_base, version, process_id, limit, offset) + + async def package_logs_remove(self, package_base: str, version: str | None) -> None: + """ + remove package logs + + Args: + package_base(str): package base + version(str | None): package version to remove logs. If ``None`` is set, all logs will be removed + """ + self.client.package_logs_remove(package_base, version) + + async def package_patches_get(self, package_base: str, variable: str | None) -> list[PkgbuildPatch]: + """ + get package patches + + Args: + package_base(str): package base to retrieve + variable(str | None): optional filter by patch variable + + Returns: + list[PkgbuildPatch]: list of patches for the specified package + """ + return self.client.package_patches_get(package_base, variable) + + async def package_patches_remove(self, package_base: str, variable: str | None) -> None: + """ + remove package patch + + Args: + package_base(str): package base to update + variable(str | None): patch name. If ``None`` is set, all patches will be removed + """ + self.client.package_patches_remove(package_base, variable) + + async def package_patches_update(self, package_base: str, patch: PkgbuildPatch) -> None: + """ + create or update package patch + + Args: + package_base(str): package base to update + patch(PkgbuildPatch): package patch + """ + self.client.package_patches_update(package_base, patch) + + async def package_remove(self, package_base: str) -> None: """ remove package base from known list if any Args: package_base(str): package base """ - with self._lock: + async with self._lock: self._known.pop(package_base, None) self.client.package_remove(package_base) - def package_status_update(self, package_base: str, status: BuildStatusEnum) -> None: + await self.event_bus.broadcast(EventType.PackageRemoved, package_base) + + async def package_status_update(self, package_base: str, status: BuildStatusEnum) -> None: """ update package status @@ -168,12 +297,14 @@ class Watcher(LazyLogging): package_base(str): package base to update status(BuildStatusEnum): new build status """ - package, current_status = self.package_get(package_base) - with self._lock: + package, current_status = await self.package_get(package_base) + async with self._lock: self._known[package_base] = (package, BuildStatus(status, is_held=current_status.is_held)) self.client.package_status_update(package_base, status) - def package_update(self, package: Package, status: BuildStatusEnum) -> None: + await self.event_bus.broadcast(EventType.PackageStatusChanged, package_base, status=status.value) + + async def package_update(self, package: Package, status: BuildStatusEnum) -> None: """ update package @@ -181,12 +312,32 @@ class Watcher(LazyLogging): package(Package): package description status(BuildStatusEnum): new build status """ - with self._lock: + async with self._lock: _, current_status = self._known.get(package.base, (package, BuildStatus())) self._known[package.base] = (package, BuildStatus(status, is_held=current_status.is_held)) self.client.package_update(package, status) - def status_update(self, status: BuildStatusEnum) -> None: + await self.event_bus.broadcast( + EventType.PackageUpdated, package.base, status=status.value, version=package.version, + ) + + async def packages(self) -> list[tuple[Package, BuildStatus]]: + """ + get current known packages list + + Returns: + list[tuple[Package, BuildStatus]]: list of packages together with their statuses + """ + async with self._lock: + return list(self._known.values()) + + async def shutdown(self) -> None: + """ + gracefully shutdown watcher + """ + await self.event_bus.shutdown() + + async def status_update(self, status: BuildStatusEnum) -> None: """ update service status @@ -195,6 +346,8 @@ class Watcher(LazyLogging): """ self.status = BuildStatus(status) + await self.event_bus.broadcast(EventType.ServiceStatusChanged, None, status=status.value) + def __call__(self, package_base: str | None) -> Self: """ extract client for future calls @@ -204,24 +357,11 @@ class Watcher(LazyLogging): Returns: Self: instance of self to pass calls to the client - """ - if package_base is not None: - _ = self.package_get(package_base) - return self - - def __getattr__(self, item: str) -> Any: - """ - proxy methods for reporter client - - Args: - item(str): property name - - Returns: - Any: attribute by its name Raises: - AttributeError: in case if no such attribute found + UnknownPackageError: if no package found """ - if (method := getattr(self.client, item, None)) is not None: - return method - raise AttributeError(f"'{self.__class__.__qualname__}' object has no attribute '{item}'") + # keep check here instead of calling package_get to keep this method synchronized + if package_base is not None and package_base not in self._known: + raise UnknownPackageError(package_base) + return self diff --git a/src/ahriman/models/event.py b/src/ahriman/models/event.py index f6ceb46c..cbda6180 100644 --- a/src/ahriman/models/event.py +++ b/src/ahriman/models/event.py @@ -28,16 +28,24 @@ class EventType(StrEnum): predefined event types Attributes: + BuildLog(EventType): new build log line + PackageHeld(EventType): package hold status has been changed PackageOutdated(EventType): package has been marked as out-of-date PackageRemoved(EventType): package has been removed + PackageStatusChanged(EventType): package build status has been changed PackageUpdateFailed(EventType): package update has been failed PackageUpdated(EventType): package has been updated + ServiceStatusChanged(EventType): service status has been changed """ + BuildLog = "build-log" + PackageHeld = "package-held" PackageOutdated = "package-outdated" PackageRemoved = "package-removed" + PackageStatusChanged = "package-status-changed" PackageUpdateFailed = "package-update-failed" PackageUpdated = "package-updated" + ServiceStatusChanged = "service-status-changed" class Event: diff --git a/src/ahriman/web/views/v1/auditlog/event_bus.py b/src/ahriman/web/views/v1/auditlog/event_bus.py new file mode 100644 index 00000000..f2f2d23d --- /dev/null +++ b/src/ahriman/web/views/v1/auditlog/event_bus.py @@ -0,0 +1,70 @@ +# +# Copyright (c) 2021-2026 ahriman team. +# +# This file is part of ahriman +# (see https://github.com/arcan1s/ahriman). +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +import json + +from aiohttp.web import StreamResponse +from aiohttp_sse import sse_response +from typing import ClassVar + +from ahriman.models.user_access import UserAccess +from ahriman.web.apispec.decorators import apidocs +from ahriman.web.schemas import EventSchema, RepositoryIdSchema +from ahriman.web.views.base import BaseView + + +class EventBusView(BaseView): + """ + audit log view + + Attributes: + GET_PERMISSION(UserAccess): (class attribute) get permissions of self + """ + + GET_PERMISSION: ClassVar[UserAccess] = UserAccess.Full + ROUTES = ["/api/v1/events/stream"] + + @apidocs( + tags=["Audit log"], + summary="Live updates", + description="Stream live updates via SSE", + permission=GET_PERMISSION, + error_404_description="Repository is unknown", + schema=EventSchema(many=True), + query_schema=RepositoryIdSchema, + ) + async def get(self) -> StreamResponse: + """ + subscribe on updates + + Returns: + StreamResponse: 200 with streaming updates + """ + async with sse_response(self.request) as response: + subscription_id, queue = await self.service().event_bus.subscribe() + + try: + while response.is_connected(): + while (message := await queue.get()) is not None: + event_type, data = message + await response.send(json.dumps(data), event=event_type) + finally: + await self.service().event_bus.unsubscribe(subscription_id) + + return response diff --git a/src/ahriman/web/views/v1/auditlog/events.py b/src/ahriman/web/views/v1/auditlog/events.py index 968e174c..2f6c3db8 100644 --- a/src/ahriman/web/views/v1/auditlog/events.py +++ b/src/ahriman/web/views/v1/auditlog/events.py @@ -67,7 +67,7 @@ class EventsView(BaseView): except ValueError as ex: raise HTTPBadRequest(reason=str(ex)) - events = self.service().event_get(event, object_id, from_date, to_date, limit, offset) + events = await self.service().event_get(event, object_id, from_date, to_date, limit, offset) response = [event.view() for event in events] return self.json_response(response) @@ -94,6 +94,6 @@ class EventsView(BaseView): except Exception as ex: raise HTTPBadRequest(reason=str(ex)) - self.service().event_add(event) + await self.service().event_add(event) raise HTTPNoContent diff --git a/src/ahriman/web/views/v1/packages/archives.py b/src/ahriman/web/views/v1/packages/archives.py index 8c577bf5..9e7a140d 100644 --- a/src/ahriman/web/views/v1/packages/archives.py +++ b/src/ahriman/web/views/v1/packages/archives.py @@ -60,6 +60,6 @@ class Archives(StatusViewGuard, BaseView): """ package_base = self.request.match_info["package"] - archives = self.service(package_base=package_base).package_archives(package_base) + archives = await self.service(package_base=package_base).package_archives(package_base) return self.json_response([archive.view() for archive in archives]) diff --git a/src/ahriman/web/views/v1/packages/changes.py b/src/ahriman/web/views/v1/packages/changes.py index 0e98eb1c..562a0859 100644 --- a/src/ahriman/web/views/v1/packages/changes.py +++ b/src/ahriman/web/views/v1/packages/changes.py @@ -63,7 +63,7 @@ class ChangesView(StatusViewGuard, BaseView): """ package_base = self.request.match_info["package"] - changes = self.service(package_base=package_base).package_changes_get(package_base) + changes = await self.service(package_base=package_base).package_changes_get(package_base) return self.json_response(changes.view()) @@ -97,6 +97,6 @@ class ChangesView(StatusViewGuard, BaseView): raise HTTPBadRequest(reason=str(ex)) changes = Changes(last_commit_sha, change, pkgbuild) - self.service().package_changes_update(package_base, changes) + await self.service().package_changes_update(package_base, changes) raise HTTPNoContent diff --git a/src/ahriman/web/views/v1/packages/dependencies.py b/src/ahriman/web/views/v1/packages/dependencies.py index 9d4de53e..e8467576 100644 --- a/src/ahriman/web/views/v1/packages/dependencies.py +++ b/src/ahriman/web/views/v1/packages/dependencies.py @@ -63,7 +63,7 @@ class DependenciesView(StatusViewGuard, BaseView): """ package_base = self.request.match_info["package"] - dependencies = self.service(package_base=package_base).package_dependencies_get(package_base) + dependencies = await self.service(package_base=package_base).package_dependencies_get(package_base) return self.json_response(dependencies.view()) @@ -95,6 +95,6 @@ class DependenciesView(StatusViewGuard, BaseView): except Exception as ex: raise HTTPBadRequest(reason=str(ex)) - self.service(package_base=package_base).package_dependencies_update(package_base, dependencies) + await self.service(package_base=package_base).package_dependencies_update(package_base, dependencies) raise HTTPNoContent diff --git a/src/ahriman/web/views/v1/packages/hold.py b/src/ahriman/web/views/v1/packages/hold.py index 0a342ce4..cc6e01f4 100644 --- a/src/ahriman/web/views/v1/packages/hold.py +++ b/src/ahriman/web/views/v1/packages/hold.py @@ -68,7 +68,7 @@ class HoldView(StatusViewGuard, BaseView): raise HTTPBadRequest(reason=str(ex)) try: - self.service().package_hold_update(package_base, enabled=is_held) + await self.service().package_hold_update(package_base, enabled=is_held) except UnknownPackageError: raise HTTPNotFound(reason=f"Package {package_base} is unknown") diff --git a/src/ahriman/web/views/v1/packages/logs.py b/src/ahriman/web/views/v1/packages/logs.py index 26f4bf10..c63a6d6c 100644 --- a/src/ahriman/web/views/v1/packages/logs.py +++ b/src/ahriman/web/views/v1/packages/logs.py @@ -62,7 +62,7 @@ class LogsView(StatusViewGuard, BaseView): """ package_base = self.request.match_info["package"] version = self.request.query.get("version") - self.service().package_logs_remove(package_base, version) + await self.service().package_logs_remove(package_base, version) raise HTTPNoContent @@ -89,8 +89,8 @@ class LogsView(StatusViewGuard, BaseView): package_base = self.request.match_info["package"] try: - _, status = self.service().package_get(package_base) - logs = self.service(package_base=package_base).package_logs_get(package_base, None, None, -1, 0) + _, status = await self.service().package_get(package_base) + logs = await self.service(package_base=package_base).package_logs_get(package_base, None, None, -1, 0) except UnknownPackageError: raise HTTPNotFound(reason=f"Package {package_base} is unknown") @@ -127,6 +127,6 @@ class LogsView(StatusViewGuard, BaseView): except Exception as ex: raise HTTPBadRequest(reason=str(ex)) - self.service().package_logs_add(log_record) + await self.service().package_logs_add(log_record) raise HTTPNoContent diff --git a/src/ahriman/web/views/v1/packages/package.py b/src/ahriman/web/views/v1/packages/package.py index 79e2546b..41f44d6c 100644 --- a/src/ahriman/web/views/v1/packages/package.py +++ b/src/ahriman/web/views/v1/packages/package.py @@ -66,7 +66,7 @@ class PackageView(StatusViewGuard, BaseView): HTTPNoContent: on success response """ package_base = self.request.match_info["package"] - self.service().package_remove(package_base) + await self.service().package_remove(package_base) raise HTTPNoContent @@ -94,7 +94,7 @@ class PackageView(StatusViewGuard, BaseView): repository_id = self.repository_id() try: - package, status = self.service(repository_id).package_get(package_base) + package, status = await self.service(repository_id).package_get(package_base) except UnknownPackageError: raise HTTPNotFound(reason=f"Package {package_base} is unknown") @@ -137,9 +137,9 @@ class PackageView(StatusViewGuard, BaseView): try: if package is None: - self.service().package_status_update(package_base, status) + await self.service().package_status_update(package_base, status) else: - self.service().package_update(package, status) + await self.service().package_update(package, status) except UnknownPackageError: raise HTTPBadRequest(reason=f"Package {package_base} is unknown, but no package body set") diff --git a/src/ahriman/web/views/v1/packages/packages.py b/src/ahriman/web/views/v1/packages/packages.py index a93d82b9..f01c7760 100644 --- a/src/ahriman/web/views/v1/packages/packages.py +++ b/src/ahriman/web/views/v1/packages/packages.py @@ -67,7 +67,7 @@ class PackagesView(StatusViewGuard, BaseView): stop = offset + limit if limit >= 0 else None repository_id = self.repository_id() - packages = self.service(repository_id).packages + packages = await self.service(repository_id).packages() comparator: Callable[[tuple[Package, BuildStatus]], Comparable] = lambda items: items[0].base response = [ @@ -95,6 +95,6 @@ class PackagesView(StatusViewGuard, BaseView): Raises: HTTPNoContent: on success response """ - self.service().load() + await self.service().load() raise HTTPNoContent diff --git a/src/ahriman/web/views/v1/packages/patch.py b/src/ahriman/web/views/v1/packages/patch.py index 9ec13bd4..936e5bec 100644 --- a/src/ahriman/web/views/v1/packages/patch.py +++ b/src/ahriman/web/views/v1/packages/patch.py @@ -57,7 +57,7 @@ class PatchView(StatusViewGuard, BaseView): package_base = self.request.match_info["package"] variable = self.request.match_info["patch"] - self.service().package_patches_remove(package_base, variable) + await self.service().package_patches_remove(package_base, variable) raise HTTPNoContent @@ -83,7 +83,7 @@ class PatchView(StatusViewGuard, BaseView): package_base = self.request.match_info["package"] variable = self.request.match_info["patch"] - patches = self.service().package_patches_get(package_base, variable) + patches = await self.service().package_patches_get(package_base, variable) selected = next((patch for patch in patches if patch.key == variable), None) if selected is None: diff --git a/src/ahriman/web/views/v1/packages/patches.py b/src/ahriman/web/views/v1/packages/patches.py index dd8346e0..cf4533df 100644 --- a/src/ahriman/web/views/v1/packages/patches.py +++ b/src/ahriman/web/views/v1/packages/patches.py @@ -57,7 +57,7 @@ class PatchesView(StatusViewGuard, BaseView): Response: 200 with package patches on success """ package_base = self.request.match_info["package"] - patches = self.service().package_patches_get(package_base, None) + patches = await self.service().package_patches_get(package_base, None) response = [patch.view() for patch in patches] return self.json_response(response) @@ -88,6 +88,6 @@ class PatchesView(StatusViewGuard, BaseView): except Exception as ex: raise HTTPBadRequest(reason=str(ex)) - self.service().package_patches_update(package_base, PkgbuildPatch.parse(key, value)) + await self.service().package_patches_update(package_base, PkgbuildPatch.parse(key, value)) raise HTTPNoContent diff --git a/src/ahriman/web/views/v1/service/logs.py b/src/ahriman/web/views/v1/service/logs.py index a249883e..94767494 100644 --- a/src/ahriman/web/views/v1/service/logs.py +++ b/src/ahriman/web/views/v1/service/logs.py @@ -59,6 +59,6 @@ class LogsView(BaseView): except Exception as ex: raise HTTPBadRequest(reason=str(ex)) - self.service().logs_rotate(keep_last_records) + await self.service().logs_rotate(keep_last_records) raise HTTPNoContent diff --git a/src/ahriman/web/views/v1/status/status.py b/src/ahriman/web/views/v1/status/status.py index 55a851a0..a06fd4f9 100644 --- a/src/ahriman/web/views/v1/status/status.py +++ b/src/ahriman/web/views/v1/status/status.py @@ -62,7 +62,7 @@ class StatusView(StatusViewGuard, BaseView): Response: 200 with service status object """ repository_id = self.repository_id() - packages = self.service(repository_id).packages + packages = await self.service(repository_id).packages() counters = Counters.from_packages(packages) stats = RepositoryStats.from_packages([package for package, _ in packages]) @@ -101,6 +101,6 @@ class StatusView(StatusViewGuard, BaseView): except Exception as ex: raise HTTPBadRequest(reason=str(ex)) - self.service().status_update(status) + await self.service().status_update(status) raise HTTPNoContent diff --git a/src/ahriman/web/views/v2/packages/logs.py b/src/ahriman/web/views/v2/packages/logs.py index 231e7091..6ed20482 100644 --- a/src/ahriman/web/views/v2/packages/logs.py +++ b/src/ahriman/web/views/v2/packages/logs.py @@ -67,7 +67,7 @@ class LogsView(StatusViewGuard, BaseView): version = self.request.query.get("version", None) process = self.request.query.get("process_id", None) - logs = self.service(package_base=package_base).package_logs_get(package_base, version, process, limit, offset) + logs = await self.service(package_base=package_base).package_logs_get(package_base, version, process, limit, offset) head = self.request.query.get("head", "false") # pylint: disable=protected-access diff --git a/src/ahriman/web/web.py b/src/ahriman/web/web.py index 142f2508..7cc9905f 100644 --- a/src/ahriman/web/web.py +++ b/src/ahriman/web/web.py @@ -33,6 +33,7 @@ from ahriman.core.exceptions import InitializeError from ahriman.core.repository.package_info import PackageInfo from ahriman.core.spawn import Spawn from ahriman.core.status import Client +from ahriman.core.status.event_bus import EventBus from ahriman.core.status.watcher import Watcher from ahriman.models.repository_id import RepositoryId from ahriman.web.apispec.info import setup_apispec @@ -108,7 +109,9 @@ def _create_watcher(path: Path, repository_id: RepositoryId) -> Watcher: package_info.reporter = client package_info.repository_id = repository_id - return Watcher(client, package_info) + event_bus = EventBus(configuration.getint("web", "max_queue_size", fallback=0)) + + return Watcher(client, package_info, event_bus) async def _on_shutdown(application: Application) -> None: @@ -118,6 +121,8 @@ async def _on_shutdown(application: Application) -> None: Args: application(Application): web application instance """ + for watcher in application[WatcherKey].values(): + await watcher.shutdown() application.logger.warning("server terminated") @@ -135,7 +140,7 @@ async def _on_startup(application: Application) -> None: try: for watcher in application[WatcherKey].values(): - watcher.load() + await watcher.load() except Exception: message = "could not load packages" application.logger.exception(message) diff --git a/tests/ahriman/conftest.py b/tests/ahriman/conftest.py index 56fadf19..99d27d83 100644 --- a/tests/ahriman/conftest.py +++ b/tests/ahriman/conftest.py @@ -19,6 +19,7 @@ from ahriman.core.repository import Repository from ahriman.core.repository.package_info import PackageInfo from ahriman.core.spawn import Spawn from ahriman.core.status import Client +from ahriman.core.status.event_bus import EventBus from ahriman.core.status.watcher import Watcher from ahriman.models.aur_package import AURPackage from ahriman.models.build_status import BuildStatus, BuildStatusEnum @@ -690,4 +691,5 @@ def watcher(local_client: Client) -> Watcher: Watcher: package status watcher test instance """ package_info = PackageInfo() - return Watcher(local_client, package_info) + event_bus = EventBus(0) + return Watcher(local_client, package_info, event_bus) diff --git a/tests/ahriman/core/status/test_watcher.py b/tests/ahriman/core/status/test_watcher.py index 7646e553..f353a722 100644 --- a/tests/ahriman/core/status/test_watcher.py +++ b/tests/ahriman/core/status/test_watcher.py @@ -18,21 +18,21 @@ def test_packages(watcher: Watcher, package_ahriman: Package) -> None: assert watcher.packages -def test_load(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: +async def test_load(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: """ must correctly load packages """ cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_get", return_value=[(package_ahriman, BuildStatus())]) - watcher.load() + await watcher.load() cache_mock.assert_called_once_with(None) package, status = watcher._known[package_ahriman.base] assert package == package_ahriman assert status.status == BuildStatusEnum.Unknown -def test_load_known(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: +async def test_load_known(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: """ must correctly load packages with known statuses """ @@ -40,146 +40,147 @@ def test_load_known(watcher: Watcher, package_ahriman: Package, mocker: MockerFi mocker.patch("ahriman.core.status.local_client.LocalClient.package_get", return_value=[(package_ahriman, status)]) watcher._known = {package_ahriman.base: (package_ahriman, status)} - watcher.load() + await watcher.load() _, status = watcher._known[package_ahriman.base] assert status.status == BuildStatusEnum.Success -def test_package_archives(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: +async def test_package_archives(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: """ must return package archives from package info """ archives_mock = mocker.patch("ahriman.core.repository.package_info.PackageInfo.package_archives", return_value=[package_ahriman]) - result = watcher.package_archives(package_ahriman.base) + result = await watcher.package_archives(package_ahriman.base) assert result == [package_ahriman] archives_mock.assert_called_once_with(package_ahriman.base) -def test_package_get(watcher: Watcher, package_ahriman: Package) -> None: +async def test_package_get(watcher: Watcher, package_ahriman: Package) -> None: """ must return package status """ watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())} - package, status = watcher.package_get(package_ahriman.base) + package, status = await watcher.package_get(package_ahriman.base) assert package == package_ahriman assert status.status == BuildStatusEnum.Unknown -def test_package_get_failed(watcher: Watcher, package_ahriman: Package) -> None: +async def test_package_get_failed(watcher: Watcher, package_ahriman: Package) -> None: """ must fail on unknown package """ with pytest.raises(UnknownPackageError): - watcher.package_get(package_ahriman.base) + await watcher.package_get(package_ahriman.base) -def test_package_hold_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: +async def test_package_hold_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: """ must update package hold status """ cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_hold_update") watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())} - watcher.package_hold_update(package_ahriman.base, enabled=True) + await watcher.package_hold_update(package_ahriman.base, enabled=True) cache_mock.assert_called_once_with(package_ahriman.base, enabled=True) _, status = watcher._known[package_ahriman.base] assert status.is_held is True -def test_package_hold_update_unknown(watcher: Watcher, package_ahriman: Package) -> None: +async def test_package_hold_update_unknown(watcher: Watcher, package_ahriman: Package) -> None: """ must fail on unknown package hold update """ with pytest.raises(UnknownPackageError): - watcher.package_hold_update(package_ahriman.base, enabled=True) + await watcher.package_hold_update(package_ahriman.base, enabled=True) -def test_package_remove(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: +async def test_package_remove(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: """ must remove package base """ cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_remove") watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())} - watcher.package_remove(package_ahriman.base) + await watcher.package_remove(package_ahriman.base) assert not watcher._known cache_mock.assert_called_once_with(package_ahriman.base) -def test_package_remove_unknown(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: +async def test_package_remove_unknown(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: """ must not fail on unknown base removal """ cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_remove") - watcher.package_remove(package_ahriman.base) + await watcher.package_remove(package_ahriman.base) cache_mock.assert_called_once_with(package_ahriman.base) -def test_package_status_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: +async def test_package_status_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: """ must update package status only for known package """ cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_status_update") watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())} - watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success) + await watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success) cache_mock.assert_called_once_with(package_ahriman.base, pytest.helpers.anyvar(int)) package, status = watcher._known[package_ahriman.base] assert package == package_ahriman assert status.status == BuildStatusEnum.Success -def test_package_status_update_preserves_hold(watcher: Watcher, package_ahriman: Package, - mocker: MockerFixture) -> None: +async def test_package_status_update_preserves_hold(watcher: Watcher, package_ahriman: Package, + mocker: MockerFixture) -> None: """ must preserve hold status on package status update """ mocker.patch("ahriman.core.status.local_client.LocalClient.package_status_update") watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus(is_held=True))} - watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success) + await watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success) _, status = watcher._known[package_ahriman.base] assert status.is_held is True -def test_package_status_update_unknown(watcher: Watcher, package_ahriman: Package) -> None: +async def test_package_status_update_unknown(watcher: Watcher, package_ahriman: Package) -> None: """ must fail on unknown package status update only """ with pytest.raises(UnknownPackageError): - watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Unknown) + await watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Unknown) -def test_package_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: +async def test_package_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: """ must add package to cache """ cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_update") - watcher.package_update(package_ahriman, BuildStatusEnum.Unknown) + await watcher.package_update(package_ahriman, BuildStatusEnum.Unknown) assert watcher.packages cache_mock.assert_called_once_with(package_ahriman, pytest.helpers.anyvar(int)) -def test_package_update_preserves_hold(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: +async def test_package_update_preserves_hold(watcher: Watcher, package_ahriman: Package, + mocker: MockerFixture) -> None: """ must preserve hold status on package update """ mocker.patch("ahriman.core.status.local_client.LocalClient.package_update") watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus(is_held=True))} - watcher.package_update(package_ahriman, BuildStatusEnum.Success) + await watcher.package_update(package_ahriman, BuildStatusEnum.Success) _, status = watcher._known[package_ahriman.base] assert status.is_held is True -def test_status_update(watcher: Watcher) -> None: +async def test_status_update(watcher: Watcher) -> None: """ must update service status """ - watcher.status_update(BuildStatusEnum.Success) + await watcher.status_update(BuildStatusEnum.Success) assert watcher.status.status == BuildStatusEnum.Success @@ -204,18 +205,3 @@ def test_call_failed(watcher: Watcher, package_ahriman: Package) -> None: """ with pytest.raises(UnknownPackageError): assert watcher(package_ahriman.base) - - -def test_getattr(watcher: Watcher) -> None: - """ - must return client method call - """ - assert watcher.package_logs_remove - - -def test_getattr_unknown_method(watcher: Watcher) -> None: - """ - must raise AttributeError in case if no reporter attribute found - """ - with pytest.raises(AttributeError): - assert watcher.random_method