feat: threadsafe services

In the most cases it was enough to just add lock. In case of worker
trigger, since there is atomic operation on timer, it was also required
to add queue (coz python doesn't have atomics)
This commit is contained in:
Evgenii Alekseev 2024-01-03 03:25:13 +02:00
parent 2d42424477
commit 8635ee8953
8 changed files with 150 additions and 75 deletions

View File

@ -51,7 +51,7 @@ class DistributedSystem(Trigger, WebClient):
"time_to_live": {
"type": "integer",
"coerce": "integer",
"min": 0,
"min": 1,
},
},
},

View File

@ -17,7 +17,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from threading import Timer
from collections import deque
from threading import Lock, Timer
from ahriman.core.configuration import Configuration
from ahriman.core.distributed.distributed_system import DistributedSystem
@ -30,7 +31,6 @@ class WorkerTrigger(DistributedSystem):
Attributes:
ping_interval(float): interval to call remote service in seconds, defined as ``worker.time_to_live / 4``
timer(Timer): timer object
"""
def __init__(self, repository_id: RepositoryId, configuration: Configuration) -> None:
@ -45,26 +45,47 @@ class WorkerTrigger(DistributedSystem):
section = next(iter(self.configuration_sections(configuration)))
self.ping_interval = configuration.getint(section, "time_to_live", fallback=60) / 4.0
self.timer = Timer(self.ping_interval, self.ping)
self._lock = Lock()
self._timers: deque[Timer] = deque() # because python doesn't have atomics
def create_timer(self) -> None:
"""
create timer object and put it to queue
"""
timer = Timer(self.ping_interval, self.ping)
timer.start()
self._timers.append(timer)
def on_start(self) -> None:
"""
trigger action which will be called at the start of the application
"""
self.logger.info("registering instance %s at %s", self.worker, self.address)
self.timer.start()
self.logger.info("registering instance %s in %s", self.worker, self.address)
with self._lock:
self.create_timer()
def on_stop(self) -> None:
"""
trigger action which will be called before the stop of the application
"""
self.logger.info("removing instance %s at %s", self.worker, self.address)
self.timer.cancel()
self.logger.info("removing instance %s in %s", self.worker, self.address)
with self._lock:
current_timers = self._timers.copy() # will be used later
self._timers.clear() # clear timer list
for timer in current_timers:
timer.cancel() # cancel remaining timers
def ping(self) -> None:
"""
register itself as alive worker and update the timer
"""
self.register()
self.timer = Timer(self.ping_interval, self.ping)
self.timer.start()
with self._lock:
if not self._timers: # make sure that there is related specific timer
return
self._timers.popleft() # pop first timer
self.register()
self.create_timer()

View File

@ -19,6 +19,8 @@
#
import time
from threading import Lock
from ahriman.core.configuration import Configuration
from ahriman.core.log import LazyLogging
from ahriman.models.worker import Worker
@ -40,6 +42,7 @@ class WorkersCache(LazyLogging):
configuration(Configuration): configuration instance
"""
self.time_to_live = configuration.getint("worker", "time_to_live", fallback=60)
self._lock = Lock()
self._workers: dict[str, tuple[Worker, float]] = {}
@property
@ -51,17 +54,19 @@ class WorkersCache(LazyLogging):
list[Worker]: list of currently registered workers which have been seen not earlier than :attr:`time_to_live`
"""
valid_from = time.monotonic() - self.time_to_live
return [
worker
for worker, last_seen in self._workers.values()
if last_seen > valid_from
]
with self._lock:
return [
worker
for worker, last_seen in self._workers.values()
if last_seen > valid_from
]
def workers_remove(self) -> None:
"""
remove all workers from the cache
"""
self._workers = {}
with self._lock:
self._workers = {}
def workers_update(self, worker: Worker) -> None:
"""
@ -70,4 +75,5 @@ class WorkersCache(LazyLogging):
Args:
worker(Worker): worker to register
"""
self._workers[worker.identifier] = (worker, time.monotonic())
with self._lock:
self._workers[worker.identifier] = (worker, time.monotonic())

View File

@ -57,7 +57,7 @@ class Spawn(Thread, LazyLogging):
self.args_parser = args_parser
self.command_arguments = command_arguments
self.lock = Lock()
self._lock = Lock()
self.active: dict[str, Process] = {}
# stupid pylint does not know that it is possible
self.queue: Queue[ProcessStatus | None] = Queue() # pylint: disable=unsubscriptable-object
@ -141,7 +141,7 @@ class Spawn(Thread, LazyLogging):
daemon=True)
process.start()
with self.lock:
with self._lock:
self.active[process_id] = process
return process_id
@ -155,7 +155,7 @@ class Spawn(Thread, LazyLogging):
Returns:
bool: True in case if process still counts as active and False otherwise
"""
with self.lock:
with self._lock:
return process_id in self.active
def key_import(self, key: str, server: str | None) -> str:
@ -273,7 +273,7 @@ class Spawn(Thread, LazyLogging):
self.logger.info("process %s has been terminated with status %s, consumed time %ss",
terminated.process_id, terminated.status, terminated.consumed_time / 1000)
with self.lock:
with self._lock:
process = self.active.pop(terminated.process_id, None)
if process is not None:

View File

@ -17,6 +17,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from threading import Lock
from ahriman.core.database import SQLite
from ahriman.core.exceptions import UnknownPackageError
from ahriman.core.log import LazyLogging
@ -34,8 +36,6 @@ class Watcher(LazyLogging):
Attributes:
database(SQLite): database instance
known(dict[str, tuple[Package, BuildStatus]]): list of known packages. For the most cases :attr:`packages`
should be used instead
repository_id(RepositoryId): repository unique identifier
status(BuildStatus): daemon status
"""
@ -51,7 +51,8 @@ class Watcher(LazyLogging):
self.repository_id = repository_id
self.database = database
self.known: dict[str, tuple[Package, BuildStatus]] = {}
self._lock = Lock()
self._known: dict[str, tuple[Package, BuildStatus]] = {}
self.status = BuildStatus()
# special variables for updating logs
@ -65,15 +66,18 @@ class Watcher(LazyLogging):
Returns:
list[tuple[Package, BuildStatus]]: list of packages together with their statuses
"""
return list(self.known.values())
with self._lock:
return list(self._known.values())
def load(self) -> None:
"""
load packages from local database
"""
self.known = {} # reset state
for package, status in self.database.packages_get(self.repository_id):
self.known[package.base] = (package, status)
with self._lock:
self._known = {
package.base: (package, status)
for package, status in self.database.packages_get(self.repository_id)
}
def logs_get(self, package_base: str, limit: int = -1, offset: int = 0) -> list[tuple[float, str]]:
"""
@ -87,6 +91,7 @@ class Watcher(LazyLogging):
Returns:
list[tuple[float, str]]: package logs
"""
self.package_get(package_base)
return self.database.logs_get(package_base, limit, offset, self.repository_id)
def logs_remove(self, package_base: str, version: str | None) -> None:
@ -123,12 +128,8 @@ class Watcher(LazyLogging):
Returns:
Changes: package changes if available
Raises:
UnknownPackageError: if no package found
"""
if package_base not in self.known:
raise UnknownPackageError(package_base)
self.package_get(package_base)
return self.database.changes_get(package_base, self.repository_id)
def package_get(self, package_base: str) -> tuple[Package, BuildStatus]:
@ -145,7 +146,8 @@ class Watcher(LazyLogging):
UnknownPackageError: if no package found
"""
try:
return self.known[package_base]
with self._lock:
return self._known[package_base]
except KeyError:
raise UnknownPackageError(package_base) from None
@ -156,7 +158,8 @@ class Watcher(LazyLogging):
Args:
package_base(str): package base
"""
self.known.pop(package_base, None)
with self._lock:
self._known.pop(package_base, None)
self.database.package_remove(package_base, self.repository_id)
self.logs_remove(package_base, None)
@ -168,17 +171,12 @@ class Watcher(LazyLogging):
package_base(str): package base to update
status(BuildStatusEnum): new build status
package(Package | None): optional package description. In case if not set current properties will be used
Raises:
UnknownPackageError: if no package found
"""
if package is None:
try:
package, _ = self.known[package_base]
except KeyError:
raise UnknownPackageError(package_base) from None
package, _ = self.package_get(package_base)
full_status = BuildStatus(status)
self.known[package_base] = (package, full_status)
with self._lock:
self._known[package_base] = (package, full_status)
self.database.package_update(package, full_status, self.repository_id)
def patches_get(self, package_base: str, variable: str | None) -> list[PkgbuildPatch]:
@ -192,6 +190,7 @@ class Watcher(LazyLogging):
Returns:
list[PkgbuildPatch]: list of patches which are stored for the package
"""
self.package_get(package_base)
variables = [variable] if variable is not None else None
return self.database.patches_list(package_base, variables).get(package_base, [])

View File

@ -1,7 +1,7 @@
import pytest
from ahriman.core.configuration import Configuration
from ahriman.core.distributed import WorkersCache
from ahriman.core.distributed import WorkerTrigger, WorkersCache
from ahriman.core.distributed.distributed_system import DistributedSystem
@ -21,6 +21,22 @@ def distributed_system(configuration: Configuration) -> DistributedSystem:
return DistributedSystem(repository_id, configuration)
@pytest.fixture
def worker_trigger(configuration: Configuration) -> WorkerTrigger:
"""
worker trigger fixture
Args:
configuration(Configuration): configuration fixture
Returns:
WorkerTrigger: worker trigger test instance
"""
configuration.set_option("status", "address", "http://localhost:8081")
_, repository_id = configuration.check_loaded()
return WorkerTrigger(repository_id, configuration)
@pytest.fixture
def workers_cache(configuration: Configuration) -> WorkersCache:
"""

