Compare commits

...

1 Commits

Author SHA1 Message Date
c2e3ef9405 event bus implementation 2026-03-24 02:23:14 +02:00
4 changed files with 104 additions and 2 deletions

View File

@@ -58,6 +58,7 @@ web = [
"aiohttp", "aiohttp",
"aiohttp_cors", "aiohttp_cors",
"aiohttp_jinja2", "aiohttp_jinja2",
"aiohttp_sse",
] ]
web-auth = [ web-auth = [
"ahriman[web]", "ahriman[web]",

View File

@@ -0,0 +1,95 @@
#
# Copyright (c) 2021-2026 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import uuid
from asyncio import Lock, Queue, QueueFull
from typing import Any
from ahriman.core.log import LazyLogging
SSEvent = tuple[str, dict[str, Any]]
class EventBus(LazyLogging):
"""
event bus implementation
Attributes:
max_size(int): maximum size of queue
"""
def __init__(self, max_size: int) -> None:
"""
Args:
max_size(int): maximum size of queue
"""
self.max_size = max_size
self._lock = Lock()
self._subscribers: dict[str, Queue[SSEvent | None]] = {}
async def broadcast(self, event_type: str, event: dict[str, Any]) -> None:
"""
broadcast event to all subscribers of ``repository_id``
Args:
event_type(str): event type
event(dict[str, Any]): event data
"""
async with self._lock:
for subscriber_id, queue in self._subscribers.items():
try:
queue.put_nowait((event_type, event))
except QueueFull:
self.logger.warning("discard message to slow subscriber %s", subscriber_id)
async def shutdown(self) -> None:
"""
gracefully shutdown all subscribers
"""
async with self._lock:
for queue in self._subscribers.values():
queue.put_nowait(None)
async def subscribe(self) -> tuple[str, Queue[SSEvent | None]]:
"""
register new subscriber to ``repository_id``
Returns:
tuple[str, Queue[SSEvent | None]]: subscriber identifier and associated queue
"""
subscriber_id = str(uuid.uuid4())
queue: Queue[SSEvent | None] = Queue(self.max_size)
async with self._lock:
self._subscribers[subscriber_id] = queue
return subscriber_id, queue
async def unsubscribe(self, subscriber_id: str) -> None:
"""
unsubscribe from events
Args:
subscriber_id(str): subscriber unique identifier
"""
async with self._lock:
self._subscribers.pop(subscriber_id, None)

View File

@@ -26,6 +26,7 @@ from ahriman.core.exceptions import UnknownPackageError
from ahriman.core.log import LazyLogging from ahriman.core.log import LazyLogging
from ahriman.core.repository.package_info import PackageInfo from ahriman.core.repository.package_info import PackageInfo
from ahriman.core.status import Client from ahriman.core.status import Client
from ahriman.core.status.event_bus import EventBus
from ahriman.models.build_status import BuildStatus, BuildStatusEnum from ahriman.models.build_status import BuildStatus, BuildStatusEnum
from ahriman.models.changes import Changes from ahriman.models.changes import Changes
from ahriman.models.dependencies import Dependencies from ahriman.models.dependencies import Dependencies
@@ -41,11 +42,12 @@ class Watcher(LazyLogging):
Attributes: Attributes:
client(Client): reporter instance client(Client): reporter instance
event_bus(EventBus): event bus instance
package_info(PackageInfo): package info instance package_info(PackageInfo): package info instance
status(BuildStatus): daemon status status(BuildStatus): daemon status
""" """
def __init__(self, client: Client, package_info: PackageInfo) -> None: def __init__(self, client: Client, package_info: PackageInfo, event_bus: EventBus) -> None:
""" """
Args: Args:
client(Client): reporter instance client(Client): reporter instance
@@ -53,6 +55,7 @@ class Watcher(LazyLogging):
""" """
self.client = client self.client = client
self.package_info = package_info self.package_info = package_info
self.event_bus = event_bus
self._lock = Lock() self._lock = Lock()
self._known: dict[str, tuple[Package, BuildStatus]] = {} self._known: dict[str, tuple[Package, BuildStatus]] = {}

View File

@@ -33,6 +33,7 @@ from ahriman.core.exceptions import InitializeError
from ahriman.core.repository.package_info import PackageInfo from ahriman.core.repository.package_info import PackageInfo
from ahriman.core.spawn import Spawn from ahriman.core.spawn import Spawn
from ahriman.core.status import Client from ahriman.core.status import Client
from ahriman.core.status.event_bus import EventBus
from ahriman.core.status.watcher import Watcher from ahriman.core.status.watcher import Watcher
from ahriman.models.repository_id import RepositoryId from ahriman.models.repository_id import RepositoryId
from ahriman.web.apispec.info import setup_apispec from ahriman.web.apispec.info import setup_apispec
@@ -108,7 +109,9 @@ def _create_watcher(path: Path, repository_id: RepositoryId) -> Watcher:
package_info.reporter = client package_info.reporter = client
package_info.repository_id = repository_id package_info.repository_id = repository_id
return Watcher(client, package_info) event_bus = EventBus(configuration.getint("web", "max_queue_size", fallback=0))
return Watcher(client, package_info, event_bus)
async def _on_shutdown(application: Application) -> None: async def _on_shutdown(application: Application) -> None: