Compare commits

..

7 Commits

Author SHA1 Message Date
arcanis a713c67948 zz 2026-04-27 13:01:56 +03:00
arcanis 4934205b9e fix typo 2026-04-27 13:01:56 +03:00
arcanis 97c121731b update configs 2026-04-27 13:01:56 +03:00
arcanis 578bb797ef review fixes 2026-04-27 13:01:56 +03:00
arcanis 8623b26a04 docs update 2026-04-27 13:01:56 +03:00
arcanis 0903378a64 update tests 2026-04-27 13:01:56 +03:00
arcanis 47cc99292f event bus implementation 2026-04-27 13:01:56 +03:00
9 changed files with 50 additions and 168 deletions
-6
View File
@@ -26,10 +26,6 @@ jobs:
- uses: docker/setup-buildx-action@v3
- name: Set image date
id: args
run: echo "::set-output name=date::$(date -d yesterday +'%Y-%m-%d')"
- name: Login to docker hub
uses: docker/login-action@v3
with:
@@ -57,8 +53,6 @@ jobs:
- name: Build an image and push
uses: docker/build-push-action@v6
with:
build-args: |
BUILD_DATE=${{ steps.args.outputs.date }}
file: docker/Dockerfile
push: true
tags: ${{ steps.meta.outputs.tags }}
+1 -1
View File
@@ -16,7 +16,7 @@ pacman -S --noconfirm --asdeps base-devel python-build python-flit python-instal
# optional dependencies
if [[ -z $MINIMAL_INSTALL ]]; then
# web server
pacman -S --noconfirm python-aioauth-client python-aiohttp python-aiohttp-apispec-git python-aiohttp-cors python-aiohttp-jinja2 python-aiohttp-security python-aiohttp-session python-aiohttp-sse-git python-cryptography python-jinja
pacman -S --noconfirm python-aioauth-client python-aiohttp python-aiohttp-apispec-git python-aiohttp-cors python-aiohttp-jinja2 python-aiohttp-security python-aiohttp-session python-cryptography python-jinja
# additional features
pacman -S --noconfirm gnupg ipython python-boto3 python-cerberus python-matplotlib rsync
fi
+1 -1
View File
@@ -3,7 +3,7 @@ version: 2
build:
os: ubuntu-lts-latest
tools:
python: "3.13"
python: "3.12"
apt_packages:
- graphviz
+3 -5
View File
@@ -1,15 +1,13 @@
# build image
FROM archlinux:base AS build
ARG BUILD_DATE
# install environment
## create build user
RUN useradd -m -d "/home/build" -s "/usr/bin/nologin" build
## extract container creation date and set mirror for this timestamp, set PKGEXT and refresh database next
RUN echo "Server = https://archive.archlinux.org/repos/${BUILD_DATE//-/\/}/\$repo/os/\$arch" > "/etc/pacman.d/mirrorlist" && \
pacman -Syyuu --noconfirm
RUN echo "Server = https://archive.archlinux.org/repos/$(stat -c "%y" "/var/lib/pacman" | cut -d " " -f 1 | sed "s,-,/,g")/\$repo/os/\$arch" > "/etc/pacman.d/mirrorlist" && \
pacman -Sy
## setup package cache
RUN runuser -u build -- mkdir "/tmp/pkg" && \
echo "PKGDEST=/tmp/pkg" >> "/etc/makepkg.conf" && \
@@ -110,7 +108,7 @@ RUN cp "/etc/pacman.d/mirrorlist" "/etc/pacman.d/mirrorlist.orig" && \
echo "Server = file:///var/cache/pacman/pkg" > "/etc/pacman.d/mirrorlist" && \
cp "/etc/pacman.conf" "/etc/pacman.conf.orig" && \
sed -i "s/SigLevel *=.*/SigLevel = Optional/g" "/etc/pacman.conf" && \
pacman -Syyuu --noconfirm
pacman -Sy
## install package and its optional dependencies
RUN pacman -S --noconfirm ahriman
RUN pacman -S --noconfirm --asdeps \
+1 -3
View File
@@ -7,10 +7,8 @@ for PACKAGE in "$@"; do
# clone the remote source
git clone https://aur.archlinux.org/"$PACKAGE".git "$BUILD_DIR"
cd "$BUILD_DIR"
# FIXME monkey patch PKGBUILD for python
sed -i 's/python -m build/python -m build --skip-dependency-check/g' "PKGBUILD"
# checkout to the image date
git checkout "$(git rev-list -1 --before="$BUILD_DATE" master)"
git checkout "$(git rev-list -1 --before="$(stat -c "%y" "/var/lib/pacman" | cut -d " " -f 1)" master)"
# build and install the package
makepkg --nocheck --noconfirm --install --rmdeps --syncdeps
cd /
+20 -37
View File
@@ -19,8 +19,7 @@
#
import uuid
from asyncio import Lock, Queue, QueueFull, QueueShutDown
from dataclasses import dataclass
from asyncio import Lock, Queue, QueueFull
from typing import Any
from ahriman.core.log import LazyLogging
@@ -30,22 +29,6 @@ from ahriman.models.event import EventType
SSEvent = tuple[str, dict[str, Any]]
@dataclass(frozen=True)
class _Subscription:
"""
internal event bus subscription record
Attributes:
topics(list[EventType] | None): event type filter, ``None`` means all
object_id(str | None): object identifier filter, ``None`` means all
queue(Queue[SSEvent]): per-subscriber event queue
"""
topics: list[EventType] | None
object_id: str | None
queue: Queue[SSEvent]
class EventBus(LazyLogging):
"""
event bus implementation
@@ -62,7 +45,7 @@ class EventBus(LazyLogging):
self.max_size = max_size
self._lock = Lock()
self._subscribers: dict[str, _Subscription] = {}
self._subscribers: dict[str, tuple[list[EventType] | None, str | None, Queue[SSEvent | None]]] = {}
async def broadcast(self, event_type: EventType, object_id: str | None, **kwargs: Any) -> None:
"""
@@ -77,31 +60,30 @@ class EventBus(LazyLogging):
event.update(kwargs)
async with self._lock:
snapshot = list(self._subscribers.items())
for subscriber_id, subscription in snapshot:
if subscription.topics is not None and event_type not in subscription.topics:
for subscriber_id, (topics, filter_object_id, queue) in self._subscribers.items():
if topics is not None and event_type not in topics:
continue
if subscription.object_id is not None and object_id != subscription.object_id:
if filter_object_id is not None and object_id != filter_object_id:
continue
try:
subscription.queue.put_nowait((event_type, event))
queue.put_nowait((event_type, event))
except QueueFull:
self.logger.warning("discard message to slow subscriber %s", subscriber_id)
except QueueShutDown:
pass
async def shutdown(self) -> None:
"""
gracefully shutdown all subscribers
"""
async with self._lock:
for subscription in self._subscribers.values():
subscription.queue.shutdown()
for _, _, queue in self._subscribers.values():
try:
queue.put_nowait(None)
except QueueFull:
pass
queue.shutdown()
async def subscribe(self, topics: list[EventType] | None = None,
object_id: str | None = None) -> tuple[str, Queue[SSEvent]]:
object_id: str | None = None) -> tuple[str, Queue[SSEvent | None]]:
"""
register new subscriber
@@ -112,13 +94,13 @@ class EventBus(LazyLogging):
events for all objects will be delivered (Default value = None)
Returns:
tuple[str, Queue[SSEvent]]: subscriber identifier and associated queue
tuple[str, Queue[SSEvent | None]]: subscriber identifier and associated queue
"""
subscriber_id = str(uuid.uuid4())
queue: Queue[SSEvent] = Queue(self.max_size)
queue: Queue[SSEvent | None] = Queue(self.max_size)
async with self._lock:
self._subscribers[subscriber_id] = _Subscription(topics=topics, object_id=object_id, queue=queue)
self._subscribers[subscriber_id] = (topics, object_id, queue)
return subscriber_id, queue
@@ -130,6 +112,7 @@ class EventBus(LazyLogging):
subscriber_id(str): subscriber unique identifier
"""
async with self._lock:
subscription = self._subscribers.pop(subscriber_id, None)
if subscription is not None:
subscription.queue.shutdown()
result = self._subscribers.pop(subscriber_id, None)
if result is not None:
_, _, queue = result
queue.shutdown()
+11 -45
View File
@@ -19,7 +19,7 @@
#
import json
from aiohttp.web import HTTPBadRequest, Request, StreamResponse
from aiohttp.web import HTTPBadRequest, StreamResponse
from aiohttp_sse import EventSourceResponse, sse_response
from asyncio import Queue, QueueShutDown, wait_for
from typing import ClassVar
@@ -37,64 +37,30 @@ class EventBusView(BaseView):
event bus SSE view
Attributes:
READ_EVENTS(set[EventType]): (class attribute) events which are allowed for read-only users
GET_PERMISSION(UserAccess): (class attribute) get permissions of self
"""
READ_EVENTS: ClassVar[set[EventType]] = {
EventType.PackageHeld,
EventType.PackageOutdated,
EventType.PackageRemoved,
EventType.PackageStatusChanged,
EventType.PackageUpdateFailed,
EventType.PackageUpdated,
EventType.ServiceStatusChanged,
}
GET_PERMISSION: ClassVar[UserAccess] = UserAccess.Full
ROUTES = ["/api/v1/events/stream"]
@classmethod
async def get_permission(cls, request: Request) -> UserAccess:
"""
retrieve user permission from the request
Args:
request(Request): request object
Returns:
UserAccess: extracted permission
"""
if request.method.upper() not in ("GET", "HEAD"):
return await BaseView.get_permission(request)
permission = UserAccess.Full
event_filter = request.query.getall("event", []) if request.query is not None else []
if event_filter:
try:
topics = {EventType(event) for event in event_filter}
except ValueError:
pass
else:
if topics.issubset(cls.READ_EVENTS):
permission = UserAccess.Read
return permission
@staticmethod
async def _run(response: EventSourceResponse, queue: Queue[SSEvent]) -> None:
async def _run(response: EventSourceResponse, queue: Queue[SSEvent | None]) -> None:
"""
read events from queue and send them to the client
Args:
response(EventSourceResponse): SSE response instance
queue(Queue[SSEvent]): subscriber queue
queue(Queue[SSEvent | None]): subscriber queue
"""
while response.is_connected():
try:
event_type, data = await wait_for(queue.get(), timeout=response.ping_interval)
message = await wait_for(queue.get(), timeout=response.ping_interval)
except TimeoutError:
continue
except QueueShutDown:
break
if message is None:
break # terminate queue on sentinel event
event_type, data = message
await response.send(json.dumps(data), event=event_type)
@@ -102,7 +68,7 @@ class EventBusView(BaseView):
tags=["Audit log"],
summary="Live updates",
description="Stream live updates via SSE",
permission=UserAccess.Full,
permission=GET_PERMISSION,
error_400_enabled=True,
error_404_description="Repository is unknown",
schema=SSESchema(many=True),
+7 -17
View File
@@ -1,7 +1,5 @@
import pytest
from asyncio import QueueShutDown
from ahriman.core.status.event_bus import EventBus
from ahriman.models.event import EventType
from ahriman.models.package import Package
@@ -51,25 +49,15 @@ async def test_broadcast_queue_full(event_bus: EventBus, package_ahriman: Packag
assert queue.qsize() == 1
async def test_broadcast_queue_shutdown(event_bus: EventBus, package_ahriman: Package) -> None:
"""
must skip subscriber whose queue was shutdown concurrently
"""
_, queue = await event_bus.subscribe()
queue.shutdown()
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
async def test_shutdown(event_bus: EventBus) -> None:
"""
must shutdown all subscriber queues on shutdown
must send sentinel to all subscribers on shutdown
"""
_, queue = await event_bus.subscribe()
await event_bus.shutdown()
with pytest.raises(QueueShutDown):
queue.get_nowait()
message = queue.get_nowait()
assert message is None
async def test_shutdown_queue_full(event_bus: EventBus, package_ahriman: Package) -> None:
@@ -117,7 +105,8 @@ async def test_subscribe_with_topics(event_bus: EventBus) -> None:
must register subscriber with topic filter
"""
subscriber_id, _ = await event_bus.subscribe([EventType.BuildLog])
assert event_bus._subscribers[subscriber_id].topics == [EventType.BuildLog]
topics, _, _ = event_bus._subscribers[subscriber_id]
assert topics == [EventType.BuildLog]
async def test_subscribe_with_object_id(event_bus: EventBus, package_ahriman: Package) -> None:
@@ -125,7 +114,8 @@ async def test_subscribe_with_object_id(event_bus: EventBus, package_ahriman: Pa
must register subscriber with object_id filter
"""
subscriber_id, _ = await event_bus.subscribe(object_id=package_ahriman.base)
assert event_bus._subscribers[subscriber_id].object_id == package_ahriman.base
_, object_id, _ = event_bus._subscribers[subscriber_id]
assert object_id == package_ahriman.base
async def test_unsubscribe(event_bus: EventBus) -> None:
@@ -3,7 +3,6 @@ import pytest
from aiohttp.test_utils import TestClient
from asyncio import Queue
from multidict import MultiDict
from pytest_mock import MockerFixture
from unittest.mock import AsyncMock
@@ -12,7 +11,6 @@ from ahriman.models.event import EventType
from ahriman.models.package import Package
from ahriman.models.user_access import UserAccess
from ahriman.web.keys import WatcherKey
from ahriman.web.views.base import BaseView
from ahriman.web.views.v1.auditlog.event_bus import EventBusView
@@ -40,51 +38,6 @@ async def test_get_permission() -> None:
assert await EventBusView.get_permission(request) == UserAccess.Full
async def test_get_permission_build_log() -> None:
"""
must return full permission for build log stream
"""
request = pytest.helpers.request("", "", "GET", params=MultiDict(event=EventType.BuildLog))
assert await EventBusView.get_permission(request) == UserAccess.Full
async def test_get_permission_build_log_with_read_events() -> None:
"""
must return full permission for mixed build log and read event stream
"""
request = pytest.helpers.request("", "", "GET", params=MultiDict([
("event", EventType.BuildLog),
("event", EventType.PackageUpdated),
]))
assert await EventBusView.get_permission(request) == UserAccess.Full
async def test_get_permission_invalid_event() -> None:
"""
must return full permission for invalid event type
"""
request = pytest.helpers.request("", "", "GET", params=MultiDict(event="invalid"))
assert await EventBusView.get_permission(request) == UserAccess.Full
async def test_get_permission_post() -> None:
"""
must use default permission for non-get requests
"""
request = pytest.helpers.request("", "", "POST", params=MultiDict(event=EventType.PackageUpdated))
assert await EventBusView.get_permission(request) == await BaseView.get_permission(request)
async def test_get_permission_read_events() -> None:
"""
must return read permission for package and status streams
"""
request = pytest.helpers.request("", "", "GET", params=MultiDict(
("event", event_type) for event_type in EventBusView.READ_EVENTS
))
assert await EventBusView.get_permission(request) == UserAccess.Read
def test_routes() -> None:
"""
must return correct routes
@@ -100,7 +53,7 @@ async def test_run_timeout() -> None:
async def _shutdown() -> None:
await asyncio.sleep(0.05)
queue.shutdown()
await queue.put(None)
response = AsyncMock()
response.is_connected = lambda: True