diff --git a/pyproject.toml b/pyproject.toml index aae0ba62..9b706873 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,6 +58,7 @@ web = [ "aiohttp", "aiohttp_cors", "aiohttp_jinja2", + "aiohttp_sse", ] web-auth = [ "ahriman[web]", diff --git a/src/ahriman/core/status/event_bus.py b/src/ahriman/core/status/event_bus.py new file mode 100644 index 00000000..349f415d --- /dev/null +++ b/src/ahriman/core/status/event_bus.py @@ -0,0 +1,95 @@ +# +# Copyright (c) 2021-2026 ahriman team. +# +# This file is part of ahriman +# (see https://github.com/arcan1s/ahriman). +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +import uuid + +from asyncio import Lock, Queue, QueueFull +from typing import Any + +from ahriman.core.log import LazyLogging + + +SSEvent = tuple[str, dict[str, Any]] + + +class EventBus(LazyLogging): + """ + event bus implementation + + Attributes: + max_size(int): maximum size of queue + """ + + def __init__(self, max_size: int) -> None: + """ + Args: + max_size(int): maximum size of queue + """ + self.max_size = max_size + + self._lock = Lock() + self._subscribers: dict[str, Queue[SSEvent | None]] = {} + + async def broadcast(self, event_type: str, event: dict[str, Any]) -> None: + """ + broadcast event to all subscribers of ``repository_id`` + + Args: + event_type(str): event type + event(dict[str, Any]): event data + """ + async with self._lock: + for subscriber_id, queue in self._subscribers.items(): + try: + queue.put_nowait((event_type, event)) + except QueueFull: + self.logger.warning("discard message to slow subscriber %s", subscriber_id) + + async def shutdown(self) -> None: + """ + gracefully shutdown all subscribers + """ + async with self._lock: + for queue in self._subscribers.values(): + queue.put_nowait(None) + + async def subscribe(self) -> tuple[str, Queue[SSEvent | None]]: + """ + register new subscriber to ``repository_id`` + + Returns: + tuple[str, Queue[SSEvent | None]]: subscriber identifier and associated queue + """ + subscriber_id = str(uuid.uuid4()) + queue: Queue[SSEvent | None] = Queue(self.max_size) + + async with self._lock: + self._subscribers[subscriber_id] = queue + + return subscriber_id, queue + + async def unsubscribe(self, subscriber_id: str) -> None: + """ + unsubscribe from events + + Args: + subscriber_id(str): subscriber unique identifier + """ + async with self._lock: + self._subscribers.pop(subscriber_id, None) diff --git a/src/ahriman/core/status/watcher.py b/src/ahriman/core/status/watcher.py index 12863a6a..0674f2bf 100644 --- a/src/ahriman/core/status/watcher.py +++ b/src/ahriman/core/status/watcher.py @@ -26,6 +26,7 @@ from ahriman.core.exceptions import UnknownPackageError from ahriman.core.log import LazyLogging from ahriman.core.repository.package_info import PackageInfo from ahriman.core.status import Client +from ahriman.core.status.event_bus import EventBus from ahriman.models.build_status import BuildStatus, BuildStatusEnum from ahriman.models.changes import Changes from ahriman.models.dependencies import Dependencies @@ -41,11 +42,12 @@ class Watcher(LazyLogging): Attributes: client(Client): reporter instance + event_bus(EventBus): event bus instance package_info(PackageInfo): package info instance status(BuildStatus): daemon status """ - def __init__(self, client: Client, package_info: PackageInfo) -> None: + def __init__(self, client: Client, package_info: PackageInfo, event_bus: EventBus) -> None: """ Args: client(Client): reporter instance @@ -53,6 +55,7 @@ class Watcher(LazyLogging): """ self.client = client self.package_info = package_info + self.event_bus = event_bus self._lock = Lock() self._known: dict[str, tuple[Package, BuildStatus]] = {} diff --git a/src/ahriman/web/web.py b/src/ahriman/web/web.py index 142f2508..405081e9 100644 --- a/src/ahriman/web/web.py +++ b/src/ahriman/web/web.py @@ -33,6 +33,7 @@ from ahriman.core.exceptions import InitializeError from ahriman.core.repository.package_info import PackageInfo from ahriman.core.spawn import Spawn from ahriman.core.status import Client +from ahriman.core.status.event_bus import EventBus from ahriman.core.status.watcher import Watcher from ahriman.models.repository_id import RepositoryId from ahriman.web.apispec.info import setup_apispec @@ -108,7 +109,9 @@ def _create_watcher(path: Path, repository_id: RepositoryId) -> Watcher: package_info.reporter = client package_info.repository_id = repository_id - return Watcher(client, package_info) + event_bus = EventBus(configuration.getint("web", "max_queue_size", fallback=0)) + + return Watcher(client, package_info, event_bus) async def _on_shutdown(application: Application) -> None: