mirror of
https://github.com/arcan1s/ahriman.git
synced 2026-05-22 14:46:15 +00:00
queue processing simplification
This commit is contained in:
@@ -19,7 +19,8 @@
|
||||
#
|
||||
import uuid
|
||||
|
||||
from asyncio import Lock, Queue, QueueFull
|
||||
from asyncio import Lock, Queue, QueueFull, QueueShutDown
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
from ahriman.core.log import LazyLogging
|
||||
@@ -29,6 +30,22 @@ from ahriman.models.event import EventType
|
||||
SSEvent = tuple[str, dict[str, Any]]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class _Subscription:
|
||||
"""
|
||||
internal event bus subscription record
|
||||
|
||||
Attributes:
|
||||
topics(list[EventType] | None): event type filter, ``None`` means all
|
||||
object_id(str | None): object identifier filter, ``None`` means all
|
||||
queue(Queue[SSEvent]): per-subscriber event queue
|
||||
"""
|
||||
|
||||
topics: list[EventType] | None
|
||||
object_id: str | None
|
||||
queue: Queue[SSEvent]
|
||||
|
||||
|
||||
class EventBus(LazyLogging):
|
||||
"""
|
||||
event bus implementation
|
||||
@@ -45,7 +62,7 @@ class EventBus(LazyLogging):
|
||||
self.max_size = max_size
|
||||
|
||||
self._lock = Lock()
|
||||
self._subscribers: dict[str, tuple[list[EventType] | None, str | None, Queue[SSEvent | None]]] = {}
|
||||
self._subscribers: dict[str, _Subscription] = {}
|
||||
|
||||
async def broadcast(self, event_type: EventType, object_id: str | None, **kwargs: Any) -> None:
|
||||
"""
|
||||
@@ -60,30 +77,31 @@ class EventBus(LazyLogging):
|
||||
event.update(kwargs)
|
||||
|
||||
async with self._lock:
|
||||
for subscriber_id, (topics, filter_object_id, queue) in self._subscribers.items():
|
||||
if topics is not None and event_type not in topics:
|
||||
continue
|
||||
if filter_object_id is not None and object_id != filter_object_id:
|
||||
continue
|
||||
try:
|
||||
queue.put_nowait((event_type, event))
|
||||
except QueueFull:
|
||||
self.logger.warning("discard message to slow subscriber %s", subscriber_id)
|
||||
snapshot = list(self._subscribers.items())
|
||||
|
||||
for subscriber_id, subscription in snapshot:
|
||||
if subscription.topics is not None and event_type not in subscription.topics:
|
||||
continue
|
||||
if subscription.object_id is not None and object_id != subscription.object_id:
|
||||
continue
|
||||
|
||||
try:
|
||||
subscription.queue.put_nowait((event_type, event))
|
||||
except QueueFull:
|
||||
self.logger.warning("discard message to slow subscriber %s", subscriber_id)
|
||||
except QueueShutDown:
|
||||
pass
|
||||
|
||||
async def shutdown(self) -> None:
|
||||
"""
|
||||
gracefully shutdown all subscribers
|
||||
"""
|
||||
async with self._lock:
|
||||
for _, _, queue in self._subscribers.values():
|
||||
try:
|
||||
queue.put_nowait(None)
|
||||
except QueueFull:
|
||||
pass
|
||||
queue.shutdown()
|
||||
for subscription in self._subscribers.values():
|
||||
subscription.queue.shutdown()
|
||||
|
||||
async def subscribe(self, topics: list[EventType] | None = None,
|
||||
object_id: str | None = None) -> tuple[str, Queue[SSEvent | None]]:
|
||||
object_id: str | None = None) -> tuple[str, Queue[SSEvent]]:
|
||||
"""
|
||||
register new subscriber
|
||||
|
||||
@@ -94,13 +112,13 @@ class EventBus(LazyLogging):
|
||||
events for all objects will be delivered (Default value = None)
|
||||
|
||||
Returns:
|
||||
tuple[str, Queue[SSEvent | None]]: subscriber identifier and associated queue
|
||||
tuple[str, Queue[SSEvent]]: subscriber identifier and associated queue
|
||||
"""
|
||||
subscriber_id = str(uuid.uuid4())
|
||||
queue: Queue[SSEvent | None] = Queue(self.max_size)
|
||||
queue: Queue[SSEvent] = Queue(self.max_size)
|
||||
|
||||
async with self._lock:
|
||||
self._subscribers[subscriber_id] = (topics, object_id, queue)
|
||||
self._subscribers[subscriber_id] = _Subscription(topics=topics, object_id=object_id, queue=queue)
|
||||
|
||||
return subscriber_id, queue
|
||||
|
||||
@@ -112,7 +130,6 @@ class EventBus(LazyLogging):
|
||||
subscriber_id(str): subscriber unique identifier
|
||||
"""
|
||||
async with self._lock:
|
||||
result = self._subscribers.pop(subscriber_id, None)
|
||||
if result is not None:
|
||||
_, _, queue = result
|
||||
queue.shutdown()
|
||||
subscription = self._subscribers.pop(subscriber_id, None)
|
||||
if subscription is not None:
|
||||
subscription.queue.shutdown()
|
||||
|
||||
@@ -44,23 +44,21 @@ class EventBusView(BaseView):
|
||||
ROUTES = ["/api/v1/events/stream"]
|
||||
|
||||
@staticmethod
|
||||
async def _run(response: EventSourceResponse, queue: Queue[SSEvent | None]) -> None:
|
||||
async def _run(response: EventSourceResponse, queue: Queue[SSEvent]) -> None:
|
||||
"""
|
||||
read events from queue and send them to the client
|
||||
|
||||
Args:
|
||||
response(EventSourceResponse): SSE response instance
|
||||
queue(Queue[SSEvent | None]): subscriber queue
|
||||
queue(Queue[SSEvent]): subscriber queue
|
||||
"""
|
||||
while response.is_connected():
|
||||
try:
|
||||
message = await wait_for(queue.get(), timeout=response.ping_interval)
|
||||
event_type, data = await wait_for(queue.get(), timeout=response.ping_interval)
|
||||
except TimeoutError:
|
||||
continue
|
||||
|
||||
if message is None:
|
||||
break # terminate queue on sentinel event
|
||||
event_type, data = message
|
||||
except QueueShutDown:
|
||||
break
|
||||
|
||||
await response.send(json.dumps(data), event=event_type)
|
||||
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import pytest
|
||||
|
||||
from asyncio import QueueShutDown
|
||||
|
||||
from ahriman.core.status.event_bus import EventBus
|
||||
from ahriman.models.event import EventType
|
||||
from ahriman.models.package import Package
|
||||
@@ -49,15 +51,25 @@ async def test_broadcast_queue_full(event_bus: EventBus, package_ahriman: Packag
|
||||
assert queue.qsize() == 1
|
||||
|
||||
|
||||
async def test_broadcast_queue_shutdown(event_bus: EventBus, package_ahriman: Package) -> None:
|
||||
"""
|
||||
must skip subscriber whose queue was shutdown concurrently
|
||||
"""
|
||||
_, queue = await event_bus.subscribe()
|
||||
queue.shutdown()
|
||||
|
||||
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
|
||||
|
||||
|
||||
async def test_shutdown(event_bus: EventBus) -> None:
|
||||
"""
|
||||
must send sentinel to all subscribers on shutdown
|
||||
must shutdown all subscriber queues on shutdown
|
||||
"""
|
||||
_, queue = await event_bus.subscribe()
|
||||
|
||||
await event_bus.shutdown()
|
||||
message = queue.get_nowait()
|
||||
assert message is None
|
||||
with pytest.raises(QueueShutDown):
|
||||
queue.get_nowait()
|
||||
|
||||
|
||||
async def test_shutdown_queue_full(event_bus: EventBus, package_ahriman: Package) -> None:
|
||||
@@ -105,8 +117,7 @@ async def test_subscribe_with_topics(event_bus: EventBus) -> None:
|
||||
must register subscriber with topic filter
|
||||
"""
|
||||
subscriber_id, _ = await event_bus.subscribe([EventType.BuildLog])
|
||||
topics, _, _ = event_bus._subscribers[subscriber_id]
|
||||
assert topics == [EventType.BuildLog]
|
||||
assert event_bus._subscribers[subscriber_id].topics == [EventType.BuildLog]
|
||||
|
||||
|
||||
async def test_subscribe_with_object_id(event_bus: EventBus, package_ahriman: Package) -> None:
|
||||
@@ -114,8 +125,7 @@ async def test_subscribe_with_object_id(event_bus: EventBus, package_ahriman: Pa
|
||||
must register subscriber with object_id filter
|
||||
"""
|
||||
subscriber_id, _ = await event_bus.subscribe(object_id=package_ahriman.base)
|
||||
_, object_id, _ = event_bus._subscribers[subscriber_id]
|
||||
assert object_id == package_ahriman.base
|
||||
assert event_bus._subscribers[subscriber_id].object_id == package_ahriman.base
|
||||
|
||||
|
||||
async def test_unsubscribe(event_bus: EventBus) -> None:
|
||||
|
||||
@@ -53,7 +53,7 @@ async def test_run_timeout() -> None:
|
||||
|
||||
async def _shutdown() -> None:
|
||||
await asyncio.sleep(0.05)
|
||||
await queue.put(None)
|
||||
queue.shutdown()
|
||||
|
||||
response = AsyncMock()
|
||||
response.is_connected = lambda: True
|
||||
|
||||
Reference in New Issue
Block a user