View File

@ -1,52 +1,66 @@
from threading import Timer
from pytest_mock import MockerFixture
from ahriman.core.configuration import Configuration
from ahriman.core.distributed import WorkerTrigger
def test_on_start(configuration: Configuration, mocker: MockerFixture) -> None:
def test_create_timer(worker_trigger: WorkerTrigger) -> None:
"""
must create a timer and put it to queue
"""
worker_trigger.create_timer()
timer = worker_trigger._timers.popleft()
assert timer.function == worker_trigger.ping
timer.cancel()
def test_on_start(worker_trigger: WorkerTrigger, mocker: MockerFixture) -> None:
"""
must register itself as worker
"""
configuration.set_option("status", "address", "http://localhost:8081")
run_mock = mocker.patch("threading.Timer.start")
_, repository_id = configuration.check_loaded()
WorkerTrigger(repository_id, configuration).on_start()
run_mock = mocker.patch("ahriman.core.distributed.WorkerTrigger.create_timer")
worker_trigger.on_start()
run_mock.assert_called_once_with()
def test_on_stop(configuration: Configuration, mocker: MockerFixture) -> None:
def test_on_stop(worker_trigger: WorkerTrigger, mocker: MockerFixture) -> None:
"""
must unregister itself as worker
"""
configuration.set_option("status", "address", "http://localhost:8081")
run_mock = mocker.patch("threading.Timer.cancel")
_, repository_id = configuration.check_loaded()
worker_trigger._timers.append(Timer(1, print)) # doesn't matter
WorkerTrigger(repository_id, configuration).on_stop()
worker_trigger.on_stop()
run_mock.assert_called_once_with()
def test_on_stop_empty_timer(configuration: Configuration) -> None:
def test_on_stop_empty_timer(worker_trigger: WorkerTrigger) -> None:
"""
must do not fail if no timer was started
"""
configuration.set_option("status", "address", "http://localhost:8081")
_, repository_id = configuration.check_loaded()
WorkerTrigger(repository_id, configuration).on_stop()
worker_trigger.on_stop()
def test_ping(configuration: Configuration, mocker: MockerFixture) -> None:
def test_ping(worker_trigger: WorkerTrigger, mocker: MockerFixture) -> None:
"""
must correctly process timer action
"""
configuration.set_option("status", "address", "http://localhost:8081")
run_mock = mocker.patch("ahriman.core.distributed.WorkerTrigger.register")
timer_mock = mocker.patch("threading.Timer.start")
_, repository_id = configuration.check_loaded()
worker_trigger._timers.append(Timer(1, print)) # doesn't matter
WorkerTrigger(repository_id, configuration).ping()
worker_trigger.ping()
run_mock.assert_called_once_with()
timer_mock.assert_called_once_with()
def test_ping_empty_queue(worker_trigger: WorkerTrigger, mocker: MockerFixture) -> None:
"""
must do nothing in case of empty queue
"""
run_mock = mocker.patch("ahriman.core.distributed.WorkerTrigger.register")
worker_trigger.ping()
run_mock.assert_not_called()

View File

@ -21,7 +21,7 @@ def test_load(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture)
watcher.load()
cache_mock.assert_called_once_with(watcher.repository_id)
package, status = watcher.known[package_ahriman.base]
package, status = watcher._known[package_ahriman.base]
assert package == package_ahriman
assert status.status == BuildStatusEnum.Unknown
@ -32,10 +32,10 @@ def test_load_known(watcher: Watcher, package_ahriman: Package, mocker: MockerFi
"""
status = BuildStatus(BuildStatusEnum.Success)
mocker.patch("ahriman.core.database.SQLite.packages_get", return_value=[(package_ahriman, status)])
watcher.known = {package_ahriman.base: (package_ahriman, status)}
watcher._known = {package_ahriman.base: (package_ahriman, status)}
watcher.load()
_, status = watcher.known[package_ahriman.base]
_, status = watcher._known[package_ahriman.base]
assert status.status == BuildStatusEnum.Success
@ -43,11 +43,21 @@ def test_logs_get(watcher: Watcher, package_ahriman: Package, mocker: MockerFixt
"""
must return package logs
"""
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
logs_mock = mocker.patch("ahriman.core.database.SQLite.logs_get")
watcher.logs_get(package_ahriman.base, 1, 2)
logs_mock.assert_called_once_with(package_ahriman.base, 1, 2, watcher.repository_id)
def test_logs_get_failed(watcher: Watcher, package_ahriman: Package) -> None:
"""
must raise UnknownPackageError on logs in case of unknown package
"""
with pytest.raises(UnknownPackageError):
watcher.logs_get(package_ahriman.base)
def test_logs_remove(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must remove package logs
@ -94,7 +104,7 @@ def test_package_changes_get(watcher: Watcher, package_ahriman: Package, mocker:
must return package changes
"""
get_mock = mocker.patch("ahriman.core.database.SQLite.changes_get", return_value=Changes("sha"))
watcher.known = {package_ahriman.base: (package_ahriman, BuildStatus())}
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
assert watcher.package_changes_get(package_ahriman.base) == Changes("sha")
get_mock.assert_called_once_with(package_ahriman.base, watcher.repository_id)
@ -102,7 +112,7 @@ def test_package_changes_get(watcher: Watcher, package_ahriman: Package, mocker:
def test_package_changes_get_failed(watcher: Watcher, package_ahriman: Package) -> None:
"""
must raise UnknownPackageError in case of unknown package
must raise UnknownPackageError on changes in case of unknown package
"""
with pytest.raises(UnknownPackageError):
watcher.package_changes_get(package_ahriman.base)
@ -112,7 +122,7 @@ def test_package_get(watcher: Watcher, package_ahriman: Package) -> None:
"""
must return package status
"""
watcher.known = {package_ahriman.base: (package_ahriman, BuildStatus())}
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
package, status = watcher.package_get(package_ahriman.base)
assert package == package_ahriman
assert status.status == BuildStatusEnum.Unknown
@ -132,10 +142,10 @@ def test_package_remove(watcher: Watcher, package_ahriman: Package, mocker: Mock
"""
cache_mock = mocker.patch("ahriman.core.database.SQLite.package_remove")
logs_mock = mocker.patch("ahriman.core.status.watcher.Watcher.logs_remove")
watcher.known = {package_ahriman.base: (package_ahriman, BuildStatus())}
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
watcher.package_remove(package_ahriman.base)
assert not watcher.known
assert not watcher._known
cache_mock.assert_called_once_with(package_ahriman.base, watcher.repository_id)
logs_mock.assert_called_once_with(package_ahriman.base, None)
@ -158,7 +168,7 @@ def test_package_update(watcher: Watcher, package_ahriman: Package, mocker: Mock
watcher.package_update(package_ahriman.base, BuildStatusEnum.Unknown, package_ahriman)
cache_mock.assert_called_once_with(package_ahriman, pytest.helpers.anyvar(int), watcher.repository_id)
package, status = watcher.known[package_ahriman.base]
package, status = watcher._known[package_ahriman.base]
assert package == package_ahriman
assert status.status == BuildStatusEnum.Unknown
@ -168,11 +178,11 @@ def test_package_update_ping(watcher: Watcher, package_ahriman: Package, mocker:
must update package status only for known package
"""
cache_mock = mocker.patch("ahriman.core.database.SQLite.package_update")
watcher.known = {package_ahriman.base: (package_ahriman, BuildStatus())}
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
watcher.package_update(package_ahriman.base, BuildStatusEnum.Success, None)
cache_mock.assert_called_once_with(package_ahriman, pytest.helpers.anyvar(int), watcher.repository_id)
package, status = watcher.known[package_ahriman.base]
package, status = watcher._known[package_ahriman.base]
assert package == package_ahriman
assert status.status == BuildStatusEnum.Success
@ -189,6 +199,7 @@ def test_patches_get(watcher: Watcher, package_ahriman: Package, mocker: MockerF
"""
must return patches for the package
"""
watcher._known = {package_ahriman.base: (package_ahriman, BuildStatus())}
patches_mock = mocker.patch("ahriman.core.database.SQLite.patches_list")
watcher.patches_get(package_ahriman.base, None)
@ -201,6 +212,14 @@ def test_patches_get(watcher: Watcher, package_ahriman: Package, mocker: MockerF
])
def test_patches_get_failed(watcher: Watcher, package_ahriman: Package) -> None:
"""
must raise UnknownPackageError on patches in case of unknown package
"""
with pytest.raises(UnknownPackageError):
watcher.patches_get(package_ahriman.base, None)
def test_patches_remove(watcher: Watcher, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must remove patches for the package