Compare commits

..

8 Commits

Author SHA1 Message Date
arcanis b8a75c110d install missing pacakge 2026-05-04 16:10:35 +03:00
arcanis a713c67948 zz 2026-04-27 13:01:56 +03:00
arcanis 4934205b9e fix typo 2026-04-27 13:01:56 +03:00
arcanis 97c121731b update configs 2026-04-27 13:01:56 +03:00
arcanis 578bb797ef review fixes 2026-04-27 13:01:56 +03:00
arcanis 8623b26a04 docs update 2026-04-27 13:01:56 +03:00
arcanis 0903378a64 update tests 2026-04-27 13:01:56 +03:00
arcanis 47cc99292f event bus implementation 2026-04-27 13:01:56 +03:00
8 changed files with 45 additions and 80 deletions
-6
View File
@@ -26,10 +26,6 @@ jobs:
- uses: docker/setup-buildx-action@v3 - uses: docker/setup-buildx-action@v3
- name: Set image date
id: args
run: echo "::set-output name=date::$(date -d yesterday +'%Y-%m-%d')"
- name: Login to docker hub - name: Login to docker hub
uses: docker/login-action@v3 uses: docker/login-action@v3
with: with:
@@ -57,8 +53,6 @@ jobs:
- name: Build an image and push - name: Build an image and push
uses: docker/build-push-action@v6 uses: docker/build-push-action@v6
with: with:
build-args: |
BUILD_DATE=${{ steps.args.outputs.date }}
file: docker/Dockerfile file: docker/Dockerfile
push: true push: true
tags: ${{ steps.meta.outputs.tags }} tags: ${{ steps.meta.outputs.tags }}
+1 -1
View File
@@ -3,7 +3,7 @@ version: 2
build: build:
os: ubuntu-lts-latest os: ubuntu-lts-latest
tools: tools:
python: "3.13" python: "3.12"
apt_packages: apt_packages:
- graphviz - graphviz
+3 -5
View File
@@ -1,15 +1,13 @@
# build image # build image
FROM archlinux:base AS build FROM archlinux:base AS build
ARG BUILD_DATE
# install environment # install environment
## create build user ## create build user
RUN useradd -m -d "/home/build" -s "/usr/bin/nologin" build RUN useradd -m -d "/home/build" -s "/usr/bin/nologin" build
## extract container creation date and set mirror for this timestamp, set PKGEXT and refresh database next ## extract container creation date and set mirror for this timestamp, set PKGEXT and refresh database next
RUN echo "Server = https://archive.archlinux.org/repos/${BUILD_DATE//-/\/}/\$repo/os/\$arch" > "/etc/pacman.d/mirrorlist" && \ RUN echo "Server = https://archive.archlinux.org/repos/$(stat -c "%y" "/var/lib/pacman" | cut -d " " -f 1 | sed "s,-,/,g")/\$repo/os/\$arch" > "/etc/pacman.d/mirrorlist" && \
pacman -Syyuu --noconfirm pacman -Sy
## setup package cache ## setup package cache
RUN runuser -u build -- mkdir "/tmp/pkg" && \ RUN runuser -u build -- mkdir "/tmp/pkg" && \
echo "PKGDEST=/tmp/pkg" >> "/etc/makepkg.conf" && \ echo "PKGDEST=/tmp/pkg" >> "/etc/makepkg.conf" && \
@@ -110,7 +108,7 @@ RUN cp "/etc/pacman.d/mirrorlist" "/etc/pacman.d/mirrorlist.orig" && \
echo "Server = file:///var/cache/pacman/pkg" > "/etc/pacman.d/mirrorlist" && \ echo "Server = file:///var/cache/pacman/pkg" > "/etc/pacman.d/mirrorlist" && \
cp "/etc/pacman.conf" "/etc/pacman.conf.orig" && \ cp "/etc/pacman.conf" "/etc/pacman.conf.orig" && \
sed -i "s/SigLevel *=.*/SigLevel = Optional/g" "/etc/pacman.conf" && \ sed -i "s/SigLevel *=.*/SigLevel = Optional/g" "/etc/pacman.conf" && \
pacman -Syyuu --noconfirm pacman -Sy
## install package and its optional dependencies ## install package and its optional dependencies
RUN pacman -S --noconfirm ahriman RUN pacman -S --noconfirm ahriman
RUN pacman -S --noconfirm --asdeps \ RUN pacman -S --noconfirm --asdeps \
+1 -3
View File
@@ -7,10 +7,8 @@ for PACKAGE in "$@"; do
# clone the remote source # clone the remote source
git clone https://aur.archlinux.org/"$PACKAGE".git "$BUILD_DIR" git clone https://aur.archlinux.org/"$PACKAGE".git "$BUILD_DIR"
cd "$BUILD_DIR" cd "$BUILD_DIR"
# FIXME monkey patch PKGBUILD for python
sed -i 's/python -m build/python -m build --skip-dependency-check/g' "PKGBUILD"
# checkout to the image date # checkout to the image date
git checkout "$(git rev-list -1 --before="$BUILD_DATE" master)" git checkout "$(git rev-list -1 --before="$(stat -c "%y" "/var/lib/pacman" | cut -d " " -f 1)" master)"
# build and install the package # build and install the package
makepkg --nocheck --noconfirm --install --rmdeps --syncdeps makepkg --nocheck --noconfirm --install --rmdeps --syncdeps
cd / cd /
+25 -42
View File
@@ -19,8 +19,7 @@
# #
import uuid import uuid
from asyncio import Lock, Queue, QueueFull, QueueShutDown from asyncio import Lock, Queue, QueueFull
from dataclasses import dataclass
from typing import Any from typing import Any
from ahriman.core.log import LazyLogging from ahriman.core.log import LazyLogging
@@ -30,22 +29,6 @@ from ahriman.models.event import EventType
SSEvent = tuple[str, dict[str, Any]] SSEvent = tuple[str, dict[str, Any]]
@dataclass(frozen=True)
class _Subscription:
"""
internal event bus subscription record
Attributes:
topics(list[EventType] | None): event type filter, ``None`` means all
object_id(str | None): object identifier filter, ``None`` means all
queue(Queue[SSEvent]): per-subscriber event queue
"""
topics: list[EventType] | None
object_id: str | None
queue: Queue[SSEvent]
class EventBus(LazyLogging): class EventBus(LazyLogging):
""" """
event bus implementation event bus implementation
@@ -62,7 +45,7 @@ class EventBus(LazyLogging):
self.max_size = max_size self.max_size = max_size
self._lock = Lock() self._lock = Lock()
self._subscribers: dict[str, _Subscription] = {} self._subscribers: dict[str, tuple[list[EventType] | None, str | None, Queue[SSEvent | None]]] = {}
async def broadcast(self, event_type: EventType, object_id: str | None, **kwargs: Any) -> None: async def broadcast(self, event_type: EventType, object_id: str | None, **kwargs: Any) -> None:
""" """
@@ -77,31 +60,30 @@ class EventBus(LazyLogging):
event.update(kwargs) event.update(kwargs)
async with self._lock: async with self._lock:
snapshot = list(self._subscribers.items()) for subscriber_id, (topics, filter_object_id, queue) in self._subscribers.items():
if topics is not None and event_type not in topics:
for subscriber_id, subscription in snapshot: continue
if subscription.topics is not None and event_type not in subscription.topics: if filter_object_id is not None and object_id != filter_object_id:
continue continue
if subscription.object_id is not None and object_id != subscription.object_id: try:
continue queue.put_nowait((event_type, event))
except QueueFull:
try: self.logger.warning("discard message to slow subscriber %s", subscriber_id)
subscription.queue.put_nowait((event_type, event))
except QueueFull:
self.logger.warning("discard message to slow subscriber %s", subscriber_id)
except QueueShutDown:
pass
async def shutdown(self) -> None: async def shutdown(self) -> None:
""" """
gracefully shutdown all subscribers gracefully shutdown all subscribers
""" """
async with self._lock: async with self._lock:
for subscription in self._subscribers.values(): for _, _, queue in self._subscribers.values():
subscription.queue.shutdown() try:
queue.put_nowait(None)
except QueueFull:
pass
queue.shutdown()
async def subscribe(self, topics: list[EventType] | None = None, async def subscribe(self, topics: list[EventType] | None = None,
object_id: str | None = None) -> tuple[str, Queue[SSEvent]]: object_id: str | None = None) -> tuple[str, Queue[SSEvent | None]]:
""" """
register new subscriber register new subscriber
@@ -112,13 +94,13 @@ class EventBus(LazyLogging):
events for all objects will be delivered (Default value = None) events for all objects will be delivered (Default value = None)
Returns: Returns:
tuple[str, Queue[SSEvent]]: subscriber identifier and associated queue tuple[str, Queue[SSEvent | None]]: subscriber identifier and associated queue
""" """
subscriber_id = str(uuid.uuid4()) subscriber_id = str(uuid.uuid4())
queue: Queue[SSEvent] = Queue(self.max_size) queue: Queue[SSEvent | None] = Queue(self.max_size)
async with self._lock: async with self._lock:
self._subscribers[subscriber_id] = _Subscription(topics=topics, object_id=object_id, queue=queue) self._subscribers[subscriber_id] = (topics, object_id, queue)
return subscriber_id, queue return subscriber_id, queue
@@ -130,6 +112,7 @@ class EventBus(LazyLogging):
subscriber_id(str): subscriber unique identifier subscriber_id(str): subscriber unique identifier
""" """
async with self._lock: async with self._lock:
subscription = self._subscribers.pop(subscriber_id, None) result = self._subscribers.pop(subscriber_id, None)
if subscription is not None: if result is not None:
subscription.queue.shutdown() _, _, queue = result
queue.shutdown()
@@ -44,21 +44,23 @@ class EventBusView(BaseView):
ROUTES = ["/api/v1/events/stream"] ROUTES = ["/api/v1/events/stream"]
@staticmethod @staticmethod
async def _run(response: EventSourceResponse, queue: Queue[SSEvent]) -> None: async def _run(response: EventSourceResponse, queue: Queue[SSEvent | None]) -> None:
""" """
read events from queue and send them to the client read events from queue and send them to the client
Args: Args:
response(EventSourceResponse): SSE response instance response(EventSourceResponse): SSE response instance
queue(Queue[SSEvent]): subscriber queue queue(Queue[SSEvent | None]): subscriber queue
""" """
while response.is_connected(): while response.is_connected():
try: try:
event_type, data = await wait_for(queue.get(), timeout=response.ping_interval) message = await wait_for(queue.get(), timeout=response.ping_interval)
except TimeoutError: except TimeoutError:
continue continue
except QueueShutDown:
break if message is None:
break # terminate queue on sentinel event
event_type, data = message
await response.send(json.dumps(data), event=event_type) await response.send(json.dumps(data), event=event_type)
+7 -17
View File
@@ -1,7 +1,5 @@
import pytest import pytest
from asyncio import QueueShutDown
from ahriman.core.status.event_bus import EventBus from ahriman.core.status.event_bus import EventBus
from ahriman.models.event import EventType from ahriman.models.event import EventType
from ahriman.models.package import Package from ahriman.models.package import Package
@@ -51,25 +49,15 @@ async def test_broadcast_queue_full(event_bus: EventBus, package_ahriman: Packag
assert queue.qsize() == 1 assert queue.qsize() == 1
async def test_broadcast_queue_shutdown(event_bus: EventBus, package_ahriman: Package) -> None:
"""
must skip subscriber whose queue was shutdown concurrently
"""
_, queue = await event_bus.subscribe()
queue.shutdown()
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
async def test_shutdown(event_bus: EventBus) -> None: async def test_shutdown(event_bus: EventBus) -> None:
""" """
must shutdown all subscriber queues on shutdown must send sentinel to all subscribers on shutdown
""" """
_, queue = await event_bus.subscribe() _, queue = await event_bus.subscribe()
await event_bus.shutdown() await event_bus.shutdown()
with pytest.raises(QueueShutDown): message = queue.get_nowait()
queue.get_nowait() assert message is None
async def test_shutdown_queue_full(event_bus: EventBus, package_ahriman: Package) -> None: async def test_shutdown_queue_full(event_bus: EventBus, package_ahriman: Package) -> None:
@@ -117,7 +105,8 @@ async def test_subscribe_with_topics(event_bus: EventBus) -> None:
must register subscriber with topic filter must register subscriber with topic filter
""" """
subscriber_id, _ = await event_bus.subscribe([EventType.BuildLog]) subscriber_id, _ = await event_bus.subscribe([EventType.BuildLog])
assert event_bus._subscribers[subscriber_id].topics == [EventType.BuildLog] topics, _, _ = event_bus._subscribers[subscriber_id]
assert topics == [EventType.BuildLog]
async def test_subscribe_with_object_id(event_bus: EventBus, package_ahriman: Package) -> None: async def test_subscribe_with_object_id(event_bus: EventBus, package_ahriman: Package) -> None:
@@ -125,7 +114,8 @@ async def test_subscribe_with_object_id(event_bus: EventBus, package_ahriman: Pa
must register subscriber with object_id filter must register subscriber with object_id filter
""" """
subscriber_id, _ = await event_bus.subscribe(object_id=package_ahriman.base) subscriber_id, _ = await event_bus.subscribe(object_id=package_ahriman.base)
assert event_bus._subscribers[subscriber_id].object_id == package_ahriman.base _, object_id, _ = event_bus._subscribers[subscriber_id]
assert object_id == package_ahriman.base
async def test_unsubscribe(event_bus: EventBus) -> None: async def test_unsubscribe(event_bus: EventBus) -> None:
@@ -53,7 +53,7 @@ async def test_run_timeout() -> None:
async def _shutdown() -> None: async def _shutdown() -> None:
await asyncio.sleep(0.05) await asyncio.sleep(0.05)
queue.shutdown() await queue.put(None)
response = AsyncMock() response = AsyncMock()
response.is_connected = lambda: True response.is_connected = lambda: True