diff --git a/tests/ahriman/core/status/test_event_bus.py b/tests/ahriman/core/status/test_event_bus.py new file mode 100644 index 00000000..a603c60b --- /dev/null +++ b/tests/ahriman/core/status/test_event_bus.py @@ -0,0 +1,120 @@ +import pytest + +from asyncio import QueueFull + +from ahriman.core.status.event_bus import EventBus +from ahriman.models.event import EventType + + +async def test_broadcast() -> None: + """ + must broadcast event to all general subscribers + """ + event_bus = EventBus(0) + _, queue = await event_bus.subscribe() + + await event_bus.broadcast(EventType.PackageUpdated, "ahriman", status="success") + message = queue.get_nowait() + assert message == (EventType.PackageUpdated, {"object_id": "ahriman", "status": "success"}) + + +async def test_broadcast_with_topics() -> None: + """ + must deliver event only to subscribers with matching topics + """ + event_bus = EventBus(0) + _, filtered_queue = await event_bus.subscribe([EventType.PackageUpdated]) + _, wildcard_queue = await event_bus.subscribe() + + await event_bus.broadcast(EventType.PackageUpdated, "ahriman") + assert not filtered_queue.empty() + assert not wildcard_queue.empty() + + +async def test_broadcast_topic_isolation() -> None: + """ + must not deliver event to subscribers with non-matching topics + """ + event_bus = EventBus(0) + _, queue = await event_bus.subscribe([EventType.BuildLog]) + + await event_bus.broadcast(EventType.PackageUpdated, "ahriman") + assert queue.empty() + + +async def test_broadcast_queue_full() -> None: + """ + must discard message to slow subscriber + """ + event_bus = EventBus(1) + _, queue = await event_bus.subscribe() + + await event_bus.broadcast(EventType.PackageUpdated, "ahriman") + await event_bus.broadcast(EventType.PackageRemoved, "ahriman") + assert queue.qsize() == 1 + + +async def test_shutdown() -> None: + """ + must send sentinel to all subscribers on shutdown + """ + event_bus = EventBus(0) + _, queue = await event_bus.subscribe() + + await event_bus.shutdown() + message = queue.get_nowait() + assert message is None + + +async def test_shutdown_queue_full() -> None: + """ + must handle shutdown when queue is full + """ + event_bus = EventBus(1) + _, queue = await event_bus.subscribe() + + await event_bus.broadcast(EventType.PackageUpdated, "ahriman") + await event_bus.shutdown() + # sentinel was not delivered but shutdown still completed + message = queue.get_nowait() + assert message is not None + + +async def test_subscribe() -> None: + """ + must register new subscriber + """ + event_bus = EventBus(0) + subscriber_id, queue = await event_bus.subscribe() + assert subscriber_id + assert queue.empty() + assert subscriber_id in event_bus._subscribers + + +async def test_subscribe_with_topics() -> None: + """ + must register subscriber with topic filter + """ + event_bus = EventBus(0) + subscriber_id, _ = await event_bus.subscribe([EventType.BuildLog, EventType.PackageUpdated]) + topics, _ = event_bus._subscribers[subscriber_id] + assert topics == [EventType.BuildLog, EventType.PackageUpdated] + + +async def test_unsubscribe() -> None: + """ + must remove subscriber + """ + event_bus = EventBus(0) + subscriber_id, _ = await event_bus.subscribe() + + await event_bus.unsubscribe(subscriber_id) + assert subscriber_id not in event_bus._subscribers + + +async def test_unsubscribe_unknown() -> None: + """ + must not fail on unknown subscriber removal + """ + event_bus = EventBus(0) + await event_bus.unsubscribe("unknown") diff --git a/tests/ahriman/core/status/test_watcher.py b/tests/ahriman/core/status/test_watcher.py index f353a722..519129bf 100644 --- a/tests/ahriman/core/status/test_watcher.py +++ b/tests/ahriman/core/status/test_watcher.py @@ -1,21 +1,28 @@ import pytest from pytest_mock import MockerFixture +from unittest.mock import MagicMock from ahriman.core.exceptions import UnknownPackageError from ahriman.core.status.watcher import Watcher from ahriman.models.build_status import BuildStatus, BuildStatusEnum +from ahriman.models.changes import Changes +from ahriman.models.dependencies import Dependencies +from ahriman.models.event import Event +from ahriman.models.log_record import LogRecord +from ahriman.models.log_record_id import LogRecordId from ahriman.models.package import Package +from ahriman.models.pkgbuild_patch import PkgbuildPatch -def test_packages(watcher: Watcher, package_ahriman: Package) -> None: +async def test_packages(watcher: Watcher, package_ahriman: Package) -> None: """ must return list of available packages """ - assert not watcher.packages + assert not await watcher.packages() watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())} - assert watcher.packages + assert await watcher.packages() async def test_load(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: @@ -45,6 +52,39 @@ async def test_load_known(watcher: Watcher, package_ahriman: Package, mocker: Mo assert status.status == BuildStatusEnum.Success +async def test_event_add(watcher: Watcher, mocker: MockerFixture) -> None: + """ + must create new event + """ + event = Event("event", "object") + cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.event_add") + + await watcher.event_add(event) + cache_mock.assert_called_once_with(event) + + +async def test_event_get(watcher: Watcher, mocker: MockerFixture) -> None: + """ + must retrieve events + """ + event = Event("event", "object") + cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.event_get", return_value=[event]) + + result = await watcher.event_get(None, None) + assert result == [event] + cache_mock.assert_called_once_with(None, None, None, None, -1, 0) + + +async def test_logs_rotate(watcher: Watcher, mocker: MockerFixture) -> None: + """ + must rotate logs + """ + cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.logs_rotate") + + await watcher.logs_rotate(10) + cache_mock.assert_called_once_with(10) + + async def test_package_archives(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: """ must return package archives from package info @@ -75,6 +115,54 @@ async def test_package_get_failed(watcher: Watcher, package_ahriman: Package) -> await watcher.package_get(package_ahriman.base) +async def test_package_changes_get(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: + """ + must return package changes + """ + changes = Changes() + cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_changes_get", + return_value=changes) + + result = await watcher.package_changes_get(package_ahriman.base) + assert result == changes + cache_mock.assert_called_once_with(package_ahriman.base) + + +async def test_package_changes_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: + """ + must update package changes + """ + changes = Changes() + cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_changes_update") + + await watcher.package_changes_update(package_ahriman.base, changes) + cache_mock.assert_called_once_with(package_ahriman.base, changes) + + +async def test_package_dependencies_get(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: + """ + must return package dependencies + """ + dependencies = Dependencies() + cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_dependencies_get", + return_value=dependencies) + + result = await watcher.package_dependencies_get(package_ahriman.base) + assert result == dependencies + cache_mock.assert_called_once_with(package_ahriman.base) + + +async def test_package_dependencies_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: + """ + must update package dependencies + """ + dependencies = Dependencies() + cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_dependencies_update") + + await watcher.package_dependencies_update(package_ahriman.base, dependencies) + cache_mock.assert_called_once_with(package_ahriman.base, dependencies) + + async def test_package_hold_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: """ must update package hold status @@ -96,6 +184,74 @@ async def test_package_hold_update_unknown(watcher: Watcher, package_ahriman: Pa await watcher.package_hold_update(package_ahriman.base, enabled=True) +async def test_package_logs_add(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: + """ + must post log record + """ + log_record = LogRecord(LogRecordId(package_ahriman.base, "1.0.0"), 42.0, "message") + cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_logs_add") + + await watcher.package_logs_add(log_record) + cache_mock.assert_called_once_with(log_record) + + +async def test_package_logs_get(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: + """ + must return package logs + """ + log_record = LogRecord(LogRecordId(package_ahriman.base, "1.0.0"), 42.0, "message") + cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_logs_get", + return_value=[log_record]) + + result = await watcher.package_logs_get(package_ahriman.base) + assert result == [log_record] + cache_mock.assert_called_once_with(package_ahriman.base, None, None, -1, 0) + + +async def test_package_logs_remove(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: + """ + must remove package logs + """ + cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_logs_remove") + + await watcher.package_logs_remove(package_ahriman.base, None) + cache_mock.assert_called_once_with(package_ahriman.base, None) + + +async def test_package_patches_get(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: + """ + must return package patches + """ + patch = PkgbuildPatch("key", "value") + cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_patches_get", + return_value=[patch]) + + result = await watcher.package_patches_get(package_ahriman.base, None) + assert result == [patch] + cache_mock.assert_called_once_with(package_ahriman.base, None) + + +async def test_package_patches_remove(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: + """ + must remove package patches + """ + cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_patches_remove") + + await watcher.package_patches_remove(package_ahriman.base, None) + cache_mock.assert_called_once_with(package_ahriman.base, None) + + +async def test_package_patches_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: + """ + must update package patches + """ + patch = PkgbuildPatch("key", "value") + cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_patches_update") + + await watcher.package_patches_update(package_ahriman.base, patch) + cache_mock.assert_called_once_with(package_ahriman.base, patch) + + async def test_package_remove(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: """ must remove package base @@ -159,7 +315,7 @@ async def test_package_update(watcher: Watcher, package_ahriman: Package, mocker cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_update") await watcher.package_update(package_ahriman, BuildStatusEnum.Unknown) - assert watcher.packages + assert await watcher.packages() cache_mock.assert_called_once_with(package_ahriman, pytest.helpers.anyvar(int)) @@ -176,6 +332,16 @@ async def test_package_update_preserves_hold(watcher: Watcher, package_ahriman: assert status.is_held is True +async def test_shutdown(watcher: Watcher) -> None: + """ + must gracefully shutdown watcher + """ + _, queue = await watcher.event_bus.subscribe() + await watcher.shutdown() + message = queue.get_nowait() + assert message is None + + async def test_status_update(watcher: Watcher) -> None: """ must update service status diff --git a/tests/ahriman/web/views/v1/auditlog/test_view_v1_auditlog_event_bus.py b/tests/ahriman/web/views/v1/auditlog/test_view_v1_auditlog_event_bus.py new file mode 100644 index 00000000..f44bc060 --- /dev/null +++ b/tests/ahriman/web/views/v1/auditlog/test_view_v1_auditlog_event_bus.py @@ -0,0 +1,102 @@ +import asyncio + +import pytest + +from aiohttp.test_utils import TestClient +from asyncio import Queue +from pytest_mock import MockerFixture +from unittest.mock import AsyncMock + +from ahriman.core.status.event_bus import SSEvent +from ahriman.models.event import EventType +from ahriman.models.user_access import UserAccess +from ahriman.web.keys import WatcherKey +from ahriman.web.views.v1.auditlog.event_bus import EventBusView + + +async def test_get_permission() -> None: + """ + must return correct permission for the request + """ + request = pytest.helpers.request("", "", "GET") + assert await EventBusView.get_permission(request) == UserAccess.Full + + +def test_routes() -> None: + """ + must return correct routes + """ + assert EventBusView.ROUTES == ["/api/v1/events/stream"] + + +async def test_get(client: TestClient) -> None: + """ + must stream events via SSE + """ + watcher = next(iter(client.app[WatcherKey].values())) + + async def _producer() -> None: + await asyncio.sleep(0.1) + await watcher.event_bus.broadcast(EventType.PackageUpdated, "ahriman", status="success") + await asyncio.sleep(0.1) + await watcher.event_bus.shutdown() + + asyncio.create_task(_producer()) + + response = await client.get("/api/v1/events/stream") + assert response.status == 200 + + body = await response.text() + assert "package-updated" in body + assert "ahriman" in body + + +async def test_get_with_topic_filter(client: TestClient) -> None: + """ + must filter events by topic + """ + watcher = next(iter(client.app[WatcherKey].values())) + + async def _producer() -> None: + await asyncio.sleep(0.1) + await watcher.event_bus.broadcast(EventType.PackageRemoved, "filtered") + await watcher.event_bus.broadcast(EventType.PackageUpdated, "ahriman", status="success") + await asyncio.sleep(0.1) + await watcher.event_bus.shutdown() + + asyncio.create_task(_producer()) + + response = await client.get("/api/v1/events/stream", params={"event": "package-updated"}) + assert response.status == 200 + + body = await response.text() + assert "package-updated" in body + assert "filtered" not in body + + +async def test_run_timeout() -> None: + """ + must handle timeout and continue loop + """ + response = AsyncMock() + response.is_connected = lambda: True + response.ping_interval = 0.01 + + queue: Queue[SSEvent | None] = Queue() + + async def _shutdown() -> None: + await asyncio.sleep(0.05) + await queue.put(None) + + asyncio.create_task(_shutdown()) + await EventBusView._run(response, queue) + + +async def test_get_connection_reset(client: TestClient, mocker: MockerFixture) -> None: + """ + must handle connection reset gracefully + """ + mocker.patch.object(EventBusView, "_run", side_effect=ConnectionResetError) + + response = await client.get("/api/v1/events/stream") + assert response.status == 200