mirror of
				https://github.com/arcan1s/ahriman.git
				synced 2025-11-03 23:33:41 +00:00 
			
		
		
		
	Compare commits
	
		
			3 Commits
		
	
	
		
			9a1b34b08d
			...
			0626078319
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 0626078319 | |||
| 9bbbd9da2e | |||
| 985307a89e | 
@ -475,7 +475,7 @@ The following environment variables are supported:
 | 
			
		||||
* ``AHRIMAN_REPOSITORY`` - repository name, default is ``aur-clone``.
 | 
			
		||||
* ``AHRIMAN_REPOSITORY_SERVER`` - optional override for the repository URL. Useful if you would like to download packages from remote instead of local filesystem.
 | 
			
		||||
* ``AHRIMAN_REPOSITORY_ROOT`` - repository root. Because of filesystem rights it is required to override default repository root. By default, it uses ``ahriman`` directory inside ahriman's home, which can be passed as mount volume.
 | 
			
		||||
* ``AHRIMAN_UNIX_SOCKET`` - full path to unix socket which is used by web server, default is empty. Note that more likely you would like to put it inside ``AHRIMAN_REPOSITORY_ROOT`` directory (e.g. ``/var/lib/ahriman/ahriman/ahriman-web.sock``) or to ``/tmp``.
 | 
			
		||||
* ``AHRIMAN_UNIX_SOCKET`` - full path to unix socket which is used by web server, default is empty. Note that more likely you would like to put it inside ``AHRIMAN_REPOSITORY_ROOT`` directory (e.g. ``/var/lib/ahriman/ahriman/ahriman-web.sock``) or to ``/run/ahriman``.
 | 
			
		||||
* ``AHRIMAN_USER`` - ahriman user, usually must not be overwritten, default is ``ahriman``.
 | 
			
		||||
* ``AHRIMAN_VALIDATE_CONFIGURATION`` - if set (default) validate service configuration.
 | 
			
		||||
 | 
			
		||||
@ -1318,7 +1318,7 @@ How to enable basic authorization
 | 
			
		||||
   .. code-block:: ini
 | 
			
		||||
 | 
			
		||||
      [web]
 | 
			
		||||
      unix_socket = /var/lib/ahriman/ahriman-web.sock
 | 
			
		||||
      unix_socket = /run/ahriman/ahriman-web.sock
 | 
			
		||||
 | 
			
		||||
   This socket path must be available for web service instance and must be available for all application instances (e.g. in case if you are using docker container - see above - you need to make sure that the socket is passed to the root filesystem).
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1 +1,2 @@
 | 
			
		||||
d /var/lib/ahriman 0755 ahriman ahriman
 | 
			
		||||
d /run/ahriman 0755 ahriman ahriman
 | 
			
		||||
@ -19,7 +19,6 @@
 | 
			
		||||
#
 | 
			
		||||
# pylint: disable=too-many-lines
 | 
			
		||||
import argparse
 | 
			
		||||
import tempfile
 | 
			
		||||
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
from typing import TypeVar
 | 
			
		||||
@ -73,8 +72,7 @@ def _parser() -> argparse.ArgumentParser:
 | 
			
		||||
    parser.add_argument("-c", "--configuration", help="configuration path", type=Path,
 | 
			
		||||
                        default=Path("/") / "etc" / "ahriman.ini")
 | 
			
		||||
    parser.add_argument("--force", help="force run, remove file lock", action="store_true")
 | 
			
		||||
    parser.add_argument("-l", "--lock", help="lock file", type=Path,
 | 
			
		||||
                        default=Path(tempfile.gettempdir()) / "ahriman.lock")
 | 
			
		||||
    parser.add_argument("-l", "--lock", help="lock file", type=Path, default=Path("ahriman.pid"))
 | 
			
		||||
    parser.add_argument("--log-handler", help="explicit log handler specification. If none set, the handler will be "
 | 
			
		||||
                                              "guessed from environment",
 | 
			
		||||
                        type=LogHandler, choices=enum_values(LogHandler))
 | 
			
		||||
@ -628,8 +626,7 @@ def _set_repo_daemon_parser(root: SubParserAction) -> argparse.ArgumentParser:
 | 
			
		||||
    parser.add_argument("-y", "--refresh", help="download fresh package databases from the mirror before actions, "
 | 
			
		||||
                                                "-yy to force refresh even if up to date",
 | 
			
		||||
                        action="count", default=False)
 | 
			
		||||
    parser.set_defaults(handler=handlers.Daemon, exit_code=False,
 | 
			
		||||
                        lock=Path(tempfile.gettempdir()) / "ahriman-daemon.lock", package=[])
 | 
			
		||||
    parser.set_defaults(handler=handlers.Daemon, exit_code=False, lock=Path("ahriman-daemon.pid"), package=[])
 | 
			
		||||
    return parser
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -880,7 +877,7 @@ def _set_service_clean_parser(root: SubParserAction) -> argparse.ArgumentParser:
 | 
			
		||||
                        action=argparse.BooleanOptionalAction, default=False)
 | 
			
		||||
    parser.add_argument("--pacman", help="clear directory with pacman local database cache",
 | 
			
		||||
                        action=argparse.BooleanOptionalAction, default=False)
 | 
			
		||||
    parser.set_defaults(handler=handlers.Clean, quiet=True, unsafe=True)
 | 
			
		||||
    parser.set_defaults(handler=handlers.Clean, lock=None, quiet=True, unsafe=True)
 | 
			
		||||
    return parser
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1139,8 +1136,8 @@ def _set_web_parser(root: SubParserAction) -> argparse.ArgumentParser:
 | 
			
		||||
        argparse.ArgumentParser: created argument parser
 | 
			
		||||
    """
 | 
			
		||||
    parser = root.add_parser("web", help="web server", description="start web server", formatter_class=_formatter)
 | 
			
		||||
    parser.set_defaults(handler=handlers.Web, architecture="", lock=Path(tempfile.gettempdir()) / "ahriman-web.lock",
 | 
			
		||||
                        report=False, repository="", parser=_parser)
 | 
			
		||||
    parser.set_defaults(handler=handlers.Web, architecture="", lock=Path("ahriman-web.pid"), report=False,
 | 
			
		||||
                        repository="", parser=_parser)
 | 
			
		||||
    return parser
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -18,7 +18,10 @@
 | 
			
		||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
#
 | 
			
		||||
import argparse
 | 
			
		||||
import fcntl
 | 
			
		||||
import os
 | 
			
		||||
 | 
			
		||||
from io import TextIOWrapper
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
from types import TracebackType
 | 
			
		||||
from typing import Literal, Self
 | 
			
		||||
@ -36,7 +39,7 @@ from ahriman.models.waiter import Waiter
 | 
			
		||||
 | 
			
		||||
class Lock(LazyLogging):
 | 
			
		||||
    """
 | 
			
		||||
    wrapper for application lock file
 | 
			
		||||
    wrapper for application lock file. Credits for idea to https://github.com/bmhatfield/python-pidfile.git
 | 
			
		||||
 | 
			
		||||
    Attributes:
 | 
			
		||||
        force(bool): remove lock file on start if any
 | 
			
		||||
@ -70,8 +73,13 @@ class Lock(LazyLogging):
 | 
			
		||||
            repository_id(RepositoryId): repository unique identifier
 | 
			
		||||
            configuration(Configuration): configuration instance
 | 
			
		||||
        """
 | 
			
		||||
        self.path: Path | None = \
 | 
			
		||||
            args.lock.with_stem(f"{args.lock.stem}_{repository_id.id}") if args.lock is not None else None
 | 
			
		||||
        self.path: Path | None = None
 | 
			
		||||
        if args.lock is not None:
 | 
			
		||||
            self.path = args.lock.with_stem(f"{args.lock.stem}_{repository_id.id}")
 | 
			
		||||
            if not self.path.is_absolute():
 | 
			
		||||
                # prepend full path to the lock file
 | 
			
		||||
                self.path = Path("/") / "run" / "ahriman" / self.path
 | 
			
		||||
        self._pid_file: TextIOWrapper | None = None
 | 
			
		||||
 | 
			
		||||
        self.force: bool = args.force
 | 
			
		||||
        self.unsafe: bool = args.unsafe
 | 
			
		||||
@ -80,6 +88,72 @@ class Lock(LazyLogging):
 | 
			
		||||
        self.paths = configuration.repository_paths
 | 
			
		||||
        self.reporter = Client.load(repository_id, configuration, report=args.report)
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def perform_lock(fd: int) -> bool:
 | 
			
		||||
        """
 | 
			
		||||
        perform file lock
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
            fd(int): file descriptor:
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
            bool: True in case if file is locked and False otherwise
 | 
			
		||||
        """
 | 
			
		||||
        try:
 | 
			
		||||
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
 | 
			
		||||
        except OSError:
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
    def _open(self) -> None:
 | 
			
		||||
        """
 | 
			
		||||
        create lock file
 | 
			
		||||
        """
 | 
			
		||||
        if self.path is None:
 | 
			
		||||
            return
 | 
			
		||||
        self._pid_file = self.path.open("a+")
 | 
			
		||||
 | 
			
		||||
    def _watch(self) -> bool:
 | 
			
		||||
        """
 | 
			
		||||
        watch until lock disappear
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
            bool: True in case if file is locked and False otherwise
 | 
			
		||||
        """
 | 
			
		||||
        # there are reasons why we are not using inotify here. First of all, if we would use it, it would bring to
 | 
			
		||||
        # race conditions because multiple processes will be notified at the same time. Secondly, it is good library,
 | 
			
		||||
        # but platform-specific, and we only need to check if file exists
 | 
			
		||||
        if self._pid_file is None:
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
        waiter = Waiter(self.wait_timeout)
 | 
			
		||||
        return bool(waiter.wait(lambda fd: not self.perform_lock(fd), self._pid_file.fileno()))
 | 
			
		||||
 | 
			
		||||
    def _write(self, *, is_locked: bool = False) -> None:
 | 
			
		||||
        """
 | 
			
		||||
        write pid to the lock file
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
            is_locked(bool, optional): indicates if file was already locked or not (Default value = False)
 | 
			
		||||
 | 
			
		||||
        Raises:
 | 
			
		||||
            DuplicateRunError: if it cannot lock PID file
 | 
			
		||||
        """
 | 
			
		||||
        if self._pid_file is None:
 | 
			
		||||
            return
 | 
			
		||||
        if not is_locked:
 | 
			
		||||
            if not self.perform_lock(self._pid_file.fileno()):
 | 
			
		||||
                raise DuplicateRunError
 | 
			
		||||
 | 
			
		||||
        self._pid_file.seek(0)  # reset position and remove file content if any
 | 
			
		||||
        self._pid_file.truncate()
 | 
			
		||||
 | 
			
		||||
        self._pid_file.write(str(os.getpid()))  # write current pid
 | 
			
		||||
        self._pid_file.flush()  # flush data to disk
 | 
			
		||||
 | 
			
		||||
        self._pid_file.seek(0)  # reset position again
 | 
			
		||||
 | 
			
		||||
    def check_user(self) -> None:
 | 
			
		||||
        """
 | 
			
		||||
        check if current user is actually owner of ahriman root
 | 
			
		||||
@ -100,46 +174,33 @@ class Lock(LazyLogging):
 | 
			
		||||
        """
 | 
			
		||||
        remove lock file
 | 
			
		||||
        """
 | 
			
		||||
        if self.path is None:
 | 
			
		||||
            return
 | 
			
		||||
        self.path.unlink(missing_ok=True)
 | 
			
		||||
        if self._pid_file is not None:  # close file descriptor
 | 
			
		||||
            try:
 | 
			
		||||
                self._pid_file.close()
 | 
			
		||||
            except IOError:
 | 
			
		||||
                pass  # suppress any IO errors occur
 | 
			
		||||
        if self.path is not None:  # remove lock file
 | 
			
		||||
            self.path.unlink(missing_ok=True)
 | 
			
		||||
 | 
			
		||||
    def create(self) -> None:
 | 
			
		||||
    def lock(self) -> None:
 | 
			
		||||
        """
 | 
			
		||||
        create lock file
 | 
			
		||||
 | 
			
		||||
        Raises:
 | 
			
		||||
            DuplicateRunError: if lock exists and no force flag supplied
 | 
			
		||||
        create pid file
 | 
			
		||||
        """
 | 
			
		||||
        if self.path is None:
 | 
			
		||||
            return
 | 
			
		||||
        try:
 | 
			
		||||
            self.path.touch(exist_ok=self.force)
 | 
			
		||||
        except FileExistsError:
 | 
			
		||||
            raise DuplicateRunError from None
 | 
			
		||||
 | 
			
		||||
    def watch(self) -> None:
 | 
			
		||||
        """
 | 
			
		||||
        watch until lock disappear
 | 
			
		||||
        """
 | 
			
		||||
        # there are reasons why we are not using inotify here. First of all, if we would use it, it would bring to
 | 
			
		||||
        # race conditions because multiple processes will be notified in the same time. Secondly, it is good library,
 | 
			
		||||
        # but platform-specific, and we only need to check if file exists
 | 
			
		||||
        if self.path is None:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        waiter = Waiter(self.wait_timeout)
 | 
			
		||||
        waiter.wait(self.path.is_file)
 | 
			
		||||
        if self.force:  # remove lock if force flag is set
 | 
			
		||||
            self.clear()
 | 
			
		||||
        self._open()
 | 
			
		||||
        is_locked = self._watch()
 | 
			
		||||
        self._write(is_locked=is_locked)
 | 
			
		||||
 | 
			
		||||
    def __enter__(self) -> Self:
 | 
			
		||||
        """
 | 
			
		||||
        default workflow is the following:
 | 
			
		||||
 | 
			
		||||
            #. Check user UID
 | 
			
		||||
            #. Check if there is lock file
 | 
			
		||||
            #. Check web status watcher status
 | 
			
		||||
            #. Open lock file
 | 
			
		||||
            #. Wait for lock file to be free
 | 
			
		||||
            #. Create lock file and directory tree
 | 
			
		||||
            #. Write current PID to the lock file
 | 
			
		||||
            #. Report to status page if enabled
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
@ -147,8 +208,7 @@ class Lock(LazyLogging):
 | 
			
		||||
        """
 | 
			
		||||
        self.check_user()
 | 
			
		||||
        self.check_version()
 | 
			
		||||
        self.watch()
 | 
			
		||||
        self.create()
 | 
			
		||||
        self.lock()
 | 
			
		||||
        self.reporter.status_update(BuildStatusEnum.Building)
 | 
			
		||||
        return self
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -46,8 +46,8 @@ class SyncAhrimanClient(SyncHttpClient):
 | 
			
		||||
            request.Session: created session object
 | 
			
		||||
        """
 | 
			
		||||
        if urlparse(self.address).scheme == "http+unix":
 | 
			
		||||
            import requests_unixsocket  # type: ignore[import-untyped]
 | 
			
		||||
            session: requests.Session = requests_unixsocket.Session()
 | 
			
		||||
            import requests_unixsocket
 | 
			
		||||
            session: requests.Session = requests_unixsocket.Session()  # type: ignore[no-untyped-call]
 | 
			
		||||
            session.headers["User-Agent"] = f"ahriman/{__version__}"
 | 
			
		||||
            return session
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -80,7 +80,7 @@ class Executor(PackageInfo, Cleaner):
 | 
			
		||||
                    # clear changes and update commit hash
 | 
			
		||||
                    self.reporter.package_changes_update(single.base, Changes(last_commit_sha))
 | 
			
		||||
                    # update dependencies list
 | 
			
		||||
                    dependencies = PackageArchive(self.paths.build_directory, single).depends_on()
 | 
			
		||||
                    dependencies = PackageArchive(self.paths.build_directory, single, self.pacman).depends_on()
 | 
			
		||||
                    self.reporter.package_dependencies_update(single.base, dependencies)
 | 
			
		||||
                    # update result set
 | 
			
		||||
                    result.add_updated(single)
 | 
			
		||||
 | 
			
		||||
@ -57,6 +57,7 @@ class AURPackage:
 | 
			
		||||
        provides(list[str]): list of packages which this package provides
 | 
			
		||||
        license(list[str]): list of package licenses
 | 
			
		||||
        keywords(list[str]): list of package keywords
 | 
			
		||||
        groups(list[str]): list of package groups
 | 
			
		||||
 | 
			
		||||
    Examples:
 | 
			
		||||
        Mainly this class must be used from class methods instead of default :func:`__init__()`::
 | 
			
		||||
@ -100,6 +101,7 @@ class AURPackage:
 | 
			
		||||
    provides: list[str] = field(default_factory=list)
 | 
			
		||||
    license: list[str] = field(default_factory=list)
 | 
			
		||||
    keywords: list[str] = field(default_factory=list)
 | 
			
		||||
    groups: list[str] = field(default_factory=list)
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def from_json(cls, dump: dict[str, Any]) -> Self:
 | 
			
		||||
@ -153,6 +155,7 @@ class AURPackage:
 | 
			
		||||
            provides=package.provides,
 | 
			
		||||
            license=package.licenses,
 | 
			
		||||
            keywords=[],
 | 
			
		||||
            groups=package.groups,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
@ -191,6 +194,7 @@ class AURPackage:
 | 
			
		||||
            provides=dump["provides"],
 | 
			
		||||
            license=dump["licenses"],
 | 
			
		||||
            keywords=[],
 | 
			
		||||
            groups=dump["groups"],
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
 | 
			
		||||
@ -34,6 +34,13 @@ class Dependencies:
 | 
			
		||||
 | 
			
		||||
    paths: dict[str, list[str]] = field(default_factory=dict)
 | 
			
		||||
 | 
			
		||||
    def __post_init__(self) -> None:
 | 
			
		||||
        """
 | 
			
		||||
        remove empty paths
 | 
			
		||||
        """
 | 
			
		||||
        paths = {path: packages for path, packages in self.paths.items() if packages}
 | 
			
		||||
        object.__setattr__(self, "paths", paths)
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def from_json(cls, dump: dict[str, Any]) -> Self:
 | 
			
		||||
        """
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										50
									
								
								src/ahriman/models/filesystem_package.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										50
									
								
								src/ahriman/models/filesystem_package.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,50 @@
 | 
			
		||||
#
 | 
			
		||||
# Copyright (c) 2021-2024 ahriman team.
 | 
			
		||||
#
 | 
			
		||||
# This file is part of ahriman
 | 
			
		||||
# (see https://github.com/arcan1s/ahriman).
 | 
			
		||||
#
 | 
			
		||||
# This program is free software: you can redistribute it and/or modify
 | 
			
		||||
# it under the terms of the GNU General Public License as published by
 | 
			
		||||
# the Free Software Foundation, either version 3 of the License, or
 | 
			
		||||
# (at your option) any later version.
 | 
			
		||||
#
 | 
			
		||||
# This program is distributed in the hope that it will be useful,
 | 
			
		||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
			
		||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
			
		||||
# GNU General Public License for more details.
 | 
			
		||||
#
 | 
			
		||||
# You should have received a copy of the GNU General Public License
 | 
			
		||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
#
 | 
			
		||||
from dataclasses import dataclass, field
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@dataclass(frozen=True, kw_only=True)
 | 
			
		||||
class FilesystemPackage:
 | 
			
		||||
    """
 | 
			
		||||
    class representing a simplified model for the package installed to filesystem
 | 
			
		||||
 | 
			
		||||
    Attributes:
 | 
			
		||||
        package_name(str): package name
 | 
			
		||||
        depends(list[str]): list of package dependencies
 | 
			
		||||
        directories(list[Path]): list of directories this package contains
 | 
			
		||||
        files(list[Path]): list of files this package contains
 | 
			
		||||
        groups(list[str]): list of groups of the package
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    package_name: str
 | 
			
		||||
    groups: set[str]
 | 
			
		||||
    depends: set[str]
 | 
			
		||||
    directories: list[Path] = field(default_factory=list)
 | 
			
		||||
    files: list[Path] = field(default_factory=list)
 | 
			
		||||
 | 
			
		||||
    def __repr__(self) -> str:
 | 
			
		||||
        """
 | 
			
		||||
        generate string representation of object
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
            str: unique string representation
 | 
			
		||||
        """
 | 
			
		||||
        return f'FilesystemPackage(package_name={self.package_name}, depends={self.depends})'
 | 
			
		||||
@ -23,8 +23,12 @@ from elftools.elf.elffile import ELFFile
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
from typing import IO
 | 
			
		||||
 | 
			
		||||
from ahriman.core.alpm.pacman import Pacman
 | 
			
		||||
from ahriman.core.alpm.remote import OfficialSyncdb
 | 
			
		||||
from ahriman.core.exceptions import UnknownPackageError
 | 
			
		||||
from ahriman.core.util import walk
 | 
			
		||||
from ahriman.models.dependencies import Dependencies
 | 
			
		||||
from ahriman.models.filesystem_package import FilesystemPackage
 | 
			
		||||
from ahriman.models.package import Package
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -40,6 +44,7 @@ class PackageArchive:
 | 
			
		||||
 | 
			
		||||
    root: Path
 | 
			
		||||
    package: Package
 | 
			
		||||
    pacman: Pacman
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def dynamic_needed(binary_path: Path) -> list[str]:
 | 
			
		||||
@ -90,6 +95,27 @@ class PackageArchive:
 | 
			
		||||
 | 
			
		||||
        return magic_bytes == expected
 | 
			
		||||
 | 
			
		||||
    def _load_pacman_package(self, path: Path) -> FilesystemPackage:
 | 
			
		||||
        """
 | 
			
		||||
        load pacman package model from path
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
            path(Path): path to package files database
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
            FilesystemPackage: generated pacman package model with empty paths
 | 
			
		||||
        """
 | 
			
		||||
        package_name, *_ = path.parent.name.rsplit("-", 2)
 | 
			
		||||
        try:
 | 
			
		||||
            pacman_package = OfficialSyncdb.info(package_name, pacman=self.pacman)
 | 
			
		||||
            return FilesystemPackage(
 | 
			
		||||
                package_name=package_name,
 | 
			
		||||
                groups=set(pacman_package.groups),
 | 
			
		||||
                depends=set(pacman_package.depends),
 | 
			
		||||
            )
 | 
			
		||||
        except UnknownPackageError:
 | 
			
		||||
            return FilesystemPackage(package_name=package_name, groups=set(), depends=set())
 | 
			
		||||
 | 
			
		||||
    def depends_on(self) -> Dependencies:
 | 
			
		||||
        """
 | 
			
		||||
        extract packages and paths which are required for this package
 | 
			
		||||
@ -98,17 +124,49 @@ class PackageArchive:
 | 
			
		||||
            Dependencies: map of the package name to set of paths used by this package
 | 
			
		||||
        """
 | 
			
		||||
        dependencies, roots = self.depends_on_paths()
 | 
			
		||||
        installed_packages = self.installed_packages()
 | 
			
		||||
 | 
			
		||||
        result: dict[str, list[str]] = {}
 | 
			
		||||
        for package, (directories, files) in self.installed_packages().items():
 | 
			
		||||
            if package in self.package.packages:
 | 
			
		||||
        # build initial map of file path -> packages containing this path
 | 
			
		||||
        # in fact, keys will contain all libraries the package linked to and all directories it contains
 | 
			
		||||
        dependencies_per_path: dict[Path, list[FilesystemPackage]] = {}
 | 
			
		||||
        for package_base, package in installed_packages.items():
 | 
			
		||||
            if package_base in self.package.packages:
 | 
			
		||||
                continue  # skip package itself
 | 
			
		||||
 | 
			
		||||
            required_by = [directory for directory in directories if directory in roots]
 | 
			
		||||
            required_by.extend(library for library in files if library.name in dependencies)
 | 
			
		||||
            required_by = [directory for directory in package.directories if directory in roots]
 | 
			
		||||
            required_by.extend(library for library in package.files if library.name in dependencies)
 | 
			
		||||
 | 
			
		||||
            for path in required_by:
 | 
			
		||||
                result.setdefault(str(path), []).append(package)
 | 
			
		||||
                dependencies_per_path.setdefault(path, []).append(package)
 | 
			
		||||
 | 
			
		||||
        # reduce trees
 | 
			
		||||
        result = {}
 | 
			
		||||
        base_packages = OfficialSyncdb.info("base", pacman=self.pacman).depends
 | 
			
		||||
        # sort items from children directories to root
 | 
			
		||||
        for path, packages in reversed(sorted(dependencies_per_path.items())):
 | 
			
		||||
            package_names = [package.package_name for package in packages]
 | 
			
		||||
            reduced_packages_list = [
 | 
			
		||||
                package.package_name
 | 
			
		||||
                for package in packages
 | 
			
		||||
                # if there is any package which is dependency of this package, we can skip it here
 | 
			
		||||
                if not package.depends.intersection(package_names)
 | 
			
		||||
            ]
 | 
			
		||||
 | 
			
		||||
            # skip if this path belongs to the one of the base packages
 | 
			
		||||
            if any(package in reduced_packages_list for package in base_packages):
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            # check if there is already parent of current path in the result and has the same packages
 | 
			
		||||
            for children_path, children_packages in result.items():
 | 
			
		||||
                if not Path(children_path).is_relative_to(path):
 | 
			
		||||
                    continue
 | 
			
		||||
                reduced_packages_list = [
 | 
			
		||||
                    package_name
 | 
			
		||||
                    for package_name in reduced_packages_list
 | 
			
		||||
                    if package_name not in children_packages
 | 
			
		||||
                ]
 | 
			
		||||
 | 
			
		||||
            result[str(path)] = reduced_packages_list
 | 
			
		||||
 | 
			
		||||
        return Dependencies(result)
 | 
			
		||||
 | 
			
		||||
@ -130,7 +188,7 @@ class PackageArchive:
 | 
			
		||||
 | 
			
		||||
        return dependencies, roots
 | 
			
		||||
 | 
			
		||||
    def installed_packages(self) -> dict[str, tuple[list[Path], list[Path]]]:
 | 
			
		||||
    def installed_packages(self) -> dict[str, FilesystemPackage]:
 | 
			
		||||
        """
 | 
			
		||||
        extract list of the installed packages and their content
 | 
			
		||||
 | 
			
		||||
@ -142,24 +200,23 @@ class PackageArchive:
 | 
			
		||||
 | 
			
		||||
        pacman_local_files = self.root / "var" / "lib" / "pacman" / "local"
 | 
			
		||||
        for path in filter(lambda fn: fn.name == "files", walk(pacman_local_files)):
 | 
			
		||||
            package, *_ = path.parent.name.rsplit("-", 2)
 | 
			
		||||
            package = self._load_pacman_package(path)
 | 
			
		||||
 | 
			
		||||
            directories, files = [], []
 | 
			
		||||
            is_files = False
 | 
			
		||||
            is_files_section = False
 | 
			
		||||
            for line in path.read_text(encoding="utf8").splitlines():
 | 
			
		||||
                if not line:  # skip empty lines
 | 
			
		||||
                    continue
 | 
			
		||||
                if line.startswith("%") and line.endswith("%"):  # directive started
 | 
			
		||||
                    is_files = line == "%FILES%"
 | 
			
		||||
                if not is_files:  # not a files directive
 | 
			
		||||
                    is_files_section = line == "%FILES%"
 | 
			
		||||
                if not is_files_section:  # not a files directive
 | 
			
		||||
                    continue
 | 
			
		||||
 | 
			
		||||
                entry = Path(line)
 | 
			
		||||
                if line.endswith("/"):  # simple check if it is directory
 | 
			
		||||
                    directories.append(entry)
 | 
			
		||||
                    package.directories.append(entry)
 | 
			
		||||
                else:
 | 
			
		||||
                    files.append(entry)
 | 
			
		||||
                    package.files.append(entry)
 | 
			
		||||
 | 
			
		||||
            result[package] = directories, files
 | 
			
		||||
            result[package.package_name] = package
 | 
			
		||||
 | 
			
		||||
        return result
 | 
			
		||||
 | 
			
		||||
@ -21,27 +21,87 @@ import time
 | 
			
		||||
 | 
			
		||||
from collections.abc import Callable
 | 
			
		||||
from dataclasses import dataclass, field
 | 
			
		||||
from typing import ParamSpec
 | 
			
		||||
from typing import Literal, ParamSpec
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
Params = ParamSpec("Params")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@dataclass(frozen=True)
 | 
			
		||||
class WaiterResult:
 | 
			
		||||
    """
 | 
			
		||||
    representation of a waiter result. This class should not be used directly, use derivatives instead
 | 
			
		||||
 | 
			
		||||
    Attributes:
 | 
			
		||||
        took(float): consumed time in seconds
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    took: float
 | 
			
		||||
 | 
			
		||||
    def __bool__(self) -> bool:
 | 
			
		||||
        """
 | 
			
		||||
        indicates whether the waiter completed with success or not
 | 
			
		||||
 | 
			
		||||
        Raises:
 | 
			
		||||
            NotImplementedError: not implemented method
 | 
			
		||||
        """
 | 
			
		||||
        raise NotImplementedError
 | 
			
		||||
 | 
			
		||||
    def __float__(self) -> float:
 | 
			
		||||
        """
 | 
			
		||||
        extract time spent to retrieve the result in seconds
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
            float: consumed time in seconds
 | 
			
		||||
        """
 | 
			
		||||
        return self.took
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class WaiterTaskFinished(WaiterResult):
 | 
			
		||||
    """
 | 
			
		||||
    a waiter result used to notify that the task has been completed successfully
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def __bool__(self) -> Literal[True]:
 | 
			
		||||
        """
 | 
			
		||||
        indicates whether the waiter completed with success or not
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
            Literal[True]: always False
 | 
			
		||||
        """
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class WaiterTimedOut(WaiterResult):
 | 
			
		||||
    """
 | 
			
		||||
    a waiter result used to notify that the waiter run out of time
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def __bool__(self) -> Literal[False]:
 | 
			
		||||
        """
 | 
			
		||||
        indicates whether the waiter completed with success or not
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
            Literal[False]: always False
 | 
			
		||||
        """
 | 
			
		||||
        return False
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@dataclass(frozen=True)
 | 
			
		||||
class Waiter:
 | 
			
		||||
    """
 | 
			
		||||
    simple waiter implementation
 | 
			
		||||
 | 
			
		||||
    Attributes:
 | 
			
		||||
        interval(int): interval in seconds between checks
 | 
			
		||||
        interval(float): interval in seconds between checks
 | 
			
		||||
        start_time(float): monotonic time of the waiter start. More likely must not be assigned explicitly
 | 
			
		||||
        wait_timeout(int): timeout in seconds to wait for. Negative value will result in immediate exit. Zero value
 | 
			
		||||
        wait_timeout(float): timeout in seconds to wait for. Negative value will result in immediate exit. Zero value
 | 
			
		||||
    means infinite timeout
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    wait_timeout: int
 | 
			
		||||
    wait_timeout: float
 | 
			
		||||
    start_time: float = field(default_factory=time.monotonic, kw_only=True)
 | 
			
		||||
    interval: int = field(default=10, kw_only=True)
 | 
			
		||||
    interval: float = field(default=10, kw_only=True)
 | 
			
		||||
 | 
			
		||||
    def is_timed_out(self) -> bool:
 | 
			
		||||
        """
 | 
			
		||||
@ -51,10 +111,10 @@ class Waiter:
 | 
			
		||||
            bool: True in case current monotonic time is more than :attr:`start_time` and :attr:`wait_timeout`
 | 
			
		||||
            doesn't equal to 0
 | 
			
		||||
        """
 | 
			
		||||
        since_start: float = time.monotonic() - self.start_time
 | 
			
		||||
        since_start = time.monotonic() - self.start_time
 | 
			
		||||
        return self.wait_timeout != 0 and since_start > self.wait_timeout
 | 
			
		||||
 | 
			
		||||
    def wait(self, in_progress: Callable[Params, bool], *args: Params.args, **kwargs: Params.kwargs) -> float:
 | 
			
		||||
    def wait(self, in_progress: Callable[Params, bool], *args: Params.args, **kwargs: Params.kwargs) -> WaiterResult:
 | 
			
		||||
        """
 | 
			
		||||
        wait until requirements are not met
 | 
			
		||||
 | 
			
		||||
@ -64,9 +124,12 @@ class Waiter:
 | 
			
		||||
            **kwargs(Params.kwargs): keyword arguments for check call
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
            float: consumed time in seconds
 | 
			
		||||
            WaiterResult: consumed time in seconds
 | 
			
		||||
        """
 | 
			
		||||
        while not self.is_timed_out() and in_progress(*args, **kwargs):
 | 
			
		||||
        while not (timed_out := self.is_timed_out()) and in_progress(*args, **kwargs):
 | 
			
		||||
            time.sleep(self.interval)
 | 
			
		||||
        took = time.monotonic() - self.start_time
 | 
			
		||||
 | 
			
		||||
        return time.monotonic() - self.start_time
 | 
			
		||||
        if timed_out:
 | 
			
		||||
            return WaiterTimedOut(took)
 | 
			
		||||
        return WaiterTaskFinished(took)
 | 
			
		||||
 | 
			
		||||
@ -1097,9 +1097,10 @@ def test_subparsers_repo_update_option_refresh(parser: argparse.ArgumentParser)
 | 
			
		||||
 | 
			
		||||
def test_subparsers_service_clean(parser: argparse.ArgumentParser) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    service-clean command must imply quiet and unsafe
 | 
			
		||||
    service-clean command must imply lock, quiet and unsafe
 | 
			
		||||
    """
 | 
			
		||||
    args = parser.parse_args(["service-clean"])
 | 
			
		||||
    assert args.lock is None
 | 
			
		||||
    assert args.quiet
 | 
			
		||||
    assert args.unsafe
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1,10 +1,12 @@
 | 
			
		||||
import argparse
 | 
			
		||||
import fcntl
 | 
			
		||||
import os
 | 
			
		||||
import pytest
 | 
			
		||||
import tempfile
 | 
			
		||||
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
from pytest_mock import MockerFixture
 | 
			
		||||
from unittest.mock import call as MockCall
 | 
			
		||||
from tempfile import NamedTemporaryFile
 | 
			
		||||
from unittest.mock import MagicMock, call as MockCall
 | 
			
		||||
 | 
			
		||||
from ahriman import __version__
 | 
			
		||||
from ahriman.application.lock import Lock
 | 
			
		||||
@ -22,14 +24,113 @@ def test_path(args: argparse.Namespace, configuration: Configuration) -> None:
 | 
			
		||||
 | 
			
		||||
    assert Lock(args, repository_id, configuration).path is None
 | 
			
		||||
 | 
			
		||||
    args.lock = Path("/run/ahriman.lock")
 | 
			
		||||
    assert Lock(args, repository_id, configuration).path == Path("/run/ahriman_x86_64-aur-clone.lock")
 | 
			
		||||
    args.lock = Path("/run/ahriman.pid")
 | 
			
		||||
    assert Lock(args, repository_id, configuration).path == Path("/run/ahriman_x86_64-aur-clone.pid")
 | 
			
		||||
 | 
			
		||||
    args.lock = Path("ahriman.pid")
 | 
			
		||||
    assert Lock(args, repository_id, configuration).path == Path("/run/ahriman/ahriman_x86_64-aur-clone.pid")
 | 
			
		||||
 | 
			
		||||
    with pytest.raises(ValueError):
 | 
			
		||||
        args.lock = Path("/")
 | 
			
		||||
        assert Lock(args, repository_id, configuration).path  # special case
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_perform_lock(mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must lock file with fcntl
 | 
			
		||||
    """
 | 
			
		||||
    flock_mock = mocker.patch("fcntl.flock")
 | 
			
		||||
    assert Lock.perform_lock(1)
 | 
			
		||||
    flock_mock.assert_called_once_with(1, fcntl.LOCK_EX | fcntl.LOCK_NB)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_perform_lock_exception(mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must return False on OSError
 | 
			
		||||
    """
 | 
			
		||||
    mocker.patch("fcntl.flock", side_effect=OSError)
 | 
			
		||||
    assert not Lock.perform_lock(1)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_open(lock: Lock, mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must open file
 | 
			
		||||
    """
 | 
			
		||||
    open_mock = mocker.patch("pathlib.Path.open")
 | 
			
		||||
    lock.path = Path("ahriman.pid")
 | 
			
		||||
 | 
			
		||||
    lock._open()
 | 
			
		||||
    open_mock.assert_called_once_with("a+")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_open_skip(lock: Lock, mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must skip file opening if path is not set
 | 
			
		||||
    """
 | 
			
		||||
    open_mock = mocker.patch("pathlib.Path.open")
 | 
			
		||||
    lock._open()
 | 
			
		||||
    open_mock.assert_not_called()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_watch(lock: Lock, mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must check if lock file exists
 | 
			
		||||
    """
 | 
			
		||||
    lock._pid_file = MagicMock()
 | 
			
		||||
    lock._pid_file.fileno.return_value = 1
 | 
			
		||||
    wait_mock = mocker.patch("ahriman.models.waiter.Waiter.wait")
 | 
			
		||||
 | 
			
		||||
    lock._watch()
 | 
			
		||||
    wait_mock.assert_called_once_with(pytest.helpers.anyvar(int), 1)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_watch_skip(lock: Lock, mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must skip watch on empty path
 | 
			
		||||
    """
 | 
			
		||||
    mocker.patch("ahriman.application.lock.Lock.perform_lock", return_value=True)
 | 
			
		||||
    lock._watch()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_write(lock: Lock) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must write PID to lock file
 | 
			
		||||
    """
 | 
			
		||||
    with NamedTemporaryFile("a+") as pid_file:
 | 
			
		||||
        lock._pid_file = pid_file
 | 
			
		||||
        lock._write(is_locked=False)
 | 
			
		||||
 | 
			
		||||
        assert int(lock._pid_file.readline()) == os.getpid()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_write_skip(lock: Lock) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must skip write to file if no path set
 | 
			
		||||
    """
 | 
			
		||||
    lock._write(is_locked=False)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_write_locked(lock: Lock, mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must raise DuplicateRunError if cannot lock file
 | 
			
		||||
    """
 | 
			
		||||
    mocker.patch("ahriman.application.lock.Lock.perform_lock", return_value=False)
 | 
			
		||||
    with pytest.raises(DuplicateRunError):
 | 
			
		||||
        lock._pid_file = MagicMock()
 | 
			
		||||
        lock._write(is_locked=False)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_write_locked_before(lock: Lock, mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must skip lock in case if file was locked before
 | 
			
		||||
    """
 | 
			
		||||
    lock_mock = mocker.patch("ahriman.application.lock.Lock.perform_lock")
 | 
			
		||||
    lock._pid_file = MagicMock()
 | 
			
		||||
 | 
			
		||||
    lock._write(is_locked=True)
 | 
			
		||||
    lock_mock.assert_not_called()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_check_user(lock: Lock, mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must check user correctly
 | 
			
		||||
@ -88,7 +189,7 @@ def test_clear(lock: Lock) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must remove lock file
 | 
			
		||||
    """
 | 
			
		||||
    lock.path = Path(tempfile.gettempdir()) / "ahriman-test.lock"
 | 
			
		||||
    lock.path = Path("ahriman-test.pid")
 | 
			
		||||
    lock.path.touch()
 | 
			
		||||
 | 
			
		||||
    lock.clear()
 | 
			
		||||
@ -99,7 +200,7 @@ def test_clear_missing(lock: Lock) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must not fail on lock removal if file is missing
 | 
			
		||||
    """
 | 
			
		||||
    lock.path = Path(tempfile.gettempdir()) / "ahriman-test.lock"
 | 
			
		||||
    lock.path = Path("ahriman-test.pid")
 | 
			
		||||
    lock.clear()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -112,67 +213,52 @@ def test_clear_skip(lock: Lock, mocker: MockerFixture) -> None:
 | 
			
		||||
    unlink_mock.assert_not_called()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_create(lock: Lock) -> None:
 | 
			
		||||
def test_clear_close(lock: Lock) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must create lock
 | 
			
		||||
    must close pid file if opened
 | 
			
		||||
    """
 | 
			
		||||
    lock.path = Path(tempfile.gettempdir()) / "ahriman-test.lock"
 | 
			
		||||
 | 
			
		||||
    lock.create()
 | 
			
		||||
    assert lock.path.is_file()
 | 
			
		||||
    lock.path.unlink()
 | 
			
		||||
    close_mock = lock._pid_file = MagicMock()
 | 
			
		||||
    lock.clear()
 | 
			
		||||
    close_mock.close.assert_called_once_with()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_create_exception(lock: Lock) -> None:
 | 
			
		||||
def test_clear_close_exception(lock: Lock) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must raise exception if file already exists
 | 
			
		||||
    must suppress IO exception on file closure
 | 
			
		||||
    """
 | 
			
		||||
    lock.path = Path(tempfile.gettempdir()) / "ahriman-test.lock"
 | 
			
		||||
    lock.path.touch()
 | 
			
		||||
 | 
			
		||||
    with pytest.raises(DuplicateRunError):
 | 
			
		||||
        lock.create()
 | 
			
		||||
    lock.path.unlink()
 | 
			
		||||
    close_mock = lock._pid_file = MagicMock()
 | 
			
		||||
    close_mock.close.side_effect = IOError()
 | 
			
		||||
    lock.clear()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_create_skip(lock: Lock, mocker: MockerFixture) -> None:
 | 
			
		||||
def test_lock(lock: Lock, mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must skip creating if no file set
 | 
			
		||||
    must perform lock correctly
 | 
			
		||||
    """
 | 
			
		||||
    touch_mock = mocker.patch("pathlib.Path.touch")
 | 
			
		||||
    lock.create()
 | 
			
		||||
    touch_mock.assert_not_called()
 | 
			
		||||
    clear_mock = mocker.patch("ahriman.application.lock.Lock.clear")
 | 
			
		||||
    open_mock = mocker.patch("ahriman.application.lock.Lock._open")
 | 
			
		||||
    watch_mock = mocker.patch("ahriman.application.lock.Lock._watch", return_value=True)
 | 
			
		||||
    write_mock = mocker.patch("ahriman.application.lock.Lock._write")
 | 
			
		||||
 | 
			
		||||
    lock.lock()
 | 
			
		||||
    clear_mock.assert_not_called()
 | 
			
		||||
    open_mock.assert_called_once_with()
 | 
			
		||||
    watch_mock.assert_called_once_with()
 | 
			
		||||
    write_mock.assert_called_once_with(is_locked=True)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_create_unsafe(lock: Lock) -> None:
 | 
			
		||||
def test_lock_clear(lock: Lock, mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must not raise exception if force flag set
 | 
			
		||||
    must clear lock file before lock if force flag is set
 | 
			
		||||
    """
 | 
			
		||||
    mocker.patch("ahriman.application.lock.Lock._open")
 | 
			
		||||
    mocker.patch("ahriman.application.lock.Lock._watch")
 | 
			
		||||
    mocker.patch("ahriman.application.lock.Lock._write")
 | 
			
		||||
    clear_mock = mocker.patch("ahriman.application.lock.Lock.clear")
 | 
			
		||||
    lock.force = True
 | 
			
		||||
    lock.path = Path(tempfile.gettempdir()) / "ahriman-test.lock"
 | 
			
		||||
    lock.path.touch()
 | 
			
		||||
 | 
			
		||||
    lock.create()
 | 
			
		||||
    lock.path.unlink()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_watch(lock: Lock, mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must check if lock file exists
 | 
			
		||||
    """
 | 
			
		||||
    wait_mock = mocker.patch("ahriman.models.waiter.Waiter.wait")
 | 
			
		||||
    lock.path = Path(tempfile.gettempdir()) / "ahriman-test.lock"
 | 
			
		||||
 | 
			
		||||
    lock.watch()
 | 
			
		||||
    wait_mock.assert_called_once_with(lock.path.is_file)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_watch_skip(lock: Lock, mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must skip watch on empty path
 | 
			
		||||
    """
 | 
			
		||||
    mocker.patch("pathlib.Path.is_file", return_value=True)
 | 
			
		||||
    lock.watch()
 | 
			
		||||
    lock.lock()
 | 
			
		||||
    clear_mock.assert_called_once_with()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_enter(lock: Lock, mocker: MockerFixture) -> None:
 | 
			
		||||
@ -181,18 +267,14 @@ def test_enter(lock: Lock, mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    check_user_mock = mocker.patch("ahriman.application.lock.Lock.check_user")
 | 
			
		||||
    check_version_mock = mocker.patch("ahriman.application.lock.Lock.check_version")
 | 
			
		||||
    watch_mock = mocker.patch("ahriman.application.lock.Lock.watch")
 | 
			
		||||
    clear_mock = mocker.patch("ahriman.application.lock.Lock.clear")
 | 
			
		||||
    create_mock = mocker.patch("ahriman.application.lock.Lock.create")
 | 
			
		||||
    lock_mock = mocker.patch("ahriman.application.lock.Lock.lock")
 | 
			
		||||
    update_status_mock = mocker.patch("ahriman.core.status.Client.status_update")
 | 
			
		||||
 | 
			
		||||
    with lock:
 | 
			
		||||
        pass
 | 
			
		||||
    check_user_mock.assert_called_once_with()
 | 
			
		||||
    clear_mock.assert_called_once_with()
 | 
			
		||||
    create_mock.assert_called_once_with()
 | 
			
		||||
    check_version_mock.assert_called_once_with()
 | 
			
		||||
    watch_mock.assert_called_once_with()
 | 
			
		||||
    lock_mock.assert_called_once_with()
 | 
			
		||||
    update_status_mock.assert_has_calls([MockCall(BuildStatusEnum.Building), MockCall(BuildStatusEnum.Success)])
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -202,7 +284,7 @@ def test_exit_with_exception(lock: Lock, mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    mocker.patch("ahriman.application.lock.Lock.check_user")
 | 
			
		||||
    mocker.patch("ahriman.application.lock.Lock.clear")
 | 
			
		||||
    mocker.patch("ahriman.application.lock.Lock.create")
 | 
			
		||||
    mocker.patch("ahriman.application.lock.Lock.lock")
 | 
			
		||||
    update_status_mock = mocker.patch("ahriman.core.status.Client.status_update")
 | 
			
		||||
 | 
			
		||||
    with pytest.raises(ValueError):
 | 
			
		||||
 | 
			
		||||
@ -30,7 +30,7 @@ def test_package_logger_set_reset(database: SQLite) -> None:
 | 
			
		||||
    database._package_logger_reset()
 | 
			
		||||
    record = logging.makeLogRecord({})
 | 
			
		||||
    with pytest.raises(AttributeError):
 | 
			
		||||
        record.package_id
 | 
			
		||||
        assert record.package_id
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_in_package_context(database: SQLite, package_ahriman: Package, mocker: MockerFixture) -> None:
 | 
			
		||||
 | 
			
		||||
@ -1,6 +1,36 @@
 | 
			
		||||
import pytest
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
from ahriman.models.waiter import Waiter
 | 
			
		||||
from ahriman.models.waiter import Waiter, WaiterResult, WaiterTaskFinished, WaiterTimedOut
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_result_to_float() -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must convert waiter result to float
 | 
			
		||||
    """
 | 
			
		||||
    assert float(WaiterResult(4.2)) == 4.2
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_result_not_implemented() -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must raise NotImplementedError for abstract class
 | 
			
		||||
    """
 | 
			
		||||
    with pytest.raises(NotImplementedError):
 | 
			
		||||
        assert bool(WaiterResult(4.2))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_result_success_to_bool() -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must convert success waiter result to bool
 | 
			
		||||
    """
 | 
			
		||||
    assert bool(WaiterTaskFinished(4.2))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_result_failure_to_bool() -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must convert failure waiter result to bool
 | 
			
		||||
    """
 | 
			
		||||
    assert not bool(WaiterTimedOut(4.2))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_is_timed_out() -> None:
 | 
			
		||||
@ -22,8 +52,26 @@ def test_is_timed_out_infinite() -> None:
 | 
			
		||||
 | 
			
		||||
def test_wait() -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must wait until file will disappear
 | 
			
		||||
    must wait for success result
 | 
			
		||||
    """
 | 
			
		||||
    results = iter([True, False])
 | 
			
		||||
    waiter = Waiter(1, interval=1)
 | 
			
		||||
    assert waiter.wait(lambda: next(results)) > 0
 | 
			
		||||
    waiter = Waiter(1, interval=0.1)
 | 
			
		||||
    assert float(waiter.wait(lambda: next(results))) > 0
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_wait_timeout() -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must return WaiterTimedOut on timeout
 | 
			
		||||
    """
 | 
			
		||||
    results = iter([True, False])
 | 
			
		||||
    waiter = Waiter(-1, interval=0.1)
 | 
			
		||||
    assert isinstance(waiter.wait(lambda: next(results)), WaiterTimedOut)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_wait_success() -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must return WaiterTaskFinished on success
 | 
			
		||||
    """
 | 
			
		||||
    results = iter([True, False])
 | 
			
		||||
    waiter = Waiter(1, interval=0.1)
 | 
			
		||||
    assert isinstance(waiter.wait(lambda: next(results)), WaiterTaskFinished)
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user