Compare commits

..

2 Commits

Author SHA1 Message Date
4165e87ce5 update tests 2026-03-29 22:16:54 +03:00
ced90dce10 event bus implementation 2026-03-29 10:56:07 +03:00
15 changed files with 157 additions and 293 deletions

View File

@@ -1,7 +1,7 @@
version: 2
build:
os: ubuntu-lts-latest
os: ubuntu-20.04
tools:
python: "3.12"
apt_packages:

View File

@@ -12,14 +12,6 @@ ahriman.core.status.client module
:no-undoc-members:
:show-inheritance:
ahriman.core.status.event\_bus module
-------------------------------------
.. automodule:: ahriman.core.status.event_bus
:members:
:no-undoc-members:
:show-inheritance:
ahriman.core.status.local\_client module
----------------------------------------

View File

@@ -92,14 +92,6 @@ ahriman.web.schemas.error\_schema module
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.event\_bus\_filter\_schema module
-----------------------------------------------------
.. automodule:: ahriman.web.schemas.event_bus_filter_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.event\_schema module
----------------------------------------
@@ -364,14 +356,6 @@ ahriman.web.schemas.search\_schema module
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.sse\_schema module
--------------------------------------
.. automodule:: ahriman.web.schemas.sse_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.status\_schema module
-----------------------------------------

View File

@@ -4,14 +4,6 @@ ahriman.web.views.v1.auditlog package
Submodules
----------
ahriman.web.views.v1.auditlog.event\_bus module
-----------------------------------------------
.. automodule:: ahriman.web.views.v1.auditlog.event_bus
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.views.v1.auditlog.events module
-------------------------------------------

View File

@@ -7,13 +7,10 @@ aiohttp==3.11.18
# ahriman (pyproject.toml)
# aiohttp-cors
# aiohttp-jinja2
# aiohttp-sse
aiohttp-cors==0.8.1
# via ahriman (pyproject.toml)
aiohttp-jinja2==1.6
# via ahriman (pyproject.toml)
aiohttp-sse==2.2.0
# via ahriman (pyproject.toml)
aiosignal==1.3.2
# via aiohttp
alabaster==1.0.0

View File

@@ -28,7 +28,6 @@ from ahriman.web.schemas.configuration_schema import ConfigurationSchema
from ahriman.web.schemas.counters_schema import CountersSchema
from ahriman.web.schemas.dependencies_schema import DependenciesSchema
from ahriman.web.schemas.error_schema import ErrorSchema
from ahriman.web.schemas.event_bus_filter_schema import EventBusFilterSchema
from ahriman.web.schemas.event_schema import EventSchema
from ahriman.web.schemas.event_search_schema import EventSearchSchema
from ahriman.web.schemas.file_schema import FileSchema
@@ -62,7 +61,6 @@ from ahriman.web.schemas.repository_id_schema import RepositoryIdSchema
from ahriman.web.schemas.repository_stats_schema import RepositoryStatsSchema
from ahriman.web.schemas.rollback_schema import RollbackSchema
from ahriman.web.schemas.search_schema import SearchSchema
from ahriman.web.schemas.sse_schema import SSESchema
from ahriman.web.schemas.status_schema import StatusSchema
from ahriman.web.schemas.update_flags_schema import UpdateFlagsSchema
from ahriman.web.schemas.worker_schema import WorkerSchema

View File

@@ -1,33 +0,0 @@
#
# Copyright (c) 2021-2026 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from ahriman.models.event import EventType
from ahriman.web.apispec import fields
from ahriman.web.schemas.repository_id_schema import RepositoryIdSchema
class EventBusFilterSchema(RepositoryIdSchema):
"""
request event bus filter schema
"""
event = fields.List(fields.String(), metadata={
"description": "Event type filter",
"example": [EventType.PackageUpdated],
})

View File

@@ -1,35 +0,0 @@
#
# Copyright (c) 2021-2026 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from ahriman.models.event import EventType
from ahriman.web.apispec import Schema, fields
class SSESchema(Schema):
"""
response SSE schema
"""
event = fields.String(required=True, metadata={
"description": "Event type",
"example": EventType.PackageUpdated,
})
data = fields.Dict(keys=fields.String(), values=fields.Raw(), metadata={
"description": "Event data",
})

View File

