Compare commits

..

8 Commits

Author SHA1 Message Date
arcanis b94cba4d25 install missing pacakge 2026-05-05 13:46:20 +03:00
arcanis dbe7d208ac zz 2026-05-05 13:46:20 +03:00
arcanis d7a61f089d fix typo 2026-05-05 13:46:20 +03:00
arcanis 418018bfb5 update configs 2026-05-05 13:46:20 +03:00
arcanis 7183be6fab review fixes 2026-05-05 13:46:20 +03:00
arcanis cdfac85ce5 docs update 2026-05-05 13:46:20 +03:00
arcanis 3da8fb889d update tests 2026-05-05 13:46:20 +03:00
arcanis 12ef8fd6e6 event bus implementation 2026-05-05 13:46:20 +03:00
5 changed files with 41 additions and 66 deletions
+1 -1
View File
@@ -3,7 +3,7 @@ version: 2
build: build:
os: ubuntu-lts-latest os: ubuntu-lts-latest
tools: tools:
python: "3.13" python: "3.12"
apt_packages: apt_packages:
- graphviz - graphviz
+25 -42
View File
@@ -19,8 +19,7 @@
# #
import uuid import uuid
from asyncio import Lock, Queue, QueueFull, QueueShutDown from asyncio import Lock, Queue, QueueFull
from dataclasses import dataclass
from typing import Any from typing import Any
from ahriman.core.log import LazyLogging from ahriman.core.log import LazyLogging
@@ -30,22 +29,6 @@ from ahriman.models.event import EventType
SSEvent = tuple[str, dict[str, Any]] SSEvent = tuple[str, dict[str, Any]]
@dataclass(frozen=True)
class _Subscription:
"""
internal event bus subscription record
Attributes:
topics(list[EventType] | None): event type filter, ``None`` means all
object_id(str | None): object identifier filter, ``None`` means all
queue(Queue[SSEvent]): per-subscriber event queue
"""
topics: list[EventType] | None
object_id: str | None
queue: Queue[SSEvent]
class EventBus(LazyLogging): class EventBus(LazyLogging):
""" """
event bus implementation event bus implementation
@@ -62,7 +45,7 @@ class EventBus(LazyLogging):
self.max_size = max_size self.max_size = max_size
self._lock = Lock() self._lock = Lock()
self._subscribers: dict[str, _Subscription] = {} self._subscribers: dict[str, tuple[list[EventType] | None, str | None, Queue[SSEvent | None]]] = {}
async def broadcast(self, event_type: EventType, object_id: str | None, **kwargs: Any) -> None: async def broadcast(self, event_type: EventType, object_id: str | None, **kwargs: Any) -> None:
""" """
@@ -77,31 +60,30 @@ class EventBus(LazyLogging):
event.update(kwargs) event.update(kwargs)
async with self._lock: async with self._lock:
snapshot = list(self._subscribers.items()) for subscriber_id, (topics, filter_object_id, queue) in self._subscribers.items():
if topics is not None and event_type not in topics:
for subscriber_id, subscription in snapshot: continue
if subscription.topics is not None and event_type not in subscription.topics: if filter_object_id is not None and object_id != filter_object_id:
continue continue
if subscription.object_id is not None and object_id != subscription.object_id: try:
continue queue.put_nowait((event_type, event))
except QueueFull:
try: self.logger.warning("discard message to slow subscriber %s", subscriber_id)
subscription.queue.put_nowait((event_type, event))
except QueueFull:
self.logger.warning("discard message to slow subscriber %s", subscriber_id)
except QueueShutDown:
pass
async def shutdown(self) -> None: async def shutdown(self) -> None:
""" """
gracefully shutdown all subscribers gracefully shutdown all subscribers
""" """
async with self._lock: async with self._lock:
for subscription in self._subscribers.values(): for _, _, queue in self._subscribers.values():
subscription.queue.shutdown() try:
queue.put_nowait(None)
except QueueFull:
pass
queue.shutdown()
async def subscribe(self, topics: list[EventType] | None = None, async def subscribe(self, topics: list[EventType] | None = None,
object_id: str | None = None) -> tuple[str, Queue[SSEvent]]: object_id: str | None = None) -> tuple[str, Queue[SSEvent | None]]:
""" """
register new subscriber register new subscriber
@@ -112,13 +94,13 @@ class EventBus(LazyLogging):
events for all objects will be delivered (Default value = None) events for all objects will be delivered (Default value = None)
Returns: Returns:
tuple[str, Queue[SSEvent]]: subscriber identifier and associated queue tuple[str, Queue[SSEvent | None]]: subscriber identifier and associated queue
""" """
subscriber_id = str(uuid.uuid4()) subscriber_id = str(uuid.uuid4())
queue: Queue[SSEvent] = Queue(self.max_size) queue: Queue[SSEvent | None] = Queue(self.max_size)
async with self._lock: async with self._lock:
self._subscribers[subscriber_id] = _Subscription(topics=topics, object_id=object_id, queue=queue) self._subscribers[subscriber_id] = (topics, object_id, queue)
return subscriber_id, queue return subscriber_id, queue
@@ -130,6 +112,7 @@ class EventBus(LazyLogging):
subscriber_id(str): subscriber unique identifier subscriber_id(str): subscriber unique identifier
""" """
async with self._lock: async with self._lock:
subscription = self._subscribers.pop(subscriber_id, None) result = self._subscribers.pop(subscriber_id, None)
if subscription is not None: if result is not None:
subscription.queue.shutdown() _, _, queue = result
queue.shutdown()
@@ -44,21 +44,23 @@ class EventBusView(BaseView):
ROUTES = ["/api/v1/events/stream"] ROUTES = ["/api/v1/events/stream"]
@staticmethod @staticmethod
async def _run(response: EventSourceResponse, queue: Queue[SSEvent]) -> None: async def _run(response: EventSourceResponse, queue: Queue[SSEvent | None]) -> None:
""" """
read events from queue and send them to the client read events from queue and send them to the client
Args: Args:
response(EventSourceResponse): SSE response instance response(EventSourceResponse): SSE response instance
queue(Queue[SSEvent]): subscriber queue queue(Queue[SSEvent | None]): subscriber queue
""" """
while response.is_connected(): while response.is_connected():
try: try:
event_type, data = await wait_for(queue.get(), timeout=response.ping_interval) message = await wait_for(queue.get(), timeout=response.ping_interval)
except TimeoutError: except TimeoutError:
continue continue
except QueueShutDown:
break if message is None:
break # terminate queue on sentinel event
event_type, data = message
await response.send(json.dumps(data), event=event_type) await response.send(json.dumps(data), event=event_type)
+7 -17
View File
@@ -1,7 +1,5 @@
import pytest import pytest
from asyncio import QueueShutDown
from ahriman.core.status.event_bus import EventBus from ahriman.core.status.event_bus import EventBus
from ahriman.models.event import EventType from ahriman.models.event import EventType
from ahriman.models.package import Package from ahriman.models.package import Package
@@ -51,25 +49,15 @@ async def test_broadcast_queue_full(event_bus: EventBus, package_ahriman: Packag
assert queue.qsize() == 1 assert queue.qsize() == 1
async def test_broadcast_queue_shutdown(event_bus: EventBus, package_ahriman: Package) -> None:
"""
must skip subscriber whose queue was shutdown concurrently
"""
_, queue = await event_bus.subscribe()
queue.shutdown()
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
async def test_shutdown(event_bus: EventBus) -> None: async def test_shutdown(event_bus: EventBus) -> None:
""" """
must shutdown all subscriber queues on shutdown must send sentinel to all subscribers on shutdown
""" """
_, queue = await event_bus.subscribe() _, queue = await event_bus.subscribe()
await event_bus.shutdown() await event_bus.shutdown()
with pytest.raises(QueueShutDown): message = queue.get_nowait()
queue.get_nowait() assert message is None
async def test_shutdown_queue_full(event_bus: EventBus, package_ahriman: Package) -> None: async def test_shutdown_queue_full(event_bus: EventBus, package_ahriman: Package) -> None:
@@ -117,7 +105,8 @@ async def test_subscribe_with_topics(event_bus: EventBus) -> None:
must register subscriber with topic filter must register subscriber with topic filter
""" """
subscriber_id, _ = await event_bus.subscribe([EventType.BuildLog]) subscriber_id, _ = await event_bus.subscribe([EventType.BuildLog])
assert event_bus._subscribers[subscriber_id].topics == [EventType.BuildLog] topics, _, _ = event_bus._subscribers[subscriber_id]
assert topics == [EventType.BuildLog]
async def test_subscribe_with_object_id(event_bus: EventBus, package_ahriman: Package) -> None: async def test_subscribe_with_object_id(event_bus: EventBus, package_ahriman: Package) -> None:
@@ -125,7 +114,8 @@ async def test_subscribe_with_object_id(event_bus: EventBus, package_ahriman: Pa
must register subscriber with object_id filter must register subscriber with object_id filter
""" """
subscriber_id, _ = await event_bus.subscribe(object_id=package_ahriman.base) subscriber_id, _ = await event_bus.subscribe(object_id=package_ahriman.base)
assert event_bus._subscribers[subscriber_id].object_id == package_ahriman.base _, object_id, _ = event_bus._subscribers[subscriber_id]
assert object_id == package_ahriman.base
async def test_unsubscribe(event_bus: EventBus) -> None: async def test_unsubscribe(event_bus: EventBus) -> None:
@@ -53,7 +53,7 @@ async def test_run_timeout() -> None:
async def _shutdown() -> None: async def _shutdown() -> None:
await asyncio.sleep(0.05) await asyncio.sleep(0.05)
queue.shutdown() await queue.put(None)
response = AsyncMock() response = AsyncMock()
response.is_connected = lambda: True response.is_connected = lambda: True