Compare commits

..

15 Commits

13 changed files with 233 additions and 123 deletions

View File

@@ -72,16 +72,14 @@ class Setup(Handler):
application = Application(repository_id, configuration, report=report)
# basically we create configuration here as root, but it is ok, because those files are only used for reading
Setup.configuration_create_makepkg(args.packager, args.makeflags_jobs, application.repository.paths)
Setup.executable_create(application.repository.paths, repository_id)
repository_server = f"file://{application.repository.paths.repository}" if args.server is None else args.server
Setup.configuration_create_devtools(
repository_id, args.from_configuration, args.mirror, args.multilib, repository_server)
Setup.configuration_create_sudo(application.repository.paths, repository_id)
# finish initialization
with application.repository.paths.preserve_owner():
Setup.configuration_create_makepkg(args.packager, args.makeflags_jobs, application.repository.paths)
Setup.executable_create(application.repository.paths, repository_id)
repository_server = f"file://{application.repository.paths.repository}" if args.server is None else args.server
Setup.configuration_create_devtools(
repository_id, args.from_configuration, args.mirror, args.multilib, repository_server)
Setup.configuration_create_sudo(application.repository.paths, repository_id)
application.repository.repo.init()
# lazy database sync
application.repository.pacman.handle # pylint: disable=pointless-statement

View File

@@ -158,7 +158,7 @@ class Lock(LazyLogging):
"""
check if current user is actually owner of ahriman root
"""
check_user(self.paths.root, unsafe=self.unsafe)
check_user(self.paths, unsafe=self.unsafe)
self.paths.tree_create()
def check_version(self) -> None:

View File

@@ -130,7 +130,7 @@ class Pacman(LazyLogging):
return # database for some reason deos not exist
self.logger.info("copy pacman database %s from operating system root to ahriman's home %s", src, dst)
with self.repository_paths.preserve_owner():
with self.repository_paths.preserve_owner(dst.parent):
shutil.copy(src, dst)
def database_init(self, handle: Handle, repository: str, architecture: str) -> DB:

View File

@@ -125,7 +125,7 @@ class ArchiveTree(LazyLogging):
if root.exists():
return
with self.paths.preserve_owner():
with self.paths.preserve_owner(self.paths.archive):
root.mkdir(0o755, parents=True)
# init empty repository here
Repo(self.repository_id.name, self.paths, self.sign_args, root).init()

View File

@@ -52,7 +52,7 @@ def migrate_data(connection: Connection, configuration: Configuration) -> None:
# create archive directory if required
if not paths.archive.is_dir():
with paths.preserve_owner():
with paths.preserve_owner(paths.archive):
paths.archive.mkdir(mode=0o755, parents=True)
move_packages(paths, pacman)

View File

@@ -20,6 +20,7 @@
import subprocess
from collections.abc import Callable
from pathlib import Path
from typing import Any, Self
from ahriman.models.repository_id import RepositoryId
@@ -228,6 +229,20 @@ class PkgbuildParserError(ValueError):
ValueError.__init__(self, message)
class PathError(ValueError):
"""
exception which will be raised on path which is not belong to root directory
"""
def __init__(self, path: Path, root: Path) -> None:
"""
Args:
path(Path): path which raised an exception
root(Path): repository root (i.e. ahriman home)
"""
ValueError.__init__(self, f"Path `{path}` does not belong to repository root `{root}`")
class PasswordError(ValueError):
"""
exception which will be raised in case of password related errors

View File

@@ -38,6 +38,7 @@ from pwd import getpwuid
from typing import Any, IO, TypeVar
from ahriman.core.exceptions import CalledProcessError, OptionError, UnsafeRunError
from ahriman.models.repository_paths import RepositoryPaths
__all__ = [
@@ -51,7 +52,6 @@ __all__ = [
"filter_json",
"full_version",
"minmax",
"owner",
"package_like",
"parse_version",
"partition",
@@ -61,7 +61,6 @@ __all__ = [
"safe_filename",
"srcinfo_property",
"srcinfo_property_list",
"symlink_relative",
"trim_package",
"utcnow",
"walk",
@@ -195,13 +194,12 @@ def check_output(*args: str, exception: Exception | Callable[[int, list[str], st
return stdout
def check_user(root: Path, *, unsafe: bool) -> None:
def check_user(paths: RepositoryPaths, *, unsafe: bool) -> None:
"""
check if current user is the owner of the root
Args:
root(Path): path to root directory (e.g. repository root
:attr:`ahriman.models.repository_paths.RepositoryPaths.root`)
paths(RepositoryPaths): repository paths object
unsafe(bool): if set no user check will be performed before path creation
Raises:
@@ -210,16 +208,14 @@ def check_user(root: Path, *, unsafe: bool) -> None:
Examples:
Simply run function with arguments::
>>> check_user(root, unsafe=False)
>>> check_user(paths, unsafe=False)
"""
if not root.exists():
if not paths.root.exists():
return # no directory found, skip check
if unsafe:
return # unsafe flag is enabled, no check performed
current_uid = os.geteuid()
root_uid, _ = owner(root)
current_uid = os.getuid()
root_uid, _ = paths.root_owner
if current_uid != root_uid:
raise UnsafeRunError(current_uid, root_uid)
@@ -338,20 +334,6 @@ def minmax(source: Iterable[T], *, key: Callable[[T], Any] | None = None) -> tup
return min(first_iter, key=key), max(second_iter, key=key) # type: ignore
def owner(path: Path) -> tuple[int, int]:
"""
retrieve owner information by path
Args:
path(Path): path for which extract ids
Returns:
tuple[int, int]: owner user and group ids of the directory
"""
stat = path.stat()
return stat.st_uid, stat.st_gid
def package_like(filename: Path) -> bool:
"""
check if file looks like package

