expose trigger configuration schema

Note that this commit contains the following breaking changes:

* remote pull and remote push triggers are now enabled by default (with
  empty target list)
* remote pull and remote push triggers now require target option to be
  set (old behaviour had fallback on `gitremote`)
* validation is now considered to be stable, so it is enabled by default
  in docker image (can be disabled however)
This commit is contained in:
2023-01-10 03:14:21 +02:00
parent b09aea13af
commit 5a05c8ce91
27 changed files with 717 additions and 448 deletions

View File

@@ -20,16 +20,15 @@
import argparse
import copy
from typing import Any, Callable, Dict, Optional, Type
from typing import Any, Dict, Type
from ahriman.application.handlers import Handler
from ahriman.core.configuration import Configuration
from ahriman.core.configuration.schema import CONFIGURATION_SCHEMA, \
GITREMOTE_REMOTE_PULL_SCHEMA, GITREMOTE_REMOTE_PUSH_SCHEMA, \
REPORT_CONSOLE_SCHEMA, REPORT_EMAIL_SCHEMA, REPORT_HTML_SCHEMA, REPORT_TELEGRAM_SCHEMA,\
UPLOAD_GITHUB_SCHEMA, UPLOAD_RSYNC_SCHEMA, UPLOAD_S3_SCHEMA
from ahriman.core.configuration.schema import CONFIGURATION_SCHEMA, ConfigurationSchema
from ahriman.core.configuration.validator import Validator
from ahriman.core.exceptions import ExtensionError
from ahriman.core.formatters import ValidationPrinter
from ahriman.core.triggers import TriggerLoader
class Validate(Handler):
@@ -64,7 +63,7 @@ class Validate(Handler):
Validate.check_if_empty(args.exit_code, True)
@staticmethod
def schema(architecture: str, configuration: Configuration) -> Dict[str, Any]:
def schema(architecture: str, configuration: Configuration) -> ConfigurationSchema:
"""
get schema with triggers
@@ -73,45 +72,39 @@ class Validate(Handler):
configuration(Configuration): configuration instance
Returns:
Dict[str, Any]: configuration validation schema
ConfigurationSchema: configuration validation schema
"""
root = copy.deepcopy(CONFIGURATION_SCHEMA)
# that's actually bad design, but in order to validate built-in triggers we need to know which are set
Validate.schema_insert(architecture, configuration, root, "remote-pull", lambda _: GITREMOTE_REMOTE_PULL_SCHEMA)
Validate.schema_insert(architecture, configuration, root, "remote-push", lambda _: GITREMOTE_REMOTE_PUSH_SCHEMA)
# create trigger loader instance
loader = TriggerLoader()
for trigger in loader.selected_triggers(configuration):
try:
trigger_class = loader.load_trigger_class(trigger)
except ExtensionError:
continue
report_schemas = {
"console": REPORT_CONSOLE_SCHEMA,
"email": REPORT_EMAIL_SCHEMA,
"html": REPORT_HTML_SCHEMA,
"telegram": REPORT_TELEGRAM_SCHEMA,
}
for schema_name, schema in report_schemas.items():
root[schema_name] = Validate.schema_erase_required(copy.deepcopy(schema))
Validate.schema_insert(architecture, configuration, root, "report", report_schemas.get)
# default settings if any
for schema_name, schema in trigger_class.configuration_schema(architecture, None).items():
erased = Validate.schema_erase_required(copy.deepcopy(schema))
root[schema_name] = Validate.schema_merge(root.get(schema_name, {}), erased)
upload_schemas = {
"github": UPLOAD_GITHUB_SCHEMA,
"rsync": UPLOAD_RSYNC_SCHEMA,
"s3": UPLOAD_S3_SCHEMA,
}
for schema_name, schema in upload_schemas.items():
root[schema_name] = Validate.schema_erase_required(copy.deepcopy(schema))
Validate.schema_insert(architecture, configuration, root, "upload", upload_schemas.get)
# settings according to enabled triggers
for schema_name, schema in trigger_class.configuration_schema(architecture, configuration).items():
root[schema_name] = Validate.schema_merge(root.get(schema_name, {}), copy.deepcopy(schema))
return root
@staticmethod
def schema_erase_required(schema: Dict[str, Any]) -> Dict[str, Any]:
def schema_erase_required(schema: ConfigurationSchema) -> ConfigurationSchema:
"""
recursively remove required field from supplied cerberus schema
Args:
schema(Dict[str, Any]): source schema from which required field must be removed
schema(ConfigurationSchema): source schema from which required field must be removed
Returns:
Dict[str, Any]: schema without required fields
ConfigurationSchema: schema without required fields. Note, that source schema will be modified in-place
"""
schema.pop("required", None)
for value in filter(lambda v: isinstance(v, dict), schema.values()):
@@ -119,32 +112,24 @@ class Validate(Handler):
return schema
@staticmethod
def schema_insert(architecture: str, configuration: Configuration, root: Dict[str, Any], root_section: str,
schema_mapping: Callable[[str], Optional[Dict[str, Any]]]) -> Dict[str, Any]:
def schema_merge(source: Dict[str, Any], schema: Dict[str, Any]) -> Dict[str, Any]:
"""
insert child schema into the root schema based on mapping rules
Notes:
Actually it is a bad design, because we are reading triggers configuration from parsers which (basically)
don't know anything about triggers. But in order to validate built-in triggers we need to know which are set
merge child schema into source. In case if source already contains values, new keys will be added
(possibly with overrides - in case if such key already set also)
Args:
architecture(str): repository architecture
configuration(Configuration): configuration instance
root(Dict[str, Any]): root schema in which child schema will be inserted
root_section(str): section name in root schema
schema_mapping(Callable[[str], Optional[Dict[str, Any]]]): extractor for child schema based on trigger type
source(Dict[str, Any]): source (current) schema into which will be merged
schema(Dict[str, Any]): new schema to be merged
Returns:
Dict[str, Any]: modified root schema. Note, however, that schema will be modified in place
Dict[str, Any]: schema with added elements from source schema if they were set before and not presented
in the new one. Note, that schema will be modified in-place
"""
if not configuration.has_section(root_section):
return root
for key, value in source.items():
if key not in schema:
schema[key] = value # new key found, just add it as is
elif isinstance(value, dict):
# value is dictionary, so we need to go deeper
Validate.schema_merge(value, schema[key])
targets = configuration.getlist(root_section, "target", fallback=[])
for target in targets:
section, schema_name = configuration.gettype(target, architecture)
if (schema := schema_mapping(schema_name)) is not None:
root[section] = copy.deepcopy(schema)
return root
return schema