event bus implementation

This commit is contained in:
2026-03-24 02:23:14 +02:00
parent 1c312bb528
commit d2aed850b6
23 changed files with 459 additions and 147 deletions

View File

@@ -29,17 +29,16 @@ RUN pacman -S --noconfirm --asdeps \
python-filelock \ python-filelock \
python-inflection \ python-inflection \
python-pyelftools \ python-pyelftools \
python-requests \ python-requests
&& \ RUN pacman -S --noconfirm --asdeps \
pacman -S --noconfirm --asdeps \
base-devel \ base-devel \
python-build \ python-build \
python-flit \ python-flit \
python-installer \ python-installer \
python-setuptools \
python-tox \ python-tox \
python-wheel \ python-wheel
&& \ RUN pacman -S --noconfirm --asdeps \
pacman -S --noconfirm --asdeps \
git \ git \
python-aiohttp \ python-aiohttp \
python-aiohttp-openmetrics \ python-aiohttp-openmetrics \
@@ -48,9 +47,8 @@ RUN pacman -S --noconfirm --asdeps \
python-cryptography \ python-cryptography \
python-jinja \ python-jinja \
python-systemd \ python-systemd \
rsync \ rsync
&& \ RUN runuser -u build -- install-aur-package \
runuser -u build -- install-aur-package \
python-aioauth-client \ python-aioauth-client \
python-sphinx-typlog-theme \ python-sphinx-typlog-theme \
python-webargs \ python-webargs \
@@ -59,6 +57,7 @@ RUN pacman -S --noconfirm --asdeps \
python-aiohttp-jinja2 \ python-aiohttp-jinja2 \
python-aiohttp-session \ python-aiohttp-session \
python-aiohttp-security \ python-aiohttp-security \
python-aiohttp-sse-git \
python-requests-unixsocket2 python-requests-unixsocket2
# install ahriman # install ahriman

View File

