mirror of
https://github.com/arcan1s/ahriman.git
synced 2025-07-16 15:29:56 +00:00
feat: drop explicit makepkg usage (#134)
* generate filenames without using makepkg * pkgbuild parser impl * completely remove makepkg calls * simplify typed get * try to improve parser * docs and recipes updatte * never raise keyerror instead return empty string * udpate tests * add support of array expansion * docs update * tests update * handle quoted control sequences correctly * expand bash * allow packages without package function * docs update * add moroe tests * small improovements * support escaped arrays and functions
This commit is contained in:
@ -47,7 +47,7 @@ class ServiceUpdates(Handler):
|
||||
report(bool): force enable or disable reporting
|
||||
"""
|
||||
remote = Package.from_aur("ahriman", None)
|
||||
_, release = remote.version.rsplit("-", 1) # we don't store pkgrel locally, so we just append it
|
||||
_, release = remote.version.rsplit("-", maxsplit=1) # we don't store pkgrel locally, so we just append it
|
||||
local_version = f"{__version__}-{release}"
|
||||
|
||||
# technically we would like to compare versions, but it is fine to raise an exception in case if locally
|
||||
|
325
src/ahriman/core/alpm/pkgbuild_parser.py
Normal file
325
src/ahriman/core/alpm/pkgbuild_parser.py
Normal file
@ -0,0 +1,325 @@
|
||||
#
|
||||
# Copyright (c) 2021-2024 ahriman team.
|
||||
#
|
||||
# This file is part of ahriman
|
||||
# (see https://github.com/arcan1s/ahriman).
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
import itertools
|
||||
import re
|
||||
import shlex
|
||||
|
||||
from collections.abc import Generator
|
||||
from enum import StrEnum
|
||||
from typing import IO
|
||||
|
||||
from ahriman.core.exceptions import PkgbuildParserError
|
||||
from ahriman.models.pkgbuild_patch import PkgbuildPatch
|
||||
|
||||
|
||||
class PkgbuildToken(StrEnum):
|
||||
"""
|
||||
well-known tokens dictionary
|
||||
|
||||
Attributes:
|
||||
ArrayEnds(PkgbuildToken): (class attribute) array ends token
|
||||
ArrayStarts(PkgbuildToken): (class attribute) array starts token
|
||||
Comma(PkgbuildToken): (class attribute) comma token
|
||||
Comment(PkgbuildToken): (class attribute) comment token
|
||||
FunctionDeclaration(PkgbuildToken): (class attribute) function declaration token
|
||||
FunctionEnds(PkgbuildToken): (class attribute) function ends token
|
||||
FunctionStarts(PkgbuildToken): (class attribute) function starts token
|
||||
"""
|
||||
|
||||
ArrayStarts = "("
|
||||
ArrayEnds = ")"
|
||||
|
||||
Comma = ","
|
||||
|
||||
Comment = "#"
|
||||
|
||||
FunctionDeclaration = "()"
|
||||
FunctionStarts = "{"
|
||||
FunctionEnds = "}"
|
||||
|
||||
|
||||
class PkgbuildParser(shlex.shlex):
|
||||
"""
|
||||
simple pkgbuild reader implementation in pure python, because others suck.
|
||||
|
||||
What is it:
|
||||
|
||||
#. Simple PKGBUILD parser written in python.
|
||||
#. No shell execution, so it is free from random shell attacks.
|
||||
#. Able to parse simple constructions (assignments, comments, functions, arrays).
|
||||
|
||||
What it is not:
|
||||
|
||||
#. Fully functional shell parser.
|
||||
#. Shell executor.
|
||||
#. No parameter expansion.
|
||||
|
||||
For more details what does it support, please, consult with the test cases.
|
||||
|
||||
Examples:
|
||||
This class is heavily based on :mod:`shlex` parser, but instead of strings operates with the
|
||||
:class:`ahriman.models.pkgbuild_patch.PkgbuildPatch` objects. The main way to use it is to call :func:`parse()`
|
||||
function and collect parsed objects, e.g.::
|
||||
|
||||
>>> parser = PkgbuildParser(StringIO("input string"))
|
||||
>>> for patch in parser.parse():
|
||||
>>> print(f"{patch.key} = {patch.value}")
|
||||
|
||||
It doesn't store the state of the fields (but operates with the :mod:`shlex` parser state), so no shell
|
||||
post-processing is performed (e.g. variable substitution).
|
||||
"""
|
||||
|
||||
_ARRAY_ASSIGNMENT = re.compile(r"^(?P<key>\w+)=$")
|
||||
# in addition to usual assignment, functions can have dash
|
||||
_FUNCTION_DECLARATION = re.compile(r"^(?P<key>[\w-]+)$")
|
||||
_STRING_ASSIGNMENT = re.compile(r"^(?P<key>\w+)=(?P<value>.+)$")
|
||||
|
||||
def __init__(self, stream: IO[str]) -> None:
|
||||
"""
|
||||
Args:
|
||||
stream(IO[str]): input stream containing PKGBUILD content
|
||||
"""
|
||||
shlex.shlex.__init__(self, stream, posix=True, punctuation_chars=True)
|
||||
self._io = stream # direct access without type casting
|
||||
|
||||
# ignore substitution and extend bash symbols
|
||||
self.wordchars += "${}#:+-@!"
|
||||
# in case of default behaviour, it will ignore, for example, segment part of url outside of quotes
|
||||
self.commenters = ""
|
||||
|
||||
@staticmethod
|
||||
def _expand_array(array: list[str]) -> list[str]:
|
||||
"""
|
||||
bash array expansion simulator. It takes raw array and tries to expand constructions like
|
||||
``(first prefix-{mid1,mid2}-suffix last)`` into ``(first, prefix-mid1-suffix prefix-mid2-suffix last)``
|
||||
|
||||
Args:
|
||||
array(list[str]): input array
|
||||
|
||||
Returns:
|
||||
list[str]: either source array or expanded array if possible
|
||||
|
||||
Raises:
|
||||
PkgbuildParserError: if there are errors in parser
|
||||
"""
|
||||
# we are using comma as marker for expansion (if any)
|
||||
if PkgbuildToken.Comma not in array:
|
||||
return array
|
||||
# again sanity check, for expansion there are at least 3 elements (first, last and comma)
|
||||
if len(array) < 3:
|
||||
return array
|
||||
|
||||
result = []
|
||||
buffer, prefix = [], None
|
||||
|
||||
for index, (first, second) in enumerate(itertools.pairwise(array)):
|
||||
match (first, second):
|
||||
# in this case we check if expansion should be started
|
||||
# this condition matches "prefix{first", ","
|
||||
case (_, PkgbuildToken.Comma) if PkgbuildToken.FunctionStarts in first:
|
||||
prefix, part = first.rsplit(PkgbuildToken.FunctionStarts, maxsplit=1)
|
||||
buffer.append(f"{prefix}{part}")
|
||||
|
||||
# the last element case, it matches either ",", "last}" or ",", "last}suffix"
|
||||
# in case if there is suffix, it must be appended to all list elements
|
||||
case (PkgbuildToken.Comma, _) if prefix is not None and PkgbuildToken.FunctionEnds in second:
|
||||
part, suffix = second.rsplit(PkgbuildToken.FunctionEnds, maxsplit=1)
|
||||
buffer.append(f"{prefix}{part}")
|
||||
result.extend([f"{part}{suffix}" for part in buffer])
|
||||
# reset state
|
||||
buffer, prefix = [], None
|
||||
|
||||
# we have already prefix string, so we are in progress of expansion
|
||||
# we always operate the last element, so this matches ",", "next"
|
||||
case (PkgbuildToken.Comma, _) if prefix is not None:
|
||||
buffer.append(f"{prefix}{second}")
|
||||
|
||||
# exactly first element of the list
|
||||
case (_, _) if prefix is None and index == 0:
|
||||
result.append(first)
|
||||
|
||||
# any next normal element
|
||||
case (_, _) if prefix is None:
|
||||
result.append(second)
|
||||
|
||||
# small sanity check
|
||||
if prefix is not None:
|
||||
raise PkgbuildParserError("error in array expansion", array)
|
||||
|
||||
return result
|
||||
|
||||
def _is_escaped(self) -> bool:
|
||||
"""
|
||||
check if the last element was quoted. ``shlex.shlex`` parser doesn't provide information about was the token
|
||||
quoted or not, thus there is no difference between "'#'" (diez in quotes) and "#" (diez without quotes). This
|
||||
method simply rolls back to the last non-space character and check if it is a quotation mark
|
||||
|
||||
Returns:
|
||||
bool: ``True`` if the previous element of the stream is a quote or escaped and ``False`` otherwise
|
||||
"""
|
||||
current_position = self._io.tell()
|
||||
|
||||
last_char = penultimate_char = None
|
||||
for index in range(current_position - 1, -1, -1):
|
||||
self._io.seek(index)
|
||||
last_char = self._io.read(1)
|
||||
if last_char.isspace():
|
||||
continue
|
||||
|
||||
if index >= 0:
|
||||
self._io.seek(index - 1)
|
||||
penultimate_char = self._io.read(1)
|
||||
|
||||
break
|
||||
|
||||
self._io.seek(current_position) # reset position of the stream
|
||||
is_quoted = last_char is not None and last_char in self.quotes
|
||||
is_escaped = penultimate_char is not None and penultimate_char in self.escape
|
||||
|
||||
return is_quoted or is_escaped
|
||||
|
||||
def _parse_array(self) -> list[str]:
|
||||
"""
|
||||
parse array from the PKGBUILD. This method will extract tokens from parser until it matches closing array,
|
||||
modifying source parser state
|
||||
|
||||
Returns:
|
||||
list[str]: extracted arrays elements
|
||||
|
||||
Raises:
|
||||
PkgbuildParserError: if array is not closed
|
||||
"""
|
||||
def extract() -> Generator[str, None, None]:
|
||||
while token := self.get_token():
|
||||
match token:
|
||||
case _ if self._is_escaped():
|
||||
pass
|
||||
case PkgbuildToken.ArrayEnds:
|
||||
break
|
||||
case PkgbuildToken.Comment:
|
||||
self.instream.readline()
|
||||
continue
|
||||
yield token
|
||||
|
||||
if token != PkgbuildToken.ArrayEnds:
|
||||
raise PkgbuildParserError("no closing array bracket found")
|
||||
|
||||
return self._expand_array(list(extract()))
|
||||
|
||||
def _parse_function(self) -> str:
|
||||
"""
|
||||
parse function from the PKGBUILD. This method will extract tokens from parser until it matches closing function,
|
||||
modifying source parser state. Instead of trying to combine tokens together, it uses positions of the file
|
||||
and reads content again in this range
|
||||
|
||||
Returns:
|
||||
str: function body
|
||||
|
||||
Raises:
|
||||
PkgbuildParserError: if function body wasn't found or parser input stream doesn't support position reading
|
||||
"""
|
||||
# find start and end positions
|
||||
start_position = end_position = -1
|
||||
counter = 0 # simple processing of the inner "{" and "}"
|
||||
for token in self:
|
||||
match token:
|
||||
case _ if self._is_escaped():
|
||||
continue
|
||||
case PkgbuildToken.FunctionStarts:
|
||||
if counter == 0:
|
||||
start_position = self._io.tell() - 1
|
||||
counter += 1
|
||||
case PkgbuildToken.FunctionEnds:
|
||||
end_position = self._io.tell()
|
||||
counter -= 1
|
||||
if counter == 0:
|
||||
break
|
||||
|
||||
if not 0 < start_position < end_position:
|
||||
raise PkgbuildParserError("function body wasn't found")
|
||||
|
||||
# read the specified interval from source stream
|
||||
self._io.seek(start_position - 1) # start from the previous symbol
|
||||
content = self._io.read(end_position - start_position)
|
||||
|
||||
# special case of the end of file
|
||||
if self.state == self.eof: # type: ignore[attr-defined]
|
||||
content += self._io.read(1)
|
||||
|
||||
# reset position (because the last position was before the next token starts)
|
||||
self._io.seek(end_position)
|
||||
|
||||
return content
|
||||
|
||||
def _parse_token(self, token: str) -> Generator[PkgbuildPatch, None, None]:
|
||||
"""
|
||||
parse single token to the PKGBUILD field
|
||||
|
||||
Args:
|
||||
token(str): current token
|
||||
|
||||
Yields:
|
||||
PkgbuildPatch: extracted a PKGBUILD node
|
||||
"""
|
||||
# simple assignment rule
|
||||
if m := self._STRING_ASSIGNMENT.match(token):
|
||||
key = m.group("key")
|
||||
value = m.group("value")
|
||||
yield PkgbuildPatch(key, value)
|
||||
return
|
||||
|
||||
if token == PkgbuildToken.Comment:
|
||||
self.instream.readline()
|
||||
return
|
||||
|
||||
match self.get_token():
|
||||
# array processing. Arrays will be sent as "key=", "(", values, ")"
|
||||
case PkgbuildToken.ArrayStarts if m := self._ARRAY_ASSIGNMENT.match(token):
|
||||
key = m.group("key")
|
||||
value = self._parse_array()
|
||||
yield PkgbuildPatch(key, value)
|
||||
|
||||
# functions processing. Function will be sent as "name", "()", "{", body, "}"
|
||||
case PkgbuildToken.FunctionDeclaration if self._FUNCTION_DECLARATION.match(token):
|
||||
key = f"{token}{PkgbuildToken.FunctionDeclaration}"
|
||||
value = self._parse_function()
|
||||
yield PkgbuildPatch(key, value) # this is not mistake, assign to token without ()
|
||||
|
||||
# special function case, where "(" and ")" are separated tokens, e.g. "pkgver ( )"
|
||||
case PkgbuildToken.ArrayStarts if self._FUNCTION_DECLARATION.match(token):
|
||||
next_token = self.get_token()
|
||||
if next_token == PkgbuildToken.ArrayEnds: # replace closing bracket with "()"
|
||||
next_token = PkgbuildToken.FunctionDeclaration
|
||||
self.push_token(next_token) # type: ignore[arg-type]
|
||||
yield from self._parse_token(token)
|
||||
|
||||
# some random token received without continuation, lets guess it is empty assignment (i.e. key=)
|
||||
case other if other is not None:
|
||||
yield from self._parse_token(other)
|
||||
|
||||
def parse(self) -> Generator[PkgbuildPatch, None, None]:
|
||||
"""
|
||||
parse source stream and yield parsed entries
|
||||
|
||||
Yields:
|
||||
PkgbuildPatch: extracted a PKGBUILD node
|
||||
"""
|
||||
for token in self:
|
||||
yield from self._parse_token(token)
|
@ -115,7 +115,7 @@ class PackageArchive:
|
||||
Returns:
|
||||
FilesystemPackage: generated pacman package model with empty paths
|
||||
"""
|
||||
package_name, *_ = path.parent.name.rsplit("-", 2)
|
||||
package_name, *_ = path.parent.name.rsplit("-", maxsplit=2)
|
||||
try:
|
||||
pacman_package = OfficialSyncdb.info(package_name, pacman=self.pacman)
|
||||
return FilesystemPackage(
|
||||
|
@ -17,13 +17,14 @@
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
from collections.abc import Generator
|
||||
from pathlib import Path
|
||||
|
||||
from ahriman.core.build_tools.sources import Sources
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.exceptions import BuildError
|
||||
from ahriman.core.log import LazyLogging
|
||||
from ahriman.core.utils import check_output
|
||||
from ahriman.core.utils import check_output, package_like
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.pkgbuild_patch import PkgbuildPatch
|
||||
from ahriman.models.repository_paths import RepositoryPaths
|
||||
@ -65,12 +66,43 @@ class Task(LazyLogging):
|
||||
self.makepkg_flags = configuration.getlist("build", "makepkg_flags", fallback=[])
|
||||
self.makechrootpkg_flags = configuration.getlist("build", "makechrootpkg_flags", fallback=[])
|
||||
|
||||
def build(self, sources_dir: Path, **kwargs: str | None) -> list[Path]:
|
||||
def _package_archives(self, sources_dir: Path, source_files: list[Path]) -> list[Path]:
|
||||
"""
|
||||
extract package archives from the directory
|
||||
|
||||
Args:
|
||||
sources_dir(Path): path to where sources are
|
||||
source_files(list[Path]): list of files which were initially in the directory
|
||||
|
||||
Returns:
|
||||
list[Path]: list of file paths which looks like freshly generated archives
|
||||
"""
|
||||
def files() -> Generator[Path, None, None]:
|
||||
for filepath in sources_dir.iterdir():
|
||||
if filepath in source_files:
|
||||
continue # skip files which were already there
|
||||
if filepath.suffix == ".log":
|
||||
continue # skip log files
|
||||
if not package_like(filepath):
|
||||
continue # path doesn't look like a package
|
||||
yield filepath
|
||||
|
||||
# debug packages are always formed as package.base-debug
|
||||
# see /usr/share/makepkg/util/pkgbuild.sh for more details
|
||||
debug_package_prefix = f"{self.package.base}-debug-"
|
||||
return [
|
||||
package
|
||||
for package in files()
|
||||
if self.include_debug_packages or not package.name.startswith(debug_package_prefix)
|
||||
]
|
||||
|
||||
def build(self, sources_dir: Path, *, dry_run: bool = False, **kwargs: str | None) -> list[Path]:
|
||||
"""
|
||||
run package build
|
||||
|
||||
Args:
|
||||
sources_dir(Path): path to where sources are
|
||||
dry_run(bool, optional): do not perform build itself (Default value = False)
|
||||
**kwargs(str | None): environment variables to be passed to build processes
|
||||
|
||||
Returns:
|
||||
@ -80,6 +112,8 @@ class Task(LazyLogging):
|
||||
command.extend(self.archbuild_flags)
|
||||
command.extend(["--"] + self.makechrootpkg_flags)
|
||||
command.extend(["--"] + self.makepkg_flags)
|
||||
if dry_run:
|
||||
command.extend(["--nobuild"])
|
||||
self.logger.info("using %s for %s", command, self.package.base)
|
||||
|
||||
environment: dict[str, str] = {
|
||||
@ -89,6 +123,7 @@ class Task(LazyLogging):
|
||||
}
|
||||
self.logger.info("using environment variables %s", environment)
|
||||
|
||||
source_files = list(sources_dir.iterdir())
|
||||
check_output(
|
||||
*command,
|
||||
exception=BuildError.from_process(self.package.base),
|
||||
@ -98,20 +133,7 @@ class Task(LazyLogging):
|
||||
environment=environment,
|
||||
)
|
||||
|
||||
package_list_command = ["makepkg", "--packagelist"]
|
||||
if not self.include_debug_packages:
|
||||
package_list_command.append("OPTIONS=(!debug)") # disable debug flag manually
|
||||
packages = check_output(
|
||||
*package_list_command,
|
||||
exception=BuildError.from_process(self.package.base),
|
||||
cwd=sources_dir,
|
||||
logger=self.logger,
|
||||
environment=environment,
|
||||
).splitlines()
|
||||
# some dirty magic here
|
||||
# the filter is applied in order to make sure that result will only contain packages which were actually built
|
||||
# e.g. in some cases packagelist command produces debug packages which were not actually built
|
||||
return list(filter(lambda path: path.is_file(), map(Path, packages)))
|
||||
return self._package_archives(sources_dir, source_files)
|
||||
|
||||
def init(self, sources_dir: Path, patches: list[PkgbuildPatch], local_version: str | None) -> str | None:
|
||||
"""
|
||||
|
@ -24,16 +24,7 @@ import sys
|
||||
from collections.abc import Generator, Mapping, MutableMapping
|
||||
from string import Template
|
||||
|
||||
|
||||
class ExtendedTemplate(Template):
|
||||
"""
|
||||
extension to the default :class:`Template` class, which also enabled braces regex to lookup in sections
|
||||
|
||||
Attributes:
|
||||
braceidpattern(str): regular expression to match a colon inside braces
|
||||
"""
|
||||
|
||||
braceidpattern = r"(?a:[_a-z0-9][_a-z0-9:]*)"
|
||||
from ahriman.core.configuration.shell_template import ShellTemplate
|
||||
|
||||
|
||||
class ShellInterpolator(configparser.Interpolation):
|
||||
@ -60,7 +51,7 @@ class ShellInterpolator(configparser.Interpolation):
|
||||
"""
|
||||
def identifiers() -> Generator[tuple[str | None, str], None, None]:
|
||||
# extract all found identifiers and parse them
|
||||
for identifier in ExtendedTemplate(value).get_identifiers():
|
||||
for identifier in ShellTemplate(value).get_identifiers():
|
||||
match identifier.split(":"):
|
||||
case [lookup_option]: # single option from the same section
|
||||
yield None, lookup_option
|
||||
@ -121,7 +112,7 @@ class ShellInterpolator(configparser.Interpolation):
|
||||
|
||||
# resolve internal references
|
||||
variables = dict(self._extract_variables(parser, value, defaults))
|
||||
internal = ExtendedTemplate(escaped).safe_substitute(variables)
|
||||
internal = ShellTemplate(escaped).safe_substitute(variables)
|
||||
|
||||
# resolve enriched environment variables by using default Template class
|
||||
environment = Template(internal).safe_substitute(self.environment())
|
||||
|
158
src/ahriman/core/configuration/shell_template.py
Normal file
158
src/ahriman/core/configuration/shell_template.py
Normal file
@ -0,0 +1,158 @@
|
||||
#
|
||||
# Copyright (c) 2021-2024 ahriman team.
|
||||
#
|
||||
# This file is part of ahriman
|
||||
# (see https://github.com/arcan1s/ahriman).
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
import fnmatch
|
||||
import re
|
||||
|
||||
from collections.abc import Generator, Mapping
|
||||
from string import Template
|
||||
|
||||
|
||||
class ShellTemplate(Template):
|
||||
"""
|
||||
extension to the default :class:`Template` class, which also adds additional tokens to braced regex and enables
|
||||
bash expansion
|
||||
|
||||
Attributes:
|
||||
braceidpattern(str): regular expression to match every character except for closing bracket
|
||||
"""
|
||||
|
||||
braceidpattern = r"(?a:[_a-z0-9][^}]*)"
|
||||
|
||||
_REMOVE_BACK = re.compile(r"^(?P<key>\w+)%(?P<pattern>.+)$")
|
||||
_REMOVE_FRONT = re.compile(r"^(?P<key>\w+)#(?P<pattern>.+)$")
|
||||
_REPLACE = re.compile(r"^(?P<key>\w+)/(?P<pattern>.+)/(?P<replacement>.+)$")
|
||||
|
||||
@staticmethod
|
||||
def _remove_back(source: str, pattern: str, *, greedy: bool) -> str:
|
||||
"""
|
||||
resolve "${var%(%)pattern}" constructions
|
||||
|
||||
Args:
|
||||
source(str): source string to match the pattern inside
|
||||
pattern(str): shell expression to match
|
||||
greedy(bool): match as much as possible or not
|
||||
|
||||
Returns:
|
||||
str: result after removal ``pattern`` from the end of the string
|
||||
"""
|
||||
regex = fnmatch.translate(pattern)
|
||||
compiled = re.compile(regex)
|
||||
|
||||
result = source
|
||||
start_pos = 0
|
||||
|
||||
while m := compiled.search(source, start_pos):
|
||||
result = source[:m.start()]
|
||||
start_pos += m.start() + 1
|
||||
if greedy:
|
||||
break
|
||||
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def _remove_front(source: str, pattern: str, *, greedy: bool) -> str:
|
||||
"""
|
||||
resolve "${var#(#)pattern}" constructions
|
||||
|
||||
Args:
|
||||
source(str): source string to match the pattern inside
|
||||
pattern(str): shell expression to match
|
||||
greedy(bool): match as much as possible or not
|
||||
|
||||
Returns:
|
||||
str: result after removal ``pattern`` from the start of the string
|
||||
"""
|
||||
regex = fnmatch.translate(pattern)[:-2] # remove \Z at the end of the regex
|
||||
if not greedy:
|
||||
regex = regex.replace("*", "*?")
|
||||
compiled = re.compile(regex)
|
||||
|
||||
m = compiled.match(source)
|
||||
if m is None:
|
||||
return source
|
||||
|
||||
return source[m.end():]
|
||||
|
||||
@staticmethod
|
||||
def _replace(source: str, pattern: str, replacement: str, *, greedy: bool) -> str:
|
||||
"""
|
||||
resolve "${var/(/)pattern/replacement}" constructions
|
||||
|
||||
Args:
|
||||
source(str): source string to match the pattern inside
|
||||
pattern(str): shell expression to match
|
||||
replacement(str): new substring
|
||||
greedy(bool): replace as much as possible or not
|
||||
|
||||
Returns:
|
||||
str: result after replacing ``pattern`` by ``replacement``
|
||||
"""
|
||||
match pattern:
|
||||
case from_back if from_back.startswith("%"):
|
||||
removed = ShellTemplate._remove_back(source, from_back[1:], greedy=False)
|
||||
return removed if removed == source else removed + replacement
|
||||
|
||||
case from_front if from_front.startswith("#"):
|
||||
removed = ShellTemplate._remove_front(source, from_front[1:], greedy=False)
|
||||
return removed if removed == source else replacement + removed
|
||||
|
||||
case regular:
|
||||
regex = fnmatch.translate(regular)[:-2] # remove \Z at the end of the regex
|
||||
compiled = re.compile(regex)
|
||||
return compiled.sub(replacement, source, count=not greedy)
|
||||
|
||||
def shell_substitute(self, mapping: Mapping[str, str], /, **kwargs: str) -> str:
|
||||
"""
|
||||
this method behaves the same as :func:`safe_substitute`, however also expands bash string operations
|
||||
|
||||
Args:
|
||||
mapping(Mapping[str, str]): key-value dictionary of variables
|
||||
**kwargs(str): key-value dictionary of variables passed as kwargs
|
||||
|
||||
Returns:
|
||||
str: string with replaced values
|
||||
"""
|
||||
substitutions = (
|
||||
(self._REMOVE_BACK, self._remove_back, "%"),
|
||||
(self._REMOVE_FRONT, self._remove_front, "#"),
|
||||
(self._REPLACE, self._replace, "/"),
|
||||
)
|
||||
|
||||
def generator(variables: dict[str, str]) -> Generator[tuple[str, str], None, None]:
|
||||
for identifier in self.get_identifiers():
|
||||
for regex, function, greediness in substitutions:
|
||||
if m := regex.match(identifier):
|
||||
source = variables.get(m.group("key"))
|
||||
if source is None:
|
||||
continue
|
||||
|
||||
# replace pattern with non-greedy
|
||||
pattern = m.group("pattern").removeprefix(greediness)
|
||||
greedy = m.group("pattern").startswith(greediness)
|
||||
# gather all additional args
|
||||
args = {key: value for key, value in m.groupdict().items() if key not in ("key", "pattern")}
|
||||
|
||||
yield identifier, function(source, pattern, **args, greedy=greedy)
|
||||
break
|
||||
|
||||
kwargs.update(mapping)
|
||||
substituted = dict(generator(kwargs))
|
||||
|
||||
return self.safe_substitute(kwargs | substituted)
|
@ -212,6 +212,23 @@ class PacmanError(RuntimeError):
|
||||
RuntimeError.__init__(self, f"Could not perform operation with pacman: `{details}`")
|
||||
|
||||
|
||||
class PkgbuildParserError(ValueError):
|
||||
"""
|
||||
exception raises in case of PKGBUILD parser errors
|
||||
"""
|
||||
|
||||
def __init__(self, reason: str, source: Any = None) -> None:
|
||||
"""
|
||||
Args:
|
||||
reason(str): parser error reason
|
||||
source(Any, optional): source line if available (Default value = None)
|
||||
"""
|
||||
message = f"Could not parse PKGBUILD: {reason}"
|
||||
if source is not None:
|
||||
message += f", source: `{source}`"
|
||||
ValueError.__init__(self, message)
|
||||
|
||||
|
||||
class PathError(ValueError):
|
||||
"""
|
||||
exception which will be raised on path which is not belong to root directory
|
||||
|
@ -58,7 +58,7 @@ class PackageInfo(RepositoryProperties):
|
||||
# force version to max of them
|
||||
self.logger.warning("version of %s differs, found %s and %s",
|
||||
current.base, current.version, local.version)
|
||||
if current.is_outdated(local, self.paths, calculate_version=False):
|
||||
if current.is_outdated(local, self.configuration, calculate_version=False):
|
||||
current.version = local.version
|
||||
current.packages.update(local.packages)
|
||||
except Exception:
|
||||
|
@ -51,7 +51,6 @@ class RepositoryProperties(EventLogger, LazyLogging):
|
||||
scan_paths(ScanPaths): scan paths for the implicit dependencies
|
||||
sign(GPG): GPG wrapper instance
|
||||
triggers(TriggerLoader): triggers holder
|
||||
vcs_allowed_age(int): maximal age of the VCS packages before they will be checked
|
||||
"""
|
||||
|
||||
def __init__(self, repository_id: RepositoryId, configuration: Configuration, database: SQLite, *, report: bool,
|
||||
@ -68,8 +67,6 @@ class RepositoryProperties(EventLogger, LazyLogging):
|
||||
self.configuration = configuration
|
||||
self.database = database
|
||||
|
||||
self.vcs_allowed_age = configuration.getint("build", "vcs_allowed_age", fallback=0)
|
||||
|
||||
self.paths: RepositoryPaths = configuration.repository_paths # additional workaround for pycharm typing
|
||||
|
||||
self.ignore_list = configuration.getlist("build", "ignore_packages", fallback=[])
|
||||
|
@ -67,10 +67,7 @@ class UpdateHandler(PackageInfo, Cleaner):
|
||||
try:
|
||||
remote = load_remote(local)
|
||||
|
||||
if local.is_outdated(
|
||||
remote, self.paths,
|
||||
vcs_allowed_age=self.vcs_allowed_age,
|
||||
calculate_version=vcs):
|
||||
if local.is_outdated(remote, self.configuration, calculate_version=vcs):
|
||||
self.reporter.set_pending(local.base)
|
||||
self.event(local.base, EventType.PackageOutdated, "Remote version is newer than local")
|
||||
result.append(remote)
|
||||
@ -156,9 +153,7 @@ class UpdateHandler(PackageInfo, Cleaner):
|
||||
if local.remote.is_remote:
|
||||
continue # avoid checking AUR packages
|
||||
|
||||
if local.is_outdated(remote, self.paths,
|
||||
vcs_allowed_age=self.vcs_allowed_age,
|
||||
calculate_version=vcs):
|
||||
if local.is_outdated(remote, self.configuration, calculate_version=vcs):
|
||||
self.reporter.set_pending(local.base)
|
||||
self.event(local.base, EventType.PackageOutdated, "Locally pulled sources are outdated")
|
||||
result.append(remote)
|
||||
|
@ -197,7 +197,7 @@ class Watcher(LazyLogging):
|
||||
proxy methods for reporter client
|
||||
|
||||
Args:
|
||||
item(str): property name:
|
||||
item(str): property name
|
||||
|
||||
Returns:
|
||||
Any: attribute by its name
|
||||
|
@ -27,7 +27,7 @@ import re
|
||||
import selectors
|
||||
import subprocess
|
||||
|
||||
from collections.abc import Callable, Generator, Iterable
|
||||
from collections.abc import Callable, Generator, Iterable, Mapping
|
||||
from dataclasses import asdict
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
@ -407,7 +407,7 @@ def safe_filename(source: str) -> str:
|
||||
return re.sub(r"[^A-Za-z\d\-._~:\[\]@]", "-", source)
|
||||
|
||||
|
||||
def srcinfo_property(key: str, srcinfo: dict[str, Any], package_srcinfo: dict[str, Any], *,
|
||||
def srcinfo_property(key: str, srcinfo: Mapping[str, Any], package_srcinfo: Mapping[str, Any], *,
|
||||
default: Any = None) -> Any:
|
||||
"""
|
||||
extract property from SRCINFO. This method extracts property from package if this property is presented in
|
||||
@ -416,8 +416,8 @@ def srcinfo_property(key: str, srcinfo: dict[str, Any], package_srcinfo: dict[st
|
||||
|
||||
Args:
|
||||
key(str): key to extract
|
||||
srcinfo(dict[str, Any]): root structure of SRCINFO
|
||||
package_srcinfo(dict[str, Any]): package specific SRCINFO
|
||||
srcinfo(Mapping[str, Any]): root structure of SRCINFO
|
||||
package_srcinfo(Mapping[str, Any]): package specific SRCINFO
|
||||
default(Any, optional): the default value for the specified key (Default value = None)
|
||||
|
||||
Returns:
|
||||
@ -426,7 +426,7 @@ def srcinfo_property(key: str, srcinfo: dict[str, Any], package_srcinfo: dict[st
|
||||
return package_srcinfo.get(key) or srcinfo.get(key) or default
|
||||
|
||||
|
||||
def srcinfo_property_list(key: str, srcinfo: dict[str, Any], package_srcinfo: dict[str, Any], *,
|
||||
def srcinfo_property_list(key: str, srcinfo: Mapping[str, Any], package_srcinfo: Mapping[str, Any], *,
|
||||
architecture: str | None = None) -> list[Any]:
|
||||
"""
|
||||
extract list property from SRCINFO. Unlike :func:`srcinfo_property()` it supposes that default return value is
|
||||
@ -435,8 +435,8 @@ def srcinfo_property_list(key: str, srcinfo: dict[str, Any], package_srcinfo: di
|
||||
|
||||
Args:
|
||||
key(str): key to extract
|
||||
srcinfo(dict[str, Any]): root structure of SRCINFO
|
||||
package_srcinfo(dict[str, Any]): package specific SRCINFO
|
||||
srcinfo(Mapping[str, Any]): root structure of SRCINFO
|
||||
package_srcinfo(Mapping[str, Any]): package specific SRCINFO
|
||||
architecture(str | None, optional): package architecture if set (Default value = None)
|
||||
|
||||
Returns:
|
||||
|
@ -26,19 +26,18 @@ from collections.abc import Callable, Generator, Iterable
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from pyalpm import vercmp # type: ignore[import-not-found]
|
||||
from srcinfo.parse import parse_srcinfo # type: ignore[import-untyped]
|
||||
from typing import Any, Self
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from ahriman.core.alpm.pacman import Pacman
|
||||
from ahriman.core.alpm.remote import AUR, Official, OfficialSyncdb
|
||||
from ahriman.core.exceptions import PackageInfoError
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.log import LazyLogging
|
||||
from ahriman.core.utils import check_output, dataclass_view, full_version, parse_version, srcinfo_property_list, utcnow
|
||||
from ahriman.core.utils import dataclass_view, full_version, parse_version, srcinfo_property_list, utcnow
|
||||
from ahriman.models.package_description import PackageDescription
|
||||
from ahriman.models.package_source import PackageSource
|
||||
from ahriman.models.pkgbuild import Pkgbuild
|
||||
from ahriman.models.remote_source import RemoteSource
|
||||
from ahriman.models.repository_paths import RepositoryPaths
|
||||
|
||||
|
||||
@dataclass(kw_only=True)
|
||||
@ -255,25 +254,19 @@ class Package(LazyLogging):
|
||||
|
||||
Returns:
|
||||
Self: package properties
|
||||
|
||||
Raises:
|
||||
PackageInfoError: if there are parsing errors
|
||||
"""
|
||||
srcinfo_source = check_output("makepkg", "--printsrcinfo", cwd=path)
|
||||
srcinfo, errors = parse_srcinfo(srcinfo_source)
|
||||
if errors:
|
||||
raise PackageInfoError(errors)
|
||||
pkgbuild = Pkgbuild.from_file(path / "PKGBUILD")
|
||||
|
||||
packages = {
|
||||
package: PackageDescription(
|
||||
depends=srcinfo_property_list("depends", srcinfo, properties, architecture=architecture),
|
||||
make_depends=srcinfo_property_list("makedepends", srcinfo, properties, architecture=architecture),
|
||||
opt_depends=srcinfo_property_list("optdepends", srcinfo, properties, architecture=architecture),
|
||||
check_depends=srcinfo_property_list("checkdepends", srcinfo, properties, architecture=architecture),
|
||||
depends=srcinfo_property_list("depends", pkgbuild, properties, architecture=architecture),
|
||||
make_depends=srcinfo_property_list("makedepends", pkgbuild, properties, architecture=architecture),
|
||||
opt_depends=srcinfo_property_list("optdepends", pkgbuild, properties, architecture=architecture),
|
||||
check_depends=srcinfo_property_list("checkdepends", pkgbuild, properties, architecture=architecture),
|
||||
)
|
||||
for package, properties in srcinfo["packages"].items()
|
||||
for package, properties in pkgbuild.packages().items()
|
||||
}
|
||||
version = full_version(srcinfo.get("epoch"), srcinfo["pkgver"], srcinfo["pkgrel"])
|
||||
version = full_version(pkgbuild.get("epoch"), pkgbuild["pkgver"], pkgbuild["pkgrel"])
|
||||
|
||||
remote = RemoteSource(
|
||||
source=PackageSource.Local,
|
||||
@ -284,7 +277,7 @@ class Package(LazyLogging):
|
||||
)
|
||||
|
||||
return cls(
|
||||
base=srcinfo["pkgbase"],
|
||||
base=pkgbuild["pkgbase"],
|
||||
version=version,
|
||||
remote=remote,
|
||||
packages=packages,
|
||||
@ -363,18 +356,14 @@ class Package(LazyLogging):
|
||||
Raises:
|
||||
PackageInfoError: if there are parsing errors
|
||||
"""
|
||||
srcinfo_source = check_output("makepkg", "--printsrcinfo", cwd=path)
|
||||
srcinfo, errors = parse_srcinfo(srcinfo_source)
|
||||
if errors:
|
||||
raise PackageInfoError(errors)
|
||||
|
||||
pkgbuild = Pkgbuild.from_file(path / "PKGBUILD")
|
||||
# we could use arch property, but for consistency it is better to call special method
|
||||
architectures = Package.supported_architectures(path)
|
||||
|
||||
for architecture in architectures:
|
||||
for source in srcinfo_property_list("source", srcinfo, {}, architecture=architecture):
|
||||
for source in srcinfo_property_list("source", pkgbuild, {}, architecture=architecture):
|
||||
if "::" in source:
|
||||
_, source = source.split("::", 1) # in case if filename is specified, remove it
|
||||
_, source = source.split("::", maxsplit=1) # in case if filename is specified, remove it
|
||||
|
||||
if urlparse(source).scheme:
|
||||
# basically file schema should use absolute path which is impossible if we are distributing
|
||||
@ -383,7 +372,7 @@ class Package(LazyLogging):
|
||||
|
||||
yield Path(source)
|
||||
|
||||
if (install := srcinfo.get("install", None)) is not None:
|
||||
if (install := pkgbuild.get("install")) is not None:
|
||||
yield Path(install)
|
||||
|
||||
@staticmethod
|
||||
@ -396,15 +385,9 @@ class Package(LazyLogging):
|
||||
|
||||
Returns:
|
||||
set[str]: list of package supported architectures
|
||||
|
||||
Raises:
|
||||
PackageInfoError: if there are parsing errors
|
||||
"""
|
||||
srcinfo_source = check_output("makepkg", "--printsrcinfo", cwd=path)
|
||||
srcinfo, errors = parse_srcinfo(srcinfo_source)
|
||||
if errors:
|
||||
raise PackageInfoError(errors)
|
||||
return set(srcinfo.get("arch", []))
|
||||
pkgbuild = Pkgbuild.from_file(path / "PKGBUILD")
|
||||
return set(pkgbuild.get("arch", []))
|
||||
|
||||
def _package_list_property(self, extractor: Callable[[PackageDescription], list[str]]) -> list[str]:
|
||||
"""
|
||||
@ -426,39 +409,39 @@ class Package(LazyLogging):
|
||||
|
||||
return sorted(set(generator()))
|
||||
|
||||
def actual_version(self, paths: RepositoryPaths) -> str:
|
||||
def actual_version(self, configuration: Configuration) -> str:
|
||||
"""
|
||||
additional method to handle VCS package versions
|
||||
|
||||
Args:
|
||||
paths(RepositoryPaths): repository paths instance
|
||||
configuration(Configuration): configuration instance
|
||||
|
||||
Returns:
|
||||
str: package version if package is not VCS and current version according to VCS otherwise
|
||||
|
||||
Raises:
|
||||
PackageInfoError: if there are parsing errors
|
||||
"""
|
||||
if not self.is_vcs:
|
||||
return self.version
|
||||
|
||||
from ahriman.core.build_tools.sources import Sources
|
||||
from ahriman.core.build_tools.task import Task
|
||||
|
||||
Sources.load(paths.cache_for(self.base), self, [], paths)
|
||||
_, repository_id = configuration.check_loaded()
|
||||
paths = configuration.repository_paths
|
||||
task = Task(self, configuration, repository_id.architecture, paths)
|
||||
|
||||
try:
|
||||
# update pkgver first
|
||||
check_output("makepkg", "--nodeps", "--nobuild", cwd=paths.cache_for(self.base), logger=self.logger)
|
||||
# generate new .SRCINFO and put it to parser
|
||||
srcinfo_source = check_output("makepkg", "--printsrcinfo",
|
||||
cwd=paths.cache_for(self.base), logger=self.logger)
|
||||
srcinfo, errors = parse_srcinfo(srcinfo_source)
|
||||
if errors:
|
||||
raise PackageInfoError(errors)
|
||||
# create fresh chroot environment, fetch sources and - automagically - update PKGBUILD
|
||||
task.init(paths.cache_for(self.base), [], None)
|
||||
task.build(paths.cache_for(self.base), dry_run=True)
|
||||
|
||||
return full_version(srcinfo.get("epoch"), srcinfo["pkgver"], srcinfo["pkgrel"])
|
||||
pkgbuild = Pkgbuild.from_file(paths.cache_for(self.base) / "PKGBUILD")
|
||||
|
||||
return full_version(pkgbuild.get("epoch"), pkgbuild["pkgver"], pkgbuild["pkgrel"])
|
||||
except Exception:
|
||||
self.logger.exception("cannot determine version of VCS package, make sure that VCS tools are installed")
|
||||
self.logger.exception("cannot determine version of VCS package")
|
||||
finally:
|
||||
# clear log files generated by devtools
|
||||
for log_file in paths.cache_for(self.base).glob("*.log"):
|
||||
log_file.unlink()
|
||||
|
||||
return self.version
|
||||
|
||||
@ -513,26 +496,25 @@ class Package(LazyLogging):
|
||||
if package.build_date is not None
|
||||
)
|
||||
|
||||
def is_outdated(self, remote: Package, paths: RepositoryPaths, *,
|
||||
vcs_allowed_age: float | int = 0,
|
||||
def is_outdated(self, remote: Package, configuration: Configuration, *,
|
||||
calculate_version: bool = True) -> bool:
|
||||
"""
|
||||
check if package is out-of-dated
|
||||
|
||||
Args:
|
||||
remote(Package): package properties from remote source
|
||||
paths(RepositoryPaths): repository paths instance. Required for VCS packages cache
|
||||
vcs_allowed_age(float | int, optional): max age of the built packages before they will be
|
||||
forced to calculate actual version (Default value = 0)
|
||||
configuration(Configuration): configuration instance
|
||||
calculate_version(bool, optional): expand version to actual value (by calculating git versions)
|
||||
(Default value = True)
|
||||
|
||||
Returns:
|
||||
bool: ``True`` if the package is out-of-dated and ``False`` otherwise
|
||||
"""
|
||||
vcs_allowed_age = configuration.getint("build", "vcs_allowed_age", fallback=0)
|
||||
min_vcs_build_date = utcnow().timestamp() - vcs_allowed_age
|
||||
|
||||
if calculate_version and not self.is_newer_than(min_vcs_build_date):
|
||||
remote_version = remote.actual_version(paths)
|
||||
remote_version = remote.actual_version(configuration)
|
||||
else:
|
||||
remote_version = remote.version
|
||||
|
||||
|
149
src/ahriman/models/pkgbuild.py
Normal file
149
src/ahriman/models/pkgbuild.py
Normal file
@ -0,0 +1,149 @@
|
||||
#
|
||||
# Copyright (c) 2021-2024 ahriman team.
|
||||
#
|
||||
# This file is part of ahriman
|
||||
# (see https://github.com/arcan1s/ahriman).
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
from collections.abc import Iterator, Mapping
|
||||
from dataclasses import dataclass
|
||||
from io import StringIO
|
||||
from pathlib import Path
|
||||
from typing import Any, IO, Self
|
||||
|
||||
from ahriman.core.alpm.pkgbuild_parser import PkgbuildParser, PkgbuildToken
|
||||
from ahriman.models.pkgbuild_patch import PkgbuildPatch
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Pkgbuild(Mapping[str, Any]):
|
||||
"""
|
||||
model and proxy for PKGBUILD properties
|
||||
|
||||
Attributes:
|
||||
fields(dict[str, PkgbuildPatch]): PKGBUILD fields
|
||||
"""
|
||||
|
||||
fields: dict[str, PkgbuildPatch]
|
||||
|
||||
@property
|
||||
def variables(self) -> dict[str, str]:
|
||||
"""
|
||||
list of variables defined and (maybe) used in this PKGBUILD
|
||||
|
||||
Returns:
|
||||
dict[str, str]: map of variable name to its value. The value will be included here in case if it presented
|
||||
in the internal dictionary, it is not a function and the value has string type
|
||||
"""
|
||||
return {
|
||||
key: value.value
|
||||
for key, value in self.fields.items()
|
||||
if not value.is_function and isinstance(value.value, str)
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_file(cls, path: Path) -> Self:
|
||||
"""
|
||||
parse PKGBUILD from the file
|
||||
|
||||
Args:
|
||||
path(Path): path to the PKGBUILD file
|
||||
|
||||
Returns:
|
||||
Self: constructed instance of self
|
||||
"""
|
||||
with path.open() as input_file:
|
||||
return cls.from_io(input_file)
|
||||
|
||||
@classmethod
|
||||
def from_io(cls, stream: IO[str]) -> Self:
|
||||
"""
|
||||
parse PKGBUILD from input stream
|
||||
|
||||
Args:
|
||||
stream(IO[str]): input stream containing PKGBUILD content
|
||||
|
||||
Returns:
|
||||
Self: constructed instance of self
|
||||
"""
|
||||
parser = PkgbuildParser(stream)
|
||||
fields = {patch.key: patch for patch in parser.parse()}
|
||||
|
||||
# pkgbase is optional field, the pkgname must be used instead if not set
|
||||
# however, pkgname is not presented is "package()" functions which we are parsing here too,
|
||||
# thus, in our terms, it is optional too
|
||||
if "pkgbase" not in fields and "pkgname" in fields:
|
||||
fields["pkgbase"] = PkgbuildPatch("pkgbase", fields["pkgname"].value)
|
||||
|
||||
return cls({key: value for key, value in fields.items() if key})
|
||||
|
||||
def packages(self) -> dict[str, Self]:
|
||||
"""
|
||||
extract properties from internal package functions
|
||||
|
||||
Returns:
|
||||
dict[str, Self]: map of package name to its inner properties if defined
|
||||
"""
|
||||
packages = [self["pkgname"]] if isinstance(self["pkgname"], str) else self["pkgname"]
|
||||
|
||||
def io(package_name: str) -> IO[str]:
|
||||
# try to read package specific function and fallback to default otherwise
|
||||
content = self.get(f"package_{package_name}") or self.get("package") or ""
|
||||
return StringIO(content)
|
||||
|
||||
return {package: self.from_io(io(package)) for package in packages}
|
||||
|
||||
def __getitem__(self, item: str) -> Any:
|
||||
"""
|
||||
get the field of the PKGBUILD. This method tries to get exact key value if possible; if none found, it tries to
|
||||
fetch function with the same name
|
||||
|
||||
Args:
|
||||
item(str): key name
|
||||
|
||||
Returns:
|
||||
Any: substituted value by the key
|
||||
|
||||
Raises:
|
||||
KeyError: if key doesn't exist
|
||||
"""
|
||||
value = self.fields.get(item)
|
||||
# if the key wasn't found and user didn't ask for function explicitly, we can try to get by function name
|
||||
if value is None and not item.endswith(PkgbuildToken.FunctionDeclaration):
|
||||
value = self.fields.get(f"{item}{PkgbuildToken.FunctionDeclaration}")
|
||||
|
||||
# if we still didn't find anything, we can just raise the exception
|
||||
if value is None:
|
||||
raise KeyError(item)
|
||||
|
||||
return value.substitute(self.variables)
|
||||
|
||||
def __iter__(self) -> Iterator[str]:
|
||||
"""
|
||||
iterate over the fields
|
||||
|
||||
Returns:
|
||||
Iterator[str]: keys iterator
|
||||
"""
|
||||
return iter(self.fields)
|
||||
|
||||
def __len__(self) -> int:
|
||||
"""
|
||||
get length of the mapping
|
||||
|
||||
Returns:
|
||||
int: amount of the fields in this PKGBUILD
|
||||
"""
|
||||
return len(self.fields)
|
@ -23,6 +23,7 @@ from dataclasses import dataclass, fields
|
||||
from pathlib import Path
|
||||
from typing import Any, Generator, Self
|
||||
|
||||
from ahriman.core.configuration.shell_template import ShellTemplate
|
||||
from ahriman.core.utils import dataclass_view, filter_json
|
||||
|
||||
|
||||
@ -167,6 +168,21 @@ class PkgbuildPatch:
|
||||
return f"{self.key} {self.value}" # no quoting enabled here
|
||||
return f"""{self.key}={PkgbuildPatch.quote(self.value)}"""
|
||||
|
||||
def substitute(self, variables: dict[str, str]) -> str | list[str]:
|
||||
"""
|
||||
substitute variables into the value
|
||||
|
||||
Args:
|
||||
variables(dict[str, str]): map of variables available for usage
|
||||
|
||||
Returns:
|
||||
str | list[str]: substituted value. All unknown variables will remain as links to their values.
|
||||
This function doesn't support recursive substitution
|
||||
"""
|
||||
if isinstance(self.value, str):
|
||||
return ShellTemplate(self.value).shell_substitute(variables)
|
||||
return [ShellTemplate(value).shell_substitute(variables) for value in self.value]
|
||||
|
||||
def view(self) -> dict[str, Any]:
|
||||
"""
|
||||
generate json patch view
|
||||
|
Reference in New Issue
Block a user