diff --git a/src/ahriman/core/distributed/distributed_system.py b/src/ahriman/core/distributed/distributed_system.py
index a85a09ff..e0d3d3bb 100644
--- a/src/ahriman/core/distributed/distributed_system.py
+++ b/src/ahriman/core/distributed/distributed_system.py
@@ -51,7 +51,7 @@ class DistributedSystem(Trigger, WebClient):
"time_to_live": {
"type": "integer",
"coerce": "integer",
- "min": 0,
+ "min": 1,
},
},
},
diff --git a/src/ahriman/core/distributed/worker_trigger.py b/src/ahriman/core/distributed/worker_trigger.py
index 8f0898e2..78885099 100644
--- a/src/ahriman/core/distributed/worker_trigger.py
+++ b/src/ahriman/core/distributed/worker_trigger.py
@@ -17,7 +17,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
-from threading import Timer
+from collections import deque
+from threading import Lock, Timer
from ahriman.core.configuration import Configuration
from ahriman.core.distributed.distributed_system import DistributedSystem
@@ -30,7 +31,6 @@ class WorkerTrigger(DistributedSystem):
Attributes:
ping_interval(float): interval to call remote service in seconds, defined as ``worker.time_to_live / 4``
- timer(Timer): timer object
"""
def __init__(self, repository_id: RepositoryId, configuration: Configuration) -> None:
@@ -45,26 +45,47 @@ class WorkerTrigger(DistributedSystem):
section = next(iter(self.configuration_sections(configuration)))
self.ping_interval = configuration.getint(section, "time_to_live", fallback=60) / 4.0
- self.timer = Timer(self.ping_interval, self.ping)
+
+ self._lock = Lock()
+ self._timers: deque[Timer] = deque() # because python doesn't have atomics
+
+ def create_timer(self) -> None:
+ """
+ create timer object and put it to queue
+ """
+ timer = Timer(self.ping_interval, self.ping)
+ timer.start()
+ self._timers.append(timer)
def on_start(self) -> None:
"""
trigger action which will be called at the start of the application
"""
- self.logger.info("registering instance %s at %s", self.worker, self.address)
- self.timer.start()
+ self.logger.info("registering instance %s in %s", self.worker, self.address)
+ with self._lock:
+ self.create_timer()
def on_stop(self) -> None:
"""
trigger action which will be called before the stop of the application
"""
- self.logger.info("removing instance %s at %s", self.worker, self.address)
- self.timer.cancel()
+ self.logger.info("removing instance %s in %s", self.worker, self.address)
+ with self._lock:
+ current_timers = self._timers.copy() # will be used later
+ self._timers.clear() # clear timer list
+
+ for timer in current_timers:
+ timer.cancel() # cancel remaining timers
def ping(self) -> None:
"""
register itself as alive worker and update the timer
"""
- self.register()
- self.timer = Timer(self.ping_interval, self.ping)
- self.timer.start()
+ with self._lock:
+ if not self._timers: # make sure that there is related specific timer
+ return
+
+ self._timers.popleft() # pop first timer
+
+ self.register()
+ self.create_timer()
diff --git a/src/ahriman/core/distributed/workers_cache.py b/src/ahriman/core/distributed/workers_cache.py
index 7a981fea..809816e7 100644
--- a/src/ahriman/core/distributed/workers_cache.py
+++ b/src/ahriman/core/distributed/workers_cache.py
@@ -19,6 +19,8 @@
#
import time
+from threading import Lock
+
from ahriman.core.configuration import Configuration
from ahriman.core.log import LazyLogging
from ahriman.models.worker import Worker
@@ -40,6 +42,7 @@ class WorkersCache(LazyLogging):
configuration(Configuration): configuration instance
"""
self.time_to_live = configuration.getint("worker", "time_to_live", fallback=60)
+ self._lock = Lock()
self._workers: dict[str, tuple[Worker, float]] = {}
@property
@@ -51,17 +54,19 @@ class WorkersCache(LazyLogging):
list[Worker]: list of currently registered workers which have been seen not earlier than :attr:`time_to_live`
"""
valid_from = time.monotonic() - self.time_to_live
- return [
- worker
- for worker, last_seen in self._workers.values()
- if last_seen > valid_from
- ]
+ with self._lock:
+ return [
+ worker
+ for worker, last_seen in self._workers.values()
+ if last_seen > valid_from
+ ]
def workers_remove(self) -> None:
"""
remove all workers from the cache
"""
- self._workers = {}
+ with self._lock:
+ self._workers = {}
def workers_update(self, worker: Worker) -> None:
"""
@@ -70,4 +75,5 @@ class WorkersCache(LazyLogging):
Args:
worker(Worker): worker to register
"""
- self._workers[worker.identifier] = (worker, time.monotonic())
+ with self._lock:
+ self._workers[worker.identifier] = (worker, time.monotonic())
diff --git a/src/ahriman/core/spawn.py b/src/ahriman/core/spawn.py
index 00b277a7..5d7290f4 100644
--- a/src/ahriman/core/spawn.py
+++ b/src/ahriman/core/spawn.py
@@ -57,7 +57,7 @@ class Spawn(Thread, LazyLogging):
self.args_parser = args_parser
self.command_arguments = command_arguments
- self.lock = Lock()
+ self._lock = Lock()
self.active: dict[str, Process] = {}
# stupid pylint does not know that it is possible
self.queue: Queue[ProcessStatus | None] = Queue() # pylint: disable=unsubscriptable-object
@@ -141,7 +141,7 @@ class Spawn(Thread, LazyLogging):
daemon=True)
process.start()
- with self.lock:
+ with self._lock:
self.active[process_id] = process
return process_id
@@ -155,7 +155,7 @@ class Spawn(Thread, LazyLogging):
Returns:
bool: True in case if process still counts as active and False otherwise
"""
- with self.lock:
+ with self._lock:
return process_id in self.active
def key_import(self, key: str, server: str | None) -> str:
@@ -273,7 +273,7 @@ class Spawn(Thread, LazyLogging):
self.logger.info("process %s has been terminated with status %s, consumed time %ss",
terminated.process_id, terminated.status, terminated.consumed_time / 1000)
- with self.lock:
+ with self._lock:
process = self.active.pop(terminated.process_id, None)
if process is not None:
diff --git a/src/ahriman/core/status/watcher.py b/src/ahriman/core/status/watcher.py
index aada9e29..0affdb13 100644
--- a/src/ahriman/core/status/watcher.py
+++ b/src/ahriman/core/status/watcher.py
@@ -17,6 +17,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
+from threading import Lock
+
from ahriman.core.database import SQLite
from ahriman.core.exceptions import UnknownPackageError
from ahriman.core.log import LazyLogging
@@ -34,8 +36,6 @@ class Watcher(LazyLogging):
Attributes:
database(SQLite): database instance
- known(dict[str, tuple[Package, BuildStatus]]): list of known packages. For the most cases :attr:`packages`
- should be used instead
repository_id(RepositoryId): repository unique identifier
status(BuildStatus): daemon status
"""
@@ -51,7 +51,8 @@ class Watcher(LazyLogging):
self.repository_id = repository_id
self.database = database
- self.known: dict[str, tuple[Package, BuildStatus]] = {}
+ self._lock = Lock()
+ self._known: dict[str, tuple[Package, BuildStatus]] = {}
self.status = BuildStatus()
# special variables for updating logs
@@ -65,15 +66,18 @@ class Watcher(LazyLogging):
Returns:
list[tuple[Package, BuildStatus]]: list of packages together with their statuses
"""
- return list(self.known.values())
+ with self._lock:
+ return list(self._known.values())
def load(self) -> None:
"""
load packages from local database
"""
- self.known = {} # reset state
- for package, status in self.database.packages_get(self.repository_id):
- self.known[package.base] = (package, status)
+ with self._lock:
+ self._known = {
+ package.base: (package, status)
+ for package, status in self.database.packages_get(self.repository_id)
+ }
def logs_get(self, package_base: str, limit: int = -1, offset: int = 0) -> list[tuple[float, str]]:
"""
@@ -87,6 +91,7 @@ class Watcher(LazyLogging):
Returns:
list[tuple[float, str]]: package logs
"""
+ self.package_get(package_base)
return self.database.logs_get(package_base, limit, offset, self.repository_id)
def logs_remove(self, package_base: str, version: str | None) -> None:
@@ -123,12 +128,8 @@ class Watcher(LazyLogging):
Returns:
Changes: package changes if available
-
- Raises:
- UnknownPackageError: if no package found
"""
- if package_base not in self.known:
- raise UnknownPackageError(package_base)
+ self.package_get(package_base)
return self.database.changes_get(package_base, self.repository_id)
def package_get(self, package_base: str) -> tuple[Package, BuildStatus]:
@@ -145,7 +146,8 @@ class Watcher(LazyLogging):
UnknownPackageError: if no package found
"""
try:
- return self.known[package_base]
+ with self._lock:
+ return self._known[package_base]
except KeyError:
raise UnknownPackageError(package_base) from None
@@ -156,7 +158,8 @@ class Watcher(LazyLogging):
Args:
package_base(str): package base
"""
- self.known.pop(package_base, None)
+ with self._lock:
+ self._known.pop(package_base, None)
self.database.package_remove(package_base, self.repository_id)
self.logs_remove(package_base, None)
@@ -168,17 +171,12 @@ class Watcher(LazyLogging):
package_base(str): package base to update
status(BuildStatusEnum): new build status
package(Package | None): optional package description. In case if not set current properties will be used
-
- Raises:
- UnknownPackageError: if no package found
"""
if package is None:
- try:
- package, _ = self.known[package_base]
- except KeyError:
- raise UnknownPackageError(package_base) from None
+ package, _ = self.package_get(package_base)
full_status = BuildStatus(status)
- self.known[package_base] = (package, full_status)
+ with self._lock:
+ self._known[package_base] = (package, full_status)
self.database.package_update(package, full_status, self.repository_id)
def patches_get(self, package_base: str, variable: str | None) -> list[PkgbuildPatch]:
@@ -192,6 +190,7 @@ class Watcher(LazyLogging):
Returns:
list[PkgbuildPatch]: list of patches which are stored for the package
"""
+ self.package_get(package_base)
variables = [variable] if variable is not None else None
return self.database.patches_list(package_base, variables).get(package_base, [])
diff --git a/tests/ahriman/core/distributed/conftest.py b/tests/ahriman/core/distributed/conftest.py
index d40a7476..e2c1b040 100644
--- a/tests/ahriman/core/distributed/conftest.py
+++ b/tests/ahriman/core/distributed/conftest.py
@@ -1,7 +1,7 @@
import pytest
from ahriman.core.configuration import Configuration
-from ahriman.core.distributed import WorkersCache
+from ahriman.core.distributed import WorkerTrigger, WorkersCache
from ahriman.core.distributed.distributed_system import DistributedSystem
@@ -21,6 +21,22 @@ def distributed_system(configuration: Configuration) -> DistributedSystem:
return DistributedSystem(repository_id, configuration)
+@pytest.fixture
+def worker_trigger(configuration: Configuration) -> WorkerTrigger:
+ """
+ worker trigger fixture
+
+ Args:
+ configuration(Configuration): configuration fixture
+
+ Returns:
+ WorkerTrigger: worker trigger test instance
+ """
+ configuration.set_option("status", "address", "http://localhost:8081")
+ _, repository_id = configuration.check_loaded()
+ return WorkerTrigger(repository_id, configuration)
+
+
@pytest.fixture
def workers_cache(configuration: Configuration) -> WorkersCache:
"""
diff --git a/tests/ahriman/core/distributed/test_worker_trigger.py b/tests/ahriman/core/distributed/test_worker_trigger.py
index b0eb4418..4c340a32 100644
--- a/tests/ahriman/core/distributed/test_worker_trigger.py
+++ b/tests/ahriman/core/distributed/test_worker_trigger.py
@@ -1,52 +1,66 @@
+from threading import Timer
+
from pytest_mock import MockerFixture
from ahriman.core.configuration import Configuration
from ahriman.core.distributed import WorkerTrigger
-def test_on_start(configuration: Configuration, mocker: MockerFixture) -> None:
+def test_create_timer(worker_trigger: WorkerTrigger) -> None:
+ """
+ must create a timer and put it to queue
+ """
+ worker_trigger.create_timer()
+
+ timer = worker_trigger._timers.popleft()
+ assert timer.function == worker_trigger.ping
+ timer.cancel()
+
+
+def test_on_start(worker_trigger: WorkerTrigger, mocker: MockerFixture) -> None:
"""
must register itself as worker
"""
- configuration.set_option("status", "address", "http://localhost:8081")
- run_mock = mocker.patch("threading.Timer.start")
- _, repository_id = configuration.check_loaded()
-
- WorkerTrigger(repository_id, configuration).on_start()
+ run_mock = mocker.patch("ahriman.core.distributed.WorkerTrigger.create_timer")
+ worker_trigger.on_start()
run_mock.assert_called_once_with()
-def test_on_stop(configuration: Configuration, mocker: MockerFixture) -> None:
+def test_on_stop(worker_trigger: WorkerTrigger, mocker: MockerFixture) -> None:
"""
must unregister itself as worker
"""
- configuration.set_option("status", "address", "http://localhost:8081")
run_mock = mocker.patch("threading.Timer.cancel")
- _, repository_id = configuration.check_loaded()
+ worker_trigger._timers.append(Timer(1, print)) # doesn't matter
- WorkerTrigger(repository_id, configuration).on_stop()
+ worker_trigger.on_stop()
run_mock.assert_called_once_with()
-def test_on_stop_empty_timer(configuration: Configuration) -> None:
+def test_on_stop_empty_timer(worker_trigger: WorkerTrigger) -> None:
"""
must do not fail if no timer was started
"""
- configuration.set_option("status", "address", "http://localhost:8081")
- _, repository_id = configuration.check_loaded()
-
- WorkerTrigger(repository_id, configuration).on_stop()
+ worker_trigger.on_stop()
-def test_ping(configuration: Configuration, mocker: MockerFixture) -> None:
+def test_ping(worker_trigger: WorkerTrigger, mocker: MockerFixture) -> None:
"""
must correctly process timer action
"""
- configuration.set_option("status", "address", "http://localhost:8081")
run_mock = mocker.patch("ahriman.core.distributed.WorkerTrigger.register")
timer_mock = mocker.patch("threading.Timer.start")
- _, repository_id = configuration.check_loaded()
+ worker_trigger._timers.append(Timer(1, print)) # doesn't matter
- WorkerTrigger(repository_id, configuration).ping()
+ worker_trigger.ping()
run_mock.assert_called_once_with()
timer_mock.assert_called_once_with()
+
+
+def test_ping_empty_queue(worker_trigger: WorkerTrigger, mocker: MockerFixture) -> None:
+ """
+ must do nothing in case of empty queue
+ """
+ run_mock = mocker.patch("ahriman.core.distributed.WorkerTrigger.register")
+ worker_trigger.ping()
+ run_mock.assert_not_called()
diff --git a/tests/ahriman/core/status/test_watcher.py b/tests/ahriman/core/status/test_watcher.py
index ba36cfcb..288b8609 100644
--- a/tests/ahriman/core/status/test_watcher.py
+++ b/tests/ahriman/core/status/test_watcher.py
@@ -21,7 +21,7 @@ def test_load(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture)
watcher.load()
cache_mock.assert_called_once_with(watcher.repository_id)
- package, status = watcher.known[package_ahriman.base]
+ package, status = watcher._known[package_ahriman.base]
assert package == package_ahriman
assert status.status == BuildStatusEnum.Unknown
@@ -32,10 +32,10 @@ def test_load_known(watcher: Watcher, package_ahriman: Package, mocker: MockerFi
"""
status = BuildStatus(BuildStatusEnum.Success)
mocker.patch("ahriman.core.database.SQLite.packages_get", return_value=[(package_ahriman, status)])
- watcher.known = {package_ahriman.base: (package_ahriman, status)}
+ watcher._known = {package_ahriman.base: (package_ahriman, status)}
watcher.load()
- _, status = watcher.known[package_ahriman.base]
+ _, status = watcher._known[package_ahriman.base]
assert status.status == BuildStatusEnum.Success
@@ -43,11 +43,21 @@ def test_logs_get(watcher: Watcher, package_ahriman: Package, mocker: MockerFixt
"""
must return package logs
"""
+ watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
logs_mock = mocker.patch("ahriman.core.database.SQLite.logs_get")
+
watcher.logs_get(package_ahriman.base, 1, 2)
logs_mock.assert_called_once_with(package_ahriman.base, 1, 2, watcher.repository_id)
+def test_logs_get_failed(watcher: Watcher, package_ahriman: Package) -> None:
+ """
+ must raise UnknownPackageError on logs in case of unknown package
+ """
+ with pytest.raises(UnknownPackageError):
+ watcher.logs_get(package_ahriman.base)
+
+
def test_logs_remove(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must remove package logs
@@ -94,7 +104,7 @@ def test_package_changes_get(watcher: Watcher, package_ahriman: Package, mocker:
must return package changes
"""
get_mock = mocker.patch("ahriman.core.database.SQLite.changes_get", return_value=Changes("sha"))
- watcher.known = {package_ahriman.base: (package_ahriman, BuildStatus())}
+ watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
assert watcher.package_changes_get(package_ahriman.base) == Changes("sha")
get_mock.assert_called_once_with(package_ahriman.base, watcher.repository_id)
@@ -102,7 +112,7 @@ def test_package_changes_get(watcher: Watcher, package_ahriman: Package, mocker:
def test_package_changes_get_failed(watcher: Watcher, package_ahriman: Package) -> None:
"""
- must raise UnknownPackageError in case of unknown package
+ must raise UnknownPackageError on changes in case of unknown package
"""
with pytest.raises(UnknownPackageError):
watcher.package_changes_get(package_ahriman.base)
@@ -112,7 +122,7 @@ def test_package_get(watcher: Watcher, package_ahriman: Package) -> None:
"""
must return package status
"""
- watcher.known = {package_ahriman.base: (package_ahriman, BuildStatus())}
+ watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
package, status = watcher.package_get(package_ahriman.base)
assert package == package_ahriman
assert status.status == BuildStatusEnum.Unknown
@@ -132,10 +142,10 @@ def test_package_remove(watcher: Watcher, package_ahriman: Package, mocker: Mock
"""
cache_mock = mocker.patch("ahriman.core.database.SQLite.package_remove")
logs_mock = mocker.patch("ahriman.core.status.watcher.Watcher.logs_remove")
- watcher.known = {package_ahriman.base: (package_ahriman, BuildStatus())}
+ watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
watcher.package_remove(package_ahriman.base)
- assert not watcher.known
+ assert not watcher._known
cache_mock.assert_called_once_with(package_ahriman.base, watcher.repository_id)
logs_mock.assert_called_once_with(package_ahriman.base, None)
@@ -158,7 +168,7 @@ def test_package_update(watcher: Watcher, package_ahriman: Package, mocker: Mock
watcher.package_update(package_ahriman.base, BuildStatusEnum.Unknown, package_ahriman)
cache_mock.assert_called_once_with(package_ahriman, pytest.helpers.anyvar(int), watcher.repository_id)
- package, status = watcher.known[package_ahriman.base]
+ package, status = watcher._known[package_ahriman.base]
assert package == package_ahriman
assert status.status == BuildStatusEnum.Unknown
@@ -168,11 +178,11 @@ def test_package_update_ping(watcher: Watcher, package_ahriman: Package, mocker:
must update package status only for known package
"""
cache_mock = mocker.patch("ahriman.core.database.SQLite.package_update")
- watcher.known = {package_ahriman.base: (package_ahriman, BuildStatus())}
+ watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
watcher.package_update(package_ahriman.base, BuildStatusEnum.Success, None)
cache_mock.assert_called_once_with(package_ahriman, pytest.helpers.anyvar(int), watcher.repository_id)
- package, status = watcher.known[package_ahriman.base]
+ package, status = watcher._known[package_ahriman.base]
assert package == package_ahriman
assert status.status == BuildStatusEnum.Success
@@ -189,6 +199,7 @@ def test_patches_get(watcher: Watcher, package_ahriman: Package, mocker: MockerF
"""
must return patches for the package
"""
+ watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
patches_mock = mocker.patch("ahriman.core.database.SQLite.patches_list")
watcher.patches_get(package_ahriman.base, None)
@@ -201,6 +212,14 @@ def test_patches_get(watcher: Watcher, package_ahriman: Package, mocker: MockerF
])
+def test_patches_get_failed(watcher: Watcher, package_ahriman: Package) -> None:
+ """
+ must raise UnknownPackageError on patches in case of unknown package
+ """
+ with pytest.raises(UnknownPackageError):
+ watcher.patches_get(package_ahriman.base, None)
+
+
def test_patches_remove(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must remove patches for the package