@@ -77,7 +77,7 @@ package_ahriman-triggers() {
package_ahriman-web() { package_ahriman-web() {
pkgname='ahriman-web' pkgname='ahriman-web'
pkgdesc="ArcH linux ReposItory MANager, web server" pkgdesc="ArcH linux ReposItory MANager, web server"
depends=("$pkgbase-core=$pkgver" 'python-aiohttp-cors' 'python-aiohttp-jinja2') depends=("$pkgbase-core=$pkgver" 'python-aiohttp-cors' 'python-aiohttp-jinja2' 'python-aiohttp-sse-git')
optdepends=('python-aioauth-client: OAuth2 authorization support' optdepends=('python-aioauth-client: OAuth2 authorization support'
'python-aiohttp-apispec>=3.0.0: autogenerated API documentation' 'python-aiohttp-apispec>=3.0.0: autogenerated API documentation'
'python-aiohttp-openmetrics: HTTP metrics support' 'python-aiohttp-openmetrics: HTTP metrics support'

View File

@@ -58,6 +58,7 @@ web = [
"aiohttp", "aiohttp",
"aiohttp_cors", "aiohttp_cors",
"aiohttp_jinja2", "aiohttp_jinja2",
"aiohttp_sse",
] ]
web-auth = [ web-auth = [
"ahriman[web]", "ahriman[web]",

View File

@@ -0,0 +1,101 @@
#
# Copyright (c) 2021-2026 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import uuid
from asyncio import Lock, Queue, QueueFull
from typing import Any
from ahriman.core.log import LazyLogging
from ahriman.models.event import EventType
SSEvent = tuple[str, dict[str, Any]]
class EventBus(LazyLogging):
"""
event bus implementation
Attributes:
max_size(int): maximum size of queue
"""
def __init__(self, max_size: int) -> None:
"""
Args:
max_size(int): maximum size of queue
"""
self.max_size = max_size
self._lock = Lock()
self._subscribers: dict[str, Queue[SSEvent | None]] = {}
async def broadcast(self, event_type: EventType, object_id: str | None, **kwargs: Any) -> None:
"""
broadcast event to all subscribers
Args:
event_type(EventType): event type
object_id(str | None): object identifier (e.g. package base)
**kwargs(Any): additional event data
"""
event: dict[str, Any] = {"object_id": object_id}
event.update(kwargs)
async with self._lock:
for subscriber_id, queue in self._subscribers.items():
try:
queue.put_nowait((event_type, event))
except QueueFull:
self.logger.warning("discard message to slow subscriber %s", subscriber_id)
async def shutdown(self) -> None:
"""
gracefully shutdown all subscribers
"""
async with self._lock:
for queue in self._subscribers.values():
queue.put_nowait(None)
queue.shutdown()
async def subscribe(self) -> tuple[str, Queue[SSEvent | None]]:
"""
register new subscriber to ``repository_id``
Returns:
tuple[str, Queue[SSEvent | None]]: subscriber identifier and associated queue
"""
subscriber_id = str(uuid.uuid4())
queue: Queue[SSEvent | None] = Queue(self.max_size)
async with self._lock:
self._subscribers[subscriber_id] = queue
return subscriber_id, queue
async def unsubscribe(self, subscriber_id: str) -> None:
"""
unsubscribe from events
Args:
subscriber_id(str): subscriber unique identifier
"""
async with self._lock:
self._subscribers.pop(subscriber_id, None)

View File

@@ -17,15 +17,16 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
# #
from collections.abc import Callable # pylint: disable=too-many-public-methods
from asyncio import Lock
from dataclasses import replace from dataclasses import replace
from threading import Lock from typing import Self
from typing import Any, Self
from ahriman.core.exceptions import UnknownPackageError from ahriman.core.exceptions import UnknownPackageError
from ahriman.core.log import LazyLogging from ahriman.core.log import LazyLogging
from ahriman.core.repository.package_info import PackageInfo from ahriman.core.repository.package_info import PackageInfo
from ahriman.core.status import Client from ahriman.core.status import Client
from ahriman.core.status.event_bus import EventBus
from ahriman.models.build_status import BuildStatus, BuildStatusEnum from ahriman.models.build_status import BuildStatus, BuildStatusEnum
from ahriman.models.changes import Changes from ahriman.models.changes import Changes
from ahriman.models.dependencies import Dependencies from ahriman.models.dependencies import Dependencies
@@ -41,51 +42,74 @@ class Watcher(LazyLogging):
Attributes: Attributes:
client(Client): reporter instance client(Client): reporter instance
event_bus(EventBus): event bus instance
package_info(PackageInfo): package info instance package_info(PackageInfo): package info instance
status(BuildStatus): daemon status status(BuildStatus): daemon status
""" """
def __init__(self, client: Client, package_info: PackageInfo) -> None: def __init__(self, client: Client, package_info: PackageInfo, event_bus: EventBus) -> None:
""" """
Args: Args:
client(Client): reporter instance client(Client): reporter instance
package_info(PackageInfo): package info instance package_info(PackageInfo): package info instance
event_bus(EventBus): event bus instance
""" """
self.client = client self.client = client
self.package_info = package_info self.package_info = package_info
self.event_bus = event_bus
self._lock = Lock() self._lock = Lock()
self._known: dict[str, tuple[Package, BuildStatus]] = {} self._known: dict[str, tuple[Package, BuildStatus]] = {}
self.status = BuildStatus() self.status = BuildStatus()
@property async def event_add(self, event: Event) -> None:
def packages(self) -> list[tuple[Package, BuildStatus]]:
""" """
get current known packages list create new event
Args:
event(Event): audit log event
"""
self.client.event_add(event)
async def event_get(self, event: str | EventType | None, object_id: str | None,
from_date: int | float | None = None, to_date: int | float | None = None,
limit: int = -1, offset: int = 0) -> list[Event]:
"""
retrieve list of events
Args:
event(str | EventType | None): filter by event type
object_id(str | None): filter by event object
from_date(int | float | None, optional): minimal creation date, inclusive (Default value = None)
to_date(int | float | None, optional): maximal creation date, exclusive (Default value = None)
limit(int, optional): limit records to the specified count, -1 means unlimited (Default value = -1)
offset(int, optional): records offset (Default value = 0)
Returns: Returns:
list[tuple[Package, BuildStatus]]: list of packages together with their statuses list[Event]: list of audit log events
""" """
with self._lock: return self.client.event_get(event, object_id, from_date, to_date, limit, offset)
return list(self._known.values())
event_add: Callable[[Event], None] async def load(self) -> None:
event_get: Callable[[str | EventType | None, str | None, int | None, int | None, int, int], list[Event]]
def load(self) -> None:
""" """
load packages from local database load packages from local database
""" """
with self._lock: async with self._lock:
self._known = { self._known = {
package.base: (package, status) package.base: (package, status)
for package, status in self.client.package_get(None) for package, status in self.client.package_get(None)
} }
logs_rotate: Callable[[int], None] async def logs_rotate(self, keep_last_records: int) -> None:
"""
remove older logs from storage
def package_archives(self, package_base: str) -> list[Package]: Args:
keep_last_records(int): number of last records to keep
"""
self.client.logs_rotate(keep_last_records)
async def package_archives(self, package_base: str) -> list[Package]:
""" """
get known package archives get known package archives
@@ -97,15 +121,51 @@ class Watcher(LazyLogging):
""" """
return self.package_info.package_archives(package_base) return self.package_info.package_archives(package_base)
package_changes_get: Callable[[str], Changes] async def package_changes_get(self, package_base: str) -> Changes:
"""
get package changes
package_changes_update: Callable[[str, Changes], None] Args:
package_base(str): package base to retrieve
package_dependencies_get: Callable[[str], Dependencies] Returns:
Changes: package changes if available and empty object otherwise
"""
return self.client.package_changes_get(package_base)
package_dependencies_update: Callable[[str, Dependencies], None] async def package_changes_update(self, package_base: str, changes: Changes) -> None:
"""
update package changes
def package_get(self, package_base: str) -> tuple[Package, BuildStatus]: Args:
package_base(str): package base to update
changes(Changes): changes descriptor
"""
self.client.package_changes_update(package_base, changes)
async def package_dependencies_get(self, package_base: str) -> Dependencies:
"""
get package dependencies
Args:
package_base(str): package base to retrieve
Returns:
list[Dependencies]: package implicit dependencies if available
"""
return self.client.package_dependencies_get(package_base)
async def package_dependencies_update(self, package_base: str, dependencies: Dependencies) -> None:
"""
update package dependencies
Args:
package_base(str): package base to update
dependencies(Dependencies): dependencies descriptor
"""
self.client.package_dependencies_update(package_base, dependencies)
async def package_get(self, package_base: str) -> tuple[Package, BuildStatus]:
""" """
get current package base build status get current package base build status
@@ -119,18 +179,12 @@ class Watcher(LazyLogging):
UnknownPackageError: if no package found UnknownPackageError: if no package found
""" """
try: try:
with self._lock: async with self._lock:
return self._known[package_base] return self._known[package_base]
except KeyError: except KeyError:
raise UnknownPackageError(package_base) from None raise UnknownPackageError(package_base) from None
package_logs_add: Callable[[LogRecord], None] async def package_hold_update(self, package_base: str, *, enabled: bool) -> None:
package_logs_get: Callable[[str, str | None, str | None, int, int], list[LogRecord]]
package_logs_remove: Callable[[str, str | None], None]
def package_hold_update(self, package_base: str, *, enabled: bool) -> None:
""" """
update package hold status update package hold status
@@ -138,29 +192,104 @@ class Watcher(LazyLogging):
package_base(str): package base name package_base(str): package base name
enabled(bool): new hold status enabled(bool): new hold status
""" """
package, status = self.package_get(package_base) package, status = await self.package_get(package_base)
with self._lock: async with self._lock:
self._known[package_base] = (package, replace(status, is_held=enabled)) self._known[package_base] = (package, replace(status, is_held=enabled))
self.client.package_hold_update(package_base, enabled=enabled) self.client.package_hold_update(package_base, enabled=enabled)
package_patches_get: Callable[[str, str | None], list[PkgbuildPatch]] await self.event_bus.broadcast(EventType.PackageHeld, package_base, is_held=enabled)
package_patches_remove: Callable[[str, str], None] async def package_logs_add(self, log_record: LogRecord) -> None:
"""
post log record
package_patches_update: Callable[[str, PkgbuildPatch], None] Args:
log_record(LogRecord): log record
"""
self.client.package_logs_add(log_record)
def package_remove(self, package_base: str) -> None: await self.event_bus.broadcast(
EventType.BuildLog,
log_record.log_record_id.package_base,
created=log_record.created,
message=log_record.message,
version=log_record.log_record_id.version,
)
async def package_logs_get(self, package_base: str, version: str | None = None, process_id: str | None = None,
limit: int = -1, offset: int = 0) -> list[LogRecord]:
"""
get package logs
Args:
package_base(str): package base
version(str | None, optional): package version to search (Default value = None)
process_id(str | None, optional): process identifier to search (Default value = None)
limit(int, optional): limit records to the specified count, -1 means unlimited (Default value = -1)
offset(int, optional): records offset (Default value = 0)
Returns:
list[LogRecord]: package logs
"""
return self.client.package_logs_get(package_base, version, process_id, limit, offset)
async def package_logs_remove(self, package_base: str, version: str | None) -> None:
"""
remove package logs
Args:
package_base(str): package base
version(str | None): package version to remove logs. If ``None`` is set, all logs will be removed
"""
self.client.package_logs_remove(package_base, version)
async def package_patches_get(self, package_base: str, variable: str | None) -> list[PkgbuildPatch]:
"""
get package patches
Args:
package_base(str): package base to retrieve
variable(str | None): optional filter by patch variable
Returns:
list[PkgbuildPatch]: list of patches for the specified package
"""
return self.client.package_patches_get(package_base, variable)
async def package_patches_remove(self, package_base: str, variable: str | None) -> None:
"""
remove package patch
Args:
package_base(str): package base to update
variable(str | None): patch name. If ``None`` is set, all patches will be removed
"""
self.client.package_patches_remove(package_base, variable)
async def package_patches_update(self, package_base: str, patch: PkgbuildPatch) -> None:
"""
create or update package patch
Args:
package_base(str): package base to update
patch(PkgbuildPatch): package patch
"""
self.client.package_patches_update(package_base, patch)
async def package_remove(self, package_base: str) -> None:
""" """
remove package base from known list if any remove package base from known list if any
Args: Args:
package_base(str): package base package_base(str): package base
""" """
with self._lock: async with self._lock:
self._known.pop(package_base, None) self._known.pop(package_base, None)
self.client.package_remove(package_base) self.client.package_remove(package_base)
def package_status_update(self, package_base: str, status: BuildStatusEnum) -> None: await self.event_bus.broadcast(EventType.PackageRemoved, package_base)
async def package_status_update(self, package_base: str, status: BuildStatusEnum) -> None:
""" """
update package status update package status
@@ -168,12 +297,14 @@ class Watcher(LazyLogging):
package_base(str): package base to update package_base(str): package base to update
status(BuildStatusEnum): new build status status(BuildStatusEnum): new build status
""" """
package, current_status = self.package_get(package_base) package, current_status = await self.package_get(package_base)
with self._lock: async with self._lock:
self._known[package_base] = (package, BuildStatus(status, is_held=current_status.is_held)) self._known[package_base] = (package, BuildStatus(status, is_held=current_status.is_held))
self.client.package_status_update(package_base, status) self.client.package_status_update(package_base, status)
def package_update(self, package: Package, status: BuildStatusEnum) -> None: await self.event_bus.broadcast(EventType.PackageStatusChanged, package_base, status=status.value)
async def package_update(self, package: Package, status: BuildStatusEnum) -> None:
""" """
update package update package
@@ -181,12 +312,32 @@ class Watcher(LazyLogging):
package(Package): package description package(Package): package description
status(BuildStatusEnum): new build status status(BuildStatusEnum): new build status
""" """
with self._lock: async with self._lock:
_, current_status = self._known.get(package.base, (package, BuildStatus())) _, current_status = self._known.get(package.base, (package, BuildStatus()))
self._known[package.base] = (package, BuildStatus(status, is_held=current_status.is_held)) self._known[package.base] = (package, BuildStatus(status, is_held=current_status.is_held))
self.client.package_update(package, status) self.client.package_update(package, status)
def status_update(self, status: BuildStatusEnum) -> None: await self.event_bus.broadcast(
EventType.PackageUpdated, package.base, status=status.value, version=package.version,
)
async def packages(self) -> list[tuple[Package, BuildStatus]]:
"""
get current known packages list
Returns:
list[tuple[Package, BuildStatus]]: list of packages together with their statuses
"""
async with self._lock:
return list(self._known.values())
async def shutdown(self) -> None:
"""
gracefully shutdown watcher
"""
await self.event_bus.shutdown()
async def status_update(self, status: BuildStatusEnum) -> None:
""" """
update service status update service status
@@ -195,6 +346,8 @@ class Watcher(LazyLogging):
""" """
self.status = BuildStatus(status) self.status = BuildStatus(status)
await self.event_bus.broadcast(EventType.ServiceStatusChanged, None, status=status.value)
def __call__(self, package_base: str | None) -> Self: def __call__(self, package_base: str | None) -> Self:
""" """
extract client for future calls extract client for future calls
@@ -204,24 +357,11 @@ class Watcher(LazyLogging):
Returns: Returns:
Self: instance of self to pass calls to the client Self: instance of self to pass calls to the client
"""
if package_base is not None:
_ = self.package_get(package_base)
return self
def __getattr__(self, item: str) -> Any:
"""
proxy methods for reporter client
Args:
item(str): property name
Returns:
Any: attribute by its name
Raises: Raises:
AttributeError: in case if no such attribute found UnknownPackageError: if no package found
""" """
if (method := getattr(self.client, item, None)) is not None: # keep check here instead of calling package_get to keep this method synchronized
return method if package_base is not None and package_base not in self._known:
raise AttributeError(f"'{self.__class__.__qualname__}' object has no attribute '{item}'") raise UnknownPackageError(package_base)
return self

View File

@@ -28,16 +28,24 @@ class EventType(StrEnum):
predefined event types predefined event types
Attributes: Attributes:
BuildLog(EventType): new build log line
PackageHeld(EventType): package hold status has been changed
PackageOutdated(EventType): package has been marked as out-of-date PackageOutdated(EventType): package has been marked as out-of-date
PackageRemoved(EventType): package has been removed PackageRemoved(EventType): package has been removed
PackageStatusChanged(EventType): package build status has been changed
PackageUpdateFailed(EventType): package update has been failed PackageUpdateFailed(EventType): package update has been failed
PackageUpdated(EventType): package has been updated PackageUpdated(EventType): package has been updated
ServiceStatusChanged(EventType): service status has been changed
""" """
BuildLog = "build-log"
PackageHeld = "package-held"
PackageOutdated = "package-outdated" PackageOutdated = "package-outdated"
PackageRemoved = "package-removed" PackageRemoved = "package-removed"
PackageStatusChanged = "package-status-changed"
PackageUpdateFailed = "package-update-failed" PackageUpdateFailed = "package-update-failed"
PackageUpdated = "package-updated" PackageUpdated = "package-updated"
ServiceStatusChanged = "service-status-changed"
class Event: class Event:

View File

@@ -0,0 +1,70 @@
#
# Copyright (c) 2021-2026 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import json
from aiohttp.web import StreamResponse
from aiohttp_sse import sse_response
from typing import ClassVar
from ahriman.models.user_access import UserAccess
from ahriman.web.apispec.decorators import apidocs
from ahriman.web.schemas import EventSchema, RepositoryIdSchema
from ahriman.web.views.base import BaseView
class EventBusView(BaseView):
"""
audit log view
Attributes:
GET_PERMISSION(UserAccess): (class attribute) get permissions of self
"""
GET_PERMISSION: ClassVar[UserAccess] = UserAccess.Full
ROUTES = ["/api/v1/events/stream"]
@apidocs(
tags=["Audit log"],
summary="Live updates",
description="Stream live updates via SSE",
permission=GET_PERMISSION,
error_404_description="Repository is unknown",
schema=EventSchema(many=True),
query_schema=RepositoryIdSchema,
)
async def get(self) -> StreamResponse:
"""
subscribe on updates
Returns:
StreamResponse: 200 with streaming updates
"""
async with sse_response(self.request) as response:
subscription_id, queue = await self.service().event_bus.subscribe()
try:
while response.is_connected():
while (message := await queue.get()) is not None:
event_type, data = message
await response.send(json.dumps(data), event=event_type)
finally:
await self.service().event_bus.unsubscribe(subscription_id)
return response

View File

@@ -67,7 +67,7 @@ class EventsView(BaseView):
except ValueError as ex: except ValueError as ex:
raise HTTPBadRequest(reason=str(ex)) raise HTTPBadRequest(reason=str(ex))
events = self.service().event_get(event, object_id, from_date, to_date, limit, offset) events = await self.service().event_get(event, object_id, from_date, to_date, limit, offset)
response = [event.view() for event in events] response = [event.view() for event in events]
return self.json_response(response) return self.json_response(response)
@@ -94,6 +94,6 @@ class EventsView(BaseView):
except Exception as ex: except Exception as ex:
raise HTTPBadRequest(reason=str(ex)) raise HTTPBadRequest(reason=str(ex))
self.service().event_add(event) await self.service().event_add(event)
raise HTTPNoContent raise HTTPNoContent

View File

@@ -60,6 +60,6 @@ class Archives(StatusViewGuard, BaseView):
""" """
package_base = self.request.match_info["package"] package_base = self.request.match_info["package"]
archives = self.service(package_base=package_base).package_archives(package_base) archives = await self.service(package_base=package_base).package_archives(package_base)
return self.json_response([archive.view() for archive in archives]) return self.json_response([archive.view() for archive in archives])

View File

@@ -63,7 +63,7 @@ class ChangesView(StatusViewGuard, BaseView):
""" """
package_base = self.request.match_info["package"] package_base = self.request.match_info["package"]
changes = self.service(package_base=package_base).package_changes_get(package_base) changes = await self.service(package_base=package_base).package_changes_get(package_base)
return self.json_response(changes.view()) return self.json_response(changes.view())
@@ -97,6 +97,6 @@ class ChangesView(StatusViewGuard, BaseView):
raise HTTPBadRequest(reason=str(ex)) raise HTTPBadRequest(reason=str(ex))
changes = Changes(last_commit_sha, change, pkgbuild) changes = Changes(last_commit_sha, change, pkgbuild)
self.service().package_changes_update(package_base, changes) await self.service().package_changes_update(package_base, changes)
raise HTTPNoContent raise HTTPNoContent

View File

@@ -63,7 +63,7 @@ class DependenciesView(StatusViewGuard, BaseView):
""" """
package_base = self.request.match_info["package"] package_base = self.request.match_info["package"]
dependencies = self.service(package_base=package_base).package_dependencies_get(package_base) dependencies = await self.service(package_base=package_base).package_dependencies_get(package_base)
return self.json_response(dependencies.view()) return self.json_response(dependencies.view())
@@ -95,6 +95,6 @@ class DependenciesView(StatusViewGuard, BaseView):
except Exception as ex: except Exception as ex:
raise HTTPBadRequest(reason=str(ex)) raise HTTPBadRequest(reason=str(ex))
self.service(package_base=package_base).package_dependencies_update(package_base, dependencies) await self.service(package_base=package_base).package_dependencies_update(package_base, dependencies)
raise HTTPNoContent raise HTTPNoContent

View File

@@ -68,7 +68,7 @@ class HoldView(StatusViewGuard, BaseView):
raise HTTPBadRequest(reason=str(ex)) raise HTTPBadRequest(reason=str(ex))
try: try:
self.service().package_hold_update(package_base, enabled=is_held) await self.service().package_hold_update(package_base, enabled=is_held)
except UnknownPackageError: except UnknownPackageError:
raise HTTPNotFound(reason=f"Package {package_base} is unknown") raise HTTPNotFound(reason=f"Package {package_base} is unknown")

View File

@@ -62,7 +62,7 @@ class LogsView(StatusViewGuard, BaseView):
""" """
package_base = self.request.match_info["package"] package_base = self.request.match_info["package"]
version = self.request.query.get("version") version = self.request.query.get("version")
self.service().package_logs_remove(package_base, version) await self.service().package_logs_remove(package_base, version)
raise HTTPNoContent raise HTTPNoContent
@@ -89,8 +89,8 @@ class LogsView(StatusViewGuard, BaseView):
package_base = self.request.match_info["package"] package_base = self.request.match_info["package"]
try: try:
_, status = self.service().package_get(package_base) _, status = await self.service().package_get(package_base)
logs = self.service(package_base=package_base).package_logs_get(package_base, None, None, -1, 0) logs = await self.service(package_base=package_base).package_logs_get(package_base, None, None, -1, 0)
except UnknownPackageError: except UnknownPackageError:
raise HTTPNotFound(reason=f"Package {package_base} is unknown") raise HTTPNotFound(reason=f"Package {package_base} is unknown")
@@ -127,6 +127,6 @@ class LogsView(StatusViewGuard, BaseView):
except Exception as ex: except Exception as ex:
raise HTTPBadRequest(reason=str(ex)) raise HTTPBadRequest(reason=str(ex))
self.service().package_logs_add(log_record) await self.service().package_logs_add(log_record)
raise HTTPNoContent raise HTTPNoContent

View File

@@ -66,7 +66,7 @@ class PackageView(StatusViewGuard, BaseView):
HTTPNoContent: on success response HTTPNoContent: on success response
""" """
package_base = self.request.match_info["package"] package_base = self.request.match_info["package"]
self.service().package_remove(package_base) await self.service().package_remove(package_base)
raise HTTPNoContent raise HTTPNoContent
@@ -94,7 +94,7 @@ class PackageView(StatusViewGuard, BaseView):
repository_id = self.repository_id() repository_id = self.repository_id()
try: try:
package, status = self.service(repository_id).package_get(package_base) package, status = await self.service(repository_id).package_get(package_base)
except UnknownPackageError: except UnknownPackageError:
raise HTTPNotFound(reason=f"Package {package_base} is unknown") raise HTTPNotFound(reason=f"Package {package_base} is unknown")
@@ -137,9 +137,9 @@ class PackageView(StatusViewGuard, BaseView):
try: try:
if package is None: if package is None:
self.service().package_status_update(package_base, status) await self.service().package_status_update(package_base, status)
else: else:
self.service().package_update(package, status) await self.service().package_update(package, status)
except UnknownPackageError: except UnknownPackageError:
raise HTTPBadRequest(reason=f"Package {package_base} is unknown, but no package body set") raise HTTPBadRequest(reason=f"Package {package_base} is unknown, but no package body set")

View File

@@ -67,7 +67,7 @@ class PackagesView(StatusViewGuard, BaseView):
stop = offset + limit if limit >= 0 else None stop = offset + limit if limit >= 0 else None
repository_id = self.repository_id() repository_id = self.repository_id()
packages = self.service(repository_id).packages packages = await self.service(repository_id).packages()
comparator: Callable[[tuple[Package, BuildStatus]], Comparable] = lambda items: items[0].base comparator: Callable[[tuple[Package, BuildStatus]], Comparable] = lambda items: items[0].base
response = [ response = [
@@ -95,6 +95,6 @@ class PackagesView(StatusViewGuard, BaseView):
Raises: Raises:
HTTPNoContent: on success response HTTPNoContent: on success response
""" """
self.service().load() await self.service().load()
raise HTTPNoContent raise HTTPNoContent

View File

@@ -57,7 +57,7 @@ class PatchView(StatusViewGuard, BaseView):
package_base = self.request.match_info["package"] package_base = self.request.match_info["package"]
variable = self.request.match_info["patch"] variable = self.request.match_info["patch"]
self.service().package_patches_remove(package_base, variable) await self.service().package_patches_remove(package_base, variable)
raise HTTPNoContent raise HTTPNoContent
@@ -83,7 +83,7 @@ class PatchView(StatusViewGuard, BaseView):
package_base = self.request.match_info["package"] package_base = self.request.match_info["package"]
variable = self.request.match_info["patch"] variable = self.request.match_info["patch"]
patches = self.service().package_patches_get(package_base, variable) patches = await self.service().package_patches_get(package_base, variable)
selected = next((patch for patch in patches if patch.key == variable), None) selected = next((patch for patch in patches if patch.key == variable), None)
if selected is None: if selected is None:

View File

@@ -57,7 +57,7 @@ class PatchesView(StatusViewGuard, BaseView):
Response: 200 with package patches on success Response: 200 with package patches on success
""" """
package_base = self.request.match_info["package"] package_base = self.request.match_info["package"]
patches = self.service().package_patches_get(package_base, None) patches = await self.service().package_patches_get(package_base, None)
response = [patch.view() for patch in patches] response = [patch.view() for patch in patches]
return self.json_response(response) return self.json_response(response)
@@ -88,6 +88,6 @@ class PatchesView(StatusViewGuard, BaseView):
except Exception as ex: except Exception as ex:
raise HTTPBadRequest(reason=str(ex)) raise HTTPBadRequest(reason=str(ex))
self.service().package_patches_update(package_base, PkgbuildPatch.parse(key, value)) await self.service().package_patches_update(package_base, PkgbuildPatch.parse(key, value))
raise HTTPNoContent raise HTTPNoContent

View File

@@ -59,6 +59,6 @@ class LogsView(BaseView):
except Exception as ex: except Exception as ex:
raise HTTPBadRequest(reason=str(ex)) raise HTTPBadRequest(reason=str(ex))
self.service().logs_rotate(keep_last_records) await self.service().logs_rotate(keep_last_records)
raise HTTPNoContent raise HTTPNoContent

View File

@@ -62,7 +62,7 @@ class StatusView(StatusViewGuard, BaseView):
Response: 200 with service status object Response: 200 with service status object
""" """
repository_id = self.repository_id() repository_id = self.repository_id()
packages = self.service(repository_id).packages packages = await self.service(repository_id).packages()
counters = Counters.from_packages(packages) counters = Counters.from_packages(packages)
stats = RepositoryStats.from_packages([package for package, _ in packages]) stats = RepositoryStats.from_packages([package for package, _ in packages])
@@ -101,6 +101,6 @@ class StatusView(StatusViewGuard, BaseView):
except Exception as ex: except Exception as ex:
raise HTTPBadRequest(reason=str(ex)) raise HTTPBadRequest(reason=str(ex))
self.service().status_update(status) await self.service().status_update(status)
raise HTTPNoContent raise HTTPNoContent

View File

@@ -67,7 +67,7 @@ class LogsView(StatusViewGuard, BaseView):
version = self.request.query.get("version", None) version = self.request.query.get("version", None)
process = self.request.query.get("process_id", None) process = self.request.query.get("process_id", None)
logs = self.service(package_base=package_base).package_logs_get(package_base, version, process, limit, offset) logs = await self.service(package_base=package_base).package_logs_get(package_base, version, process, limit, offset)
head = self.request.query.get("head", "false") head = self.request.query.get("head", "false")
# pylint: disable=protected-access # pylint: disable=protected-access

View File

@@ -33,6 +33,7 @@ from ahriman.core.exceptions import InitializeError
from ahriman.core.repository.package_info import PackageInfo from ahriman.core.repository.package_info import PackageInfo
from ahriman.core.spawn import Spawn from ahriman.core.spawn import Spawn
from ahriman.core.status import Client from ahriman.core.status import Client
from ahriman.core.status.event_bus import EventBus
from ahriman.core.status.watcher import Watcher from ahriman.core.status.watcher import Watcher
from ahriman.models.repository_id import RepositoryId from ahriman.models.repository_id import RepositoryId
from ahriman.web.apispec.info import setup_apispec from ahriman.web.apispec.info import setup_apispec
@@ -108,7 +109,9 @@ def _create_watcher(path: Path, repository_id: RepositoryId) -> Watcher:
package_info.reporter = client package_info.reporter = client
package_info.repository_id = repository_id package_info.repository_id = repository_id
return Watcher(client, package_info) event_bus = EventBus(configuration.getint("web", "max_queue_size", fallback=0))
return Watcher(client, package_info, event_bus)
async def _on_shutdown(application: Application) -> None: async def _on_shutdown(application: Application) -> None:
@@ -118,6 +121,8 @@ async def _on_shutdown(application: Application) -> None:
Args: Args:
application(Application): web application instance application(Application): web application instance
""" """
for watcher in application[WatcherKey].values():
await watcher.shutdown()
application.logger.warning("server terminated") application.logger.warning("server terminated")
@@ -135,7 +140,7 @@ async def _on_startup(application: Application) -> None:
try: try:
for watcher in application[WatcherKey].values(): for watcher in application[WatcherKey].values():
watcher.load() await watcher.load()
except Exception: except Exception:
message = "could not load packages" message = "could not load packages"
application.logger.exception(message) application.logger.exception(message)

View File

@@ -19,6 +19,7 @@ from ahriman.core.repository import Repository
from ahriman.core.repository.package_info import PackageInfo from ahriman.core.repository.package_info import PackageInfo
from ahriman.core.spawn import Spawn from ahriman.core.spawn import Spawn
from ahriman.core.status import Client from ahriman.core.status import Client
from ahriman.core.status.event_bus import EventBus
from ahriman.core.status.watcher import Watcher from ahriman.core.status.watcher import Watcher
from ahriman.models.aur_package import AURPackage from ahriman.models.aur_package import AURPackage
from ahriman.models.build_status import BuildStatus, BuildStatusEnum from ahriman.models.build_status import BuildStatus, BuildStatusEnum
@@ -690,4 +691,5 @@ def watcher(local_client: Client) -> Watcher:
Watcher: package status watcher test instance Watcher: package status watcher test instance
""" """
package_info = PackageInfo() package_info = PackageInfo()
return Watcher(local_client, package_info) event_bus = EventBus(0)
return Watcher(local_client, package_info, event_bus)

View File

@@ -18,21 +18,21 @@ def test_packages(watcher: Watcher, package_ahriman: Package) -> None:
assert watcher.packages assert watcher.packages
def test_load(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: async def test_load(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
""" """
must correctly load packages must correctly load packages
""" """
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_get", cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_get",
return_value=[(package_ahriman, BuildStatus())]) return_value=[(package_ahriman, BuildStatus())])
watcher.load() await watcher.load()
cache_mock.assert_called_once_with(None) cache_mock.assert_called_once_with(None)
package, status = watcher._known[package_ahriman.base] package, status = watcher._known[package_ahriman.base]
assert package == package_ahriman assert package == package_ahriman
assert status.status == BuildStatusEnum.Unknown assert status.status == BuildStatusEnum.Unknown
def test_load_known(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: async def test_load_known(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
""" """
must correctly load packages with known statuses must correctly load packages with known statuses
""" """
@@ -40,146 +40,147 @@ def test_load_known(watcher: Watcher, package_ahriman: Package, mocker: MockerFi
mocker.patch("ahriman.core.status.local_client.LocalClient.package_get", return_value=[(package_ahriman, status)]) mocker.patch("ahriman.core.status.local_client.LocalClient.package_get", return_value=[(package_ahriman, status)])
watcher._known = {package_ahriman.base: (package_ahriman, status)} watcher._known = {package_ahriman.base: (package_ahriman, status)}
watcher.load() await watcher.load()
_, status = watcher._known[package_ahriman.base] _, status = watcher._known[package_ahriman.base]
assert status.status == BuildStatusEnum.Success assert status.status == BuildStatusEnum.Success
def test_package_archives(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: async def test_package_archives(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
""" """
must return package archives from package info must return package archives from package info
""" """
archives_mock = mocker.patch("ahriman.core.repository.package_info.PackageInfo.package_archives", archives_mock = mocker.patch("ahriman.core.repository.package_info.PackageInfo.package_archives",
return_value=[package_ahriman]) return_value=[package_ahriman])
result = watcher.package_archives(package_ahriman.base) result = await watcher.package_archives(package_ahriman.base)
assert result == [package_ahriman] assert result == [package_ahriman]
archives_mock.assert_called_once_with(package_ahriman.base) archives_mock.assert_called_once_with(package_ahriman.base)
def test_package_get(watcher: Watcher, package_ahriman: Package) -> None: async def test_package_get(watcher: Watcher, package_ahriman: Package) -> None:
""" """
must return package status must return package status
""" """
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())} watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
package, status = watcher.package_get(package_ahriman.base) package, status = await watcher.package_get(package_ahriman.base)
assert package == package_ahriman assert package == package_ahriman
assert status.status == BuildStatusEnum.Unknown assert status.status == BuildStatusEnum.Unknown
def test_package_get_failed(watcher: Watcher, package_ahriman: Package) -> None: async def test_package_get_failed(watcher: Watcher, package_ahriman: Package) -> None:
""" """
must fail on unknown package must fail on unknown package
""" """
with pytest.raises(UnknownPackageError): with pytest.raises(UnknownPackageError):
watcher.package_get(package_ahriman.base) await watcher.package_get(package_ahriman.base)
def test_package_hold_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: async def test_package_hold_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
""" """
must update package hold status must update package hold status
""" """
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_hold_update") cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_hold_update")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())} watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
watcher.package_hold_update(package_ahriman.base, enabled=True) await watcher.package_hold_update(package_ahriman.base, enabled=True)
cache_mock.assert_called_once_with(package_ahriman.base, enabled=True) cache_mock.assert_called_once_with(package_ahriman.base, enabled=True)
_, status = watcher._known[package_ahriman.base] _, status = watcher._known[package_ahriman.base]
assert status.is_held is True assert status.is_held is True
def test_package_hold_update_unknown(watcher: Watcher, package_ahriman: Package) -> None: async def test_package_hold_update_unknown(watcher: Watcher, package_ahriman: Package) -> None:
""" """
must fail on unknown package hold update must fail on unknown package hold update
""" """
with pytest.raises(UnknownPackageError): with pytest.raises(UnknownPackageError):
watcher.package_hold_update(package_ahriman.base, enabled=True) await watcher.package_hold_update(package_ahriman.base, enabled=True)
def test_package_remove(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: async def test_package_remove(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
""" """
must remove package base must remove package base
""" """
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_remove") cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_remove")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())} watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
watcher.package_remove(package_ahriman.base) await watcher.package_remove(package_ahriman.base)
assert not watcher._known assert not watcher._known
cache_mock.assert_called_once_with(package_ahriman.base) cache_mock.assert_called_once_with(package_ahriman.base)
def test_package_remove_unknown(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: async def test_package_remove_unknown(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
""" """
must not fail on unknown base removal must not fail on unknown base removal
""" """
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_remove") cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_remove")
watcher.package_remove(package_ahriman.base) await watcher.package_remove(package_ahriman.base)
cache_mock.assert_called_once_with(package_ahriman.base) cache_mock.assert_called_once_with(package_ahriman.base)
def test_package_status_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: async def test_package_status_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
""" """
must update package status only for known package must update package status only for known package
""" """
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_status_update") cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_status_update")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())} watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success) await watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success)
cache_mock.assert_called_once_with(package_ahriman.base, pytest.helpers.anyvar(int)) cache_mock.assert_called_once_with(package_ahriman.base, pytest.helpers.anyvar(int))
package, status = watcher._known[package_ahriman.base] package, status = watcher._known[package_ahriman.base]
assert package == package_ahriman assert package == package_ahriman
assert status.status == BuildStatusEnum.Success assert status.status == BuildStatusEnum.Success
def test_package_status_update_preserves_hold(watcher: Watcher, package_ahriman: Package, async def test_package_status_update_preserves_hold(watcher: Watcher, package_ahriman: Package,
mocker: MockerFixture) -> None: mocker: MockerFixture) -> None:
""" """
must preserve hold status on package status update must preserve hold status on package status update
""" """
mocker.patch("ahriman.core.status.local_client.LocalClient.package_status_update") mocker.patch("ahriman.core.status.local_client.LocalClient.package_status_update")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus(is_held=True))} watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus(is_held=True))}
watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success) await watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success)
_, status = watcher._known[package_ahriman.base] _, status = watcher._known[package_ahriman.base]
assert status.is_held is True assert status.is_held is True
def test_package_status_update_unknown(watcher: Watcher, package_ahriman: Package) -> None: async def test_package_status_update_unknown(watcher: Watcher, package_ahriman: Package) -> None:
""" """
must fail on unknown package status update only must fail on unknown package status update only
""" """
with pytest.raises(UnknownPackageError): with pytest.raises(UnknownPackageError):
watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Unknown) await watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Unknown)
def test_package_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: async def test_package_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
""" """
must add package to cache must add package to cache
""" """
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_update") cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_update")
watcher.package_update(package_ahriman, BuildStatusEnum.Unknown) await watcher.package_update(package_ahriman, BuildStatusEnum.Unknown)
assert watcher.packages assert watcher.packages
cache_mock.assert_called_once_with(package_ahriman, pytest.helpers.anyvar(int)) cache_mock.assert_called_once_with(package_ahriman, pytest.helpers.anyvar(int))
def test_package_update_preserves_hold(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: async def test_package_update_preserves_hold(watcher: Watcher, package_ahriman: Package,
mocker: MockerFixture) -> None:
""" """
must preserve hold status on package update must preserve hold status on package update
""" """
mocker.patch("ahriman.core.status.local_client.LocalClient.package_update") mocker.patch("ahriman.core.status.local_client.LocalClient.package_update")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus(is_held=True))} watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus(is_held=True))}
watcher.package_update(package_ahriman, BuildStatusEnum.Success) await watcher.package_update(package_ahriman, BuildStatusEnum.Success)
_, status = watcher._known[package_ahriman.base] _, status = watcher._known[package_ahriman.base]
assert status.is_held is True assert status.is_held is True
def test_status_update(watcher: Watcher) -> None: async def test_status_update(watcher: Watcher) -> None:
""" """
must update service status must update service status
""" """
watcher.status_update(BuildStatusEnum.Success) await watcher.status_update(BuildStatusEnum.Success)
assert watcher.status.status == BuildStatusEnum.Success assert watcher.status.status == BuildStatusEnum.Success
@@ -204,18 +205,3 @@ def test_call_failed(watcher: Watcher, package_ahriman: Package) -> None:
""" """
with pytest.raises(UnknownPackageError): with pytest.raises(UnknownPackageError):
assert watcher(package_ahriman.base) assert watcher(package_ahriman.base)
def test_getattr(watcher: Watcher) -> None:
"""
must return client method call
"""
assert watcher.package_logs_remove
def test_getattr_unknown_method(watcher: Watcher) -> None:
"""
must raise AttributeError in case if no reporter attribute found
"""
with pytest.raises(AttributeError):
assert watcher.random_method