diff --git a/src/ahriman/application/lock.py b/src/ahriman/application/lock.py index c1d73c9d..22704e77 100644 --- a/src/ahriman/application/lock.py +++ b/src/ahriman/application/lock.py @@ -158,7 +158,7 @@ class Lock(LazyLogging): """ check if current user is actually owner of ahriman root """ - check_user(self.paths, unsafe=self.unsafe) + check_user(self.paths.root, unsafe=self.unsafe) self.paths.tree_create() def check_version(self) -> None: diff --git a/src/ahriman/core/utils.py b/src/ahriman/core/utils.py index 85b43a3f..fc282b5a 100644 --- a/src/ahriman/core/utils.py +++ b/src/ahriman/core/utils.py @@ -35,7 +35,6 @@ from pwd import getpwuid from typing import Any, IO, TypeVar from ahriman.core.exceptions import CalledProcessError, OptionError, UnsafeRunError -from ahriman.models.repository_paths import RepositoryPaths __all__ = [ @@ -47,12 +46,14 @@ __all__ = [ "filter_json", "full_version", "minmax", + "owner", "package_like", "parse_version", "partition", "pretty_datetime", "pretty_size", "safe_filename", + "safe_iterdir", "srcinfo_property", "srcinfo_property_list", "trim_package", @@ -169,12 +170,13 @@ def check_output(*args: str, exception: Exception | Callable[[int, list[str], st return stdout -def check_user(paths: RepositoryPaths, *, unsafe: bool) -> None: +def check_user(root: Path, *, unsafe: bool) -> None: """ check if current user is the owner of the root Args: - paths(RepositoryPaths): repository paths object + root(Path): path to root directory (e.g. repository root + :attr:`ahriman.models.repository_paths.RepositoryPaths.root`) unsafe(bool): if set no user check will be performed before path creation Raises: @@ -183,14 +185,16 @@ def check_user(paths: RepositoryPaths, *, unsafe: bool) -> None: Examples: Simply run function with arguments:: - >>> check_user(paths, unsafe=False) + >>> check_user(root, unsafe=False) """ - if not paths.root.exists(): + if not root.exists(): return # no directory found, skip check if unsafe: return # unsafe flag is enabled, no check performed + current_uid = os.getuid() - root_uid, _ = paths.root_owner + root_uid, _ = owner(root) + if current_uid != root_uid: raise UnsafeRunError(current_uid, root_uid) @@ -288,6 +292,20 @@ def minmax(source: Iterable[T], *, key: Callable[[T], Any] | None = None) -> tup return min(first_iter, key=key), max(second_iter, key=key) # type: ignore +def owner(path: Path) -> tuple[int, int]: + """ + retrieve owner information by path + + Args: + path(Path): path for which extract ids + + Returns: + tuple[int, int]: owner user and group ids of the directory + """ + stat = path.stat() + return stat.st_uid, stat.st_gid + + def package_like(filename: Path) -> bool: """ check if file looks like package @@ -408,6 +426,22 @@ def safe_filename(source: str) -> str: return re.sub(r"[^A-Za-z\d\-._~:\[\]@]", "-", source) +def safe_iterdir(path: Path) -> Iterator[Path]: + """ + wrapper around :func:`pathlib.Path.iterdir`, which suppresses :exc:`PermissionError` + + Args: + path(Path): path to iterate + + Yields: + Path: content of the directory + """ + try: + yield from path.iterdir() + except PermissionError: + pass + + def srcinfo_property(key: str, srcinfo: Mapping[str, Any], package_srcinfo: Mapping[str, Any], *, default: Any = None) -> Any: """ diff --git a/src/ahriman/models/repository_paths.py b/src/ahriman/models/repository_paths.py index 57a75a92..a81ade70 100644 --- a/src/ahriman/models/repository_paths.py +++ b/src/ahriman/models/repository_paths.py @@ -29,6 +29,7 @@ from pwd import getpwuid from ahriman.core.exceptions import PathError from ahriman.core.log import LazyLogging +from ahriman.core.utils import owner, safe_iterdir from ahriman.models.repository_id import RepositoryId @@ -93,7 +94,7 @@ class RepositoryPaths(LazyLogging): Returns: Path: path to directory in which build process is run """ - uid, _ = self.owner(self.root) + uid, _ = owner(self.root) return self.chroot / f"{self.repository_id.name}-{self.repository_id.architecture}" / getpwuid(uid).pw_name @property @@ -155,7 +156,7 @@ class RepositoryPaths(LazyLogging): Returns: tuple[int, int]: owner user and group of the root directory """ - return self.owner(self.root) + return owner(self.root) # pylint: disable=protected-access @classmethod @@ -208,20 +209,6 @@ class RepositoryPaths(LazyLogging): return set(walk(instance)) - @staticmethod - def owner(path: Path) -> tuple[int, int]: - """ - retrieve owner information by path - - Args: - path(Path): path for which extract ids - - Returns: - tuple[int, int]: owner user and group ids of the directory - """ - stat = path.stat() - return stat.st_uid, stat.st_gid - def _chown(self, path: Path) -> None: """ set owner of path recursively (from root) to root owner @@ -237,7 +224,7 @@ class RepositoryPaths(LazyLogging): PathError: if path does not belong to root """ def set_owner(current: Path) -> None: - uid, gid = self.owner(current) + uid, gid = owner(current) if uid == root_uid and gid == root_gid: return os.chown(current, root_uid, root_gid, follow_symlinks=False) @@ -283,10 +270,10 @@ class RepositoryPaths(LazyLogging): def walk(root: Path) -> Iterator[Path]: # basically walk, but skipping some content - for child in root.iterdir(): + for child in safe_iterdir(root): yield child if child in (self.chroot.parent,): - yield from child.iterdir() # we only yield top-level in chroot directory + yield from safe_iterdir(child) # we only yield top-level in chroot directory elif child.is_dir(): yield from walk(child) diff --git a/tests/ahriman/application/test_lock.py b/tests/ahriman/application/test_lock.py index 910492e9..c416e510 100644 --- a/tests/ahriman/application/test_lock.py +++ b/tests/ahriman/application/test_lock.py @@ -142,7 +142,7 @@ def test_check_user(lock: Lock, mocker: MockerFixture) -> None: tree_create = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.tree_create") lock.check_user() - check_user_patch.assert_called_once_with(lock.paths, unsafe=False) + check_user_patch.assert_called_once_with(lock.paths.root, unsafe=False) tree_create.assert_called_once_with() diff --git a/tests/ahriman/core/test_utils.py b/tests/ahriman/core/test_utils.py index deafc04e..2f144b52 100644 --- a/tests/ahriman/core/test_utils.py +++ b/tests/ahriman/core/test_utils.py @@ -6,12 +6,10 @@ import pytest from pathlib import Path from pytest_mock import MockerFixture from typing import Any -from unittest.mock import call as MockCall +from unittest.mock import MagicMock, call as MockCall from ahriman.core.exceptions import BuildError, CalledProcessError, OptionError, UnsafeRunError -from ahriman.core.utils import check_output, check_user, dataclass_view, enum_values, extract_user, filter_json, \ - full_version, minmax, package_like, parse_version, partition, pretty_datetime, pretty_size, safe_filename, \ - srcinfo_property, srcinfo_property_list, trim_package, utcnow, walk +from ahriman.core.utils import * from ahriman.models.package import Package from ahriman.models.package_source import PackageSource from ahriman.models.repository_id import RepositoryId @@ -163,7 +161,7 @@ def test_check_user(repository_id: RepositoryId, mocker: MockerFixture) -> None: """ paths = RepositoryPaths(Path.cwd(), repository_id) mocker.patch("os.getuid", return_value=paths.root_owner[0]) - check_user(paths, unsafe=False) + check_user(paths.root, unsafe=False) def test_check_user_no_directory(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None: @@ -171,7 +169,7 @@ def test_check_user_no_directory(repository_paths: RepositoryPaths, mocker: Mock must not fail in case if no directory found """ mocker.patch("pathlib.Path.exists", return_value=False) - check_user(repository_paths, unsafe=False) + check_user(repository_paths.root, unsafe=False) def test_check_user_exception(repository_id: RepositoryId, mocker: MockerFixture) -> None: @@ -182,7 +180,7 @@ def test_check_user_exception(repository_id: RepositoryId, mocker: MockerFixture mocker.patch("os.getuid", return_value=paths.root_owner[0] + 1) with pytest.raises(UnsafeRunError): - check_user(paths, unsafe=False) + check_user(paths.root, unsafe=False) def test_check_user_unsafe(repository_id: RepositoryId, mocker: MockerFixture) -> None: @@ -191,7 +189,7 @@ def test_check_user_unsafe(repository_id: RepositoryId, mocker: MockerFixture) - """ paths = RepositoryPaths(Path.cwd(), repository_id) mocker.patch("os.getuid", return_value=paths.root_owner[0] + 1) - check_user(paths, unsafe=True) + check_user(paths.root, unsafe=True) def test_dataclass_view(package_ahriman: Package) -> None: @@ -275,6 +273,18 @@ def test_minmax() -> None: assert minmax([[1, 2, 3], [4, 5], [6, 7, 8, 9]], key=len) == ([4, 5], [6, 7, 8, 9]) +def test_owner(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None: + """ + must correctly retrieve owner of the path + """ + stat_mock = MagicMock() + stat_mock.st_uid = 42 + stat_mock.st_gid = 142 + mocker.patch("pathlib.Path.stat", return_value=stat_mock) + + assert owner(repository_paths.root) == (42, 142) + + def test_package_like(package_ahriman: Package) -> None: """ package_like must return true for archives @@ -414,6 +424,17 @@ def test_safe_filename() -> None: assert safe_filename("tolua++-1.0.93-4-x86_64.pkg.tar.zst") == "tolua---1.0.93-4-x86_64.pkg.tar.zst" +def test_safe_iterdir(mocker: MockerFixture, resource_path_root: Path) -> None: + """ + must suppress PermissionError + """ + assert list(safe_iterdir(resource_path_root)) + + iterdir_mock = mocker.patch("pathlib.Path.iterdir", side_effect=PermissionError) + assert list(safe_iterdir(Path("root"))) == [] + iterdir_mock.assert_called_once_with() + + def test_srcinfo_property() -> None: """ must correctly extract properties diff --git a/tests/ahriman/models/test_repository_paths.py b/tests/ahriman/models/test_repository_paths.py index 4100d720..c9bb7ac4 100644 --- a/tests/ahriman/models/test_repository_paths.py +++ b/tests/ahriman/models/test_repository_paths.py @@ -55,7 +55,7 @@ def test_root_owner(repository_paths: RepositoryPaths, mocker: MockerFixture) -> """ must correctly define root directory owner """ - mocker.patch("ahriman.models.repository_paths.RepositoryPaths.owner", return_value=(42, 142)) + mocker.patch("ahriman.models.repository_paths.owner", return_value=(42, 142)) assert repository_paths.root_owner == (42, 142) @@ -186,23 +186,11 @@ def test_known_repositories_empty(repository_paths: RepositoryPaths, mocker: Moc iterdir_mock.assert_not_called() -def test_owner(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None: - """ - must correctly retrieve owner of the path - """ - stat_mock = MagicMock() - stat_mock.st_uid = 42 - stat_mock.st_gid = 142 - mocker.patch("pathlib.Path.stat", return_value=stat_mock) - - assert RepositoryPaths.owner(repository_paths.root) == (42, 142) - - def test_chown(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None: """ must correctly set owner for the directory """ - object.__setattr__(repository_paths, "owner", _get_owner(repository_paths.root, same=False)) + mocker.patch("ahriman.models.repository_paths.owner", _get_owner(repository_paths.root, same=False)) mocker.patch.object(RepositoryPaths, "root_owner", (42, 42)) chown_mock = mocker.patch("os.chown") @@ -215,7 +203,7 @@ def test_chown_parent(repository_paths: RepositoryPaths, mocker: MockerFixture) """ must correctly set owner for the directory including parents """ - object.__setattr__(repository_paths, "owner", _get_owner(repository_paths.root, same=False)) + mocker.patch("ahriman.models.repository_paths.owner", _get_owner(repository_paths.root, same=False)) mocker.patch.object(RepositoryPaths, "root_owner", (42, 42)) chown_mock = mocker.patch("os.chown") @@ -231,7 +219,7 @@ def test_chown_skip(repository_paths: RepositoryPaths, mocker: MockerFixture) -> """ must skip ownership set in case if it is same as root """ - object.__setattr__(repository_paths, "owner", _get_owner(repository_paths.root, same=True)) + mocker.patch("ahriman.models.repository_paths.owner", _get_owner(repository_paths.root, same=True)) mocker.patch.object(RepositoryPaths, "root_owner", (42, 42)) chown_mock = mocker.patch("os.chown")