mirror of
https://github.com/arcan1s/ahriman.git
synced 2026-03-24 02:13:38 +00:00
event bus implementation
This commit is contained in:
@@ -58,6 +58,7 @@ web = [
|
|||||||
"aiohttp",
|
"aiohttp",
|
||||||
"aiohttp_cors",
|
"aiohttp_cors",
|
||||||
"aiohttp_jinja2",
|
"aiohttp_jinja2",
|
||||||
|
"aiohttp_sse",
|
||||||
]
|
]
|
||||||
web-auth = [
|
web-auth = [
|
||||||
"ahriman[web]",
|
"ahriman[web]",
|
||||||
|
|||||||
95
src/ahriman/core/status/event_bus.py
Normal file
95
src/ahriman/core/status/event_bus.py
Normal file
@@ -0,0 +1,95 @@
|
|||||||
|
#
|
||||||
|
# Copyright (c) 2021-2026 ahriman team.
|
||||||
|
#
|
||||||
|
# This file is part of ahriman
|
||||||
|
# (see https://github.com/arcan1s/ahriman).
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from asyncio import Lock, Queue, QueueFull
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from ahriman.core.log import LazyLogging
|
||||||
|
|
||||||
|
|
||||||
|
SSEvent = tuple[str, dict[str, Any]]
|
||||||
|
|
||||||
|
|
||||||
|
class EventBus(LazyLogging):
|
||||||
|
"""
|
||||||
|
event bus implementation
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
max_size(int): maximum size of queue
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, max_size: int) -> None:
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
max_size(int): maximum size of queue
|
||||||
|
"""
|
||||||
|
self.max_size = max_size
|
||||||
|
|
||||||
|
self._lock = Lock()
|
||||||
|
self._subscribers: dict[str, Queue[SSEvent | None]] = {}
|
||||||
|
|
||||||
|
async def broadcast(self, event_type: str, event: dict[str, Any]) -> None:
|
||||||
|
"""
|
||||||
|
broadcast event to all subscribers of ``repository_id``
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event_type(str): event type
|
||||||
|
event(dict[str, Any]): event data
|
||||||
|
"""
|
||||||
|
async with self._lock:
|
||||||
|
for subscriber_id, queue in self._subscribers.items():
|
||||||
|
try:
|
||||||
|
queue.put_nowait((event_type, event))
|
||||||
|
except QueueFull:
|
||||||
|
self.logger.warning("discard message to slow subscriber %s", subscriber_id)
|
||||||
|
|
||||||
|
async def shutdown(self) -> None:
|
||||||
|
"""
|
||||||
|
gracefully shutdown all subscribers
|
||||||
|
"""
|
||||||
|
async with self._lock:
|
||||||
|
for queue in self._subscribers.values():
|
||||||
|
queue.put_nowait(None)
|
||||||
|
|
||||||
|
async def subscribe(self) -> tuple[str, Queue[SSEvent | None]]:
|
||||||
|
"""
|
||||||
|
register new subscriber to ``repository_id``
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
tuple[str, Queue[SSEvent | None]]: subscriber identifier and associated queue
|
||||||
|
"""
|
||||||
|
subscriber_id = str(uuid.uuid4())
|
||||||
|
queue: Queue[SSEvent | None] = Queue(self.max_size)
|
||||||
|
|
||||||
|
async with self._lock:
|
||||||
|
self._subscribers[subscriber_id] = queue
|
||||||
|
|
||||||
|
return subscriber_id, queue
|
||||||
|
|
||||||
|
async def unsubscribe(self, subscriber_id: str) -> None:
|
||||||
|
"""
|
||||||
|
unsubscribe from events
|
||||||
|
|
||||||
|
Args:
|
||||||
|
subscriber_id(str): subscriber unique identifier
|
||||||
|
"""
|
||||||
|
async with self._lock:
|
||||||
|
self._subscribers.pop(subscriber_id, None)
|
||||||
@@ -26,6 +26,7 @@ from ahriman.core.exceptions import UnknownPackageError
|
|||||||
from ahriman.core.log import LazyLogging
|
from ahriman.core.log import LazyLogging
|
||||||
from ahriman.core.repository.package_info import PackageInfo
|
from ahriman.core.repository.package_info import PackageInfo
|
||||||
from ahriman.core.status import Client
|
from ahriman.core.status import Client
|
||||||
|
from ahriman.core.status.event_bus import EventBus
|
||||||
from ahriman.models.build_status import BuildStatus, BuildStatusEnum
|
from ahriman.models.build_status import BuildStatus, BuildStatusEnum
|
||||||
from ahriman.models.changes import Changes
|
from ahriman.models.changes import Changes
|
||||||
from ahriman.models.dependencies import Dependencies
|
from ahriman.models.dependencies import Dependencies
|
||||||
@@ -41,11 +42,12 @@ class Watcher(LazyLogging):
|
|||||||
|
|
||||||
Attributes:
|
Attributes:
|
||||||
client(Client): reporter instance
|
client(Client): reporter instance
|
||||||
|
event_bus(EventBus): event bus instance
|
||||||
package_info(PackageInfo): package info instance
|
package_info(PackageInfo): package info instance
|
||||||
status(BuildStatus): daemon status
|
status(BuildStatus): daemon status
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, client: Client, package_info: PackageInfo) -> None:
|
def __init__(self, client: Client, package_info: PackageInfo, event_bus: EventBus) -> None:
|
||||||
"""
|
"""
|
||||||
Args:
|
Args:
|
||||||
client(Client): reporter instance
|
client(Client): reporter instance
|
||||||
@@ -53,6 +55,7 @@ class Watcher(LazyLogging):
|
|||||||
"""
|
"""
|
||||||
self.client = client
|
self.client = client
|
||||||
self.package_info = package_info
|
self.package_info = package_info
|
||||||
|
self.event_bus = event_bus
|
||||||
|
|
||||||
self._lock = Lock()
|
self._lock = Lock()
|
||||||
self._known: dict[str, tuple[Package, BuildStatus]] = {}
|
self._known: dict[str, tuple[Package, BuildStatus]] = {}
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ from ahriman.core.exceptions import InitializeError
|
|||||||
from ahriman.core.repository.package_info import PackageInfo
|
from ahriman.core.repository.package_info import PackageInfo
|
||||||
from ahriman.core.spawn import Spawn
|
from ahriman.core.spawn import Spawn
|
||||||
from ahriman.core.status import Client
|
from ahriman.core.status import Client
|
||||||
|
from ahriman.core.status.event_bus import EventBus
|
||||||
from ahriman.core.status.watcher import Watcher
|
from ahriman.core.status.watcher import Watcher
|
||||||
from ahriman.models.repository_id import RepositoryId
|
from ahriman.models.repository_id import RepositoryId
|
||||||
from ahriman.web.apispec.info import setup_apispec
|
from ahriman.web.apispec.info import setup_apispec
|
||||||
@@ -108,7 +109,9 @@ def _create_watcher(path: Path, repository_id: RepositoryId) -> Watcher:
|
|||||||
package_info.reporter = client
|
package_info.reporter = client
|
||||||
package_info.repository_id = repository_id
|
package_info.repository_id = repository_id
|
||||||
|
|
||||||
return Watcher(client, package_info)
|
event_bus = EventBus(configuration.getint("web", "max_queue_size", fallback=0))
|
||||||
|
|
||||||
|
return Watcher(client, package_info, event_bus)
|
||||||
|
|
||||||
|
|
||||||
async def _on_shutdown(application: Application) -> None:
|
async def _on_shutdown(application: Application) -> None:
|
||||||
|
|||||||
Reference in New Issue
Block a user