feat: SSE support (#162)

* event bus implementation

* update tests

* docs update

* review fixes

* update configs

* fix typo

* frontend changes

* install missing pacakge

* queue processing simplification
This commit is contained in:
2026-05-08 10:20:03 +03:00
committed by GitHub
parent 18fe38c30b
commit 3e1e24cb50
54 changed files with 1311 additions and 383 deletions
+3 -1
View File
@@ -19,6 +19,7 @@ from ahriman.core.repository import Repository
from ahriman.core.repository.package_info import PackageInfo
from ahriman.core.spawn import Spawn
from ahriman.core.status import Client
from ahriman.core.status.event_bus import EventBus
from ahriman.core.status.watcher import Watcher
from ahriman.models.aur_package import AURPackage
from ahriman.models.build_status import BuildStatus, BuildStatusEnum
@@ -690,4 +691,5 @@ def watcher(local_client: Client) -> Watcher:
Watcher: package status watcher test instance
"""
package_info = PackageInfo()
return Watcher(local_client, package_info)
event_bus = EventBus(0)
return Watcher(local_client, package_info, event_bus)
+12
View File
@@ -2,6 +2,7 @@ import pytest
from ahriman.core.configuration import Configuration
from ahriman.core.status import Client
from ahriman.core.status.event_bus import EventBus
from ahriman.core.status.web_client import WebClient
@@ -16,6 +17,17 @@ def client() -> Client:
return Client()
@pytest.fixture
def event_bus() -> EventBus:
"""
fixture for event bus
Returns:
EventBus: event bus test instance
"""
return EventBus(0)
@pytest.fixture
def web_client(configuration: Configuration) -> WebClient:
"""
+144
View File
@@ -0,0 +1,144 @@
import pytest
from asyncio import QueueShutDown
from ahriman.core.status.event_bus import EventBus
from ahriman.models.event import EventType
from ahriman.models.package import Package
async def test_broadcast(event_bus: EventBus, package_ahriman: Package) -> None:
"""
must broadcast event to all subscribers
"""
_, queue = await event_bus.subscribe()
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base, version=package_ahriman.version)
message = queue.get_nowait()
assert message == (
EventType.PackageUpdated,
{"object_id": package_ahriman.base, "version": package_ahriman.version},
)
async def test_broadcast_with_topics(event_bus: EventBus, package_ahriman: Package) -> None:
"""
must broadcast event to subscribers with matching topics
"""
_, queue = await event_bus.subscribe([EventType.PackageUpdated])
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
assert not queue.empty()
async def test_broadcast_topic_isolation(event_bus: EventBus, package_ahriman: Package) -> None:
"""
must not broadcast event to subscribers with non-matching topics
"""
_, queue = await event_bus.subscribe([EventType.BuildLog])
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
assert queue.empty()
async def test_broadcast_queue_full(event_bus: EventBus, package_ahriman: Package) -> None:
"""
must discard message to slow subscriber
"""
event_bus.max_size = 1
_, queue = await event_bus.subscribe()
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
await event_bus.broadcast(EventType.PackageRemoved, package_ahriman.base)
assert queue.qsize() == 1
async def test_broadcast_queue_shutdown(event_bus: EventBus, package_ahriman: Package) -> None:
"""
must skip subscriber whose queue was shutdown concurrently
"""
_, queue = await event_bus.subscribe()
queue.shutdown()
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
async def test_shutdown(event_bus: EventBus) -> None:
"""
must shutdown all subscriber queues on shutdown
"""
_, queue = await event_bus.subscribe()
await event_bus.shutdown()
with pytest.raises(QueueShutDown):
queue.get_nowait()
async def test_shutdown_queue_full(event_bus: EventBus, package_ahriman: Package) -> None:
"""
must handle shutdown when queue is full
"""
event_bus.max_size = 1
_, queue = await event_bus.subscribe()
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
await event_bus.shutdown()
async def test_subscribe(event_bus: EventBus) -> None:
"""
must register new subscriber
"""
subscriber_id, queue = await event_bus.subscribe()
assert subscriber_id
assert queue.empty()
assert subscriber_id in event_bus._subscribers
async def test_broadcast_with_object_id(event_bus: EventBus, package_ahriman: Package) -> None:
"""
must broadcast event to subscribers with matching object_id
"""
_, queue = await event_bus.subscribe(object_id=package_ahriman.base)
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
assert not queue.empty()
async def test_broadcast_object_id_isolation(event_bus: EventBus, package_ahriman: Package) -> None:
"""
must not broadcast event to subscribers with non-matching object_id
"""
_, queue = await event_bus.subscribe(object_id="other-package")
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
assert queue.empty()
async def test_subscribe_with_topics(event_bus: EventBus) -> None:
"""
must register subscriber with topic filter
"""
subscriber_id, _ = await event_bus.subscribe([EventType.BuildLog])
assert event_bus._subscribers[subscriber_id].topics == [EventType.BuildLog]
async def test_subscribe_with_object_id(event_bus: EventBus, package_ahriman: Package) -> None:
"""
must register subscriber with object_id filter
"""
subscriber_id, _ = await event_bus.subscribe(object_id=package_ahriman.base)
assert event_bus._subscribers[subscriber_id].object_id == package_ahriman.base
async def test_unsubscribe(event_bus: EventBus) -> None:
"""
must remove subscriber
"""
subscriber_id, _ = await event_bus.subscribe()
await event_bus.unsubscribe(subscriber_id)
assert subscriber_id not in event_bus._subscribers
async def test_unsubscribe_unknown(event_bus: EventBus) -> None:
"""
must not fail on unknown subscriber removal
"""
await event_bus.unsubscribe("unknown")
+216 -52
View File
@@ -5,34 +5,53 @@ from pytest_mock import MockerFixture
from ahriman.core.exceptions import UnknownPackageError
from ahriman.core.status.watcher import Watcher
from ahriman.models.build_status import BuildStatus, BuildStatusEnum
from ahriman.models.changes import Changes
from ahriman.models.dependencies import Dependencies
from ahriman.models.event import Event, EventType
from ahriman.models.log_record import LogRecord
from ahriman.models.log_record_id import LogRecordId
from ahriman.models.package import Package
from ahriman.models.pkgbuild_patch import PkgbuildPatch
def test_packages(watcher: Watcher, package_ahriman: Package) -> None:
async def test_event_add(watcher: Watcher, mocker: MockerFixture) -> None:
"""
must return list of available packages
must create new event
"""
assert not watcher.packages
event = Event("event", "object")
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.event_add")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
assert watcher.packages
await watcher.event_add(event)
cache_mock.assert_called_once_with(event)
def test_load(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
async def test_event_get(watcher: Watcher, mocker: MockerFixture) -> None:
"""
must retrieve events
"""
event = Event("event", "object")
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.event_get", return_value=[event])
result = await watcher.event_get(None, None)
assert result == [event]
cache_mock.assert_called_once_with(None, None, None, None, -1, 0)
async def test_load(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must correctly load packages
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_get",
return_value=[(package_ahriman, BuildStatus())])
watcher.load()
await watcher.load()
cache_mock.assert_called_once_with(None)
package, status = watcher._known[package_ahriman.base]
assert package == package_ahriman
assert status.status == BuildStatusEnum.Unknown
def test_load_known(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
async def test_load_known(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must correctly load packages with known statuses
"""
@@ -40,147 +59,307 @@ def test_load_known(watcher: Watcher, package_ahriman: Package, mocker: MockerFi
mocker.patch("ahriman.core.status.local_client.LocalClient.package_get", return_value=[(package_ahriman, status)])
watcher._known = {package_ahriman.base: (package_ahriman, status)}
watcher.load()
await watcher.load()
_, status = watcher._known[package_ahriman.base]
assert status.status == BuildStatusEnum.Success
def test_package_archives(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
async def test_logs_rotate(watcher: Watcher, mocker: MockerFixture) -> None:
"""
must rotate logs
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.logs_rotate")
await watcher.logs_rotate(42)
cache_mock.assert_called_once_with(42)
async def test_package_archives(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must return package archives from package info
"""
archives_mock = mocker.patch("ahriman.core.repository.package_info.PackageInfo.package_archives",
return_value=[package_ahriman])
result = watcher.package_archives(package_ahriman.base)
result = await watcher.package_archives(package_ahriman.base)
assert result == [package_ahriman]
archives_mock.assert_called_once_with(package_ahriman.base)
def test_package_get(watcher: Watcher, package_ahriman: Package) -> None:
async def test_package_get(watcher: Watcher, package_ahriman: Package) -> None:
"""
must return package status
"""
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
package, status = watcher.package_get(package_ahriman.base)
package, status = await watcher.package_get(package_ahriman.base)
assert package == package_ahriman
assert status.status == BuildStatusEnum.Unknown
def test_package_get_failed(watcher: Watcher, package_ahriman: Package) -> None:
async def test_package_get_failed(watcher: Watcher, package_ahriman: Package) -> None:
"""
must fail on unknown package
"""
with pytest.raises(UnknownPackageError):
watcher.package_get(package_ahriman.base)
await watcher.package_get(package_ahriman.base)
def test_package_hold_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
async def test_package_changes_get(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must return package changes
"""
changes = Changes("sha")
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_changes_get",
return_value=changes)
assert await watcher.package_changes_get(package_ahriman.base) == changes
cache_mock.assert_called_once_with(package_ahriman.base)
async def test_package_changes_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must update package changes
"""
changes = Changes("sha")
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_changes_update")
await watcher.package_changes_update(package_ahriman.base, changes)
cache_mock.assert_called_once_with(package_ahriman.base, changes)
async def test_package_dependencies_get(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must return package dependencies
"""
dependencies = Dependencies({"path": [package_ahriman.base]})
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_dependencies_get",
return_value=dependencies)
assert await watcher.package_dependencies_get(package_ahriman.base) == dependencies
cache_mock.assert_called_once_with(package_ahriman.base)
async def test_package_dependencies_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must update package dependencies
"""
dependencies = Dependencies({"path": [package_ahriman.base]})
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_dependencies_update")
await watcher.package_dependencies_update(package_ahriman.base, dependencies)
cache_mock.assert_called_once_with(package_ahriman.base, dependencies)
async def test_package_hold_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must update package hold status
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_hold_update")
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
watcher.package_hold_update(package_ahriman.base, enabled=True)
await watcher.package_hold_update(package_ahriman.base, enabled=True)
cache_mock.assert_called_once_with(package_ahriman.base, enabled=True)
_, status = watcher._known[package_ahriman.base]
assert status.is_held is True
broadcast_mock.assert_called_once_with(EventType.PackageHeld, package_ahriman.base, is_held=True)
def test_package_hold_update_unknown(watcher: Watcher, package_ahriman: Package) -> None:
async def test_package_hold_update_unknown(watcher: Watcher, package_ahriman: Package) -> None:
"""
must fail on unknown package hold update
"""
with pytest.raises(UnknownPackageError):
watcher.package_hold_update(package_ahriman.base, enabled=True)
await watcher.package_hold_update(package_ahriman.base, enabled=True)
def test_package_remove(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
async def test_package_logs_add(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must post log record
"""
log_record = LogRecord(LogRecordId(package_ahriman.base, "1.0.0"), 42.0, "message")
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_logs_add")
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
await watcher.package_logs_add(log_record)
cache_mock.assert_called_once_with(log_record)
broadcast_mock.assert_called_once_with(EventType.BuildLog, package_ahriman.base, **log_record.view())
async def test_package_logs_get(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must return package logs
"""
log_record = LogRecord(LogRecordId(package_ahriman.base, "1.0.0"), 42.0, "message")
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_logs_get",
return_value=[log_record])
assert await watcher.package_logs_get(package_ahriman.base) == [log_record]
cache_mock.assert_called_once_with(package_ahriman.base, None, None, -1, 0)
async def test_package_logs_remove(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must remove package logs
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_logs_remove")
await watcher.package_logs_remove(package_ahriman.base, None)
cache_mock.assert_called_once_with(package_ahriman.base, None)
async def test_package_patches_get(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must return package patches
"""
patch = PkgbuildPatch("key", "value")
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_patches_get", return_value=[patch])
assert await watcher.package_patches_get(package_ahriman.base, None) == [patch]
cache_mock.assert_called_once_with(package_ahriman.base, None)
async def test_package_patches_remove(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must remove package patches
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_patches_remove")
await watcher.package_patches_remove(package_ahriman.base, None)
cache_mock.assert_called_once_with(package_ahriman.base, None)
async def test_package_patches_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must update package patches
"""
patch = PkgbuildPatch("key", "value")
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_patches_update")
await watcher.package_patches_update(package_ahriman.base, patch)
cache_mock.assert_called_once_with(package_ahriman.base, patch)
async def test_package_remove(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must remove package base
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_remove")
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
watcher.package_remove(package_ahriman.base)
await watcher.package_remove(package_ahriman.base)
assert not watcher._known
cache_mock.assert_called_once_with(package_ahriman.base)
broadcast_mock.assert_called_once_with(EventType.PackageRemoved, package_ahriman.base)
def test_package_remove_unknown(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
async def test_package_remove_unknown(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must not fail on unknown base removal
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_remove")
watcher.package_remove(package_ahriman.base)
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
await watcher.package_remove(package_ahriman.base)
cache_mock.assert_called_once_with(package_ahriman.base)
broadcast_mock.assert_called_once_with(EventType.PackageRemoved, package_ahriman.base)
def test_package_status_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
async def test_package_status_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must update package status only for known package
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_status_update")
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success)
await watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success)
cache_mock.assert_called_once_with(package_ahriman.base, pytest.helpers.anyvar(int))
package, status = watcher._known[package_ahriman.base]
assert package == package_ahriman
assert status.status == BuildStatusEnum.Success
broadcast_mock.assert_called_once_with(
EventType.PackageStatusChanged, package_ahriman.base, status=BuildStatusEnum.Success.value,
)
def test_package_status_update_preserves_hold(watcher: Watcher, package_ahriman: Package,
mocker: MockerFixture) -> None:
async def test_package_status_update_preserves_hold(watcher: Watcher, package_ahriman: Package,
mocker: MockerFixture) -> None:
"""
must preserve hold status on package status update
"""
mocker.patch("ahriman.core.status.local_client.LocalClient.package_status_update")
mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus(is_held=True))}
watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success)
await watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success)
_, status = watcher._known[package_ahriman.base]
assert status.is_held is True
def test_package_status_update_unknown(watcher: Watcher, package_ahriman: Package) -> None:
async def test_package_status_update_unknown(watcher: Watcher, package_ahriman: Package) -> None:
"""
must fail on unknown package status update only
"""
with pytest.raises(UnknownPackageError):
watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Unknown)
await watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Unknown)
def test_package_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
async def test_package_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must add package to cache
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_update")
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
watcher.package_update(package_ahriman, BuildStatusEnum.Unknown)
assert watcher.packages
await watcher.package_update(package_ahriman, BuildStatusEnum.Unknown)
assert await watcher.packages()
cache_mock.assert_called_once_with(package_ahriman, pytest.helpers.anyvar(int))
broadcast_mock.assert_called_once_with(
EventType.PackageUpdated, package_ahriman.base,
status=BuildStatusEnum.Unknown.value, version=package_ahriman.version,
)
def test_package_update_preserves_hold(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
async def test_package_update_preserves_hold(watcher: Watcher, package_ahriman: Package,
mocker: MockerFixture) -> None:
"""
must preserve hold status on package update
"""
mocker.patch("ahriman.core.status.local_client.LocalClient.package_update")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus(is_held=True))}
watcher.package_update(package_ahriman, BuildStatusEnum.Success)
await watcher.package_update(package_ahriman, BuildStatusEnum.Success)
_, status = watcher._known[package_ahriman.base]
assert status.is_held is True
def test_status_update(watcher: Watcher) -> None:
async def test_packages(watcher: Watcher, package_ahriman: Package) -> None:
"""
must return list of available packages
"""
assert not await watcher.packages()
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
assert await watcher.packages()
async def test_shutdown(watcher: Watcher, mocker: MockerFixture) -> None:
"""
must gracefully shutdown watcher
"""
shutdown_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.shutdown")
await watcher.shutdown()
shutdown_mock.assert_called_once_with()
async def test_status_update(watcher: Watcher, mocker: MockerFixture) -> None:
"""
must update service status
"""
watcher.status_update(BuildStatusEnum.Success)
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
await watcher.status_update(BuildStatusEnum.Success)
assert watcher.status.status == BuildStatusEnum.Success
broadcast_mock.assert_called_once_with(EventType.ServiceStatusChanged, None, status=BuildStatusEnum.Success.value)
def test_call(watcher: Watcher, package_ahriman: Package) -> None:
@@ -204,18 +383,3 @@ def test_call_failed(watcher: Watcher, package_ahriman: Package) -> None:
"""
with pytest.raises(UnknownPackageError):
assert watcher(package_ahriman.base)
def test_getattr(watcher: Watcher) -> None:
"""
must return client method call
"""
assert watcher.package_logs_remove
def test_getattr_unknown_method(watcher: Watcher) -> None:
"""
must raise AttributeError in case if no reporter attribute found
"""
with pytest.raises(AttributeError):
assert watcher.random_method
@@ -0,0 +1 @@
# schema testing goes in view class tests
@@ -0,0 +1 @@
# schema testing goes in view class tests
@@ -0,0 +1,147 @@
import asyncio
import pytest
from aiohttp.test_utils import TestClient
from asyncio import Queue
from pytest_mock import MockerFixture
from unittest.mock import AsyncMock
from ahriman.core.status.watcher import Watcher
from ahriman.models.event import EventType
from ahriman.models.package import Package
from ahriman.models.user_access import UserAccess
from ahriman.web.keys import WatcherKey
from ahriman.web.views.v1.auditlog.event_bus import EventBusView
async def _producer(watcher: Watcher, package_ahriman: Package) -> None:
"""
create producer
Args:
watcher(Watcher): watcher test instance
package_ahriman(Package): package test instance
"""
await asyncio.sleep(0.1)
await watcher.event_bus.broadcast(EventType.PackageRemoved, package_ahriman.base)
await watcher.event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base, status="success")
await asyncio.sleep(0.1)
await watcher.event_bus.shutdown()
async def test_get_permission() -> None:
"""
must return correct permission for the request
"""
for method in ("GET",):
request = pytest.helpers.request("", "", method)
assert await EventBusView.get_permission(request) == UserAccess.Full
def test_routes() -> None:
"""
must return correct routes
"""
assert EventBusView.ROUTES == ["/api/v1/events/stream"]
async def test_run_timeout() -> None:
"""
must handle timeout and continue loop
"""
queue = Queue()
async def _shutdown() -> None:
await asyncio.sleep(0.05)
queue.shutdown()
response = AsyncMock()
response.is_connected = lambda: True
response.ping_interval = 0.01
asyncio.create_task(_shutdown())
await EventBusView._run(response, queue)
async def test_get(client: TestClient, package_ahriman: Package) -> None:
"""
must stream events via SSE
"""
watcher = next(iter(client.app[WatcherKey].values()))
asyncio.create_task(_producer(watcher, package_ahriman))
request_schema = pytest.helpers.schema_request(EventBusView.get, location="querystring")
# no content validation here because it is a streaming response
assert not request_schema.validate({})
response = await client.get("/api/v1/events/stream")
assert response.status == 200
body = await response.text()
assert EventType.PackageUpdated in body
assert "ahriman" in body
async def test_get_with_topic_filter(client: TestClient, package_ahriman: Package) -> None:
"""
must filter events by topic
"""
watcher = next(iter(client.app[WatcherKey].values()))
asyncio.create_task(_producer(watcher, package_ahriman))
request_schema = pytest.helpers.schema_request(EventBusView.get, location="querystring")
payload = {"event": [EventType.PackageUpdated]}
assert not request_schema.validate(payload)
response = await client.get("/api/v1/events/stream", params=payload)
assert response.status == 200
body = await response.text()
assert EventType.PackageUpdated in body
assert EventType.PackageRemoved not in body
async def test_get_with_object_id_filter(client: TestClient, package_ahriman: Package) -> None:
"""
must filter events by object_id
"""
watcher = next(iter(client.app[WatcherKey].values()))
asyncio.create_task(_producer(watcher, package_ahriman))
request_schema = pytest.helpers.schema_request(EventBusView.get, location="querystring")
payload = {"object_id": "non-existent-package"}
assert not request_schema.validate(payload)
response = await client.get("/api/v1/events/stream", params=payload)
assert response.status == 200
body = await response.text()
assert "ahriman" not in body
async def test_get_bad_request(client: TestClient) -> None:
"""
must return bad request for invalid event type
"""
response_schema = pytest.helpers.schema_response(EventBusView.get, code=400)
response = await client.get("/api/v1/events/stream", params={"event": "invalid"})
assert response.status == 400
assert not response_schema.validate(await response.json())
async def test_get_not_found(client: TestClient) -> None:
"""
must return not found for unknown repository
"""
response_schema = pytest.helpers.schema_response(EventBusView.get, code=404)
response = await client.get("/api/v1/events/stream", params={"architecture": "unknown", "repository": "unknown"})
assert response.status == 404
assert not response_schema.validate(await response.json())
async def test_get_connection_reset(client: TestClient, mocker: MockerFixture) -> None:
"""
must handle connection reset
"""
mocker.patch.object(EventBusView, "_run", side_effect=ConnectionResetError)
response = await client.get("/api/v1/events/stream")
assert response.status == 200