View File

@@ -27,8 +27,8 @@ from functools import cached_property
from pathlib import Path
from pwd import getpwuid
from ahriman.core.exceptions import PathError
from ahriman.core.log import LazyLogging
from ahriman.core.utils import owner
from ahriman.models.repository_id import RepositoryId
@@ -103,7 +103,7 @@ class RepositoryPaths(LazyLogging):
Returns:
Path: path to directory in which build process is run
"""
uid, _ = owner(self.root)
uid, _ = self.owner(self.root)
return self.chroot / f"{self.repository_id.name}-{self.repository_id.architecture}" / getpwuid(uid).pw_name
@property
@@ -165,7 +165,7 @@ class RepositoryPaths(LazyLogging):
Returns:
tuple[int, int]: owner user and group of the root directory
"""
return owner(self.root)
return self.owner(self.root)
# pylint: disable=protected-access
@classmethod
@@ -218,6 +218,47 @@ class RepositoryPaths(LazyLogging):
return set(walk(instance))
@staticmethod
def owner(path: Path) -> tuple[int, int]:
"""
retrieve owner information by path
Args:
path(Path): path for which extract ids
Returns:
tuple[int, int]: owner user and group ids of the directory
"""
stat = path.stat()
return stat.st_uid, stat.st_gid
def _chown(self, path: Path) -> None:
"""
set owner of path recursively (from root) to root owner
Notes:
More likely you don't want to call this method explicitly, consider using :func:`preserve_owner()`
as context manager instead
Args:
path(Path): path to be chown
Raises:
PathError: if path does not belong to root
"""
def set_owner(current: Path) -> None:
uid, gid = self.owner(current)
if uid == root_uid and gid == root_gid:
return
os.chown(current, root_uid, root_gid, follow_symlinks=False)
if self.root not in path.parents:
raise PathError(path, self.root)
root_uid, root_gid = self.root_owner
while path != self.root:
set_owner(path)
path = path.parent
def archive_for(self, package_base: str) -> Path:
"""
get path to archive specified search criteria
@@ -230,7 +271,7 @@ class RepositoryPaths(LazyLogging):
"""
directory = self.archive / "packages" / package_base[0] / package_base
if not directory.is_dir(): # create if not exists
with self.preserve_owner():
with self.preserve_owner(self.archive):
directory.mkdir(mode=0o755, parents=True)
return directory
@@ -248,10 +289,13 @@ class RepositoryPaths(LazyLogging):
return self.cache / package_base
@contextlib.contextmanager
def preserve_owner(self) -> Iterator[None]:
def preserve_owner(self, path: Path | None = None) -> Iterator[None]:
"""
perform any action preserving owner for any newly created file or directory
Args:
path(Path | None, optional): use this path as root instead of repository root (Default value = None)
Examples:
This method is designed to use as context manager when you are going to perform operations which might
change filesystem, especially if you are doing it under unsafe flag, e.g.::
@@ -262,26 +306,29 @@ class RepositoryPaths(LazyLogging):
Note, however, that this method doesn't handle any exceptions and will eventually interrupt
if there will be any.
"""
# guard non-root
# the reason we do this is that it only works if permissions can be actually changed. Hence,
# non-privileged user (e.g. personal user or ahriman user) can't change permissions.
# The only one who can do so is root, so if user is not root we just terminate function
current_uid, current_gid = os.geteuid(), os.getegid()
if current_uid != 0:
yield
return
path = path or self.root
# set uid and gid to root owner
target_uid, target_gid = self.root_owner
os.setegid(target_gid)
os.seteuid(target_uid)
def walk(root: Path) -> Iterator[Path]:
yield root
if not root.exists():
return
try:
yield
finally:
# reset uid and gid
os.seteuid(current_uid)
os.setegid(current_gid)
# basically walk, but skipping some content
for child in root.iterdir():
yield child
if child in (self.chroot.parent,):
yield from child.iterdir() # we only yield top-level in chroot directory
elif child.is_dir():
yield from walk(child)
# get current filesystem and run action
previous_snapshot = set(walk(path))
yield
# get newly created files and directories and chown them
new_entries = set(walk(path)).difference(previous_snapshot)
for entry in new_entries:
self._chown(entry)
def tree_clear(self, package_base: str) -> None:
"""

View File

@@ -142,7 +142,7 @@ def test_check_user(lock: Lock, mocker: MockerFixture) -> None:
tree_create = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.tree_create")
lock.check_user()
check_user_patch.assert_called_once_with(lock.paths.root, unsafe=False)
check_user_patch.assert_called_once_with(lock.paths, unsafe=False)
tree_create.assert_called_once_with()

View File

@@ -67,7 +67,7 @@ def test_database_copy(pacman: Pacman, mocker: MockerFixture) -> None:
pacman.database_copy(pacman.handle, database, path, use_ahriman_cache=True)
mkdir_mock.assert_called_once_with(mode=0o755, exist_ok=True)
copy_mock.assert_called_once_with(path / "sync" / "core.db", dst_path)
owner_guard_mock.assert_called_once_with()
owner_guard_mock.assert_called_once_with(dst_path.parent)
def test_database_copy_skip(pacman: Pacman, mocker: MockerFixture) -> None:

View File

@@ -117,7 +117,7 @@ def test_tree_create(archive_tree: ArchiveTree, mocker: MockerFixture) -> None:
init_mock = mocker.patch("ahriman.core.alpm.repo.Repo.init")
archive_tree.tree_create()
owner_guard_mock.assert_called_once_with()
owner_guard_mock.assert_called_once_with(archive_tree.paths.archive)
mkdir_mock.assert_called_once_with(0o755, parents=True)
init_mock.assert_called_once_with()

View File

@@ -7,10 +7,34 @@ import pytest
from pathlib import Path
from pytest_mock import MockerFixture
from typing import Any
from unittest.mock import MagicMock, call as MockCall
from unittest.mock import call as MockCall
from ahriman.core.exceptions import BuildError, CalledProcessError, OptionError, UnsafeRunError
from ahriman.core.utils import *
from ahriman.core.utils import (
atomic_move,
check_output,
check_user,
dataclass_view,
enum_values,
extract_user,
filelock,
filter_json,
full_version,
minmax,
package_like,
parse_version,
partition,
pretty_datetime,
pretty_interval,
pretty_size,
safe_filename,
srcinfo_property,
srcinfo_property_list,
symlink_relative,
trim_package,
utcnow,
walk,
)
from ahriman.models.package import Package
from ahriman.models.package_source import PackageSource
from ahriman.models.repository_id import RepositoryId
@@ -173,8 +197,8 @@ def test_check_user(repository_id: RepositoryId, mocker: MockerFixture) -> None:
must check user correctly
"""
paths = RepositoryPaths(Path.cwd(), repository_id)
mocker.patch("os.geteuid", return_value=paths.root_owner[0])
check_user(paths.root, unsafe=False)
mocker.patch("os.getuid", return_value=paths.root_owner[0])
check_user(paths, unsafe=False)
def test_check_user_no_directory(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
@@ -182,7 +206,7 @@ def test_check_user_no_directory(repository_paths: RepositoryPaths, mocker: Mock
must not fail in case if no directory found
"""
mocker.patch("pathlib.Path.exists", return_value=False)
check_user(repository_paths.root, unsafe=False)
check_user(repository_paths, unsafe=False)
def test_check_user_exception(repository_id: RepositoryId, mocker: MockerFixture) -> None:
@@ -190,10 +214,10 @@ def test_check_user_exception(repository_id: RepositoryId, mocker: MockerFixture
must raise exception if user differs
"""
paths = RepositoryPaths(Path.cwd(), repository_id)
mocker.patch("os.geteuid", return_value=paths.root_owner[0] + 1)
mocker.patch("os.getuid", return_value=paths.root_owner[0] + 1)
with pytest.raises(UnsafeRunError):
check_user(paths.root, unsafe=False)
check_user(paths, unsafe=False)
def test_check_user_unsafe(repository_id: RepositoryId, mocker: MockerFixture) -> None:
@@ -201,8 +225,8 @@ def test_check_user_unsafe(repository_id: RepositoryId, mocker: MockerFixture) -
must skip check if unsafe flag is set
"""
paths = RepositoryPaths(Path.cwd(), repository_id)
mocker.patch("os.geteuid", return_value=paths.root_owner[0] + 1)
check_user(paths.root, unsafe=True)
mocker.patch("os.getuid", return_value=paths.root_owner[0] + 1)
check_user(paths, unsafe=True)
def test_dataclass_view(package_ahriman: Package) -> None:
@@ -333,18 +357,6 @@ def test_minmax() -> None:
assert minmax([[1, 2, 3], [4, 5], [6, 7, 8, 9]], key=len) == ([4, 5], [6, 7, 8, 9])
def test_owner(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
"""
must correctly retrieve owner of the path
"""
stat_mock = MagicMock()
stat_mock.st_uid = 42
stat_mock.st_gid = 142
mocker.patch("pathlib.Path.stat", return_value=stat_mock)
assert owner(repository_paths.root) == (42, 142)
def test_package_like(package_ahriman: Package) -> None:
"""
package_like must return true for archives

View File

@@ -3,8 +3,9 @@ import pytest
from collections.abc import Callable
from pathlib import Path
from pytest_mock import MockerFixture
from unittest.mock import call as MockCall
from unittest.mock import MagicMock, call as MockCall
from ahriman.core.exceptions import PathError
from ahriman.models.package import Package
from ahriman.models.repository_id import RepositoryId
from ahriman.models.repository_paths import RepositoryPaths
@@ -54,7 +55,7 @@ def test_root_owner(repository_paths: RepositoryPaths, mocker: MockerFixture) ->
"""
must correctly define root directory owner
"""
mocker.patch("ahriman.models.repository_paths.owner", return_value=(42, 142))
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.owner", return_value=(42, 142))
assert repository_paths.root_owner == (42, 142)
@@ -185,6 +186,68 @@ def test_known_repositories_empty(repository_paths: RepositoryPaths, mocker: Moc
iterdir_mock.assert_not_called()
def test_owner(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
"""
must correctly retrieve owner of the path
"""
stat_mock = MagicMock()
stat_mock.st_uid = 42
stat_mock.st_gid = 142
mocker.patch("pathlib.Path.stat", return_value=stat_mock)
assert RepositoryPaths.owner(repository_paths.root) == (42, 142)
def test_chown(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
"""
must correctly set owner for the directory
"""
object.__setattr__(repository_paths, "owner", _get_owner(repository_paths.root, same=False))
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
chown_mock = mocker.patch("os.chown")
path = repository_paths.root / "path"
repository_paths._chown(path)
chown_mock.assert_called_once_with(path, 42, 42, follow_symlinks=False)
def test_chown_parent(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
"""
must correctly set owner for the directory including parents
"""
object.__setattr__(repository_paths, "owner", _get_owner(repository_paths.root, same=False))
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
chown_mock = mocker.patch("os.chown")
path = repository_paths.root / "parent" / "path"
repository_paths._chown(path)
chown_mock.assert_has_calls([
MockCall(path, 42, 42, follow_symlinks=False),
MockCall(path.parent, 42, 42, follow_symlinks=False)
])
def test_chown_skip(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
"""
must skip ownership set in case if it is same as root
"""
object.__setattr__(repository_paths, "owner", _get_owner(repository_paths.root, same=True))
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
chown_mock = mocker.patch("os.chown")
path = repository_paths.root / "path"
repository_paths._chown(path)
chown_mock.assert_not_called()
def test_chown_invalid_path(repository_paths: RepositoryPaths) -> None:
"""
must raise invalid path exception in case if directory outside the root supplied
"""
with pytest.raises(PathError):
repository_paths._chown(repository_paths.root.parent)
def test_archive_for(repository_paths: RepositoryPaths, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must correctly define archive path
@@ -203,7 +266,7 @@ def test_archive_for_create_tree(repository_paths: RepositoryPaths, package_ahri
mkdir_mock = mocker.patch("pathlib.Path.mkdir")
repository_paths.archive_for(package_ahriman.base)
owner_mock.assert_called_once_with()
owner_mock.assert_called_once_with(repository_paths.archive)
mkdir_mock.assert_called_once_with(mode=0o755, parents=True)
@@ -220,49 +283,42 @@ def test_preserve_owner(tmp_path: Path, repository_id: RepositoryId, mocker: Moc
"""
must preserve file owner during operations
"""
mocker.patch("os.geteuid", return_value=0)
mocker.patch("os.getegid", return_value=0)
seteuid_mock = mocker.patch("os.seteuid")
setegid_mock = mocker.patch("os.setegid")
repository_paths = RepositoryPaths(tmp_path, repository_id)
target_uid, target_gid = repository_paths.root_owner
repository_paths.tree_create()
seteuid_mock.assert_has_calls([MockCall(target_uid), MockCall(0)])
setegid_mock.assert_has_calls([MockCall(target_gid), MockCall(0)])
chown_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths._chown")
with repository_paths.preserve_owner():
(repository_paths.root / "created1").touch()
(repository_paths.chroot / "created2").touch()
chown_mock.assert_has_calls([MockCall(repository_paths.root / "created1")])
def test_preserve_owner_exception(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
def test_preserve_owner_specific(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
"""
must return to original uid and gid even during exception
must preserve file owner during operations only in specific directory
"""
mocker.patch("os.geteuid", return_value=0)
mocker.patch("os.getegid", return_value=0)
mocker.patch("pathlib.Path.mkdir", side_effect=Exception)
seteuid_mock = mocker.patch("os.seteuid")
setegid_mock = mocker.patch("os.setegid")
repository_paths = RepositoryPaths(tmp_path, repository_id)
target_uid, target_gid = repository_paths.root_owner
with pytest.raises(Exception):
repository_paths.tree_create()
seteuid_mock.assert_has_calls([MockCall(target_uid), MockCall(0)])
setegid_mock.assert_has_calls([MockCall(target_gid), MockCall(0)])
def test_preserve_owner_non_root(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
"""
must skip processing if user is not root
"""
mocker.patch("os.geteuid", return_value=42)
mocker.patch("os.getegid", return_value=42)
repository_paths = RepositoryPaths(tmp_path, repository_id)
seteuid_mock = mocker.patch("os.seteuid")
setegid_mock = mocker.patch("os.setegid")
repository_paths.tree_create()
seteuid_mock.assert_not_called()
setegid_mock.assert_not_called()
(repository_paths.root / "content").mkdir()
chown_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths._chown")
with repository_paths.preserve_owner(repository_paths.root / "content"):
(repository_paths.root / "created1").touch()
(repository_paths.root / "content" / "created2").touch()
(repository_paths.chroot / "created3").touch()
chown_mock.assert_has_calls([MockCall(repository_paths.root / "content" / "created2")])
def test_preserve_owner_no_directory(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
"""
must skip directory scan if it does not exist
"""
repository_paths = RepositoryPaths(tmp_path, repository_id)
chown_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths._chown")
with repository_paths.preserve_owner(Path("empty")):
(repository_paths.root / "created1").touch()
chown_mock.assert_not_called()
def test_tree_clear(repository_paths: RepositoryPaths, package_ahriman: Package, mocker: MockerFixture) -> None: