Compare commits

..

19 Commits

Author SHA1 Message Date
a4fc515370 fix rebase 2026-02-10 02:13:35 +02:00
47eaec7027 add separate function for symlinks creation 2026-02-10 02:06:42 +02:00
a51fad2e9f fix sttyle 2026-02-10 02:06:31 +02:00
86b3266586 remove generators 2026-02-10 02:06:31 +02:00
9773cdfe18 simplify symlionk creation 2026-02-10 02:06:31 +02:00
183cd306f4 drop excess REQUIRES_REPOSITORY 2026-02-10 02:06:31 +02:00
5dbab02478 support requires repostory flag 2026-02-10 02:06:31 +02:00
3dc28866e8 gpg loader fix 2026-02-10 02:06:31 +02:00
eb56e3a762 regenerate docs 2026-02-10 02:06:31 +02:00
6c886399b5 add archive trigger 2026-02-10 02:06:31 +02:00
1099681487 add archive trigger 2026-02-10 02:06:31 +02:00
beed38ea8c lookup through archive packages before build 2026-02-10 02:06:31 +02:00
8d64699f2b use generic packages tree for all repos 2026-02-10 02:06:06 +02:00
5f7ac0355b implement atomic_move method, move files only with lock 2026-02-10 02:05:11 +02:00
e8abeeddc9 write tests to support new changes 2026-02-10 02:04:30 +02:00
b567a3b683 store built packages in archive tree instead of repository 2026-02-10 02:00:29 +02:00
389bad6725 fix: use effective uid instead of uid 2026-02-03 16:38:13 +02:00
5738b8b911 fix: rewrite preserver_owner method complitely
Previous implementation was somewhat working in the most) scenarios, but
was super slow to handle permissions. However, it is actually very
limited operations in which the application can do anything, so it is
much easier to just drop privileged user to normal one
2026-02-03 15:27:19 +02:00
5ac2e3de19 fix: handle permissionerror during walking over tree
Previously it tried to look into 700 directories (e.g. .gnupg) which
breaks running as non-ahriman user
2026-02-02 22:16:15 +02:00
13 changed files with 123 additions and 233 deletions

View File

@@ -72,14 +72,16 @@ class Setup(Handler):
application = Application(repository_id, configuration, report=report)
with application.repository.paths.preserve_owner():
Setup.configuration_create_makepkg(args.packager, args.makeflags_jobs, application.repository.paths)
Setup.executable_create(application.repository.paths, repository_id)
repository_server = f"file://{application.repository.paths.repository}" if args.server is None else args.server
Setup.configuration_create_devtools(
repository_id, args.from_configuration, args.mirror, args.multilib, repository_server)
Setup.configuration_create_sudo(application.repository.paths, repository_id)
# basically we create configuration here as root, but it is ok, because those files are only used for reading
Setup.configuration_create_makepkg(args.packager, args.makeflags_jobs, application.repository.paths)
Setup.executable_create(application.repository.paths, repository_id)
repository_server = f"file://{application.repository.paths.repository}" if args.server is None else args.server
Setup.configuration_create_devtools(
repository_id, args.from_configuration, args.mirror, args.multilib, repository_server)
Setup.configuration_create_sudo(application.repository.paths, repository_id)
# finish initialization
with application.repository.paths.preserve_owner():
application.repository.repo.init()
# lazy database sync
application.repository.pacman.handle # pylint: disable=pointless-statement

View File

@@ -158,7 +158,7 @@ class Lock(LazyLogging):
"""
check if current user is actually owner of ahriman root
"""
check_user(self.paths, unsafe=self.unsafe)
check_user(self.paths.root, unsafe=self.unsafe)
self.paths.tree_create()
def check_version(self) -> None:

View File

@@ -130,7 +130,7 @@ class Pacman(LazyLogging):
return # database for some reason deos not exist
self.logger.info("copy pacman database %s from operating system root to ahriman's home %s", src, dst)
with self.repository_paths.preserve_owner(dst.parent):
with self.repository_paths.preserve_owner():
shutil.copy(src, dst)
def database_init(self, handle: Handle, repository: str, architecture: str) -> DB:

