mirror of
https://github.com/arcan1s/ahriman.git
synced 2026-04-07 11:03:37 +00:00
Compare commits
7 Commits
4165e87ce5
...
feature/ss
| Author | SHA1 | Date | |
|---|---|---|---|
| a170e43073 | |||
| 190b6665de | |||
| 71f9044f27 | |||
| a69e3338b1 | |||
| 96ebb3793d | |||
| 3265bb913f | |||
| af8e2c9e9b |
@@ -1,7 +1,7 @@
|
||||
version: 2
|
||||
|
||||
build:
|
||||
os: ubuntu-20.04
|
||||
os: ubuntu-lts-latest
|
||||
tools:
|
||||
python: "3.12"
|
||||
apt_packages:
|
||||
|
||||
@@ -12,6 +12,14 @@ ahriman.core.status.client module
|
||||
:no-undoc-members:
|
||||
:show-inheritance:
|
||||
|
||||
ahriman.core.status.event\_bus module
|
||||
-------------------------------------
|
||||
|
||||
.. automodule:: ahriman.core.status.event_bus
|
||||
:members:
|
||||
:no-undoc-members:
|
||||
:show-inheritance:
|
||||
|
||||
ahriman.core.status.local\_client module
|
||||
----------------------------------------
|
||||
|
||||
|
||||
@@ -92,6 +92,14 @@ ahriman.web.schemas.error\_schema module
|
||||
:no-undoc-members:
|
||||
:show-inheritance:
|
||||
|
||||
ahriman.web.schemas.event\_bus\_filter\_schema module
|
||||
-----------------------------------------------------
|
||||
|
||||
.. automodule:: ahriman.web.schemas.event_bus_filter_schema
|
||||
:members:
|
||||
:no-undoc-members:
|
||||
:show-inheritance:
|
||||
|
||||
ahriman.web.schemas.event\_schema module
|
||||
----------------------------------------
|
||||
|
||||
@@ -356,6 +364,14 @@ ahriman.web.schemas.search\_schema module
|
||||
:no-undoc-members:
|
||||
:show-inheritance:
|
||||
|
||||
ahriman.web.schemas.sse\_schema module
|
||||
--------------------------------------
|
||||
|
||||
.. automodule:: ahriman.web.schemas.sse_schema
|
||||
:members:
|
||||
:no-undoc-members:
|
||||
:show-inheritance:
|
||||
|
||||
ahriman.web.schemas.status\_schema module
|
||||
-----------------------------------------
|
||||
|
||||
|
||||
@@ -4,6 +4,14 @@ ahriman.web.views.v1.auditlog package
|
||||
Submodules
|
||||
----------
|
||||
|
||||
ahriman.web.views.v1.auditlog.event\_bus module
|
||||
-----------------------------------------------
|
||||
|
||||
.. automodule:: ahriman.web.views.v1.auditlog.event_bus
|
||||
:members:
|
||||
:no-undoc-members:
|
||||
:show-inheritance:
|
||||
|
||||
ahriman.web.views.v1.auditlog.events module
|
||||
-------------------------------------------
|
||||
|
||||
|
||||
@@ -188,6 +188,7 @@ Web server settings. This feature requires ``aiohttp`` libraries to be installed
|
||||
* ``host`` - host to bind, string, optional.
|
||||
* ``index_url`` - full URL of the repository index page, string, optional.
|
||||
* ``max_body_size`` - max body size in bytes to be validated for archive upload, integer, optional. If not set, validation will be disabled.
|
||||
* ``max_queue_size`` - max queue size for server sent event streams, integer, optional, default ``0``. If set to ``0``, queue is unlimited.
|
||||
* ``port`` - port to bind, integer, optional.
|
||||
* ``service_only`` - disable status routes (including logs), boolean, optional, default ``no``.
|
||||
* ``static_path`` - path to directory with static files, string, required.
|
||||
@@ -195,7 +196,7 @@ Web server settings. This feature requires ``aiohttp`` libraries to be installed
|
||||
* ``templates`` - path to templates directories, space separated list of paths, required.
|
||||
* ``unix_socket`` - path to the listening unix socket, string, optional. If set, server will create the socket on the specified address which can (and will) be used by application. Note, that unlike usual host/port configuration, unix socket allows to perform requests without authorization.
|
||||
* ``unix_socket_unsafe`` - set unsafe (o+w) permissions to unix socket, boolean, optional, default ``yes``. This option is enabled by default, because it is supposed that unix socket is created in safe environment (only web service is supposed to be used in unsafe), but it can be disabled by configuration.
|
||||
* ``wait_timeout`` - wait timeout in seconds, maximum amount of time to be waited before lock will be free, integer, optional.
|
||||
* ``wait_timeout`` - wait timeout in seconds, maximum amount of time to be waited before lock will be free, integer, optional. If set to ``0``, wait infinitely.
|
||||
|
||||
``archive`` group
|
||||
-----------------
|
||||
|
||||
@@ -7,10 +7,13 @@ aiohttp==3.11.18
|
||||
# ahriman (pyproject.toml)
|
||||
# aiohttp-cors
|
||||
# aiohttp-jinja2
|
||||
# aiohttp-sse
|
||||
aiohttp-cors==0.8.1
|
||||
# via ahriman (pyproject.toml)
|
||||
aiohttp-jinja2==1.6
|
||||
# via ahriman (pyproject.toml)
|
||||
aiohttp-sse==2.2.0
|
||||
# via ahriman (pyproject.toml)
|
||||
aiosignal==1.3.2
|
||||
# via aiohttp
|
||||
alabaster==1.0.0
|
||||
|
||||
@@ -46,6 +46,8 @@ host = 127.0.0.1
|
||||
;index_url =
|
||||
; Max file size in bytes which can be uploaded to the server. Requires ${web:enable_archive_upload} to be enabled.
|
||||
;max_body_size =
|
||||
; Max event queue size used for server sent event endpoints (0 is infinite)
|
||||
;max_queue_size = 0
|
||||
; Port to listen. Must be set, if the web service is enabled.
|
||||
;port =
|
||||
; Disable status (e.g. package status, logs, etc) endpoints. Useful for build only modes.
|
||||
|
||||
@@ -28,6 +28,7 @@ from ahriman.web.schemas.configuration_schema import ConfigurationSchema
|
||||
from ahriman.web.schemas.counters_schema import CountersSchema
|
||||
from ahriman.web.schemas.dependencies_schema import DependenciesSchema
|
||||
from ahriman.web.schemas.error_schema import ErrorSchema
|
||||
from ahriman.web.schemas.event_bus_filter_schema import EventBusFilterSchema
|
||||
from ahriman.web.schemas.event_schema import EventSchema
|
||||
from ahriman.web.schemas.event_search_schema import EventSearchSchema
|
||||
from ahriman.web.schemas.file_schema import FileSchema
|
||||
@@ -61,6 +62,7 @@ from ahriman.web.schemas.repository_id_schema import RepositoryIdSchema
|
||||
from ahriman.web.schemas.repository_stats_schema import RepositoryStatsSchema
|
||||
from ahriman.web.schemas.rollback_schema import RollbackSchema
|
||||
from ahriman.web.schemas.search_schema import SearchSchema
|
||||
from ahriman.web.schemas.sse_schema import SSESchema
|
||||
from ahriman.web.schemas.status_schema import StatusSchema
|
||||
from ahriman.web.schemas.update_flags_schema import UpdateFlagsSchema
|
||||
from ahriman.web.schemas.worker_schema import WorkerSchema
|
||||
|
||||
33
src/ahriman/web/schemas/event_bus_filter_schema.py
Normal file
33
src/ahriman/web/schemas/event_bus_filter_schema.py
Normal file
@@ -0,0 +1,33 @@
|
||||
#
|
||||
# Copyright (c) 2021-2026 ahriman team.
|
||||
#
|
||||
# This file is part of ahriman
|
||||
# (see https://github.com/arcan1s/ahriman).
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
from ahriman.models.event import EventType
|
||||
from ahriman.web.apispec import fields
|
||||
from ahriman.web.schemas.repository_id_schema import RepositoryIdSchema
|
||||
|
||||
|
||||
class EventBusFilterSchema(RepositoryIdSchema):
|
||||
"""
|
||||
request event bus filter schema
|
||||
"""
|
||||
|
||||
event = fields.List(fields.String(), metadata={
|
||||
"description": "Event type filter",
|
||||
"example": [EventType.PackageUpdated],
|
||||
})
|
||||
35
src/ahriman/web/schemas/sse_schema.py
Normal file
35
src/ahriman/web/schemas/sse_schema.py
Normal file
@@ -0,0 +1,35 @@
|
||||
#
|
||||
# Copyright (c) 2021-2026 ahriman team.
|
||||
#
|
||||
# This file is part of ahriman
|
||||
# (see https://github.com/arcan1s/ahriman).
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
from ahriman.models.event import EventType
|
||||
from ahriman.web.apispec import Schema, fields
|
||||
|
||||
|
||||
class SSESchema(Schema):
|
||||
"""
|
||||
response SSE schema
|
||||
"""
|
||||
|
||||
event = fields.String(required=True, metadata={
|
||||
"description": "Event type",
|
||||
"example": EventType.PackageUpdated,
|
||||
})
|
||||
data = fields.Dict(keys=fields.String(), values=fields.Raw(), metadata={
|
||||
"description": "Event data",
|
||||
})
|
||||
@@ -19,7 +19,7 @@
|
||||
#
|
||||
import json
|
||||
|
||||
from aiohttp.web import StreamResponse
|
||||
from aiohttp.web import HTTPBadRequest, StreamResponse
|
||||
from aiohttp_sse import EventSourceResponse, sse_response
|
||||
from asyncio import Queue, QueueShutDown, wait_for
|
||||
from typing import ClassVar
|
||||
@@ -28,7 +28,7 @@ from ahriman.core.status.event_bus import SSEvent
|
||||
from ahriman.models.event import EventType
|
||||
from ahriman.models.user_access import UserAccess
|
||||
from ahriman.web.apispec.decorators import apidocs
|
||||
from ahriman.web.schemas import EventSchema, RepositoryIdSchema
|
||||
from ahriman.web.schemas import EventBusFilterSchema, SSESchema
|
||||
from ahriman.web.views.base import BaseView
|
||||
|
||||
|
||||
@@ -69,9 +69,10 @@ class EventBusView(BaseView):
|
||||
summary="Live updates",
|
||||
description="Stream live updates via SSE",
|
||||
permission=GET_PERMISSION,
|
||||
error_400_enabled=True,
|
||||
error_404_description="Repository is unknown",
|
||||
schema=EventSchema(many=True),
|
||||
query_schema=RepositoryIdSchema,
|
||||
schema=SSESchema(many=True),
|
||||
query_schema=EventBusFilterSchema,
|
||||
)
|
||||
async def get(self) -> StreamResponse:
|
||||
"""
|
||||
@@ -79,17 +80,24 @@ class EventBusView(BaseView):
|
||||
|
||||
Returns:
|
||||
StreamResponse: 200 with streaming updates
|
||||
|
||||
Raises:
|
||||
HTTPBadRequest: if invalid event type is supplied
|
||||
"""
|
||||
topics = [EventType(event) for event in self.request.query.getall("event", [])] or None
|
||||
try:
|
||||
topics = [EventType(event) for event in self.request.query.getall("event", [])] or None
|
||||
except ValueError as ex:
|
||||
raise HTTPBadRequest(reason=str(ex))
|
||||
event_bus = self.service().event_bus
|
||||
|
||||
async with sse_response(self.request) as response:
|
||||
subscription_id, queue = await self.service().event_bus.subscribe(topics)
|
||||
subscription_id, queue = await event_bus.subscribe(topics)
|
||||
|
||||
try:
|
||||
await self._run(response, queue)
|
||||
except (ConnectionResetError, QueueShutDown):
|
||||
pass
|
||||
finally:
|
||||
await self.service().event_bus.unsubscribe(subscription_id)
|
||||
await event_bus.unsubscribe(subscription_id)
|
||||
|
||||
return response
|
||||
|
||||
@@ -2,6 +2,7 @@ import pytest
|
||||
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.status import Client
|
||||
from ahriman.core.status.event_bus import EventBus
|
||||
from ahriman.core.status.web_client import WebClient
|
||||
|
||||
|
||||
@@ -16,6 +17,17 @@ def client() -> Client:
|
||||
return Client()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def event_bus() -> EventBus:
|
||||
"""
|
||||
fixture for event bus
|
||||
|
||||
Returns:
|
||||
EventBus: event bus test instance
|
||||
"""
|
||||
return EventBus(0)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def web_client(configuration: Configuration) -> WebClient:
|
||||
"""
|
||||
|
||||
@@ -1,64 +1,58 @@
|
||||
import pytest
|
||||
|
||||
from asyncio import QueueFull
|
||||
|
||||
from ahriman.core.status.event_bus import EventBus
|
||||
from ahriman.models.event import EventType
|
||||
from ahriman.models.package import Package
|
||||
|
||||
|
||||
async def test_broadcast() -> None:
|
||||
async def test_broadcast(event_bus: EventBus, package_ahriman: Package) -> None:
|
||||
"""
|
||||
must broadcast event to all general subscribers
|
||||
must broadcast event to all subscribers
|
||||
"""
|
||||
event_bus = EventBus(0)
|
||||
_, queue = await event_bus.subscribe()
|
||||
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base, version=package_ahriman.version)
|
||||
|
||||
await event_bus.broadcast(EventType.PackageUpdated, "ahriman", status="success")
|
||||
message = queue.get_nowait()
|
||||
assert message == (EventType.PackageUpdated, {"object_id": "ahriman", "status": "success"})
|
||||
assert message == (
|
||||
EventType.PackageUpdated,
|
||||
{"object_id": package_ahriman.base, "version": package_ahriman.version},
|
||||
)
|
||||
|
||||
|
||||
async def test_broadcast_with_topics() -> None:
|
||||
async def test_broadcast_with_topics(event_bus: EventBus, package_ahriman: Package) -> None:
|
||||
"""
|
||||
must deliver event only to subscribers with matching topics
|
||||
must broadcast event to subscribers with matching topics
|
||||
"""
|
||||
event_bus = EventBus(0)
|
||||
_, filtered_queue = await event_bus.subscribe([EventType.PackageUpdated])
|
||||
_, wildcard_queue = await event_bus.subscribe()
|
||||
|
||||
await event_bus.broadcast(EventType.PackageUpdated, "ahriman")
|
||||
assert not filtered_queue.empty()
|
||||
assert not wildcard_queue.empty()
|
||||
_, queue = await event_bus.subscribe([EventType.PackageUpdated])
|
||||
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
|
||||
assert not queue.empty()
|
||||
|
||||
|
||||
async def test_broadcast_topic_isolation() -> None:
|
||||
async def test_broadcast_topic_isolation(event_bus: EventBus, package_ahriman: Package) -> None:
|
||||
"""
|
||||
must not deliver event to subscribers with non-matching topics
|
||||
must not broadcast event to subscribers with non-matching topics
|
||||
"""
|
||||
event_bus = EventBus(0)
|
||||
_, queue = await event_bus.subscribe([EventType.BuildLog])
|
||||
|
||||
await event_bus.broadcast(EventType.PackageUpdated, "ahriman")
|
||||
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
|
||||
assert queue.empty()
|
||||
|
||||
|
||||
async def test_broadcast_queue_full() -> None:
|
||||
async def test_broadcast_queue_full(event_bus: EventBus, package_ahriman: Package) -> None:
|
||||
"""
|
||||
must discard message to slow subscriber
|
||||
"""
|
||||
event_bus = EventBus(1)
|
||||
event_bus.max_size = 1
|
||||
_, queue = await event_bus.subscribe()
|
||||
|
||||
await event_bus.broadcast(EventType.PackageUpdated, "ahriman")
|
||||
await event_bus.broadcast(EventType.PackageRemoved, "ahriman")
|
||||
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
|
||||
await event_bus.broadcast(EventType.PackageRemoved, package_ahriman.base)
|
||||
assert queue.qsize() == 1
|
||||
|
||||
|
||||
async def test_shutdown() -> None:
|
||||
async def test_shutdown(event_bus: EventBus) -> None:
|
||||
"""
|
||||
must send sentinel to all subscribers on shutdown
|
||||
"""
|
||||
event_bus = EventBus(0)
|
||||
_, queue = await event_bus.subscribe()
|
||||
|
||||
await event_bus.shutdown()
|
||||
@@ -66,55 +60,48 @@ async def test_shutdown() -> None:
|
||||
assert message is None
|
||||
|
||||
|
||||
async def test_shutdown_queue_full() -> None:
|
||||
async def test_shutdown_queue_full(event_bus: EventBus, package_ahriman: Package) -> None:
|
||||
"""
|
||||
must handle shutdown when queue is full
|
||||
"""
|
||||
event_bus = EventBus(1)
|
||||
event_bus.max_size = 1
|
||||
_, queue = await event_bus.subscribe()
|
||||
|
||||
await event_bus.broadcast(EventType.PackageUpdated, "ahriman")
|
||||
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
|
||||
await event_bus.shutdown()
|
||||
# sentinel was not delivered but shutdown still completed
|
||||
message = queue.get_nowait()
|
||||
assert message is not None
|
||||
|
||||
|
||||
async def test_subscribe() -> None:
|
||||
async def test_subscribe(event_bus: EventBus) -> None:
|
||||
"""
|
||||
must register new subscriber
|
||||
"""
|
||||
event_bus = EventBus(0)
|
||||
subscriber_id, queue = await event_bus.subscribe()
|
||||
|
||||
assert subscriber_id
|
||||
assert queue.empty()
|
||||
assert subscriber_id in event_bus._subscribers
|
||||
|
||||
|
||||
async def test_subscribe_with_topics() -> None:
|
||||
async def test_subscribe_with_topics(event_bus: EventBus) -> None:
|
||||
"""
|
||||
must register subscriber with topic filter
|
||||
"""
|
||||
event_bus = EventBus(0)
|
||||
subscriber_id, _ = await event_bus.subscribe([EventType.BuildLog, EventType.PackageUpdated])
|
||||
subscriber_id, _ = await event_bus.subscribe([EventType.BuildLog])
|
||||
topics, _ = event_bus._subscribers[subscriber_id]
|
||||
assert topics == [EventType.BuildLog, EventType.PackageUpdated]
|
||||
assert topics == [EventType.BuildLog]
|
||||
|
||||
|
||||
async def test_unsubscribe() -> None:
|
||||
async def test_unsubscribe(event_bus: EventBus) -> None:
|
||||
"""
|
||||
must remove subscriber
|
||||
"""
|
||||
event_bus = EventBus(0)
|
||||
subscriber_id, _ = await event_bus.subscribe()
|
||||
|
||||
await event_bus.unsubscribe(subscriber_id)
|
||||
assert subscriber_id not in event_bus._subscribers
|
||||
|
||||
|
||||
async def test_unsubscribe_unknown() -> None:
|
||||
async def test_unsubscribe_unknown(event_bus: EventBus) -> None:
|
||||
"""
|
||||
must not fail on unknown subscriber removal
|
||||
"""
|
||||
event_bus = EventBus(0)
|
||||
await event_bus.unsubscribe("unknown")
|
||||
|
||||
@@ -1,28 +1,40 @@
|
||||
import pytest
|
||||
|
||||
from pytest_mock import MockerFixture
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from ahriman.core.exceptions import UnknownPackageError
|
||||
from ahriman.core.status.watcher import Watcher
|
||||
from ahriman.models.build_status import BuildStatus, BuildStatusEnum
|
||||
from ahriman.models.changes import Changes
|
||||
from ahriman.models.dependencies import Dependencies
|
||||
from ahriman.models.event import Event
|
||||
from ahriman.models.event import Event, EventType
|
||||
from ahriman.models.log_record import LogRecord
|
||||
from ahriman.models.log_record_id import LogRecordId
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.pkgbuild_patch import PkgbuildPatch
|
||||
|
||||
|
||||
async def test_packages(watcher: Watcher, package_ahriman: Package) -> None:
|
||||
async def test_event_add(watcher: Watcher, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must return list of available packages
|
||||
must create new event
|
||||
"""
|
||||
assert not await watcher.packages()
|
||||
event = Event("event", "object")
|
||||
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.event_add")
|
||||
|
||||
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
|
||||
assert await watcher.packages()
|
||||
await watcher.event_add(event)
|
||||
cache_mock.assert_called_once_with(event)
|
||||
|
||||
|
||||
async def test_event_get(watcher: Watcher, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must retrieve events
|
||||
"""
|
||||
event = Event("event", "object")
|
||||
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.event_get", return_value=[event])
|
||||
|
||||
result = await watcher.event_get(None, None)
|
||||
assert result == [event]
|
||||
cache_mock.assert_called_once_with(None, None, None, None, -1, 0)
|
||||
|
||||
|
||||
async def test_load(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
|
||||
@@ -52,37 +64,13 @@ async def test_load_known(watcher: Watcher, package_ahriman: Package, mocker: Mo
|
||||
assert status.status == BuildStatusEnum.Success
|
||||
|
||||
|
||||
async def test_event_add(watcher: Watcher, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must create new event
|
||||
"""
|
||||
event = Event("event", "object")
|
||||
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.event_add")
|
||||
|
||||
await watcher.event_add(event)
|
||||
cache_mock.assert_called_once_with(event)
|
||||
|
||||
|
||||
async def test_event_get(watcher: Watcher, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must retrieve events
|
||||
"""
|
||||
event = Event("event", "object")
|
||||
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.event_get", return_value=[event])
|
||||
|
||||
result = await watcher.event_get(None, None)
|
||||
assert result == [event]
|
||||
cache_mock.assert_called_once_with(None, None, None, None, -1, 0)
|
||||
|
||||
|
||||
async def test_logs_rotate(watcher: Watcher, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must rotate logs
|
||||
"""
|
||||
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.logs_rotate")
|
||||
|
||||
await watcher.logs_rotate(10)
|
||||
cache_mock.assert_called_once_with(10)
|
||||
await watcher.logs_rotate(42)
|
||||
cache_mock.assert_called_once_with(42)
|
||||
|
||||
|
||||
async def test_package_archives(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
|
||||
@@ -119,12 +107,11 @@ async def test_package_changes_get(watcher: Watcher, package_ahriman: Package, m
|
||||
"""
|
||||
must return package changes
|
||||
"""
|
||||
changes = Changes()
|
||||
changes = Changes("sha")
|
||||
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_changes_get",
|
||||
return_value=changes)
|
||||
|
||||
result = await watcher.package_changes_get(package_ahriman.base)
|
||||
assert result == changes
|
||||
assert await watcher.package_changes_get(package_ahriman.base) == changes
|
||||
cache_mock.assert_called_once_with(package_ahriman.base)
|
||||
|
||||
|
||||
@@ -132,7 +119,7 @@ async def test_package_changes_update(watcher: Watcher, package_ahriman: Package
|
||||
"""
|
||||
must update package changes
|
||||
"""
|
||||
changes = Changes()
|
||||
changes = Changes("sha")
|
||||
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_changes_update")
|
||||
|
||||
await watcher.package_changes_update(package_ahriman.base, changes)
|
||||
@@ -143,12 +130,11 @@ async def test_package_dependencies_get(watcher: Watcher, package_ahriman: Packa
|
||||
"""
|
||||
must return package dependencies
|
||||
"""
|
||||
dependencies = Dependencies()
|
||||
dependencies = Dependencies({"path": [package_ahriman.base]})
|
||||
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_dependencies_get",
|
||||
return_value=dependencies)
|
||||
|
||||
result = await watcher.package_dependencies_get(package_ahriman.base)
|
||||
assert result == dependencies
|
||||
assert await watcher.package_dependencies_get(package_ahriman.base) == dependencies
|
||||
cache_mock.assert_called_once_with(package_ahriman.base)
|
||||
|
||||
|
||||
@@ -156,7 +142,7 @@ async def test_package_dependencies_update(watcher: Watcher, package_ahriman: Pa
|
||||
"""
|
||||
must update package dependencies
|
||||
"""
|
||||
dependencies = Dependencies()
|
||||
dependencies = Dependencies({"path": [package_ahriman.base]})
|
||||
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_dependencies_update")
|
||||
|
||||
await watcher.package_dependencies_update(package_ahriman.base, dependencies)
|
||||
@@ -168,12 +154,14 @@ async def test_package_hold_update(watcher: Watcher, package_ahriman: Package, m
|
||||
must update package hold status
|
||||
"""
|
||||
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_hold_update")
|
||||
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
|
||||
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
|
||||
|
||||
await watcher.package_hold_update(package_ahriman.base, enabled=True)
|
||||
cache_mock.assert_called_once_with(package_ahriman.base, enabled=True)
|
||||
_, status = watcher._known[package_ahriman.base]
|
||||
assert status.is_held is True
|
||||
broadcast_mock.assert_called_once_with(EventType.PackageHeld, package_ahriman.base, is_held=True)
|
||||
|
||||
|
||||
async def test_package_hold_update_unknown(watcher: Watcher, package_ahriman: Package) -> None:
|
||||
@@ -190,9 +178,11 @@ async def test_package_logs_add(watcher: Watcher, package_ahriman: Package, mock
|
||||
"""
|
||||
log_record = LogRecord(LogRecordId(package_ahriman.base, "1.0.0"), 42.0, "message")
|
||||
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_logs_add")
|
||||
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
|
||||
|
||||
await watcher.package_logs_add(log_record)
|
||||
cache_mock.assert_called_once_with(log_record)
|
||||
broadcast_mock.assert_called_once_with(EventType.BuildLog, package_ahriman.base, **log_record.view())
|
||||
|
||||
|
||||
async def test_package_logs_get(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
|
||||
@@ -203,8 +193,7 @@ async def test_package_logs_get(watcher: Watcher, package_ahriman: Package, mock
|
||||
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_logs_get",
|
||||
return_value=[log_record])
|
||||
|
||||
result = await watcher.package_logs_get(package_ahriman.base)
|
||||
assert result == [log_record]
|
||||
assert await watcher.package_logs_get(package_ahriman.base) == [log_record]
|
||||
cache_mock.assert_called_once_with(package_ahriman.base, None, None, -1, 0)
|
||||
|
||||
|
||||
@@ -213,7 +202,6 @@ async def test_package_logs_remove(watcher: Watcher, package_ahriman: Package, m
|
||||
must remove package logs
|
||||
"""
|
||||
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_logs_remove")
|
||||
|
||||
await watcher.package_logs_remove(package_ahriman.base, None)
|
||||
cache_mock.assert_called_once_with(package_ahriman.base, None)
|
||||
|
||||
@@ -223,11 +211,9 @@ async def test_package_patches_get(watcher: Watcher, package_ahriman: Package, m
|
||||
must return package patches
|
||||
"""
|
||||
patch = PkgbuildPatch("key", "value")
|
||||
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_patches_get",
|
||||
return_value=[patch])
|
||||
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_patches_get", return_value=[patch])
|
||||
|
||||
result = await watcher.package_patches_get(package_ahriman.base, None)
|
||||
assert result == [patch]
|
||||
assert await watcher.package_patches_get(package_ahriman.base, None) == [patch]
|
||||
cache_mock.assert_called_once_with(package_ahriman.base, None)
|
||||
|
||||
|
||||
@@ -236,7 +222,6 @@ async def test_package_patches_remove(watcher: Watcher, package_ahriman: Package
|
||||
must remove package patches
|
||||
"""
|
||||
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_patches_remove")
|
||||
|
||||
await watcher.package_patches_remove(package_ahriman.base, None)
|
||||
cache_mock.assert_called_once_with(package_ahriman.base, None)
|
||||
|
||||
@@ -257,11 +242,13 @@ async def test_package_remove(watcher: Watcher, package_ahriman: Package, mocker
|
||||
must remove package base
|
||||
"""
|
||||
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_remove")
|
||||
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
|
||||
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
|
||||
|
||||
await watcher.package_remove(package_ahriman.base)
|
||||
assert not watcher._known
|
||||
cache_mock.assert_called_once_with(package_ahriman.base)
|
||||
broadcast_mock.assert_called_once_with(EventType.PackageRemoved, package_ahriman.base)
|
||||
|
||||
|
||||
async def test_package_remove_unknown(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
|
||||
@@ -269,8 +256,11 @@ async def test_package_remove_unknown(watcher: Watcher, package_ahriman: Package
|
||||
must not fail on unknown base removal
|
||||
"""
|
||||
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_remove")
|
||||
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
|
||||
|
||||
await watcher.package_remove(package_ahriman.base)
|
||||
cache_mock.assert_called_once_with(package_ahriman.base)
|
||||
broadcast_mock.assert_called_once_with(EventType.PackageRemoved, package_ahriman.base)
|
||||
|
||||
|
||||
async def test_package_status_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
|
||||
@@ -278,6 +268,7 @@ async def test_package_status_update(watcher: Watcher, package_ahriman: Package,
|
||||
must update package status only for known package
|
||||
"""
|
||||
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_status_update")
|
||||
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
|
||||
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
|
||||
|
||||
await watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success)
|
||||
@@ -285,6 +276,9 @@ async def test_package_status_update(watcher: Watcher, package_ahriman: Package,
|
||||
package, status = watcher._known[package_ahriman.base]
|
||||
assert package == package_ahriman
|
||||
assert status.status == BuildStatusEnum.Success
|
||||
broadcast_mock.assert_called_once_with(
|
||||
EventType.PackageStatusChanged, package_ahriman.base, status=BuildStatusEnum.Success.value,
|
||||
)
|
||||
|
||||
|
||||
async def test_package_status_update_preserves_hold(watcher: Watcher, package_ahriman: Package,
|
||||
@@ -293,6 +287,7 @@ async def test_package_status_update_preserves_hold(watcher: Watcher, package_ah
|
||||
must preserve hold status on package status update
|
||||
"""
|
||||
mocker.patch("ahriman.core.status.local_client.LocalClient.package_status_update")
|
||||
mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
|
||||
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus(is_held=True))}
|
||||
|
||||
await watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success)
|
||||
@@ -313,10 +308,15 @@ async def test_package_update(watcher: Watcher, package_ahriman: Package, mocker
|
||||
must add package to cache
|
||||
"""
|
||||
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_update")
|
||||
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
|
||||
|
||||
await watcher.package_update(package_ahriman, BuildStatusEnum.Unknown)
|
||||
assert await watcher.packages()
|
||||
cache_mock.assert_called_once_with(package_ahriman, pytest.helpers.anyvar(int))
|
||||
broadcast_mock.assert_called_once_with(
|
||||
EventType.PackageUpdated, package_ahriman.base,
|
||||
status=BuildStatusEnum.Unknown.value, version=package_ahriman.version,
|
||||
)
|
||||
|
||||
|
||||
async def test_package_update_preserves_hold(watcher: Watcher, package_ahriman: Package,
|
||||
@@ -332,22 +332,34 @@ async def test_package_update_preserves_hold(watcher: Watcher, package_ahriman:
|
||||
assert status.is_held is True
|
||||
|
||||
|
||||
async def test_shutdown(watcher: Watcher) -> None:
|
||||
async def test_packages(watcher: Watcher, package_ahriman: Package) -> None:
|
||||
"""
|
||||
must return list of available packages
|
||||
"""
|
||||
assert not await watcher.packages()
|
||||
|
||||
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
|
||||
assert await watcher.packages()
|
||||
|
||||
|
||||
async def test_shutdown(watcher: Watcher, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must gracefully shutdown watcher
|
||||
"""
|
||||
_, queue = await watcher.event_bus.subscribe()
|
||||
shutdown_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.shutdown")
|
||||
await watcher.shutdown()
|
||||
message = queue.get_nowait()
|
||||
assert message is None
|
||||
shutdown_mock.assert_called_once_with()
|
||||
|
||||
|
||||
async def test_status_update(watcher: Watcher) -> None:
|
||||
async def test_status_update(watcher: Watcher, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must update service status
|
||||
"""
|
||||
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
|
||||
|
||||
await watcher.status_update(BuildStatusEnum.Success)
|
||||
assert watcher.status.status == BuildStatusEnum.Success
|
||||
broadcast_mock.assert_called_once_with(EventType.ServiceStatusChanged, None, status=BuildStatusEnum.Success.value)
|
||||
|
||||
|
||||
def test_call(watcher: Watcher, package_ahriman: Package) -> None:
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
# schema testing goes in view class tests
|
||||
1
tests/ahriman/web/schemas/test_sse_schema.py
Normal file
1
tests/ahriman/web/schemas/test_sse_schema.py
Normal file
@@ -0,0 +1 @@
|
||||
# schema testing goes in view class tests
|
||||
@@ -1,5 +1,4 @@
|
||||
import asyncio
|
||||
|
||||
import pytest
|
||||
|
||||
from aiohttp.test_utils import TestClient
|
||||
@@ -7,19 +6,36 @@ from asyncio import Queue
|
||||
from pytest_mock import MockerFixture
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
from ahriman.core.status.event_bus import SSEvent
|
||||
from ahriman.core.status.watcher import Watcher
|
||||
from ahriman.models.event import EventType
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.user_access import UserAccess
|
||||
from ahriman.web.keys import WatcherKey
|
||||
from ahriman.web.views.v1.auditlog.event_bus import EventBusView
|
||||
|
||||
|
||||
async def _producer(watcher: Watcher, package_ahriman: Package) -> None:
|
||||
"""
|
||||
create producer
|
||||
|
||||
Args:
|
||||
watcher(Watcher): watcher test instance
|
||||
package_ahriman(Package): package test instance
|
||||
"""
|
||||
await asyncio.sleep(0.1)
|
||||
await watcher.event_bus.broadcast(EventType.PackageRemoved, package_ahriman.base)
|
||||
await watcher.event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base, status="success")
|
||||
await asyncio.sleep(0.1)
|
||||
await watcher.event_bus.shutdown()
|
||||
|
||||
|
||||
async def test_get_permission() -> None:
|
||||
"""
|
||||
must return correct permission for the request
|
||||
"""
|
||||
request = pytest.helpers.request("", "", "GET")
|
||||
assert await EventBusView.get_permission(request) == UserAccess.Full
|
||||
for method in ("GET",):
|
||||
request = pytest.helpers.request("", "", method)
|
||||
assert await EventBusView.get_permission(request) == UserAccess.Full
|
||||
|
||||
|
||||
def test_routes() -> None:
|
||||
@@ -29,74 +45,86 @@ def test_routes() -> None:
|
||||
assert EventBusView.ROUTES == ["/api/v1/events/stream"]
|
||||
|
||||
|
||||
async def test_get(client: TestClient) -> None:
|
||||
"""
|
||||
must stream events via SSE
|
||||
"""
|
||||
watcher = next(iter(client.app[WatcherKey].values()))
|
||||
|
||||
async def _producer() -> None:
|
||||
await asyncio.sleep(0.1)
|
||||
await watcher.event_bus.broadcast(EventType.PackageUpdated, "ahriman", status="success")
|
||||
await asyncio.sleep(0.1)
|
||||
await watcher.event_bus.shutdown()
|
||||
|
||||
asyncio.create_task(_producer())
|
||||
|
||||
response = await client.get("/api/v1/events/stream")
|
||||
assert response.status == 200
|
||||
|
||||
body = await response.text()
|
||||
assert "package-updated" in body
|
||||
assert "ahriman" in body
|
||||
|
||||
|
||||
async def test_get_with_topic_filter(client: TestClient) -> None:
|
||||
"""
|
||||
must filter events by topic
|
||||
"""
|
||||
watcher = next(iter(client.app[WatcherKey].values()))
|
||||
|
||||
async def _producer() -> None:
|
||||
await asyncio.sleep(0.1)
|
||||
await watcher.event_bus.broadcast(EventType.PackageRemoved, "filtered")
|
||||
await watcher.event_bus.broadcast(EventType.PackageUpdated, "ahriman", status="success")
|
||||
await asyncio.sleep(0.1)
|
||||
await watcher.event_bus.shutdown()
|
||||
|
||||
asyncio.create_task(_producer())
|
||||
|
||||
response = await client.get("/api/v1/events/stream", params={"event": "package-updated"})
|
||||
assert response.status == 200
|
||||
|
||||
body = await response.text()
|
||||
assert "package-updated" in body
|
||||
assert "filtered" not in body
|
||||
|
||||
|
||||
async def test_run_timeout() -> None:
|
||||
"""
|
||||
must handle timeout and continue loop
|
||||
"""
|
||||
response = AsyncMock()
|
||||
response.is_connected = lambda: True
|
||||
response.ping_interval = 0.01
|
||||
|
||||
queue: Queue[SSEvent | None] = Queue()
|
||||
queue = Queue()
|
||||
|
||||
async def _shutdown() -> None:
|
||||
await asyncio.sleep(0.05)
|
||||
await queue.put(None)
|
||||
|
||||
response = AsyncMock()
|
||||
response.is_connected = lambda: True
|
||||
response.ping_interval = 0.01
|
||||
|
||||
asyncio.create_task(_shutdown())
|
||||
await EventBusView._run(response, queue)
|
||||
|
||||
|
||||
async def test_get_connection_reset(client: TestClient, mocker: MockerFixture) -> None:
|
||||
async def test_get(client: TestClient, package_ahriman: Package) -> None:
|
||||
"""
|
||||
must handle connection reset gracefully
|
||||
must stream events via SSE
|
||||
"""
|
||||
mocker.patch.object(EventBusView, "_run", side_effect=ConnectionResetError)
|
||||
watcher = next(iter(client.app[WatcherKey].values()))
|
||||
asyncio.create_task(_producer(watcher, package_ahriman))
|
||||
request_schema = pytest.helpers.schema_request(EventBusView.get, location="querystring")
|
||||
# no content validation here because it is a streaming response
|
||||
|
||||
assert not request_schema.validate({})
|
||||
response = await client.get("/api/v1/events/stream")
|
||||
assert response.status == 200
|
||||
|
||||
body = await response.text()
|
||||
assert EventType.PackageUpdated in body
|
||||
assert "ahriman" in body
|
||||
|
||||
|
||||
async def test_get_with_topic_filter(client: TestClient, package_ahriman: Package) -> None:
|
||||
"""
|
||||
must filter events by topic
|
||||
"""
|
||||
watcher = next(iter(client.app[WatcherKey].values()))
|
||||
asyncio.create_task(_producer(watcher, package_ahriman))
|
||||
request_schema = pytest.helpers.schema_request(EventBusView.get, location="querystring")
|
||||
|
||||
payload = {"event": [EventType.PackageUpdated]}
|
||||
assert not request_schema.validate(payload)
|
||||
response = await client.get("/api/v1/events/stream", params=payload)
|
||||
assert response.status == 200
|
||||
|
||||
body = await response.text()
|
||||
assert EventType.PackageUpdated in body
|
||||
assert EventType.PackageRemoved not in body
|
||||
|
||||
|
||||
async def test_get_bad_request(client: TestClient) -> None:
|
||||
"""
|
||||
must return bad request for invalid event type
|
||||
"""
|
||||
response_schema = pytest.helpers.schema_response(EventBusView.get, code=400)
|
||||
|
||||
response = await client.get("/api/v1/events/stream", params={"event": "invalid"})
|
||||
assert response.status == 400
|
||||
assert not response_schema.validate(await response.json())
|
||||
|
||||
|
||||
async def test_get_not_found(client: TestClient) -> None:
|
||||
"""
|
||||
must return not found for unknown repository
|
||||
"""
|
||||
response_schema = pytest.helpers.schema_response(EventBusView.get, code=404)
|
||||
|
||||
response = await client.get("/api/v1/events/stream", params={"architecture": "unknown", "repository": "unknown"})
|
||||
assert response.status == 404
|
||||
assert not response_schema.validate(await response.json())
|
||||
|
||||
|
||||
async def test_get_connection_reset(client: TestClient, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must handle connection reset
|
||||
"""
|
||||
mocker.patch.object(EventBusView, "_run", side_effect=ConnectionResetError)
|
||||
response = await client.get("/api/v1/events/stream")
|
||||
assert response.status == 200
|
||||
|
||||
Reference in New Issue
Block a user