diff --git a/src/ahriman/core/distributed/distributed_system.py b/src/ahriman/core/distributed/distributed_system.py index a85a09ff..e0d3d3bb 100644 --- a/src/ahriman/core/distributed/distributed_system.py +++ b/src/ahriman/core/distributed/distributed_system.py @@ -51,7 +51,7 @@ class DistributedSystem(Trigger, WebClient): "time_to_live": { "type": "integer", "coerce": "integer", - "min": 0, + "min": 1, }, }, }, diff --git a/src/ahriman/core/distributed/worker_trigger.py b/src/ahriman/core/distributed/worker_trigger.py index 8f0898e2..78885099 100644 --- a/src/ahriman/core/distributed/worker_trigger.py +++ b/src/ahriman/core/distributed/worker_trigger.py @@ -17,7 +17,8 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . # -from threading import Timer +from collections import deque +from threading import Lock, Timer from ahriman.core.configuration import Configuration from ahriman.core.distributed.distributed_system import DistributedSystem @@ -30,7 +31,6 @@ class WorkerTrigger(DistributedSystem): Attributes: ping_interval(float): interval to call remote service in seconds, defined as ``worker.time_to_live / 4`` - timer(Timer): timer object """ def __init__(self, repository_id: RepositoryId, configuration: Configuration) -> None: @@ -45,26 +45,47 @@ class WorkerTrigger(DistributedSystem): section = next(iter(self.configuration_sections(configuration))) self.ping_interval = configuration.getint(section, "time_to_live", fallback=60) / 4.0 - self.timer = Timer(self.ping_interval, self.ping) + + self._lock = Lock() + self._timers: deque[Timer] = deque() # because python doesn't have atomics + + def create_timer(self) -> None: + """ + create timer object and put it to queue + """ + timer = Timer(self.ping_interval, self.ping) + timer.start() + self._timers.append(timer) def on_start(self) -> None: """ trigger action which will be called at the start of the application """ - self.logger.info("registering instance %s at %s", self.worker, self.address) - self.timer.start() + self.logger.info("registering instance %s in %s", self.worker, self.address) + with self._lock: + self.create_timer() def on_stop(self) -> None: """ trigger action which will be called before the stop of the application """ - self.logger.info("removing instance %s at %s", self.worker, self.address) - self.timer.cancel() + self.logger.info("removing instance %s in %s", self.worker, self.address) + with self._lock: + current_timers = self._timers.copy() # will be used later + self._timers.clear() # clear timer list + + for timer in current_timers: + timer.cancel() # cancel remaining timers def ping(self) -> None: """ register itself as alive worker and update the timer """ - self.register() - self.timer = Timer(self.ping_interval, self.ping) - self.timer.start() + with self._lock: + if not self._timers: # make sure that there is related specific timer + return + + self._timers.popleft() # pop first timer + + self.register() + self.create_timer() diff --git a/src/ahriman/core/distributed/workers_cache.py b/src/ahriman/core/distributed/workers_cache.py index 7a981fea..809816e7 100644 --- a/src/ahriman/core/distributed/workers_cache.py +++ b/src/ahriman/core/distributed/workers_cache.py @@ -19,6 +19,8 @@ # import time +from threading import Lock + from ahriman.core.configuration import Configuration from ahriman.core.log import LazyLogging from ahriman.models.worker import Worker @@ -40,6 +42,7 @@ class WorkersCache(LazyLogging): configuration(Configuration): configuration instance """ self.time_to_live = configuration.getint("worker", "time_to_live", fallback=60) + self._lock = Lock() self._workers: dict[str, tuple[Worker, float]] = {} @property @@ -51,17 +54,19 @@ class WorkersCache(LazyLogging): list[Worker]: list of currently registered workers which have been seen not earlier than :attr:`time_to_live` """ valid_from = time.monotonic() - self.time_to_live - return [ - worker - for worker, last_seen in self._workers.values() - if last_seen > valid_from - ] + with self._lock: + return [ + worker + for worker, last_seen in self._workers.values() + if last_seen > valid_from + ] def workers_remove(self) -> None: """ remove all workers from the cache """ - self._workers = {} + with self._lock: + self._workers = {} def workers_update(self, worker: Worker) -> None: """ @@ -70,4 +75,5 @@ class WorkersCache(LazyLogging): Args: worker(Worker): worker to register """ - self._workers[worker.identifier] = (worker, time.monotonic()) + with self._lock: + self._workers[worker.identifier] = (worker, time.monotonic()) diff --git a/src/ahriman/core/spawn.py b/src/ahriman/core/spawn.py index 00b277a7..5d7290f4 100644 --- a/src/ahriman/core/spawn.py +++ b/src/ahriman/core/spawn.py @@ -57,7 +57,7 @@ class Spawn(Thread, LazyLogging): self.args_parser = args_parser self.command_arguments = command_arguments - self.lock = Lock() + self._lock = Lock() self.active: dict[str, Process] = {} # stupid pylint does not know that it is possible self.queue: Queue[ProcessStatus | None] = Queue() # pylint: disable=unsubscriptable-object @@ -141,7 +141,7 @@ class Spawn(Thread, LazyLogging): daemon=True) process.start() - with self.lock: + with self._lock: self.active[process_id] = process return process_id @@ -155,7 +155,7 @@ class Spawn(Thread, LazyLogging): Returns: bool: True in case if process still counts as active and False otherwise """ - with self.lock: + with self._lock: return process_id in self.active def key_import(self, key: str, server: str | None) -> str: @@ -273,7 +273,7 @@ class Spawn(Thread, LazyLogging): self.logger.info("process %s has been terminated with status %s, consumed time %ss", terminated.process_id, terminated.status, terminated.consumed_time / 1000) - with self.lock: + with self._lock: process = self.active.pop(terminated.process_id, None) if process is not None: diff --git a/src/ahriman/core/status/watcher.py b/src/ahriman/core/status/watcher.py index aada9e29..0affdb13 100644 --- a/src/ahriman/core/status/watcher.py +++ b/src/ahriman/core/status/watcher.py @@ -17,6 +17,8 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . # +from threading import Lock + from ahriman.core.database import SQLite from ahriman.core.exceptions import UnknownPackageError from ahriman.core.log import LazyLogging @@ -34,8 +36,6 @@ class Watcher(LazyLogging): Attributes: database(SQLite): database instance - known(dict[str, tuple[Package, BuildStatus]]): list of known packages. For the most cases :attr:`packages` - should be used instead repository_id(RepositoryId): repository unique identifier status(BuildStatus): daemon status """ @@ -51,7 +51,8 @@ class Watcher(LazyLogging): self.repository_id = repository_id self.database = database - self.known: dict[str, tuple[Package, BuildStatus]] = {} + self._lock = Lock() + self._known: dict[str, tuple[Package, BuildStatus]] = {} self.status = BuildStatus() # special variables for updating logs @@ -65,15 +66,18 @@ class Watcher(LazyLogging): Returns: list[tuple[Package, BuildStatus]]: list of packages together with their statuses """ - return list(self.known.values()) + with self._lock: + return list(self._known.values()) def load(self) -> None: """ load packages from local database """ - self.known = {} # reset state - for package, status in self.database.packages_get(self.repository_id): - self.known[package.base] = (package, status) + with self._lock: + self._known = { + package.base: (package, status) + for package, status in self.database.packages_get(self.repository_id) + } def logs_get(self, package_base: str, limit: int = -1, offset: int = 0) -> list[tuple[float, str]]: """ @@ -87,6 +91,7 @@ class Watcher(LazyLogging): Returns: list[tuple[float, str]]: package logs """ + self.package_get(package_base) return self.database.logs_get(package_base, limit, offset, self.repository_id) def logs_remove(self, package_base: str, version: str | None) -> None: @@ -123,12 +128,8 @@ class Watcher(LazyLogging): Returns: Changes: package changes if available - - Raises: - UnknownPackageError: if no package found """ - if package_base not in self.known: - raise UnknownPackageError(package_base) + self.package_get(package_base) return self.database.changes_get(package_base, self.repository_id) def package_get(self, package_base: str) -> tuple[Package, BuildStatus]: @@ -145,7 +146,8 @@ class Watcher(LazyLogging): UnknownPackageError: if no package found """ try: - return self.known[package_base] + with self._lock: + return self._known[package_base] except KeyError: raise UnknownPackageError(package_base) from None @@ -156,7 +158,8 @@ class Watcher(LazyLogging): Args: package_base(str): package base """ - self.known.pop(package_base, None) + with self._lock: + self._known.pop(package_base, None) self.database.package_remove(package_base, self.repository_id) self.logs_remove(package_base, None) @@ -168,17 +171,12 @@ class Watcher(LazyLogging): package_base(str): package base to update status(BuildStatusEnum): new build status package(Package | None): optional package description. In case if not set current properties will be used - - Raises: - UnknownPackageError: if no package found """ if package is None: - try: - package, _ = self.known[package_base] - except KeyError: - raise UnknownPackageError(package_base) from None + package, _ = self.package_get(package_base) full_status = BuildStatus(status) - self.known[package_base] = (package, full_status) + with self._lock: + self._known[package_base] = (package, full_status) self.database.package_update(package, full_status, self.repository_id) def patches_get(self, package_base: str, variable: str | None) -> list[PkgbuildPatch]: @@ -192,6 +190,7 @@ class Watcher(LazyLogging): Returns: list[PkgbuildPatch]: list of patches which are stored for the package """ + self.package_get(package_base) variables = [variable] if variable is not None else None return self.database.patches_list(package_base, variables).get(package_base, []) diff --git a/tests/ahriman/core/distributed/conftest.py b/tests/ahriman/core/distributed/conftest.py index d40a7476..e2c1b040 100644 --- a/tests/ahriman/core/distributed/conftest.py +++ b/tests/ahriman/core/distributed/conftest.py @@ -1,7 +1,7 @@ import pytest from ahriman.core.configuration import Configuration -from ahriman.core.distributed import WorkersCache +from ahriman.core.distributed import WorkerTrigger, WorkersCache from ahriman.core.distributed.distributed_system import DistributedSystem @@ -21,6 +21,22 @@ def distributed_system(configuration: Configuration) -> DistributedSystem: return DistributedSystem(repository_id, configuration) +@pytest.fixture +def worker_trigger(configuration: Configuration) -> WorkerTrigger: + """ + worker trigger fixture + + Args: + configuration(Configuration): configuration fixture + + Returns: + WorkerTrigger: worker trigger test instance + """ + configuration.set_option("status", "address", "http://localhost:8081") + _, repository_id = configuration.check_loaded() + return WorkerTrigger(repository_id, configuration) + + @pytest.fixture def workers_cache(configuration: Configuration) -> WorkersCache: """ diff --git a/tests/ahriman/core/distributed/test_worker_trigger.py b/tests/ahriman/core/distributed/test_worker_trigger.py index b0eb4418..4c340a32 100644 --- a/tests/ahriman/core/distributed/test_worker_trigger.py +++ b/tests/ahriman/core/distributed/test_worker_trigger.py @@ -1,52 +1,66 @@ +from threading import Timer + from pytest_mock import MockerFixture from ahriman.core.configuration import Configuration from ahriman.core.distributed import WorkerTrigger -def test_on_start(configuration: Configuration, mocker: MockerFixture) -> None: +def test_create_timer(worker_trigger: WorkerTrigger) -> None: + """ + must create a timer and put it to queue + """ + worker_trigger.create_timer() + + timer = worker_trigger._timers.popleft() + assert timer.function == worker_trigger.ping + timer.cancel() + + +def test_on_start(worker_trigger: WorkerTrigger, mocker: MockerFixture) -> None: """ must register itself as worker """ - configuration.set_option("status", "address", "http://localhost:8081") - run_mock = mocker.patch("threading.Timer.start") - _, repository_id = configuration.check_loaded() - - WorkerTrigger(repository_id, configuration).on_start() + run_mock = mocker.patch("ahriman.core.distributed.WorkerTrigger.create_timer") + worker_trigger.on_start() run_mock.assert_called_once_with() -def test_on_stop(configuration: Configuration, mocker: MockerFixture) -> None: +def test_on_stop(worker_trigger: WorkerTrigger, mocker: MockerFixture) -> None: """ must unregister itself as worker """ - configuration.set_option("status", "address", "http://localhost:8081") run_mock = mocker.patch("threading.Timer.cancel") - _, repository_id = configuration.check_loaded() + worker_trigger._timers.append(Timer(1, print)) # doesn't matter - WorkerTrigger(repository_id, configuration).on_stop() + worker_trigger.on_stop() run_mock.assert_called_once_with() -def test_on_stop_empty_timer(configuration: Configuration) -> None: +def test_on_stop_empty_timer(worker_trigger: WorkerTrigger) -> None: """ must do not fail if no timer was started """ - configuration.set_option("status", "address", "http://localhost:8081") - _, repository_id = configuration.check_loaded() - - WorkerTrigger(repository_id, configuration).on_stop() + worker_trigger.on_stop() -def test_ping(configuration: Configuration, mocker: MockerFixture) -> None: +def test_ping(worker_trigger: WorkerTrigger, mocker: MockerFixture) -> None: """ must correctly process timer action """ - configuration.set_option("status", "address", "http://localhost:8081") run_mock = mocker.patch("ahriman.core.distributed.WorkerTrigger.register") timer_mock = mocker.patch("threading.Timer.start") - _, repository_id = configuration.check_loaded() + worker_trigger._timers.append(Timer(1, print)) # doesn't matter - WorkerTrigger(repository_id, configuration).ping() + worker_trigger.ping() run_mock.assert_called_once_with() timer_mock.assert_called_once_with() + + +def test_ping_empty_queue(worker_trigger: WorkerTrigger, mocker: MockerFixture) -> None: + """ + must do nothing in case of empty queue + """ + run_mock = mocker.patch("ahriman.core.distributed.WorkerTrigger.register") + worker_trigger.ping() + run_mock.assert_not_called() diff --git a/tests/ahriman/core/status/test_watcher.py b/tests/ahriman/core/status/test_watcher.py index ba36cfcb..288b8609 100644 --- a/tests/ahriman/core/status/test_watcher.py +++ b/tests/ahriman/core/status/test_watcher.py @@ -21,7 +21,7 @@ def test_load(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) watcher.load() cache_mock.assert_called_once_with(watcher.repository_id) - package, status = watcher.known[package_ahriman.base] + package, status = watcher._known[package_ahriman.base] assert package == package_ahriman assert status.status == BuildStatusEnum.Unknown @@ -32,10 +32,10 @@ def test_load_known(watcher: Watcher, package_ahriman: Package, mocker: MockerFi """ status = BuildStatus(BuildStatusEnum.Success) mocker.patch("ahriman.core.database.SQLite.packages_get", return_value=[(package_ahriman, status)]) - watcher.known = {package_ahriman.base: (package_ahriman, status)} + watcher._known = {package_ahriman.base: (package_ahriman, status)} watcher.load() - _, status = watcher.known[package_ahriman.base] + _, status = watcher._known[package_ahriman.base] assert status.status == BuildStatusEnum.Success @@ -43,11 +43,21 @@ def test_logs_get(watcher: Watcher, package_ahriman: Package, mocker: MockerFixt """ must return package logs """ + watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())} logs_mock = mocker.patch("ahriman.core.database.SQLite.logs_get") + watcher.logs_get(package_ahriman.base, 1, 2) logs_mock.assert_called_once_with(package_ahriman.base, 1, 2, watcher.repository_id) +def test_logs_get_failed(watcher: Watcher, package_ahriman: Package) -> None: + """ + must raise UnknownPackageError on logs in case of unknown package + """ + with pytest.raises(UnknownPackageError): + watcher.logs_get(package_ahriman.base) + + def test_logs_remove(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: """ must remove package logs @@ -94,7 +104,7 @@ def test_package_changes_get(watcher: Watcher, package_ahriman: Package, mocker: must return package changes """ get_mock = mocker.patch("ahriman.core.database.SQLite.changes_get", return_value=Changes("sha")) - watcher.known = {package_ahriman.base: (package_ahriman, BuildStatus())} + watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())} assert watcher.package_changes_get(package_ahriman.base) == Changes("sha") get_mock.assert_called_once_with(package_ahriman.base, watcher.repository_id) @@ -102,7 +112,7 @@ def test_package_changes_get(watcher: Watcher, package_ahriman: Package, mocker: def test_package_changes_get_failed(watcher: Watcher, package_ahriman: Package) -> None: """ - must raise UnknownPackageError in case of unknown package + must raise UnknownPackageError on changes in case of unknown package """ with pytest.raises(UnknownPackageError): watcher.package_changes_get(package_ahriman.base) @@ -112,7 +122,7 @@ def test_package_get(watcher: Watcher, package_ahriman: Package) -> None: """ must return package status """ - watcher.known = {package_ahriman.base: (package_ahriman, BuildStatus())} + watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())} package, status = watcher.package_get(package_ahriman.base) assert package == package_ahriman assert status.status == BuildStatusEnum.Unknown @@ -132,10 +142,10 @@ def test_package_remove(watcher: Watcher, package_ahriman: Package, mocker: Mock """ cache_mock = mocker.patch("ahriman.core.database.SQLite.package_remove") logs_mock = mocker.patch("ahriman.core.status.watcher.Watcher.logs_remove") - watcher.known = {package_ahriman.base: (package_ahriman, BuildStatus())} + watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())} watcher.package_remove(package_ahriman.base) - assert not watcher.known + assert not watcher._known cache_mock.assert_called_once_with(package_ahriman.base, watcher.repository_id) logs_mock.assert_called_once_with(package_ahriman.base, None) @@ -158,7 +168,7 @@ def test_package_update(watcher: Watcher, package_ahriman: Package, mocker: Mock watcher.package_update(package_ahriman.base, BuildStatusEnum.Unknown, package_ahriman) cache_mock.assert_called_once_with(package_ahriman, pytest.helpers.anyvar(int), watcher.repository_id) - package, status = watcher.known[package_ahriman.base] + package, status = watcher._known[package_ahriman.base] assert package == package_ahriman assert status.status == BuildStatusEnum.Unknown @@ -168,11 +178,11 @@ def test_package_update_ping(watcher: Watcher, package_ahriman: Package, mocker: must update package status only for known package """ cache_mock = mocker.patch("ahriman.core.database.SQLite.package_update") - watcher.known = {package_ahriman.base: (package_ahriman, BuildStatus())} + watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())} watcher.package_update(package_ahriman.base, BuildStatusEnum.Success, None) cache_mock.assert_called_once_with(package_ahriman, pytest.helpers.anyvar(int), watcher.repository_id) - package, status = watcher.known[package_ahriman.base] + package, status = watcher._known[package_ahriman.base] assert package == package_ahriman assert status.status == BuildStatusEnum.Success @@ -189,6 +199,7 @@ def test_patches_get(watcher: Watcher, package_ahriman: Package, mocker: MockerF """ must return patches for the package """ + watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())} patches_mock = mocker.patch("ahriman.core.database.SQLite.patches_list") watcher.patches_get(package_ahriman.base, None) @@ -201,6 +212,14 @@ def test_patches_get(watcher: Watcher, package_ahriman: Package, mocker: MockerF ]) +def test_patches_get_failed(watcher: Watcher, package_ahriman: Package) -> None: + """ + must raise UnknownPackageError on patches in case of unknown package + """ + with pytest.raises(UnknownPackageError): + watcher.patches_get(package_ahriman.base, None) + + def test_patches_remove(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None: """ must remove patches for the package