merge settings groups instead of using whole group

This commit is contained in:
Evgenii Alekseev 2021-03-30 04:53:15 +03:00
parent 8d7d508bb2
commit eb02e1e62b
14 changed files with 222 additions and 101 deletions

View File

@ -1,6 +1,6 @@
# ahriman configuration
Some groups can be specified for each architecture separately. E.g. if there are `build` and `build_x86_64` groups it will use the `build_x86_64` for the `x86_64` architecture and `build` for any other (architecture specific group has higher priority).
Some groups can be specified for each architecture separately. E.g. if there are `build` and `build_x86_64` groups it will use the option from `build_x86_64` for the `x86_64` architecture and `build` for any other (architecture specific group has higher priority). In case if both groups are presented, architecture specific options will be merged into global ones overriding them.
## `settings` group

View File

@ -54,11 +54,10 @@ class Task:
self.package = package
self.paths = paths
section = config.get_section_name("build", architecture)
self.archbuild_flags = config.getlist(section, "archbuild_flags")
self.build_command = config.get(section, "build_command")
self.makepkg_flags = config.getlist(section, "makepkg_flags")
self.makechrootpkg_flags = config.getlist(section, "makechrootpkg_flags")
self.archbuild_flags = config.wrap("build", architecture, "archbuild_flags", config.getlist)
self.build_command = config.wrap("build", architecture, "build_command", config.get)
self.makepkg_flags = config.wrap("build", architecture, "makepkg_flags", config.getlist)
self.makechrootpkg_flags = config.wrap("build", architecture, "makechrootpkg_flags", config.getlist)
@property
def cache_path(self) -> Path:

View File

@ -24,7 +24,10 @@ import logging
from logging.config import fileConfig
from pathlib import Path
from typing import Dict, List, Optional, Type
from typing import Any, Callable, Dict, List, Optional, Type, TypeVar
T = TypeVar("T")
class Configuration(configparser.RawConfigParser):
@ -70,6 +73,16 @@ class Configuration(configparser.RawConfigParser):
config.load_logging(logfile)
return config
@staticmethod
def section_name(section: str, architecture: str) -> str:
"""
generate section name for architecture specific sections
:param section: section name
:param architecture: repository architecture
:return: correct section name for repository specific section
"""
return f"{section}_{architecture}"
def dump(self, architecture: str) -> Dict[str, Dict[str, str]]:
"""
dump configuration to dictionary
@ -81,11 +94,16 @@ class Configuration(configparser.RawConfigParser):
if not self.has_section(section):
continue
result[section] = dict(self[section])
for group in Configuration.ARCHITECTURE_SPECIFIC_SECTIONS:
section = self.get_section_name(group, architecture)
if not self.has_section(section):
continue
result[section] = dict(self[section])
for section in Configuration.ARCHITECTURE_SPECIFIC_SECTIONS:
# get global settings
settings = dict(self[section]) if self.has_section(section) else {}
# get overrides
specific = self.section_name(section, architecture)
specific_settings = dict(self[specific]) if self.has_section(specific) else {}
# merge
settings.update(specific_settings)
if settings: # append only in case if it is not empty
result[section] = settings
return result
@ -113,16 +131,6 @@ class Configuration(configparser.RawConfigParser):
return value
return self.path.parent / value
def get_section_name(self, prefix: str, suffix: str) -> str:
"""
check if there is `prefix`_`suffix` section and return it on success. Return `prefix` otherwise
:param prefix: section name prefix
:param suffix: section name suffix (e.g. architecture name)
:return: found section name
"""
probe = f"{prefix}_{suffix}"
return probe if self.has_section(probe) else prefix
def load(self, path: Path) -> None:
"""
fully load configuration
@ -163,3 +171,18 @@ class Configuration(configparser.RawConfigParser):
file_logger()
else:
console_logger()
def wrap(self, section: str, architecture: str, key: str, function: Callable[..., T], **kwargs: Any) -> T:
"""
wrapper to get option by either using architecture specific section or generic section
:param section: section name
:param architecture: repository architecture
:param key: key name
:param function: function to call, e.g. `Configuration.get`
:param kwargs: any other keywords which will be passed to function directly
:return: either value from architecture specific section or global value
"""
specific_section = self.section_name(section, architecture)
if self.has_option(specific_section, key):
return function(specific_section, key, **kwargs)
return function(section, key, **kwargs)

View File

