fix: rewrite preserver_owner method complitely

Previous implementation was somewhat working in the most) scenarios, but
was super slow to handle permissions. However, it is actually very
limited operations in which the application can do anything, so it is
much easier to just drop privileged user to normal one
This commit is contained in:
2026-02-03 15:27:19 +02:00
parent 5ac2e3de19
commit 5738b8b911
8 changed files with 67 additions and 170 deletions

View File

@@ -67,7 +67,7 @@ def test_database_copy(pacman: Pacman, mocker: MockerFixture) -> None:
pacman.database_copy(pacman.handle, database, path, use_ahriman_cache=True)
mkdir_mock.assert_called_once_with(mode=0o755, exist_ok=True)
copy_mock.assert_called_once_with(path / "sync" / "core.db", dst_path)
owner_guard_mock.assert_called_once_with(dst_path.parent)
owner_guard_mock.assert_called_once_with()
def test_database_copy_skip(pacman: Pacman, mocker: MockerFixture) -> None:

View File

@@ -436,17 +436,6 @@ def test_safe_filename() -> None:
assert safe_filename("tolua++-1.0.93-4-x86_64.pkg.tar.zst") == "tolua---1.0.93-4-x86_64.pkg.tar.zst"
def test_safe_iterdir(mocker: MockerFixture, resource_path_root: Path) -> None:
"""
must suppress PermissionError
"""
assert list(safe_iterdir(resource_path_root))
iterdir_mock = mocker.patch("pathlib.Path.iterdir", side_effect=PermissionError)
assert list(safe_iterdir(Path("root"))) == []
iterdir_mock.assert_called_once_with()
def test_srcinfo_property() -> None:
"""
must correctly extract properties

View File

@@ -3,9 +3,8 @@ import pytest
from collections.abc import Callable
from pathlib import Path
from pytest_mock import MockerFixture
from unittest.mock import MagicMock, call as MockCall
from unittest.mock import call as MockCall
from ahriman.core.exceptions import PathError
from ahriman.models.package import Package
from ahriman.models.repository_id import RepositoryId
from ahriman.models.repository_paths import RepositoryPaths
@@ -186,56 +185,6 @@ def test_known_repositories_empty(repository_paths: RepositoryPaths, mocker: Moc
iterdir_mock.assert_not_called()
def test_chown(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
"""
must correctly set owner for the directory
"""
mocker.patch("ahriman.models.repository_paths.owner", _get_owner(repository_paths.root, same=False))
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
chown_mock = mocker.patch("os.chown")
path = repository_paths.root / "path"
repository_paths._chown(path)
chown_mock.assert_called_once_with(path, 42, 42, follow_symlinks=False)
def test_chown_parent(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
"""
must correctly set owner for the directory including parents
"""
mocker.patch("ahriman.models.repository_paths.owner", _get_owner(repository_paths.root, same=False))
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
chown_mock = mocker.patch("os.chown")
path = repository_paths.root / "parent" / "path"
repository_paths._chown(path)
chown_mock.assert_has_calls([
MockCall(path, 42, 42, follow_symlinks=False),
MockCall(path.parent, 42, 42, follow_symlinks=False)
])
def test_chown_skip(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
"""
must skip ownership set in case if it is same as root
"""
mocker.patch("ahriman.models.repository_paths.owner", _get_owner(repository_paths.root, same=True))
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
chown_mock = mocker.patch("os.chown")
path = repository_paths.root / "path"
repository_paths._chown(path)
chown_mock.assert_not_called()
def test_chown_invalid_path(repository_paths: RepositoryPaths) -> None:
"""
must raise invalid path exception in case if directory outside the root supplied
"""
with pytest.raises(PathError):
repository_paths._chown(repository_paths.root.parent)
def test_cache_for(repository_paths: RepositoryPaths, package_ahriman: Package) -> None:
"""
must return correct path for cache directory
@@ -249,30 +198,49 @@ def test_preserve_owner(tmp_path: Path, repository_id: RepositoryId, mocker: Moc
"""
must preserve file owner during operations
"""
mocker.patch("os.getuid", return_value=0)
mocker.patch("os.getgid", return_value=0)
seteuid_mock = mocker.patch("os.seteuid")
setegid_mock = mocker.patch("os.setegid")
repository_paths = RepositoryPaths(tmp_path, repository_id)
target_uid, target_gid = repository_paths.root_owner
repository_paths.tree_create()
chown_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths._chown")
with repository_paths.preserve_owner():
(repository_paths.root / "created1").touch()
(repository_paths.chroot / "created2").touch()
chown_mock.assert_has_calls([MockCall(repository_paths.root / "created1")])
seteuid_mock.assert_has_calls([MockCall(target_uid), MockCall(0)])
setegid_mock.assert_has_calls([MockCall(target_gid), MockCall(0)])
def test_preserve_owner_specific(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
def test_preserve_owner_exception(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
"""
must preserve file owner during operations only in specific directory
must return to original uid and gid even during exception
"""
mocker.patch("os.getuid", return_value=0)
mocker.patch("os.getgid", return_value=0)
mocker.patch("pathlib.Path.mkdir", side_effect=Exception)
seteuid_mock = mocker.patch("os.seteuid")
setegid_mock = mocker.patch("os.setegid")
repository_paths = RepositoryPaths(tmp_path, repository_id)
repository_paths.tree_create()
(repository_paths.root / "content").mkdir()
chown_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths._chown")
target_uid, target_gid = repository_paths.root_owner
with pytest.raises(Exception):
repository_paths.tree_create()
seteuid_mock.assert_has_calls([MockCall(target_uid), MockCall(0)])
setegid_mock.assert_has_calls([MockCall(target_gid), MockCall(0)])
with repository_paths.preserve_owner(repository_paths.root / "content"):
(repository_paths.root / "created1").touch()
(repository_paths.root / "content" / "created2").touch()
(repository_paths.chroot / "created3").touch()
chown_mock.assert_has_calls([MockCall(repository_paths.root / "content" / "created2")])
def test_preserve_owner_non_root(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
"""
must skip processing if user is not root
"""
mocker.patch("os.getuid", return_value=42)
mocker.patch("os.getgid", return_value=42)
repository_paths = RepositoryPaths(tmp_path, repository_id)
seteuid_mock = mocker.patch("os.seteuid")
setegid_mock = mocker.patch("os.setegid")
repository_paths.tree_create()
seteuid_mock.assert_not_called()
setegid_mock.assert_not_called()
def test_tree_clear(repository_paths: RepositoryPaths, package_ahriman: Package, mocker: MockerFixture) -> None: