merge settings groups instead of using whole group

This commit is contained in:
2021-03-30 04:53:15 +03:00
parent bd2b61494f
commit 2260e52d5c
14 changed files with 222 additions and 101 deletions

View File

@ -54,11 +54,10 @@ class Task:
self.package = package
self.paths = paths
section = config.get_section_name("build", architecture)
self.archbuild_flags = config.getlist(section, "archbuild_flags")
self.build_command = config.get(section, "build_command")
self.makepkg_flags = config.getlist(section, "makepkg_flags")
self.makechrootpkg_flags = config.getlist(section, "makechrootpkg_flags")
self.archbuild_flags = config.wrap("build", architecture, "archbuild_flags", config.getlist)
self.build_command = config.wrap("build", architecture, "build_command", config.get)
self.makepkg_flags = config.wrap("build", architecture, "makepkg_flags", config.getlist)
self.makechrootpkg_flags = config.wrap("build", architecture, "makechrootpkg_flags", config.getlist)
@property
def cache_path(self) -> Path:

View File

@ -24,7 +24,10 @@ import logging
from logging.config import fileConfig
from pathlib import Path
from typing import Dict, List, Optional, Type
from typing import Any, Callable, Dict, List, Optional, Type, TypeVar
T = TypeVar("T")
class Configuration(configparser.RawConfigParser):
@ -70,6 +73,16 @@ class Configuration(configparser.RawConfigParser):
config.load_logging(logfile)
return config
@staticmethod
def section_name(section: str, architecture: str) -> str:
"""
generate section name for architecture specific sections
:param section: section name
:param architecture: repository architecture
:return: correct section name for repository specific section
"""
return f"{section}_{architecture}"
def dump(self, architecture: str) -> Dict[str, Dict[str, str]]:
"""
dump configuration to dictionary
@ -81,11 +94,16 @@ class Configuration(configparser.RawConfigParser):
if not self.has_section(section):
continue
result[section] = dict(self[section])
for group in Configuration.ARCHITECTURE_SPECIFIC_SECTIONS:
section = self.get_section_name(group, architecture)
if not self.has_section(section):
continue
result[section] = dict(self[section])
for section in Configuration.ARCHITECTURE_SPECIFIC_SECTIONS:
# get global settings
settings = dict(self[section]) if self.has_section(section) else {}
# get overrides
specific = self.section_name(section, architecture)
specific_settings = dict(self[specific]) if self.has_section(specific) else {}
# merge
settings.update(specific_settings)
if settings: # append only in case if it is not empty
result[section] = settings
return result
@ -113,16 +131,6 @@ class Configuration(configparser.RawConfigParser):
return value
return self.path.parent / value
def get_section_name(self, prefix: str, suffix: str) -> str:
"""
check if there is `prefix`_`suffix` section and return it on success. Return `prefix` otherwise
:param prefix: section name prefix
:param suffix: section name suffix (e.g. architecture name)
:return: found section name
"""
probe = f"{prefix}_{suffix}"
return probe if self.has_section(probe) else prefix
def load(self, path: Path) -> None:
"""
fully load configuration
@ -163,3 +171,18 @@ class Configuration(configparser.RawConfigParser):
file_logger()
else:
console_logger()
def wrap(self, section: str, architecture: str, key: str, function: Callable[..., T], **kwargs: Any) -> T:
"""
wrapper to get option by either using architecture specific section or generic section
:param section: section name
:param architecture: repository architecture
:param key: key name
:param function: function to call, e.g. `Configuration.get`
:param kwargs: any other keywords which will be passed to function directly
:return: either value from architecture specific section or global value
"""
specific_section = self.section_name(section, architecture)
if self.has_option(specific_section, key):
return function(specific_section, key, **kwargs)
return function(section, key, **kwargs)

View File

@ -23,6 +23,7 @@ from typing import Callable, Dict, Iterable
from ahriman.core.configuration import Configuration
from ahriman.core.report.report import Report
from ahriman.core.sign.gpg import GPG
from ahriman.core.util import pretty_datetime, pretty_size
from ahriman.models.package import Package
from ahriman.models.sign_settings import SignSettings
@ -56,7 +57,7 @@ class HTML(Report):
:ivar homepage: homepage link if any (for footer)
:ivar link_path: prefix fo packages to download
:ivar name: repository name
:ivar pgp_key: default PGP key
:ivar default_pgp_key: default PGP key
:ivar report_path: output path to html report
:ivar sign_targets: targets to sign enabled in configuration
:ivar template_path: path to directory with jinja templates
@ -69,18 +70,15 @@ class HTML(Report):
:param config: configuration instance
"""
Report.__init__(self, architecture, config)
section = config.get_section_name("html", architecture)
self.report_path = config.getpath(section, "path")
self.link_path = config.get(section, "link_path")
self.template_path = config.getpath(section, "template_path")
self.report_path = config.wrap("html", architecture, "path", config.getpath)
self.link_path = config.wrap("html", architecture, "link_path", config.get)
self.template_path = config.wrap("html", architecture, "template_path", config.getpath)
# base template vars
self.homepage = config.get(section, "homepage", fallback=None)
self.homepage = config.wrap("html", architecture, "homepage", config.get, fallback=None)
self.name = config.get("repository", "name")
sign_section = config.get_section_name("sign", architecture)
self.sign_targets = [SignSettings.from_option(opt) for opt in config.getlist(sign_section, "target")]
self.pgp_key = config.get(sign_section, "key") if self.sign_targets else None
self.sign_targets, self.default_pgp_key = GPG.sign_options(architecture, config)
def generate(self, packages: Iterable[Package]) -> None:
"""
@ -115,7 +113,7 @@ class HTML(Report):
has_package_signed=SignSettings.SignPackages in self.sign_targets,
has_repo_signed=SignSettings.SignRepository in self.sign_targets,
packages=sorted(content, key=comparator),
pgp_key=self.pgp_key,
pgp_key=self.default_pgp_key,
repository=self.name)
self.report_path.write_text(html)

View File

