mirror of
https://github.com/arcan1s/ahriman.git
synced 2026-04-07 19:03:38 +00:00
Compare commits
19 Commits
93ccf7aa62
...
a4fc515370
| Author | SHA1 | Date | |
|---|---|---|---|
| a4fc515370 | |||
| 47eaec7027 | |||
| a51fad2e9f | |||
| 86b3266586 | |||
| 9773cdfe18 | |||
| 183cd306f4 | |||
| 5dbab02478 | |||
| 3dc28866e8 | |||
| eb56e3a762 | |||
| 6c886399b5 | |||
| 1099681487 | |||
| beed38ea8c | |||
| 8d64699f2b | |||
| 5f7ac0355b | |||
| e8abeeddc9 | |||
| b567a3b683 | |||
| 389bad6725 | |||
| 5738b8b911 | |||
| 5ac2e3de19 |
@@ -72,14 +72,16 @@ class Setup(Handler):
|
||||
|
||||
application = Application(repository_id, configuration, report=report)
|
||||
|
||||
with application.repository.paths.preserve_owner():
|
||||
Setup.configuration_create_makepkg(args.packager, args.makeflags_jobs, application.repository.paths)
|
||||
Setup.executable_create(application.repository.paths, repository_id)
|
||||
repository_server = f"file://{application.repository.paths.repository}" if args.server is None else args.server
|
||||
Setup.configuration_create_devtools(
|
||||
repository_id, args.from_configuration, args.mirror, args.multilib, repository_server)
|
||||
Setup.configuration_create_sudo(application.repository.paths, repository_id)
|
||||
# basically we create configuration here as root, but it is ok, because those files are only used for reading
|
||||
Setup.configuration_create_makepkg(args.packager, args.makeflags_jobs, application.repository.paths)
|
||||
Setup.executable_create(application.repository.paths, repository_id)
|
||||
repository_server = f"file://{application.repository.paths.repository}" if args.server is None else args.server
|
||||
Setup.configuration_create_devtools(
|
||||
repository_id, args.from_configuration, args.mirror, args.multilib, repository_server)
|
||||
Setup.configuration_create_sudo(application.repository.paths, repository_id)
|
||||
|
||||
# finish initialization
|
||||
with application.repository.paths.preserve_owner():
|
||||
application.repository.repo.init()
|
||||
# lazy database sync
|
||||
application.repository.pacman.handle # pylint: disable=pointless-statement
|
||||
|
||||
@@ -158,7 +158,7 @@ class Lock(LazyLogging):
|
||||
"""
|
||||
check if current user is actually owner of ahriman root
|
||||
"""
|
||||
check_user(self.paths, unsafe=self.unsafe)
|
||||
check_user(self.paths.root, unsafe=self.unsafe)
|
||||
self.paths.tree_create()
|
||||
|
||||
def check_version(self) -> None:
|
||||
|
||||
@@ -130,7 +130,7 @@ class Pacman(LazyLogging):
|
||||
return # database for some reason deos not exist
|
||||
|
||||
self.logger.info("copy pacman database %s from operating system root to ahriman's home %s", src, dst)
|
||||
with self.repository_paths.preserve_owner(dst.parent):
|
||||
with self.repository_paths.preserve_owner():
|
||||
shutil.copy(src, dst)
|
||||
|
||||
def database_init(self, handle: Handle, repository: str, architecture: str) -> DB:
|
||||
|
||||
@@ -125,7 +125,7 @@ class ArchiveTree(LazyLogging):
|
||||
if root.exists():
|
||||
return
|
||||
|
||||
with self.paths.preserve_owner(self.paths.archive):
|
||||
with self.paths.preserve_owner():
|
||||
root.mkdir(0o755, parents=True)
|
||||
# init empty repository here
|
||||
Repo(self.repository_id.name, self.paths, self.sign_args, root).init()
|
||||
|
||||
@@ -52,7 +52,7 @@ def migrate_data(connection: Connection, configuration: Configuration) -> None:
|
||||
|
||||
# create archive directory if required
|
||||
if not paths.archive.is_dir():
|
||||
with paths.preserve_owner(paths.archive):
|
||||
with paths.preserve_owner():
|
||||
paths.archive.mkdir(mode=0o755, parents=True)
|
||||
|
||||
move_packages(paths, pacman)
|
||||
|
||||
@@ -20,7 +20,6 @@
|
||||
import subprocess
|
||||
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
from typing import Any, Self
|
||||
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
@@ -229,20 +228,6 @@ class PkgbuildParserError(ValueError):
|
||||
ValueError.__init__(self, message)
|
||||
|
||||
|
||||
class PathError(ValueError):
|
||||
"""
|
||||
exception which will be raised on path which is not belong to root directory
|
||||
"""
|
||||
|
||||
def __init__(self, path: Path, root: Path) -> None:
|
||||
"""
|
||||
Args:
|
||||
path(Path): path which raised an exception
|
||||
root(Path): repository root (i.e. ahriman home)
|
||||
"""
|
||||
ValueError.__init__(self, f"Path `{path}` does not belong to repository root `{root}`")
|
||||
|
||||
|
||||
class PasswordError(ValueError):
|
||||
"""
|
||||
exception which will be raised in case of password related errors
|
||||
|
||||
@@ -38,7 +38,6 @@ from pwd import getpwuid
|
||||
from typing import Any, IO, TypeVar
|
||||
|
||||
from ahriman.core.exceptions import CalledProcessError, OptionError, UnsafeRunError
|
||||
from ahriman.models.repository_paths import RepositoryPaths
|
||||
|
||||
|
||||
__all__ = [
|
||||
@@ -52,6 +51,7 @@ __all__ = [
|
||||
"filter_json",
|
||||
"full_version",
|
||||
"minmax",
|
||||
"owner",
|
||||
"package_like",
|
||||
"parse_version",
|
||||
"partition",
|
||||
@@ -61,6 +61,7 @@ __all__ = [
|
||||
"safe_filename",
|
||||
"srcinfo_property",
|
||||
"srcinfo_property_list",
|
||||
"symlink_relative",
|
||||
"trim_package",
|
||||
"utcnow",
|
||||
"walk",
|
||||
@@ -194,12 +195,13 @@ def check_output(*args: str, exception: Exception | Callable[[int, list[str], st
|
||||
return stdout
|
||||
|
||||
|
||||
def check_user(paths: RepositoryPaths, *, unsafe: bool) -> None:
|
||||
def check_user(root: Path, *, unsafe: bool) -> None:
|
||||
"""
|
||||
check if current user is the owner of the root
|
||||
|
||||
Args:
|
||||
paths(RepositoryPaths): repository paths object
|
||||
root(Path): path to root directory (e.g. repository root
|
||||
:attr:`ahriman.models.repository_paths.RepositoryPaths.root`)
|
||||
unsafe(bool): if set no user check will be performed before path creation
|
||||
|
||||
Raises:
|
||||
@@ -208,14 +210,16 @@ def check_user(paths: RepositoryPaths, *, unsafe: bool) -> None:
|
||||
Examples:
|
||||
Simply run function with arguments::
|
||||
|
||||
>>> check_user(paths, unsafe=False)
|
||||
>>> check_user(root, unsafe=False)
|
||||
"""
|
||||
if not paths.root.exists():
|
||||
if not root.exists():
|
||||
return # no directory found, skip check
|
||||
if unsafe:
|
||||
return # unsafe flag is enabled, no check performed
|
||||
current_uid = os.getuid()
|
||||
root_uid, _ = paths.root_owner
|
||||
|
||||
current_uid = os.geteuid()
|
||||
root_uid, _ = owner(root)
|
||||
|
||||
if current_uid != root_uid:
|
||||
raise UnsafeRunError(current_uid, root_uid)
|
||||
|
||||
@@ -334,6 +338,20 @@ def minmax(source: Iterable[T], *, key: Callable[[T], Any] | None = None) -> tup
|
||||
return min(first_iter, key=key), max(second_iter, key=key) # type: ignore
|
||||
|
||||
|
||||
def owner(path: Path) -> tuple[int, int]:
|
||||
"""
|
||||
retrieve owner information by path
|
||||
|
||||
Args:
|
||||
path(Path): path for which extract ids
|
||||
|
||||
Returns:
|
||||
tuple[int, int]: owner user and group ids of the directory
|
||||
"""
|
||||
stat = path.stat()
|
||||
return stat.st_uid, stat.st_gid
|
||||
|
||||
|
||||
def package_like(filename: Path) -> bool:
|
||||
"""
|
||||
check if file looks like package
|
||||
|
||||
@@ -27,8 +27,8 @@ from functools import cached_property
|
||||
from pathlib import Path
|
||||
from pwd import getpwuid
|
||||
|
||||
from ahriman.core.exceptions import PathError
|
||||
from ahriman.core.log import LazyLogging
|
||||
from ahriman.core.utils import owner
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
|
||||
|
||||
@@ -103,7 +103,7 @@ class RepositoryPaths(LazyLogging):
|
||||
Returns:
|
||||
Path: path to directory in which build process is run
|
||||
"""
|
||||
uid, _ = self.owner(self.root)
|
||||
uid, _ = owner(self.root)
|
||||
return self.chroot / f"{self.repository_id.name}-{self.repository_id.architecture}" / getpwuid(uid).pw_name
|
||||
|
||||
@property
|
||||
@@ -165,7 +165,7 @@ class RepositoryPaths(LazyLogging):
|
||||
Returns:
|
||||
tuple[int, int]: owner user and group of the root directory
|
||||
"""
|
||||
return self.owner(self.root)
|
||||
return owner(self.root)
|
||||
|
||||
# pylint: disable=protected-access
|
||||
@classmethod
|
||||
@@ -218,47 +218,6 @@ class RepositoryPaths(LazyLogging):
|
||||
|
||||
return set(walk(instance))
|
||||
|
||||
@staticmethod
|
||||
def owner(path: Path) -> tuple[int, int]:
|
||||
"""
|
||||
retrieve owner information by path
|
||||
|
||||
Args:
|
||||
path(Path): path for which extract ids
|
||||
|
||||
Returns:
|
||||
tuple[int, int]: owner user and group ids of the directory
|
||||
"""
|
||||
stat = path.stat()
|
||||
return stat.st_uid, stat.st_gid
|
||||
|
||||
def _chown(self, path: Path) -> None:
|
||||
"""
|
||||
set owner of path recursively (from root) to root owner
|
||||
|
||||
Notes:
|
||||
More likely you don't want to call this method explicitly, consider using :func:`preserve_owner()`
|
||||
as context manager instead
|
||||
|
||||
Args:
|
||||
path(Path): path to be chown
|
||||
|
||||
Raises:
|
||||
PathError: if path does not belong to root
|
||||
"""
|
||||
def set_owner(current: Path) -> None:
|
||||
uid, gid = self.owner(current)
|
||||
if uid == root_uid and gid == root_gid:
|
||||
return
|
||||
os.chown(current, root_uid, root_gid, follow_symlinks=False)
|
||||
|
||||
if self.root not in path.parents:
|
||||
raise PathError(path, self.root)
|
||||
root_uid, root_gid = self.root_owner
|
||||
while path != self.root:
|
||||
set_owner(path)
|
||||
path = path.parent
|
||||
|
||||
def archive_for(self, package_base: str) -> Path:
|
||||
"""
|
||||
get path to archive specified search criteria
|
||||
@@ -271,7 +230,7 @@ class RepositoryPaths(LazyLogging):
|
||||
"""
|
||||
directory = self.archive / "packages" / package_base[0] / package_base
|
||||
if not directory.is_dir(): # create if not exists
|
||||
with self.preserve_owner(self.archive):
|
||||
with self.preserve_owner():
|
||||
directory.mkdir(mode=0o755, parents=True)
|
||||
|
||||
return directory
|
||||
@@ -289,13 +248,10 @@ class RepositoryPaths(LazyLogging):
|
||||
return self.cache / package_base
|
||||
|
||||
@contextlib.contextmanager
|
||||
def preserve_owner(self, path: Path | None = None) -> Iterator[None]:
|
||||
def preserve_owner(self) -> Iterator[None]:
|
||||
"""
|
||||
perform any action preserving owner for any newly created file or directory
|
||||
|
||||
Args:
|
||||
path(Path | None, optional): use this path as root instead of repository root (Default value = None)
|
||||
|
||||
Examples:
|
||||
This method is designed to use as context manager when you are going to perform operations which might
|
||||
change filesystem, especially if you are doing it under unsafe flag, e.g.::
|
||||
@@ -306,29 +262,26 @@ class RepositoryPaths(LazyLogging):
|
||||
Note, however, that this method doesn't handle any exceptions and will eventually interrupt
|
||||
if there will be any.
|
||||
"""
|
||||
path = path or self.root
|
||||
# guard non-root
|
||||
# the reason we do this is that it only works if permissions can be actually changed. Hence,
|
||||
# non-privileged user (e.g. personal user or ahriman user) can't change permissions.
|
||||
# The only one who can do so is root, so if user is not root we just terminate function
|
||||
current_uid, current_gid = os.geteuid(), os.getegid()
|
||||
if current_uid != 0:
|
||||
yield
|
||||
return
|
||||
|
||||
def walk(root: Path) -> Iterator[Path]:
|
||||
yield root
|
||||
if not root.exists():
|
||||
return
|
||||
# set uid and gid to root owner
|
||||
target_uid, target_gid = self.root_owner
|
||||
os.setegid(target_gid)
|
||||
os.seteuid(target_uid)
|
||||
|
||||
# basically walk, but skipping some content
|
||||
for child in root.iterdir():
|
||||
yield child
|
||||
if child in (self.chroot.parent,):
|
||||
yield from child.iterdir() # we only yield top-level in chroot directory
|
||||
elif child.is_dir():
|
||||
yield from walk(child)
|
||||
|
||||
# get current filesystem and run action
|
||||
previous_snapshot = set(walk(path))
|
||||
yield
|
||||
|
||||
# get newly created files and directories and chown them
|
||||
new_entries = set(walk(path)).difference(previous_snapshot)
|
||||
for entry in new_entries:
|
||||
self._chown(entry)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
# reset uid and gid
|
||||
os.seteuid(current_uid)
|
||||
os.setegid(current_gid)
|
||||
|
||||
def tree_clear(self, package_base: str) -> None:
|
||||
"""
|
||||
|
||||
@@ -142,7 +142,7 @@ def test_check_user(lock: Lock, mocker: MockerFixture) -> None:
|
||||
tree_create = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.tree_create")
|
||||
|
||||
lock.check_user()
|
||||
check_user_patch.assert_called_once_with(lock.paths, unsafe=False)
|
||||
check_user_patch.assert_called_once_with(lock.paths.root, unsafe=False)
|
||||
tree_create.assert_called_once_with()
|
||||
|
||||
|
||||
|
||||
@@ -67,7 +67,7 @@ def test_database_copy(pacman: Pacman, mocker: MockerFixture) -> None:
|
||||
pacman.database_copy(pacman.handle, database, path, use_ahriman_cache=True)
|
||||
mkdir_mock.assert_called_once_with(mode=0o755, exist_ok=True)
|
||||
copy_mock.assert_called_once_with(path / "sync" / "core.db", dst_path)
|
||||
owner_guard_mock.assert_called_once_with(dst_path.parent)
|
||||
owner_guard_mock.assert_called_once_with()
|
||||
|
||||
|
||||
def test_database_copy_skip(pacman: Pacman, mocker: MockerFixture) -> None:
|
||||
|
||||
@@ -117,7 +117,7 @@ def test_tree_create(archive_tree: ArchiveTree, mocker: MockerFixture) -> None:
|
||||
init_mock = mocker.patch("ahriman.core.alpm.repo.Repo.init")
|
||||
|
||||
archive_tree.tree_create()
|
||||
owner_guard_mock.assert_called_once_with(archive_tree.paths.archive)
|
||||
owner_guard_mock.assert_called_once_with()
|
||||
mkdir_mock.assert_called_once_with(0o755, parents=True)
|
||||
init_mock.assert_called_once_with()
|
||||
|
||||
|
||||
@@ -7,34 +7,10 @@ import pytest
|
||||
from pathlib import Path
|
||||
from pytest_mock import MockerFixture
|
||||
from typing import Any
|
||||
from unittest.mock import call as MockCall
|
||||
from unittest.mock import MagicMock, call as MockCall
|
||||
|
||||
from ahriman.core.exceptions import BuildError, CalledProcessError, OptionError, UnsafeRunError
|
||||
from ahriman.core.utils import (
|
||||
atomic_move,
|
||||
check_output,
|
||||
check_user,
|
||||
dataclass_view,
|
||||
enum_values,
|
||||
extract_user,
|
||||
filelock,
|
||||
filter_json,
|
||||
full_version,
|
||||
minmax,
|
||||
package_like,
|
||||
parse_version,
|
||||
partition,
|
||||
pretty_datetime,
|
||||
pretty_interval,
|
||||
pretty_size,
|
||||
safe_filename,
|
||||
srcinfo_property,
|
||||
srcinfo_property_list,
|
||||
symlink_relative,
|
||||
trim_package,
|
||||
utcnow,
|
||||
walk,
|
||||
)
|
||||
from ahriman.core.utils import *
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.package_source import PackageSource
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
@@ -197,8 +173,8 @@ def test_check_user(repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
||||
must check user correctly
|
||||
"""
|
||||
paths = RepositoryPaths(Path.cwd(), repository_id)
|
||||
mocker.patch("os.getuid", return_value=paths.root_owner[0])
|
||||
check_user(paths, unsafe=False)
|
||||
mocker.patch("os.geteuid", return_value=paths.root_owner[0])
|
||||
check_user(paths.root, unsafe=False)
|
||||
|
||||
|
||||
def test_check_user_no_directory(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||
@@ -206,7 +182,7 @@ def test_check_user_no_directory(repository_paths: RepositoryPaths, mocker: Mock
|
||||
must not fail in case if no directory found
|
||||
"""
|
||||
mocker.patch("pathlib.Path.exists", return_value=False)
|
||||
check_user(repository_paths, unsafe=False)
|
||||
check_user(repository_paths.root, unsafe=False)
|
||||
|
||||
|
||||
def test_check_user_exception(repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
||||
@@ -214,10 +190,10 @@ def test_check_user_exception(repository_id: RepositoryId, mocker: MockerFixture
|
||||
must raise exception if user differs
|
||||
"""
|
||||
paths = RepositoryPaths(Path.cwd(), repository_id)
|
||||
mocker.patch("os.getuid", return_value=paths.root_owner[0] + 1)
|
||||
mocker.patch("os.geteuid", return_value=paths.root_owner[0] + 1)
|
||||
|
||||
with pytest.raises(UnsafeRunError):
|
||||
check_user(paths, unsafe=False)
|
||||
check_user(paths.root, unsafe=False)
|
||||
|
||||
|
||||
def test_check_user_unsafe(repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
||||
@@ -225,8 +201,8 @@ def test_check_user_unsafe(repository_id: RepositoryId, mocker: MockerFixture) -
|
||||
must skip check if unsafe flag is set
|
||||
"""
|
||||
paths = RepositoryPaths(Path.cwd(), repository_id)
|
||||
mocker.patch("os.getuid", return_value=paths.root_owner[0] + 1)
|
||||
check_user(paths, unsafe=True)
|
||||
mocker.patch("os.geteuid", return_value=paths.root_owner[0] + 1)
|
||||
check_user(paths.root, unsafe=True)
|
||||
|
||||
|
||||
def test_dataclass_view(package_ahriman: Package) -> None:
|
||||
@@ -357,6 +333,18 @@ def test_minmax() -> None:
|
||||
assert minmax([[1, 2, 3], [4, 5], [6, 7, 8, 9]], key=len) == ([4, 5], [6, 7, 8, 9])
|
||||
|
||||
|
||||
def test_owner(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must correctly retrieve owner of the path
|
||||
"""
|
||||
stat_mock = MagicMock()
|
||||
stat_mock.st_uid = 42
|
||||
stat_mock.st_gid = 142
|
||||
mocker.patch("pathlib.Path.stat", return_value=stat_mock)
|
||||
|
||||
assert owner(repository_paths.root) == (42, 142)
|
||||
|
||||
|
||||
def test_package_like(package_ahriman: Package) -> None:
|
||||
"""
|
||||
package_like must return true for archives
|
||||
|
||||
@@ -3,9 +3,8 @@ import pytest
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
from pytest_mock import MockerFixture
|
||||
from unittest.mock import MagicMock, call as MockCall
|
||||
from unittest.mock import call as MockCall
|
||||
|
||||
from ahriman.core.exceptions import PathError
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
from ahriman.models.repository_paths import RepositoryPaths
|
||||
@@ -55,7 +54,7 @@ def test_root_owner(repository_paths: RepositoryPaths, mocker: MockerFixture) ->
|
||||
"""
|
||||
must correctly define root directory owner
|
||||
"""
|
||||
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.owner", return_value=(42, 142))
|
||||
mocker.patch("ahriman.models.repository_paths.owner", return_value=(42, 142))
|
||||
assert repository_paths.root_owner == (42, 142)
|
||||
|
||||
|
||||
@@ -186,68 +185,6 @@ def test_known_repositories_empty(repository_paths: RepositoryPaths, mocker: Moc
|
||||
iterdir_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_owner(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must correctly retrieve owner of the path
|
||||
"""
|
||||
stat_mock = MagicMock()
|
||||
stat_mock.st_uid = 42
|
||||
stat_mock.st_gid = 142
|
||||
mocker.patch("pathlib.Path.stat", return_value=stat_mock)
|
||||
|
||||
assert RepositoryPaths.owner(repository_paths.root) == (42, 142)
|
||||
|
||||
|
||||
def test_chown(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must correctly set owner for the directory
|
||||
"""
|
||||
object.__setattr__(repository_paths, "owner", _get_owner(repository_paths.root, same=False))
|
||||
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
|
||||
chown_mock = mocker.patch("os.chown")
|
||||
|
||||
path = repository_paths.root / "path"
|
||||
repository_paths._chown(path)
|
||||
chown_mock.assert_called_once_with(path, 42, 42, follow_symlinks=False)
|
||||
|
||||
|
||||
def test_chown_parent(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must correctly set owner for the directory including parents
|
||||
"""
|
||||
object.__setattr__(repository_paths, "owner", _get_owner(repository_paths.root, same=False))
|
||||
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
|
||||
chown_mock = mocker.patch("os.chown")
|
||||
|
||||
path = repository_paths.root / "parent" / "path"
|
||||
repository_paths._chown(path)
|
||||
chown_mock.assert_has_calls([
|
||||
MockCall(path, 42, 42, follow_symlinks=False),
|
||||
MockCall(path.parent, 42, 42, follow_symlinks=False)
|
||||
])
|
||||
|
||||
|
||||
def test_chown_skip(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must skip ownership set in case if it is same as root
|
||||
"""
|
||||
object.__setattr__(repository_paths, "owner", _get_owner(repository_paths.root, same=True))
|
||||
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
|
||||
chown_mock = mocker.patch("os.chown")
|
||||
|
||||
path = repository_paths.root / "path"
|
||||
repository_paths._chown(path)
|
||||
chown_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_chown_invalid_path(repository_paths: RepositoryPaths) -> None:
|
||||
"""
|
||||
must raise invalid path exception in case if directory outside the root supplied
|
||||
"""
|
||||
with pytest.raises(PathError):
|
||||
repository_paths._chown(repository_paths.root.parent)
|
||||
|
||||
|
||||
def test_archive_for(repository_paths: RepositoryPaths, package_ahriman: Package, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must correctly define archive path
|
||||
@@ -266,7 +203,7 @@ def test_archive_for_create_tree(repository_paths: RepositoryPaths, package_ahri
|
||||
mkdir_mock = mocker.patch("pathlib.Path.mkdir")
|
||||
|
||||
repository_paths.archive_for(package_ahriman.base)
|
||||
owner_mock.assert_called_once_with(repository_paths.archive)
|
||||
owner_mock.assert_called_once_with()
|
||||
mkdir_mock.assert_called_once_with(mode=0o755, parents=True)
|
||||
|
||||
|
||||
@@ -283,42 +220,49 @@ def test_preserve_owner(tmp_path: Path, repository_id: RepositoryId, mocker: Moc
|
||||
"""
|
||||
must preserve file owner during operations
|
||||
"""
|
||||
mocker.patch("os.geteuid", return_value=0)
|
||||
mocker.patch("os.getegid", return_value=0)
|
||||
seteuid_mock = mocker.patch("os.seteuid")
|
||||
setegid_mock = mocker.patch("os.setegid")
|
||||
|
||||
repository_paths = RepositoryPaths(tmp_path, repository_id)
|
||||
target_uid, target_gid = repository_paths.root_owner
|
||||
repository_paths.tree_create()
|
||||
chown_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths._chown")
|
||||
|
||||
with repository_paths.preserve_owner():
|
||||
(repository_paths.root / "created1").touch()
|
||||
(repository_paths.chroot / "created2").touch()
|
||||
chown_mock.assert_has_calls([MockCall(repository_paths.root / "created1")])
|
||||
seteuid_mock.assert_has_calls([MockCall(target_uid), MockCall(0)])
|
||||
setegid_mock.assert_has_calls([MockCall(target_gid), MockCall(0)])
|
||||
|
||||
|
||||
def test_preserve_owner_specific(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
||||
def test_preserve_owner_exception(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must preserve file owner during operations only in specific directory
|
||||
must return to original uid and gid even during exception
|
||||
"""
|
||||
mocker.patch("os.geteuid", return_value=0)
|
||||
mocker.patch("os.getegid", return_value=0)
|
||||
mocker.patch("pathlib.Path.mkdir", side_effect=Exception)
|
||||
seteuid_mock = mocker.patch("os.seteuid")
|
||||
setegid_mock = mocker.patch("os.setegid")
|
||||
|
||||
repository_paths = RepositoryPaths(tmp_path, repository_id)
|
||||
target_uid, target_gid = repository_paths.root_owner
|
||||
with pytest.raises(Exception):
|
||||
repository_paths.tree_create()
|
||||
seteuid_mock.assert_has_calls([MockCall(target_uid), MockCall(0)])
|
||||
setegid_mock.assert_has_calls([MockCall(target_gid), MockCall(0)])
|
||||
|
||||
|
||||
def test_preserve_owner_non_root(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must skip processing if user is not root
|
||||
"""
|
||||
mocker.patch("os.geteuid", return_value=42)
|
||||
mocker.patch("os.getegid", return_value=42)
|
||||
repository_paths = RepositoryPaths(tmp_path, repository_id)
|
||||
seteuid_mock = mocker.patch("os.seteuid")
|
||||
setegid_mock = mocker.patch("os.setegid")
|
||||
|
||||
repository_paths.tree_create()
|
||||
(repository_paths.root / "content").mkdir()
|
||||
chown_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths._chown")
|
||||
|
||||
with repository_paths.preserve_owner(repository_paths.root / "content"):
|
||||
(repository_paths.root / "created1").touch()
|
||||
(repository_paths.root / "content" / "created2").touch()
|
||||
(repository_paths.chroot / "created3").touch()
|
||||
chown_mock.assert_has_calls([MockCall(repository_paths.root / "content" / "created2")])
|
||||
|
||||
|
||||
def test_preserve_owner_no_directory(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must skip directory scan if it does not exist
|
||||
"""
|
||||
repository_paths = RepositoryPaths(tmp_path, repository_id)
|
||||
chown_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths._chown")
|
||||
|
||||
with repository_paths.preserve_owner(Path("empty")):
|
||||
(repository_paths.root / "created1").touch()
|
||||
chown_mock.assert_not_called()
|
||||
seteuid_mock.assert_not_called()
|
||||
setegid_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_tree_clear(repository_paths: RepositoryPaths, package_ahriman: Package, mocker: MockerFixture) -> None:
|
||||
|
||||
Reference in New Issue
Block a user