Support type triggers (#96)

* implement mirrorlist package generator

* implement keyring package generator

* docs update

* do not skip empty lines

* fill remote source for local packages

* faq update
This commit is contained in:
2023-05-25 16:27:16 +03:00
committed by GitHub
parent 8b2601beaa
commit adb5b58a1d
55 changed files with 2304 additions and 76 deletions

View File

@ -100,6 +100,8 @@ def _parser() -> argparse.ArgumentParser:
_set_patch_set_add_parser(subparsers)
_set_repo_backup_parser(subparsers)
_set_repo_check_parser(subparsers)
_set_repo_create_keyring_parser(subparsers)
_set_repo_create_mirrorlist_parser(subparsers)
_set_repo_daemon_parser(subparsers)
_set_repo_rebuild_parser(subparsers)
_set_repo_remove_unknown_parser(subparsers)
@ -478,6 +480,44 @@ def _set_repo_check_parser(root: SubParserAction) -> argparse.ArgumentParser:
return parser
def _set_repo_create_keyring_parser(root: SubParserAction) -> argparse.ArgumentParser:
"""
add parser for create-keyring subcommand
Args:
root(SubParserAction): subparsers for the commands
Returns:
argparse.ArgumentParser: created argument parser
"""
parser = root.add_parser("repo-create-keyring", help="create keyring package",
description="create package which contains list of trusted keys as set by "
"configuration. Note, that this action will only create package, the package "
"itself has to be built manually",
formatter_class=_formatter)
parser.set_defaults(handler=handlers.Triggers, trigger=["ahriman.core.support.KeyringTrigger"])
return parser
def _set_repo_create_mirrorlist_parser(root: SubParserAction) -> argparse.ArgumentParser:
"""
add parser for create-mirrorlist subcommand
Args:
root(SubParserAction): subparsers for the commands
Returns:
argparse.ArgumentParser: created argument parser
"""
parser = root.add_parser("repo-create-mirrorlist", help="create mirrorlist package",
description="create package which contains list of available mirrors as set by "
"configuration. Note, that this action will only create package, the package "
"itself has to be built manually",
formatter_class=_formatter)
parser.set_defaults(handler=handlers.Triggers, trigger=["ahriman.core.support.MirrorlistTrigger"])
return parser
def _set_repo_daemon_parser(root: SubParserAction) -> argparse.ArgumentParser:
"""
add parser for daemon subcommand

View File

@ -26,6 +26,7 @@ from typing import Any
from ahriman.application.application.application_properties import ApplicationProperties
from ahriman.core.build_tools.sources import Sources
from ahriman.core.exceptions import UnknownPackageError
from ahriman.core.util import package_like
from ahriman.models.package import Package
from ahriman.models.package_source import PackageSource
@ -43,8 +44,14 @@ class ApplicationPackages(ApplicationProperties):
Args:
source(str): path to package archive
Raises:
UnknownPackageError: if specified path doesn't exist
"""
local_path = Path(source)
if not local_path.is_file():
raise UnknownPackageError(source)
dst = self.repository.paths.packages / local_path.name
shutil.copy(local_path, dst)
@ -68,6 +75,9 @@ class ApplicationPackages(ApplicationProperties):
source(str): path to local directory
"""
local_dir = Path(source)
if not local_dir.is_dir():
raise UnknownPackageError(source)
for full_path in filter(package_like, local_dir.iterdir()):
self._add_archive(str(full_path))
@ -77,12 +87,19 @@ class ApplicationPackages(ApplicationProperties):
Args:
source(str): path to directory with local source files
Raises:
UnknownPackageError: if specified package is unknown or doesn't exist
"""
source_dir = Path(source)
package = Package.from_build(source_dir, self.architecture)
cache_dir = self.repository.paths.cache_for(package.base)
shutil.copytree(source_dir, cache_dir) # copy package to store in caches
Sources.init(cache_dir) # we need to run init command in directory where we do have permissions
if (source_dir := Path(source)).is_dir():
package = Package.from_build(source_dir, self.architecture)
cache_dir = self.repository.paths.cache_for(package.base)
shutil.copytree(source_dir, cache_dir) # copy package to store in caches
Sources.init(cache_dir) # we need to run init command in directory where we do have permissions
elif (source_dir := self.repository.paths.cache_for(source)).is_dir():
package = Package.from_build(source_dir, self.architecture)
else:
raise UnknownPackageError(source)
self.database.build_queue_insert(package)
@ -95,8 +112,11 @@ class ApplicationPackages(ApplicationProperties):
"""
dst = self.repository.paths.packages / Path(source).name # URL is path, is not it?
# timeout=None to suppress pylint warns. Also suppress bandit warnings
response = requests.get(source, stream=True, timeout=None) # nosec
response.raise_for_status()
try:
response = requests.get(source, stream=True, timeout=None) # nosec
response.raise_for_status()
except Exception:
raise UnknownPackageError(source)
with dst.open("wb") as local_file:
for chunk in response.iter_content(chunk_size=1024):

View File

@ -78,7 +78,9 @@ class Validate(Handler):
# create trigger loader instance
loader = TriggerLoader()
for trigger in loader.selected_triggers(configuration):
triggers = loader.selected_triggers(configuration) + loader.known_triggers(configuration)
for trigger in triggers:
try:
trigger_class = loader.load_trigger_class(trigger)
except ExtensionError:

View File

@ -125,6 +125,12 @@ class Sources(LazyLogging):
Sources._check_output("git", "init", "--initial-branch", instance.DEFAULT_BRANCH,
cwd=sources_dir, logger=instance.logger)
# extract local files...
files = ["PKGBUILD", ".SRCINFO"] + [str(path) for path in Package.local_files(sources_dir)]
instance.add(sources_dir, *files)
# ...and commit them
instance.commit(sources_dir, author="ahriman <ahriman@localhost>")
@staticmethod
def load(sources_dir: Path, package: Package, patches: list[PkgbuildPatch], paths: RepositoryPaths) -> None:
"""

View File

@ -99,6 +99,16 @@ class Configuration(configparser.RawConfigParser):
"""
return self.getpath("settings", "logging")
@property
def repository_name(self) -> str:
"""
repository name as defined by configuration
Returns:
str: repository name from configuration
"""
return self.get("repository", "name")
@property
def repository_paths(self) -> RepositoryPaths:
"""

View File

@ -163,6 +163,11 @@ CONFIGURATION_SCHEMA: ConfigurationSchema = {
"coerce": "list",
"schema": {"type": "string"},
},
"triggers_known": {
"type": "list",
"coerce": "list",
"schema": {"type": "string"},
},
"vcs_allowed_age": {
"type": "integer",
"coerce": "integer",

View File

@ -144,6 +144,24 @@ class Validator(RootValidator):
if constraint and url.scheme not in constraint:
self._error(field, f"Url {value} scheme must be one of {constraint}")
def _validate_path_is_absolute(self, constraint: bool, field: str, value: Path) -> None:
"""
check if path is absolute or not
Args:
constraint(bool): True in case if path must be absolute and False if it must be relative
field(str): field name to be checked
value(Path): value to be checked
Examples:
The rule's arguments are validated against this schema:
{"type": "boolean"}
"""
if constraint and not value.is_absolute():
self._error(field, f"Path {value} must be absolute")
if not constraint and value.is_absolute():
self._error(field, f"Path {value} must be relative")
def _validate_path_exists(self, constraint: bool, field: str, value: Path) -> None:
"""
check if paths exists
@ -159,3 +177,5 @@ class Validator(RootValidator):
"""
if constraint and not value.exists():
self._error(field, f"Path {value} must exist")
if not constraint and value.exists():
self._error(field, f"Path {value} must not exist")

View File

@ -194,6 +194,18 @@ class PasswordError(ValueError):
ValueError.__init__(self, f"Password error: {details}")
class PkgbuildGeneratorError(RuntimeError):
"""
exception class for support type triggers
"""
def __init__(self) -> None:
"""
default constructor
"""
RuntimeError.__init__(self, "Could not generate package")
class ReportError(RuntimeError):
"""
report generation exception

View File

@ -57,4 +57,4 @@ class HTML(Report, JinjaTemplate):
result(Result): build result
"""
html = self.make_html(Result(success=packages), self.template_path)
self.report_path.write_text(html)
self.report_path.write_text(html, encoding="utf8")

View File

@ -75,7 +75,7 @@ class JinjaTemplate:
# base template vars
self.homepage = configuration.get(section, "homepage", fallback=None)
self.name = configuration.get("repository", "name")
self.name = configuration.repository_name
self.sign_targets, self.default_pgp_key = GPG.sign_options(configuration)

View File

@ -67,7 +67,7 @@ class RepositoryProperties(LazyLogging):
self.configuration = configuration
self.database = database
self.name = configuration.get("repository", "name")
self.name = configuration.repository_name
self.vcs_allowed_age = configuration.getint("build", "vcs_allowed_age", fallback=0)
self.paths: RepositoryPaths = configuration.repository_paths # additional workaround for pycharm typing
@ -79,7 +79,7 @@ class RepositoryProperties(LazyLogging):
self.ignore_list = configuration.getlist("build", "ignore_packages", fallback=[])
self.pacman = Pacman(architecture, configuration, refresh_database=refresh_pacman_database)
self.sign = GPG(architecture, configuration)
self.sign = GPG(configuration)
self.repo = Repo(self.name, self.paths, self.sign.repository_sign_args)
self.reporter = Client.load(configuration, report=report)
self.triggers = TriggerLoader.load(architecture, configuration)

View File

@ -19,6 +19,7 @@
#
import requests
from collections.abc import Generator
from pathlib import Path
from ahriman.core.configuration import Configuration
@ -34,7 +35,6 @@ class GPG(LazyLogging):
Attributes:
DEFAULT_TIMEOUT(int): (class attribute) HTTP request timeout in seconds
architecture(str): repository architecture
configuration(Configuration): configuration instance
default_key(str | None): default PGP key ID to use
targets(set[SignSettings]): list of targets to sign (repository, package etc)
@ -43,15 +43,13 @@ class GPG(LazyLogging):
_check_output = check_output
DEFAULT_TIMEOUT = 30
def __init__(self, architecture: str, configuration: Configuration) -> None:
def __init__(self, configuration: Configuration) -> None:
"""
default constructor
Args:
architecture(str): repository architecture
configuration(Configuration): configuration instance
"""
self.architecture = architecture
self.configuration = configuration
self.targets, self.default_key = self.sign_options(configuration)
@ -128,6 +126,34 @@ class GPG(LazyLogging):
raise
return response.text
def key_export(self, key: str) -> str:
"""
export public key from stored keychain
Args:
key(str): key ID to export
Returns:
str: PGP key in .asc format
"""
return GPG._check_output("gpg", "--armor", "--no-emit-version", "--export", key, logger=self.logger)
def key_fingerprint(self, key: str) -> str:
"""
get full key fingerprint from short key id
Args:
key(str): key ID to lookup
Returns:
str: full PGP key fingerprint
"""
metadata = GPG._check_output("gpg", "--with-colons", "--fingerprint", key, logger=self.logger)
# fingerprint line will be like
# fpr:::::::::43A663569A07EE1E4ECC55CC7E3A4240CE3C45C2:
fingerprint = next(filter(lambda line: line[:3] == "fpr", metadata.splitlines()))
return fingerprint.split(":")[-2]
def key_import(self, server: str, key: str) -> None:
"""
import key to current user and sign it locally
@ -139,6 +165,21 @@ class GPG(LazyLogging):
key_body = self.key_download(server, key)
GPG._check_output("gpg", "--import", input_data=key_body, logger=self.logger)
def keys(self) -> list[str]:
"""
extract list of keys described in configuration
Returns:
list[str]: list of unique keys which are set in configuration
"""
def generator() -> Generator[str, None, None]:
if self.default_key is not None:
yield self.default_key
for _, value in filter(lambda pair: pair[0].startswith("key_"), self.configuration["sign"].items()):
yield value
return sorted(set(generator()))
def process(self, path: Path, key: str) -> list[Path]:
"""
gpg command wrapper

View File

@ -0,0 +1,21 @@
#
# Copyright (c) 2021-2023 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from ahriman.core.support.keyring_trigger import KeyringTrigger
from ahriman.core.support.mirrorlist_trigger import MirrorlistTrigger

View File

@ -0,0 +1,114 @@
#
# Copyright (c) 2021-2023 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from ahriman.core import context
from ahriman.core.configuration import Configuration
from ahriman.core.sign.gpg import GPG
from ahriman.core.support.package_creator import PackageCreator
from ahriman.core.support.pkgbuild.keyring_generator import KeyringGenerator
from ahriman.core.triggers import Trigger
from ahriman.models.context_key import ContextKey
class KeyringTrigger(Trigger):
"""
keyring generator trigger
Attributes:
targets(list[str]): git remote target list
"""
CONFIGURATION_SCHEMA = {
"keyring": {
"type": "dict",
"schema": {
"target": {
"type": "list",
"coerce": "list",
"schema": {"type": "string"},
},
},
},
"keyring_generator": {
"type": "dict",
"schema": {
"description": {
"type": "string",
},
"homepage": {
"type": "string",
},
"license": {
"type": "list",
"coerce": "list",
},
"package": {
"type": "string",
},
"packagers": {
"type": "list",
"coerce": "list",
},
"revoked": {
"type": "list",
"coerce": "list",
},
"trusted": {
"type": "list",
"coerce": "list",
},
},
},
}
def __init__(self, architecture: str, configuration: Configuration) -> None:
"""
default constructor
Args:
architecture(str): repository architecture
configuration(Configuration): configuration instance
"""
Trigger.__init__(self, architecture, configuration)
self.targets = self.configuration_sections(configuration)
@classmethod
def configuration_sections(cls, configuration: Configuration) -> list[str]:
"""
extract configuration sections from configuration
Args:
configuration(Configuration): configuration instance
Returns:
list[str]: read configuration sections belong to this trigger
"""
return configuration.getlist("keyring", "target", fallback=[])
def on_start(self) -> None:
"""
trigger action which will be called at the start of the application
"""
ctx = context.get()
sign = ctx.get(ContextKey("sign", GPG))
for target in self.targets:
generator = KeyringGenerator(sign, self.configuration, target)
runner = PackageCreator(self.configuration, generator)
runner.run()

View File

@ -0,0 +1,105 @@
#
# Copyright (c) 2021-2023 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from ahriman.core.configuration import Configuration
from ahriman.core.support.package_creator import PackageCreator
from ahriman.core.support.pkgbuild.mirrorlist_generator import MirrorlistGenerator
from ahriman.core.triggers import Trigger
class MirrorlistTrigger(Trigger):
"""
mirrorlist generator trigger
Attributes:
targets(list[str]): git remote target list
"""
CONFIGURATION_SCHEMA = {
"mirrorlist": {
"type": "dict",
"schema": {
"target": {
"type": "list",
"coerce": "list",
"schema": {"type": "string"},
},
},
},
"mirrorlist_generator": {
"type": "dict",
"schema": {
"description": {
"type": "string",
},
"homepage": {
"type": "string",
},
"license": {
"type": "list",
"coerce": "list",
},
"package": {
"type": "string",
},
"path": {
"type": "string",
"path_is_absolute": True,
},
"servers": {
"type": "list",
"coerce": "list",
"required": True,
},
},
},
}
def __init__(self, architecture: str, configuration: Configuration) -> None:
"""
default constructor
Args:
architecture(str): repository architecture
configuration(Configuration): configuration instance
"""
Trigger.__init__(self, architecture, configuration)
self.targets = self.configuration_sections(configuration)
@classmethod
def configuration_sections(cls, configuration: Configuration) -> list[str]:
"""
extract configuration sections from configuration
Args:
configuration(Configuration): configuration instance
Returns:
list[str]: read configuration sections belong to this trigger
"""
return configuration.getlist("mirrorlist", "target", fallback=[])
def on_start(self) -> None:
"""
trigger action which will be called at the start of the application
"""
for target in self.targets:
generator = MirrorlistGenerator(self.configuration, target)
runner = PackageCreator(self.configuration, generator)
runner.run()

View File

@ -0,0 +1,71 @@
#
# Copyright (c) 2021-2023 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import shutil
from ahriman.core import context
from ahriman.core.build_tools.sources import Sources
from ahriman.core.configuration import Configuration
from ahriman.core.database import SQLite
from ahriman.core.support.pkgbuild.pkgbuild_generator import PkgbuildGenerator
from ahriman.models.build_status import BuildStatus
from ahriman.models.context_key import ContextKey
from ahriman.models.package import Package
class PackageCreator:
"""
helper which creates packages based on pkgbuild generator
Attributes:
configuration(Configuration): configuration instance
generator(PkgbuildGenerator): PKGBUILD generator instance
"""
def __init__(self, configuration: Configuration, generator: PkgbuildGenerator) -> None:
"""
default constructor
Args:
configuration(Configuration): configuration instance
generator(PkgbuildGenerator): PKGBUILD generator instance
"""
self.configuration = configuration
self.generator = generator
def run(self) -> None:
"""
create new local package
"""
local_path = self.configuration.repository_paths.cache_for(self.generator.pkgname)
# clear old tree if any
shutil.rmtree(local_path, ignore_errors=True)
# create local tree
local_path.mkdir(mode=0o755, parents=True, exist_ok=True)
self.generator.write_pkgbuild(local_path)
Sources.init(local_path)
# register package
ctx = context.get()
database: SQLite = ctx.get(ContextKey("database", SQLite))
_, architecture = self.configuration.check_loaded()
package = Package.from_build(local_path, architecture)
database.package_update(package, BuildStatus())

View File

@ -0,0 +1,19 @@
#
# Copyright (c) 2021-2023 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

View File

@ -0,0 +1,194 @@
#
# Copyright (c) 2021-2023 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from collections.abc import Callable
from pathlib import Path
from ahriman.core.configuration import Configuration
from ahriman.core.exceptions import PkgbuildGeneratorError
from ahriman.core.sign.gpg import GPG
from ahriman.core.support.pkgbuild.pkgbuild_generator import PkgbuildGenerator
class KeyringGenerator(PkgbuildGenerator):
"""
generator for keyring PKGBUILD
Attributes:
sign(GPG): GPG wrapper instance
name(str): repository name
packagers(list[str]): list of packagers PGP keys
pkgbuild_license(list[str]): keyring package license
pkgbuild_pkgdesc(str): keyring package description
pkgbuild_pkgname(str): keyring package name
pkgbuild_url(str): keyring package home page
revoked(list[str]): list of revoked PGP keys
trusted(list[str]): lif of trusted PGP keys
"""
def __init__(self, sign: GPG, configuration: Configuration, section: str) -> None:
"""
default constructor
Args:
sign(GPG): GPG wrapper instance
configuration(Configuration): configuration instance
section(str): settings section name
"""
self.sign = sign
self.name = configuration.repository_name
# configuration fields
self.packagers = configuration.getlist(section, "packagers", fallback=sign.keys())
self.revoked = configuration.getlist(section, "revoked", fallback=[])
self.trusted = configuration.getlist(
section, "trusted", fallback=[sign.default_key] if sign.default_key is not None else [])
# pkgbuild description fields
self.pkgbuild_pkgname = configuration.get(section, "package", fallback=f"{self.name}-keyring")
self.pkgbuild_pkgdesc = configuration.get(section, "description", fallback=f"{self.name} PGP keyring")
self.pkgbuild_license = configuration.getlist(section, "license", fallback=["Unlicense"])
self.pkgbuild_url = configuration.get(section, "homepage", fallback="")
@property
def license(self) -> list[str]:
"""
package licenses list
Returns:
list[str]: package licenses as PKGBUILD property
"""
return self.pkgbuild_license
@property
def pkgdesc(self) -> str:
"""
package description
Returns:
str: package description as PKGBUILD property
"""
return self.pkgbuild_pkgdesc
@property
def pkgname(self) -> str:
"""
package name
Returns:
str: package name as PKGBUILD property
"""
return self.pkgbuild_pkgname
@property
def url(self) -> str:
"""
package upstream url
Returns:
str: package upstream url as PKGBUILD property
"""
return self.pkgbuild_url
def _generate_gpg(self, source_path: Path) -> None:
"""
generate GPG keychain
Args:
source_path(Path): destination of the file content
"""
with source_path.open("w") as source_file:
for key in sorted(set(self.trusted + self.packagers + self.revoked)):
public_key = self.sign.key_export(key)
source_file.write(public_key)
source_file.write("\n")
def _generate_revoked(self, source_path: Path) -> None:
"""
generate revoked PGP keys
Args:
source_path(Path): destination of the file content
"""
with source_path.open("w") as source_file:
for key in sorted(set(self.revoked)):
fingerprint = self.sign.key_fingerprint(key)
source_file.write(fingerprint)
source_file.write("\n")
def _generate_trusted(self, source_path: Path) -> None:
"""
generate trusted PGP keys
Args:
source_path(Path): destination of the file content
"""
if not self.trusted:
raise PkgbuildGeneratorError
with source_path.open("w") as source_file:
for key in sorted(set(self.trusted)):
fingerprint = self.sign.key_fingerprint(key)
source_file.write(fingerprint)
source_file.write(":4:\n")
def install(self) -> str | None:
"""
content of the install functions
Returns:
str | None: content of the install functions if any
"""
# copy-paste from archlinux-keyring
return f"""post_upgrade() {{
if usr/bin/pacman-key -l >/dev/null 2>&1; then
usr/bin/pacman-key --populate {self.name}
usr/bin/pacman-key --updatedb
fi
}}
post_install() {{
if [ -x usr/bin/pacman-key ]; then
post_upgrade
fi
}}"""
def package(self) -> str:
"""
package function generator
Returns:
str: package() function for PKGBUILD
"""
return f"""{{
install -Dm644 "{Path("$srcdir") / f"{self.name}.gpg"}" "{Path("$pkgdir") / "usr" / "share" / "pacman" / "keyrings" / f"{self.name}.gpg"}"
install -Dm644 "{Path("$srcdir") / f"{self.name}-revoked"}" "{Path("$pkgdir") / "usr" / "share" / "pacman" / "keyrings" / f"{self.name}-revoked"}"
install -Dm644 "{Path("$srcdir") / f"{self.name}-trusted"}" "{Path("$pkgdir") / "usr" / "share" / "pacman" / "keyrings" / f"{self.name}-trusted"}"
}}"""
def sources(self) -> dict[str, Callable[[Path], None]]:
"""
return list of sources for the package
Returns:
dict[str, Callable[[Path], None]]: map of source identifier (e.g. filename) to its generator function
"""
return {
f"{self.name}.gpg": self._generate_gpg,
f"{self.name}-revoked": self._generate_revoked,
f"{self.name}-trusted": self._generate_trusted,
}

View File

@ -0,0 +1,143 @@
#
# Copyright (c) 2021-2023 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from collections.abc import Callable
from pathlib import Path
from ahriman.core.configuration import Configuration
from ahriman.core.support.pkgbuild.pkgbuild_generator import PkgbuildGenerator
from ahriman.models.pkgbuild_patch import PkgbuildPatch
class MirrorlistGenerator(PkgbuildGenerator):
"""
generator for mirrorlist PKGBUILD
Attributes:
path(Path): path to mirrorlist relative to /
pkgbuild_license(list[str]): mirrorlist package license
pkgbuild_pkgdesc(str): mirrorlist package description
pkgbuild_pkgname(str): mirrorlist package name
pkgbuild_url(str): mirrorlist package home page
servers(list[str]): list of mirror servers
"""
def __init__(self, configuration: Configuration, section: str) -> None:
"""
default constructor
Args:
configuration(Configuration): configuration instance
section(str): settings section name
"""
name = configuration.repository_name
# configuration fields
self.servers = configuration.getlist(section, "servers")
self.path = configuration.getpath(section, "path", fallback=Path("/etc") / "pacman.d" / f"{name}-mirrorlist")
self.path = self.path.relative_to("/") # in pkgbuild we are always operating with relative to / path
# pkgbuild description fields
self.pkgbuild_pkgname = configuration.get(section, "package", fallback=f"{name}-mirrorlist")
self.pkgbuild_pkgdesc = configuration.get(
section, "description", fallback=f"{name} mirror list for use by pacman")
self.pkgbuild_license = configuration.getlist(section, "license", fallback=["Unlicense"])
self.pkgbuild_url = configuration.get(section, "homepage", fallback="")
@property
def license(self) -> list[str]:
"""
package licenses list
Returns:
list[str]: package licenses as PKGBUILD property
"""
return self.pkgbuild_license
@property
def pkgdesc(self) -> str:
"""
package description
Returns:
str: package description as PKGBUILD property
"""
return self.pkgbuild_pkgdesc
@property
def pkgname(self) -> str:
"""
package name
Returns:
str: package name as PKGBUILD property
"""
return self.pkgbuild_pkgname
@property
def url(self) -> str:
"""
package upstream url
Returns:
str: package upstream url as PKGBUILD property
"""
return self.pkgbuild_url
def _generate_mirrorlist(self, source_path: Path) -> None:
"""
generate mirrorlist file
Args:
source_path(Path): destination of the mirrorlist content
"""
content = "".join([f"Server = {server}\n" for server in self.servers])
source_path.write_text(content, encoding="utf8")
def package(self) -> str:
"""
package function generator
Returns:
str: package() function for PKGBUILD
"""
return f"""{{
install -Dm644 "{Path("$srcdir") / "mirrorlist"}" "{Path("$pkgdir") / self.path}"
}}"""
def patches(self) -> list[PkgbuildPatch]:
"""
list of additional PKGBUILD properties
Returns:
list[PkgbuildPatch]: list of patches which generate PKGBUILD content
"""
return [
PkgbuildPatch("backup", [str(self.path)]),
]
def sources(self) -> dict[str, Callable[[Path], None]]:
"""
return list of sources for the package
Returns:
dict[str, Callable[[Path], None]]: map of source identifier (e.g. filename) to its generator function
"""
return {
"mirrorlist": self._generate_mirrorlist,
}

View File

@ -0,0 +1,201 @@
#
# Copyright (c) 2021-2023 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import hashlib
import itertools
from collections.abc import Callable, Generator
from pathlib import Path
from ahriman.core.util import utcnow
from ahriman.models.pkgbuild_patch import PkgbuildPatch
class PkgbuildGenerator:
"""
main class for generating PKGBUILDs
Attributes:
PKGBUILD_STATIC_PROPERTIES(list[PkgbuildPatch]): (class attribute) list of default pkgbuild static properties
"""
PKGBUILD_STATIC_PROPERTIES = [
PkgbuildPatch("pkgrel", "1"),
PkgbuildPatch("arch", ["any"]),
]
@property
def license(self) -> list[str]:
"""
package licenses list
Returns:
list[str]: package licenses as PKGBUILD property
"""
return []
@property
def pkgdesc(self) -> str:
"""
package description
Returns:
str: package description as PKGBUILD property
Raises:
NotImplementedError: not implemented method
"""
raise NotImplementedError
@property
def pkgname(self) -> str:
"""
package name
Returns:
str: package name as PKGBUILD property
Raises:
NotImplementedError: not implemented method
"""
raise NotImplementedError
@property
def pkgver(self) -> str:
"""
package version
Returns:
str: package version as PKGBUILD property
"""
return utcnow().strftime("%Y%m%d")
@property
def url(self) -> str:
"""
package upstream url
Returns:
str: package upstream url as PKGBUILD property
"""
return ""
def install(self) -> str | None:
"""
content of the install functions
Returns:
str | None: content of the install functions if any
"""
def package(self) -> str:
"""
package function generator
Returns:
str: package() function for PKGBUILD
Raises:
NotImplementedError: not implemented method
"""
raise NotImplementedError
def patches(self) -> list[PkgbuildPatch]:
"""
list of additional PKGBUILD properties
Returns:
list[PkgbuildPatch]: list of patches which generate PKGBUILD content
"""
return []
def sources(self) -> dict[str, Callable[[Path], None]]:
"""
return list of sources for the package
Returns:
dict[str, Callable[[Path], None]]: map of source identifier (e.g. filename) to its generator function
"""
return {}
def write_install(self, source_dir: Path) -> list[PkgbuildPatch]:
"""
generate content of install file
Args:
source_dir(Path): path to directory in which sources must be generated
Returns:
list[PkgbuildPatch]: patch for the pkgbuild if install file exists and empty list otherwise
"""
content: str | None = self.install()
if content is None:
return []
source_path = source_dir / f"{self.pkgname}.install"
source_path.write_text(content)
return [PkgbuildPatch("install", source_path.name)]
def write_pkgbuild(self, source_dir: Path) -> None:
"""
generate PKGBUILD content to the specified path
Args:
source_dir(Path): path to directory in which sources must be generated
"""
patches = self.PKGBUILD_STATIC_PROPERTIES # default static properties...
patches.extend([
PkgbuildPatch("license", self.license),
PkgbuildPatch("pkgdesc", self.pkgdesc),
PkgbuildPatch("pkgname", self.pkgname),
PkgbuildPatch("pkgver", self.pkgver),
PkgbuildPatch("url", self.url),
]) # ...main properties as defined by derived class...
patches.extend(self.patches()) # ...optional properties as defined by derived class...
patches.extend(self.write_install(source_dir)) # ...install function...
patches.append(PkgbuildPatch("package()", self.package())) # ...package function...
patches.extend(self.write_sources(source_dir)) # ...and finally source files
for patch in patches:
patch.write(source_dir / "PKGBUILD")
def write_sources(self, source_dir: Path) -> list[PkgbuildPatch]:
"""
write sources and returns valid PKGBUILD properties for them
Args:
source_dir(Path): path to directory in which sources must be generated
Returns:
list[PkgbuildPatch]: list of patches to be applied to the PKGBUILD
"""
def sources_generator() -> Generator[tuple[str, str], None, None]:
for source, generator in sorted(self.sources().items()):
source_path = source_dir / source
generator(source_path)
with source_path.open("rb") as source_file:
source_hash = hashlib.sha512(source_file.read())
yield source, source_hash.hexdigest()
sources_iter, hashes_iter = itertools.tee(sources_generator())
return [
PkgbuildPatch("source", [source for source, _ in sources_iter]),
PkgbuildPatch("sha512sums", [sha512 for _, sha512 in hashes_iter]),
]

View File

@ -84,6 +84,20 @@ class TriggerLoader(LazyLogging):
return instance
@staticmethod
def known_triggers(configuration: Configuration) -> list[str]:
"""
read configuration and return list of known triggers. Unlike ``selected_triggers`` this option is used mainly
for configuration and validation and mentioned triggers are not being executed automatically
Args:
configuration(Configuration): configuration instance
Returns:
list[str]: list of registered, but not enabled, triggers
"""
return configuration.getlist("build", "triggers_known", fallback=[])
@staticmethod
def selected_triggers(configuration: Configuration) -> list[str]:
"""

View File

@ -17,6 +17,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# pylint: disable=too-many-lines
import datetime
import io
import itertools
@ -48,6 +49,8 @@ __all__ = [
"pretty_datetime",
"pretty_size",
"safe_filename",
"srcinfo_property",
"srcinfo_property_list",
"trim_package",
"utcnow",
"walk",
@ -117,8 +120,6 @@ def check_output(*args: str, exception: Exception | None = None, cwd: Path | Non
result: list[str] = []
for line in iter(get_io(process, "stdout").readline, ""):
line = line.strip()
if not line: # skip empty lines
continue
result.append(line)
log(line)
@ -133,7 +134,7 @@ def check_output(*args: str, exception: Exception | None = None, cwd: Path | Non
raise exception
raise subprocess.CalledProcessError(status_code, process.args)
return "\n".join(result)
return "\n".join(result).rstrip("\n") # remove newline at the end of any
def check_user(paths: RepositoryPaths, *, unsafe: bool) -> None:
@ -328,6 +329,47 @@ def safe_filename(source: str) -> str:
return re.sub(r"[^A-Za-z\d\-._~:\[\]@]", "-", source)
def srcinfo_property(key: str, srcinfo: dict[str, Any], package_srcinfo: dict[str, Any], *,
default: Any = None) -> Any:
"""
extract property from SRCINFO. This method extracts property from package if this property is presented in
``package``. Otherwise, it looks for the same property in root srcinfo. If none found, the default value will be
returned
Args:
key(str): key to extract from srcinfo
srcinfo(dict[str, Any]): root structure of SRCINFO
package_srcinfo(dict[str, Any]): package specific SRCINFO
default(Any, optional): the default value for the specified key (Default value = None)
Returns:
Any: extracted value from SRCINFO
"""
return package_srcinfo.get(key) or srcinfo.get(key) or default
def srcinfo_property_list(key: str, srcinfo: dict[str, Any], package_srcinfo: dict[str, Any], *,
architecture: str | None = None) -> list[Any]:
"""
extract list property from SRCINFO. Unlike ``srcinfo_property`` it supposes that default return value is always
empty list. If ``architecture`` is supplied, then it will try to lookup for architecture specific values and will
append it at the end of result
Args:
key(str): key to extract from srcinfo
srcinfo(dict[str, Any]): root structure of SRCINFO
package_srcinfo(dict[str, Any]): package specific SRCINFO
architecture(str | None, optional): package architecture if set (Default value = None)
Returns:
list[Any]: list of extracted properties from SRCINFO
"""
values: list[Any] = srcinfo_property(key, srcinfo, package_srcinfo, default=[])
if architecture is not None:
values.extend(srcinfo_property(f"{key}_{architecture}", srcinfo, package_srcinfo, default=[]))
return values
def trim_package(package_name: str) -> str:
"""
remove version bound and description from package name. Pacman allows to specify version bound (=, <=, >= etc) for

View File

@ -22,18 +22,19 @@ from __future__ import annotations
import copy
from collections.abc import Iterable
from collections.abc import Generator, Iterable
from dataclasses import asdict, dataclass
from pathlib import Path
from pyalpm import vercmp # type: ignore[import]
from srcinfo.parse import parse_srcinfo # type: ignore[import]
from typing import Any, Self
from urllib.parse import urlparse
from ahriman.core.alpm.pacman import Pacman
from ahriman.core.alpm.remote import AUR, Official, OfficialSyncdb
from ahriman.core.exceptions import PackageInfoError
from ahriman.core.log import LazyLogging
from ahriman.core.util import check_output, full_version, utcnow
from ahriman.core.util import check_output, full_version, srcinfo_property_list, utcnow
from ahriman.models.package_description import PackageDescription
from ahriman.models.package_source import PackageSource
from ahriman.models.remote_source import RemoteSource
@ -235,23 +236,25 @@ class Package(LazyLogging):
if errors:
raise PackageInfoError(errors)
def get_property(key: str, properties: dict[str, Any], default: Any) -> Any:
return properties.get(key) or srcinfo.get(key) or default
def get_list(key: str, properties: dict[str, Any]) -> Any:
return get_property(key, properties, []) + get_property(f"{key}_{architecture}", properties, [])
packages = {
package: PackageDescription(
depends=get_list("depends", properties),
make_depends=get_list("makedepends", properties),
opt_depends=get_list("optdepends", properties),
depends=srcinfo_property_list("depends", srcinfo, properties, architecture=architecture),
make_depends=srcinfo_property_list("makedepends", srcinfo, properties, architecture=architecture),
opt_depends=srcinfo_property_list("optdepends", srcinfo, properties, architecture=architecture),
)
for package, properties in srcinfo["packages"].items()
}
version = full_version(srcinfo.get("epoch"), srcinfo["pkgver"], srcinfo["pkgrel"])
return cls(base=srcinfo["pkgbase"], version=version, remote=None, packages=packages)
remote = RemoteSource(
git_url=path.absolute().as_uri(),
web_url="",
path=".",
branch="master",
source=PackageSource.Local,
)
return cls(base=srcinfo["pkgbase"], version=version, remote=remote, packages=packages)
@classmethod
def from_json(cls, dump: dict[str, Any]) -> Self:
@ -293,6 +296,41 @@ class Package(LazyLogging):
remote=remote,
packages={package.name: PackageDescription.from_aur(package)})
@staticmethod
def local_files(path: Path) -> Generator[Path, None, None]:
"""
extract list of local files
Args:
path(Path): path to package sources directory
Returns:
Generator[Path, None, None]: list of paths of files which belong to the package and distributed together
with this tarball. All paths are relative to the ``path``
"""
srcinfo_source = Package._check_output("makepkg", "--printsrcinfo", cwd=path)
srcinfo, errors = parse_srcinfo(srcinfo_source)
if errors:
raise PackageInfoError(errors)
# we could use arch property, but for consistency it is better to call special method
architectures = Package.supported_architectures(path)
for architecture in architectures:
for source in srcinfo_property_list("source", srcinfo, {}, architecture=architecture):
if "::" in source:
_, source = source.split("::", 1) # in case if filename is specified, remove it
if urlparse(source).scheme:
# basically file schema should use absolute path which is impossible if we are distributing
# files together with PKGBUILD. In this case we are going to skip it also
continue
yield Path(source)
if (install := srcinfo.get("install", None)) is not None:
yield Path(install)
@staticmethod
def supported_architectures(path: Path) -> set[str]:
"""