event bus implementation

This commit is contained in:
2026-03-24 02:23:14 +02:00
parent af8e2c9e9b
commit 3265bb913f
23 changed files with 490 additions and 147 deletions

View File

@@ -0,0 +1,95 @@
#
# Copyright (c) 2021-2026 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import json
from aiohttp.web import StreamResponse
from aiohttp_sse import EventSourceResponse, sse_response
from asyncio import Queue, QueueShutDown, wait_for
from typing import ClassVar
from ahriman.core.status.event_bus import SSEvent
from ahriman.models.event import EventType
from ahriman.models.user_access import UserAccess
from ahriman.web.apispec.decorators import apidocs
from ahriman.web.schemas import EventSchema, RepositoryIdSchema
from ahriman.web.views.base import BaseView
class EventBusView(BaseView):
"""
event bus SSE view
Attributes:
GET_PERMISSION(UserAccess): (class attribute) get permissions of self
"""
GET_PERMISSION: ClassVar[UserAccess] = UserAccess.Full
ROUTES = ["/api/v1/events/stream"]
@staticmethod
async def _run(response: EventSourceResponse, queue: Queue[SSEvent | None]) -> None:
"""
read events from queue and send them to the client
Args:
response(EventSourceResponse): SSE response instance
queue(Queue[SSEvent | None]): subscriber queue
"""
while response.is_connected():
try:
message = await wait_for(queue.get(), timeout=response.ping_interval)
except TimeoutError:
continue
if message is None:
break # terminate queue on sentinel event
event_type, data = message
await response.send(json.dumps(data), event=event_type)
@apidocs(
tags=["Audit log"],
summary="Live updates",
description="Stream live updates via SSE",
permission=GET_PERMISSION,
error_404_description="Repository is unknown",
schema=EventSchema(many=True),
query_schema=RepositoryIdSchema,
)
async def get(self) -> StreamResponse:
"""
subscribe on updates
Returns:
StreamResponse: 200 with streaming updates
"""
topics = [EventType(event) for event in self.request.query.getall("event", [])] or None
async with sse_response(self.request) as response:
subscription_id, queue = await self.service().event_bus.subscribe(topics)
try:
await self._run(response, queue)
except (ConnectionResetError, QueueShutDown):
pass
finally:
await self.service().event_bus.unsubscribe(subscription_id)
return response

View File

@@ -67,7 +67,7 @@ class EventsView(BaseView):
except ValueError as ex:
raise HTTPBadRequest(reason=str(ex))
events = self.service().event_get(event, object_id, from_date, to_date, limit, offset)
events = await self.service().event_get(event, object_id, from_date, to_date, limit, offset)
response = [event.view() for event in events]
return self.json_response(response)
@@ -94,6 +94,6 @@ class EventsView(BaseView):
except Exception as ex:
raise HTTPBadRequest(reason=str(ex))
self.service().event_add(event)
await self.service().event_add(event)
raise HTTPNoContent

View File

@@ -60,6 +60,6 @@ class Archives(StatusViewGuard, BaseView):
"""
package_base = self.request.match_info["package"]
archives = self.service(package_base=package_base).package_archives(package_base)
archives = await self.service(package_base=package_base).package_archives(package_base)
return self.json_response([archive.view() for archive in archives])

View File

@@ -63,7 +63,7 @@ class ChangesView(StatusViewGuard, BaseView):
"""
package_base = self.request.match_info["package"]
changes = self.service(package_base=package_base).package_changes_get(package_base)
changes = await self.service(package_base=package_base).package_changes_get(package_base)
return self.json_response(changes.view())
@@ -97,6 +97,6 @@ class ChangesView(StatusViewGuard, BaseView):
raise HTTPBadRequest(reason=str(ex))
changes = Changes(last_commit_sha, change, pkgbuild)
self.service().package_changes_update(package_base, changes)
await self.service().package_changes_update(package_base, changes)
raise HTTPNoContent

View File

@@ -63,7 +63,7 @@ class DependenciesView(StatusViewGuard, BaseView):
"""
package_base = self.request.match_info["package"]
dependencies = self.service(package_base=package_base).package_dependencies_get(package_base)
dependencies = await self.service(package_base=package_base).package_dependencies_get(package_base)
return self.json_response(dependencies.view())
@@ -95,6 +95,6 @@ class DependenciesView(StatusViewGuard, BaseView):
except Exception as ex:
raise HTTPBadRequest(reason=str(ex))
self.service(package_base=package_base).package_dependencies_update(package_base, dependencies)
await self.service(package_base=package_base).package_dependencies_update(package_base, dependencies)
raise HTTPNoContent

View File

@@ -68,7 +68,7 @@ class HoldView(StatusViewGuard, BaseView):
raise HTTPBadRequest(reason=str(ex))
try:
self.service().package_hold_update(package_base, enabled=is_held)
await self.service().package_hold_update(package_base, enabled=is_held)
except UnknownPackageError:
raise HTTPNotFound(reason=f"Package {package_base} is unknown")

View File

@@ -62,7 +62,7 @@ class LogsView(StatusViewGuard, BaseView):
"""
package_base = self.request.match_info["package"]
version = self.request.query.get("version")
self.service().package_logs_remove(package_base, version)
await self.service().package_logs_remove(package_base, version)
raise HTTPNoContent
@@ -89,8 +89,8 @@ class LogsView(StatusViewGuard, BaseView):
package_base = self.request.match_info["package"]
try:
_, status = self.service().package_get(package_base)
logs = self.service(package_base=package_base).package_logs_get(package_base, None, None, -1, 0)
_, status = await self.service().package_get(package_base)
logs = await self.service(package_base=package_base).package_logs_get(package_base, None, None, -1, 0)
except UnknownPackageError:
raise HTTPNotFound(reason=f"Package {package_base} is unknown")
@@ -127,6 +127,6 @@ class LogsView(StatusViewGuard, BaseView):
except Exception as ex:
raise HTTPBadRequest(reason=str(ex))
self.service().package_logs_add(log_record)
await self.service().package_logs_add(log_record)
raise HTTPNoContent

View File

@@ -66,7 +66,7 @@ class PackageView(StatusViewGuard, BaseView):
HTTPNoContent: on success response
"""
package_base = self.request.match_info["package"]
self.service().package_remove(package_base)
await self.service().package_remove(package_base)
raise HTTPNoContent
@@ -94,7 +94,7 @@ class PackageView(StatusViewGuard, BaseView):
repository_id = self.repository_id()
try:
package, status = self.service(repository_id).package_get(package_base)
package, status = await self.service(repository_id).package_get(package_base)
except UnknownPackageError:
raise HTTPNotFound(reason=f"Package {package_base} is unknown")
@@ -137,9 +137,9 @@ class PackageView(StatusViewGuard, BaseView):
try:
if package is None:
self.service().package_status_update(package_base, status)
await self.service().package_status_update(package_base, status)
else:
self.service().package_update(package, status)
await self.service().package_update(package, status)
except UnknownPackageError:
raise HTTPBadRequest(reason=f"Package {package_base} is unknown, but no package body set")

View File

@@ -67,7 +67,7 @@ class PackagesView(StatusViewGuard, BaseView):
stop = offset + limit if limit >= 0 else None
repository_id = self.repository_id()
packages = self.service(repository_id).packages
packages = await self.service(repository_id).packages()
comparator: Callable[[tuple[Package, BuildStatus]], Comparable] = lambda items: items[0].base
response = [
@@ -95,6 +95,6 @@ class PackagesView(StatusViewGuard, BaseView):
Raises:
HTTPNoContent: on success response
"""
self.service().load()
await self.service().load()
raise HTTPNoContent

View File

@@ -57,7 +57,7 @@ class PatchView(StatusViewGuard, BaseView):
package_base = self.request.match_info["package"]
variable = self.request.match_info["patch"]
self.service().package_patches_remove(package_base, variable)
await self.service().package_patches_remove(package_base, variable)
raise HTTPNoContent
@@ -83,7 +83,7 @@ class PatchView(StatusViewGuard, BaseView):
package_base = self.request.match_info["package"]
variable = self.request.match_info["patch"]
patches = self.service().package_patches_get(package_base, variable)
patches = await self.service().package_patches_get(package_base, variable)
selected = next((patch for patch in patches if patch.key == variable), None)
if selected is None:

View File

@@ -57,7 +57,7 @@ class PatchesView(StatusViewGuard, BaseView):
Response: 200 with package patches on success
"""
package_base = self.request.match_info["package"]
patches = self.service().package_patches_get(package_base, None)
patches = await self.service().package_patches_get(package_base, None)
response = [patch.view() for patch in patches]
return self.json_response(response)
@@ -88,6 +88,6 @@ class PatchesView(StatusViewGuard, BaseView):
except Exception as ex:
raise HTTPBadRequest(reason=str(ex))
self.service().package_patches_update(package_base, PkgbuildPatch.parse(key, value))
await self.service().package_patches_update(package_base, PkgbuildPatch.parse(key, value))
raise HTTPNoContent

View File

@@ -59,6 +59,6 @@ class LogsView(BaseView):
except Exception as ex:
raise HTTPBadRequest(reason=str(ex))
self.service().logs_rotate(keep_last_records)
await self.service().logs_rotate(keep_last_records)
raise HTTPNoContent

View File

@@ -62,7 +62,7 @@ class StatusView(StatusViewGuard, BaseView):
Response: 200 with service status object
"""
repository_id = self.repository_id()
packages = self.service(repository_id).packages
packages = await self.service(repository_id).packages()
counters = Counters.from_packages(packages)
stats = RepositoryStats.from_packages([package for package, _ in packages])
@@ -101,6 +101,6 @@ class StatusView(StatusViewGuard, BaseView):
except Exception as ex:
raise HTTPBadRequest(reason=str(ex))
self.service().status_update(status)
await self.service().status_update(status)
raise HTTPNoContent

View File

@@ -67,7 +67,7 @@ class LogsView(StatusViewGuard, BaseView):
version = self.request.query.get("version", None)
process = self.request.query.get("process_id", None)
logs = self.service(package_base=package_base).package_logs_get(package_base, version, process, limit, offset)
logs = await self.service(package_base=package_base).package_logs_get(package_base, version, process, limit, offset)
head = self.request.query.get("head", "false")
# pylint: disable=protected-access