View File

@@ -125,7 +125,7 @@ class ArchiveTree(LazyLogging):
if root.exists():
return
with self.paths.preserve_owner(self.paths.archive):
with self.paths.preserve_owner():
root.mkdir(0o755, parents=True)
# init empty repository here
Repo(self.repository_id.name, self.paths, self.sign_args, root).init()

View File

@@ -52,7 +52,7 @@ def migrate_data(connection: Connection, configuration: Configuration) -> None:
# create archive directory if required
if not paths.archive.is_dir():
with paths.preserve_owner(paths.archive):
with paths.preserve_owner():
paths.archive.mkdir(mode=0o755, parents=True)
move_packages(paths, pacman)

View File

@@ -20,7 +20,6 @@
import subprocess
from collections.abc import Callable
from pathlib import Path
from typing import Any, Self
from ahriman.models.repository_id import RepositoryId
@@ -229,20 +228,6 @@ class PkgbuildParserError(ValueError):
ValueError.__init__(self, message)
class PathError(ValueError):
"""
exception which will be raised on path which is not belong to root directory
"""
def __init__(self, path: Path, root: Path) -> None:
"""
Args:
path(Path): path which raised an exception
root(Path): repository root (i.e. ahriman home)
"""
ValueError.__init__(self, f"Path `{path}` does not belong to repository root `{root}`")
class PasswordError(ValueError):
"""
exception which will be raised in case of password related errors

View File

@@ -38,7 +38,6 @@ from pwd import getpwuid
from typing import Any, IO, TypeVar
from ahriman.core.exceptions import CalledProcessError, OptionError, UnsafeRunError
from ahriman.models.repository_paths import RepositoryPaths
__all__ = [
@@ -52,6 +51,7 @@ __all__ = [
"filter_json",
"full_version",
"minmax",
"owner",
"package_like",
"parse_version",
"partition",
@@ -61,6 +61,7 @@ __all__ = [
"safe_filename",
"srcinfo_property",
"srcinfo_property_list",
"symlink_relative",
"trim_package",
"utcnow",
"walk",
@@ -194,12 +195,13 @@ def check_output(*args: str, exception: Exception | Callable[[int, list[str], st
return stdout
def check_user(paths: RepositoryPaths, *, unsafe: bool) -> None:
def check_user(root: Path, *, unsafe: bool) -> None:
"""
check if current user is the owner of the root
Args:
paths(RepositoryPaths): repository paths object
root(Path): path to root directory (e.g. repository root
:attr:`ahriman.models.repository_paths.RepositoryPaths.root`)
unsafe(bool): if set no user check will be performed before path creation
Raises:
@@ -208,14 +210,16 @@ def check_user(paths: RepositoryPaths, *, unsafe: bool) -> None:
Examples:
Simply run function with arguments::
>>> check_user(paths, unsafe=False)
>>> check_user(root, unsafe=False)
"""
if not paths.root.exists():
if not root.exists():
return # no directory found, skip check
if unsafe:
return # unsafe flag is enabled, no check performed
current_uid = os.getuid()
root_uid, _ = paths.root_owner
current_uid = os.geteuid()
root_uid, _ = owner(root)
if current_uid != root_uid:
raise UnsafeRunError(current_uid, root_uid)
@@ -334,6 +338,20 @@ def minmax(source: Iterable[T], *, key: Callable[[T], Any] | None = None) -> tup
return min(first_iter, key=key), max(second_iter, key=key) # type: ignore
def owner(path: Path) -> tuple[int, int]:
"""
retrieve owner information by path
Args:
path(Path): path for which extract ids
Returns:
tuple[int, int]: owner user and group ids of the directory
"""
stat = path.stat()
return stat.st_uid, stat.st_gid
def package_like(filename: Path) -> bool:
"""
check if file looks like package

View File

@@ -27,8 +27,8 @@ from functools import cached_property
from pathlib import Path
from pwd import getpwuid
from ahriman.core.exceptions import PathError
from ahriman.core.log import LazyLogging
from ahriman.core.utils import owner
from ahriman.models.repository_id import RepositoryId
@@ -103,7 +103,7 @@ class RepositoryPaths(LazyLogging):
Returns:
Path: path to directory in which build process is run
"""
uid, _ = self.owner(self.root)
uid, _ = owner(self.root)
return self.chroot / f"{self.repository_id.name}-{self.repository_id.architecture}" / getpwuid(uid).pw_name
@property
@@ -165,7 +165,7 @@ class RepositoryPaths(LazyLogging):
Returns:
tuple[int, int]: owner user and group of the root directory
"""
return self.owner(self.root)
return owner(self.root)
# pylint: disable=protected-access
@classmethod
@@ -218,47 +218,6 @@ class RepositoryPaths(LazyLogging):
return set(walk(instance))
@staticmethod
def owner(path: Path) -> tuple[int, int]:
"""
retrieve owner information by path
Args:
path(Path): path for which extract ids
Returns:
tuple[int, int]: owner user and group ids of the directory
"""
stat = path.stat()
return stat.st_uid, stat.st_gid
def _chown(self, path: Path) -> None:
"""
set owner of path recursively (from root) to root owner
Notes:
More likely you don't want to call this method explicitly, consider using :func:`preserve_owner()`
as context manager instead
Args:
path(Path): path to be chown
Raises:
PathError: if path does not belong to root
"""
def set_owner(current: Path) -> None:
uid, gid = self.owner(current)
if uid == root_uid and gid == root_gid:
return
os.chown(current, root_uid, root_gid, follow_symlinks=False)
if self.root not in path.parents:
raise PathError(path, self.root)
root_uid, root_gid = self.root_owner
while path != self.root:
set_owner(path)
path = path.parent
def archive_for(self, package_base: str) -> Path:
"""
get path to archive specified search criteria
@@ -271,7 +230,7 @@ class RepositoryPaths(LazyLogging):
"""
directory = self.archive / "packages" / package_base[0] / package_base
if not directory.is_dir(): # create if not exists
with self.preserve_owner(self.archive):
with self.preserve_owner():
directory.mkdir(mode=0o755, parents=True)
return directory
@@ -289,13 +248,10 @@ class RepositoryPaths(LazyLogging):
return self.cache / package_base
@contextlib.contextmanager
def preserve_owner(self, path: Path | None = None) -> Iterator[None]:
def preserve_owner(self) -> Iterator[None]:
"""
perform any action preserving owner for any newly created file or directory
Args:
path(Path | None, optional): use this path as root instead of repository root (Default value = None)
Examples:
This method is designed to use as context manager when you are going to perform operations which might
change filesystem, especially if you are doing it under unsafe flag, e.g.::
@@ -306,29 +262,26 @@ class RepositoryPaths(LazyLogging):
Note, however, that this method doesn't handle any exceptions and will eventually interrupt
if there will be any.
"""
path = path or self.root
# guard non-root
# the reason we do this is that it only works if permissions can be actually changed. Hence,
# non-privileged user (e.g. personal user or ahriman user) can't change permissions.
# The only one who can do so is root, so if user is not root we just terminate function
current_uid, current_gid = os.geteuid(), os.getegid()
if current_uid != 0:
yield
return
def walk(root: Path) -> Iterator[Path]:
yield root
if not root.exists():
return
# set uid and gid to root owner
target_uid, target_gid = self.root_owner
os.setegid(target_gid)
os.seteuid(target_uid)
# basically walk, but skipping some content
for child in root.iterdir():
yield child
if child in (self.chroot.parent,):
yield from child.iterdir() # we only yield top-level in chroot directory
elif child.is_dir():
yield from walk(child)
# get current filesystem and run action
previous_snapshot = set(walk(path))
yield
# get newly created files and directories and chown them
new_entries = set(walk(path)).difference(previous_snapshot)
for entry in new_entries:
self._chown(entry)
try:
yield
finally:
# reset uid and gid
os.seteuid(current_uid)
os.setegid(current_gid)
def tree_clear(self, package_base: str) -> None:
"""

View File

@@ -142,7 +142,7 @@ def test_check_user(lock: Lock, mocker: MockerFixture) -> None:
tree_create = mocker.patch("ahriman.models.repository_paths.RepositoryPaths.tree_create")
lock.check_user()
check_user_patch.assert_called_once_with(lock.paths, unsafe=False)
check_user_patch.assert_called_once_with(lock.paths.root, unsafe=False)
tree_create.assert_called_once_with()

View File

@@ -67,7 +67,7 @@ def test_database_copy(pacman: Pacman, mocker: MockerFixture) -> None:
pacman.database_copy(pacman.handle, database, path, use_ahriman_cache=True)
mkdir_mock.assert_called_once_with(mode=0o755, exist_ok=True)
copy_mock.assert_called_once_with(path / "sync" / "core.db", dst_path)
owner_guard_mock.assert_called_once_with(dst_path.parent)
owner_guard_mock.assert_called_once_with()
def test_database_copy_skip(pacman: Pacman, mocker: MockerFixture) -> None:

View File

@@ -117,7 +117,7 @@ def test_tree_create(archive_tree: ArchiveTree, mocker: MockerFixture) -> None:
init_mock = mocker.patch("ahriman.core.alpm.repo.Repo.init")
archive_tree.tree_create()
owner_guard_mock.assert_called_once_with(archive_tree.paths.archive)
owner_guard_mock.assert_called_once_with()
mkdir_mock.assert_called_once_with(0o755, parents=True)
init_mock.assert_called_once_with()

View File

@@ -7,34 +7,10 @@ import pytest
from pathlib import Path
from pytest_mock import MockerFixture
from typing import Any
from unittest.mock import call as MockCall
from unittest.mock import MagicMock, call as MockCall
from ahriman.core.exceptions import BuildError, CalledProcessError, OptionError, UnsafeRunError
from ahriman.core.utils import (
atomic_move,
check_output,
check_user,
dataclass_view,
enum_values,
extract_user,
filelock,
filter_json,
full_version,
minmax,
package_like,
parse_version,
partition,
pretty_datetime,
pretty_interval,
pretty_size,
safe_filename,
srcinfo_property,
srcinfo_property_list,
symlink_relative,
trim_package,
utcnow,
walk,
)
from ahriman.core.utils import *
from ahriman.models.package import Package
from ahriman.models.package_source import PackageSource
from ahriman.models.repository_id import RepositoryId
@@ -197,8 +173,8 @@ def test_check_user(repository_id: RepositoryId, mocker: MockerFixture) -> None:
must check user correctly
"""
paths = RepositoryPaths(Path.cwd(), repository_id)
mocker.patch("os.getuid", return_value=paths.root_owner[0])
check_user(paths, unsafe=False)
mocker.patch("os.geteuid", return_value=paths.root_owner[0])
check_user(paths.root, unsafe=False)
def test_check_user_no_directory(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
@@ -206,7 +182,7 @@ def test_check_user_no_directory(repository_paths: RepositoryPaths, mocker: Mock
must not fail in case if no directory found
"""
mocker.patch("pathlib.Path.exists", return_value=False)
check_user(repository_paths, unsafe=False)
check_user(repository_paths.root, unsafe=False)
def test_check_user_exception(repository_id: RepositoryId, mocker: MockerFixture) -> None:
@@ -214,10 +190,10 @@ def test_check_user_exception(repository_id: RepositoryId, mocker: MockerFixture
must raise exception if user differs
"""
paths = RepositoryPaths(Path.cwd(), repository_id)
mocker.patch("os.getuid", return_value=paths.root_owner[0] + 1)
mocker.patch("os.geteuid", return_value=paths.root_owner[0] + 1)
with pytest.raises(UnsafeRunError):
check_user(paths, unsafe=False)
check_user(paths.root, unsafe=False)
def test_check_user_unsafe(repository_id: RepositoryId, mocker: MockerFixture) -> None:
@@ -225,8 +201,8 @@ def test_check_user_unsafe(repository_id: RepositoryId, mocker: MockerFixture) -
must skip check if unsafe flag is set
"""
paths = RepositoryPaths(Path.cwd(), repository_id)
mocker.patch("os.getuid", return_value=paths.root_owner[0] + 1)
check_user(paths, unsafe=True)
mocker.patch("os.geteuid", return_value=paths.root_owner[0] + 1)
check_user(paths.root, unsafe=True)
def test_dataclass_view(package_ahriman: Package) -> None:
@@ -357,6 +333,18 @@ def test_minmax() -> None:
assert minmax([[1, 2, 3], [4, 5], [6, 7, 8, 9]], key=len) == ([4, 5], [6, 7, 8, 9])
def test_owner(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
"""
must correctly retrieve owner of the path
"""
stat_mock = MagicMock()
stat_mock.st_uid = 42
stat_mock.st_gid = 142
mocker.patch("pathlib.Path.stat", return_value=stat_mock)
assert owner(repository_paths.root) == (42, 142)
def test_package_like(package_ahriman: Package) -> None:
"""
package_like must return true for archives

View File

@@ -3,9 +3,8 @@ import pytest
from collections.abc import Callable
from pathlib import Path
from pytest_mock import MockerFixture
from unittest.mock import MagicMock, call as MockCall
from unittest.mock import call as MockCall
from ahriman.core.exceptions import PathError
from ahriman.models.package import Package
from ahriman.models.repository_id import RepositoryId
from ahriman.models.repository_paths import RepositoryPaths
@@ -55,7 +54,7 @@ def test_root_owner(repository_paths: RepositoryPaths, mocker: MockerFixture) ->
"""
must correctly define root directory owner
"""
mocker.patch("ahriman.models.repository_paths.RepositoryPaths.owner", return_value=(42, 142))
mocker.patch("ahriman.models.repository_paths.owner", return_value=(42, 142))
assert repository_paths.root_owner == (42, 142)
@@ -186,68 +185,6 @@ def test_known_repositories_empty(repository_paths: RepositoryPaths, mocker: Moc
iterdir_mock.assert_not_called()
def test_owner(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
"""
must correctly retrieve owner of the path
"""
stat_mock = MagicMock()
stat_mock.st_uid = 42
stat_mock.st_gid = 142
mocker.patch("pathlib.Path.stat", return_value=stat_mock)
assert RepositoryPaths.owner(repository_paths.root) == (42, 142)
def test_chown(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
"""
must correctly set owner for the directory
"""
object.__setattr__(repository_paths, "owner", _get_owner(repository_paths.root, same=False))
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
chown_mock = mocker.patch("os.chown")
path = repository_paths.root / "path"
repository_paths._chown(path)
chown_mock.assert_called_once_with(path, 42, 42, follow_symlinks=False)
def test_chown_parent(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
"""
must correctly set owner for the directory including parents
"""
object.__setattr__(repository_paths, "owner", _get_owner(repository_paths.root, same=False))
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
chown_mock = mocker.patch("os.chown")
path = repository_paths.root / "parent" / "path"
repository_paths._chown(path)
chown_mock.assert_has_calls([
MockCall(path, 42, 42, follow_symlinks=False),
MockCall(path.parent, 42, 42, follow_symlinks=False)
])
def test_chown_skip(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
"""
must skip ownership set in case if it is same as root
"""
object.__setattr__(repository_paths, "owner", _get_owner(repository_paths.root, same=True))
mocker.patch.object(RepositoryPaths, "root_owner", (42, 42))
chown_mock = mocker.patch("os.chown")
path = repository_paths.root / "path"
repository_paths._chown(path)
chown_mock.assert_not_called()
def test_chown_invalid_path(repository_paths: RepositoryPaths) -> None:
"""
must raise invalid path exception in case if directory outside the root supplied
"""
with pytest.raises(PathError):
repository_paths._chown(repository_paths.root.parent)
def test_archive_for(repository_paths: RepositoryPaths, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must correctly define archive path
@@ -266,7 +203,7 @@ def test_archive_for_create_tree(repository_paths: RepositoryPaths, package_ahri
mkdir_mock = mocker.patch("pathlib.Path.mkdir")
repository_paths.archive_for(package_ahriman.base)
owner_mock.assert_called_once_with(repository_paths.archive)
owner_mock.assert_called_once_with()
mkdir_mock.assert_called_once_with(mode=0o755, parents=True)
@@ -283,42 +220,49 @@ def test_preserve_owner(tmp_path: Path, repository_id: RepositoryId, mocker: Moc
"""
must preserve file owner during operations
"""
mocker.patch("os.geteuid", return_value=0)
mocker.patch("os.getegid", return_value=0)
seteuid_mock = mocker.patch("os.seteuid")
setegid_mock = mocker.patch("os.setegid")
repository_paths = RepositoryPaths(tmp_path, repository_id)
target_uid, target_gid = repository_paths.root_owner
repository_paths.tree_create()
chown_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths._chown")
with repository_paths.preserve_owner():
(repository_paths.root / "created1").touch()
(repository_paths.chroot / "created2").touch()
chown_mock.assert_has_calls([MockCall(repository_paths.root / "created1")])
seteuid_mock.assert_has_calls([MockCall(target_uid), MockCall(0)])
setegid_mock.assert_has_calls([MockCall(target_gid), MockCall(0)])
def test_preserve_owner_specific(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
def test_preserve_owner_exception(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
"""
must preserve file owner during operations only in specific directory
must return to original uid and gid even during exception
"""
mocker.patch("os.geteuid", return_value=0)
mocker.patch("os.getegid", return_value=0)
mocker.patch("pathlib.Path.mkdir", side_effect=Exception)
seteuid_mock = mocker.patch("os.seteuid")
setegid_mock = mocker.patch("os.setegid")
repository_paths = RepositoryPaths(tmp_path, repository_id)
target_uid, target_gid = repository_paths.root_owner
with pytest.raises(Exception):
repository_paths.tree_create()
seteuid_mock.assert_has_calls([MockCall(target_uid), MockCall(0)])
setegid_mock.assert_has_calls([MockCall(target_gid), MockCall(0)])
def test_preserve_owner_non_root(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
"""
must skip processing if user is not root
"""
mocker.patch("os.geteuid", return_value=42)
mocker.patch("os.getegid", return_value=42)
repository_paths = RepositoryPaths(tmp_path, repository_id)
seteuid_mock = mocker.patch("os.seteuid")
setegid_mock = mocker.patch("os.setegid")
repository_paths.tree_create()
(repository_paths.root / "content").mkdir()
chown_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths._chown")
with repository_paths.preserve_owner(repository_paths.root / "content"):
(repository_paths.root / "created1").touch()
(repository_paths.root / "content" / "created2").touch()
(repository_paths.chroot / "created3").touch()
chown_mock.assert_has_calls([MockCall(repository_paths.root / "content" / "created2")])
def test_preserve_owner_no_directory(tmp_path: Path, repository_id: RepositoryId, mocker: MockerFixture) -> None:
"""
must skip directory scan if it does not exist
"""
repository_paths = RepositoryPaths(tmp_path, repository_id)
chown_mock = mocker.patch("ahriman.models.repository_paths.RepositoryPaths._chown")
with repository_paths.preserve_owner(Path("empty")):
(repository_paths.root / "created1").touch()
chown_mock.assert_not_called()
seteuid_mock.assert_not_called()
setegid_mock.assert_not_called()
def test_tree_clear(repository_paths: RepositoryPaths, package_ahriman: Package, mocker: MockerFixture) -> None: