mirror of
https://github.com/arcan1s/ahriman.git
synced 2026-04-08 03:13:37 +00:00
Compare commits
2 Commits
5ac2e3de19
...
389bad6725
| Author | SHA1 | Date | |
|---|---|---|---|
| 389bad6725 | |||
| 5738b8b911 |
@@ -72,14 +72,16 @@ class Setup(Handler):
|
||||
|
||||
application = Application(repository_id, configuration, report=report)
|
||||
|
||||
with application.repository.paths.preserve_owner():
|
||||
Setup.configuration_create_makepkg(args.packager, args.makeflags_jobs, application.repository.paths)
|
||||
Setup.executable_create(application.repository.paths, repository_id)
|
||||
repository_server = f"file://{application.repository.paths.repository}" if args.server is None else args.server
|
||||
Setup.configuration_create_devtools(
|
||||
repository_id, args.from_configuration, args.mirror, args.multilib, repository_server)
|
||||
Setup.configuration_create_sudo(application.repository.paths, repository_id)
|
||||
# basically we create configuration here as root, but it is ok, because those files are only used for reading
|
||||
Setup.configuration_create_makepkg(args.packager, args.makeflags_jobs, application.repository.paths)
|
||||
Setup.executable_create(application.repository.paths, repository_id)
|
||||
repository_server = f"file://{application.repository.paths.repository}" if args.server is None else args.server
|
||||
Setup.configuration_create_devtools(
|
||||
repository_id, args.from_configuration, args.mirror, args.multilib, repository_server)
|
||||
Setup.configuration_create_sudo(application.repository.paths, repository_id)
|
||||
|
||||
# finish initialization
|
||||
with application.repository.paths.preserve_owner():
|
||||
application.repository.repo.init()
|
||||
# lazy database sync
|
||||
application.repository.pacman.handle # pylint: disable=pointless-statement
|
||||
|
||||
@@ -130,7 +130,7 @@ class Pacman(LazyLogging):
|
||||
return # database for some reason deos not exist
|
||||
|
||||
self.logger.info("copy pacman database %s from operating system root to ahriman's home %s", src, dst)
|
||||
with self.repository_paths.preserve_owner(dst.parent):
|
||||
with self.repository_paths.preserve_owner():
|
||||
shutil.copy(src, dst)
|
||||
|
||||
def database_init(self, handle: Handle, repository: str, architecture: str) -> DB:
|
||||
|
||||
@@ -20,7 +20,6 @@
|
||||
import subprocess
|
||||
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
from typing import Any, Self
|
||||
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
@@ -229,20 +228,6 @@ class PkgbuildParserError(ValueError):
|
||||
ValueError.__init__(self, message)
|
||||
|
||||
|
||||
class PathError(ValueError):
|
||||
"""
|
||||
exception which will be raised on path which is not belong to root directory
|
||||
"""
|
||||
|
||||
def __init__(self, path: Path, root: Path) -> None:
|
||||
"""
|
||||
Args:
|
||||
path(Path): path which raised an exception
|
||||
root(Path): repository root (i.e. ahriman home)
|
||||
"""
|
||||
ValueError.__init__(self, f"Path `{path}` does not belong to repository root `{root}`")
|
||||
|
||||
|
||||
class PasswordError(ValueError):
|
||||
"""
|
||||
exception which will be raised in case of password related errors
|
||||
|
||||
@@ -54,7 +54,6 @@ __all__ = [
|
||||
"pretty_interval",
|
||||
"pretty_size",
|
||||
"safe_filename",
|
||||
"safe_iterdir",
|
||||
"srcinfo_property",
|
||||
"srcinfo_property_list",
|
||||
"trim_package",
|
||||
@@ -193,7 +192,7 @@ def check_user(root: Path, *, unsafe: bool) -> None:
|
||||
if unsafe:
|
||||
return # unsafe flag is enabled, no check performed
|
||||
|
||||
current_uid = os.getuid()
|
||||
current_uid = os.geteuid()
|
||||
root_uid, _ = owner(root)
|
||||
|
||||
if current_uid != root_uid:
|
||||
@@ -449,22 +448,6 @@ def safe_filename(source: str) -> str:
|
||||
return re.sub(r"[^A-Za-z\d\-._~:\[\]@]", "-", source)
|
||||
|
||||
|
||||
def safe_iterdir(path: Path) -> Iterator[Path]:
|
||||
"""
|
||||
wrapper around :func:`pathlib.Path.iterdir`, which suppresses :exc:`PermissionError`
|
||||
|
||||
Args:
|
||||
path(Path): path to iterate
|
||||
|
||||
Yields:
|
||||
Path: content of the directory
|
||||
"""
|
||||
try:
|
||||
yield from path.iterdir()
|
||||
except PermissionError:
|
||||
pass
|
||||
|
||||
|
||||
def srcinfo_property(key: str, srcinfo: Mapping[str, Any], package_srcinfo: Mapping[str, Any], *,
|
||||
default: Any = None) -> Any:
|
||||
"""
|
||||
|
||||
@@ -27,9 +27,8 @@ from functools import cached_property
|
||||
from pathlib import Path
|
||||
from pwd import getpwuid
|
||||
|
||||
from ahriman.core.exceptions import PathError
|
||||
from ahriman.core.log import LazyLogging
|
||||
from ahriman.core.utils import owner, safe_iterdir
|
||||
from ahriman.core.utils import owner
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
|
||||
|
||||
@@ -209,33 +208,6 @@ class RepositoryPaths(LazyLogging):
|
||||
|
||||
return set(walk(instance))
|
||||
|
||||
def _chown(self, path: Path) -> None:
|
||||
"""
|
||||
set owner of path recursively (from root) to root owner
|
||||
|
||||
Notes:
|
||||
More likely you don't want to call this method explicitly, consider using :func:`preserve_owner`
|
||||
as context manager instead
|
||||
|
||||
Args:
|
||||
path(Path): path to be chown
|
||||
|
||||
Raises:
|
||||
PathError: if path does not belong to root
|
||||
"""
|
||||
def set_owner(current: Path) -> None:
|
||||
uid, gid = owner(current)
|
||||
if uid == root_uid and gid == root_gid:
|
||||
return
|
||||
os.chown(current, root_uid, root_gid, follow_symlinks=False)
|
||||
|
||||
if self.root not in path.parents:
|
||||
raise PathError(path, self.root)
|
||||
root_uid, root_gid = self.root_owner
|
||||
while path != self.root:
|
||||
set_owner(path)
|
||||
path = path.parent
|
||||
|
||||
def cache_for(self, package_base: str) -> Path:
|
||||
"""
|
||||
get path to cached PKGBUILD and package sources for the package base
|
||||
@@ -249,13 +221,10 @@ class RepositoryPaths(LazyLogging):
|
||||
return self.cache / package_base
|
||||
|
||||
@contextlib.contextmanager
|
||||
def preserve_owner(self, path: Path | None = None) -> Iterator[None]:
|
||||
def preserve_owner(self) -> Iterator[None]:
|
||||
"""
|
||||
perform any action preserving owner for any newly created file or directory
|
||||
|
||||
Args:
|
||||
path(Path | None, optional): use this path as root instead of repository root (Default value = None)
|
||||
|
||||
Examples:
|
||||
This method is designed to use as context manager when you are going to perform operations which might
|
||||
change filesystem, especially if you are doing it under unsafe flag, e.g.::
|
||||
@@ -266,25 +235,26 @@ class RepositoryPaths(LazyLogging):
|
||||
Note, however, that this method doesn't handle any exceptions and will eventually interrupt
|
||||
if there will be any.
|
||||
"""
|
||||
path = path or self.root
|
||||
# guard non-root
|
||||
# the reason we do this is that it only works if permissions can be actually changed. Hence,
|
||||
# non-privileged user (e.g. personal user or ahriman user) can't change permissions.
|
||||
# The only one who can do so is root, so if user is not root we just terminate function
|
||||
current_uid, current_gid = os.geteuid(), os.getegid()
|
||||
if current_uid != 0:
|
||||
yield
|
||||
return
|
||||
|
||||
def walk(root: Path) -> Iterator[Path]:
|
||||
# basically walk, but skipping some content
|
||||
for child in safe_iterdir(root):
|
||||
yield child
|
||||
if child in (self.chroot.parent,):
|
||||
yield from safe_iterdir(child) # we only yield top-level in chroot directory
|
||||
elif child.is_dir():
|
||||
yield from walk(child)
|
||||
# set uid and gid to root owner
|
||||
target_uid, target_gid = self.root_owner
|
||||
os.setegid(target_gid)
|
||||
os.seteuid(target_uid)
|
||||
|
||||
# get current filesystem and run action
|
||||
previous_snapshot = set(walk(path))
|
||||
yield
|
||||
|
||||
# get newly created files and directories and chown them
|
||||
new_entries = set(walk(path)).difference(previous_snapshot)
|
||||
for entry in new_entries:
|
||||
self._chown(entry)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
# reset uid and gid
|
||||
os.seteuid(current_uid)
|
||||
os.setegid(current_gid)
|
||||
|
||||
def tree_clear(self, package_base: str) -> None:
|
||||
"""
|
||||
|
||||
@@ -67,7 +67,7 @@ def test_database_copy(pacman: Pacman, mocker: MockerFixture) -> None:
|
||||
pacman.database_copy(pacman.handle, database, path, use_ahriman_cache=True)
|
||||
mkdir_mock.assert_called_once_with(mode=0o755, exist_ok=True)
|
||||
copy_mock.assert_called_once_with(path / "sync" / "core.db", dst_path)
|
||||
owner_guard_mock.assert_called_once_with(dst_path.parent)
|
||||
owner_guard_mock.assert_called_once_with()
|
||||
|
||||
|
||||
def test_database_copy_skip(pacman: Pacman, mocker: MockerFixture) -> None:
|
||||
|
||||
@@ -160,7 +160,7 @@ def test_check_user(repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
||||
must check user correctly
|
||||
"""
|
||||
paths = RepositoryPaths(Path.cwd(), repository_id)
|
||||
mocker.patch("os.getuid", return_value=paths.root_owner[0])
|
||||
mocker.patch("os.geteuid", return_value=paths.root_owner[0])
|
||||
check_user(paths.root, unsafe=False)
|
||||
|
||||
|
||||
@@ -177,7 +177,7 @@ def test_check_user_exception(repository_id: RepositoryId, mocker: MockerFixture
|
||||
must raise exception if user differs
|
||||
"""
|
||||
paths = RepositoryPaths(Path.cwd(), repository_id)
|
||||
mocker.patch("os.getuid", return_value=paths.root_owner[0] + 1)
|
||||
mocker.patch("os.geteuid", return_value=paths.root_owner[0] + 1)
|
||||
|
||||
with pytest.raises(UnsafeRunError):
|
||||
check_user(paths.root, unsafe=False)
|
||||
@@ -188,7 +188,7 @@ def test_check_user_unsafe(repository_id: RepositoryId, mocker: MockerFixture) -
|
||||
must skip check if unsafe flag is set
|
||||
"""
|
||||
paths = RepositoryPaths(Path.cwd(), repository_id)
|
||||
mocker.patch("os.getuid", return_value=paths.root_owner[0] + 1)
|
||||
mocker.patch("os.geteuid", return_value=paths.root_owner[0] + 1)
|
||||
check_user(paths.root, unsafe=True)
|
||||
|
||||
|
||||
@@ -436,17 +436,6 @@ def test_safe_filename() -> None:
|
||||
assert safe_filename("tolua++-1.0.93-4-x86_64.pkg.tar.zst") == "tolua---1.0.93-4-x86_64.pkg.tar.zst"
|
||||
|
||||
|
||||
def test_safe_iterdir(mocker: MockerFixture, resource_path_root: Path) -> None:
|
||||
"""
|
||||
must suppress PermissionError
|
||||
"""
|
||||
assert list(safe_iterdir(resource_path_root))
|
||||
|
||||
iterdir_mock = mocker.patch("pathlib.Path.iterdir", side_effect=PermissionError)
|
||||
assert list(safe_iterdir(Path("root"))) == []
|
||||
iterdir_mock.assert_called_once_with()
|
||||
|
||||
|
||||
def test_srcinfo_property() -> None:
|
||||
"""
|
||||
must correctly extract properties
|
||||
|
||||
@@ -3,9 +3,8 @@ import pytest
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
from pytest_mock import MockerFixture
|
||||
from unittest.mock import MagicMock, call as MockCall
|
||||
from unittest.mock import call as MockCall
|
||||
|
||||
from ahriman.core.exceptions import PathError
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
from ahriman.models.repository_paths import RepositoryPaths
|
||||
@@ -186,56 +185,6 @@ def test_known_repositories_empty(repository_paths: RepositoryPaths, mocker: Moc
|
||||
iterdir_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_chown(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must correctly set owner for the directory
|
||||
"""
|
||||
mocker.patch("ahriman.models.repository_paths.owner", _get_owner(repository_paths.root, same=False))
|
||||
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
|
||||
chown_mock = mocker.patch("os.chown")
|
||||
|
||||
path = repository_paths.root / "path"
|
||||
repository_paths._chown(path)
|
||||
chown_mock.assert_called_once_with(path, 42, 42, follow_symlinks=False)
|
||||
|
||||
|
||||
def test_chown_parent(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must correctly set owner for the directory including parents
|
||||
"""
|
||||
mocker.patch("ahriman.models.repository_paths.owner", _get_owner(repository_paths.root, same=False))
|
||||
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
|
||||
chown_mock = mocker.patch("os.chown")
|
||||
|
||||
path = repository_paths.root / "parent" / "path"
|
||||
repository_paths._chown(path)
|
||||
chown_mock.assert_has_calls([
|
||||
MockCall(path, 42, 42, follow_symlinks=False),
|
||||
MockCall(path.parent, 42, 42, follow_symlinks=False)
|
||||
])
|
||||
|
||||
|
||||
def test_chown_skip(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must skip ownership set in case if it is same as root
|
||||
"""
|
||||
mocker.patch("ahriman.models.repository_paths.owner", _get_owner(repository_paths.root, same=True))
|
||||
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
|
||||
chown_mock = mocker.patch("os.chown")
|
||||
|
||||
path = repository_paths.root / "path"
|
||||
repository_paths._chown(path)
|
||||
chown_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_chown_invalid_path(repository_paths: RepositoryPaths) -> None:
|
||||
"""
|
||||
must raise invalid path exception in case if directory outside the root supplied
|
||||
"""
|
||||
with pytest.raises(PathError):
|
||||
repository_paths._chown(repository_paths.root.parent)
|
||||
|
||||
|
||||
def test_cache_for(repository_paths: RepositoryPaths, package_ahriman: Package) -> None:
|
||||
"""
|
||||
must return correct path for cache directory
|
||||
@@ -249,30 +198,49 @@ def test_preserve_owner(tmp_path: Path, repository_id: RepositoryId, mocker: Moc
|
||||
"""
|
||||
must preserve file owner during operations
|
||||
"""
|
||||
mocker.patch("os.geteuid", return_value=0)
|
||||
mocker.patch("os.getegid", return_value=0)
|
||||
seteuid_mock = mocker.patch("os.seteuid")
|
||||
setegid_mock = mocker.patch("os.setegid")
|
||||
|
||||
repository_paths = RepositoryPaths(tmp_path, repository_id)
|
||||
target_uid, target_gid = repository_paths.root_owner
|
||||
repository_paths.tree_create()
|
||||
chown_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths._chown")
|
||||
|
||||
with repository_paths.preserve_owner():
|
||||
(repository_paths.root / "created1").touch()
|
||||
(repository_paths.chroot / "created2").touch()
|
||||
chown_mock.assert_has_calls([MockCall(repository_paths.root / "created1")])
|
||||
seteuid_mock.assert_has_calls([MockCall(target_uid), MockCall(0)])
|
||||
setegid_mock.assert_has_calls([MockCall(target_gid), MockCall(0)])
|
||||
|
||||
|
||||
def test_preserve_owner_specific(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
||||
def test_preserve_owner_exception(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must preserve file owner during operations only in specific directory
|
||||
must return to original uid and gid even during exception
|
||||
"""
|
||||
mocker.patch("os.geteuid", return_value=0)
|
||||
mocker.patch("os.getegid", return_value=0)
|
||||
mocker.patch("pathlib.Path.mkdir", side_effect=Exception)
|
||||
seteuid_mock = mocker.patch("os.seteuid")
|
||||
setegid_mock = mocker.patch("os.setegid")
|
||||
|
||||
repository_paths = RepositoryPaths(tmp_path, repository_id)
|
||||
repository_paths.tree_create()
|
||||
(repository_paths.root / "content").mkdir()
|
||||
chown_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths._chown")
|
||||
target_uid, target_gid = repository_paths.root_owner
|
||||
with pytest.raises(Exception):
|
||||
repository_paths.tree_create()
|
||||
seteuid_mock.assert_has_calls([MockCall(target_uid), MockCall(0)])
|
||||
setegid_mock.assert_has_calls([MockCall(target_gid), MockCall(0)])
|
||||
|
||||
with repository_paths.preserve_owner(repository_paths.root / "content"):
|
||||
(repository_paths.root / "created1").touch()
|
||||
(repository_paths.root / "content" / "created2").touch()
|
||||
(repository_paths.chroot / "created3").touch()
|
||||
chown_mock.assert_has_calls([MockCall(repository_paths.root / "content" / "created2")])
|
||||
|
||||
def test_preserve_owner_non_root(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must skip processing if user is not root
|
||||
"""
|
||||
mocker.patch("os.geteuid", return_value=42)
|
||||
mocker.patch("os.getegid", return_value=42)
|
||||
repository_paths = RepositoryPaths(tmp_path, repository_id)
|
||||
seteuid_mock = mocker.patch("os.seteuid")
|
||||
setegid_mock = mocker.patch("os.setegid")
|
||||
|
||||
repository_paths.tree_create()
|
||||
seteuid_mock.assert_not_called()
|
||||
setegid_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_tree_clear(repository_paths: RepositoryPaths, package_ahriman: Package, mocker: MockerFixture) -> None:
|
||||
|
||||
Reference in New Issue
Block a user