@@ -28,7 +28,7 @@ from ahriman.core.status.event_bus import SSEvent
from ahriman.models.event import EventType
from ahriman.models.user_access import UserAccess
from ahriman.web.apispec.decorators import apidocs
from ahriman.web.schemas import EventBusFilterSchema, SSESchema
from ahriman.web.schemas import EventSchema, RepositoryIdSchema
from ahriman.web.views.base import BaseView
@@ -70,8 +70,8 @@ class EventBusView(BaseView):
description="Stream live updates via SSE",
permission=GET_PERMISSION,
error_404_description="Repository is unknown",
schema=SSESchema(many=True),
query_schema=EventBusFilterSchema,
schema=EventSchema(many=True),
query_schema=RepositoryIdSchema,
)
async def get(self) -> StreamResponse:
"""
@@ -81,16 +81,15 @@ class EventBusView(BaseView):
StreamResponse: 200 with streaming updates
"""
topics = [EventType(event) for event in self.request.query.getall("event", [])] or None
event_bus = self.service().event_bus
async with sse_response(self.request) as response:
subscription_id, queue = await event_bus.subscribe(topics)
subscription_id, queue = await self.service().event_bus.subscribe(topics)
try:
await self._run(response, queue)
except (ConnectionResetError, QueueShutDown):
pass
finally:
await event_bus.unsubscribe(subscription_id)
await self.service().event_bus.unsubscribe(subscription_id)
return response

View File

@@ -2,7 +2,6 @@ import pytest
from ahriman.core.configuration import Configuration
from ahriman.core.status import Client
from ahriman.core.status.event_bus import EventBus
from ahriman.core.status.web_client import WebClient
@@ -17,17 +16,6 @@ def client() -> Client:
return Client()
@pytest.fixture
def event_bus() -> EventBus:
"""
fixture for event bus
Returns:
EventBus: even bus test instance
"""
return EventBus(0)
@pytest.fixture
def web_client(configuration: Configuration) -> WebClient:
"""

View File

@@ -1,58 +1,64 @@
import pytest
from asyncio import QueueFull
from ahriman.core.status.event_bus import EventBus
from ahriman.models.event import EventType
from ahriman.models.package import Package
async def test_broadcast(event_bus: EventBus, package_ahriman: Package) -> None:
async def test_broadcast() -> None:
"""
must broadcast event to all subscribers
must broadcast event to all general subscribers
"""
event_bus = EventBus(0)
_, queue = await event_bus.subscribe()
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base, version=package_ahriman.version)
await event_bus.broadcast(EventType.PackageUpdated, "ahriman", status="success")
message = queue.get_nowait()
assert message == (
EventType.PackageUpdated,
{"object_id": package_ahriman.base, "version": package_ahriman.version},
)
assert message == (EventType.PackageUpdated, {"object_id": "ahriman", "status": "success"})
async def test_broadcast_with_topics(event_bus: EventBus, package_ahriman: Package) -> None:
async def test_broadcast_with_topics() -> None:
"""
must broadcast event to subscribers with matching topics
must deliver event only to subscribers with matching topics
"""
_, queue = await event_bus.subscribe([EventType.PackageUpdated])
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
assert not queue.empty()
event_bus = EventBus(0)
_, filtered_queue = await event_bus.subscribe([EventType.PackageUpdated])
_, wildcard_queue = await event_bus.subscribe()
await event_bus.broadcast(EventType.PackageUpdated, "ahriman")
assert not filtered_queue.empty()
assert not wildcard_queue.empty()
async def test_broadcast_topic_isolation(event_bus: EventBus, package_ahriman: Package) -> None:
async def test_broadcast_topic_isolation() -> None:
"""
must not broadcast event to subscribers with non-matching topics
must not deliver event to subscribers with non-matching topics
"""
event_bus = EventBus(0)
_, queue = await event_bus.subscribe([EventType.BuildLog])
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
await event_bus.broadcast(EventType.PackageUpdated, "ahriman")
assert queue.empty()
async def test_broadcast_queue_full(event_bus: EventBus, package_ahriman: Package) -> None:
async def test_broadcast_queue_full() -> None:
"""
must discard message to slow subscriber
"""
event_bus.max_size = 1
event_bus = EventBus(1)
_, queue = await event_bus.subscribe()
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
await event_bus.broadcast(EventType.PackageRemoved, package_ahriman.base)
await event_bus.broadcast(EventType.PackageUpdated, "ahriman")
await event_bus.broadcast(EventType.PackageRemoved, "ahriman")
assert queue.qsize() == 1
async def test_shutdown(event_bus: EventBus) -> None:
async def test_shutdown() -> None:
"""
must send sentinel to all subscribers on shutdown
"""
event_bus = EventBus(0)
_, queue = await event_bus.subscribe()
await event_bus.shutdown()
@@ -60,48 +66,55 @@ async def test_shutdown(event_bus: EventBus) -> None:
assert message is None
async def test_shutdown_queue_full(event_bus: EventBus, package_ahriman: Package) -> None:
async def test_shutdown_queue_full() -> None:
"""
must handle shutdown when queue is full
"""
event_bus.max_size = 1
event_bus = EventBus(1)
_, queue = await event_bus.subscribe()
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
await event_bus.broadcast(EventType.PackageUpdated, "ahriman")
await event_bus.shutdown()
# sentinel was not delivered but shutdown still completed
message = queue.get_nowait()
assert message is not None
async def test_subscribe(event_bus: EventBus) -> None:
async def test_subscribe() -> None:
"""
must register new subscriber
"""
event_bus = EventBus(0)
subscriber_id, queue = await event_bus.subscribe()
assert subscriber_id
assert queue.empty()
assert subscriber_id in event_bus._subscribers
async def test_subscribe_with_topics(event_bus: EventBus) -> None:
async def test_subscribe_with_topics() -> None:
"""
must register subscriber with topic filter
"""
subscriber_id, _ = await event_bus.subscribe([EventType.BuildLog])
event_bus = EventBus(0)
subscriber_id, _ = await event_bus.subscribe([EventType.BuildLog, EventType.PackageUpdated])
topics, _ = event_bus._subscribers[subscriber_id]
assert topics == [EventType.BuildLog]
assert topics == [EventType.BuildLog, EventType.PackageUpdated]
async def test_unsubscribe(event_bus: EventBus) -> None:
async def test_unsubscribe() -> None:
"""
must remove subscriber
"""
event_bus = EventBus(0)
subscriber_id, _ = await event_bus.subscribe()
await event_bus.unsubscribe(subscriber_id)
assert subscriber_id not in event_bus._subscribers
async def test_unsubscribe_unknown(event_bus: EventBus) -> None:
async def test_unsubscribe_unknown() -> None:
"""
must not fail on unknown subscriber removal
"""
event_bus = EventBus(0)
await event_bus.unsubscribe("unknown")

View File

@@ -1,40 +1,28 @@
import pytest
from pytest_mock import MockerFixture
from unittest.mock import MagicMock
from ahriman.core.exceptions import UnknownPackageError
from ahriman.core.status.watcher import Watcher
from ahriman.models.build_status import BuildStatus, BuildStatusEnum
from ahriman.models.changes import Changes
from ahriman.models.dependencies import Dependencies
from ahriman.models.event import Event, EventType
from ahriman.models.event import Event
from ahriman.models.log_record import LogRecord
from ahriman.models.log_record_id import LogRecordId
from ahriman.models.package import Package
from ahriman.models.pkgbuild_patch import PkgbuildPatch
async def test_event_add(watcher: Watcher, mocker: MockerFixture) -> None:
async def test_packages(watcher: Watcher, package_ahriman: Package) -> None:
"""
must create new event
must return list of available packages
"""
event = Event("event", "object")
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.event_add")
assert not await watcher.packages()
await watcher.event_add(event)
cache_mock.assert_called_once_with(event)
async def test_event_get(watcher: Watcher, mocker: MockerFixture) -> None:
"""
must retrieve events
"""
event = Event("event", "object")
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.event_get", return_value=[event])
result = await watcher.event_get(None, None)
assert result == [event]
cache_mock.assert_called_once_with(None, None, None, None, -1, 0)
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
assert await watcher.packages()
async def test_load(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
@@ -64,13 +52,37 @@ async def test_load_known(watcher: Watcher, package_ahriman: Package, mocker: Mo
assert status.status == BuildStatusEnum.Success
async def test_event_add(watcher: Watcher, mocker: MockerFixture) -> None:
"""
must create new event
"""
event = Event("event", "object")
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.event_add")
await watcher.event_add(event)
cache_mock.assert_called_once_with(event)
async def test_event_get(watcher: Watcher, mocker: MockerFixture) -> None:
"""
must retrieve events
"""
event = Event("event", "object")
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.event_get", return_value=[event])
result = await watcher.event_get(None, None)
assert result == [event]
cache_mock.assert_called_once_with(None, None, None, None, -1, 0)
async def test_logs_rotate(watcher: Watcher, mocker: MockerFixture) -> None:
"""
must rotate logs
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.logs_rotate")
await watcher.logs_rotate(42)
cache_mock.assert_called_once_with(42)
await watcher.logs_rotate(10)
cache_mock.assert_called_once_with(10)
async def test_package_archives(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
@@ -107,11 +119,12 @@ async def test_package_changes_get(watcher: Watcher, package_ahriman: Package, m
"""
must return package changes
"""
changes = Changes("sha")
changes = Changes()
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_changes_get",
return_value=changes)
assert await watcher.package_changes_get(package_ahriman.base) == changes
result = await watcher.package_changes_get(package_ahriman.base)
assert result == changes
cache_mock.assert_called_once_with(package_ahriman.base)
@@ -119,7 +132,7 @@ async def test_package_changes_update(watcher: Watcher, package_ahriman: Package
"""
must update package changes
"""
changes = Changes("sha")
changes = Changes()
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_changes_update")
await watcher.package_changes_update(package_ahriman.base, changes)
@@ -130,11 +143,12 @@ async def test_package_dependencies_get(watcher: Watcher, package_ahriman: Packa
"""
must return package dependencies
"""
dependencies = Dependencies({"path": [package_ahriman.base]})
dependencies = Dependencies()
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_dependencies_get",
return_value=dependencies)
assert await watcher.package_dependencies_get(package_ahriman.base) == dependencies
result = await watcher.package_dependencies_get(package_ahriman.base)
assert result == dependencies
cache_mock.assert_called_once_with(package_ahriman.base)
@@ -142,7 +156,7 @@ async def test_package_dependencies_update(watcher: Watcher, package_ahriman: Pa
"""
must update package dependencies
"""
dependencies = Dependencies({"path": [package_ahriman.base]})
dependencies = Dependencies()
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_dependencies_update")
await watcher.package_dependencies_update(package_ahriman.base, dependencies)
@@ -154,14 +168,12 @@ async def test_package_hold_update(watcher: Watcher, package_ahriman: Package, m
must update package hold status
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_hold_update")
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
await watcher.package_hold_update(package_ahriman.base, enabled=True)
cache_mock.assert_called_once_with(package_ahriman.base, enabled=True)
_, status = watcher._known[package_ahriman.base]
assert status.is_held is True
broadcast_mock.assert_called_once_with(EventType.PackageHeld, package_ahriman.base, is_held=True)
async def test_package_hold_update_unknown(watcher: Watcher, package_ahriman: Package) -> None:
@@ -178,11 +190,9 @@ async def test_package_logs_add(watcher: Watcher, package_ahriman: Package, mock
"""
log_record = LogRecord(LogRecordId(package_ahriman.base, "1.0.0"), 42.0, "message")
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_logs_add")
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
await watcher.package_logs_add(log_record)
cache_mock.assert_called_once_with(log_record)
broadcast_mock.assert_called_once_with(EventType.BuildLog, package_ahriman.base, **log_record.view())
async def test_package_logs_get(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
@@ -193,7 +203,8 @@ async def test_package_logs_get(watcher: Watcher, package_ahriman: Package, mock
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_logs_get",
return_value=[log_record])
assert await watcher.package_logs_get(package_ahriman.base) == [log_record]
result = await watcher.package_logs_get(package_ahriman.base)
assert result == [log_record]
cache_mock.assert_called_once_with(package_ahriman.base, None, None, -1, 0)
@@ -202,6 +213,7 @@ async def test_package_logs_remove(watcher: Watcher, package_ahriman: Package, m
must remove package logs
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_logs_remove")
await watcher.package_logs_remove(package_ahriman.base, None)
cache_mock.assert_called_once_with(package_ahriman.base, None)
@@ -211,9 +223,11 @@ async def test_package_patches_get(watcher: Watcher, package_ahriman: Package, m
must return package patches
"""
patch = PkgbuildPatch("key", "value")
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_patches_get", return_value=[patch])
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_patches_get",
return_value=[patch])
assert await watcher.package_patches_get(package_ahriman.base, None) == [patch]
result = await watcher.package_patches_get(package_ahriman.base, None)
assert result == [patch]
cache_mock.assert_called_once_with(package_ahriman.base, None)
@@ -222,6 +236,7 @@ async def test_package_patches_remove(watcher: Watcher, package_ahriman: Package
must remove package patches
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_patches_remove")
await watcher.package_patches_remove(package_ahriman.base, None)
cache_mock.assert_called_once_with(package_ahriman.base, None)
@@ -242,13 +257,11 @@ async def test_package_remove(watcher: Watcher, package_ahriman: Package, mocker
must remove package base
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_remove")
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
await watcher.package_remove(package_ahriman.base)
assert not watcher._known
cache_mock.assert_called_once_with(package_ahriman.base)
broadcast_mock.assert_called_once_with(EventType.PackageRemoved, package_ahriman.base)
async def test_package_remove_unknown(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
@@ -256,11 +269,8 @@ async def test_package_remove_unknown(watcher: Watcher, package_ahriman: Package
must not fail on unknown base removal
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_remove")
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
await watcher.package_remove(package_ahriman.base)
cache_mock.assert_called_once_with(package_ahriman.base)
broadcast_mock.assert_called_once_with(EventType.PackageRemoved, package_ahriman.base)
async def test_package_status_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
@@ -268,7 +278,6 @@ async def test_package_status_update(watcher: Watcher, package_ahriman: Package,
must update package status only for known package
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_status_update")
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
await watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success)
@@ -276,9 +285,6 @@ async def test_package_status_update(watcher: Watcher, package_ahriman: Package,
package, status = watcher._known[package_ahriman.base]
assert package == package_ahriman
assert status.status == BuildStatusEnum.Success
broadcast_mock.assert_called_once_with(
EventType.PackageStatusChanged, package_ahriman.base, status=BuildStatusEnum.Success.value,
)
async def test_package_status_update_preserves_hold(watcher: Watcher, package_ahriman: Package,
@@ -287,7 +293,6 @@ async def test_package_status_update_preserves_hold(watcher: Watcher, package_ah
must preserve hold status on package status update
"""
mocker.patch("ahriman.core.status.local_client.LocalClient.package_status_update")
mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus(is_held=True))}
await watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success)
@@ -308,15 +313,10 @@ async def test_package_update(watcher: Watcher, package_ahriman: Package, mocker
must add package to cache
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_update")
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
await watcher.package_update(package_ahriman, BuildStatusEnum.Unknown)
assert await watcher.packages()
cache_mock.assert_called_once_with(package_ahriman, pytest.helpers.anyvar(int))
broadcast_mock.assert_called_once_with(
EventType.PackageUpdated, package_ahriman.base,
status=BuildStatusEnum.Unknown.value, version=package_ahriman.version,
)
async def test_package_update_preserves_hold(watcher: Watcher, package_ahriman: Package,
@@ -332,34 +332,22 @@ async def test_package_update_preserves_hold(watcher: Watcher, package_ahriman:
assert status.is_held is True
async def test_packages(watcher: Watcher, package_ahriman: Package) -> None:
"""
must return list of available packages
"""
assert not await watcher.packages()
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
assert await watcher.packages()
async def test_shutdown(watcher: Watcher, mocker: MockerFixture) -> None:
async def test_shutdown(watcher: Watcher) -> None:
"""
must gracefully shutdown watcher
"""
shutdown_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.shutdown")
_, queue = await watcher.event_bus.subscribe()
await watcher.shutdown()
shutdown_mock.assert_called_once_with()
message = queue.get_nowait()
assert message is None
async def test_status_update(watcher: Watcher, mocker: MockerFixture) -> None:
async def test_status_update(watcher: Watcher) -> None:
"""
must update service status
"""
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
await watcher.status_update(BuildStatusEnum.Success)
assert watcher.status.status == BuildStatusEnum.Success
broadcast_mock.assert_called_once_with(EventType.ServiceStatusChanged, None, status=BuildStatusEnum.Success.value)
def test_call(watcher: Watcher, package_ahriman: Package) -> None:

View File

@@ -1 +0,0 @@
# schema testing goes in view class tests

View File

@@ -1 +0,0 @@
# schema testing goes in view class tests

View File

@@ -1,4 +1,5 @@
import asyncio
import pytest
from aiohttp.test_utils import TestClient
@@ -6,29 +7,13 @@ from asyncio import Queue
from pytest_mock import MockerFixture
from unittest.mock import AsyncMock
from ahriman.core.status.watcher import Watcher
from ahriman.core.status.event_bus import SSEvent
from ahriman.models.event import EventType
from ahriman.models.package import Package
from ahriman.models.user_access import UserAccess
from ahriman.web.keys import WatcherKey
from ahriman.web.views.v1.auditlog.event_bus import EventBusView
async def _producer(watcher: Watcher, package_ahriman: Package) -> None:
"""
create producer
Args:
watcher(Watcher): watcher test instance
package_ahriman(Package): package test instance
"""
await asyncio.sleep(0.1)
await watcher.event_bus.broadcast(EventType.PackageRemoved, package_ahriman.base)
await watcher.event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base, status="success")
await asyncio.sleep(0.1)
await watcher.event_bus.shutdown()
async def test_get_permission() -> None:
"""
must return correct permission for the request
@@ -44,74 +29,72 @@ def test_routes() -> None:
assert EventBusView.ROUTES == ["/api/v1/events/stream"]
async def test_get(client: TestClient) -> None:
"""
must stream events via SSE
"""
watcher = next(iter(client.app[WatcherKey].values()))
async def _producer() -> None:
await asyncio.sleep(0.1)
await watcher.event_bus.broadcast(EventType.PackageUpdated, "ahriman", status="success")
await asyncio.sleep(0.1)
await watcher.event_bus.shutdown()
asyncio.create_task(_producer())
response = await client.get("/api/v1/events/stream")
assert response.status == 200
body = await response.text()
assert "package-updated" in body
assert "ahriman" in body
async def test_get_with_topic_filter(client: TestClient) -> None:
"""
must filter events by topic
"""
watcher = next(iter(client.app[WatcherKey].values()))
async def _producer() -> None:
await asyncio.sleep(0.1)
await watcher.event_bus.broadcast(EventType.PackageRemoved, "filtered")
await watcher.event_bus.broadcast(EventType.PackageUpdated, "ahriman", status="success")
await asyncio.sleep(0.1)
await watcher.event_bus.shutdown()
asyncio.create_task(_producer())
response = await client.get("/api/v1/events/stream", params={"event": "package-updated"})
assert response.status == 200
body = await response.text()
assert "package-updated" in body
assert "filtered" not in body
async def test_run_timeout() -> None:
"""
must handle timeout and continue loop
"""
queue = Queue()
response = AsyncMock()
response.is_connected = lambda: True
response.ping_interval = 0.01
queue: Queue[SSEvent | None] = Queue()
async def _shutdown() -> None:
await asyncio.sleep(0.05)
await queue.put(None)
response = AsyncMock()
response.is_connected = lambda: True
response.ping_interval = 0.01
asyncio.create_task(_shutdown())
await EventBusView._run(response, queue)
async def test_get(client: TestClient, package_ahriman: Package) -> None:
"""
must stream events via SSE
"""
watcher = next(iter(client.app[WatcherKey].values()))
asyncio.create_task(_producer(watcher, package_ahriman))
request_schema = pytest.helpers.schema_request(EventBusView.get, location="querystring")
# no content validation here because it is a streaming response
assert not request_schema.validate({})
response = await client.get("/api/v1/events/stream")
assert response.status == 200
body = await response.text()
assert EventType.PackageUpdated in body
assert "ahriman" in body
async def test_get_with_topic_filter(client: TestClient, package_ahriman: Package) -> None:
"""
must filter events by topic
"""
watcher = next(iter(client.app[WatcherKey].values()))
asyncio.create_task(_producer(watcher, package_ahriman))
request_schema = pytest.helpers.schema_request(EventBusView.get, location="querystring")
payload = {"event": [EventType.PackageUpdated]}
assert not request_schema.validate(payload)
response = await client.get("/api/v1/events/stream", params=payload)
assert response.status == 200
body = await response.text()
assert EventType.PackageUpdated in body
assert EventType.PackageRemoved not in body
async def test_get_not_found(client: TestClient) -> None:
"""
must return not found for unknown repository
"""
response_schema = pytest.helpers.schema_response(EventBusView.get, code=404)
response = await client.get("/api/v1/events/stream", params={"architecture": "unknown", "repository": "unknown"})
assert response.status == 404
assert not response_schema.validate(await response.json())
async def test_get_connection_reset(client: TestClient, mocker: MockerFixture) -> None:
"""
must handle connection reset
must handle connection reset gracefully
"""
mocker.patch.object(EventBusView, "_run", side_effect=ConnectionResetError)