Compare commits

...

6 Commits

12 changed files with 420 additions and 107 deletions

View File

@ -15,8 +15,6 @@ pacman -Sy --noconfirm devtools git pyalpm python-inflection python-passlib pyth
pacman -Sy --noconfirm --asdeps base-devel python-build python-flit python-installer python-tox python-wheel
# optional dependencies
if [[ -z $MINIMAL_INSTALL ]]; then
# VCS support
pacman -Sy --noconfirm breezy darcs mercurial subversion
# web server
pacman -Sy --noconfirm python-aioauth-client python-aiohttp python-aiohttp-apispec-git python-aiohttp-cors python-aiohttp-jinja2 python-aiohttp-security python-aiohttp-session python-cryptography python-jinja
# additional features

View File

@ -31,7 +31,6 @@ RUN useradd -m -d "/home/build" -s "/usr/bin/nologin" build && \
echo "build ALL=(ALL) NOPASSWD: ALL" > "/etc/sudoers.d/build"
COPY "docker/install-aur-package.sh" "/usr/local/bin/install-aur-package"
## install package dependencies
## darcs is not installed by reasons, because it requires a lot haskell packages which dramatically increase image size
RUN pacman -Sy --noconfirm --asdeps \
devtools \
git \
@ -50,9 +49,7 @@ RUN pacman -Sy --noconfirm --asdeps \
python-wheel \
&& \
pacman -Sy --noconfirm --asdeps \
breezy \
git \
mercurial \
python-aiohttp \
python-boto3 \
python-cerberus \
@ -61,7 +58,6 @@ RUN pacman -Sy --noconfirm --asdeps \
python-matplotlib \
python-systemd \
rsync \
subversion \
&& \
runuser -u build -- install-aur-package \
python-aioauth-client \

View File

@ -265,11 +265,7 @@ TL;DR
How to update VCS packages
^^^^^^^^^^^^^^^^^^^^^^^^^^
Normally the service handles VCS packages correctly, however it requires additional dependencies:
.. code-block:: shell
pacman -S breezy darcs mercurial subversion
Normally the service handles VCS packages correctly. The version is updated in clean chroot, no additional actions are required.
How to review changes before build
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

View File

@ -9,10 +9,7 @@ url="https://github.com/arcan1s/ahriman"
license=('GPL3')
depends=('devtools>=1:1.0.0' 'git' 'pyalpm' 'python-inflection' 'python-passlib' 'python-pyelftools' 'python-requests' 'python-srcinfo')
makedepends=('python-build' 'python-flit' 'python-installer' 'python-wheel')
optdepends=('breezy: -bzr packages support'
'darcs: -darcs packages support'
'mercurial: -hg packages support'
'python-aioauth-client: web server with OAuth2 authorization'
optdepends=('python-aioauth-client: web server with OAuth2 authorization'
'python-aiohttp: web server'
'python-aiohttp-apispec>=3.0.0: web server'
'python-aiohttp-cors: web server'
@ -26,8 +23,7 @@ optdepends=('breezy: -bzr packages support'
'python-requests-unixsocket2: client report to web server by unix socket'
'python-jinja: html report generation'
'python-systemd: journal support'
'rsync: sync by using rsync'
'subversion: -svn packages support')
'rsync: sync by using rsync')
source=("https://github.com/arcan1s/ahriman/releases/download/$pkgver/$pkgname-$pkgver.tar.gz"
'ahriman.sysusers'
'ahriman.tmpfiles')

View File

@ -17,13 +17,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from collections.abc import Generator
from pathlib import Path
from ahriman.core.build_tools.sources import Sources
from ahriman.core.configuration import Configuration
from ahriman.core.exceptions import BuildError
from ahriman.core.log import LazyLogging
from ahriman.core.utils import check_output
from ahriman.core.utils import check_output, package_like
from ahriman.models.package import Package
from ahriman.models.pkgbuild_patch import PkgbuildPatch
from ahriman.models.repository_paths import RepositoryPaths
@ -67,12 +68,43 @@ class Task(LazyLogging):
self.makepkg_flags = configuration.getlist("build", "makepkg_flags", fallback=[])
self.makechrootpkg_flags = configuration.getlist("build", "makechrootpkg_flags", fallback=[])
def build(self, sources_dir: Path, **kwargs: str | None) -> list[Path]:
def _package_archives(self, sources_dir: Path, source_files: list[Path]) -> list[Path]:
"""
extract package archives from the directory
Args:
sources_dir(Path): path to where sources are
source_files(list[Path]): list of files which were initially in the directory
Returns:
list[Path]: list of file paths which looks like freshly generated archives
"""
def files() -> Generator[Path, None, None]:
for filepath in sources_dir.iterdir():
if filepath in source_files:
continue # skip files which were already there
if filepath.suffix == ".log":
continue # skip log files
if not package_like(filepath):
continue # path doesn't look like a package
yield filepath
# debug packages are always formed as package.base-debug
# see /usr/share/makepkg/util/pkgbuild.sh for more details
debug_package_prefix = f"{self.package.base}-debug-"
return [
package
for package in files()
if self.include_debug_packages or not package.name.startswith(debug_package_prefix)
]
def build(self, sources_dir: Path, *, dry_run: bool = False, **kwargs: str | None) -> list[Path]:
"""
run package build
Args:
sources_dir(Path): path to where sources are
dry_run(bool, optional): do not perform build itself (Default value = False)
**kwargs(str | None): environment variables to be passed to build processes
Returns:
@ -82,6 +114,8 @@ class Task(LazyLogging):
command.extend(self.archbuild_flags)
command.extend(["--"] + self.makechrootpkg_flags)
command.extend(["--"] + self.makepkg_flags)
if dry_run:
command.extend(["--nobuild"])
self.logger.info("using %s for %s", command, self.package.base)
environment: dict[str, str] = {
@ -91,6 +125,7 @@ class Task(LazyLogging):
}
self.logger.info("using environment variables %s", environment)
source_files = list(sources_dir.iterdir())
check_output(
*command,
exception=BuildError.from_process(self.package.base),
@ -100,20 +135,7 @@ class Task(LazyLogging):
environment=environment,
)
package_list_command = ["makepkg", "--packagelist"]
if not self.include_debug_packages:
package_list_command.append("OPTIONS=(!debug)") # disable debug flag manually
packages = check_output(
*package_list_command,
exception=BuildError.from_process(self.package.base),
cwd=sources_dir,
logger=self.logger,
environment=environment,
).splitlines()
# some dirty magic here
# the filter is applied in order to make sure that result will only contain packages which were actually built
# e.g. in some cases packagelist command produces debug packages which were not actually built
return list(filter(lambda path: path.is_file(), map(Path, packages)))
return self._package_archives(sources_dir, source_files)
def init(self, sources_dir: Path, patches: list[PkgbuildPatch], local_version: str | None) -> str | None:
"""

View File

@ -58,7 +58,7 @@ class PackageInfo(RepositoryProperties):
# force version to max of them
self.logger.warning("version of %s differs, found %s and %s",
current.base, current.version, local.version)
if current.is_outdated(local, self.paths, calculate_version=False):
if current.is_outdated(local, self.configuration, calculate_version=False):
current.version = local.version
current.packages.update(local.packages)
except Exception:

View File

@ -51,7 +51,6 @@ class RepositoryProperties(EventLogger, LazyLogging):
scan_paths(ScanPaths): scan paths for the implicit dependencies
sign(GPG): GPG wrapper instance
triggers(TriggerLoader): triggers holder
vcs_allowed_age(int): maximal age of the VCS packages before they will be checked
"""
def __init__(self, repository_id: RepositoryId, configuration: Configuration, database: SQLite, *, report: bool,
@ -70,8 +69,6 @@ class RepositoryProperties(EventLogger, LazyLogging):
self.configuration = configuration
self.database = database
self.vcs_allowed_age = configuration.getint("build", "vcs_allowed_age", fallback=0)
self.paths: RepositoryPaths = configuration.repository_paths # additional workaround for pycharm typing
self.ignore_list = configuration.getlist("build", "ignore_packages", fallback=[])

View File

@ -67,10 +67,7 @@ class UpdateHandler(PackageInfo, Cleaner):
try:
remote = load_remote(local)
if local.is_outdated(
remote, self.paths,
vcs_allowed_age=self.vcs_allowed_age,
calculate_version=vcs):
if local.is_outdated(remote, self.configuration, calculate_version=vcs):
self.reporter.set_pending(local.base)
self.event(local.base, EventType.PackageOutdated, "Remote version is newer than local")
result.append(remote)
@ -154,9 +151,7 @@ class UpdateHandler(PackageInfo, Cleaner):
if local is None:
continue # we don't add packages automatically
if local.is_outdated(remote, self.paths,
vcs_allowed_age=self.vcs_allowed_age,
calculate_version=vcs):
if local.is_outdated(remote, self.configuration, calculate_version=vcs):
self.reporter.set_pending(local.base)
self.event(local.base, EventType.PackageOutdated, "Locally pulled sources are outdated")
result.append(remote)

View File

@ -27,7 +27,7 @@ import re
import selectors
import subprocess
from collections.abc import Callable, Generator, Iterable
from collections.abc import Callable, Generator, Iterable, Mapping
from dataclasses import asdict
from enum import Enum
from pathlib import Path
@ -407,7 +407,7 @@ def safe_filename(source: str) -> str:
return re.sub(r"[^A-Za-z\d\-._~:\[\]@]", "-", source)
def srcinfo_property(key: str, srcinfo: dict[str, Any], package_srcinfo: dict[str, Any], *,
def srcinfo_property(key: str, srcinfo: Mapping[str, Any], package_srcinfo: Mapping[str, Any], *,
default: Any = None) -> Any:
"""
extract property from SRCINFO. This method extracts property from package if this property is presented in
@ -416,8 +416,8 @@ def srcinfo_property(key: str, srcinfo: dict[str, Any], package_srcinfo: dict[st
Args:
key(str): key to extract
srcinfo(dict[str, Any]): root structure of SRCINFO
package_srcinfo(dict[str, Any]): package specific SRCINFO
srcinfo(Mapping[str, Any]): root structure of SRCINFO
package_srcinfo(Mapping[str, Any]): package specific SRCINFO
default(Any, optional): the default value for the specified key (Default value = None)
Returns:
@ -426,7 +426,7 @@ def srcinfo_property(key: str, srcinfo: dict[str, Any], package_srcinfo: dict[st
return package_srcinfo.get(key) or srcinfo.get(key) or default
def srcinfo_property_list(key: str, srcinfo: dict[str, Any], package_srcinfo: dict[str, Any], *,
def srcinfo_property_list(key: str, srcinfo: Mapping[str, Any], package_srcinfo: Mapping[str, Any], *,
architecture: str | None = None) -> list[Any]:
"""
extract list property from SRCINFO. Unlike :func:`srcinfo_property()` it supposes that default return value is
@ -435,8 +435,8 @@ def srcinfo_property_list(key: str, srcinfo: dict[str, Any], package_srcinfo: di
Args:
key(str): key to extract
srcinfo(dict[str, Any]): root structure of SRCINFO
package_srcinfo(dict[str, Any]): package specific SRCINFO
srcinfo(Mapping[str, Any]): root structure of SRCINFO
package_srcinfo(Mapping[str, Any]): package specific SRCINFO
architecture(str | None, optional): package architecture if set (Default value = None)
Returns:

View File

@ -26,19 +26,18 @@ from collections.abc import Callable, Generator, Iterable
from dataclasses import dataclass
from pathlib import Path
from pyalpm import vercmp # type: ignore[import-not-found]
from srcinfo.parse import parse_srcinfo # type: ignore[import-untyped]
from typing import Any, Self
from urllib.parse import urlparse
from ahriman.core.alpm.pacman import Pacman
from ahriman.core.alpm.remote import AUR, Official, OfficialSyncdb
from ahriman.core.exceptions import PackageInfoError
from ahriman.core.configuration import Configuration
from ahriman.core.log import LazyLogging
from ahriman.core.utils import check_output, dataclass_view, full_version, parse_version, srcinfo_property_list, utcnow
from ahriman.core.utils import dataclass_view, full_version, parse_version, srcinfo_property_list, utcnow
from ahriman.models.package_description import PackageDescription
from ahriman.models.package_source import PackageSource
from ahriman.models.pkgbuild import Pkgbuild
from ahriman.models.remote_source import RemoteSource
from ahriman.models.repository_paths import RepositoryPaths
@dataclass(kw_only=True)
@ -255,25 +254,23 @@ class Package(LazyLogging):
Returns:
Self: package properties
Raises:
PackageInfoError: if there are parsing errors
"""
srcinfo_source = check_output("makepkg", "--printsrcinfo", cwd=path)
srcinfo, errors = parse_srcinfo(srcinfo_source)
if errors:
raise PackageInfoError(errors)
pkgbuild = Pkgbuild.from_file(path / "PKGBUILD")
packages = {
package: PackageDescription(
depends=srcinfo_property_list("depends", srcinfo, properties, architecture=architecture),
make_depends=srcinfo_property_list("makedepends", srcinfo, properties, architecture=architecture),
opt_depends=srcinfo_property_list("optdepends", srcinfo, properties, architecture=architecture),
check_depends=srcinfo_property_list("checkdepends", srcinfo, properties, architecture=architecture),
depends=srcinfo_property_list("depends", pkgbuild, properties, architecture=architecture),
make_depends=srcinfo_property_list("makedepends", pkgbuild, properties, architecture=architecture),
opt_depends=srcinfo_property_list("optdepends", pkgbuild, properties, architecture=architecture),
check_depends=srcinfo_property_list("checkdepends", pkgbuild, properties, architecture=architecture),
)
for package, properties in srcinfo["packages"].items()
for package, properties in pkgbuild.packages().items()
}
version = full_version(srcinfo.get("epoch"), srcinfo["pkgver"], srcinfo["pkgrel"])
version = full_version(
pkgbuild.get_as("epoch", default=None),
pkgbuild.get_as("pkgver"),
pkgbuild.get_as("pkgrel"),
)
remote = RemoteSource(
source=PackageSource.Local,
@ -284,7 +281,7 @@ class Package(LazyLogging):
)
return cls(
base=srcinfo["pkgbase"],
base=pkgbuild.get_as("pkgbase"),
version=version,
remote=remote,
packages=packages,
@ -363,16 +360,12 @@ class Package(LazyLogging):
Raises:
PackageInfoError: if there are parsing errors
"""
srcinfo_source = check_output("makepkg", "--printsrcinfo", cwd=path)
srcinfo, errors = parse_srcinfo(srcinfo_source)
if errors:
raise PackageInfoError(errors)
pkgbuild = Pkgbuild.from_file(path / "PKGBUILD")
# we could use arch property, but for consistency it is better to call special method
architectures = Package.supported_architectures(path)
for architecture in architectures:
for source in srcinfo_property_list("source", srcinfo, {}, architecture=architecture):
for source in srcinfo_property_list("source", pkgbuild, {}, architecture=architecture):
if "::" in source:
_, source = source.split("::", 1) # in case if filename is specified, remove it
@ -383,7 +376,7 @@ class Package(LazyLogging):
yield Path(source)
if (install := srcinfo.get("install", None)) is not None:
if isinstance(install := pkgbuild.get("install"), str): # well, in reality it is either None or str
yield Path(install)
@staticmethod
@ -396,15 +389,9 @@ class Package(LazyLogging):
Returns:
set[str]: list of package supported architectures
Raises:
PackageInfoError: if there are parsing errors
"""
srcinfo_source = check_output("makepkg", "--printsrcinfo", cwd=path)
srcinfo, errors = parse_srcinfo(srcinfo_source)
if errors:
raise PackageInfoError(errors)
return set(srcinfo.get("arch", []))
pkgbuild = Pkgbuild.from_file(path / "PKGBUILD")
return set(pkgbuild.get("arch", []))
def _package_list_property(self, extractor: Callable[[PackageDescription], list[str]]) -> list[str]:
"""
@ -426,39 +413,43 @@ class Package(LazyLogging):
return sorted(set(generator()))
def actual_version(self, paths: RepositoryPaths) -> str:
def actual_version(self, configuration: Configuration) -> str:
"""
additional method to handle VCS package versions
Args:
paths(RepositoryPaths): repository paths instance
configuration(Configuration): configuration instance
Returns:
str: package version if package is not VCS and current version according to VCS otherwise
Raises:
PackageInfoError: if there are parsing errors
"""
if not self.is_vcs:
return self.version
from ahriman.core.build_tools.sources import Sources
from ahriman.core.build_tools.task import Task
Sources.load(paths.cache_for(self.base), self, [], paths)
_, repository_id = configuration.check_loaded()
paths = configuration.repository_paths
task = Task(self, configuration, repository_id.architecture, paths)
try:
# update pkgver first
check_output("makepkg", "--nodeps", "--nobuild", cwd=paths.cache_for(self.base), logger=self.logger)
# generate new .SRCINFO and put it to parser
srcinfo_source = check_output("makepkg", "--printsrcinfo",
cwd=paths.cache_for(self.base), logger=self.logger)
srcinfo, errors = parse_srcinfo(srcinfo_source)
if errors:
raise PackageInfoError(errors)
# create fresh chroot environment, fetch sources and - automagically - update PKGBUILD
task.init(paths.cache_for(self.base), [], None)
task.build(paths.cache_for(self.base), dry_run=False)
return full_version(srcinfo.get("epoch"), srcinfo["pkgver"], srcinfo["pkgrel"])
pkgbuild = Pkgbuild.from_file(paths.cache_for(self.base) / "PKGBUILD")
return full_version(
pkgbuild.get_as("epoch", default=None),
pkgbuild.get_as("pkgver"),
pkgbuild.get_as("pkgrel"),
)
except Exception:
self.logger.exception("cannot determine version of VCS package, make sure that VCS tools are installed")
self.logger.exception("cannot determine version of VCS package")
finally:
# clear log files generated by devtools
for log_file in paths.cache_for(self.base).glob("*.log"):
log_file.unlink()
return self.version
@ -513,26 +504,25 @@ class Package(LazyLogging):
if package.build_date is not None
)
def is_outdated(self, remote: Package, paths: RepositoryPaths, *,
vcs_allowed_age: float | int = 0,
def is_outdated(self, remote: Package, configuration: Configuration, *,
calculate_version: bool = True) -> bool:
"""
check if package is out-of-dated
Args:
remote(Package): package properties from remote source
paths(RepositoryPaths): repository paths instance. Required for VCS packages cache
vcs_allowed_age(float | int, optional): max age of the built packages before they will be
forced to calculate actual version (Default value = 0)
configuration(Configuration): configuration instance
calculate_version(bool, optional): expand version to actual value (by calculating git versions)
(Default value = True)
Returns:
bool: ``True`` if the package is out-of-dated and ``False`` otherwise
"""
vcs_allowed_age = configuration.getint("build", "vcs_allowed_age", fallback=0)
min_vcs_build_date = utcnow().timestamp() - vcs_allowed_age
if calculate_version and not self.is_newer_than(min_vcs_build_date):
remote_version = remote.actual_version(paths)
remote_version = remote.actual_version(configuration)
else:
remote_version = remote.version

View File

@ -0,0 +1,307 @@
#
# Copyright (c) 2021-2024 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import re
import shlex
from collections.abc import Generator, Iterator, Mapping
from dataclasses import dataclass
from enum import StrEnum
from io import StringIO
from pathlib import Path
from typing import IO, Self, TypeVar, cast
from ahriman.models.pkgbuild_patch import PkgbuildPatch
T = TypeVar("T", str, list[str])
U = TypeVar("U", str, list[str], None)
class PkgbuildToken(StrEnum):
"""
well-known tokens dictionary
Attributes:
ArrayEnds(PkgbuildToken): (class attribute) array ends token
ArrayStarts(PkgbuildToken): (class attribute) array starts token
FunctionDeclaration(PkgbuildToken): (class attribute) function declaration token
FunctionEnds(PkgbuildToken): (class attribute) function ends token
FunctionStarts(PkgbuildToken): (class attribute) function starts token
"""
ArrayStarts = "("
ArrayEnds = ")"
FunctionDeclaration = "()"
FunctionStarts = "{"
FunctionEnds = "}"
@dataclass(frozen=True)
class Pkgbuild(Mapping[str, str | list[str]]):
"""
simple pkgbuild reader implementation in pure python, because others sucks
Attributes:
fields(dict[str, PkgbuildPatch]): PKGBUILD fields
"""
fields: dict[str, PkgbuildPatch]
_ARRAY_ASSIGNMENT = re.compile(r"^(?P<key>\w+)=$")
_STRING_ASSIGNMENT = re.compile(r"^(?P<key>\w+)=(?P<value>.+)$")
# in addition, functions can have dash to usual assignment
_FUNCTION_DECLARATION = re.compile(r"^(?P<key>[\w-]+)$")
@property
def variables(self) -> dict[str, str]:
"""
list of variables defined and (maybe) used in this PKGBUILD
Returns:
dict[str, str]: map of variable name to its value. The value will be included here in case if it presented
in the internal dictionary, it is not a function and the value has string type
"""
return {
key: value.value
for key, value in self.fields.items()
if not value.is_function and isinstance(value.value, str)
}
@classmethod
def from_file(cls, path: Path) -> Self:
"""
parse PKGBUILD from the file
Args:
path(Path): path to the PKGBUILD file
Returns:
Self: constructed instance of self
"""
with path.open() as input_file:
return cls.from_io(input_file)
@classmethod
def from_io(cls, stream: IO[str]) -> Self:
"""
parse PKGBUILD from input stream
Args:
stream: IO[str]: input stream containing PKGBUILD content
Returns:
Self: constructed instance of self
"""
fields = {}
parser = shlex.shlex(stream, posix=True, punctuation_chars=True)
# ignore substitution and extend bash symbols
parser.wordchars += "${}#:+"
# in case of default behaviour, it will ignore, for example, segment part of url outside of quotes
parser.commenters = ""
while token := parser.get_token():
try:
key, value = cls._parse_token(token, parser)
fields[key] = value
except StopIteration:
break
return cls(fields)
@staticmethod
def _parse_array(parser: shlex.shlex) -> list[str]:
"""
parse array from the PKGBUILD. This method will extract tokens from parser until it matches closing array,
modifying source parser state
Args:
parser(shlex.shlex): shell parser instance
Returns:
list[str]: extracted arrays elements
Raises:
ValueError: if array is not closed
"""
def extract() -> Generator[str, None, None]:
while token := parser.get_token():
if token == PkgbuildToken.ArrayEnds:
break
yield token
if token != PkgbuildToken.ArrayEnds:
raise ValueError("No closing array bracket found")
return list(extract())
@staticmethod
def _parse_function(parser: shlex.shlex) -> str:
"""
parse function from the PKGBUILD. This method will extract tokens from parser until it matches closing function,
modifying source parser state. Instead of trying to combine tokens together, it uses positions of the file
and read content again in this range
Args:
parser(shlex.shlex): shell parser instance
Returns:
str: function body
Raises:
ValueError: if function body wasn't found or parser input stream doesn't support position reading
"""
io: IO[str] = parser.instream # type: ignore[assignment]
# find start and end positions
start_position, end_position = -1, -1
while token := parser.get_token():
match token:
case PkgbuildToken.FunctionStarts:
start_position = io.tell()
case PkgbuildToken.FunctionEnds:
end_position = io.tell()
break
if not 0 < start_position < end_position:
raise ValueError("Function body wasn't found")
# read the specified interval from source stream
io.seek(start_position - 1) # start from the previous symbol ("{")
content = io.read(end_position - start_position + 1)
return content
@staticmethod
def _parse_token(token: str, parser: shlex.shlex) -> tuple[str, PkgbuildPatch]:
"""
parse single token to the PKGBUILD field
Args:
token(str): current token
parser(shlex.shlex): shell parser instance
Returns:
tuple[str, PkgbuildPatch]: extracted a pair of key and its value
Raises:
StopIteration: if iteration reaches the end of the file
"""
# simple assignment rule
if (match := Pkgbuild._STRING_ASSIGNMENT.match(token)) is not None:
key = match.group("key")
value = match.group("value")
return key, PkgbuildPatch(key, value)
match parser.get_token():
# array processing. Arrays will be sent as "key=", "(", values, ")"
case PkgbuildToken.ArrayStarts if (match := Pkgbuild._ARRAY_ASSIGNMENT.match(token)) is not None:
key = match.group("key")
value = Pkgbuild._parse_array(parser)
return key, PkgbuildPatch(key, value)
# functions processing. Function will be sent as "name", "()", "{", body, "}"
case PkgbuildToken.FunctionDeclaration if Pkgbuild._FUNCTION_DECLARATION.match(token):
key = f"{token}{PkgbuildToken.FunctionDeclaration}"
value = Pkgbuild._parse_function(parser)
return token, PkgbuildPatch(key, value) # this is not mistake, assign to token without ()
# special function case, where "(" and ")" are separated tokens, e.g. "pkgver ( )"
case PkgbuildToken.ArrayStarts if Pkgbuild._FUNCTION_DECLARATION.match(token):
next_token = parser.get_token()
if next_token == PkgbuildToken.ArrayEnds: # replace closing bracket with "()"
next_token = PkgbuildToken.FunctionDeclaration
parser.push_token(next_token) # type: ignore[arg-type]
return Pkgbuild._parse_token(token, parser)
# some random token received without continuation, lets guess it is empty assignment (i.e. key=)
case other if other is not None:
return Pkgbuild._parse_token(other, parser)
# reached the end of the parser
case None:
raise StopIteration
def get_as(self, key: str, **kwargs: T | U) -> T | U:
"""
type guard for getting value by key
Args:
key(str): key name
default(U, optional): default value to return if no key found
Returns:
T | U: value associated with key or default value if no value found and fallback is provided
Raises:
KeyError: if no key found and no default has been provided
"""
if key not in self:
if "default" in kwargs:
return kwargs["default"]
raise KeyError(key)
return cast(T, self[key])
def packages(self) -> dict[str, Self]:
"""
extract properties from internal package functions
Returns:
dict[str, Self]: map of package name to its inner properties if defined
"""
packages = [self["pkgname"]] if isinstance(self["pkgname"], str) else self["pkgname"]
def io(package_name: str) -> IO[str]:
# try to read package specific function and fallback to default otherwise
content = self.get_as(f"package_{package_name}", default=None) or self.get_as("package")
return StringIO(content)
return {package: self.from_io(io(package)) for package in packages}
def __getitem__(self, key: str) -> str | list[str]:
"""
get the field of the PKGBUILD
Args:
key(str): key name
Returns:
str | list[str]: value by the key
"""
return self.fields[key].substitute(self.variables)
def __iter__(self) -> Iterator[str]:
"""
iterate over the fields
Returns:
Iterator[str]: keys iterator
"""
return iter(self.fields)
def __len__(self) -> int:
"""
get length of the mapping
Returns:
int: amount of the fields in this PKGBUILD
"""
return len(self.fields)

View File

@ -21,6 +21,7 @@ import shlex
from dataclasses import dataclass, fields
from pathlib import Path
from string import Template
from typing import Any, Generator, Self
from ahriman.core.utils import dataclass_view, filter_json
@ -167,6 +168,21 @@ class PkgbuildPatch:
return f"{self.key} {self.value}" # no quoting enabled here
return f"""{self.key}={PkgbuildPatch.quote(self.value)}"""
def substitute(self, variables: dict[str, str]) -> str | list[str]:
"""
substitute variables into the value
Args:
variables(dict[str, str]): map of variables available for usage
Returns:
str | list[str]: substituted value. All unknown variables will remain as links to their values.
This function doesn't support recursive substitution
"""
if isinstance(self.value, str):
return Template(self.value).safe_substitute(variables)
return [Template(value).safe_substitute(variables) for value in self.value]
def view(self) -> dict[str, Any]:
"""
generate json patch view