Compare commits

..

2 Commits

Author SHA1 Message Date
arcanis 4165e87ce5 update tests 2026-03-29 22:16:54 +03:00
arcanis ced90dce10 event bus implementation 2026-03-29 10:56:07 +03:00
42 changed files with 450 additions and 734 deletions
-6
View File
@@ -26,10 +26,6 @@ jobs:
- uses: docker/setup-buildx-action@v3
- name: Set image date
id: args
run: echo "::set-output name=date::$(date -d yesterday +'%Y-%m-%d')"
- name: Login to docker hub
uses: docker/login-action@v3
with:
@@ -57,8 +53,6 @@ jobs:
- name: Build an image and push
uses: docker/build-push-action@v6
with:
build-args: |
BUILD_DATE=${{ steps.args.outputs.date }}
file: docker/Dockerfile
push: true
tags: ${{ steps.meta.outputs.tags }}
+1 -1
View File
@@ -16,7 +16,7 @@ pacman -S --noconfirm --asdeps base-devel python-build python-flit python-instal
# optional dependencies
if [[ -z $MINIMAL_INSTALL ]]; then
# web server
pacman -S --noconfirm python-aioauth-client python-aiohttp python-aiohttp-apispec-git python-aiohttp-cors python-aiohttp-jinja2 python-aiohttp-security python-aiohttp-session python-aiohttp-sse-git python-cryptography python-jinja
pacman -S --noconfirm python-aioauth-client python-aiohttp python-aiohttp-apispec-git python-aiohttp-cors python-aiohttp-jinja2 python-aiohttp-security python-aiohttp-session python-cryptography python-jinja
# additional features
pacman -S --noconfirm gnupg ipython python-boto3 python-cerberus python-matplotlib rsync
fi
+2 -2
View File
@@ -1,9 +1,9 @@
version: 2
build:
os: ubuntu-lts-latest
os: ubuntu-20.04
tools:
python: "3.13"
python: "3.12"
apt_packages:
- graphviz
+3 -5
View File
@@ -1,15 +1,13 @@
# build image
FROM archlinux:base AS build
ARG BUILD_DATE
# install environment
## create build user
RUN useradd -m -d "/home/build" -s "/usr/bin/nologin" build
## extract container creation date and set mirror for this timestamp, set PKGEXT and refresh database next
RUN echo "Server = https://archive.archlinux.org/repos/${BUILD_DATE//-/\/}/\$repo/os/\$arch" > "/etc/pacman.d/mirrorlist" && \
pacman -Syyuu --noconfirm
RUN echo "Server = https://archive.archlinux.org/repos/$(stat -c "%y" "/var/lib/pacman" | cut -d " " -f 1 | sed "s,-,/,g")/\$repo/os/\$arch" > "/etc/pacman.d/mirrorlist" && \
pacman -Sy
## setup package cache
RUN runuser -u build -- mkdir "/tmp/pkg" && \
echo "PKGDEST=/tmp/pkg" >> "/etc/makepkg.conf" && \
@@ -110,7 +108,7 @@ RUN cp "/etc/pacman.d/mirrorlist" "/etc/pacman.d/mirrorlist.orig" && \
echo "Server = file:///var/cache/pacman/pkg" > "/etc/pacman.d/mirrorlist" && \
cp "/etc/pacman.conf" "/etc/pacman.conf.orig" && \
sed -i "s/SigLevel *=.*/SigLevel = Optional/g" "/etc/pacman.conf" && \
pacman -Syyuu --noconfirm
pacman -Sy
## install package and its optional dependencies
RUN pacman -S --noconfirm ahriman
RUN pacman -S --noconfirm --asdeps \
+1 -3
View File
@@ -7,10 +7,8 @@ for PACKAGE in "$@"; do
# clone the remote source
git clone https://aur.archlinux.org/"$PACKAGE".git "$BUILD_DIR"
cd "$BUILD_DIR"
# FIXME monkey patch PKGBUILD for python
sed -i 's/python -m build/python -m build --skip-dependency-check/g' "PKGBUILD"
# checkout to the image date
git checkout "$(git rev-list -1 --before="$BUILD_DATE" master)"
git checkout "$(git rev-list -1 --before="$(stat -c "%y" "/var/lib/pacman" | cut -d " " -f 1)" master)"
# build and install the package
makepkg --nocheck --noconfirm --install --rmdeps --syncdeps
cd /
-8
View File
@@ -12,14 +12,6 @@ ahriman.core.status.client module
:no-undoc-members:
:show-inheritance:
ahriman.core.status.event\_bus module
-------------------------------------
.. automodule:: ahriman.core.status.event_bus
:members:
:no-undoc-members:
:show-inheritance:
ahriman.core.status.local\_client module
----------------------------------------
-16
View File
@@ -92,14 +92,6 @@ ahriman.web.schemas.error\_schema module
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.event\_bus\_filter\_schema module
-----------------------------------------------------
.. automodule:: ahriman.web.schemas.event_bus_filter_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.event\_schema module
----------------------------------------
@@ -364,14 +356,6 @@ ahriman.web.schemas.search\_schema module
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.sse\_schema module
--------------------------------------
.. automodule:: ahriman.web.schemas.sse_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.status\_schema module
-----------------------------------------
-8
View File
@@ -4,14 +4,6 @@ ahriman.web.views.v1.auditlog package
Submodules
----------
ahriman.web.views.v1.auditlog.event\_bus module
-----------------------------------------------
.. automodule:: ahriman.web.views.v1.auditlog.event_bus
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.views.v1.auditlog.events module
-------------------------------------------
+1 -2
View File
@@ -188,7 +188,6 @@ Web server settings. This feature requires ``aiohttp`` libraries to be installed
* ``host`` - host to bind, string, optional.
* ``index_url`` - full URL of the repository index page, string, optional.
* ``max_body_size`` - max body size in bytes to be validated for archive upload, integer, optional. If not set, validation will be disabled.
* ``max_queue_size`` - max queue size for server sent event streams, integer, optional, default ``0``. If set to ``0``, queue is unlimited.
* ``port`` - port to bind, integer, optional.
* ``service_only`` - disable status routes (including logs), boolean, optional, default ``no``.
* ``static_path`` - path to directory with static files, string, required.
@@ -196,7 +195,7 @@ Web server settings. This feature requires ``aiohttp`` libraries to be installed
* ``templates`` - path to templates directories, space separated list of paths, required.
* ``unix_socket`` - path to the listening unix socket, string, optional. If set, server will create the socket on the specified address which can (and will) be used by application. Note, that unlike usual host/port configuration, unix socket allows to perform requests without authorization.
* ``unix_socket_unsafe`` - set unsafe (o+w) permissions to unix socket, boolean, optional, default ``yes``. This option is enabled by default, because it is supposed that unix socket is created in safe environment (only web service is supposed to be used in unsafe), but it can be disabled by configuration.
* ``wait_timeout`` - wait timeout in seconds, maximum amount of time to be waited before lock will be free, integer, optional. If set to ``0``, wait infinitely.
* ``wait_timeout`` - wait timeout in seconds, maximum amount of time to be waited before lock will be free, integer, optional.
``archive`` group
-----------------
-3
View File
@@ -7,13 +7,10 @@ aiohttp==3.11.18
# ahriman (pyproject.toml)
# aiohttp-cors
# aiohttp-jinja2
# aiohttp-sse
aiohttp-cors==0.8.1
# via ahriman (pyproject.toml)
aiohttp-jinja2==1.6
# via ahriman (pyproject.toml)
aiohttp-sse==2.2.0
# via ahriman (pyproject.toml)
aiosignal==1.3.2
# via aiohttp
alabaster==1.0.0
+1 -4
View File
@@ -21,7 +21,6 @@ import { QueryClient, QueryClientProvider } from "@tanstack/react-query";
import AppLayout from "components/layout/AppLayout";
import { AuthProvider } from "contexts/AuthProvider";
import { ClientProvider } from "contexts/ClientProvider";
import { EventStreamProvider } from "contexts/EventStreamProvider";
import { NotificationProvider } from "contexts/NotificationProvider";
import { RepositoryProvider } from "contexts/RepositoryProvider";
import { ThemeProvider } from "contexts/ThemeProvider";
@@ -43,9 +42,7 @@ export default function App(): React.JSX.Element {
<ClientProvider>
<AuthProvider>
<RepositoryProvider>
<EventStreamProvider>
<AppLayout />
</EventStreamProvider>
<AppLayout />
</RepositoryProvider>
</AuthProvider>
</ClientProvider>
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2021-2026 ahriman team.
*
* This file is part of ahriman
* (see https://github.com/arcan1s/ahriman).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import CheckIcon from "@mui/icons-material/Check";
import TimerIcon from "@mui/icons-material/Timer";
import TimerOffIcon from "@mui/icons-material/TimerOff";
import { IconButton, ListItemIcon, ListItemText, Menu, MenuItem, Tooltip } from "@mui/material";
import type { AutoRefreshInterval } from "models/AutoRefreshInterval";
import React, { useState } from "react";
interface AutoRefreshControlProps {
currentInterval: number;
intervals: AutoRefreshInterval[];
onIntervalChange: (interval: number) => void;
}
export default function AutoRefreshControl({
currentInterval,
intervals,
onIntervalChange,
}: AutoRefreshControlProps): React.JSX.Element | null {
const [anchorEl, setAnchorEl] = useState<HTMLElement | null>(null);
if (intervals.length === 0) {
return null;
}
const enabled = currentInterval > 0;
return <>
<Tooltip title="Auto-refresh">
<IconButton
aria-label="Auto-refresh"
color={enabled ? "primary" : "default"}
onClick={event => setAnchorEl(event.currentTarget)}
size="small"
>
{enabled ? <TimerIcon fontSize="small" /> : <TimerOffIcon fontSize="small" />}
</IconButton>
</Tooltip>
<Menu
anchorEl={anchorEl}
onClose={() => setAnchorEl(null)}
open={Boolean(anchorEl)}
>
<MenuItem
onClick={() => {
onIntervalChange(0);
setAnchorEl(null);
}}
selected={!enabled}
>
<ListItemIcon>
{!enabled && <CheckIcon fontSize="small" />}
</ListItemIcon>
<ListItemText>Off</ListItemText>
</MenuItem>
{intervals.map(interval =>
<MenuItem
key={interval.interval}
onClick={() => {
onIntervalChange(interval.interval);
setAnchorEl(null);
}}
selected={enabled && interval.interval === currentInterval}
>
<ListItemIcon>
{enabled && interval.interval === currentInterval && <CheckIcon fontSize="small" />}
</ListItemIcon>
<ListItemText>{interval.text}</ListItemText>
</MenuItem>,
)}
</Menu>
</>;
}
@@ -32,22 +32,27 @@ import PkgbuildTab from "components/package/PkgbuildTab";
import { type TabKey, tabs } from "components/package/TabKey";
import { QueryKeys } from "hooks/QueryKeys";
import { useAuth } from "hooks/useAuth";
import { useAutoRefresh } from "hooks/useAutoRefresh";
import { useClient } from "hooks/useClient";
import { useNotification } from "hooks/useNotification";
import { useRepository } from "hooks/useRepository";
import type { AutoRefreshInterval } from "models/AutoRefreshInterval";
import type { Dependencies } from "models/Dependencies";
import type { PackageStatus } from "models/PackageStatus";
import type { Patch } from "models/Patch";
import React, { useState } from "react";
import { StatusHeaderStyles } from "theme/StatusColors";
import { defaultInterval } from "utils";
interface PackageInfoDialogProps {
autoRefreshIntervals: AutoRefreshInterval[];
onClose: () => void;
open: boolean;
packageBase: string | null;
}
export default function PackageInfoDialog({
autoRefreshIntervals,
onClose,
open,
packageBase,
@@ -72,11 +77,14 @@ export default function PackageInfoDialog({
onClose();
};
const autoRefresh = useAutoRefresh("package-info-autoreload-button", defaultInterval(autoRefreshIntervals));
const { data: packageData } = useQuery<PackageStatus[]>({
enabled: open,
queryFn: localPackageBase && currentRepository ?
() => client.fetch.fetchPackage(localPackageBase, currentRepository) : skipToken,
queryKey: localPackageBase && currentRepository ? QueryKeys.package(localPackageBase, currentRepository) : ["packages"],
refetchInterval: autoRefresh.interval > 0 ? autoRefresh.interval : false,
});
const { data: dependencies } = useQuery<Dependencies>({
@@ -174,6 +182,7 @@ export default function PackageInfoDialog({
{activeTab === "logs" && localPackageBase && currentRepository &&
<BuildLogsTab
packageBase={localPackageBase}
refreshInterval={autoRefresh.interval}
repository={currentRepository}
/>
}
@@ -198,8 +207,11 @@ export default function PackageInfoDialog({
</DialogContent>
<PackageInfoActions
autoRefreshInterval={autoRefresh.interval}
autoRefreshIntervals={autoRefreshIntervals}
isAuthorized={isAuthorized}
isHeld={status?.is_held ?? false}
onAutoRefreshIntervalChange={autoRefresh.setInterval}
onHoldToggle={() => void handleHoldToggle()}
onRefreshDatabaseChange={setRefreshDatabase}
onRemove={() => void handleRemove()}
+1 -1
View File
@@ -69,7 +69,7 @@ export default function AppLayout(): React.JSX.Element {
</Tooltip>
</Box>
<PackageTable />
<PackageTable autoRefreshIntervals={info?.autorefresh_intervals ?? []} />
<Footer
docsEnabled={info?.docs_enabled ?? false}
@@ -23,7 +23,6 @@ import { keepPreviousData, skipToken, useQuery } from "@tanstack/react-query";
import CodeBlock from "components/common/CodeBlock";
import { QueryKeys } from "hooks/QueryKeys";
import { useAutoScroll } from "hooks/useAutoScroll";
import { useBuildLogStream } from "hooks/useBuildLogStream";
import { useClient } from "hooks/useClient";
import type { LogRecord } from "models/LogRecord";
import type { RepositoryId } from "models/RepositoryId";
@@ -38,6 +37,7 @@ interface Logs {
interface BuildLogsTabProps {
packageBase: string;
refreshInterval: number;
repository: RepositoryId;
}
@@ -50,10 +50,10 @@ function convertLogs(records: LogRecord[], filter?: (record: LogRecord) => boole
export default function BuildLogsTab({
packageBase,
refreshInterval,
repository,
}: BuildLogsTabProps): React.JSX.Element {
const client = useClient();
useBuildLogStream(packageBase, repository);
const [selectedVersionKey, setSelectedVersionKey] = useState<string | null>(null);
const [anchorEl, setAnchorEl] = useState<HTMLElement | null>(null);
@@ -61,6 +61,7 @@ export default function BuildLogsTab({
enabled: !!packageBase,
queryFn: () => client.fetch.fetchPackageLogs(packageBase, repository),
queryKey: QueryKeys.logs(packageBase, repository),
refetchInterval: refreshInterval > 0 ? refreshInterval : false,
});
// Build version selectors from all logs
@@ -116,6 +117,7 @@ export default function BuildLogsTab({
)
: skipToken,
queryKey: QueryKeys.logsVersion(packageBase, repository, activeVersion?.version ?? "", activeVersion?.processId ?? ""),
refetchInterval: refreshInterval > 0 ? refreshInterval : false,
});
// Derive displayed logs: prefer fresh polled data when available
@@ -98,7 +98,7 @@ export default function PackageDetailsGrid({ dependencies, pkg }: PackageDetails
<Grid size={{ md: 5, xs: 8 }}>
<Typography variant="body2">
{aurUrl &&
<Link href={aurUrl} rel="noopener noreferrer" target="_blank" underline="hover">{aurUrl}</Link>
<Link href={aurUrl} rel="noopener noreferrer" target="_blank" underline="hover">AUR link</Link>
}
</Typography>
</Grid>
@@ -22,11 +22,16 @@ import PauseCircleIcon from "@mui/icons-material/PauseCircle";
import PlayArrowIcon from "@mui/icons-material/PlayArrow";
import PlayCircleIcon from "@mui/icons-material/PlayCircle";
import { Button, Checkbox, DialogActions, FormControlLabel } from "@mui/material";
import AutoRefreshControl from "components/common/AutoRefreshControl";
import type { AutoRefreshInterval } from "models/AutoRefreshInterval";
import type React from "react";
interface PackageInfoActionsProps {
autoRefreshInterval: number;
autoRefreshIntervals: AutoRefreshInterval[];
isAuthorized: boolean;
isHeld: boolean;
onAutoRefreshIntervalChange: (interval: number) => void;
onHoldToggle: () => void;
onRefreshDatabaseChange: (checked: boolean) => void;
onRemove: () => void;
@@ -35,8 +40,11 @@ interface PackageInfoActionsProps {
}
export default function PackageInfoActions({
autoRefreshInterval,
autoRefreshIntervals,
isAuthorized,
isHeld,
onAutoRefreshIntervalChange,
onHoldToggle,
onRefreshDatabaseChange,
onRemove,
@@ -61,5 +69,10 @@ export default function PackageInfoActions({
</Button>
</>
}
<AutoRefreshControl
currentInterval={autoRefreshInterval}
intervals={autoRefreshIntervals}
onIntervalChange={onAutoRefreshIntervalChange}
/>
</DialogActions>;
}
+13 -2
View File
@@ -35,9 +35,14 @@ import PackageTableToolbar from "components/table/PackageTableToolbar";
import StatusCell from "components/table/StatusCell";
import { useDebounce } from "hooks/useDebounce";
import { usePackageTable } from "hooks/usePackageTable";
import type { AutoRefreshInterval } from "models/AutoRefreshInterval";
import type { PackageRow } from "models/PackageRow";
import React, { useMemo } from "react";
interface PackageTableProps {
autoRefreshIntervals: AutoRefreshInterval[];
}
function createListColumn(
field: keyof PackageRow,
headerName: string,
@@ -54,8 +59,8 @@ function createListColumn(
};
}
export default function PackageTable(): React.JSX.Element {
const table = usePackageTable();
export default function PackageTable({ autoRefreshIntervals }: PackageTableProps): React.JSX.Element {
const table = usePackageTable(autoRefreshIntervals);
const apiRef = useGridApiRef();
const debouncedSearch = useDebounce(table.searchText, 300);
@@ -113,6 +118,11 @@ export default function PackageTable(): React.JSX.Element {
onRemoveClick: () => void table.handleRemove(),
onUpdateClick: () => void table.handleUpdate(),
}}
autoRefresh={{
autoRefreshIntervals,
currentInterval: table.autoRefreshInterval,
onIntervalChange: table.onAutoRefreshIntervalChange,
}}
isAuthorized={table.isAuthorized}
hasSelection={table.selectionModel.length > 0}
onSearchChange={table.setSearchText}
@@ -165,6 +175,7 @@ export default function PackageTable(): React.JSX.Element {
<PackageRebuildDialog onClose={() => table.setDialogOpen(null)} open={table.dialogOpen === "rebuild"} />
<KeyImportDialog onClose={() => table.setDialogOpen(null)} open={table.dialogOpen === "keyImport"} />
<PackageInfoDialog
autoRefreshIntervals={autoRefreshIntervals}
onClose={() => table.setSelectedPackage(null)}
open={table.selectedPackage !== null}
packageBase={table.selectedPackage}
@@ -30,10 +30,18 @@ import ReplayIcon from "@mui/icons-material/Replay";
import SearchIcon from "@mui/icons-material/Search";
import VpnKeyIcon from "@mui/icons-material/VpnKey";
import { Box, Button, Divider, IconButton, InputAdornment, Menu, MenuItem, TextField, Tooltip } from "@mui/material";
import AutoRefreshControl from "components/common/AutoRefreshControl";
import type { AutoRefreshInterval } from "models/AutoRefreshInterval";
import type { BuildStatus } from "models/BuildStatus";
import React, { useState } from "react";
import { StatusColors } from "theme/StatusColors";
export interface AutoRefreshProps {
autoRefreshIntervals: AutoRefreshInterval[];
currentInterval: number;
onIntervalChange: (interval: number) => void;
}
export interface ToolbarActions {
onAddClick: () => void;
onDashboardClick: () => void;
@@ -48,6 +56,7 @@ export interface ToolbarActions {
interface PackageTableToolbarProps {
actions: ToolbarActions;
autoRefresh: AutoRefreshProps;
hasSelection: boolean;
isAuthorized: boolean;
onSearchChange: (text: string) => void;
@@ -57,6 +66,7 @@ interface PackageTableToolbarProps {
export default function PackageTableToolbar({
actions,
autoRefresh,
hasSelection,
isAuthorized,
onSearchChange,
@@ -133,6 +143,12 @@ export default function PackageTableToolbar({
reload
</Button>
<AutoRefreshControl
currentInterval={autoRefresh.currentInterval}
intervals={autoRefresh.autoRefreshIntervals}
onIntervalChange={autoRefresh.onIntervalChange}
/>
<Box sx={{ flexGrow: 1 }} />
<TextField
+49
View File
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2021-2026 ahriman team.
*
* This file is part of ahriman
* (see https://github.com/arcan1s/ahriman).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { useLocalStorage } from "hooks/useLocalStorage";
import { type Dispatch, type SetStateAction, useEffect, useState } from "react";
interface AutoRefreshResult {
interval: number;
setInterval: Dispatch<SetStateAction<number>>;
setPaused: Dispatch<SetStateAction<boolean>>;
}
export function useAutoRefresh(key: string, defaultInterval: number): AutoRefreshResult {
const storageKey = `ahriman-${key}`;
const [interval, setInterval] = useLocalStorage<number>(storageKey, defaultInterval);
const [paused, setPaused] = useState(false);
// Apply defaultInterval when it becomes available (e.g. after info endpoint loads)
// but only if the user hasn't explicitly set a preference
useEffect(() => {
if (defaultInterval > 0 && window.localStorage.getItem(storageKey) === null) {
setInterval(defaultInterval);
}
}, [storageKey, defaultInterval, setInterval]);
const effectiveInterval = paused ? 0 : interval;
return {
interval: effectiveInterval,
setInterval,
setPaused,
};
}
-70
View File
@@ -1,70 +0,0 @@
/*
* Copyright (c) 2021-2026 ahriman team.
*
* This file is part of ahriman
* (see https://github.com/arcan1s/ahriman).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { useQueryClient } from "@tanstack/react-query";
import { buildEventStreamUrl } from "hooks/useEventStream";
import type { LogRecord } from "models/LogRecord";
import type { RepositoryId } from "models/RepositoryId";
import { useEffect } from "react";
interface BuildLogEvent {
created: number;
message: string;
process_id: string;
version: string;
}
function appendLogRecord(existing: LogRecord[] | undefined, record: LogRecord): LogRecord[] {
return [...existing ?? [], record];
}
export function useBuildLogStream(packageBase: string, repository: RepositoryId): void {
const queryClient = useQueryClient();
useEffect(() => {
const source = new EventSource(buildEventStreamUrl(repository, ["build-log"], packageBase));
source.addEventListener("build-log", (event: MessageEvent<string>) => {
const data = JSON.parse(event.data) as BuildLogEvent;
const record: LogRecord = {
created: data.created,
message: data.message,
process_id: data.process_id,
version: data.version,
};
// Append to the all-logs cache
queryClient.setQueryData<LogRecord[]>(
["logs", repository.key, packageBase],
existing => appendLogRecord(existing, record),
);
// Append to the version-specific cache
queryClient.setQueryData<LogRecord[]>(
["logs", repository.key, packageBase, record.version, record.process_id],
existing => appendLogRecord(existing, record),
);
});
return () => {
source.close();
};
}, [queryClient, packageBase, repository]);
}
-101
View File
@@ -1,101 +0,0 @@
/*
* Copyright (c) 2021-2026 ahriman team.
*
* This file is part of ahriman
* (see https://github.com/arcan1s/ahriman).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import type { QueryClient } from "@tanstack/react-query";
import type { RepositoryId } from "models/RepositoryId";
import { useEffect } from "react";
const GLOBAL_EVENT_TYPES = [
"package-held",
"package-outdated",
"package-removed",
"package-status-changed",
"package-update-failed",
"package-updated",
"service-status-changed",
] as const;
function invalidateForEvent(
queryClient: QueryClient,
repositoryKey: string,
eventType: string,
objectId?: string,
): void {
switch (eventType) {
case "package-status-changed":
case "package-updated":
case "package-removed":
case "package-held":
void queryClient.invalidateQueries({ queryKey: ["packages", repositoryKey] });
void queryClient.invalidateQueries({ queryKey: ["status", repositoryKey] });
if (objectId) {
void queryClient.invalidateQueries({ queryKey: ["packages", repositoryKey, objectId] });
void queryClient.invalidateQueries({ queryKey: ["events", repositoryKey, objectId] });
}
break;
case "service-status-changed":
void queryClient.invalidateQueries({ queryKey: ["status", repositoryKey] });
break;
case "package-outdated":
case "package-update-failed":
void queryClient.invalidateQueries({ queryKey: ["packages", repositoryKey] });
if (objectId) {
void queryClient.invalidateQueries({ queryKey: ["packages", repositoryKey, objectId] });
}
break;
}
}
export function buildEventStreamUrl(
repository: RepositoryId,
events?: readonly string[],
objectId?: string,
): string {
const params = new URLSearchParams(repository.toQuery());
if (events) {
for (const event of events) {
params.append("event", event);
}
}
if (objectId) {
params.set("object_id", objectId);
}
return `/api/v1/events/stream?${params.toString()}`;
}
export function useEventStream(queryClient: QueryClient, repository: RepositoryId | null): void {
useEffect(() => {
if (!repository) {
return;
}
const source = new EventSource(buildEventStreamUrl(repository, GLOBAL_EVENT_TYPES));
for (const eventType of GLOBAL_EVENT_TYPES) {
source.addEventListener(eventType, (event: MessageEvent<string>) => {
const data = JSON.parse(event.data) as { object_id?: string };
invalidateForEvent(queryClient, repository.key, eventType, data.object_id ?? undefined);
});
}
return () => {
source.close();
};
}, [queryClient, repository]);
}
+10 -1
View File
@@ -20,37 +20,46 @@
import { skipToken, useQuery } from "@tanstack/react-query";
import { QueryKeys } from "hooks/QueryKeys";
import { useAuth } from "hooks/useAuth";
import { useAutoRefresh } from "hooks/useAutoRefresh";
import { useClient } from "hooks/useClient";
import { useRepository } from "hooks/useRepository";
import type { AutoRefreshInterval } from "models/AutoRefreshInterval";
import type { BuildStatus } from "models/BuildStatus";
import { PackageRow } from "models/PackageRow";
import { useMemo } from "react";
import { defaultInterval } from "utils";
export interface UsePackageDataResult {
autoRefresh: ReturnType<typeof useAutoRefresh>;
isAuthorized: boolean;
isLoading: boolean;
rows: PackageRow[];
status: BuildStatus | undefined;
}
export function usePackageData(): UsePackageDataResult {
export function usePackageData(autoRefreshIntervals: AutoRefreshInterval[]): UsePackageDataResult {
const client = useClient();
const { currentRepository } = useRepository();
const { isAuthorized } = useAuth();
const autoRefresh = useAutoRefresh("table-autoreload-button", defaultInterval(autoRefreshIntervals));
const { data: packages = [], isLoading } = useQuery({
queryFn: currentRepository ? () => client.fetch.fetchPackages(currentRepository) : skipToken,
queryKey: currentRepository ? QueryKeys.packages(currentRepository) : ["packages"],
refetchInterval: autoRefresh.interval > 0 ? autoRefresh.interval : false,
});
const { data: status } = useQuery({
queryFn: currentRepository ? () => client.fetch.fetchServerStatus(currentRepository) : skipToken,
queryKey: currentRepository ? QueryKeys.status(currentRepository) : ["status"],
refetchInterval: autoRefresh.interval > 0 ? autoRefresh.interval : false,
});
const rows = useMemo(() => packages.map(descriptor => new PackageRow(descriptor)), [packages]);
return {
autoRefresh,
isLoading,
isAuthorized,
rows,
+15 -2
View File
@@ -21,10 +21,13 @@ import type { GridFilterModel } from "@mui/x-data-grid";
import { usePackageActions } from "hooks/usePackageActions";
import { usePackageData } from "hooks/usePackageData";
import { useTableState } from "hooks/useTableState";
import type { AutoRefreshInterval } from "models/AutoRefreshInterval";
import type { BuildStatus } from "models/BuildStatus";
import type { PackageRow } from "models/PackageRow";
import { useEffect } from "react";
export interface UsePackageTableResult {
autoRefreshInterval: number;
columnVisibility: Record<string, boolean>;
dialogOpen: "dashboard" | "add" | "rebuild" | "keyImport" | null;
filterModel: GridFilterModel;
@@ -34,6 +37,7 @@ export interface UsePackageTableResult {
handleUpdate: () => Promise<void>;
isAuthorized: boolean;
isLoading: boolean;
onAutoRefreshIntervalChange: (interval: number) => void;
paginationModel: { page: number; pageSize: number };
rows: PackageRow[];
searchText: string;
@@ -49,14 +53,23 @@ export interface UsePackageTableResult {
status: BuildStatus | undefined;
}
export function usePackageTable(): UsePackageTableResult {
const { rows, isLoading, isAuthorized, status } = usePackageData();
export function usePackageTable(autoRefreshIntervals: AutoRefreshInterval[]): UsePackageTableResult {
const { rows, isLoading, isAuthorized, status, autoRefresh } = usePackageData(autoRefreshIntervals);
const tableState = useTableState();
const actions = usePackageActions(tableState.selectionModel, tableState.setSelectionModel);
// Pause auto-refresh when dialog is open
const isDialogOpen = tableState.dialogOpen !== null || tableState.selectedPackage !== null;
const setPaused = autoRefresh.setPaused;
useEffect(() => {
setPaused(isDialogOpen);
}, [isDialogOpen, setPaused]);
return {
autoRefreshInterval: autoRefresh.interval,
isLoading,
isAuthorized,
onAutoRefreshIntervalChange: autoRefresh.setInterval,
rows,
status,
...actions,
@@ -17,16 +17,8 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { useQueryClient } from "@tanstack/react-query";
import { useEventStream } from "hooks/useEventStream";
import { useRepository } from "hooks/useRepository";
import type { ReactNode } from "react";
export function EventStreamProvider({ children }: { children: ReactNode }): ReactNode {
const queryClient = useQueryClient();
const { currentRepository } = useRepository();
useEventStream(queryClient, currentRepository);
return children;
export interface AutoRefreshInterval {
interval: number;
is_active: boolean;
text: string;
}
+2
View File
@@ -18,10 +18,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import type { AuthInfo } from "models/AuthInfo";
import type { AutoRefreshInterval } from "models/AutoRefreshInterval";
import type { RepositoryId } from "models/RepositoryId";
export interface InfoResponse {
auth: AuthInfo;
autorefresh_intervals: AutoRefreshInterval[];
docs_enabled: boolean;
index_url?: string;
repositories: RepositoryId[];
+6
View File
@@ -17,6 +17,8 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import type { AutoRefreshInterval } from "models/AutoRefreshInterval";
export const DETAIL_TABLE_PROPS = {
density: "compact" as const,
disableColumnSorting: true,
@@ -25,6 +27,10 @@ export const DETAIL_TABLE_PROPS = {
sx: { height: 400, mt: 1 },
};
export function defaultInterval(intervals: AutoRefreshInterval[]): number {
return intervals.find(interval => interval.is_active)?.interval ?? 0;
}
declare global {
interface Array<T> {
unique(): T[];
@@ -46,8 +46,6 @@ host = 127.0.0.1
;index_url =
; Max file size in bytes which can be uploaded to the server. Requires ${web:enable_archive_upload} to be enabled.
;max_body_size =
; Max event queue size used for server sent event endpoints (0 is infinite)
;max_queue_size = 0
; Port to listen. Must be set, if the web service is enabled.
;port =
; Disable status (e.g. package status, logs, etc) endpoints. Useful for build only modes.
@@ -80,7 +80,7 @@ class Configuration(configparser.RawConfigParser):
"""
configparser.RawConfigParser.__init__(
self,
dict_type=ConfigurationMultiDict if allow_multi_key else dict,
dict_type=ConfigurationMultiDict if allow_multi_key else dict, # type: ignore[arg-type]
allow_no_value=allow_no_value,
strict=False,
empty_lines_in_values=not allow_multi_key,
@@ -150,6 +150,6 @@ class ShellTemplate(Template):
break
kwargs.update(mapping)
kwargs.update(dict(generator(kwargs)))
substituted = dict(generator(kwargs))
return self.safe_substitute(kwargs)
return self.safe_substitute(kwargs | substituted)
+23 -45
View File
@@ -19,8 +19,7 @@
#
import uuid
from asyncio import Lock, Queue, QueueFull, QueueShutDown
from dataclasses import dataclass
from asyncio import Lock, Queue, QueueFull
from typing import Any
from ahriman.core.log import LazyLogging
@@ -30,22 +29,6 @@ from ahriman.models.event import EventType
SSEvent = tuple[str, dict[str, Any]]
@dataclass(frozen=True)
class _Subscription:
"""
internal event bus subscription record
Attributes:
topics(list[EventType] | None): event type filter, ``None`` means all
object_id(str | None): object identifier filter, ``None`` means all
queue(Queue[SSEvent]): per-subscriber event queue
"""
topics: list[EventType] | None
object_id: str | None
queue: Queue[SSEvent]
class EventBus(LazyLogging):
"""
event bus implementation
@@ -62,7 +45,7 @@ class EventBus(LazyLogging):
self.max_size = max_size
self._lock = Lock()
self._subscribers: dict[str, _Subscription] = {}
self._subscribers: dict[str, tuple[list[EventType] | None, Queue[SSEvent | None]]] = {}
async def broadcast(self, event_type: EventType, object_id: str | None, **kwargs: Any) -> None:
"""
@@ -77,48 +60,42 @@ class EventBus(LazyLogging):
event.update(kwargs)
async with self._lock:
snapshot = list(self._subscribers.items())
for subscriber_id, subscription in snapshot:
if subscription.topics is not None and event_type not in subscription.topics:
continue
if subscription.object_id is not None and object_id != subscription.object_id:
continue
try:
subscription.queue.put_nowait((event_type, event))
except QueueFull:
self.logger.warning("discard message to slow subscriber %s", subscriber_id)
except QueueShutDown:
pass
for subscriber_id, (topics, queue) in self._subscribers.items():
if topics is not None and event_type not in topics:
continue
try:
queue.put_nowait((event_type, event))
except QueueFull:
self.logger.warning("discard message to slow subscriber %s", subscriber_id)
async def shutdown(self) -> None:
"""
gracefully shutdown all subscribers
"""
async with self._lock:
for subscription in self._subscribers.values():
subscription.queue.shutdown()
for _, queue in self._subscribers.values():
try:
queue.put_nowait(None)
except QueueFull:
pass
queue.shutdown()
async def subscribe(self, topics: list[EventType] | None = None,
object_id: str | None = None) -> tuple[str, Queue[SSEvent]]:
async def subscribe(self, topics: list[EventType] | None = None) -> tuple[str, Queue[SSEvent | None]]:
"""
register new subscriber
Args:
topics(list[EventType] | None, optional): list of event types to filter by. If ``None`` is set,
all events will be delivered (Default value = None)
object_id(str | None, optional): object identifier to filter by. If ``None`` is set,
events for all objects will be delivered (Default value = None)
Returns:
tuple[str, Queue[SSEvent]]: subscriber identifier and associated queue
tuple[str, Queue[SSEvent | None]]: subscriber identifier and associated queue
"""
subscriber_id = str(uuid.uuid4())
queue: Queue[SSEvent] = Queue(self.max_size)
queue: Queue[SSEvent | None] = Queue(self.max_size)
async with self._lock:
self._subscribers[subscriber_id] = _Subscription(topics=topics, object_id=object_id, queue=queue)
self._subscribers[subscriber_id] = (topics, queue)
return subscriber_id, queue
@@ -130,6 +107,7 @@ class EventBus(LazyLogging):
subscriber_id(str): subscriber unique identifier
"""
async with self._lock:
subscription = self._subscribers.pop(subscriber_id, None)
if subscription is not None:
subscription.queue.shutdown()
result = self._subscribers.pop(subscriber_id, None)
if result is not None:
_, queue = result
queue.shutdown()
-2
View File
@@ -28,7 +28,6 @@ from ahriman.web.schemas.configuration_schema import ConfigurationSchema
from ahriman.web.schemas.counters_schema import CountersSchema
from ahriman.web.schemas.dependencies_schema import DependenciesSchema
from ahriman.web.schemas.error_schema import ErrorSchema
from ahriman.web.schemas.event_bus_filter_schema import EventBusFilterSchema
from ahriman.web.schemas.event_schema import EventSchema
from ahriman.web.schemas.event_search_schema import EventSearchSchema
from ahriman.web.schemas.file_schema import FileSchema
@@ -62,7 +61,6 @@ from ahriman.web.schemas.repository_id_schema import RepositoryIdSchema
from ahriman.web.schemas.repository_stats_schema import RepositoryStatsSchema
from ahriman.web.schemas.rollback_schema import RollbackSchema
from ahriman.web.schemas.search_schema import SearchSchema
from ahriman.web.schemas.sse_schema import SSESchema
from ahriman.web.schemas.status_schema import StatusSchema
from ahriman.web.schemas.update_flags_schema import UpdateFlagsSchema
from ahriman.web.schemas.worker_schema import WorkerSchema
@@ -1,37 +0,0 @@
#
# Copyright (c) 2021-2026 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from ahriman.models.event import EventType
from ahriman.web.apispec import fields
from ahriman.web.schemas.repository_id_schema import RepositoryIdSchema
class EventBusFilterSchema(RepositoryIdSchema):
"""
request event bus filter schema
"""
event = fields.List(fields.String(), metadata={
"description": "Event type filter",
"example": [EventType.PackageUpdated],
})
object_id = fields.String(metadata={
"description": "Object identifier filter",
"example": "ahriman",
})
-35
View File
@@ -1,35 +0,0 @@
#
# Copyright (c) 2021-2026 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from ahriman.models.event import EventType
from ahriman.web.apispec import Schema, fields
class SSESchema(Schema):
"""
response SSE schema
"""
event = fields.String(required=True, metadata={
"description": "Event type",
"example": EventType.PackageUpdated,
})
data = fields.Dict(keys=fields.String(), values=fields.Raw(), metadata={
"description": "Event data",
})
+2 -2
View File
@@ -209,8 +209,8 @@ class BaseView(View, CorsViewMixin):
HTTPBadRequest: if supplied parameters are invalid
"""
try:
limit = int(self.request.query.get("limit", -1))
offset = int(self.request.query.get("offset", 0))
limit = int(self.request.query.get("limit", default=-1))
offset = int(self.request.query.get("offset", default=0))
except ValueError as ex:
raise HTTPBadRequest(reason=str(ex))
+19 -59
View File
@@ -19,7 +19,7 @@
#
import json
from aiohttp.web import HTTPBadRequest, Request, StreamResponse
from aiohttp.web import StreamResponse
from aiohttp_sse import EventSourceResponse, sse_response
from asyncio import Queue, QueueShutDown, wait_for
from typing import ClassVar
@@ -28,70 +28,39 @@ from ahriman.core.status.event_bus import SSEvent
from ahriman.models.event import EventType
from ahriman.models.user_access import UserAccess
from ahriman.web.apispec.decorators import apidocs
from ahriman.web.schemas import EventBusFilterSchema, SSESchema
from ahriman.web.schemas import EventSchema, RepositoryIdSchema
from ahriman.web.views.base import BaseView
class EventBusView(BaseView):
"""
event bus SSE view
Attributes:
GET_PERMISSION(UserAccess): (class attribute) get permissions of self
"""
READ_EVENTS: ClassVar[set[EventType]] = {
EventType.PackageHeld,
EventType.PackageOutdated,
EventType.PackageRemoved,
EventType.PackageStatusChanged,
EventType.PackageUpdateFailed,
EventType.PackageUpdated,
EventType.ServiceStatusChanged,
}
GET_PERMISSION: ClassVar[UserAccess] = UserAccess.Full
ROUTES = ["/api/v1/events/stream"]
@classmethod
async def get_permission(cls, request: Request) -> UserAccess:
"""
retrieve user permission from the request
Args:
request(Request): request object
Returns:
UserAccess: extracted permission
"""
if request.method.upper() not in ("GET", "HEAD"):
return await BaseView.get_permission(request)
permission = UserAccess.Full
event_filter = request.query.getall("event", []) if request.query is not None else []
if event_filter:
try:
topics = {EventType(event) for event in event_filter}
except ValueError:
pass
else:
if topics.issubset(cls.READ_EVENTS):
permission = UserAccess.Read
return permission
@staticmethod
async def _run(response: EventSourceResponse, queue: Queue[SSEvent]) -> None:
async def _run(response: EventSourceResponse, queue: Queue[SSEvent | None]) -> None:
"""
read events from queue and send them to the client
Args:
response(EventSourceResponse): SSE response instance
queue(Queue[SSEvent]): subscriber queue
queue(Queue[SSEvent | None]): subscriber queue
"""
while response.is_connected():
try:
event_type, data = await wait_for(queue.get(), timeout=response.ping_interval)
message = await wait_for(queue.get(), timeout=response.ping_interval)
except TimeoutError:
continue
except QueueShutDown:
break
if message is None:
break # terminate queue on sentinel event
event_type, data = message
await response.send(json.dumps(data), event=event_type)
@@ -99,11 +68,10 @@ class EventBusView(BaseView):
tags=["Audit log"],
summary="Live updates",
description="Stream live updates via SSE",
permission=UserAccess.Full,
error_400_enabled=True,
permission=GET_PERMISSION,
error_404_description="Repository is unknown",
schema=SSESchema(many=True),
query_schema=EventBusFilterSchema,
schema=EventSchema(many=True),
query_schema=RepositoryIdSchema,
)
async def get(self) -> StreamResponse:
"""
@@ -111,25 +79,17 @@ class EventBusView(BaseView):
Returns:
StreamResponse: 200 with streaming updates
Raises:
HTTPBadRequest: if invalid event type is supplied
"""
try:
topics = [EventType(event) for event in self.request.query.getall("event", [])] or None
except ValueError as ex:
raise HTTPBadRequest(reason=str(ex))
object_id = self.request.query.get("object_id")
event_bus = self.service().event_bus
topics = [EventType(event) for event in self.request.query.getall("event", [])] or None
async with sse_response(self.request) as response:
subscription_id, queue = await event_bus.subscribe(topics, object_id=object_id)
subscription_id, queue = await self.service().event_bus.subscribe(topics)
try:
await self._run(response, queue)
except (ConnectionResetError, QueueShutDown):
pass
finally:
await event_bus.unsubscribe(subscription_id)
await self.service().event_bus.unsubscribe(subscription_id)
return response
-12
View File
@@ -2,7 +2,6 @@ import pytest
from ahriman.core.configuration import Configuration
from ahriman.core.status import Client
from ahriman.core.status.event_bus import EventBus
from ahriman.core.status.web_client import WebClient
@@ -17,17 +16,6 @@ def client() -> Client:
return Client()
@pytest.fixture
def event_bus() -> EventBus:
"""
fixture for event bus
Returns:
EventBus: event bus test instance
"""
return EventBus(0)
@pytest.fixture
def web_client(configuration: Configuration) -> WebClient:
"""
+47 -71
View File
@@ -1,144 +1,120 @@
import pytest
from asyncio import QueueShutDown
from asyncio import QueueFull
from ahriman.core.status.event_bus import EventBus
from ahriman.models.event import EventType
from ahriman.models.package import Package
async def test_broadcast(event_bus: EventBus, package_ahriman: Package) -> None:
async def test_broadcast() -> None:
"""
must broadcast event to all subscribers
must broadcast event to all general subscribers
"""
event_bus = EventBus(0)
_, queue = await event_bus.subscribe()
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base, version=package_ahriman.version)
await event_bus.broadcast(EventType.PackageUpdated, "ahriman", status="success")
message = queue.get_nowait()
assert message == (
EventType.PackageUpdated,
{"object_id": package_ahriman.base, "version": package_ahriman.version},
)
assert message == (EventType.PackageUpdated, {"object_id": "ahriman", "status": "success"})
async def test_broadcast_with_topics(event_bus: EventBus, package_ahriman: Package) -> None:
async def test_broadcast_with_topics() -> None:
"""
must broadcast event to subscribers with matching topics
must deliver event only to subscribers with matching topics
"""
_, queue = await event_bus.subscribe([EventType.PackageUpdated])
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
assert not queue.empty()
event_bus = EventBus(0)
_, filtered_queue = await event_bus.subscribe([EventType.PackageUpdated])
_, wildcard_queue = await event_bus.subscribe()
await event_bus.broadcast(EventType.PackageUpdated, "ahriman")
assert not filtered_queue.empty()
assert not wildcard_queue.empty()
async def test_broadcast_topic_isolation(event_bus: EventBus, package_ahriman: Package) -> None:
async def test_broadcast_topic_isolation() -> None:
"""
must not broadcast event to subscribers with non-matching topics
must not deliver event to subscribers with non-matching topics
"""
event_bus = EventBus(0)
_, queue = await event_bus.subscribe([EventType.BuildLog])
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
await event_bus.broadcast(EventType.PackageUpdated, "ahriman")
assert queue.empty()
async def test_broadcast_queue_full(event_bus: EventBus, package_ahriman: Package) -> None:
async def test_broadcast_queue_full() -> None:
"""
must discard message to slow subscriber
"""
event_bus.max_size = 1
event_bus = EventBus(1)
_, queue = await event_bus.subscribe()
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
await event_bus.broadcast(EventType.PackageRemoved, package_ahriman.base)
await event_bus.broadcast(EventType.PackageUpdated, "ahriman")
await event_bus.broadcast(EventType.PackageRemoved, "ahriman")
assert queue.qsize() == 1
async def test_broadcast_queue_shutdown(event_bus: EventBus, package_ahriman: Package) -> None:
async def test_shutdown() -> None:
"""
must skip subscriber whose queue was shutdown concurrently
"""
_, queue = await event_bus.subscribe()
queue.shutdown()
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
async def test_shutdown(event_bus: EventBus) -> None:
"""
must shutdown all subscriber queues on shutdown
must send sentinel to all subscribers on shutdown
"""
event_bus = EventBus(0)
_, queue = await event_bus.subscribe()
await event_bus.shutdown()
with pytest.raises(QueueShutDown):
queue.get_nowait()
message = queue.get_nowait()
assert message is None
async def test_shutdown_queue_full(event_bus: EventBus, package_ahriman: Package) -> None:
async def test_shutdown_queue_full() -> None:
"""
must handle shutdown when queue is full
"""
event_bus.max_size = 1
event_bus = EventBus(1)
_, queue = await event_bus.subscribe()
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
await event_bus.broadcast(EventType.PackageUpdated, "ahriman")
await event_bus.shutdown()
# sentinel was not delivered but shutdown still completed
message = queue.get_nowait()
assert message is not None
async def test_subscribe(event_bus: EventBus) -> None:
async def test_subscribe() -> None:
"""
must register new subscriber
"""
event_bus = EventBus(0)
subscriber_id, queue = await event_bus.subscribe()
assert subscriber_id
assert queue.empty()
assert subscriber_id in event_bus._subscribers
async def test_broadcast_with_object_id(event_bus: EventBus, package_ahriman: Package) -> None:
"""
must broadcast event to subscribers with matching object_id
"""
_, queue = await event_bus.subscribe(object_id=package_ahriman.base)
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
assert not queue.empty()
async def test_broadcast_object_id_isolation(event_bus: EventBus, package_ahriman: Package) -> None:
"""
must not broadcast event to subscribers with non-matching object_id
"""
_, queue = await event_bus.subscribe(object_id="other-package")
await event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base)
assert queue.empty()
async def test_subscribe_with_topics(event_bus: EventBus) -> None:
async def test_subscribe_with_topics() -> None:
"""
must register subscriber with topic filter
"""
subscriber_id, _ = await event_bus.subscribe([EventType.BuildLog])
assert event_bus._subscribers[subscriber_id].topics == [EventType.BuildLog]
event_bus = EventBus(0)
subscriber_id, _ = await event_bus.subscribe([EventType.BuildLog, EventType.PackageUpdated])
topics, _ = event_bus._subscribers[subscriber_id]
assert topics == [EventType.BuildLog, EventType.PackageUpdated]
async def test_subscribe_with_object_id(event_bus: EventBus, package_ahriman: Package) -> None:
"""
must register subscriber with object_id filter
"""
subscriber_id, _ = await event_bus.subscribe(object_id=package_ahriman.base)
assert event_bus._subscribers[subscriber_id].object_id == package_ahriman.base
async def test_unsubscribe(event_bus: EventBus) -> None:
async def test_unsubscribe() -> None:
"""
must remove subscriber
"""
event_bus = EventBus(0)
subscriber_id, _ = await event_bus.subscribe()
await event_bus.unsubscribe(subscriber_id)
assert subscriber_id not in event_bus._subscribers
async def test_unsubscribe_unknown(event_bus: EventBus) -> None:
async def test_unsubscribe_unknown() -> None:
"""
must not fail on unknown subscriber removal
"""
event_bus = EventBus(0)
await event_bus.unsubscribe("unknown")
+54 -66
View File
@@ -1,40 +1,28 @@
import pytest
from pytest_mock import MockerFixture
from unittest.mock import MagicMock
from ahriman.core.exceptions import UnknownPackageError
from ahriman.core.status.watcher import Watcher
from ahriman.models.build_status import BuildStatus, BuildStatusEnum
from ahriman.models.changes import Changes
from ahriman.models.dependencies import Dependencies
from ahriman.models.event import Event, EventType
from ahriman.models.event import Event
from ahriman.models.log_record import LogRecord
from ahriman.models.log_record_id import LogRecordId
from ahriman.models.package import Package
from ahriman.models.pkgbuild_patch import PkgbuildPatch
async def test_event_add(watcher: Watcher, mocker: MockerFixture) -> None:
async def test_packages(watcher: Watcher, package_ahriman: Package) -> None:
"""
must create new event
must return list of available packages
"""
event = Event("event", "object")
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.event_add")
assert not await watcher.packages()
await watcher.event_add(event)
cache_mock.assert_called_once_with(event)
async def test_event_get(watcher: Watcher, mocker: MockerFixture) -> None:
"""
must retrieve events
"""
event = Event("event", "object")
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.event_get", return_value=[event])
result = await watcher.event_get(None, None)
assert result == [event]
cache_mock.assert_called_once_with(None, None, None, None, -1, 0)
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
assert await watcher.packages()
async def test_load(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
@@ -64,13 +52,37 @@ async def test_load_known(watcher: Watcher, package_ahriman: Package, mocker: Mo
assert status.status == BuildStatusEnum.Success
async def test_event_add(watcher: Watcher, mocker: MockerFixture) -> None:
"""
must create new event
"""
event = Event("event", "object")
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.event_add")
await watcher.event_add(event)
cache_mock.assert_called_once_with(event)
async def test_event_get(watcher: Watcher, mocker: MockerFixture) -> None:
"""
must retrieve events
"""
event = Event("event", "object")
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.event_get", return_value=[event])
result = await watcher.event_get(None, None)
assert result == [event]
cache_mock.assert_called_once_with(None, None, None, None, -1, 0)
async def test_logs_rotate(watcher: Watcher, mocker: MockerFixture) -> None:
"""
must rotate logs
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.logs_rotate")
await watcher.logs_rotate(42)
cache_mock.assert_called_once_with(42)
await watcher.logs_rotate(10)
cache_mock.assert_called_once_with(10)
async def test_package_archives(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
@@ -107,11 +119,12 @@ async def test_package_changes_get(watcher: Watcher, package_ahriman: Package, m
"""
must return package changes
"""
changes = Changes("sha")
changes = Changes()
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_changes_get",
return_value=changes)
assert await watcher.package_changes_get(package_ahriman.base) == changes
result = await watcher.package_changes_get(package_ahriman.base)
assert result == changes
cache_mock.assert_called_once_with(package_ahriman.base)
@@ -119,7 +132,7 @@ async def test_package_changes_update(watcher: Watcher, package_ahriman: Package
"""
must update package changes
"""
changes = Changes("sha")
changes = Changes()
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_changes_update")
await watcher.package_changes_update(package_ahriman.base, changes)
@@ -130,11 +143,12 @@ async def test_package_dependencies_get(watcher: Watcher, package_ahriman: Packa
"""
must return package dependencies
"""
dependencies = Dependencies({"path": [package_ahriman.base]})
dependencies = Dependencies()
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_dependencies_get",
return_value=dependencies)
assert await watcher.package_dependencies_get(package_ahriman.base) == dependencies
result = await watcher.package_dependencies_get(package_ahriman.base)
assert result == dependencies
cache_mock.assert_called_once_with(package_ahriman.base)
@@ -142,7 +156,7 @@ async def test_package_dependencies_update(watcher: Watcher, package_ahriman: Pa
"""
must update package dependencies
"""
dependencies = Dependencies({"path": [package_ahriman.base]})
dependencies = Dependencies()
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_dependencies_update")
await watcher.package_dependencies_update(package_ahriman.base, dependencies)
@@ -154,14 +168,12 @@ async def test_package_hold_update(watcher: Watcher, package_ahriman: Package, m
must update package hold status
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_hold_update")
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
await watcher.package_hold_update(package_ahriman.base, enabled=True)
cache_mock.assert_called_once_with(package_ahriman.base, enabled=True)
_, status = watcher._known[package_ahriman.base]
assert status.is_held is True
broadcast_mock.assert_called_once_with(EventType.PackageHeld, package_ahriman.base, is_held=True)
async def test_package_hold_update_unknown(watcher: Watcher, package_ahriman: Package) -> None:
@@ -178,11 +190,9 @@ async def test_package_logs_add(watcher: Watcher, package_ahriman: Package, mock
"""
log_record = LogRecord(LogRecordId(package_ahriman.base, "1.0.0"), 42.0, "message")
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_logs_add")
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
await watcher.package_logs_add(log_record)
cache_mock.assert_called_once_with(log_record)
broadcast_mock.assert_called_once_with(EventType.BuildLog, package_ahriman.base, **log_record.view())
async def test_package_logs_get(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
@@ -193,7 +203,8 @@ async def test_package_logs_get(watcher: Watcher, package_ahriman: Package, mock
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_logs_get",
return_value=[log_record])
assert await watcher.package_logs_get(package_ahriman.base) == [log_record]
result = await watcher.package_logs_get(package_ahriman.base)
assert result == [log_record]
cache_mock.assert_called_once_with(package_ahriman.base, None, None, -1, 0)
@@ -202,6 +213,7 @@ async def test_package_logs_remove(watcher: Watcher, package_ahriman: Package, m
must remove package logs
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_logs_remove")
await watcher.package_logs_remove(package_ahriman.base, None)
cache_mock.assert_called_once_with(package_ahriman.base, None)
@@ -211,9 +223,11 @@ async def test_package_patches_get(watcher: Watcher, package_ahriman: Package, m
must return package patches
"""
patch = PkgbuildPatch("key", "value")
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_patches_get", return_value=[patch])
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_patches_get",
return_value=[patch])
assert await watcher.package_patches_get(package_ahriman.base, None) == [patch]
result = await watcher.package_patches_get(package_ahriman.base, None)
assert result == [patch]
cache_mock.assert_called_once_with(package_ahriman.base, None)
@@ -222,6 +236,7 @@ async def test_package_patches_remove(watcher: Watcher, package_ahriman: Package
must remove package patches
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_patches_remove")
await watcher.package_patches_remove(package_ahriman.base, None)
cache_mock.assert_called_once_with(package_ahriman.base, None)
@@ -242,13 +257,11 @@ async def test_package_remove(watcher: Watcher, package_ahriman: Package, mocker
must remove package base
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_remove")
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
await watcher.package_remove(package_ahriman.base)
assert not watcher._known
cache_mock.assert_called_once_with(package_ahriman.base)
broadcast_mock.assert_called_once_with(EventType.PackageRemoved, package_ahriman.base)
async def test_package_remove_unknown(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
@@ -256,11 +269,8 @@ async def test_package_remove_unknown(watcher: Watcher, package_ahriman: Package
must not fail on unknown base removal
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_remove")
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
await watcher.package_remove(package_ahriman.base)
cache_mock.assert_called_once_with(package_ahriman.base)
broadcast_mock.assert_called_once_with(EventType.PackageRemoved, package_ahriman.base)
async def test_package_status_update(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
@@ -268,7 +278,6 @@ async def test_package_status_update(watcher: Watcher, package_ahriman: Package,
must update package status only for known package
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_status_update")
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
await watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success)
@@ -276,9 +285,6 @@ async def test_package_status_update(watcher: Watcher, package_ahriman: Package,
package, status = watcher._known[package_ahriman.base]
assert package == package_ahriman
assert status.status == BuildStatusEnum.Success
broadcast_mock.assert_called_once_with(
EventType.PackageStatusChanged, package_ahriman.base, status=BuildStatusEnum.Success.value,
)
async def test_package_status_update_preserves_hold(watcher: Watcher, package_ahriman: Package,
@@ -287,7 +293,6 @@ async def test_package_status_update_preserves_hold(watcher: Watcher, package_ah
must preserve hold status on package status update
"""
mocker.patch("ahriman.core.status.local_client.LocalClient.package_status_update")
mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus(is_held=True))}
await watcher.package_status_update(package_ahriman.base, BuildStatusEnum.Success)
@@ -308,15 +313,10 @@ async def test_package_update(watcher: Watcher, package_ahriman: Package, mocker
must add package to cache
"""
cache_mock = mocker.patch("ahriman.core.status.local_client.LocalClient.package_update")
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
await watcher.package_update(package_ahriman, BuildStatusEnum.Unknown)
assert await watcher.packages()
cache_mock.assert_called_once_with(package_ahriman, pytest.helpers.anyvar(int))
broadcast_mock.assert_called_once_with(
EventType.PackageUpdated, package_ahriman.base,
status=BuildStatusEnum.Unknown.value, version=package_ahriman.version,
)
async def test_package_update_preserves_hold(watcher: Watcher, package_ahriman: Package,
@@ -332,34 +332,22 @@ async def test_package_update_preserves_hold(watcher: Watcher, package_ahriman:
assert status.is_held is True
async def test_packages(watcher: Watcher, package_ahriman: Package) -> None:
"""
must return list of available packages
"""
assert not await watcher.packages()
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
assert await watcher.packages()
async def test_shutdown(watcher: Watcher, mocker: MockerFixture) -> None:
async def test_shutdown(watcher: Watcher) -> None:
"""
must gracefully shutdown watcher
"""
shutdown_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.shutdown")
_, queue = await watcher.event_bus.subscribe()
await watcher.shutdown()
shutdown_mock.assert_called_once_with()
message = queue.get_nowait()
assert message is None
async def test_status_update(watcher: Watcher, mocker: MockerFixture) -> None:
async def test_status_update(watcher: Watcher) -> None:
"""
must update service status
"""
broadcast_mock = mocker.patch("ahriman.core.status.event_bus.EventBus.broadcast")
await watcher.status_update(BuildStatusEnum.Success)
assert watcher.status.status == BuildStatusEnum.Success
broadcast_mock.assert_called_once_with(EventType.ServiceStatusChanged, None, status=BuildStatusEnum.Success.value)
def test_call(watcher: Watcher, package_ahriman: Package) -> None:
@@ -1 +0,0 @@
# schema testing goes in view class tests
@@ -1 +0,0 @@
# schema testing goes in view class tests
@@ -1,90 +1,27 @@
import asyncio
import pytest
from aiohttp.test_utils import TestClient
from asyncio import Queue
from multidict import MultiDict
from pytest_mock import MockerFixture
from unittest.mock import AsyncMock
from ahriman.core.status.watcher import Watcher
from ahriman.core.status.event_bus import SSEvent
from ahriman.models.event import EventType
from ahriman.models.package import Package
from ahriman.models.user_access import UserAccess
from ahriman.web.keys import WatcherKey
from ahriman.web.views.base import BaseView
from ahriman.web.views.v1.auditlog.event_bus import EventBusView
async def _producer(watcher: Watcher, package_ahriman: Package) -> None:
"""
create producer
Args:
watcher(Watcher): watcher test instance
package_ahriman(Package): package test instance
"""
await asyncio.sleep(0.1)
await watcher.event_bus.broadcast(EventType.PackageRemoved, package_ahriman.base)
await watcher.event_bus.broadcast(EventType.PackageUpdated, package_ahriman.base, status="success")
await asyncio.sleep(0.1)
await watcher.event_bus.shutdown()
async def test_get_permission() -> None:
"""
must return correct permission for the request
"""
for method in ("GET",):
request = pytest.helpers.request("", "", method)
assert await EventBusView.get_permission(request) == UserAccess.Full
async def test_get_permission_build_log() -> None:
"""
must return full permission for build log stream
"""
request = pytest.helpers.request("", "", "GET", params=MultiDict(event=EventType.BuildLog))
request = pytest.helpers.request("", "", "GET")
assert await EventBusView.get_permission(request) == UserAccess.Full
async def test_get_permission_build_log_with_read_events() -> None:
"""
must return full permission for mixed build log and read event stream
"""
request = pytest.helpers.request("", "", "GET", params=MultiDict([
("event", EventType.BuildLog),
("event", EventType.PackageUpdated),
]))
assert await EventBusView.get_permission(request) == UserAccess.Full
async def test_get_permission_invalid_event() -> None:
"""
must return full permission for invalid event type
"""
request = pytest.helpers.request("", "", "GET", params=MultiDict(event="invalid"))
assert await EventBusView.get_permission(request) == UserAccess.Full
async def test_get_permission_post() -> None:
"""
must use default permission for non-get requests
"""
request = pytest.helpers.request("", "", "POST", params=MultiDict(event=EventType.PackageUpdated))
assert await EventBusView.get_permission(request) == await BaseView.get_permission(request)
async def test_get_permission_read_events() -> None:
"""
must return read permission for package and status streams
"""
request = pytest.helpers.request("", "", "GET", params=MultiDict(
("event", event_type) for event_type in EventBusView.READ_EVENTS
))
assert await EventBusView.get_permission(request) == UserAccess.Read
def test_routes() -> None:
"""
must return correct routes
@@ -92,103 +29,74 @@ def test_routes() -> None:
assert EventBusView.ROUTES == ["/api/v1/events/stream"]
async def test_get(client: TestClient) -> None:
"""
must stream events via SSE
"""
watcher = next(iter(client.app[WatcherKey].values()))
async def _producer() -> None:
await asyncio.sleep(0.1)
await watcher.event_bus.broadcast(EventType.PackageUpdated, "ahriman", status="success")
await asyncio.sleep(0.1)
await watcher.event_bus.shutdown()
asyncio.create_task(_producer())
response = await client.get("/api/v1/events/stream")
assert response.status == 200
body = await response.text()
assert "package-updated" in body
assert "ahriman" in body
async def test_get_with_topic_filter(client: TestClient) -> None:
"""
must filter events by topic
"""
watcher = next(iter(client.app[WatcherKey].values()))
async def _producer() -> None:
await asyncio.sleep(0.1)
await watcher.event_bus.broadcast(EventType.PackageRemoved, "filtered")
await watcher.event_bus.broadcast(EventType.PackageUpdated, "ahriman", status="success")
await asyncio.sleep(0.1)
await watcher.event_bus.shutdown()
asyncio.create_task(_producer())
response = await client.get("/api/v1/events/stream", params={"event": "package-updated"})
assert response.status == 200
body = await response.text()
assert "package-updated" in body
assert "filtered" not in body
async def test_run_timeout() -> None:
"""
must handle timeout and continue loop
"""
queue = Queue()
async def _shutdown() -> None:
await asyncio.sleep(0.05)
queue.shutdown()
response = AsyncMock()
response.is_connected = lambda: True
response.ping_interval = 0.01
queue: Queue[SSEvent | None] = Queue()
async def _shutdown() -> None:
await asyncio.sleep(0.05)
await queue.put(None)
asyncio.create_task(_shutdown())
await EventBusView._run(response, queue)
async def test_get(client: TestClient, package_ahriman: Package) -> None:
"""
must stream events via SSE
"""
watcher = next(iter(client.app[WatcherKey].values()))
asyncio.create_task(_producer(watcher, package_ahriman))
request_schema = pytest.helpers.schema_request(EventBusView.get, location="querystring")
# no content validation here because it is a streaming response
assert not request_schema.validate({})
response = await client.get("/api/v1/events/stream")
assert response.status == 200
body = await response.text()
assert EventType.PackageUpdated in body
assert "ahriman" in body
async def test_get_with_topic_filter(client: TestClient, package_ahriman: Package) -> None:
"""
must filter events by topic
"""
watcher = next(iter(client.app[WatcherKey].values()))
asyncio.create_task(_producer(watcher, package_ahriman))
request_schema = pytest.helpers.schema_request(EventBusView.get, location="querystring")
payload = {"event": [EventType.PackageUpdated]}
assert not request_schema.validate(payload)
response = await client.get("/api/v1/events/stream", params=payload)
assert response.status == 200
body = await response.text()
assert EventType.PackageUpdated in body
assert EventType.PackageRemoved not in body
async def test_get_with_object_id_filter(client: TestClient, package_ahriman: Package) -> None:
"""
must filter events by object_id
"""
watcher = next(iter(client.app[WatcherKey].values()))
asyncio.create_task(_producer(watcher, package_ahriman))
request_schema = pytest.helpers.schema_request(EventBusView.get, location="querystring")
payload = {"object_id": "non-existent-package"}
assert not request_schema.validate(payload)
response = await client.get("/api/v1/events/stream", params=payload)
assert response.status == 200
body = await response.text()
assert "ahriman" not in body
async def test_get_bad_request(client: TestClient) -> None:
"""
must return bad request for invalid event type
"""
response_schema = pytest.helpers.schema_response(EventBusView.get, code=400)
response = await client.get("/api/v1/events/stream", params={"event": "invalid"})
assert response.status == 400
assert not response_schema.validate(await response.json())
async def test_get_not_found(client: TestClient) -> None:
"""
must return not found for unknown repository
"""
response_schema = pytest.helpers.schema_response(EventBusView.get, code=404)
response = await client.get("/api/v1/events/stream", params={"architecture": "unknown", "repository": "unknown"})
assert response.status == 404
assert not response_schema.validate(await response.json())
async def test_get_connection_reset(client: TestClient, mocker: MockerFixture) -> None:
"""
must handle connection reset
must handle connection reset gracefully
"""
mocker.patch.object(EventBusView, "_run", side_effect=ConnectionResetError)
response = await client.get("/api/v1/events/stream")
assert response.status == 200