mirror of
				https://github.com/arcan1s/ahriman.git
				synced 2025-11-03 23:33:41 +00:00 
			
		
		
		
	refactor: simplify lock processing in worker trigger
This commit is contained in:
		@ -17,7 +17,6 @@
 | 
				
			|||||||
# You should have received a copy of the GNU General Public License
 | 
					# You should have received a copy of the GNU General Public License
 | 
				
			||||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
 | 
					# along with this program. If not, see <http://www.gnu.org/licenses/>.
 | 
				
			||||||
#
 | 
					#
 | 
				
			||||||
from collections import deque
 | 
					 | 
				
			||||||
from threading import Lock, Timer
 | 
					from threading import Lock, Timer
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from ahriman.core.configuration import Configuration
 | 
					from ahriman.core.configuration import Configuration
 | 
				
			||||||
@ -47,15 +46,14 @@ class WorkerTrigger(DistributedSystem):
 | 
				
			|||||||
        self.ping_interval = configuration.getint(section, "time_to_live", fallback=60) / 4.0
 | 
					        self.ping_interval = configuration.getint(section, "time_to_live", fallback=60) / 4.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self._lock = Lock()
 | 
					        self._lock = Lock()
 | 
				
			||||||
        self._timers: deque[Timer] = deque()  # because python doesn't have atomics
 | 
					        self._timer: Timer | None = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def create_timer(self) -> None:
 | 
					    def create_timer(self) -> None:
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        create timer object and put it to queue
 | 
					        create timer object and put it to queue
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        timer = Timer(self.ping_interval, self.ping)
 | 
					        self._timer = Timer(self.ping_interval, self.ping)
 | 
				
			||||||
        timer.start()
 | 
					        self._timer.start()
 | 
				
			||||||
        self._timers.append(timer)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def on_start(self) -> None:
 | 
					    def on_start(self) -> None:
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
@ -71,21 +69,19 @@ class WorkerTrigger(DistributedSystem):
 | 
				
			|||||||
        """
 | 
					        """
 | 
				
			||||||
        self.logger.info("removing instance %s in %s", self.worker, self.address)
 | 
					        self.logger.info("removing instance %s in %s", self.worker, self.address)
 | 
				
			||||||
        with self._lock:
 | 
					        with self._lock:
 | 
				
			||||||
            current_timers = self._timers.copy()  # will be used later
 | 
					            if self._timer is None:
 | 
				
			||||||
            self._timers.clear()  # clear timer list
 | 
					                return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        for timer in current_timers:
 | 
					            self._timer.cancel()  # cancel remaining timers
 | 
				
			||||||
            timer.cancel()  # cancel remaining timers
 | 
					            self._timer = None  # reset state
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def ping(self) -> None:
 | 
					    def ping(self) -> None:
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        register itself as alive worker and update the timer
 | 
					        register itself as alive worker and update the timer
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        with self._lock:
 | 
					        with self._lock:
 | 
				
			||||||
            if not self._timers:  # make sure that there is related specific timer
 | 
					            if self._timer is None:  # no active timer set, exit loop
 | 
				
			||||||
                return
 | 
					                return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            self._timers.popleft()  # pop first timer
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            self.register()
 | 
					            self.register()
 | 
				
			||||||
            self.create_timer()
 | 
					            self.create_timer()
 | 
				
			||||||
 | 
				
			|||||||
@ -1,8 +1,6 @@
 | 
				
			|||||||
 | 
					from pytest_mock import MockerFixture
 | 
				
			||||||
from threading import Timer
 | 
					from threading import Timer
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from pytest_mock import MockerFixture
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
from ahriman.core.configuration import Configuration
 | 
					 | 
				
			||||||
from ahriman.core.distributed import WorkerTrigger
 | 
					from ahriman.core.distributed import WorkerTrigger
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -11,10 +9,8 @@ def test_create_timer(worker_trigger: WorkerTrigger) -> None:
 | 
				
			|||||||
    must create a timer and put it to queue
 | 
					    must create a timer and put it to queue
 | 
				
			||||||
    """
 | 
					    """
 | 
				
			||||||
    worker_trigger.create_timer()
 | 
					    worker_trigger.create_timer()
 | 
				
			||||||
 | 
					    assert worker_trigger._timer.function == worker_trigger.ping
 | 
				
			||||||
    timer = worker_trigger._timers.popleft()
 | 
					    worker_trigger._timer.cancel()
 | 
				
			||||||
    assert timer.function == worker_trigger.ping
 | 
					 | 
				
			||||||
    timer.cancel()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def test_on_start(worker_trigger: WorkerTrigger, mocker: MockerFixture) -> None:
 | 
					def test_on_start(worker_trigger: WorkerTrigger, mocker: MockerFixture) -> None:
 | 
				
			||||||
@ -31,7 +27,7 @@ def test_on_stop(worker_trigger: WorkerTrigger, mocker: MockerFixture) -> None:
 | 
				
			|||||||
    must unregister itself as worker
 | 
					    must unregister itself as worker
 | 
				
			||||||
    """
 | 
					    """
 | 
				
			||||||
    run_mock = mocker.patch("threading.Timer.cancel")
 | 
					    run_mock = mocker.patch("threading.Timer.cancel")
 | 
				
			||||||
    worker_trigger._timers.append(Timer(1, print))  # doesn't matter
 | 
					    worker_trigger._timer = Timer(1, print)  # doesn't matter
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    worker_trigger.on_stop()
 | 
					    worker_trigger.on_stop()
 | 
				
			||||||
    run_mock.assert_called_once_with()
 | 
					    run_mock.assert_called_once_with()
 | 
				
			||||||
@ -50,7 +46,7 @@ def test_ping(worker_trigger: WorkerTrigger, mocker: MockerFixture) -> None:
 | 
				
			|||||||
    """
 | 
					    """
 | 
				
			||||||
    run_mock = mocker.patch("ahriman.core.distributed.WorkerTrigger.register")
 | 
					    run_mock = mocker.patch("ahriman.core.distributed.WorkerTrigger.register")
 | 
				
			||||||
    timer_mock = mocker.patch("threading.Timer.start")
 | 
					    timer_mock = mocker.patch("threading.Timer.start")
 | 
				
			||||||
    worker_trigger._timers.append(Timer(1, print))  # doesn't matter
 | 
					    worker_trigger._timer = Timer(1, print)  # doesn't matter
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    worker_trigger.ping()
 | 
					    worker_trigger.ping()
 | 
				
			||||||
    run_mock.assert_called_once_with()
 | 
					    run_mock.assert_called_once_with()
 | 
				
			||||||
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user