diff --git a/docker/Dockerfile b/docker/Dockerfile
index 061f9ff9..16dfdc23 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -29,17 +29,16 @@ RUN pacman -S --noconfirm --asdeps \
python-filelock \
python-inflection \
python-pyelftools \
- python-requests \
- && \
- pacman -S --noconfirm --asdeps \
+ python-requests
+RUN pacman -S --noconfirm --asdeps \
base-devel \
python-build \
python-flit \
python-installer \
+ python-setuptools \
python-tox \
- python-wheel \
- && \
- pacman -S --noconfirm --asdeps \
+ python-wheel
+RUN pacman -S --noconfirm --asdeps \
git \
python-aiohttp \
python-aiohttp-openmetrics \
@@ -48,9 +47,8 @@ RUN pacman -S --noconfirm --asdeps \
python-cryptography \
python-jinja \
python-systemd \
- rsync \
- && \
- runuser -u build -- install-aur-package \
+ rsync
+RUN runuser -u build -- install-aur-package \
python-aioauth-client \
python-sphinx-typlog-theme \
python-webargs \
@@ -59,6 +57,7 @@ RUN pacman -S --noconfirm --asdeps \
python-aiohttp-jinja2 \
python-aiohttp-session \
python-aiohttp-security \
+ python-aiohttp-sse-git \
python-requests-unixsocket2
# install ahriman
diff --git a/package/archlinux/PKGBUILD b/package/archlinux/PKGBUILD
index 84e5e3c6..322ec56e 100644
--- a/package/archlinux/PKGBUILD
+++ b/package/archlinux/PKGBUILD
@@ -77,7 +77,7 @@ package_ahriman-triggers() {
package_ahriman-web() {
pkgname='ahriman-web'
pkgdesc="ArcH linux ReposItory MANager, web server"
- depends=("$pkgbase-core=$pkgver" 'python-aiohttp-cors' 'python-aiohttp-jinja2')
+ depends=("$pkgbase-core=$pkgver" 'python-aiohttp-cors' 'python-aiohttp-jinja2' 'python-aiohttp-sse-git')
optdepends=('python-aioauth-client: OAuth2 authorization support'
'python-aiohttp-apispec>=3.0.0: autogenerated API documentation'
'python-aiohttp-openmetrics: HTTP metrics support'
diff --git a/pyproject.toml b/pyproject.toml
index aae0ba62..9b706873 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -58,6 +58,7 @@ web = [
"aiohttp",
"aiohttp_cors",
"aiohttp_jinja2",
+ "aiohttp_sse",
]
web-auth = [
"ahriman[web]",
diff --git a/src/ahriman/core/status/event_bus.py b/src/ahriman/core/status/event_bus.py
new file mode 100644
index 00000000..c28f3e8c
--- /dev/null
+++ b/src/ahriman/core/status/event_bus.py
@@ -0,0 +1,101 @@
+#
+# Copyright (c) 2021-2026 ahriman team.
+#
+# This file is part of ahriman
+# (see https://github.com/arcan1s/ahriman).
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+import uuid
+
+from asyncio import Lock, Queue, QueueFull
+from typing import Any
+
+from ahriman.core.log import LazyLogging
+from ahriman.models.event import EventType
+
+
+SSEvent = tuple[str, dict[str, Any]]
+
+
+class EventBus(LazyLogging):
+ """
+ event bus implementation
+
+ Attributes:
+ max_size(int): maximum size of queue
+ """
+
+ def __init__(self, max_size: int) -> None:
+ """
+ Args:
+ max_size(int): maximum size of queue
+ """
+ self.max_size = max_size
+
+ self._lock = Lock()
+ self._subscribers: dict[str, Queue[SSEvent | None]] = {}
+
+ async def broadcast(self, event_type: EventType, object_id: str | None, **kwargs: Any) -> None:
+ """
+ broadcast event to all subscribers
+
+ Args:
+ event_type(EventType): event type
+ object_id(str | None): object identifier (e.g. package base)
+ **kwargs(Any): additional event data
+ """
+ event: dict[str, Any] = {"object_id": object_id}
+ event.update(kwargs)
+
+ async with self._lock:
+ for subscriber_id, queue in self._subscribers.items():
+ try:
+ queue.put_nowait((event_type, event))
+ except QueueFull:
+ self.logger.warning("discard message to slow subscriber %s", subscriber_id)
+
+ async def shutdown(self) -> None:
+ """
+ gracefully shutdown all subscribers
+ """
+ async with self._lock:
+ for queue in self._subscribers.values():
+ queue.put_nowait(None)
+ queue.shutdown()
+
+ async def subscribe(self) -> tuple[str, Queue[SSEvent | None]]:
+ """
+ register new subscriber to ``repository_id``
+
+ Returns:
+ tuple[str, Queue[SSEvent | None]]: subscriber identifier and associated queue
+ """
+ subscriber_id = str(uuid.uuid4())
+ queue: Queue[SSEvent | None] = Queue(self.max_size)
+
+ async with self._lock:
+ self._subscribers[subscriber_id] = queue
+
+ return subscriber_id, queue
+
+ async def unsubscribe(self, subscriber_id: str) -> None:
+ """
+ unsubscribe from events
+
+ Args:
+ subscriber_id(str): subscriber unique identifier
+ """
+ async with self._lock:
+ self._subscribers.pop(subscriber_id, None)
diff --git a/src/ahriman/core/status/watcher.py b/src/ahriman/core/status/watcher.py
index 12863a6a..bd4a4088 100644
--- a/src/ahriman/core/status/watcher.py
+++ b/src/ahriman/core/status/watcher.py
@@ -17,15 +17,16 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
-from collections.abc import Callable
+# pylint: disable=too-many-public-methods
+from asyncio import Lock
from dataclasses import replace
-from threading import Lock
-from typing import Any, Self
+from typing import Self
from ahriman.core.exceptions import UnknownPackageError
from ahriman.core.log import LazyLogging
from ahriman.core.repository.package_info import PackageInfo
from ahriman.core.status import Client
+from ahriman.core.status.event_bus import EventBus
from ahriman.models.build_status import BuildStatus, BuildStatusEnum
from ahriman.models.changes import Changes
from ahriman.models.dependencies import Dependencies
@@ -41,51 +42,74 @@ class Watcher(LazyLogging):
Attributes:
client(Client): reporter instance
+ event_bus(EventBus): event bus instance
package_info(PackageInfo): package info instance
status(BuildStatus): daemon status
"""
- def __init__(self, client: Client, package_info: PackageInfo) -> None:
+ def __init__(self, client: Client, package_info: PackageInfo, event_bus: EventBus) -> None:
"""
Args:
client(Client): reporter instance
package_info(PackageInfo): package info instance
+ event_bus(EventBus): event bus instance
"""
self.client = client
self.package_info = package_info
+ self.event_bus = event_bus
self._lock = Lock()
self._known: dict[str, tuple[Package, BuildStatus]] = {}
self.status = BuildStatus()
- @property
- def packages(self) -> list[tuple[Package, BuildStatus]]:
+ async def event_add(self, event: Event) -> None:
"""
- get current known packages list
+ create new event
+
+ Args:
+ event(Event): audit log event
+ """
+ self.client.event_add(event)
+
+ async def event_get(self, event: str | EventType | None, object_id: str | None,
+ from_date: int | float | None = None, to_date: int | float | None = None,
+ limit: int = -1, offset: int = 0) -> list[Event]:
+ """
+ retrieve list of events
+
+ Args:
+ event(str | EventType | None): filter by event type
+ object_id(str | None): filter by event object
+ from_date(int | float | None, optional): minimal creation date, inclusive (Default value = None)
+ to_date(int | float | None, optional): maximal creation date, exclusive (Default value = None)
+ limit(int, optional): limit records to the specified count, -1 means unlimited (Default value = -1)
+ offset(int, optional): records offset (Default value = 0)
Returns:
- list[tuple[Package, BuildStatus]]: list of packages together with their statuses
+ list[Event]: list of audit log events
"""
- with self._lock:
- return list(self._known.values())
+ return self.client.event_get(event, object_id, from_date, to_date, limit, offset)
- event_add: Callable[[Event], None]
-
- event_get: Callable[[str | EventType | None, str | None, int | None, int | None, int, int], list[Event]]
-
- def load(self) -> None:
+ async def load(self) -> None:
"""
load packages from local database
"""
- with self._lock:
+ async with self._lock:
self._known = {
package.base: (package, status)
for package, status in self.client.package_get(None)
}
- logs_rotate: Callable[[int], None]
+ async def logs_rotate(self, keep_last_records: int) -> None:
+ """
+ remove older logs from storage
- def package_archives(self, package_base: str) -> list[Package]:
+ Args:
+ keep_last_records(int): number of last records to keep
+ """
+ self.client.logs_rotate(keep_last_records)
+
+ async def package_archives(self, package_base: str) -> list[Package]:
"""
get known package archives
@@ -97,15 +121,51 @@ class Watcher(LazyLogging):
"""
return self.package_info.package_archives(package_base)
- package_changes_get: Callable[[str], Changes]
+ async def package_changes_get(self, package_base: str) -> Changes:
+ """
+ get package changes
- package_changes_update: Callable[[str, Changes], None]
+ Args:
+ package_base(str): package base to retrieve
- package_dependencies_get: Callable[[str], Dependencies]
+ Returns:
+ Changes: package changes if available and empty object otherwise
+ """
+ return self.client.package_changes_get(package_base)
- package_dependencies_update: Callable[[str, Dependencies], None]
+ async def package_changes_update(self, package_base: str, changes: Changes) -> None:
+ """
+ update package changes
- def package_get(self, package_base: str) -> tuple[Package, BuildStatus]:
+ Args:
+ package_base(str): package base to update
+ changes(Changes): changes descriptor
+ """
+ self.client.package_changes_update(package_base, changes)
+
+ async def package_dependencies_get(self, package_base: str) -> Dependencies:
+ """
+ get package dependencies
+
+ Args:
+ package_base(str): package base to retrieve
+
+ Returns:
+ list[Dependencies]: package implicit dependencies if available
+ """
+ return self.client.package_dependencies_get(package_base)
+
+ async def package_dependencies_update(self, package_base: str, dependencies: Dependencies) -> None:
+ """
+ update package dependencies
+
+ Args:
+ package_base(str): package base to update
+ dependencies(Dependencies): dependencies descriptor
+ """
+ self.client.package_dependencies_update(package_base, dependencies)
+
+ async def package_get(self, package_base: str) -> tuple[Package, BuildStatus]:
"""
get current package base build status
@@ -119,18 +179,12 @@ class Watcher(LazyLogging):
UnknownPackageError: if no package found
"""
try:
- with self._lock:
+ async with self._lock:
return self._known[package_base]
except KeyError:
raise UnknownPackageError(package_base) from None
- package_logs_add: Callable[[LogRecord], None]
-
- package_logs_get: Callable[[str, str | None, str | None, int, int], list[LogRecord]]
-
- package_logs_remove: Callable[[str, str | None], None]
-
- def package_hold_update(self, package_base: str, *, enabled: bool) -> None:
+ async def package_hold_update(self, package_base: str, *, enabled: bool) -> None:
"""
update package hold status
@@ -138,29 +192,104 @@ class Watcher(LazyLogging):
package_base(str): package base name
enabled(bool): new hold status
"""
- package, status = self.package_get(package_base)
- with self._lock:
+ package, status = await self.package_get(package_base)
+ async with self._lock:
self._known[package_base] = (package, replace(status, is_held=enabled))
self.client.package_hold_update(package_base, enabled=enabled)
- package_patches_get: Callable[[str, str | None], list[PkgbuildPatch]]
+ await self.event_bus.broadcast(EventType.PackageHeld, package_base, is_held=enabled)
- package_patches_remove: Callable[[str, str], None]
+ async def package_logs_add(self, log_record: LogRecord) -> None:
+ """
+ post log record
- package_patches_update: Callable[[str, PkgbuildPatch], None]
+ Args:
+ log_record(LogRecord): log record
+ """
+ self.client.package_logs_add(log_record)
- def package_remove(self, package_base: str) -> None:
+ await self.event_bus.broadcast(
+ EventType.BuildLog,
+ log_record.log_record_id.package_base,
+ created=log_record.created,
+ message=log_record.message,
+ version=log_record.log_record_id.version,
+ )
+
+ async def package_logs_get(self, package_base: str, version: str | None = None, process_id: str | None = None,
+ limit: int = -1, offset: int = 0) -> list[LogRecord]:
+ """
+ get package logs
+
+ Args:
+ package_base(str): package base
+ version(str | None, optional): package version to search (Default value = None)
+ process_id(str | None, optional): process identifier to search (Default value = None)
+ limit(int, optional): limit records to the specified count, -1 means unlimited (Default value = -1)
+ offset(int, optional): records offset (Default value = 0)
+
+ Returns:
+ list[LogRecord]: package logs
+ """
+ return self.client.package_logs_get(package_base, version, process_id, limit, offset)
+
+ async def package_logs_remove(self, package_base: str, version: str | None) -> None:
+ """
+ remove package logs
+
+ Args:
+ package_base(str): package base
+ version(str | None): package version to remove logs. If ``None`` is set, all logs will be removed
+ """
+ self.client.package_logs_remove(package_base, version)
+
+ async def package_patches_get(self, package_base: str, variable: str | None) -> list[PkgbuildPatch]:
+ """
+ get package patches
+
+ Args:
+ package_base(str): package base to retrieve
+ variable(str | None): optional filter by patch variable
+
+ Returns:
+ list[PkgbuildPatch]: list of patches for the specified package
+ """
+ return self.client.package_patches_get(package_base, variable)
+
+ async def package_patches_remove(self, package_base: str, variable: str | None) -> None:
+ """
+ remove package patch
+
+ Args:
+ package_base(str): package base to update
+ variable(str | None): patch name. If ``None`` is set, all patches will be removed
+ """
+ self.client.package_patches_remove(package_base, variable)
+
+ async def package_patches_update(self, package_base: str, patch: PkgbuildPatch) -> None:
+ """
+ create or update package patch
+
+ Args:
+ package_base(str): package base to update
+ patch(PkgbuildPatch): package patch
+ """
+ self.client.package_patches_update(package_base, patch)
+
+ async def package_remove(self, package_base: str) -> None:
"""
remove package base from known list if any
Args:
package_base(str): package base
"""
- with self._lock:
+ async with self._lock:
self._known.pop(package_base, None)
self.client.package_remove(package_base)
- def package_status_update(self, package_base: str, status: BuildStatusEnum) -> None:
+ await self.event_bus.broadcast(EventType.PackageRemoved, package_base)
+
+ async def package_status_update(self, package_base: str, status: BuildStatusEnum) -> None:
"""
update package status
@@ -168,12 +297,14 @@ class Watcher(LazyLogging):
package_base(str): package base to update
status(BuildStatusEnum): new build status
"""
- package, current_status = self.package_get(package_base)
- with self._lock:
+ package, current_status = await self.package_get(package_base)
+ async with self._lock:
self._known[package_base] = (package, BuildStatus(status, is_held=current_status.is_held))
self.client.package_status_update(package_base, status)
- def package_update(self, package: Package, status: BuildStatusEnum) -> None:
+ await self.event_bus.broadcast(EventType.PackageStatusChanged, package_base, status=status.value)
+
+ async def package_update(self, package: Package, status: BuildStatusEnum) -> None:
"""
update package
@@ -181,12 +312,32 @@ class Watcher(LazyLogging):
package(Package): package description
status(BuildStatusEnum): new build status
"""
- with self._lock:
+ async with self._lock:
_, current_status = self._known.get(package.base, (package, BuildStatus()))
self._known[package.base] = (package, BuildStatus(status, is_held=current_status.is_held))
self.client.package_update(package, status)
- def status_update(self, status: BuildStatusEnum) -> None:
+ await self.event_bus.broadcast(
+ EventType.PackageUpdated, package.base, status=status.value, version=package.version,
+ )
+
+ async def packages(self) -> list[tuple[Package, BuildStatus]]:
+ """
+ get current known packages list
+
+ Returns:
+ list[tuple[Package, BuildStatus]]: list of packages together with their statuses
+ """
+ async with self._lock:
+ return list(self._known.values())
+
+ async def shutdown(self) -> None:
+ """
+ gracefully shutdown watcher
+ """
+ await self.event_bus.shutdown()
+
+ async def status_update(self, status: BuildStatusEnum) -> None:
"""
update service status
@@ -195,6 +346,8 @@ class Watcher(LazyLogging):
"""
self.status = BuildStatus(status)
+ await self.event_bus.broadcast(EventType.ServiceStatusChanged, None, status=status.value)
+
def __call__(self, package_base: str | None) -> Self:
"""
extract client for future calls
@@ -204,24 +357,11 @@ class Watcher(LazyLogging):
Returns:
Self: instance of self to pass calls to the client
- """
- if package_base is not None:
- _ = self.package_get(package_base)
- return self
-
- def __getattr__(self, item: str) -> Any:
- """
- proxy methods for reporter client
-
- Args:
- item(str): property name
-
- Returns:
- Any: attribute by its name
Raises:
- AttributeError: in case if no such attribute found
+ UnknownPackageError: if no package found
"""
- if (method := getattr(self.client, item, None)) is not None:
- return method
- raise AttributeError(f"'{self.__class__.__qualname__}' object has no attribute '{item}'")
+ # keep check here instead of calling package_get to keep this method synchronized
+ if package_base is not None and package_base not in self._known:
+ raise UnknownPackageError(package_base)
+ return self
diff --git a/src/ahriman/models/event.py b/src/ahriman/models/event.py
index f6ceb46c..cbda6180 100644
--- a/src/ahriman/models/event.py
+++ b/src/ahriman/models/event.py
@@ -28,16 +28,24 @@ class EventType(StrEnum):
predefined event types
Attributes:
+ BuildLog(EventType): new build log line
+ PackageHeld(EventType): package hold status has been changed
PackageOutdated(EventType): package has been marked as out-of-date
PackageRemoved(EventType): package has been removed
+ PackageStatusChanged(EventType): package build status has been changed
PackageUpdateFailed(EventType): package update has been failed
PackageUpdated(EventType): package has been updated
+ ServiceStatusChanged(EventType): service status has been changed
"""
+ BuildLog = "build-log"
+ PackageHeld = "package-held"
PackageOutdated = "package-outdated"
PackageRemoved = "package-removed"
+ PackageStatusChanged = "package-status-changed"
PackageUpdateFailed = "package-update-failed"
PackageUpdated = "package-updated"
+ ServiceStatusChanged = "service-status-changed"
class Event:
diff --git a/src/ahriman/web/views/v1/auditlog/event_bus.py b/src/ahriman/web/views/v1/auditlog/event_bus.py
new file mode 100644
index 00000000..f2f2d23d
--- /dev/null
+++ b/src/ahriman/web/views/v1/auditlog/event_bus.py
@@ -0,0 +1,70 @@
+#
+# Copyright (c) 2021-2026 ahriman team.
+#
+# This file is part of ahriman
+# (see https://github.com/arcan1s/ahriman).
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+import json
+
+from aiohttp.web import StreamResponse
+from aiohttp_sse import sse_response
+from typing import ClassVar
+
+from ahriman.models.user_access import UserAccess
+from ahriman.web.apispec.decorators import apidocs
+from ahriman.web.schemas import EventSchema, RepositoryIdSchema
+from ahriman.web.views.base import BaseView
+
+
+class EventBusView(BaseView):
+ """
+ audit log view
+
+ Attributes:
+ GET_PERMISSION(UserAccess): (class attribute) get permissions of self
+ """
+
+ GET_PERMISSION: ClassVar[UserAccess] = UserAccess.Full
+ ROUTES = ["/api/v1/events/stream"]
+
+ @apidocs(
+ tags=["Audit log"],
+ summary="Live updates",
+ description="Stream live updates via SSE",
+ permission=GET_PERMISSION,
+ error_404_description="Repository is unknown",
+ schema=EventSchema(many=True),
+ query_schema=RepositoryIdSchema,
+ )
+ async def get(self) -> StreamResponse:
+ """
+ subscribe on updates
+
+ Returns:
+ StreamResponse: 200 with streaming updates
+ """
+ async with sse_response(self.request) as response:
+ subscription_id, queue = await self.service().event_bus.subscribe()
+
+ try:
+ while response.is_connected():
+ while (message := await queue.get()) is not None:
+ event_type, data = message
+ await response.send(json.dumps(data), event=event_type)
+ finally:
+ await self.service().event_bus.unsubscribe(subscription_id)
+
+ return response
diff --git a/src/ahriman/web/views/v1/auditlog/events.py b/src/ahriman/web/views/v1/auditlog/events.py
index 968e174c..2f6c3db8 100644
--- a/src/ahriman/web/views/v1/auditlog/events.py
+++ b/src/ahriman/web/views/v1/auditlog/events.py
@@ -67,7 +67,7 @@ class EventsView(BaseView):
except ValueError as ex:
raise HTTPBadRequest(reason=str(ex))
- events = self.service().event_get(event, object_id, from_date, to_date, limit, offset)
+ events = await self.service().event_get(event, object_id, from_date, to_date, limit, offset)
response = [event.view() for event in events]
return self.json_response(response)
@@ -94,6 +94,6 @@ class EventsView(BaseView):
except Exception as ex:
raise HTTPBadRequest(reason=str(ex))
- self.service().event_add(event)
+ await self.service().event_add(event)
raise HTTPNoContent
diff --git a/src/ahriman/web/views/v1/packages/archives.py b/src/ahriman/web/views/v1/packages/archives.py
index 8c577bf5..9e7a140d 100644
--- a/src/ahriman/web/views/v1/packages/archives.py
+++ b/src/ahriman/web/views/v1/packages/archives.py
@@ -60,6 +60,6 @@ class Archives(StatusViewGuard, BaseView):
"""
package_base = self.request.match_info["package"]
- archives = self.service(package_base=package_base).package_archives(package_base)
+ archives = await self.service(package_base=package_base).package_archives(package_base)
return self.json_response([archive.view() for archive in archives])
diff --git a/src/ahriman/web/views/v1/packages/changes.py b/src/ahriman/web/views/v1/packages/changes.py
index 0e98eb1c..562a0859 100644
--- a/src/ahriman/web/views/v1/packages/changes.py
+++ b/src/ahriman/web/views/v1/packages/changes.py
@@ -63,7 +63,7 @@ class ChangesView(StatusViewGuard, BaseView):
"""
package_base = self.request.match_info["package"]
- changes = self.service(package_base=package_base).package_changes_get(package_base)
+ changes = await self.service(package_base=package_base).package_changes_get(package_base)
return self.json_response(changes.view())
@@ -97,6 +97,6 @@ class ChangesView(StatusViewGuard, BaseView):
raise HTTPBadRequest(reason=str(ex))
changes = Changes(last_commit_sha, change, pkgbuild)
- self.service().package_changes_update(package_base, changes)
+ await self.service().package_changes_update(package_base, changes)
raise HTTPNoContent
diff --git a/src/ahriman/web/views/v1/packages/dependencies.py b/src/ahriman/web/views/v1/packages/dependencies.py
index 9d4de53e..e8467576 100644
--- a/src/ahriman/web/views/v1/packages/dependencies.py
+++ b/src/ahriman/web/views/v1/packages/dependencies.py
@@ -63,7 +63,7 @@ class DependenciesView(StatusViewGuard, BaseView):
"""
package_base = self.request.match_info["package"]
- dependencies = self.service(package_base=package_base).package_dependencies_get(package_base)
+ dependencies = await self.service(package_base=package_base).package_dependencies_get(package_base)
return self.json_response(dependencies.view())
@@ -95,6 +95,6 @@ class DependenciesView(StatusViewGuard, BaseView):
except Exception as ex:
raise HTTPBadRequest(reason=str(ex))
- self.service(package_base=package_base).package_dependencies_update(package_base, dependencies)
+ await self.service(package_base=package_base).package_dependencies_update(package_base, dependencies)
raise HTTPNoContent
diff --git a/src/ahriman/web/views/v1/packages/hold.py b/src/ahriman/web/views/v1/packages/hold.py
index 0a342ce4..cc6e01f4 100644
--- a/src/ahriman/web/views/v1/packages/hold.py
+++ b/src/ahriman/web/views/v1/packages/hold.py
@@ -68,7 +68,7 @@ class HoldView(StatusViewGuard, BaseView):
raise HTTPBadRequest(reason=str(ex))
try:
- self.service().package_hold_update(package_base, enabled=is_held)
+ await self.service().package_hold_update(package_base, enabled=is_held)
except UnknownPackageError:
raise HTTPNotFound(reason=f"Package {package_base} is unknown")
diff --git a/src/ahriman/web/views/v1/packages/logs.py b/src/ahriman/web/views/v1/packages/logs.py
index 26f4bf10..c63a6d6c 100644
--- a/src/ahriman/web/views/v1/packages/logs.py
+++ b/src/ahriman/web/views/v1/packages/logs.py
@@ -62,7 +62,7 @@ class LogsView(StatusViewGuard, BaseView):
"""
package_base = self.request.match_info["package"]
version = self.request.query.get("version")
- self.service().package_logs_remove(package_base, version)
+ await self.service().package_logs_remove(package_base, version)
raise HTTPNoContent
@@ -89,8 +89,8 @@ class LogsView(StatusViewGuard, BaseView):
package_base = self.request.match_info["package"]
try:
- _, status = self.service().package_get(package_base)
- logs = self.service(package_base=package_base).package_logs_get(package_base, None, None, -1, 0)
+ _, status = await self.service().package_get(package_base)
+ logs = await self.service(package_base=package_base).package_logs_get(package_base, None, None, -1, 0)
except UnknownPackageError:
raise HTTPNotFound(reason=f"Package {package_base} is unknown")
@@ -127,6 +127,6 @@ class LogsView(StatusViewGuard, BaseView):
except Exception as ex:
raise HTTPBadRequest(reason=str(ex))
- self.service().package_logs_add(log_record)
+ await self.service().package_logs_add(log_record)
raise HTTPNoContent
diff --git a/src/ahriman/web/views/v1/packages/package.py b/src/ahriman/web/views/v1/packages/package.py
index 79e2546b..41f44d6c 100644
--- a/src/ahriman/web/views/v1/packages/package.py
+++ b/src/ahriman/web/views/v1/packages/package.py
@@ -66,7 +66,7 @@ class PackageView(StatusViewGuard, BaseView):
HTTPNoContent: on success response
"""
package_base = self.request.match_info["package"]
- self.service().package_remove(package_base)
+ await self.service().package_remove(package_base)
raise HTTPNoContent
@@ -94,7 +94,7 @@ class PackageView(StatusViewGuard, BaseView):
repository_id = self.repository_id()
try:
- package, status = self.service(repository_id).package_get(package_base)
+ package, status = await self.service(repository_id).package_get(package_base)
except UnknownPackageError:
raise HTTPNotFound(reason=f"Package {package_base} is unknown")
@@ -137,9 +137,9 @@ class PackageView(StatusViewGuard, BaseView):
try:
if package is None:
- self.service().package_status_update(package_base, status)
+ await self.service().package_status_update(package_base, status)
else:
- self.service().package_update(package, status)
+ await self.service().package_update(package, status)
except UnknownPackageError:
raise HTTPBadRequest(reason=f"Package {package_base} is unknown, but no package body set")
diff --git a/src/ahriman/web/views/v1/packages/packages.py b/src/ahriman/web/views/v1/packages/packages.py
index a93d82b9..f01c7760 100644
--- a/src/ahriman/web/views/v1/packages/packages.py
+++ b/src/ahriman/web/views/v1/packages/packages.py
@@ -67,7 +67,7 @@ class PackagesView(StatusViewGuard, BaseView):
stop = offset + limit if limit >= 0 else None
repository_id = self.repository_id()
- packages = self.service(repository_id).packages
+ packages = await self.service(repository_id).packages()
comparator: Callable[[tuple[Package, BuildStatus]], Comparable] = lambda items: items[0].base
response = [
@@ -95,6 +95,6 @@ class PackagesView(StatusViewGuard, BaseView):
Raises:
HTTPNoContent: on success response
"""
- self.service().load()
+ await self.service().load()
raise HTTPNoContent
diff --git a/src/ahriman/web/views/v1/packages/patch.py b/src/ahriman/web/views/v1/packages/patch.py
index 9ec13bd4..936e5bec 100644
--- a/src/ahriman/web/views/v1/packages/patch.py
+++ b/src/ahriman/web/views/v1/packages/patch.py
@@ -57,7 +57,7 @@ class PatchView(StatusViewGuard, BaseView):
package_base = self.request.match_info["package"]
variable = self.request.match_info["patch"]
- self.service().package_patches_remove(package_base, variable)
+ await self.service().package_patches_remove(package_base, variable)
raise HTTPNoContent
@@ -83,7 +83,7 @@ class PatchView(StatusViewGuard, BaseView):
package_base = self.request.match_info["package"]
variable = self.request.match_info["patch"]
- patches = self.service().package_patches_get(package_base, variable)
+ patches = await self.service().package_patches_get(package_base, variable)
selected = next((patch for patch in patches if patch.key == variable), None)
if selected is None:
diff --git a/src/ahriman/web/views/v1/packages/patches.py b/src/ahriman/web/views/v1/packages/patches.py
index dd8346e0..cf4533df 100644
--- a/src/ahriman/web/views/v1/packages/patches.py
+++ b/src/ahriman/web/views/v1/packages/patches.py
@@ -57,7 +57,7 @@ class PatchesView(StatusViewGuard, BaseView):
Response: 200 with package patches on success
"""
package_base = self.request.match_info["package"]
- patches = self.service().package_patches_get(package_base, None)
+ patches = await self.service().package_patches_get(package_base, None)
response = [patch.view() for patch in patches]
return self.json_response(response)
@@ -88,6 +88,6 @@ class PatchesView(StatusViewGuard, BaseView):
except Exception as ex:
raise HTTPBadRequest(reason=str(ex))
- self.service().package_patches_update(package_base, PkgbuildPatch.parse(key, value))
+ await self.service().package_patches_update(package_base, PkgbuildPatch.parse(key, value))
raise HTTPNoContent
diff --git a/src/ahriman/web/views/v1/service/logs.py b/src/ahriman/web/views/v1/service/logs.py
index a249883e..94767494 100644
--- a/src/ahriman/web/views/v1/service/logs.py
+++ b/src/ahriman/web/views/v1/service/logs.py
@@ -59,6 +59,6 @@ class LogsView(BaseView):
except Exception as ex:
raise HTTPBadRequest(reason=str(ex))
- self.service().logs_rotate(keep_last_records)
+ await self.service().logs_rotate(keep_last_records)
raise HTTPNoContent
diff --git a/src/ahriman/web/views/v1/status/status.py b/src/ahriman/web/views/v1/status/status.py
index 55a851a0..a06fd4f9 100644
--- a/src/ahriman/web/views/v1/status/status.py
+++ b/src/ahriman/web/views/v1/status/status.py
@@ -62,7 +62,7 @@ class StatusView(StatusViewGuard, BaseView):
Response: 200 with service status object
"""
repository_id = self.repository_id()
- packages = self.service(repository_id).packages
+ packages = await self.service(repository_id).packages()
counters = Counters.from_packages(packages)
stats = RepositoryStats.from_packages([package for package, _ in packages])
@@ -101,6 +101,6 @@ class StatusView(StatusViewGuard, BaseView):
except Exception as ex:
raise HTTPBadRequest(reason=str(ex))
- self.service().status_update(status)
+ await self.service().status_update(status)
raise HTTPNoContent
diff --git a/src/ahriman/web/views/v2/packages/logs.py b/src/ahriman/web/views/v2/packages/logs.py
index 231e7091..6ed20482 100644
--- a/src/ahriman/web/views/v2/packages/logs.py
+++ b/src/ahriman/web/views/v2/packages/logs.py
@@ -67,7 +67,7 @@ class LogsView(StatusViewGuard, BaseView):
version = self.request.query.get("version", None)
process = self.request.query.get("process_id", None)
- logs = self.service(package_base=package_base).package_logs_get(package_base, version, process, limit, offset)
+ logs = await self.service(package_base=package_base).package_logs_get(package_base, version, process, limit, offset)
head = self.request.query.get("head", "false")
# pylint: disable=protected-access
diff --git a/src/ahriman/web/web.py b/src/ahriman/web/web.py
index 142f2508..7cc9905f 100644
--- a/src/ahriman/web/web.py
+++ b/src/ahriman/web/web.py
@@ -33,6 +33,7 @@ from ahriman.core.exceptions import InitializeError
from ahriman.core.repository.package_info import PackageInfo
from ahriman.core.spawn import Spawn
from ahriman.core.status import Client
+from ahriman.core.status.event_bus import EventBus
from ahriman.core.status.watcher import Watcher
from ahriman.models.repository_id import RepositoryId
from ahriman.web.apispec.info import setup_apispec
@@ -108,7 +109,9 @@ def _create_watcher(path: Path, repository_id: RepositoryId) -> Watcher:
package_info.reporter = client
package_info.repository_id = repository_id
- return Watcher(client, package_info)
+ event_bus = EventBus(configuration.getint("web", "max_queue_size", fallback=0))
+
+ return Watcher(client, package_info, event_bus)
async def _on_shutdown(application: Application) -> None:
@@ -118,6 +121,8 @@ async def _on_shutdown(application: Application) -> None:
Args:
application(Application): web application instance
"""
+ for watcher in application[WatcherKey].values():
+ await watcher.shutdown()
application.logger.warning("server terminated")
@@ -135,7 +140,7 @@ async def _on_startup(application: Application) -> None:
try:
for watcher in application[WatcherKey].values():
- watcher.load()
+ await watcher.load()
except Exception:
message = "could not load packages"
application.logger.exception(message)
diff --git a/tests/ahriman/conftest.py b/tests/ahriman/conftest.py
index 56fadf19..99d27d83 100644
--- a/tests/ahriman/conftest.py
+++ b/tests/ahriman/conftest.py
@@ -19,6 +19,7 @@ from ahriman.core.repository import Repository
from ahriman.core.repository.package_info import PackageInfo
from ahriman.core.spawn import Spawn
from ahriman.core.status import Client
+from ahriman.core.status.event_bus import EventBus
from ahriman.core.status.watcher import Watcher
from ahriman.models.aur_package import AURPackage
from ahriman.models.build_status import BuildStatus, BuildStatusEnum
@@ -690,4 +691,5 @@ def watcher(local_client: Client) -> Watcher:
Watcher: package status watcher test instance
"""
package_info = PackageInfo()
- return Watcher(local_client, package_info)
+ event_bus = EventBus(0)
+ return Watcher(local_client, package_info, event_bus)
diff --git a/tests/ahriman/core/status/test_watcher.py b/tests/ahriman/core/status/test_watcher.py
index 7646e553..f353a722 100644
--- a/tests/ahriman/core/status/test_watcher.py
+++ b/tests/ahriman/core/status/test_watcher.py
@@ -18,21 +18,21 @@ def test_packages(watcher: Watcher, package_ahriman: Package) -> None:
assert watcher.packages
-def test_load(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
+async def test_load(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must correctly load packages
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_get",
return_value=[(package_ahriman, BuildStatus())])
- watcher.load()
+ await watcher.load()
cache_mock.assert_called_once_with(None)
package, status = watcher._known[package_ahriman.base]
assert package == package_ahriman
assert status.status == BuildStatusEnum.Unknown
-def test_load_known(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
+async def test_load_known(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must correctly load packages with known statuses
"""
@@ -40,146 +40,147 @@ def test_load_known(watcher: Watcher, package_ahriman: Package, mocker: MockerFi
mocker.patch("ahriman.core.status.local_client.LocalClient.package_get", return_value=[(package_ahriman, status)])
watcher._known = {package_ahriman.base: (package_ahriman, status)}
- watcher.load()
+ await watcher.load()
_, status = watcher._known[package_ahriman.base]
assert status.status == BuildStatusEnum.Success
-def test_package_archives(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
+async def test_package_archives(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must return package archives from package info
"""
archives_mock = mocker.patch("ahriman.core.repository.package_info.PackageInfo.package_archives",
return_value=[package_ahriman])
- result = watcher.package_archives(package_ahriman.base)
+ result = await watcher.package_archives(package_ahriman.base)
assert result == [package_ahriman]
archives_mock.assert_called_once_with(package_ahriman.base)
-def test_package_get(watcher: Watcher, package_ahriman: Package) -> None:
+async def test_package_get(watcher: Watcher, package_ahriman: Package) -> None:
"""
must return package status
"""
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
- package, status = watcher.package_get(package_ahriman.base)
+ package, status = await watcher.package_get(package_ahriman.base)
assert package == package_ahriman
assert status.status == BuildStatusEnum.Unknown
-def test_package_get_failed(watcher: Watcher, package_ahriman: Package) -> None:
+async def test_package_get_failed(watcher: Watcher, package_ahriman: Package) -> None:
"""
must fail on unknown package
"""
with pytest.raises(UnknownPackageError):
- watcher.package_get(package_ahriman.base)
+ await watcher.package_get(package_ahriman.base)
-def test_package_hold_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
+async def test_package_hold_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must update package hold status
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_hold_update")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
- watcher.package_hold_update(package_ahriman.base, enabled=True)
+ await watcher.package_hold_update(package_ahriman.base, enabled=True)
cache_mock.assert_called_once_with(package_ahriman.base, enabled=True)
_, status = watcher._known[package_ahriman.base]
assert status.is_held is True
-def test_package_hold_update_unknown(watcher: Watcher, package_ahriman: Package) -> None:
+async def test_package_hold_update_unknown(watcher: Watcher, package_ahriman: Package) -> None:
"""
must fail on unknown package hold update
"""
with pytest.raises(UnknownPackageError):
- watcher.package_hold_update(package_ahriman.base, enabled=True)
+ await watcher.package_hold_update(package_ahriman.base, enabled=True)
-def test_package_remove(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
+async def test_package_remove(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must remove package base
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_remove")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
- watcher.package_remove(package_ahriman.base)
+ await watcher.package_remove(package_ahriman.base)
assert not watcher._known
cache_mock.assert_called_once_with(package_ahriman.base)
-def test_package_remove_unknown(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
+async def test_package_remove_unknown(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must not fail on unknown base removal
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_remove")
- watcher.package_remove(package_ahriman.base)
+ await watcher.package_remove(package_ahriman.base)
cache_mock.assert_called_once_with(package_ahriman.base)
-def test_package_status_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
+async def test_package_status_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must update package status only for known package
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_status_update")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
- watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success)
+ await watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success)
cache_mock.assert_called_once_with(package_ahriman.base, pytest.helpers.anyvar(int))
package, status = watcher._known[package_ahriman.base]
assert package == package_ahriman
assert status.status == BuildStatusEnum.Success
-def test_package_status_update_preserves_hold(watcher: Watcher, package_ahriman: Package,
- mocker: MockerFixture) -> None:
+async def test_package_status_update_preserves_hold(watcher: Watcher, package_ahriman: Package,
+ mocker: MockerFixture) -> None:
"""
must preserve hold status on package status update
"""
mocker.patch("ahriman.core.status.local_client.LocalClient.package_status_update")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus(is_held=True))}
- watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success)
+ await watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success)
_, status = watcher._known[package_ahriman.base]
assert status.is_held is True
-def test_package_status_update_unknown(watcher: Watcher, package_ahriman: Package) -> None:
+async def test_package_status_update_unknown(watcher: Watcher, package_ahriman: Package) -> None:
"""
must fail on unknown package status update only
"""
with pytest.raises(UnknownPackageError):
- watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Unknown)
+ await watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Unknown)
-def test_package_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
+async def test_package_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must add package to cache
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_update")
- watcher.package_update(package_ahriman, BuildStatusEnum.Unknown)
+ await watcher.package_update(package_ahriman, BuildStatusEnum.Unknown)
assert watcher.packages
cache_mock.assert_called_once_with(package_ahriman, pytest.helpers.anyvar(int))
-def test_package_update_preserves_hold(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
+async def test_package_update_preserves_hold(watcher: Watcher, package_ahriman: Package,
+ mocker: MockerFixture) -> None:
"""
must preserve hold status on package update
"""
mocker.patch("ahriman.core.status.local_client.LocalClient.package_update")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus(is_held=True))}
- watcher.package_update(package_ahriman, BuildStatusEnum.Success)
+ await watcher.package_update(package_ahriman, BuildStatusEnum.Success)
_, status = watcher._known[package_ahriman.base]
assert status.is_held is True
-def test_status_update(watcher: Watcher) -> None:
+async def test_status_update(watcher: Watcher) -> None:
"""
must update service status
"""
- watcher.status_update(BuildStatusEnum.Success)
+ await watcher.status_update(BuildStatusEnum.Success)
assert watcher.status.status == BuildStatusEnum.Success
@@ -204,18 +205,3 @@ def test_call_failed(watcher: Watcher, package_ahriman: Package) -> None:
"""
with pytest.raises(UnknownPackageError):
assert watcher(package_ahriman.base)
-
-
-def test_getattr(watcher: Watcher) -> None:
- """
- must return client method call
- """
- assert watcher.package_logs_remove
-
-
-def test_getattr_unknown_method(watcher: Watcher) -> None:
- """
- must raise AttributeError in case if no reporter attribute found
- """
- with pytest.raises(AttributeError):
- assert watcher.random_method