From cc88bef1753508c14371a8cd3aeafcba20ddc4a6 Mon Sep 17 00:00:00 2001 From: Evgenii Alekseev Date: Wed, 6 May 2026 10:50:26 +0300 Subject: [PATCH] queue processing simplification --- src/ahriman/core/status/event_bus.py | 67 ++++++++++++------- .../web/views/v1/auditlog/event_bus.py | 12 ++-- tests/ahriman/core/status/test_event_bus.py | 24 +++++-- .../test_view_v1_auditlog_event_bus.py | 2 +- 4 files changed, 65 insertions(+), 40 deletions(-) diff --git a/src/ahriman/core/status/event_bus.py b/src/ahriman/core/status/event_bus.py index d934e72e..c1378712 100644 --- a/src/ahriman/core/status/event_bus.py +++ b/src/ahriman/core/status/event_bus.py @@ -19,7 +19,8 @@ # import uuid -from asyncio import Lock, Queue, QueueFull +from asyncio import Lock, Queue, QueueFull, QueueShutDown +from dataclasses import dataclass from typing import Any from ahriman.core.log import LazyLogging @@ -29,6 +30,22 @@ from ahriman.models.event import EventType SSEvent = tuple[str, dict[str, Any]] +@dataclass(frozen=True) +class _Subscription: + """ + internal event bus subscription record + + Attributes: + topics(list[EventType] | None): event type filter, ``None`` means all + object_id(str | None): object identifier filter, ``None`` means all + queue(Queue[SSEvent]): per-subscriber event queue + """ + + topics: list[EventType] | None + object_id: str | None + queue: Queue[SSEvent] + + class EventBus(LazyLogging): """ event bus implementation @@ -45,7 +62,7 @@ class EventBus(LazyLogging): self.max_size = max_size self._lock = Lock() - self._subscribers: dict[str, tuple[list[EventType] | None, str | None, Queue[SSEvent | None]]] = {} + self._subscribers: dict[str, _Subscription] = {} async def broadcast(self, event_type: EventType, object_id: str | None, **kwargs: Any) -> None: """ @@ -60,30 +77,31 @@ class EventBus(LazyLogging): event.update(kwargs) async with self._lock: - for subscriber_id, (topics, filter_object_id, queue) in self._subscribers.items(): - if topics is not None and event_type not in topics: - continue - if filter_object_id is not None and object_id != filter_object_id: - continue - try: - queue.put_nowait((event_type, event)) - except QueueFull: - self.logger.warning("discard message to slow subscriber %s", subscriber_id) + snapshot = list(self._subscribers.items()) + + for subscriber_id, subscription in snapshot: + if subscription.topics is not None and event_type not in subscription.topics: + continue + if subscription.object_id is not None and object_id != subscription.object_id: + continue + + try: + subscription.queue.put_nowait((event_type, event)) + except QueueFull: + self.logger.warning("discard message to slow subscriber %s", subscriber_id) + except QueueShutDown: + pass async def shutdown(self) -> None: """ gracefully shutdown all subscribers """ async with self._lock: - for _, _, queue in self._subscribers.values(): - try: - queue.put_nowait(None) - except QueueFull: - pass - queue.shutdown() + for subscription in self._subscribers.values(): + subscription.queue.shutdown() async def subscribe(self, topics: list[EventType] | None = None, - object_id: str | None = None) -> tuple[str, Queue[SSEvent | None]]: + object_id: str | None = None) -> tuple[str, Queue[SSEvent]]: """ register new subscriber @@ -94,13 +112,13 @@ class EventBus(LazyLogging): events for all objects will be delivered (Default value = None) Returns: - tuple[str, Queue[SSEvent | None]]: subscriber identifier and associated queue + tuple[str, Queue[SSEvent]]: subscriber identifier and associated queue """ subscriber_id = str(uuid.uuid4()) - queue: Queue[SSEvent | None] = Queue(self.max_size) + queue: Queue[SSEvent] = Queue(self.max_size) async with self._lock: - self._subscribers[subscriber_id] = (topics, object_id, queue) + self._subscribers[subscriber_id] = _Subscription(topics=topics, object_id=object_id, queue=queue) return subscriber_id, queue @@ -112,7 +130,6 @@ class EventBus(LazyLogging): subscriber_id(str): subscriber unique identifier """ async with self._lock: - result = self._subscribers.pop(subscriber_id, None) - if result is not None: - _, _, queue = result - queue.shutdown() + subscription = self._subscribers.pop(subscriber_id, None) + if subscription is not None: + subscription.queue.shutdown() diff --git a/src/ahriman/web/views/v1/auditlog/event_bus.py b/src/ahriman/web/views/v1/auditlog/event_bus.py index 0fc01039..28d2fa63 100644 --- a/src/ahriman/web/views/v1/auditlog/event_bus.py +++ b/src/ahriman/web/views/v1/auditlog/event_bus.py @@ -44,23 +44,21 @@ class EventBusView(BaseView): ROUTES = ["/api/v1/events/stream"] @staticmethod - async def _run(response: EventSourceResponse, queue: Queue[SSEvent | None]) -> None: + async def _run(response: EventSourceResponse, queue: Queue[SSEvent]) -> None: """ read events from queue and send them to the client Args: response(EventSourceResponse): SSE response instance - queue(Queue[SSEvent | None]): subscriber queue + queue(Queue[SSEvent]): subscriber queue """ while response.is_connected(): try: - message = await wait_for(queue.get(), timeout=response.ping_interval) + event_type, data = await wait_for(queue.get(), timeout=response.ping_interval) except TimeoutError: continue - - if message is None: - break # terminate queue on sentinel event - event_type, data = message + except QueueShutDown: + break await response.send(json.dumps(data), event=event_type) diff --git a/tests/ahriman/core/status/test_event_bus.py b/tests/ahriman/core/status/test_event_bus.py index b8067e98..5c5d7996 100644 --- a/tests/ahriman/core/status/test_event_bus.py +++ b/tests/ahriman/core/status/test_event_bus.py @@ -1,5 +1,7 @@ import pytest +from asyncio import QueueShutDown + from ahriman.core.status.event_bus import EventBus from ahriman.models.event import EventType from ahriman.models.package import Package @@ -49,15 +51,25 @@ async def test_broadcast_queue_full(event_bus: EventBus, package_ahriman: Packag assert queue.qsize() == 1 +async def test_broadcast_queue_shutdown(event_bus: EventBus, package_ahriman: Package) -> None: + """ + must skip subscriber whose queue was shutdown concurrently + """ + _, queue = await event_bus.subscribe() + queue.shutdown() + + await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base) + + async def test_shutdown(event_bus: EventBus) -> None: """ - must send sentinel to all subscribers on shutdown + must shutdown all subscriber queues on shutdown """ _, queue = await event_bus.subscribe() await event_bus.shutdown() - message = queue.get_nowait() - assert message is None + with pytest.raises(QueueShutDown): + queue.get_nowait() async def test_shutdown_queue_full(event_bus: EventBus, package_ahriman: Package) -> None: @@ -105,8 +117,7 @@ async def test_subscribe_with_topics(event_bus: EventBus) -> None: must register subscriber with topic filter """ subscriber_id, _ = await event_bus.subscribe([EventType.BuildLog]) - topics, _, _ = event_bus._subscribers[subscriber_id] - assert topics == [EventType.BuildLog] + assert event_bus._subscribers[subscriber_id].topics == [EventType.BuildLog] async def test_subscribe_with_object_id(event_bus: EventBus, package_ahriman: Package) -> None: @@ -114,8 +125,7 @@ async def test_subscribe_with_object_id(event_bus: EventBus, package_ahriman: Pa must register subscriber with object_id filter """ subscriber_id, _ = await event_bus.subscribe(object_id=package_ahriman.base) - _, object_id, _ = event_bus._subscribers[subscriber_id] - assert object_id == package_ahriman.base + assert event_bus._subscribers[subscriber_id].object_id == package_ahriman.base async def test_unsubscribe(event_bus: EventBus) -> None: diff --git a/tests/ahriman/web/views/v1/auditlog/test_view_v1_auditlog_event_bus.py b/tests/ahriman/web/views/v1/auditlog/test_view_v1_auditlog_event_bus.py index 18808022..ce01dd27 100644 --- a/tests/ahriman/web/views/v1/auditlog/test_view_v1_auditlog_event_bus.py +++ b/tests/ahriman/web/views/v1/auditlog/test_view_v1_auditlog_event_bus.py @@ -53,7 +53,7 @@ async def test_run_timeout() -> None: async def _shutdown() -> None: await asyncio.sleep(0.05) - await queue.put(None) + queue.shutdown() response = AsyncMock() response.is_connected = lambda: True