Compare commits

..

15 Commits

13 changed files with 233 additions and 123 deletions

View File

@@ -72,16 +72,14 @@ class Setup(Handler):
application = Application(repository_id, configuration, report=report) application = Application(repository_id, configuration, report=report)
# basically we create configuration here as root, but it is ok, because those files are only used for reading
Setup.configuration_create_makepkg(args.packager, args.makeflags_jobs, application.repository.paths)
Setup.executable_create(application.repository.paths, repository_id)
repository_server = f"file://{application.repository.paths.repository}" if args.server is None else args.server
Setup.configuration_create_devtools(
repository_id, args.from_configuration, args.mirror, args.multilib, repository_server)
Setup.configuration_create_sudo(application.repository.paths, repository_id)
# finish initialization
with application.repository.paths.preserve_owner(): with application.repository.paths.preserve_owner():
Setup.configuration_create_makepkg(args.packager, args.makeflags_jobs, application.repository.paths)
Setup.executable_create(application.repository.paths, repository_id)
repository_server = f"file://{application.repository.paths.repository}" if args.server is None else args.server
Setup.configuration_create_devtools(
repository_id, args.from_configuration, args.mirror, args.multilib, repository_server)
Setup.configuration_create_sudo(application.repository.paths, repository_id)
application.repository.repo.init() application.repository.repo.init()
# lazy database sync # lazy database sync
application.repository.pacman.handle # pylint: disable=pointless-statement application.repository.pacman.handle # pylint: disable=pointless-statement

View File

@@ -158,7 +158,7 @@ class Lock(LazyLogging):
""" """
check if current user is actually owner of ahriman root check if current user is actually owner of ahriman root
""" """
check_user(self.paths.root, unsafe=self.unsafe) check_user(self.paths, unsafe=self.unsafe)
self.paths.tree_create() self.paths.tree_create()
def check_version(self) -> None: def check_version(self) -> None:

View File

@@ -130,7 +130,7 @@ class Pacman(LazyLogging):
return # database for some reason deos not exist return # database for some reason deos not exist
self.logger.info("copy pacman database %s from operating system root to ahriman's home %s", src, dst) self.logger.info("copy pacman database %s from operating system root to ahriman's home %s", src, dst)
with self.repository_paths.preserve_owner(): with self.repository_paths.preserve_owner(dst.parent):
shutil.copy(src, dst) shutil.copy(src, dst)
def database_init(self, handle: Handle, repository: str, architecture: str) -> DB: def database_init(self, handle: Handle, repository: str, architecture: str) -> DB:

View File

@@ -125,7 +125,7 @@ class ArchiveTree(LazyLogging):
if root.exists(): if root.exists():
return return
with self.paths.preserve_owner(): with self.paths.preserve_owner(self.paths.archive):
root.mkdir(0o755, parents=True) root.mkdir(0o755, parents=True)
# init empty repository here # init empty repository here
Repo(self.repository_id.name, self.paths, self.sign_args, root).init() Repo(self.repository_id.name, self.paths, self.sign_args, root).init()

View File

@@ -52,7 +52,7 @@ def migrate_data(connection: Connection, configuration: Configuration) -> None:
# create archive directory if required # create archive directory if required
if not paths.archive.is_dir(): if not paths.archive.is_dir():
with paths.preserve_owner(): with paths.preserve_owner(paths.archive):
paths.archive.mkdir(mode=0o755, parents=True) paths.archive.mkdir(mode=0o755, parents=True)
move_packages(paths, pacman) move_packages(paths, pacman)

View File

@@ -20,6 +20,7 @@
import subprocess import subprocess
from collections.abc import Callable from collections.abc import Callable
from pathlib import Path
from typing import Any, Self from typing import Any, Self
from ahriman.models.repository_id import RepositoryId from ahriman.models.repository_id import RepositoryId
@@ -228,6 +229,20 @@ class PkgbuildParserError(ValueError):
ValueError.__init__(self, message) ValueError.__init__(self, message)
class PathError(ValueError):
"""
exception which will be raised on path which is not belong to root directory
"""
def __init__(self, path: Path, root: Path) -> None:
"""
Args:
path(Path): path which raised an exception
root(Path): repository root (i.e. ahriman home)
"""
ValueError.__init__(self, f"Path `{path}` does not belong to repository root `{root}`")
class PasswordError(ValueError): class PasswordError(ValueError):
""" """
exception which will be raised in case of password related errors exception which will be raised in case of password related errors

View File

@@ -38,6 +38,7 @@ from pwd import getpwuid
from typing import Any, IO, TypeVar from typing import Any, IO, TypeVar
from ahriman.core.exceptions import CalledProcessError, OptionError, UnsafeRunError from ahriman.core.exceptions import CalledProcessError, OptionError, UnsafeRunError
from ahriman.models.repository_paths import RepositoryPaths
__all__ = [ __all__ = [
@@ -51,7 +52,6 @@ __all__ = [
"filter_json", "filter_json",
"full_version", "full_version",
"minmax", "minmax",
"owner",
"package_like", "package_like",
"parse_version", "parse_version",
"partition", "partition",
@@ -61,7 +61,6 @@ __all__ = [
"safe_filename", "safe_filename",
"srcinfo_property", "srcinfo_property",
"srcinfo_property_list", "srcinfo_property_list",
"symlink_relative",
"trim_package", "trim_package",
"utcnow", "utcnow",
"walk", "walk",
@@ -195,13 +194,12 @@ def check_output(*args: str, exception: Exception | Callable[[int, list[str], st
return stdout return stdout
def check_user(root: Path, *, unsafe: bool) -> None: def check_user(paths: RepositoryPaths, *, unsafe: bool) -> None:
""" """
check if current user is the owner of the root check if current user is the owner of the root
Args: Args:
root(Path): path to root directory (e.g. repository root paths(RepositoryPaths): repository paths object
:attr:`ahriman.models.repository_paths.RepositoryPaths.root`)
unsafe(bool): if set no user check will be performed before path creation unsafe(bool): if set no user check will be performed before path creation
Raises: Raises:
@@ -210,16 +208,14 @@ def check_user(root: Path, *, unsafe: bool) -> None:
Examples: Examples:
Simply run function with arguments:: Simply run function with arguments::
>>> check_user(root, unsafe=False) >>> check_user(paths, unsafe=False)
""" """
if not root.exists(): if not paths.root.exists():
return # no directory found, skip check return # no directory found, skip check
if unsafe: if unsafe:
return # unsafe flag is enabled, no check performed return # unsafe flag is enabled, no check performed
current_uid = os.getuid()
current_uid = os.geteuid() root_uid, _ = paths.root_owner
root_uid, _ = owner(root)
if current_uid != root_uid: if current_uid != root_uid:
raise UnsafeRunError(current_uid, root_uid) raise UnsafeRunError(current_uid, root_uid)
@@ -338,20 +334,6 @@ def minmax(source: Iterable[T], *, key: Callable[[T], Any] | None = None) -> tup
return min(first_iter, key=key), max(second_iter, key=key) # type: ignore return min(first_iter, key=key), max(second_iter, key=key) # type: ignore
def owner(path: Path) -> tuple[int, int]:
"""
retrieve owner information by path
Args:
path(Path): path for which extract ids
Returns:
tuple[int, int]: owner user and group ids of the directory
"""
stat = path.stat()
return stat.st_uid, stat.st_gid
def package_like(filename: Path) -> bool: def package_like(filename: Path) -> bool:
""" """
check if file looks like package check if file looks like package

View File

@@ -27,8 +27,8 @@ from functools import cached_property
from pathlib import Path from pathlib import Path
from pwd import getpwuid from pwd import getpwuid
from ahriman.core.exceptions import PathError
from ahriman.core.log import LazyLogging from ahriman.core.log import LazyLogging
from ahriman.core.utils import owner
from ahriman.models.repository_id import RepositoryId from ahriman.models.repository_id import RepositoryId
@@ -103,7 +103,7 @@ class RepositoryPaths(LazyLogging):
Returns: Returns:
Path: path to directory in which build process is run Path: path to directory in which build process is run
""" """
uid, _ = owner(self.root) uid, _ = self.owner(self.root)
return self.chroot / f"{self.repository_id.name}-{self.repository_id.architecture}" / getpwuid(uid).pw_name return self.chroot / f"{self.repository_id.name}-{self.repository_id.architecture}" / getpwuid(uid).pw_name
@property @property
@@ -165,7 +165,7 @@ class RepositoryPaths(LazyLogging):
Returns: Returns:
tuple[int, int]: owner user and group of the root directory tuple[int, int]: owner user and group of the root directory
""" """
return owner(self.root) return self.owner(self.root)
# pylint: disable=protected-access # pylint: disable=protected-access
@classmethod @classmethod
@@ -218,6 +218,47 @@ class RepositoryPaths(LazyLogging):
return set(walk(instance)) return set(walk(instance))
@staticmethod
def owner(path: Path) -> tuple[int, int]:
"""
retrieve owner information by path
Args:
path(Path): path for which extract ids
Returns:
tuple[int, int]: owner user and group ids of the directory
"""
stat = path.stat()
return stat.st_uid, stat.st_gid
def _chown(self, path: Path) -> None:
"""
set owner of path recursively (from root) to root owner
Notes:
More likely you don't want to call this method explicitly, consider using :func:`preserve_owner()`
as context manager instead
Args:
path(Path): path to be chown
Raises:
PathError: if path does not belong to root
"""
def set_owner(current: Path) -> None:
uid, gid = self.owner(current)
if uid == root_uid and gid == root_gid:
return
os.chown(current, root_uid, root_gid, follow_symlinks=False)
if self.root not in path.parents:
raise PathError(path, self.root)
root_uid, root_gid = self.root_owner
while path != self.root:
set_owner(path)
path = path.parent
def archive_for(self, package_base: str) -> Path: def archive_for(self, package_base: str) -> Path:
""" """
get path to archive specified search criteria get path to archive specified search criteria
@@ -230,7 +271,7 @@ class RepositoryPaths(LazyLogging):
""" """
directory = self.archive / "packages" / package_base[0] / package_base directory = self.archive / "packages" / package_base[0] / package_base
if not directory.is_dir(): # create if not exists if not directory.is_dir(): # create if not exists
with self.preserve_owner(): with self.preserve_owner(self.archive):
directory.mkdir(mode=0o755, parents=True) directory.mkdir(mode=0o755, parents=True)
return directory return directory
@@ -248,10 +289,13 @@ class RepositoryPaths(LazyLogging):
return self.cache / package_base return self.cache / package_base
@contextlib.contextmanager @contextlib.contextmanager
def preserve_owner(self) -> Iterator[None]: def preserve_owner(self, path: Path | None = None) -> Iterator[None]:
""" """
perform any action preserving owner for any newly created file or directory perform any action preserving owner for any newly created file or directory
Args:
path(Path | None, optional): use this path as root instead of repository root (Default value = None)
Examples: Examples:
This method is designed to use as context manager when you are going to perform operations which might This method is designed to use as context manager when you are going to perform operations which might
change filesystem, especially if you are doing it under unsafe flag, e.g.:: change filesystem, especially if you are doing it under unsafe flag, e.g.::
@@ -262,26 +306,29 @@ class RepositoryPaths(LazyLogging):
Note, however, that this method doesn't handle any exceptions and will eventually interrupt Note, however, that this method doesn't handle any exceptions and will eventually interrupt
if there will be any. if there will be any.
""" """
# guard non-root path = path or self.root
# the reason we do this is that it only works if permissions can be actually changed. Hence,
# non-privileged user (e.g. personal user or ahriman user) can't change permissions.
# The only one who can do so is root, so if user is not root we just terminate function
current_uid, current_gid = os.geteuid(), os.getegid()
if current_uid != 0:
yield
return
# set uid and gid to root owner def walk(root: Path) -> Iterator[Path]:
target_uid, target_gid = self.root_owner yield root
os.setegid(target_gid) if not root.exists():
os.seteuid(target_uid) return
try: # basically walk, but skipping some content
yield for child in root.iterdir():
finally: yield child
# reset uid and gid if child in (self.chroot.parent,):
os.seteuid(current_uid) yield from child.iterdir() # we only yield top-level in chroot directory
os.setegid(current_gid) elif child.is_dir():
yield from walk(child)
# get current filesystem and run action
previous_snapshot = set(walk(path))
yield
# get newly created files and directories and chown them
new_entries = set(walk(path)).difference(previous_snapshot)
for entry in new_entries:
self._chown(entry)
def tree_clear(self, package_base: str) -> None: def tree_clear(self, package_base: str) -> None:
""" """

View File

@@ -142,7 +142,7 @@ def test_check_user(lock: Lock, mocker: MockerFixture) -> None:
tree_create = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.tree_create") tree_create = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.tree_create")
lock.check_user() lock.check_user()
check_user_patch.assert_called_once_with(lock.paths.root, unsafe=False) check_user_patch.assert_called_once_with(lock.paths, unsafe=False)
tree_create.assert_called_once_with() tree_create.assert_called_once_with()

View File

@@ -67,7 +67,7 @@ def test_database_copy(pacman: Pacman, mocker: MockerFixture) -> None:
pacman.database_copy(pacman.handle, database, path, use_ahriman_cache=True) pacman.database_copy(pacman.handle, database, path, use_ahriman_cache=True)
mkdir_mock.assert_called_once_with(mode=0o755, exist_ok=True) mkdir_mock.assert_called_once_with(mode=0o755, exist_ok=True)
copy_mock.assert_called_once_with(path / "sync" / "core.db", dst_path) copy_mock.assert_called_once_with(path / "sync" / "core.db", dst_path)
owner_guard_mock.assert_called_once_with() owner_guard_mock.assert_called_once_with(dst_path.parent)
def test_database_copy_skip(pacman: Pacman, mocker: MockerFixture) -> None: def test_database_copy_skip(pacman: Pacman, mocker: MockerFixture) -> None:

View File

@@ -117,7 +117,7 @@ def test_tree_create(archive_tree: ArchiveTree, mocker: MockerFixture) -> None:
init_mock = mocker.patch("ahriman.core.alpm.repo.Repo.init") init_mock = mocker.patch("ahriman.core.alpm.repo.Repo.init")
archive_tree.tree_create() archive_tree.tree_create()
owner_guard_mock.assert_called_once_with() owner_guard_mock.assert_called_once_with(archive_tree.paths.archive)
mkdir_mock.assert_called_once_with(0o755, parents=True) mkdir_mock.assert_called_once_with(0o755, parents=True)
init_mock.assert_called_once_with() init_mock.assert_called_once_with()

View File

@@ -7,10 +7,34 @@ import pytest
from pathlib import Path from pathlib import Path
from pytest_mock import MockerFixture from pytest_mock import MockerFixture
from typing import Any from typing import Any
from unittest.mock import MagicMock, call as MockCall from unittest.mock import call as MockCall
from ahriman.core.exceptions import BuildError, CalledProcessError, OptionError, UnsafeRunError from ahriman.core.exceptions import BuildError, CalledProcessError, OptionError, UnsafeRunError
from ahriman.core.utils import * from ahriman.core.utils import (
atomic_move,
check_output,
check_user,
dataclass_view,
enum_values,
extract_user,
filelock,
filter_json,
full_version,
minmax,
package_like,
parse_version,
partition,
pretty_datetime,
pretty_interval,
pretty_size,
safe_filename,
srcinfo_property,
srcinfo_property_list,
symlink_relative,
trim_package,
utcnow,
walk,
)
from ahriman.models.package import Package from ahriman.models.package import Package
from ahriman.models.package_source import PackageSource from ahriman.models.package_source import PackageSource
from ahriman.models.repository_id import RepositoryId from ahriman.models.repository_id import RepositoryId
@@ -173,8 +197,8 @@ def test_check_user(repository_id: RepositoryId, mocker: MockerFixture) -> None:
must check user correctly must check user correctly
""" """
paths = RepositoryPaths(Path.cwd(), repository_id) paths = RepositoryPaths(Path.cwd(), repository_id)
mocker.patch("os.geteuid", return_value=paths.root_owner[0]) mocker.patch("os.getuid", return_value=paths.root_owner[0])
check_user(paths.root, unsafe=False) check_user(paths, unsafe=False)
def test_check_user_no_directory(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None: def test_check_user_no_directory(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
@@ -182,7 +206,7 @@ def test_check_user_no_directory(repository_paths: RepositoryPaths, mocker: Mock
must not fail in case if no directory found must not fail in case if no directory found
""" """
mocker.patch("pathlib.Path.exists", return_value=False) mocker.patch("pathlib.Path.exists", return_value=False)
check_user(repository_paths.root, unsafe=False) check_user(repository_paths, unsafe=False)
def test_check_user_exception(repository_id: RepositoryId, mocker: MockerFixture) -> None: def test_check_user_exception(repository_id: RepositoryId, mocker: MockerFixture) -> None:
@@ -190,10 +214,10 @@ def test_check_user_exception(repository_id: RepositoryId, mocker: MockerFixture
must raise exception if user differs must raise exception if user differs
""" """
paths = RepositoryPaths(Path.cwd(), repository_id) paths = RepositoryPaths(Path.cwd(), repository_id)
mocker.patch("os.geteuid", return_value=paths.root_owner[0] + 1) mocker.patch("os.getuid", return_value=paths.root_owner[0] + 1)
with pytest.raises(UnsafeRunError): with pytest.raises(UnsafeRunError):
check_user(paths.root, unsafe=False) check_user(paths, unsafe=False)
def test_check_user_unsafe(repository_id: RepositoryId, mocker: MockerFixture) -> None: def test_check_user_unsafe(repository_id: RepositoryId, mocker: MockerFixture) -> None:
@@ -201,8 +225,8 @@ def test_check_user_unsafe(repository_id: RepositoryId, mocker: MockerFixture) -
must skip check if unsafe flag is set must skip check if unsafe flag is set
""" """
paths = RepositoryPaths(Path.cwd(), repository_id) paths = RepositoryPaths(Path.cwd(), repository_id)
mocker.patch("os.geteuid", return_value=paths.root_owner[0] + 1) mocker.patch("os.getuid", return_value=paths.root_owner[0] + 1)
check_user(paths.root, unsafe=True) check_user(paths, unsafe=True)
def test_dataclass_view(package_ahriman: Package) -> None: def test_dataclass_view(package_ahriman: Package) -> None:
@@ -333,18 +357,6 @@ def test_minmax() -> None:
assert minmax([[1, 2, 3], [4, 5], [6, 7, 8, 9]], key=len) == ([4, 5], [6, 7, 8, 9]) assert minmax([[1, 2, 3], [4, 5], [6, 7, 8, 9]], key=len) == ([4, 5], [6, 7, 8, 9])
def test_owner(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
"""
must correctly retrieve owner of the path
"""
stat_mock = MagicMock()
stat_mock.st_uid = 42
stat_mock.st_gid = 142
mocker.patch("pathlib.Path.stat", return_value=stat_mock)
assert owner(repository_paths.root) == (42, 142)
def test_package_like(package_ahriman: Package) -> None: def test_package_like(package_ahriman: Package) -> None:
""" """
package_like must return true for archives package_like must return true for archives

View File

@@ -3,8 +3,9 @@ import pytest
from collections.abc import Callable from collections.abc import Callable
from pathlib import Path from pathlib import Path
from pytest_mock import MockerFixture from pytest_mock import MockerFixture
from unittest.mock import call as MockCall from unittest.mock import MagicMock, call as MockCall
from ahriman.core.exceptions import PathError
from ahriman.models.package import Package from ahriman.models.package import Package
from ahriman.models.repository_id import RepositoryId from ahriman.models.repository_id import RepositoryId
from ahriman.models.repository_paths import RepositoryPaths from ahriman.models.repository_paths import RepositoryPaths
@@ -54,7 +55,7 @@ def test_root_owner(repository_paths: RepositoryPaths, mocker: MockerFixture) ->
""" """
must correctly define root directory owner must correctly define root directory owner
""" """
mocker.patch("ahriman.models.repository_paths.owner", return_value=(42, 142)) mocker.patch("ahriman.models.repository_paths.RepositoryPaths.owner", return_value=(42, 142))
assert repository_paths.root_owner == (42, 142) assert repository_paths.root_owner == (42, 142)
@@ -185,6 +186,68 @@ def test_known_repositories_empty(repository_paths: RepositoryPaths, mocker: Moc
iterdir_mock.assert_not_called() iterdir_mock.assert_not_called()
def test_owner(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
"""
must correctly retrieve owner of the path
"""
stat_mock = MagicMock()
stat_mock.st_uid = 42
stat_mock.st_gid = 142
mocker.patch("pathlib.Path.stat", return_value=stat_mock)
assert RepositoryPaths.owner(repository_paths.root) == (42, 142)
def test_chown(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
"""
must correctly set owner for the directory
"""
object.__setattr__(repository_paths, "owner", _get_owner(repository_paths.root, same=False))
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
chown_mock = mocker.patch("os.chown")
path = repository_paths.root / "path"
repository_paths._chown(path)
chown_mock.assert_called_once_with(path, 42, 42, follow_symlinks=False)
def test_chown_parent(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
"""
must correctly set owner for the directory including parents
"""
object.__setattr__(repository_paths, "owner", _get_owner(repository_paths.root, same=False))
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
chown_mock = mocker.patch("os.chown")
path = repository_paths.root / "parent" / "path"
repository_paths._chown(path)
chown_mock.assert_has_calls([
MockCall(path, 42, 42, follow_symlinks=False),
MockCall(path.parent, 42, 42, follow_symlinks=False)
])
def test_chown_skip(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
"""
must skip ownership set in case if it is same as root
"""
object.__setattr__(repository_paths, "owner", _get_owner(repository_paths.root, same=True))
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
chown_mock = mocker.patch("os.chown")
path = repository_paths.root / "path"
repository_paths._chown(path)
chown_mock.assert_not_called()
def test_chown_invalid_path(repository_paths: RepositoryPaths) -> None:
"""
must raise invalid path exception in case if directory outside the root supplied
"""
with pytest.raises(PathError):
repository_paths._chown(repository_paths.root.parent)
def test_archive_for(repository_paths: RepositoryPaths, package_ahriman: Package, mocker: MockerFixture) -> None: def test_archive_for(repository_paths: RepositoryPaths, package_ahriman: Package, mocker: MockerFixture) -> None:
""" """
must correctly define archive path must correctly define archive path
@@ -203,7 +266,7 @@ def test_archive_for_create_tree(repository_paths: RepositoryPaths, package_ahri
mkdir_mock = mocker.patch("pathlib.Path.mkdir") mkdir_mock = mocker.patch("pathlib.Path.mkdir")
repository_paths.archive_for(package_ahriman.base) repository_paths.archive_for(package_ahriman.base)
owner_mock.assert_called_once_with() owner_mock.assert_called_once_with(repository_paths.archive)
mkdir_mock.assert_called_once_with(mode=0o755, parents=True) mkdir_mock.assert_called_once_with(mode=0o755, parents=True)
@@ -220,49 +283,42 @@ def test_preserve_owner(tmp_path: Path, repository_id: RepositoryId, mocker: Moc
""" """
must preserve file owner during operations must preserve file owner during operations
""" """
mocker.patch("os.geteuid", return_value=0)
mocker.patch("os.getegid", return_value=0)
seteuid_mock = mocker.patch("os.seteuid")
setegid_mock = mocker.patch("os.setegid")
repository_paths = RepositoryPaths(tmp_path, repository_id) repository_paths = RepositoryPaths(tmp_path, repository_id)
target_uid, target_gid = repository_paths.root_owner
repository_paths.tree_create() repository_paths.tree_create()
seteuid_mock.assert_has_calls([MockCall(target_uid), MockCall(0)]) chown_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths._chown")
setegid_mock.assert_has_calls([MockCall(target_gid), MockCall(0)])
with repository_paths.preserve_owner():
(repository_paths.root / "created1").touch()
(repository_paths.chroot / "created2").touch()
chown_mock.assert_has_calls([MockCall(repository_paths.root / "created1")])
def test_preserve_owner_exception(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None: def test_preserve_owner_specific(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
""" """
must return to original uid and gid even during exception must preserve file owner during operations only in specific directory
""" """
mocker.patch("os.geteuid", return_value=0)
mocker.patch("os.getegid", return_value=0)
mocker.patch("pathlib.Path.mkdir", side_effect=Exception)
seteuid_mock = mocker.patch("os.seteuid")
setegid_mock = mocker.patch("os.setegid")
repository_paths = RepositoryPaths(tmp_path, repository_id) repository_paths = RepositoryPaths(tmp_path, repository_id)
target_uid, target_gid = repository_paths.root_owner
with pytest.raises(Exception):
repository_paths.tree_create()
seteuid_mock.assert_has_calls([MockCall(target_uid), MockCall(0)])
setegid_mock.assert_has_calls([MockCall(target_gid), MockCall(0)])
def test_preserve_owner_non_root(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
"""
must skip processing if user is not root
"""
mocker.patch("os.geteuid", return_value=42)
mocker.patch("os.getegid", return_value=42)
repository_paths = RepositoryPaths(tmp_path, repository_id)
seteuid_mock = mocker.patch("os.seteuid")
setegid_mock = mocker.patch("os.setegid")
repository_paths.tree_create() repository_paths.tree_create()
seteuid_mock.assert_not_called() (repository_paths.root / "content").mkdir()
setegid_mock.assert_not_called() chown_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths._chown")
with repository_paths.preserve_owner(repository_paths.root / "content"):
(repository_paths.root / "created1").touch()
(repository_paths.root / "content" / "created2").touch()
(repository_paths.chroot / "created3").touch()
chown_mock.assert_has_calls([MockCall(repository_paths.root / "content" / "created2")])
def test_preserve_owner_no_directory(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
"""
must skip directory scan if it does not exist
"""
repository_paths = RepositoryPaths(tmp_path, repository_id)
chown_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths._chown")
with repository_paths.preserve_owner(Path("empty")):
(repository_paths.root / "created1").touch()
chown_mock.assert_not_called()
def test_tree_clear(repository_paths: RepositoryPaths, package_ahriman: Package, mocker: MockerFixture) -> None: def test_tree_clear(repository_paths: RepositoryPaths, package_ahriman: Package, mocker: MockerFixture) -> None: