mirror of
https://github.com/arcan1s/ahriman.git
synced 2026-04-09 03:33:39 +00:00
Compare commits
15 Commits
a4fc515370
...
93ccf7aa62
| Author | SHA1 | Date | |
|---|---|---|---|
| 93ccf7aa62 | |||
| 5d783eff4a | |||
| 69aadb4e8b | |||
| 4e996c4ed9 | |||
| 4f4d2a4c01 | |||
| 2e5382bd5b | |||
| 77d56f701f | |||
| b4435552d0 | |||
| 5441990ecb | |||
| fa0b7f2334 | |||
| c234123560 | |||
| 1f47d52719 | |||
| 2750c85b87 | |||
| c776c1c4a1 | |||
| acf3df2222 |
@@ -72,16 +72,14 @@ class Setup(Handler):
|
|||||||
|
|
||||||
application = Application(repository_id, configuration, report=report)
|
application = Application(repository_id, configuration, report=report)
|
||||||
|
|
||||||
# basically we create configuration here as root, but it is ok, because those files are only used for reading
|
|
||||||
Setup.configuration_create_makepkg(args.packager, args.makeflags_jobs, application.repository.paths)
|
|
||||||
Setup.executable_create(application.repository.paths, repository_id)
|
|
||||||
repository_server = f"file://{application.repository.paths.repository}" if args.server is None else args.server
|
|
||||||
Setup.configuration_create_devtools(
|
|
||||||
repository_id, args.from_configuration, args.mirror, args.multilib, repository_server)
|
|
||||||
Setup.configuration_create_sudo(application.repository.paths, repository_id)
|
|
||||||
|
|
||||||
# finish initialization
|
|
||||||
with application.repository.paths.preserve_owner():
|
with application.repository.paths.preserve_owner():
|
||||||
|
Setup.configuration_create_makepkg(args.packager, args.makeflags_jobs, application.repository.paths)
|
||||||
|
Setup.executable_create(application.repository.paths, repository_id)
|
||||||
|
repository_server = f"file://{application.repository.paths.repository}" if args.server is None else args.server
|
||||||
|
Setup.configuration_create_devtools(
|
||||||
|
repository_id, args.from_configuration, args.mirror, args.multilib, repository_server)
|
||||||
|
Setup.configuration_create_sudo(application.repository.paths, repository_id)
|
||||||
|
|
||||||
application.repository.repo.init()
|
application.repository.repo.init()
|
||||||
# lazy database sync
|
# lazy database sync
|
||||||
application.repository.pacman.handle # pylint: disable=pointless-statement
|
application.repository.pacman.handle # pylint: disable=pointless-statement
|
||||||
|
|||||||
@@ -158,7 +158,7 @@ class Lock(LazyLogging):
|
|||||||
"""
|
"""
|
||||||
check if current user is actually owner of ahriman root
|
check if current user is actually owner of ahriman root
|
||||||
"""
|
"""
|
||||||
check_user(self.paths.root, unsafe=self.unsafe)
|
check_user(self.paths, unsafe=self.unsafe)
|
||||||
self.paths.tree_create()
|
self.paths.tree_create()
|
||||||
|
|
||||||
def check_version(self) -> None:
|
def check_version(self) -> None:
|
||||||
|
|||||||
@@ -130,7 +130,7 @@ class Pacman(LazyLogging):
|
|||||||
return # database for some reason deos not exist
|
return # database for some reason deos not exist
|
||||||
|
|
||||||
self.logger.info("copy pacman database %s from operating system root to ahriman's home %s", src, dst)
|
self.logger.info("copy pacman database %s from operating system root to ahriman's home %s", src, dst)
|
||||||
with self.repository_paths.preserve_owner():
|
with self.repository_paths.preserve_owner(dst.parent):
|
||||||
shutil.copy(src, dst)
|
shutil.copy(src, dst)
|
||||||
|
|
||||||
def database_init(self, handle: Handle, repository: str, architecture: str) -> DB:
|
def database_init(self, handle: Handle, repository: str, architecture: str) -> DB:
|
||||||
|
|||||||
@@ -125,7 +125,7 @@ class ArchiveTree(LazyLogging):
|
|||||||
if root.exists():
|
if root.exists():
|
||||||
return
|
return
|
||||||
|
|
||||||
with self.paths.preserve_owner():
|
with self.paths.preserve_owner(self.paths.archive):
|
||||||
root.mkdir(0o755, parents=True)
|
root.mkdir(0o755, parents=True)
|
||||||
# init empty repository here
|
# init empty repository here
|
||||||
Repo(self.repository_id.name, self.paths, self.sign_args, root).init()
|
Repo(self.repository_id.name, self.paths, self.sign_args, root).init()
|
||||||
|
|||||||
@@ -52,7 +52,7 @@ def migrate_data(connection: Connection, configuration: Configuration) -> None:
|
|||||||
|
|
||||||
# create archive directory if required
|
# create archive directory if required
|
||||||
if not paths.archive.is_dir():
|
if not paths.archive.is_dir():
|
||||||
with paths.preserve_owner():
|
with paths.preserve_owner(paths.archive):
|
||||||
paths.archive.mkdir(mode=0o755, parents=True)
|
paths.archive.mkdir(mode=0o755, parents=True)
|
||||||
|
|
||||||
move_packages(paths, pacman)
|
move_packages(paths, pacman)
|
||||||
|
|||||||
@@ -20,6 +20,7 @@
|
|||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
from collections.abc import Callable
|
from collections.abc import Callable
|
||||||
|
from pathlib import Path
|
||||||
from typing import Any, Self
|
from typing import Any, Self
|
||||||
|
|
||||||
from ahriman.models.repository_id import RepositoryId
|
from ahriman.models.repository_id import RepositoryId
|
||||||
@@ -228,6 +229,20 @@ class PkgbuildParserError(ValueError):
|
|||||||
ValueError.__init__(self, message)
|
ValueError.__init__(self, message)
|
||||||
|
|
||||||
|
|
||||||
|
class PathError(ValueError):
|
||||||
|
"""
|
||||||
|
exception which will be raised on path which is not belong to root directory
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, path: Path, root: Path) -> None:
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
path(Path): path which raised an exception
|
||||||
|
root(Path): repository root (i.e. ahriman home)
|
||||||
|
"""
|
||||||
|
ValueError.__init__(self, f"Path `{path}` does not belong to repository root `{root}`")
|
||||||
|
|
||||||
|
|
||||||
class PasswordError(ValueError):
|
class PasswordError(ValueError):
|
||||||
"""
|
"""
|
||||||
exception which will be raised in case of password related errors
|
exception which will be raised in case of password related errors
|
||||||
|
|||||||
@@ -38,6 +38,7 @@ from pwd import getpwuid
|
|||||||
from typing import Any, IO, TypeVar
|
from typing import Any, IO, TypeVar
|
||||||
|
|
||||||
from ahriman.core.exceptions import CalledProcessError, OptionError, UnsafeRunError
|
from ahriman.core.exceptions import CalledProcessError, OptionError, UnsafeRunError
|
||||||
|
from ahriman.models.repository_paths import RepositoryPaths
|
||||||
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
@@ -51,7 +52,6 @@ __all__ = [
|
|||||||
"filter_json",
|
"filter_json",
|
||||||
"full_version",
|
"full_version",
|
||||||
"minmax",
|
"minmax",
|
||||||
"owner",
|
|
||||||
"package_like",
|
"package_like",
|
||||||
"parse_version",
|
"parse_version",
|
||||||
"partition",
|
"partition",
|
||||||
@@ -61,7 +61,6 @@ __all__ = [
|
|||||||
"safe_filename",
|
"safe_filename",
|
||||||
"srcinfo_property",
|
"srcinfo_property",
|
||||||
"srcinfo_property_list",
|
"srcinfo_property_list",
|
||||||
"symlink_relative",
|
|
||||||
"trim_package",
|
"trim_package",
|
||||||
"utcnow",
|
"utcnow",
|
||||||
"walk",
|
"walk",
|
||||||
@@ -195,13 +194,12 @@ def check_output(*args: str, exception: Exception | Callable[[int, list[str], st
|
|||||||
return stdout
|
return stdout
|
||||||
|
|
||||||
|
|
||||||
def check_user(root: Path, *, unsafe: bool) -> None:
|
def check_user(paths: RepositoryPaths, *, unsafe: bool) -> None:
|
||||||
"""
|
"""
|
||||||
check if current user is the owner of the root
|
check if current user is the owner of the root
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
root(Path): path to root directory (e.g. repository root
|
paths(RepositoryPaths): repository paths object
|
||||||
:attr:`ahriman.models.repository_paths.RepositoryPaths.root`)
|
|
||||||
unsafe(bool): if set no user check will be performed before path creation
|
unsafe(bool): if set no user check will be performed before path creation
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
@@ -210,16 +208,14 @@ def check_user(root: Path, *, unsafe: bool) -> None:
|
|||||||
Examples:
|
Examples:
|
||||||
Simply run function with arguments::
|
Simply run function with arguments::
|
||||||
|
|
||||||
>>> check_user(root, unsafe=False)
|
>>> check_user(paths, unsafe=False)
|
||||||
"""
|
"""
|
||||||
if not root.exists():
|
if not paths.root.exists():
|
||||||
return # no directory found, skip check
|
return # no directory found, skip check
|
||||||
if unsafe:
|
if unsafe:
|
||||||
return # unsafe flag is enabled, no check performed
|
return # unsafe flag is enabled, no check performed
|
||||||
|
current_uid = os.getuid()
|
||||||
current_uid = os.geteuid()
|
root_uid, _ = paths.root_owner
|
||||||
root_uid, _ = owner(root)
|
|
||||||
|
|
||||||
if current_uid != root_uid:
|
if current_uid != root_uid:
|
||||||
raise UnsafeRunError(current_uid, root_uid)
|
raise UnsafeRunError(current_uid, root_uid)
|
||||||
|
|
||||||
@@ -338,20 +334,6 @@ def minmax(source: Iterable[T], *, key: Callable[[T], Any] | None = None) -> tup
|
|||||||
return min(first_iter, key=key), max(second_iter, key=key) # type: ignore
|
return min(first_iter, key=key), max(second_iter, key=key) # type: ignore
|
||||||
|
|
||||||
|
|
||||||
def owner(path: Path) -> tuple[int, int]:
|
|
||||||
"""
|
|
||||||
retrieve owner information by path
|
|
||||||
|
|
||||||
Args:
|
|
||||||
path(Path): path for which extract ids
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
tuple[int, int]: owner user and group ids of the directory
|
|
||||||
"""
|
|
||||||
stat = path.stat()
|
|
||||||
return stat.st_uid, stat.st_gid
|
|
||||||
|
|
||||||
|
|
||||||
def package_like(filename: Path) -> bool:
|
def package_like(filename: Path) -> bool:
|
||||||
"""
|
"""
|
||||||
check if file looks like package
|
check if file looks like package
|
||||||
|
|||||||
@@ -27,8 +27,8 @@ from functools import cached_property
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from pwd import getpwuid
|
from pwd import getpwuid
|
||||||
|
|
||||||
|
from ahriman.core.exceptions import PathError
|
||||||
from ahriman.core.log import LazyLogging
|
from ahriman.core.log import LazyLogging
|
||||||
from ahriman.core.utils import owner
|
|
||||||
from ahriman.models.repository_id import RepositoryId
|
from ahriman.models.repository_id import RepositoryId
|
||||||
|
|
||||||
|
|
||||||
@@ -103,7 +103,7 @@ class RepositoryPaths(LazyLogging):
|
|||||||
Returns:
|
Returns:
|
||||||
Path: path to directory in which build process is run
|
Path: path to directory in which build process is run
|
||||||
"""
|
"""
|
||||||
uid, _ = owner(self.root)
|
uid, _ = self.owner(self.root)
|
||||||
return self.chroot / f"{self.repository_id.name}-{self.repository_id.architecture}" / getpwuid(uid).pw_name
|
return self.chroot / f"{self.repository_id.name}-{self.repository_id.architecture}" / getpwuid(uid).pw_name
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@@ -165,7 +165,7 @@ class RepositoryPaths(LazyLogging):
|
|||||||
Returns:
|
Returns:
|
||||||
tuple[int, int]: owner user and group of the root directory
|
tuple[int, int]: owner user and group of the root directory
|
||||||
"""
|
"""
|
||||||
return owner(self.root)
|
return self.owner(self.root)
|
||||||
|
|
||||||
# pylint: disable=protected-access
|
# pylint: disable=protected-access
|
||||||
@classmethod
|
@classmethod
|
||||||
@@ -218,6 +218,47 @@ class RepositoryPaths(LazyLogging):
|
|||||||
|
|
||||||
return set(walk(instance))
|
return set(walk(instance))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def owner(path: Path) -> tuple[int, int]:
|
||||||
|
"""
|
||||||
|
retrieve owner information by path
|
||||||
|
|
||||||
|
Args:
|
||||||
|
path(Path): path for which extract ids
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
tuple[int, int]: owner user and group ids of the directory
|
||||||
|
"""
|
||||||
|
stat = path.stat()
|
||||||
|
return stat.st_uid, stat.st_gid
|
||||||
|
|
||||||
|
def _chown(self, path: Path) -> None:
|
||||||
|
"""
|
||||||
|
set owner of path recursively (from root) to root owner
|
||||||
|
|
||||||
|
Notes:
|
||||||
|
More likely you don't want to call this method explicitly, consider using :func:`preserve_owner()`
|
||||||
|
as context manager instead
|
||||||
|
|
||||||
|
Args:
|
||||||
|
path(Path): path to be chown
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
PathError: if path does not belong to root
|
||||||
|
"""
|
||||||
|
def set_owner(current: Path) -> None:
|
||||||
|
uid, gid = self.owner(current)
|
||||||
|
if uid == root_uid and gid == root_gid:
|
||||||
|
return
|
||||||
|
os.chown(current, root_uid, root_gid, follow_symlinks=False)
|
||||||
|
|
||||||
|
if self.root not in path.parents:
|
||||||
|
raise PathError(path, self.root)
|
||||||
|
root_uid, root_gid = self.root_owner
|
||||||
|
while path != self.root:
|
||||||
|
set_owner(path)
|
||||||
|
path = path.parent
|
||||||
|
|
||||||
def archive_for(self, package_base: str) -> Path:
|
def archive_for(self, package_base: str) -> Path:
|
||||||
"""
|
"""
|
||||||
get path to archive specified search criteria
|
get path to archive specified search criteria
|
||||||
@@ -230,7 +271,7 @@ class RepositoryPaths(LazyLogging):
|
|||||||
"""
|
"""
|
||||||
directory = self.archive / "packages" / package_base[0] / package_base
|
directory = self.archive / "packages" / package_base[0] / package_base
|
||||||
if not directory.is_dir(): # create if not exists
|
if not directory.is_dir(): # create if not exists
|
||||||
with self.preserve_owner():
|
with self.preserve_owner(self.archive):
|
||||||
directory.mkdir(mode=0o755, parents=True)
|
directory.mkdir(mode=0o755, parents=True)
|
||||||
|
|
||||||
return directory
|
return directory
|
||||||
@@ -248,10 +289,13 @@ class RepositoryPaths(LazyLogging):
|
|||||||
return self.cache / package_base
|
return self.cache / package_base
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def preserve_owner(self) -> Iterator[None]:
|
def preserve_owner(self, path: Path | None = None) -> Iterator[None]:
|
||||||
"""
|
"""
|
||||||
perform any action preserving owner for any newly created file or directory
|
perform any action preserving owner for any newly created file or directory
|
||||||
|
|
||||||
|
Args:
|
||||||
|
path(Path | None, optional): use this path as root instead of repository root (Default value = None)
|
||||||
|
|
||||||
Examples:
|
Examples:
|
||||||
This method is designed to use as context manager when you are going to perform operations which might
|
This method is designed to use as context manager when you are going to perform operations which might
|
||||||
change filesystem, especially if you are doing it under unsafe flag, e.g.::
|
change filesystem, especially if you are doing it under unsafe flag, e.g.::
|
||||||
@@ -262,26 +306,29 @@ class RepositoryPaths(LazyLogging):
|
|||||||
Note, however, that this method doesn't handle any exceptions and will eventually interrupt
|
Note, however, that this method doesn't handle any exceptions and will eventually interrupt
|
||||||
if there will be any.
|
if there will be any.
|
||||||
"""
|
"""
|
||||||
# guard non-root
|
path = path or self.root
|
||||||
# the reason we do this is that it only works if permissions can be actually changed. Hence,
|
|
||||||
# non-privileged user (e.g. personal user or ahriman user) can't change permissions.
|
|
||||||
# The only one who can do so is root, so if user is not root we just terminate function
|
|
||||||
current_uid, current_gid = os.geteuid(), os.getegid()
|
|
||||||
if current_uid != 0:
|
|
||||||
yield
|
|
||||||
return
|
|
||||||
|
|
||||||
# set uid and gid to root owner
|
def walk(root: Path) -> Iterator[Path]:
|
||||||
target_uid, target_gid = self.root_owner
|
yield root
|
||||||
os.setegid(target_gid)
|
if not root.exists():
|
||||||
os.seteuid(target_uid)
|
return
|
||||||
|
|
||||||
try:
|
# basically walk, but skipping some content
|
||||||
yield
|
for child in root.iterdir():
|
||||||
finally:
|
yield child
|
||||||
# reset uid and gid
|
if child in (self.chroot.parent,):
|
||||||
os.seteuid(current_uid)
|
yield from child.iterdir() # we only yield top-level in chroot directory
|
||||||
os.setegid(current_gid)
|
elif child.is_dir():
|
||||||
|
yield from walk(child)
|
||||||
|
|
||||||
|
# get current filesystem and run action
|
||||||
|
previous_snapshot = set(walk(path))
|
||||||
|
yield
|
||||||
|
|
||||||
|
# get newly created files and directories and chown them
|
||||||
|
new_entries = set(walk(path)).difference(previous_snapshot)
|
||||||
|
for entry in new_entries:
|
||||||
|
self._chown(entry)
|
||||||
|
|
||||||
def tree_clear(self, package_base: str) -> None:
|
def tree_clear(self, package_base: str) -> None:
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -142,7 +142,7 @@ def test_check_user(lock: Lock, mocker: MockerFixture) -> None:
|
|||||||
tree_create = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.tree_create")
|
tree_create = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.tree_create")
|
||||||
|
|
||||||
lock.check_user()
|
lock.check_user()
|
||||||
check_user_patch.assert_called_once_with(lock.paths.root, unsafe=False)
|
check_user_patch.assert_called_once_with(lock.paths, unsafe=False)
|
||||||
tree_create.assert_called_once_with()
|
tree_create.assert_called_once_with()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -67,7 +67,7 @@ def test_database_copy(pacman: Pacman, mocker: MockerFixture) -> None:
|
|||||||
pacman.database_copy(pacman.handle, database, path, use_ahriman_cache=True)
|
pacman.database_copy(pacman.handle, database, path, use_ahriman_cache=True)
|
||||||
mkdir_mock.assert_called_once_with(mode=0o755, exist_ok=True)
|
mkdir_mock.assert_called_once_with(mode=0o755, exist_ok=True)
|
||||||
copy_mock.assert_called_once_with(path / "sync" / "core.db", dst_path)
|
copy_mock.assert_called_once_with(path / "sync" / "core.db", dst_path)
|
||||||
owner_guard_mock.assert_called_once_with()
|
owner_guard_mock.assert_called_once_with(dst_path.parent)
|
||||||
|
|
||||||
|
|
||||||
def test_database_copy_skip(pacman: Pacman, mocker: MockerFixture) -> None:
|
def test_database_copy_skip(pacman: Pacman, mocker: MockerFixture) -> None:
|
||||||
|
|||||||
@@ -117,7 +117,7 @@ def test_tree_create(archive_tree: ArchiveTree, mocker: MockerFixture) -> None:
|
|||||||
init_mock = mocker.patch("ahriman.core.alpm.repo.Repo.init")
|
init_mock = mocker.patch("ahriman.core.alpm.repo.Repo.init")
|
||||||
|
|
||||||
archive_tree.tree_create()
|
archive_tree.tree_create()
|
||||||
owner_guard_mock.assert_called_once_with()
|
owner_guard_mock.assert_called_once_with(archive_tree.paths.archive)
|
||||||
mkdir_mock.assert_called_once_with(0o755, parents=True)
|
mkdir_mock.assert_called_once_with(0o755, parents=True)
|
||||||
init_mock.assert_called_once_with()
|
init_mock.assert_called_once_with()
|
||||||
|
|
||||||
|
|||||||
@@ -7,10 +7,34 @@ import pytest
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from pytest_mock import MockerFixture
|
from pytest_mock import MockerFixture
|
||||||
from typing import Any
|
from typing import Any
|
||||||
from unittest.mock import MagicMock, call as MockCall
|
from unittest.mock import call as MockCall
|
||||||
|
|
||||||
from ahriman.core.exceptions import BuildError, CalledProcessError, OptionError, UnsafeRunError
|
from ahriman.core.exceptions import BuildError, CalledProcessError, OptionError, UnsafeRunError
|
||||||
from ahriman.core.utils import *
|
from ahriman.core.utils import (
|
||||||
|
atomic_move,
|
||||||
|
check_output,
|
||||||
|
check_user,
|
||||||
|
dataclass_view,
|
||||||
|
enum_values,
|
||||||
|
extract_user,
|
||||||
|
filelock,
|
||||||
|
filter_json,
|
||||||
|
full_version,
|
||||||
|
minmax,
|
||||||
|
package_like,
|
||||||
|
parse_version,
|
||||||
|
partition,
|
||||||
|
pretty_datetime,
|
||||||
|
pretty_interval,
|
||||||
|
pretty_size,
|
||||||
|
safe_filename,
|
||||||
|
srcinfo_property,
|
||||||
|
srcinfo_property_list,
|
||||||
|
symlink_relative,
|
||||||
|
trim_package,
|
||||||
|
utcnow,
|
||||||
|
walk,
|
||||||
|
)
|
||||||
from ahriman.models.package import Package
|
from ahriman.models.package import Package
|
||||||
from ahriman.models.package_source import PackageSource
|
from ahriman.models.package_source import PackageSource
|
||||||
from ahriman.models.repository_id import RepositoryId
|
from ahriman.models.repository_id import RepositoryId
|
||||||
@@ -173,8 +197,8 @@ def test_check_user(repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
|||||||
must check user correctly
|
must check user correctly
|
||||||
"""
|
"""
|
||||||
paths = RepositoryPaths(Path.cwd(), repository_id)
|
paths = RepositoryPaths(Path.cwd(), repository_id)
|
||||||
mocker.patch("os.geteuid", return_value=paths.root_owner[0])
|
mocker.patch("os.getuid", return_value=paths.root_owner[0])
|
||||||
check_user(paths.root, unsafe=False)
|
check_user(paths, unsafe=False)
|
||||||
|
|
||||||
|
|
||||||
def test_check_user_no_directory(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
def test_check_user_no_directory(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||||
@@ -182,7 +206,7 @@ def test_check_user_no_directory(repository_paths: RepositoryPaths, mocker: Mock
|
|||||||
must not fail in case if no directory found
|
must not fail in case if no directory found
|
||||||
"""
|
"""
|
||||||
mocker.patch("pathlib.Path.exists", return_value=False)
|
mocker.patch("pathlib.Path.exists", return_value=False)
|
||||||
check_user(repository_paths.root, unsafe=False)
|
check_user(repository_paths, unsafe=False)
|
||||||
|
|
||||||
|
|
||||||
def test_check_user_exception(repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
def test_check_user_exception(repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
||||||
@@ -190,10 +214,10 @@ def test_check_user_exception(repository_id: RepositoryId, mocker: MockerFixture
|
|||||||
must raise exception if user differs
|
must raise exception if user differs
|
||||||
"""
|
"""
|
||||||
paths = RepositoryPaths(Path.cwd(), repository_id)
|
paths = RepositoryPaths(Path.cwd(), repository_id)
|
||||||
mocker.patch("os.geteuid", return_value=paths.root_owner[0] + 1)
|
mocker.patch("os.getuid", return_value=paths.root_owner[0] + 1)
|
||||||
|
|
||||||
with pytest.raises(UnsafeRunError):
|
with pytest.raises(UnsafeRunError):
|
||||||
check_user(paths.root, unsafe=False)
|
check_user(paths, unsafe=False)
|
||||||
|
|
||||||
|
|
||||||
def test_check_user_unsafe(repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
def test_check_user_unsafe(repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
||||||
@@ -201,8 +225,8 @@ def test_check_user_unsafe(repository_id: RepositoryId, mocker: MockerFixture) -
|
|||||||
must skip check if unsafe flag is set
|
must skip check if unsafe flag is set
|
||||||
"""
|
"""
|
||||||
paths = RepositoryPaths(Path.cwd(), repository_id)
|
paths = RepositoryPaths(Path.cwd(), repository_id)
|
||||||
mocker.patch("os.geteuid", return_value=paths.root_owner[0] + 1)
|
mocker.patch("os.getuid", return_value=paths.root_owner[0] + 1)
|
||||||
check_user(paths.root, unsafe=True)
|
check_user(paths, unsafe=True)
|
||||||
|
|
||||||
|
|
||||||
def test_dataclass_view(package_ahriman: Package) -> None:
|
def test_dataclass_view(package_ahriman: Package) -> None:
|
||||||
@@ -333,18 +357,6 @@ def test_minmax() -> None:
|
|||||||
assert minmax([[1, 2, 3], [4, 5], [6, 7, 8, 9]], key=len) == ([4, 5], [6, 7, 8, 9])
|
assert minmax([[1, 2, 3], [4, 5], [6, 7, 8, 9]], key=len) == ([4, 5], [6, 7, 8, 9])
|
||||||
|
|
||||||
|
|
||||||
def test_owner(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
|
||||||
"""
|
|
||||||
must correctly retrieve owner of the path
|
|
||||||
"""
|
|
||||||
stat_mock = MagicMock()
|
|
||||||
stat_mock.st_uid = 42
|
|
||||||
stat_mock.st_gid = 142
|
|
||||||
mocker.patch("pathlib.Path.stat", return_value=stat_mock)
|
|
||||||
|
|
||||||
assert owner(repository_paths.root) == (42, 142)
|
|
||||||
|
|
||||||
|
|
||||||
def test_package_like(package_ahriman: Package) -> None:
|
def test_package_like(package_ahriman: Package) -> None:
|
||||||
"""
|
"""
|
||||||
package_like must return true for archives
|
package_like must return true for archives
|
||||||
|
|||||||
@@ -3,8 +3,9 @@ import pytest
|
|||||||
from collections.abc import Callable
|
from collections.abc import Callable
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from pytest_mock import MockerFixture
|
from pytest_mock import MockerFixture
|
||||||
from unittest.mock import call as MockCall
|
from unittest.mock import MagicMock, call as MockCall
|
||||||
|
|
||||||
|
from ahriman.core.exceptions import PathError
|
||||||
from ahriman.models.package import Package
|
from ahriman.models.package import Package
|
||||||
from ahriman.models.repository_id import RepositoryId
|
from ahriman.models.repository_id import RepositoryId
|
||||||
from ahriman.models.repository_paths import RepositoryPaths
|
from ahriman.models.repository_paths import RepositoryPaths
|
||||||
@@ -54,7 +55,7 @@ def test_root_owner(repository_paths: RepositoryPaths, mocker: MockerFixture) ->
|
|||||||
"""
|
"""
|
||||||
must correctly define root directory owner
|
must correctly define root directory owner
|
||||||
"""
|
"""
|
||||||
mocker.patch("ahriman.models.repository_paths.owner", return_value=(42, 142))
|
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.owner", return_value=(42, 142))
|
||||||
assert repository_paths.root_owner == (42, 142)
|
assert repository_paths.root_owner == (42, 142)
|
||||||
|
|
||||||
|
|
||||||
@@ -185,6 +186,68 @@ def test_known_repositories_empty(repository_paths: RepositoryPaths, mocker: Moc
|
|||||||
iterdir_mock.assert_not_called()
|
iterdir_mock.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
def test_owner(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||||
|
"""
|
||||||
|
must correctly retrieve owner of the path
|
||||||
|
"""
|
||||||
|
stat_mock = MagicMock()
|
||||||
|
stat_mock.st_uid = 42
|
||||||
|
stat_mock.st_gid = 142
|
||||||
|
mocker.patch("pathlib.Path.stat", return_value=stat_mock)
|
||||||
|
|
||||||
|
assert RepositoryPaths.owner(repository_paths.root) == (42, 142)
|
||||||
|
|
||||||
|
|
||||||
|
def test_chown(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||||
|
"""
|
||||||
|
must correctly set owner for the directory
|
||||||
|
"""
|
||||||
|
object.__setattr__(repository_paths, "owner", _get_owner(repository_paths.root, same=False))
|
||||||
|
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
|
||||||
|
chown_mock = mocker.patch("os.chown")
|
||||||
|
|
||||||
|
path = repository_paths.root / "path"
|
||||||
|
repository_paths._chown(path)
|
||||||
|
chown_mock.assert_called_once_with(path, 42, 42, follow_symlinks=False)
|
||||||
|
|
||||||
|
|
||||||
|
def test_chown_parent(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||||
|
"""
|
||||||
|
must correctly set owner for the directory including parents
|
||||||
|
"""
|
||||||
|
object.__setattr__(repository_paths, "owner", _get_owner(repository_paths.root, same=False))
|
||||||
|
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
|
||||||
|
chown_mock = mocker.patch("os.chown")
|
||||||
|
|
||||||
|
path = repository_paths.root / "parent" / "path"
|
||||||
|
repository_paths._chown(path)
|
||||||
|
chown_mock.assert_has_calls([
|
||||||
|
MockCall(path, 42, 42, follow_symlinks=False),
|
||||||
|
MockCall(path.parent, 42, 42, follow_symlinks=False)
|
||||||
|
])
|
||||||
|
|
||||||
|
|
||||||
|
def test_chown_skip(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||||
|
"""
|
||||||
|
must skip ownership set in case if it is same as root
|
||||||
|
"""
|
||||||
|
object.__setattr__(repository_paths, "owner", _get_owner(repository_paths.root, same=True))
|
||||||
|
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
|
||||||
|
chown_mock = mocker.patch("os.chown")
|
||||||
|
|
||||||
|
path = repository_paths.root / "path"
|
||||||
|
repository_paths._chown(path)
|
||||||
|
chown_mock.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
def test_chown_invalid_path(repository_paths: RepositoryPaths) -> None:
|
||||||
|
"""
|
||||||
|
must raise invalid path exception in case if directory outside the root supplied
|
||||||
|
"""
|
||||||
|
with pytest.raises(PathError):
|
||||||
|
repository_paths._chown(repository_paths.root.parent)
|
||||||
|
|
||||||
|
|
||||||
def test_archive_for(repository_paths: RepositoryPaths, package_ahriman: Package, mocker: MockerFixture) -> None:
|
def test_archive_for(repository_paths: RepositoryPaths, package_ahriman: Package, mocker: MockerFixture) -> None:
|
||||||
"""
|
"""
|
||||||
must correctly define archive path
|
must correctly define archive path
|
||||||
@@ -203,7 +266,7 @@ def test_archive_for_create_tree(repository_paths: RepositoryPaths, package_ahri
|
|||||||
mkdir_mock = mocker.patch("pathlib.Path.mkdir")
|
mkdir_mock = mocker.patch("pathlib.Path.mkdir")
|
||||||
|
|
||||||
repository_paths.archive_for(package_ahriman.base)
|
repository_paths.archive_for(package_ahriman.base)
|
||||||
owner_mock.assert_called_once_with()
|
owner_mock.assert_called_once_with(repository_paths.archive)
|
||||||
mkdir_mock.assert_called_once_with(mode=0o755, parents=True)
|
mkdir_mock.assert_called_once_with(mode=0o755, parents=True)
|
||||||
|
|
||||||
|
|
||||||
@@ -220,49 +283,42 @@ def test_preserve_owner(tmp_path: Path, repository_id: RepositoryId, mocker: Moc
|
|||||||
"""
|
"""
|
||||||
must preserve file owner during operations
|
must preserve file owner during operations
|
||||||
"""
|
"""
|
||||||
mocker.patch("os.geteuid", return_value=0)
|
|
||||||
mocker.patch("os.getegid", return_value=0)
|
|
||||||
seteuid_mock = mocker.patch("os.seteuid")
|
|
||||||
setegid_mock = mocker.patch("os.setegid")
|
|
||||||
|
|
||||||
repository_paths = RepositoryPaths(tmp_path, repository_id)
|
repository_paths = RepositoryPaths(tmp_path, repository_id)
|
||||||
target_uid, target_gid = repository_paths.root_owner
|
|
||||||
repository_paths.tree_create()
|
repository_paths.tree_create()
|
||||||
seteuid_mock.assert_has_calls([MockCall(target_uid), MockCall(0)])
|
chown_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths._chown")
|
||||||
setegid_mock.assert_has_calls([MockCall(target_gid), MockCall(0)])
|
|
||||||
|
with repository_paths.preserve_owner():
|
||||||
|
(repository_paths.root / "created1").touch()
|
||||||
|
(repository_paths.chroot / "created2").touch()
|
||||||
|
chown_mock.assert_has_calls([MockCall(repository_paths.root / "created1")])
|
||||||
|
|
||||||
|
|
||||||
def test_preserve_owner_exception(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
def test_preserve_owner_specific(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
||||||
"""
|
"""
|
||||||
must return to original uid and gid even during exception
|
must preserve file owner during operations only in specific directory
|
||||||
"""
|
"""
|
||||||
mocker.patch("os.geteuid", return_value=0)
|
|
||||||
mocker.patch("os.getegid", return_value=0)
|
|
||||||
mocker.patch("pathlib.Path.mkdir", side_effect=Exception)
|
|
||||||
seteuid_mock = mocker.patch("os.seteuid")
|
|
||||||
setegid_mock = mocker.patch("os.setegid")
|
|
||||||
|
|
||||||
repository_paths = RepositoryPaths(tmp_path, repository_id)
|
repository_paths = RepositoryPaths(tmp_path, repository_id)
|
||||||
target_uid, target_gid = repository_paths.root_owner
|
|
||||||
with pytest.raises(Exception):
|
|
||||||
repository_paths.tree_create()
|
|
||||||
seteuid_mock.assert_has_calls([MockCall(target_uid), MockCall(0)])
|
|
||||||
setegid_mock.assert_has_calls([MockCall(target_gid), MockCall(0)])
|
|
||||||
|
|
||||||
|
|
||||||
def test_preserve_owner_non_root(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
|
||||||
"""
|
|
||||||
must skip processing if user is not root
|
|
||||||
"""
|
|
||||||
mocker.patch("os.geteuid", return_value=42)
|
|
||||||
mocker.patch("os.getegid", return_value=42)
|
|
||||||
repository_paths = RepositoryPaths(tmp_path, repository_id)
|
|
||||||
seteuid_mock = mocker.patch("os.seteuid")
|
|
||||||
setegid_mock = mocker.patch("os.setegid")
|
|
||||||
|
|
||||||
repository_paths.tree_create()
|
repository_paths.tree_create()
|
||||||
seteuid_mock.assert_not_called()
|
(repository_paths.root / "content").mkdir()
|
||||||
setegid_mock.assert_not_called()
|
chown_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths._chown")
|
||||||
|
|
||||||
|
with repository_paths.preserve_owner(repository_paths.root / "content"):
|
||||||
|
(repository_paths.root / "created1").touch()
|
||||||
|
(repository_paths.root / "content" / "created2").touch()
|
||||||
|
(repository_paths.chroot / "created3").touch()
|
||||||
|
chown_mock.assert_has_calls([MockCall(repository_paths.root / "content" / "created2")])
|
||||||
|
|
||||||
|
|
||||||
|
def test_preserve_owner_no_directory(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
|
||||||
|
"""
|
||||||
|
must skip directory scan if it does not exist
|
||||||
|
"""
|
||||||
|
repository_paths = RepositoryPaths(tmp_path, repository_id)
|
||||||
|
chown_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths._chown")
|
||||||
|
|
||||||
|
with repository_paths.preserve_owner(Path("empty")):
|
||||||
|
(repository_paths.root / "created1").touch()
|
||||||
|
chown_mock.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
def test_tree_clear(repository_paths: RepositoryPaths, package_ahriman: Package, mocker: MockerFixture) -> None:
|
def test_tree_clear(repository_paths: RepositoryPaths, package_ahriman: Package, mocker: MockerFixture) -> None:
|
||||||
|
|||||||
Reference in New Issue
Block a user