# ahriman/src/ahriman/application/handlers/validate.py
#
# 136 lines
# 5.2 KiB
# Python

#
# Copyright (c) 2021-2023 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import argparse
import copy
from typing import Any, Dict, Type
from ahriman.application.handlers import Handler
from ahriman.core.configuration import Configuration
from ahriman.core.configuration.schema import CONFIGURATION_SCHEMA, ConfigurationSchema
from ahriman.core.configuration.validator import Validator
from ahriman.core.exceptions import ExtensionError
from ahriman.core.formatters import ValidationPrinter
from ahriman.core.triggers import TriggerLoader
class Validate(Handler):
    """
    configuration validator handler
    """

    # NOTE(review): consumed by the base ``Handler``, which is not visible in this file; presumably it disables
    # running the handler with automatically detected architectures - confirm against ``Handler``
    ALLOW_AUTO_ARCHITECTURE_RUN = False

    @classmethod
    def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
            report: bool, unsafe: bool) -> None:
        """
        callback for command line. Builds the validation schema (core schema extended by the schemas of the
        selected triggers), validates the dumped configuration against it and prints every error found

        Args:
            args(argparse.Namespace): command line args
            architecture(str): repository architecture
            configuration(Configuration): configuration instance
            report(bool): force enable or disable reporting (not used by this handler)
            unsafe(bool): if set no user check will be performed before path creation (not used by this handler)
        """
        schema = Validate.schema(architecture, configuration)
        validator = Validator(configuration=configuration, schema=schema)

        if validator.validate(configuration.dump()):
            return  # no errors found

        for node, errors in validator.errors.items():
            ValidationPrinter(node, errors).print(verbose=True)

        # as we reach this part it means that we always have errors
        Validate.check_if_empty(args.exit_code, True)

    @staticmethod
    def schema(architecture: str, configuration: Configuration) -> ConfigurationSchema:
        """
        get schema with triggers

        Args:
            architecture(str): repository architecture
            configuration(Configuration): configuration instance

        Returns:
            ConfigurationSchema: configuration validation schema
        """
        # work on a private copy, so the module level schema is never modified
        root = copy.deepcopy(CONFIGURATION_SCHEMA)

        # create trigger loader instance
        loader = TriggerLoader()
        for trigger in loader.selected_triggers(configuration):
            try:
                trigger_class = loader.load_trigger_class(trigger)
            except ExtensionError:
                # trigger could not be loaded, so it contributes nothing to the schema
                continue

            # default settings if any; they are requested without configuration and must not be mandatory,
            # thus any ``required`` flag is erased before merging
            for schema_name, schema in trigger_class.configuration_schema(architecture, None).items():
                erased = Validate.schema_erase_required(copy.deepcopy(schema))
                root[schema_name] = Validate.schema_merge(root.get(schema_name, {}), erased)

            # settings according to enabled triggers; these are merged as is, including ``required`` flags
            for schema_name, schema in trigger_class.configuration_schema(architecture, configuration).items():
                root[schema_name] = Validate.schema_merge(root.get(schema_name, {}), copy.deepcopy(schema))

        return root

    @staticmethod
    def schema_erase_required(schema: ConfigurationSchema) -> ConfigurationSchema:
        """
        recursively remove required field from supplied cerberus schema

        Args:
            schema(ConfigurationSchema): source schema from which required field must be removed

        Returns:
            ConfigurationSchema: schema without required fields. Note, that source schema will be modified in-place
        """
        # the key is dropped at every dictionary level, whatever its value is; only nested dictionaries are
        # visited, values stored inside lists are left untouched
        schema.pop("required", None)
        for value in schema.values():
            if isinstance(value, dict):
                Validate.schema_erase_required(value)
        return schema

    @staticmethod
    def schema_merge(source: Dict[str, Any], schema: Dict[str, Any]) -> Dict[str, Any]:
        """
        merge child schema into source. In case if source already contains values, new keys will be added
        (possibly with overrides - in case if such key already set also)

        Args:
            source(Dict[str, Any]): source (current) schema into which will be merged
            schema(Dict[str, Any]): new schema to be merged

        Returns:
            Dict[str, Any]: schema with added elements from source schema if they were set before and not presented
            in the new one. Note, that schema will be modified in-place
        """
        for key, value in source.items():
            if key not in schema:
                schema[key] = value  # key is missing in the new schema, carry it over as is
            elif isinstance(value, dict) and isinstance(schema[key], dict):
                # both sides are dictionaries, so we need to go deeper
                Validate.schema_merge(value, schema[key])
            # otherwise the new schema overrides the old value; in particular we must not descend
            # when only one side is a dictionary, because the other one cannot be merged into
        return schema