mirror of
https://github.com/arcan1s/ahriman.git
synced 2026-03-24 02:13:38 +00:00
Compare commits
1 Commits
master
...
feature/ss
| Author | SHA1 | Date | |
|---|---|---|---|
| c2e3ef9405 |
@@ -58,6 +58,7 @@ web = [
|
||||
"aiohttp",
|
||||
"aiohttp_cors",
|
||||
"aiohttp_jinja2",
|
||||
"aiohttp_sse",
|
||||
]
|
||||
web-auth = [
|
||||
"ahriman[web]",
|
||||
|
||||
95
src/ahriman/core/status/event_bus.py
Normal file
95
src/ahriman/core/status/event_bus.py
Normal file
@@ -0,0 +1,95 @@
|
||||
#
|
||||
# Copyright (c) 2021-2026 ahriman team.
|
||||
#
|
||||
# This file is part of ahriman
|
||||
# (see https://github.com/arcan1s/ahriman).
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
import uuid
|
||||
|
||||
from asyncio import Lock, Queue, QueueFull
|
||||
from typing import Any
|
||||
|
||||
from ahriman.core.log import LazyLogging
|
||||
|
||||
|
||||
SSEvent = tuple[str, dict[str, Any]]
|
||||
|
||||
|
||||
class EventBus(LazyLogging):
|
||||
"""
|
||||
event bus implementation
|
||||
|
||||
Attributes:
|
||||
max_size(int): maximum size of queue
|
||||
"""
|
||||
|
||||
def __init__(self, max_size: int) -> None:
|
||||
"""
|
||||
Args:
|
||||
max_size(int): maximum size of queue
|
||||
"""
|
||||
self.max_size = max_size
|
||||
|
||||
self._lock = Lock()
|
||||
self._subscribers: dict[str, Queue[SSEvent | None]] = {}
|
||||
|
||||
async def broadcast(self, event_type: str, event: dict[str, Any]) -> None:
|
||||
"""
|
||||
broadcast event to all subscribers of ``repository_id``
|
||||
|
||||
Args:
|
||||
event_type(str): event type
|
||||
event(dict[str, Any]): event data
|
||||
"""
|
||||
async with self._lock:
|
||||
for subscriber_id, queue in self._subscribers.items():
|
||||
try:
|
||||
queue.put_nowait((event_type, event))
|
||||
except QueueFull:
|
||||
self.logger.warning("discard message to slow subscriber %s", subscriber_id)
|
||||
|
||||
async def shutdown(self) -> None:
|
||||
"""
|
||||
gracefully shutdown all subscribers
|
||||
"""
|
||||
async with self._lock:
|
||||
for queue in self._subscribers.values():
|
||||
queue.put_nowait(None)
|
||||
|
||||
async def subscribe(self) -> tuple[str, Queue[SSEvent | None]]:
|
||||
"""
|
||||
register new subscriber to ``repository_id``
|
||||
|
||||
Returns:
|
||||
tuple[str, Queue[SSEvent | None]]: subscriber identifier and associated queue
|
||||
"""
|
||||
subscriber_id = str(uuid.uuid4())
|
||||
queue: Queue[SSEvent | None] = Queue(self.max_size)
|
||||
|
||||
async with self._lock:
|
||||
self._subscribers[subscriber_id] = queue
|
||||
|
||||
return subscriber_id, queue
|
||||
|
||||
async def unsubscribe(self, subscriber_id: str) -> None:
|
||||
"""
|
||||
unsubscribe from events
|
||||
|
||||
Args:
|
||||
subscriber_id(str): subscriber unique identifier
|
||||
"""
|
||||
async with self._lock:
|
||||
self._subscribers.pop(subscriber_id, None)
|
||||
@@ -26,6 +26,7 @@ from ahriman.core.exceptions import UnknownPackageError
|
||||
from ahriman.core.log import LazyLogging
|
||||
from ahriman.core.repository.package_info import PackageInfo
|
||||
from ahriman.core.status import Client
|
||||
from ahriman.core.status.event_bus import EventBus
|
||||
from ahriman.models.build_status import BuildStatus, BuildStatusEnum
|
||||
from ahriman.models.changes import Changes
|
||||
from ahriman.models.dependencies import Dependencies
|
||||
@@ -41,11 +42,12 @@ class Watcher(LazyLogging):
|
||||
|
||||
Attributes:
|
||||
client(Client): reporter instance
|
||||
event_bus(EventBus): event bus instance
|
||||
package_info(PackageInfo): package info instance
|
||||
status(BuildStatus): daemon status
|
||||
"""
|
||||
|
||||
def __init__(self, client: Client, package_info: PackageInfo) -> None:
|
||||
def __init__(self, client: Client, package_info: PackageInfo, event_bus: EventBus) -> None:
|
||||
"""
|
||||
Args:
|
||||
client(Client): reporter instance
|
||||
@@ -53,6 +55,7 @@ class Watcher(LazyLogging):
|
||||
"""
|
||||
self.client = client
|
||||
self.package_info = package_info
|
||||
self.event_bus = event_bus
|
||||
|
||||
self._lock = Lock()
|
||||
self._known: dict[str, tuple[Package, BuildStatus]] = {}
|
||||
|
||||
@@ -33,6 +33,7 @@ from ahriman.core.exceptions import InitializeError
|
||||
from ahriman.core.repository.package_info import PackageInfo
|
||||
from ahriman.core.spawn import Spawn
|
||||
from ahriman.core.status import Client
|
||||
from ahriman.core.status.event_bus import EventBus
|
||||
from ahriman.core.status.watcher import Watcher
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
from ahriman.web.apispec.info import setup_apispec
|
||||
@@ -108,7 +109,9 @@ def _create_watcher(path: Path, repository_id: RepositoryId) -> Watcher:
|
||||
package_info.reporter = client
|
||||
package_info.repository_id = repository_id
|
||||
|
||||
return Watcher(client, package_info)
|
||||
event_bus = EventBus(configuration.getint("web", "max_queue_size", fallback=0))
|
||||
|
||||
return Watcher(client, package_info, event_bus)
|
||||
|
||||
|
||||
async def _on_shutdown(application: Application) -> None:
|
||||
|
||||
Reference in New Issue
Block a user