@ -23,6 +23,7 @@ from typing import Callable, Dict, Iterable
from ahriman.core.configuration import Configuration
from ahriman.core.report.report import Report
from ahriman.core.sign.gpg import GPG
from ahriman.core.util import pretty_datetime, pretty_size
from ahriman.models.package import Package
from ahriman.models.sign_settings import SignSettings
@ -56,7 +57,7 @@ class HTML(Report):
:ivar homepage: homepage link if any (for footer)
:ivar link_path: prefix fo packages to download
:ivar name: repository name
:ivar pgp_key: default PGP key
:ivar default_pgp_key: default PGP key
:ivar report_path: output path to html report
:ivar sign_targets: targets to sign enabled in configuration
:ivar template_path: path to directory with jinja templates
@ -69,18 +70,15 @@ class HTML(Report):
:param config: configuration instance
"""
Report.__init__(self, architecture, config)
section = config.get_section_name("html", architecture)
self.report_path = config.getpath(section, "path")
self.link_path = config.get(section, "link_path")
self.template_path = config.getpath(section, "template_path")
self.report_path = config.wrap("html", architecture, "path", config.getpath)
self.link_path = config.wrap("html", architecture, "link_path", config.get)
self.template_path = config.wrap("html", architecture, "template_path", config.getpath)
# base template vars
self.homepage = config.get(section, "homepage", fallback=None)
self.homepage = config.wrap("html", architecture, "homepage", config.get, fallback=None)
self.name = config.get("repository", "name")
sign_section = config.get_section_name("sign", architecture)
self.sign_targets = [SignSettings.from_option(opt) for opt in config.getlist(sign_section, "target")]
self.pgp_key = config.get(sign_section, "key") if self.sign_targets else None
self.sign_targets, self.default_pgp_key = GPG.sign_options(architecture, config)
def generate(self, packages: Iterable[Package]) -> None:
"""
@ -115,7 +113,7 @@ class HTML(Report):
has_package_signed=SignSettings.SignPackages in self.sign_targets,
has_repo_signed=SignSettings.SignRepository in self.sign_targets,
packages=sorted(content, key=comparator),
pgp_key=self.pgp_key,
pgp_key=self.default_pgp_key,
repository=self.name)
self.report_path.write_text(html)

View File

@ -44,8 +44,7 @@ class UpdateHandler(Cleaner):
"""
result: List[Package] = []
build_section = self.config.get_section_name("build", self.architecture)
ignore_list = self.config.getlist(build_section, "ignore_packages")
ignore_list = self.config.wrap("build", self.architecture, "ignore_packages", self.config.getlist)
for local in self.packages():
if local.base in ignore_list:

View File

@ -20,7 +20,7 @@
import logging
from pathlib import Path
from typing import List
from typing import List, Optional, Set, Tuple
from ahriman.core.configuration import Configuration
from ahriman.core.exceptions import BuildFailed
@ -35,7 +35,7 @@ class GPG:
:ivar config: configuration instance
:ivar default_key: default PGP key ID to use
:ivar logger: class logger
:ivar target: list of targets to sign (repository, package etc)
:ivar targets: list of targets to sign (repository, package etc)
"""
_check_output = check_output
@ -47,22 +47,24 @@ class GPG:
:param config: configuration instance
"""
self.logger = logging.getLogger("build_details")
self.architecture = architecture
self.config = config
self.section = config.get_section_name("sign", architecture)
self.target = {SignSettings.from_option(opt) for opt in config.getlist(self.section, "target")}
self.default_key = config.get(self.section, "key") if self.target else ""
self.targets, self.default_key = self.sign_options(architecture, config)
@property
def repository_sign_args(self) -> List[str]:
"""
:return: command line arguments for repo-add command to sign database
"""
if SignSettings.SignRepository not in self.target:
if SignSettings.SignRepository not in self.targets:
return []
if self.default_key is None:
self.logger.error("no default key set, skip repository sign")
return []
return ["--sign", "--key", self.default_key]
@staticmethod
def sign_cmd(path: Path, key: str) -> List[str]:
def sign_command(path: Path, key: str) -> List[str]:
"""
gpg command to run
:param path: path to file to sign
@ -71,6 +73,21 @@ class GPG:
"""
return ["gpg", "-u", key, "-b", str(path)]
@staticmethod
def sign_options(architecture: str, config: Configuration) -> Tuple[Set[SignSettings], Optional[str]]:
"""
extract default sign options from configuration
:param architecture: repository architecture
:param config: configuration instance
:return: tuple of sign targets and default PGP key
"""
targets = {
SignSettings.from_option(option)
for option in config.wrap("sign", architecture, "targets", config.getlist)
}
default_key = config.wrap("sign", architecture, "key", config.get) if targets else None
return targets, default_key
def process(self, path: Path, key: str) -> List[Path]:
"""
gpg command wrapper
@ -79,7 +96,7 @@ class GPG:
:return: list of generated files including original file
"""
GPG._check_output(
*GPG.sign_cmd(path, key),
*GPG.sign_command(path, key),
exception=BuildFailed(path.name),
logger=self.logger)
return [path, path.parent / f"{path.name}.sig"]
@ -91,9 +108,13 @@ class GPG:
:param base: package base required to check for key overrides
:return: list of generated files including original file
"""
if SignSettings.SignPackages not in self.target:
if SignSettings.SignPackages not in self.targets:
return [path]
key = self.config.wrap("sign", self.architecture, f"key_{base}",
self.config.get, fallback=self.default_key)
if key is None:
self.logger.error(f"no default key set, skip package {path} sign")
return [path]
key = self.config.get(self.section, f"key_{base}", fallback=self.default_key)
return self.process(path, key)
def sign_repository(self, path: Path) -> List[Path]:
@ -103,6 +124,9 @@ class GPG:
:param path: path to repository database
:return: list of generated files including original file
"""
if SignSettings.SignRepository not in self.target:
if SignSettings.SignRepository not in self.targets:
return [path]
if self.default_key is None:
self.logger.error("no default key set, skip repository sign")
return [path]
return self.process(path, self.default_key)

View File

@ -118,9 +118,8 @@ class Client:
:param config: configuration instance
:return: client according to current settings
"""
section = config.get_section_name("web", architecture)
host = config.get(section, "host", fallback=None)
port = config.getint(section, "port", fallback=None)
host = config.wrap("web", architecture, "host", config.get, fallback=None)
port = config.wrap("web", architecture, "port", config.getint, fallback=None)
if host is None or port is None:
return Client()

View File

@ -39,9 +39,8 @@ class Rsync(Upload):
:param config: configuration instance
"""
Upload.__init__(self, architecture, config)
section = config.get_section_name("rsync", architecture)
self.command = config.getlist(section, "command")
self.remote = config.get(section, "remote")
self.command = config.wrap("rsync", architecture, "command", config.getlist)
self.remote = config.wrap("rsync", architecture, "remote", config.get)
def sync(self, path: Path) -> None:
"""

View File

@ -39,9 +39,8 @@ class S3(Upload):
:param config: configuration instance
"""
Upload.__init__(self, architecture, config)
section = config.get_section_name("s3", architecture)
self.bucket = config.get(section, "bucket")
self.command = config.getlist(section, "command")
self.bucket = config.wrap("s3", architecture, "bucket", config.get)
self.command = config.wrap("s3", architecture, "command", config.getlist)
def sync(self, path: Path) -> None:
"""

View File

@ -58,9 +58,10 @@ def run_server(application: web.Application) -> None:
"""
application.logger.info("start server")
section = application["config"].get_section_name("web", application["architecture"])
host = application["config"].get(section, "host")
port = application["config"].getint(section, "port")
architecture: str = application["architecture"]
config: Configuration = application["config"]
host = config.wrap("web", architecture, "host", config.get)
port = config.wrap("web", architecture, "port", config.getint)
web.run_app(application, host=host, port=port, handle_signals=False,
access_log=logging.getLogger("http"))

View File

@ -1,5 +1,4 @@
import pytest
import shutil
from pathlib import Path
from pytest_mock import MockerFixture

View File

@ -7,3 +7,9 @@ from ahriman.core.sign.gpg import GPG
@pytest.fixture
def gpg(configuration: Configuration) -> GPG:
return GPG("x86_64", configuration)
@pytest.fixture
def gpg_with_key(gpg: GPG) -> GPG:
gpg.default_key = "key"
return gpg

View File

@ -5,93 +5,177 @@ from ahriman.core.sign.gpg import GPG
from ahriman.models.sign_settings import SignSettings
def test_repository_sign_args(gpg: GPG) -> None:
def test_repository_sign_args_1(gpg_with_key: GPG) -> None:
"""
must generate correct sign args
"""
gpg.target = {SignSettings.SignRepository}
assert gpg.repository_sign_args
gpg_with_key.targets = {SignSettings.SignRepository}
assert gpg_with_key.repository_sign_args
def test_sign_package_1(gpg: GPG, mocker: MockerFixture) -> None:
def test_repository_sign_args_2(gpg_with_key: GPG) -> None:
"""
must generate correct sign args
"""
gpg_with_key.targets = {SignSettings.SignPackages, SignSettings.SignRepository}
assert gpg_with_key.repository_sign_args
def test_repository_sign_args_skip_1(gpg_with_key: GPG) -> None:
"""
must return empty args if it is not set
"""
gpg_with_key.targets = {}
assert not gpg_with_key.repository_sign_args
def test_repository_sign_args_skip_2(gpg_with_key: GPG) -> None:
"""
must return empty args if it is not set
"""
gpg_with_key.targets = {SignSettings.SignPackages}
assert not gpg_with_key.repository_sign_args
def test_repository_sign_args_skip_3(gpg: GPG) -> None:
"""
must return empty args if it is not set
"""
gpg.targets = {SignSettings.SignRepository}
assert not gpg.repository_sign_args
def test_repository_sign_args_skip_4(gpg: GPG) -> None:
"""
must return empty args if it is not set
"""
gpg.targets = {SignSettings.SignPackages, SignSettings.SignRepository}
assert not gpg.repository_sign_args
def test_sign_package_1(gpg_with_key: GPG, mocker: MockerFixture) -> None:
"""
must sign package
"""
result = [Path("a"), Path("a.sig")]
process_mock = mocker.patch("ahriman.core.sign.gpg.GPG.process", return_value=result)
gpg.target = {SignSettings.SignPackages}
assert gpg.sign_package(Path("a"), "a") == result
gpg_with_key.targets = {SignSettings.SignPackages}
assert gpg_with_key.sign_package(Path("a"), "a") == result
process_mock.assert_called_once()
def test_sign_package_2(gpg: GPG, mocker: MockerFixture) -> None:
def test_sign_package_2(gpg_with_key: GPG, mocker: MockerFixture) -> None:
"""
must sign package
"""
result = [Path("a"), Path("a.sig")]
process_mock = mocker.patch("ahriman.core.sign.gpg.GPG.process", return_value=result)
gpg.target = {SignSettings.SignPackages, SignSettings.SignRepository}
assert gpg.sign_package(Path("a"), "a") == result
gpg_with_key.targets = {SignSettings.SignPackages, SignSettings.SignRepository}
assert gpg_with_key.sign_package(Path("a"), "a") == result
process_mock.assert_called_once()
def test_sign_package_skip_1(gpg: GPG, mocker: MockerFixture) -> None:
def test_sign_package_skip_1(gpg_with_key: GPG, mocker: MockerFixture) -> None:
"""
must not sign package if it is not set
"""
process_mock = mocker.patch("ahriman.core.sign.gpg.GPG.process")
gpg.target = {}
gpg_with_key.targets = {}
gpg_with_key.sign_package(Path("a"), "a")
process_mock.assert_not_called()
def test_sign_package_skip_2(gpg: GPG, mocker: MockerFixture) -> None:
def test_sign_package_skip_2(gpg_with_key: GPG, mocker: MockerFixture) -> None:
"""
must not sign package if it is not set
"""
process_mock = mocker.patch("ahriman.core.sign.gpg.GPG.process")
gpg.target = {SignSettings.SignRepository}
gpg_with_key.targets = {SignSettings.SignRepository}
gpg_with_key.sign_package(Path("a"), "a")
process_mock.assert_not_called()
def test_sign_repository_1(gpg: GPG, mocker: MockerFixture) -> None:
def test_sign_package_skip_3(gpg: GPG, mocker: MockerFixture) -> None:
"""
must not sign package if it is not set
"""
process_mock = mocker.patch("ahriman.core.sign.gpg.GPG.process")
gpg.targets = {SignSettings.SignPackages}
gpg.sign_package(Path("a"), "a")
process_mock.assert_not_called()
def test_sign_package_skip_4(gpg: GPG, mocker: MockerFixture) -> None:
"""
must not sign package if it is not set
"""
process_mock = mocker.patch("ahriman.core.sign.gpg.GPG.process")
gpg.targets = {SignSettings.SignPackages, SignSettings.SignRepository}
gpg.sign_package(Path("a"), "a")
process_mock.assert_not_called()
def test_sign_repository_1(gpg_with_key: GPG, mocker: MockerFixture) -> None:
"""
must sign repository
"""
result = [Path("a"), Path("a.sig")]
process_mock = mocker.patch("ahriman.core.sign.gpg.GPG.process", return_value=result)
gpg.target = {SignSettings.SignRepository}
assert gpg.sign_repository(Path("a")) == result
gpg_with_key.targets = {SignSettings.SignRepository}
assert gpg_with_key.sign_repository(Path("a")) == result
process_mock.assert_called_once()
def test_sign_repository_2(gpg: GPG, mocker: MockerFixture) -> None:
def test_sign_repository_2(gpg_with_key: GPG, mocker: MockerFixture) -> None:
"""
must sign repository
"""
result = [Path("a"), Path("a.sig")]
process_mock = mocker.patch("ahriman.core.sign.gpg.GPG.process", return_value=result)
gpg.target = {SignSettings.SignPackages, SignSettings.SignRepository}
assert gpg.sign_repository(Path("a")) == result
gpg_with_key.targets = {SignSettings.SignPackages, SignSettings.SignRepository}
assert gpg_with_key.sign_repository(Path("a")) == result
process_mock.assert_called_once()
def test_sign_repository_skip_1(gpg: GPG, mocker: MockerFixture) -> None:
def test_sign_repository_skip_1(gpg_with_key: GPG, mocker: MockerFixture) -> None:
"""
must not sign repository if it is not set
"""
process_mock = mocker.patch("ahriman.core.sign.gpg.GPG.process")
gpg.target = {}
gpg_with_key.targets = {}
gpg_with_key.sign_repository(Path("a"))
process_mock.assert_not_called()
def test_sign_repository_skip_2(gpg: GPG, mocker: MockerFixture) -> None:
def test_sign_repository_skip_2(gpg_with_key: GPG, mocker: MockerFixture) -> None:
"""
must not sign repository if it is not set
"""
process_mock = mocker.patch("ahriman.core.sign.gpg.GPG.process")
gpg.target = {SignSettings.SignPackages}
gpg_with_key.targets = {SignSettings.SignPackages}
gpg_with_key.sign_repository(Path("a"))
process_mock.assert_not_called()
def test_sign_repository_skip_3(gpg: GPG, mocker: MockerFixture) -> None:
"""
must not sign repository if it is not set
"""
process_mock = mocker.patch("ahriman.core.sign.gpg.GPG.process")
gpg.targets = {SignSettings.SignRepository}
gpg.sign_repository(Path("a"))
process_mock.assert_not_called()
def test_sign_repository_skip_4(gpg: GPG, mocker: MockerFixture) -> None:
"""
must not sign repository if it is not set
"""
process_mock = mocker.patch("ahriman.core.sign.gpg.GPG.process")
gpg.targets = {SignSettings.SignPackages, SignSettings.SignRepository}
gpg.sign_repository(Path("a"))
process_mock.assert_not_called()

View File

@ -21,6 +21,13 @@ def test_from_path(mocker: MockerFixture) -> None:
load_logging_mock.assert_called_once()
def test_section_name(configuration: Configuration) -> None:
"""
must return architecture specific group
"""
assert configuration.section_name("build", "x86_64") == "build_x86_64"
def test_absolute_path_for_absolute(configuration: Configuration) -> None:
"""
must not change path for absolute path in settings
@ -54,12 +61,13 @@ def test_dump_architecture_specific(configuration: Configuration) -> None:
dump must contain architecture specific settings
"""
configuration.add_section("build_x86_64")
configuration.set("build_x86_64", "archbuild_flags", "")
configuration.set("build_x86_64", "archbuild_flags", "hello flag")
dump = configuration.dump("x86_64")
assert dump
assert "build" not in dump
assert "build_x86_64" in dump
assert "build" in dump
assert "build_x86_64" not in dump
assert dump["build"]["archbuild_flags"] == "hello flag"
def test_getlist(configuration: Configuration) -> None:
@ -87,23 +95,6 @@ def test_getlist_single(configuration: Configuration) -> None:
assert configuration.getlist("build", "test_list") == ["a"]
def test_get_section_name(configuration: Configuration) -> None:
"""
must return architecture specific group
"""
configuration.add_section("build_x86_64")
configuration.set("build_x86_64", "archbuild_flags", "")
assert configuration.get_section_name("build", "x86_64") == "build_x86_64"
def test_get_section_name_missing(configuration: Configuration) -> None:
"""
must return default group if architecture depending group does not exist
"""
assert configuration.get_section_name("prefix", "suffix") == "prefix"
assert configuration.get_section_name("build", "x86_64") == "build"
def test_load_includes_missing(configuration: Configuration) -> None:
"""
must not fail if not include directory found