Compare commits

..

3 Commits

Author SHA1 Message Date
arcanis fa9fa73078 docs: bump python version to 2.13 2026-05-08 10:25:24 +03:00
arcanis 3e1e24cb50 feat: SSE support (#162)
* event bus implementation

* update tests

* docs update

* review fixes

* update configs

* fix typo

* frontend changes

* install missing pacakge

* queue processing simplification
2026-05-08 10:20:03 +03:00
arcanis 18fe38c30b build: use build_date argument for docker image instead of guessing from pacman root 2026-05-05 13:45:42 +03:00
8 changed files with 80 additions and 45 deletions
+6
View File
@@ -26,6 +26,10 @@ jobs:
- uses: docker/setup-buildx-action@v3 - uses: docker/setup-buildx-action@v3
- name: Set image date
id: args
run: echo "::set-output name=date::$(date -d yesterday +'%Y-%m-%d')"
- name: Login to docker hub - name: Login to docker hub
uses: docker/login-action@v3 uses: docker/login-action@v3
with: with:
@@ -53,6 +57,8 @@ jobs:
- name: Build an image and push - name: Build an image and push
uses: docker/build-push-action@v6 uses: docker/build-push-action@v6
with: with:
build-args: |
BUILD_DATE=${{ steps.args.outputs.date }}
file: docker/Dockerfile file: docker/Dockerfile
push: true push: true
tags: ${{ steps.meta.outputs.tags }} tags: ${{ steps.meta.outputs.tags }}
+1 -1
View File
@@ -3,7 +3,7 @@ version: 2
build: build:
os: ubuntu-lts-latest os: ubuntu-lts-latest
tools: tools:
python: "3.12" python: "3.13"
apt_packages: apt_packages:
- graphviz - graphviz
+5 -3
View File
@@ -1,13 +1,15 @@
# build image # build image
FROM archlinux:base AS build FROM archlinux:base AS build
ARG BUILD_DATE
# install environment # install environment
## create build user ## create build user
RUN useradd -m -d "/home/build" -s "/usr/bin/nologin" build RUN useradd -m -d "/home/build" -s "/usr/bin/nologin" build
## extract container creation date and set mirror for this timestamp, set PKGEXT and refresh database next ## extract container creation date and set mirror for this timestamp, set PKGEXT and refresh database next
RUN echo "Server = https://archive.archlinux.org/repos/$(stat -c "%y" "/var/lib/pacman" | cut -d " " -f 1 | sed "s,-,/,g")/\$repo/os/\$arch" > "/etc/pacman.d/mirrorlist" && \ RUN echo "Server = https://archive.archlinux.org/repos/${BUILD_DATE//-/\/}/\$repo/os/\$arch" > "/etc/pacman.d/mirrorlist" && \
pacman -Sy pacman -Syyuu --noconfirm
## setup package cache ## setup package cache
RUN runuser -u build -- mkdir "/tmp/pkg" && \ RUN runuser -u build -- mkdir "/tmp/pkg" && \
echo "PKGDEST=/tmp/pkg" >> "/etc/makepkg.conf" && \ echo "PKGDEST=/tmp/pkg" >> "/etc/makepkg.conf" && \
@@ -108,7 +110,7 @@ RUN cp "/etc/pacman.d/mirrorlist" "/etc/pacman.d/mirrorlist.orig" && \
echo "Server = file:///var/cache/pacman/pkg" > "/etc/pacman.d/mirrorlist" && \ echo "Server = file:///var/cache/pacman/pkg" > "/etc/pacman.d/mirrorlist" && \
cp "/etc/pacman.conf" "/etc/pacman.conf.orig" && \ cp "/etc/pacman.conf" "/etc/pacman.conf.orig" && \
sed -i "s/SigLevel *=.*/SigLevel = Optional/g" "/etc/pacman.conf" && \ sed -i "s/SigLevel *=.*/SigLevel = Optional/g" "/etc/pacman.conf" && \
pacman -Sy pacman -Syyuu --noconfirm
## install package and its optional dependencies ## install package and its optional dependencies
RUN pacman -S --noconfirm ahriman RUN pacman -S --noconfirm ahriman
RUN pacman -S --noconfirm --asdeps \ RUN pacman -S --noconfirm --asdeps \
+3 -1
View File
@@ -7,8 +7,10 @@ for PACKAGE in "$@"; do
# clone the remote source # clone the remote source
git clone https://aur.archlinux.org/"$PACKAGE".git "$BUILD_DIR" git clone https://aur.archlinux.org/"$PACKAGE".git "$BUILD_DIR"
cd "$BUILD_DIR" cd "$BUILD_DIR"
# FIXME monkey patch PKGBUILD for python
sed -i 's/python -m build/python -m build --skip-dependency-check/g' "PKGBUILD"
# checkout to the image date # checkout to the image date
git checkout "$(git rev-list -1 --before="$(stat -c "%y" "/var/lib/pacman" | cut -d " " -f 1)" master)" git checkout "$(git rev-list -1 --before="$BUILD_DATE" master)"
# build and install the package # build and install the package
makepkg --nocheck --noconfirm --install --rmdeps --syncdeps makepkg --nocheck --noconfirm --install --rmdeps --syncdeps
cd / cd /
+42 -25
View File
@@ -19,7 +19,8 @@
# #
import uuid import uuid
from asyncio import Lock, Queue, QueueFull from asyncio import Lock, Queue, QueueFull, QueueShutDown
from dataclasses import dataclass
from typing import Any from typing import Any
from ahriman.core.log import LazyLogging from ahriman.core.log import LazyLogging
@@ -29,6 +30,22 @@ from ahriman.models.event import EventType
SSEvent = tuple[str, dict[str, Any]] SSEvent = tuple[str, dict[str, Any]]
@dataclass(frozen=True)
class _Subscription:
"""
internal event bus subscription record
Attributes:
topics(list[EventType] | None): event type filter, ``None`` means all
object_id(str | None): object identifier filter, ``None`` means all
queue(Queue[SSEvent]): per-subscriber event queue
"""
topics: list[EventType] | None
object_id: str | None
queue: Queue[SSEvent]
class EventBus(LazyLogging): class EventBus(LazyLogging):
""" """
event bus implementation event bus implementation
@@ -45,7 +62,7 @@ class EventBus(LazyLogging):
self.max_size = max_size self.max_size = max_size
self._lock = Lock() self._lock = Lock()
self._subscribers: dict[str, tuple[list[EventType] | None, str | None, Queue[SSEvent | None]]] = {} self._subscribers: dict[str, _Subscription] = {}
async def broadcast(self, event_type: EventType, object_id: str | None, **kwargs: Any) -> None: async def broadcast(self, event_type: EventType, object_id: str | None, **kwargs: Any) -> None:
""" """
@@ -60,30 +77,31 @@ class EventBus(LazyLogging):
event.update(kwargs) event.update(kwargs)
async with self._lock: async with self._lock:
for subscriber_id, (topics, filter_object_id, queue) in self._subscribers.items(): snapshot = list(self._subscribers.items())
if topics is not None and event_type not in topics:
continue for subscriber_id, subscription in snapshot:
if filter_object_id is not None and object_id != filter_object_id: if subscription.topics is not None and event_type not in subscription.topics:
continue continue
try: if subscription.object_id is not None and object_id != subscription.object_id:
queue.put_nowait((event_type, event)) continue
except QueueFull:
self.logger.warning("discard message to slow subscriber %s", subscriber_id) try:
subscription.queue.put_nowait((event_type, event))
except QueueFull:
self.logger.warning("discard message to slow subscriber %s", subscriber_id)
except QueueShutDown:
pass
async def shutdown(self) -> None: async def shutdown(self) -> None:
""" """
gracefully shutdown all subscribers gracefully shutdown all subscribers
""" """
async with self._lock: async with self._lock:
for _, _, queue in self._subscribers.values(): for subscription in self._subscribers.values():
try: subscription.queue.shutdown()
queue.put_nowait(None)
except QueueFull:
pass
queue.shutdown()
async def subscribe(self, topics: list[EventType] | None = None, async def subscribe(self, topics: list[EventType] | None = None,
object_id: str | None = None) -> tuple[str, Queue[SSEvent | None]]: object_id: str | None = None) -> tuple[str, Queue[SSEvent]]:
""" """
register new subscriber register new subscriber
@@ -94,13 +112,13 @@ class EventBus(LazyLogging):
events for all objects will be delivered (Default value = None) events for all objects will be delivered (Default value = None)
Returns: Returns:
tuple[str, Queue[SSEvent | None]]: subscriber identifier and associated queue tuple[str, Queue[SSEvent]]: subscriber identifier and associated queue
""" """
subscriber_id = str(uuid.uuid4()) subscriber_id = str(uuid.uuid4())
queue: Queue[SSEvent | None] = Queue(self.max_size) queue: Queue[SSEvent] = Queue(self.max_size)
async with self._lock: async with self._lock:
self._subscribers[subscriber_id] = (topics, object_id, queue) self._subscribers[subscriber_id] = _Subscription(topics=topics, object_id=object_id, queue=queue)
return subscriber_id, queue return subscriber_id, queue
@@ -112,7 +130,6 @@ class EventBus(LazyLogging):
subscriber_id(str): subscriber unique identifier subscriber_id(str): subscriber unique identifier
""" """
async with self._lock: async with self._lock:
result = self._subscribers.pop(subscriber_id, None) subscription = self._subscribers.pop(subscriber_id, None)
if result is not None: if subscription is not None:
_, _, queue = result subscription.queue.shutdown()
queue.shutdown()
@@ -44,23 +44,21 @@ class EventBusView(BaseView):
ROUTES = ["/api/v1/events/stream"] ROUTES = ["/api/v1/events/stream"]
@staticmethod @staticmethod
async def _run(response: EventSourceResponse, queue: Queue[SSEvent | None]) -> None: async def _run(response: EventSourceResponse, queue: Queue[SSEvent]) -> None:
""" """
read events from queue and send them to the client read events from queue and send them to the client
Args: Args:
response(EventSourceResponse): SSE response instance response(EventSourceResponse): SSE response instance
queue(Queue[SSEvent | None]): subscriber queue queue(Queue[SSEvent]): subscriber queue
""" """
while response.is_connected(): while response.is_connected():
try: try:
message = await wait_for(queue.get(), timeout=response.ping_interval) event_type, data = await wait_for(queue.get(), timeout=response.ping_interval)
except TimeoutError: except TimeoutError:
continue continue
except QueueShutDown:
if message is None: break
break # terminate queue on sentinel event
event_type, data = message
await response.send(json.dumps(data), event=event_type) await response.send(json.dumps(data), event=event_type)
+17 -7
View File
@@ -1,5 +1,7 @@
import pytest import pytest
from asyncio import QueueShutDown
from ahriman.core.status.event_bus import EventBus from ahriman.core.status.event_bus import EventBus
from ahriman.models.event import EventType from ahriman.models.event import EventType
from ahriman.models.package import Package from ahriman.models.package import Package
@@ -49,15 +51,25 @@ async def test_broadcast_queue_full(event_bus: EventBus, package_ahriman: Packag
assert queue.qsize() == 1 assert queue.qsize() == 1
async def test_broadcast_queue_shutdown(event_bus: EventBus, package_ahriman: Package) -> None:
"""
must skip subscriber whose queue was shutdown concurrently
"""
_, queue = await event_bus.subscribe()
queue.shutdown()
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
async def test_shutdown(event_bus: EventBus) -> None: async def test_shutdown(event_bus: EventBus) -> None:
""" """
must send sentinel to all subscribers on shutdown must shutdown all subscriber queues on shutdown
""" """
_, queue = await event_bus.subscribe() _, queue = await event_bus.subscribe()
await event_bus.shutdown() await event_bus.shutdown()
message = queue.get_nowait() with pytest.raises(QueueShutDown):
assert message is None queue.get_nowait()
async def test_shutdown_queue_full(event_bus: EventBus, package_ahriman: Package) -> None: async def test_shutdown_queue_full(event_bus: EventBus, package_ahriman: Package) -> None:
@@ -105,8 +117,7 @@ async def test_subscribe_with_topics(event_bus: EventBus) -> None:
must register subscriber with topic filter must register subscriber with topic filter
""" """
subscriber_id, _ = await event_bus.subscribe([EventType.BuildLog]) subscriber_id, _ = await event_bus.subscribe([EventType.BuildLog])
topics, _, _ = event_bus._subscribers[subscriber_id] assert event_bus._subscribers[subscriber_id].topics == [EventType.BuildLog]
assert topics == [EventType.BuildLog]
async def test_subscribe_with_object_id(event_bus: EventBus, package_ahriman: Package) -> None: async def test_subscribe_with_object_id(event_bus: EventBus, package_ahriman: Package) -> None:
@@ -114,8 +125,7 @@ async def test_subscribe_with_object_id(event_bus: EventBus, package_ahriman: Pa
must register subscriber with object_id filter must register subscriber with object_id filter
""" """
subscriber_id, _ = await event_bus.subscribe(object_id=package_ahriman.base) subscriber_id, _ = await event_bus.subscribe(object_id=package_ahriman.base)
_, object_id, _ = event_bus._subscribers[subscriber_id] assert event_bus._subscribers[subscriber_id].object_id == package_ahriman.base
assert object_id == package_ahriman.base
async def test_unsubscribe(event_bus: EventBus) -> None: async def test_unsubscribe(event_bus: EventBus) -> None:
@@ -53,7 +53,7 @@ async def test_run_timeout() -> None:
async def _shutdown() -> None: async def _shutdown() -> None:
await asyncio.sleep(0.05) await asyncio.sleep(0.05)
await queue.put(None) queue.shutdown()
response = AsyncMock() response = AsyncMock()
response.is_connected = lambda: True response.is_connected = lambda: True