@ -44,8 +44,7 @@ class UpdateHandler(Cleaner):
"""
result: List[Package] = []
build_section = self.config.get_section_name("build", self.architecture)
ignore_list = self.config.getlist(build_section, "ignore_packages")
ignore_list = self.config.wrap("build", self.architecture, "ignore_packages", self.config.getlist)
for local in self.packages():
if local.base in ignore_list:

View File

@ -20,7 +20,7 @@
import logging
from pathlib import Path
from typing import List
from typing import List, Optional, Set, Tuple
from ahriman.core.configuration import Configuration
from ahriman.core.exceptions import BuildFailed
@ -35,7 +35,7 @@ class GPG:
:ivar config: configuration instance
:ivar default_key: default PGP key ID to use
:ivar logger: class logger
:ivar target: list of targets to sign (repository, package etc)
:ivar targets: list of targets to sign (repository, package etc)
"""
_check_output = check_output
@ -47,22 +47,24 @@ class GPG:
:param config: configuration instance
"""
self.logger = logging.getLogger("build_details")
self.architecture = architecture
self.config = config
self.section = config.get_section_name("sign", architecture)
self.target = {SignSettings.from_option(opt) for opt in config.getlist(self.section, "target")}
self.default_key = config.get(self.section, "key") if self.target else ""
self.targets, self.default_key = self.sign_options(architecture, config)
@property
def repository_sign_args(self) -> List[str]:
"""
:return: command line arguments for repo-add command to sign database
"""
if SignSettings.SignRepository not in self.target:
if SignSettings.SignRepository not in self.targets:
return []
if self.default_key is None:
self.logger.error("no default key set, skip repository sign")
return []
return ["--sign", "--key", self.default_key]
@staticmethod
def sign_cmd(path: Path, key: str) -> List[str]:
def sign_command(path: Path, key: str) -> List[str]:
"""
gpg command to run
:param path: path to file to sign
@ -71,6 +73,21 @@ class GPG:
"""
return ["gpg", "-u", key, "-b", str(path)]
@staticmethod
def sign_options(architecture: str, config: Configuration) -> Tuple[Set[SignSettings], Optional[str]]:
"""
extract default sign options from configuration
:param architecture: repository architecture
:param config: configuration instance
:return: tuple of sign targets and default PGP key
"""
targets = {
SignSettings.from_option(option)
for option in config.wrap("sign", architecture, "targets", config.getlist)
}
default_key = config.wrap("sign", architecture, "key", config.get) if targets else None
return targets, default_key
def process(self, path: Path, key: str) -> List[Path]:
"""
gpg command wrapper
@ -79,7 +96,7 @@ class GPG:
:return: list of generated files including original file
"""
GPG._check_output(
*GPG.sign_cmd(path, key),
*GPG.sign_command(path, key),
exception=BuildFailed(path.name),
logger=self.logger)
return [path, path.parent / f"{path.name}.sig"]
@ -91,9 +108,13 @@ class GPG:
:param base: package base required to check for key overrides
:return: list of generated files including original file
"""
if SignSettings.SignPackages not in self.target:
if SignSettings.SignPackages not in self.targets:
return [path]
key = self.config.wrap("sign", self.architecture, f"key_{base}",
self.config.get, fallback=self.default_key)
if key is None:
self.logger.error(f"no default key set, skip package {path} sign")
return [path]
key = self.config.get(self.section, f"key_{base}", fallback=self.default_key)
return self.process(path, key)
def sign_repository(self, path: Path) -> List[Path]:
@ -103,6 +124,9 @@ class GPG:
:param path: path to repository database
:return: list of generated files including original file
"""
if SignSettings.SignRepository not in self.target:
if SignSettings.SignRepository not in self.targets:
return [path]
if self.default_key is None:
self.logger.error("no default key set, skip repository sign")
return [path]
return self.process(path, self.default_key)

View File

@ -118,9 +118,8 @@ class Client:
:param config: configuration instance
:return: client according to current settings
"""
section = config.get_section_name("web", architecture)
host = config.get(section, "host", fallback=None)
port = config.getint(section, "port", fallback=None)
host = config.wrap("web", architecture, "host", config.get, fallback=None)
port = config.wrap("web", architecture, "port", config.getint, fallback=None)
if host is None or port is None:
return Client()

View File

@ -39,9 +39,8 @@ class Rsync(Upload):
:param config: configuration instance
"""
Upload.__init__(self, architecture, config)
section = config.get_section_name("rsync", architecture)
self.command = config.getlist(section, "command")
self.remote = config.get(section, "remote")
self.command = config.wrap("rsync", architecture, "command", config.getlist)
self.remote = config.wrap("rsync", architecture, "remote", config.get)
def sync(self, path: Path) -> None:
"""

View File

@ -39,9 +39,8 @@ class S3(Upload):
:param config: configuration instance
"""
Upload.__init__(self, architecture, config)
section = config.get_section_name("s3", architecture)
self.bucket = config.get(section, "bucket")
self.command = config.getlist(section, "command")
self.bucket = config.wrap("s3", architecture, "bucket", config.get)
self.command = config.wrap("s3", architecture, "command", config.getlist)
def sync(self, path: Path) -> None:
"""

View File

@ -58,9 +58,10 @@ def run_server(application: web.Application) -> None:
"""
application.logger.info("start server")
section = application["config"].get_section_name("web", application["architecture"])
host = application["config"].get(section, "host")
port = application["config"].getint(section, "port")
architecture: str = application["architecture"]
config: Configuration = application["config"]
host = config.wrap("web", architecture, "host", config.get)
port = config.wrap("web", architecture, "port", config.getint)
web.run_app(application, host=host, port=port, handle_signals=False,
access_log=logging.getLogger("http"))