mirror of
https://github.com/arcan1s/ahriman.git
synced 2026-02-03 03:49:48 +00:00
fix: handle permissionerror during walking over tree
Previously it tried to look into 700 directories (e.g. .gnupg) which breaks running as non-ahriman user
This commit is contained in:
@@ -158,7 +158,7 @@ class Lock(LazyLogging):
|
||||
"""
|
||||
check if current user is actually owner of ahriman root
|
||||
"""
|
||||
check_user(self.paths, unsafe=self.unsafe)
|
||||
check_user(self.paths.root, unsafe=self.unsafe)
|
||||
self.paths.tree_create()
|
||||
|
||||
def check_version(self) -> None:
|
||||
|
||||
@@ -35,7 +35,6 @@ from pwd import getpwuid
|
||||
from typing import Any, IO, TypeVar
|
||||
|
||||
from ahriman.core.exceptions import CalledProcessError, OptionError, UnsafeRunError
|
||||
from ahriman.models.repository_paths import RepositoryPaths
|
||||
|
||||
|
||||
__all__ = [
|
||||
@@ -47,12 +46,14 @@ __all__ = [
|
||||
"filter_json",
|
||||
"full_version",
|
||||
"minmax",
|
||||
"owner",
|
||||
"package_like",
|
||||
"parse_version",
|
||||
"partition",
|
||||
"pretty_datetime",
|
||||
"pretty_size",
|
||||
"safe_filename",
|
||||
"safe_iterdir",
|
||||
"srcinfo_property",
|
||||
"srcinfo_property_list",
|
||||
"trim_package",
|
||||
@@ -169,12 +170,13 @@ def check_output(*args: str, exception: Exception | Callable[[int, list[str], st
|
||||
return stdout
|
||||
|
||||
|
||||
def check_user(paths: RepositoryPaths, *, unsafe: bool) -> None:
|
||||
def check_user(root: Path, *, unsafe: bool) -> None:
|
||||
"""
|
||||
check if current user is the owner of the root
|
||||
|
||||
Args:
|
||||
paths(RepositoryPaths): repository paths object
|
||||
root(Path): path to root directory (e.g. repository root
|
||||
:attr:`ahriman.models.repository_paths.RepositoryPaths.root`)
|
||||
unsafe(bool): if set no user check will be performed before path creation
|
||||
|
||||
Raises:
|
||||
@@ -183,14 +185,16 @@ def check_user(paths: RepositoryPaths, *, unsafe: bool) -> None:
|
||||
Examples:
|
||||
Simply run function with arguments::
|
||||
|
||||
>>> check_user(paths, unsafe=False)
|
||||
>>> check_user(root, unsafe=False)
|
||||
"""
|
||||
if not paths.root.exists():
|
||||
if not root.exists():
|
||||
return # no directory found, skip check
|
||||
if unsafe:
|
||||
return # unsafe flag is enabled, no check performed
|
||||
|
||||
current_uid = os.getuid()
|
||||
root_uid, _ = paths.root_owner
|
||||
root_uid, _ = owner(root)
|
||||
|
||||
if current_uid != root_uid:
|
||||
raise UnsafeRunError(current_uid, root_uid)
|
||||
|
||||
@@ -288,6 +292,20 @@ def minmax(source: Iterable[T], *, key: Callable[[T], Any] | None = None) -> tup
|
||||
return min(first_iter, key=key), max(second_iter, key=key) # type: ignore
|
||||
|
||||
|
||||
def owner(path: Path) -> tuple[int, int]:
|
||||
"""
|
||||
retrieve owner information by path
|
||||
|
||||
Args:
|
||||
path(Path): path for which extract ids
|
||||
|
||||
Returns:
|
||||
tuple[int, int]: owner user and group ids of the directory
|
||||
"""
|
||||
stat = path.stat()
|
||||
return stat.st_uid, stat.st_gid
|
||||
|
||||
|
||||
def package_like(filename: Path) -> bool:
|
||||
"""
|
||||
check if file looks like package
|
||||
@@ -408,6 +426,22 @@ def safe_filename(source: str) -> str:
|
||||
return re.sub(r"[^A-Za-z\d\-._~:\[\]@]", "-", source)
|
||||
|
||||
|
||||
def safe_iterdir(path: Path) -> Iterator[Path]:
|
||||
"""
|
||||
wrapper around :func:`pathlib.Path.iterdir`, which suppresses :exc:`PermissionError`
|
||||
|
||||
Args:
|
||||
path(Path): path to iterate
|
||||
|
||||
Yields:
|
||||
Path: content of the directory
|
||||
"""
|
||||
try:
|
||||
yield from path.iterdir()
|
||||
except PermissionError:
|
||||
pass
|
||||
|
||||
|
||||
def srcinfo_property(key: str, srcinfo: Mapping[str, Any], package_srcinfo: Mapping[str, Any], *,
|
||||
default: Any = None) -> Any:
|
||||
"""
|
||||
|
||||
@@ -29,6 +29,7 @@ from pwd import getpwuid
|
||||
|
||||
from ahriman.core.exceptions import PathError
|
||||
from ahriman.core.log import LazyLogging
|
||||
from ahriman.core.utils import owner, safe_iterdir
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
|
||||
|
||||
@@ -93,7 +94,7 @@ class RepositoryPaths(LazyLogging):
|
||||
Returns:
|
||||
Path: path to directory in which build process is run
|
||||
"""
|
||||
uid, _ = self.owner(self.root)
|
||||
uid, _ = owner(self.root)
|
||||
return self.chroot / f"{self.repository_id.name}-{self.repository_id.architecture}" / getpwuid(uid).pw_name
|
||||
|
||||
@property
|
||||
@@ -155,7 +156,7 @@ class RepositoryPaths(LazyLogging):
|
||||
Returns:
|
||||
tuple[int, int]: owner user and group of the root directory
|
||||
"""
|
||||
return self.owner(self.root)
|
||||
return owner(self.root)
|
||||
|
||||
# pylint: disable=protected-access
|
||||
@classmethod
|
||||
@@ -208,20 +209,6 @@ class RepositoryPaths(LazyLogging):
|
||||
|
||||
return set(walk(instance))
|
||||
|
||||
@staticmethod
|
||||
def owner(path: Path) -> tuple[int, int]:
|
||||
"""
|
||||
retrieve owner information by path
|
||||
|
||||
Args:
|
||||
path(Path): path for which extract ids
|
||||
|
||||
Returns:
|
||||
tuple[int, int]: owner user and group ids of the directory
|
||||
"""
|
||||
stat = path.stat()
|
||||
return stat.st_uid, stat.st_gid
|
||||
|
||||
def _chown(self, path: Path) -> None:
|
||||
"""
|
||||
set owner of path recursively (from root) to root owner
|
||||
@@ -237,7 +224,7 @@ class RepositoryPaths(LazyLogging):
|
||||
PathError: if path does not belong to root
|
||||
"""
|
||||
def set_owner(current: Path) -> None:
|
||||
uid, gid = self.owner(current)
|
||||
uid, gid = owner(current)
|
||||
if uid == root_uid and gid == root_gid:
|
||||
return
|
||||
os.chown(current, root_uid, root_gid, follow_symlinks=False)
|
||||
@@ -283,10 +270,10 @@ class RepositoryPaths(LazyLogging):
|
||||
|
||||
def walk(root: Path) -> Iterator[Path]:
|
||||
# basically walk, but skipping some content
|
||||
for child in root.iterdir():
|
||||
for child in safe_iterdir(root):
|
||||
yield child
|
||||
if child in (self.chroot.parent,):
|
||||
yield from child.iterdir() # we only yield top-level in chroot directory
|
||||
yield from safe_iterdir(child) # we only yield top-level in chroot directory
|
||||
elif child.is_dir():
|
||||
yield from walk(child)
|
||||
|
||||
|
||||
@@ -142,7 +142,7 @@ def test_check_user(lock: Lock, mocker: MockerFixture) -> None:
|
||||
tree_create = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.tree_create")
|
||||
|
||||
lock.check_user()
|
||||
check_user_patch.assert_called_once_with(lock.paths, unsafe=False)
|
||||
check_user_patch.assert_called_once_with(lock.paths.root, unsafe=False)
|
||||
tree_create.assert_called_once_with()
|
||||
|
||||
|
||||
|
||||
@@ -6,12 +6,10 @@ import pytest
|
||||
from pathlib import Path
|
||||
from pytest_mock import MockerFixture
|
||||
from typing import Any
|
||||
from unittest.mock import call as MockCall
|
||||
from unittest.mock import MagicMock, call as MockCall
|
||||
|
||||
from ahriman.core.exceptions import BuildError, CalledProcessError, OptionError, UnsafeRunError
|
||||
from ahriman.core.utils import check_output, check_user, dataclass_view, enum_values, extract_user, filter_json, \
|
||||
full_version, minmax, package_like, parse_version, partition, pretty_datetime, pretty_size, safe_filename, \
|
||||
srcinfo_property, srcinfo_property_list, trim_package, utcnow, walk
|
||||
from ahriman.core.utils import *
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.package_source import PackageSource
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
@@ -163,7 +161,7 @@ def test_check_user(repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
paths = RepositoryPaths(Path.cwd(), repository_id)
|
||||
mocker.patch("os.getuid", return_value=paths.root_owner[0])
|
||||
check_user(paths, unsafe=False)
|
||||
check_user(paths.root, unsafe=False)
|
||||
|
||||
|
||||
def test_check_user_no_directory(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||
@@ -171,7 +169,7 @@ def test_check_user_no_directory(repository_paths: RepositoryPaths, mocker: Mock
|
||||
must not fail in case if no directory found
|
||||
"""
|
||||
mocker.patch("pathlib.Path.exists", return_value=False)
|
||||
check_user(repository_paths, unsafe=False)
|
||||
check_user(repository_paths.root, unsafe=False)
|
||||
|
||||
|
||||
def test_check_user_exception(repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
||||
@@ -182,7 +180,7 @@ def test_check_user_exception(repository_id: RepositoryId, mocker: MockerFixture
|
||||
mocker.patch("os.getuid", return_value=paths.root_owner[0] + 1)
|
||||
|
||||
with pytest.raises(UnsafeRunError):
|
||||
check_user(paths, unsafe=False)
|
||||
check_user(paths.root, unsafe=False)
|
||||
|
||||
|
||||
def test_check_user_unsafe(repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
||||
@@ -191,7 +189,7 @@ def test_check_user_unsafe(repository_id: RepositoryId, mocker: MockerFixture) -
|
||||
"""
|
||||
paths = RepositoryPaths(Path.cwd(), repository_id)
|
||||
mocker.patch("os.getuid", return_value=paths.root_owner[0] + 1)
|
||||
check_user(paths, unsafe=True)
|
||||
check_user(paths.root, unsafe=True)
|
||||
|
||||
|
||||
def test_dataclass_view(package_ahriman: Package) -> None:
|
||||
@@ -275,6 +273,18 @@ def test_minmax() -> None:
|
||||
assert minmax([[1, 2, 3], [4, 5], [6, 7, 8, 9]], key=len) == ([4, 5], [6, 7, 8, 9])
|
||||
|
||||
|
||||
def test_owner(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must correctly retrieve owner of the path
|
||||
"""
|
||||
stat_mock = MagicMock()
|
||||
stat_mock.st_uid = 42
|
||||
stat_mock.st_gid = 142
|
||||
mocker.patch("pathlib.Path.stat", return_value=stat_mock)
|
||||
|
||||
assert owner(repository_paths.root) == (42, 142)
|
||||
|
||||
|
||||
def test_package_like(package_ahriman: Package) -> None:
|
||||
"""
|
||||
package_like must return true for archives
|
||||
@@ -414,6 +424,17 @@ def test_safe_filename() -> None:
|
||||
assert safe_filename("tolua++-1.0.93-4-x86_64.pkg.tar.zst") == "tolua---1.0.93-4-x86_64.pkg.tar.zst"
|
||||
|
||||
|
||||
def test_safe_iterdir(mocker: MockerFixture, resource_path_root: Path) -> None:
|
||||
"""
|
||||
must suppress PermissionError
|
||||
"""
|
||||
assert list(safe_iterdir(resource_path_root))
|
||||
|
||||
iterdir_mock = mocker.patch("pathlib.Path.iterdir", side_effect=PermissionError)
|
||||
assert list(safe_iterdir(Path("root"))) == []
|
||||
iterdir_mock.assert_called_once_with()
|
||||
|
||||
|
||||
def test_srcinfo_property() -> None:
|
||||
"""
|
||||
must correctly extract properties
|
||||
|
||||
@@ -55,7 +55,7 @@ def test_root_owner(repository_paths: RepositoryPaths, mocker: MockerFixture) ->
|
||||
"""
|
||||
must correctly define root directory owner
|
||||
"""
|
||||
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.owner", return_value=(42, 142))
|
||||
mocker.patch("ahriman.models.repository_paths.owner", return_value=(42, 142))
|
||||
assert repository_paths.root_owner == (42, 142)
|
||||
|
||||
|
||||
@@ -186,23 +186,11 @@ def test_known_repositories_empty(repository_paths: RepositoryPaths, mocker: Moc
|
||||
iterdir_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_owner(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must correctly retrieve owner of the path
|
||||
"""
|
||||
stat_mock = MagicMock()
|
||||
stat_mock.st_uid = 42
|
||||
stat_mock.st_gid = 142
|
||||
mocker.patch("pathlib.Path.stat", return_value=stat_mock)
|
||||
|
||||
assert RepositoryPaths.owner(repository_paths.root) == (42, 142)
|
||||
|
||||
|
||||
def test_chown(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must correctly set owner for the directory
|
||||
"""
|
||||
object.__setattr__(repository_paths, "owner", _get_owner(repository_paths.root, same=False))
|
||||
mocker.patch("ahriman.models.repository_paths.owner", _get_owner(repository_paths.root, same=False))
|
||||
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
|
||||
chown_mock = mocker.patch("os.chown")
|
||||
|
||||
@@ -215,7 +203,7 @@ def test_chown_parent(repository_paths: RepositoryPaths, mocker: MockerFixture)
|
||||
"""
|
||||
must correctly set owner for the directory including parents
|
||||
"""
|
||||
object.__setattr__(repository_paths, "owner", _get_owner(repository_paths.root, same=False))
|
||||
mocker.patch("ahriman.models.repository_paths.owner", _get_owner(repository_paths.root, same=False))
|
||||
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
|
||||
chown_mock = mocker.patch("os.chown")
|
||||
|
||||
@@ -231,7 +219,7 @@ def test_chown_skip(repository_paths: RepositoryPaths, mocker: MockerFixture) ->
|
||||
"""
|
||||
must skip ownership set in case if it is same as root
|
||||
"""
|
||||
object.__setattr__(repository_paths, "owner", _get_owner(repository_paths.root, same=True))
|
||||
mocker.patch("ahriman.models.repository_paths.owner", _get_owner(repository_paths.root, same=True))
|
||||
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
|
||||
chown_mock = mocker.patch("os.chown")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user