realworld fixes

* add method set_option to Configuration and also use it everywhere
* split CreateUser handler to additional read method
* check user duplicate on auth mapping read
* generate salt by using passlib instead of random.choice
* case-insensetive usernames
* update dependencies
* update configuration reference
* improve tests
This commit is contained in:
Evgenii Alekseev 2021-09-02 00:53:33 +03:00
parent b755088024
commit 21d74c7a03
24 changed files with 293 additions and 154 deletions

View File

@ -18,6 +18,23 @@ libalpm and AUR related configuration.
* `repositories` - list of pacman repositories, space separated list of strings, required. * `repositories` - list of pacman repositories, space separated list of strings, required.
* `root` - root for alpm library, string, required. * `root` - root for alpm library, string, required.
## `auth` group
Base authorization settings.
* `allowed_paths` - URI paths (exact match) which can be accessed without authorization, space separated list of strings, optional.
* `allowed_paths_groups` - URI paths prefixes which can be accessed without authorization, space separated list of strings, optional.
* `enabled` - enables web services authorization, boolean, optional, default `no`.
* `salt` - password hash salt, string, required in case if authorization enabled (automatically generated by `create-user` subcommand).
## `auth:*` groups
Authorization mapping. Group name must refer to user access level, i.e. it should be one of `auth:status` (internal API usage only), `auth:read` (read hidden pages), `auth:write` (everything is allowed).
Key is always username (case-insensitive), option value depends on authorization provider:
* `MappingAuth` (default) - reads salted password hashes from values, uses SHA512 in order to hash passwords. Password can be set by using `create-user` subcommand.
## `build:*` groups ## `build:*` groups
Build related configuration. Group name must refer to architecture, e.g. it should be `build:x86_64` for x86_64 architecture. Build related configuration. Group name must refer to architecture, e.g. it should be `build:x86_64` for x86_64 architecture.
@ -101,6 +118,9 @@ Group name must refer to architecture, e.g. it should be `s3:x86_64` for x86_64
Web server settings. If any of `host`/`port` is not set, web integration will be disabled. Group name must refer to architecture, e.g. it should be `web:x86_64` for x86_64 architecture. Web server settings. If any of `host`/`port` is not set, web integration will be disabled. Group name must refer to architecture, e.g. it should be `web:x86_64` for x86_64 architecture.
* `address` - optional address in form `proto://host:port` (`port` can be omitted in case of default `proto` ports), will be used instead of `http://{host}:{port}` in case if set, string, optional.
* `host` - host to bind, string, optional. * `host` - host to bind, string, optional.
* `password` - password to authorize in web service in order to update service status, string, required in case if authorization enabled.
* `port` - port to bind, int, optional. * `port` - port to bind, int, optional.
* `templates` - path to templates directory, string, required. * `templates` - path to templates directory, string, required.
* `username` - username to authorize in web service in order to update service status, string, required in case if authorization enabled.

View File

@ -7,8 +7,8 @@ pkgdesc="ArcHlinux ReposItory MANager"
arch=('any') arch=('any')
url="https://github.com/arcan1s/ahriman" url="https://github.com/arcan1s/ahriman"
license=('GPL3') license=('GPL3')
depends=('devtools' 'git' 'pyalpm' 'python-aur' 'python-srcinfo') depends=('devtools' 'git' 'pyalpm' 'python-aur' 'python-passlib' 'python-srcinfo')
makedepends=('python-argparse-manpage' 'python-pip') makedepends=('python-pip')
optdepends=('breezy: -bzr packages support' optdepends=('breezy: -bzr packages support'
'darcs: -darcs packages support' 'darcs: -darcs packages support'
'gnupg: package and repository sign' 'gnupg: package and repository sign'

View File

@ -3,6 +3,3 @@ test = pytest
[tool:pytest] [tool:pytest]
addopts = --cov=ahriman --cov-report term-missing:skip-covered --pspec addopts = --cov=ahriman --cov-report term-missing:skip-covered --pspec
[build_manpages]
manpages = man/ahriman.1:module=ahriman.application.ahriman:function=_parser

View File

@ -1,4 +1,3 @@
from build_manpages import build_manpages
from pathlib import Path from pathlib import Path
from setuptools import setup, find_packages from setuptools import setup, find_packages
from typing import Any, Dict from typing import Any, Dict
@ -31,6 +30,7 @@ setup(
], ],
install_requires=[ install_requires=[
"aur", "aur",
"passlib",
"pyalpm", "pyalpm",
"requests", "requests",
"srcinfo", "srcinfo",
@ -103,8 +103,4 @@ setup(
"passlib", "passlib",
], ],
}, },
cmdclass={
"build_manpages": build_manpages.build_manpages,
}
) )

View File

@ -18,10 +18,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
# #
import argparse import argparse
import configparser
import getpass import getpass
import random
import string
from pathlib import Path from pathlib import Path
from typing import Type from typing import Type
@ -46,32 +43,24 @@ class CreateUser(Handler):
""" """
salt = CreateUser.get_salt(configuration) salt = CreateUser.get_salt(configuration)
user = CreateUser.create_user(args, salt) user = CreateUser.create_user(args, salt)
CreateUser.create_configuration(user, salt, configuration.include) auth_configuration = CreateUser.get_auth_configuration(configuration.include)
CreateUser.create_configuration(auth_configuration, user, salt)
@staticmethod @staticmethod
def create_configuration(user: User, salt: str, include_path: Path) -> None: def create_configuration(configuration: Configuration, user: User, salt: str) -> None:
""" """
put new user to configuration put new user to configuration
:param configuration: configuration instance
:param user: user descriptor :param user: user descriptor
:param salt: password hash salt :param salt: password hash salt
:param include_path: path to directory with configuration includes
""" """
def set_option(section_name: str, name: str, value: str) -> None:
if section_name not in configuration.sections():
configuration.add_section(section_name)
configuration.set(section_name, name, value)
target = include_path / "auth.ini"
configuration = configparser.ConfigParser()
if target.is_file(): # load current configuration in case if it exists
configuration.read(target)
section = Configuration.section_name("auth", user.access.value) section = Configuration.section_name("auth", user.access.value)
set_option("auth", "salt", salt) configuration.set_option("auth", "salt", salt)
set_option(section, user.username, user.password) configuration.set_option(section, user.username, user.password)
with target.open("w") as ahriman_configuration: if configuration.path is None:
return
with configuration.path.open("w") as ahriman_configuration:
configuration.write(ahriman_configuration) configuration.write(ahriman_configuration)
@staticmethod @staticmethod
@ -85,9 +74,23 @@ class CreateUser(Handler):
user = User(args.username, args.password, args.role) user = User(args.username, args.password, args.role)
if user.password is None: if user.password is None:
user.password = getpass.getpass() user.password = getpass.getpass()
user.password = user.generate_password(user.password, salt) user.password = user.hash_password(user.password, salt)
return user return user
@staticmethod
def get_auth_configuration(include_path: Path) -> Configuration:
"""
create configuration instance
:param include_path: path to directory with configuration includes
:return: configuration instance. In case if there are local settings they will be loaded
"""
target = include_path / "auth.ini"
configuration = Configuration()
if target.is_file(): # load current configuration in case if it exists
configuration.load(target)
return configuration
@staticmethod @staticmethod
def get_salt(configuration: Configuration, salt_length: int = 20) -> str: def get_salt(configuration: Configuration, salt_length: int = 20) -> str:
""" """
@ -99,5 +102,4 @@ class CreateUser(Handler):
salt = configuration.get("auth", "salt", fallback=None) salt = configuration.get("auth", "salt", fallback=None)
if salt: if salt:
return salt return salt
return User.generate_password(salt_length)
return "".join(random.choices(string.ascii_letters + string.digits, k=salt_length))

View File

@ -18,7 +18,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
# #
import argparse import argparse
import configparser
from pathlib import Path from pathlib import Path
from typing import Type from typing import Type
@ -79,25 +78,20 @@ class Setup(Handler):
:param repository: repository name :param repository: repository name
:param include_path: path to directory with configuration includes :param include_path: path to directory with configuration includes
""" """
configuration = configparser.ConfigParser() configuration = Configuration()
section = Configuration.section_name("build", architecture) section = Configuration.section_name("build", architecture)
configuration.add_section(section) configuration.set_option(section, "build_command", str(Setup.build_command(args.build_command, architecture)))
configuration.set(section, "build_command", str(Setup.build_command(args.build_command, architecture))) configuration.set_option("repository", "name", repository)
configuration.add_section("repository")
configuration.set("repository", "name", repository)
if args.sign_key is not None: if args.sign_key is not None:
section = Configuration.section_name("sign", architecture) section = Configuration.section_name("sign", architecture)
configuration.add_section(section) configuration.set_option(section, "target", " ".join([target.name.lower() for target in args.sign_target]))
configuration.set(section, "target", " ".join([target.name.lower() for target in args.sign_target])) configuration.set_option(section, "key", args.sign_key)
configuration.set(section, "key", args.sign_key)
if args.web_port is not None: if args.web_port is not None:
section = Configuration.section_name("web", architecture) section = Configuration.section_name("web", architecture)
configuration.add_section(section) configuration.set_option(section, "port", str(args.web_port))
configuration.set(section, "port", str(args.web_port))
target = include_path / "setup-overrides.ini" target = include_path / "setup-overrides.ini"
with target.open("w") as ahriman_configuration: with target.open("w") as ahriman_configuration:
@ -115,7 +109,7 @@ class Setup(Handler):
:param repository: repository name :param repository: repository name
:param paths: repository paths instance :param paths: repository paths instance
""" """
configuration = configparser.ConfigParser() configuration = Configuration()
# preserve case # preserve case
# stupid mypy thinks that it is impossible # stupid mypy thinks that it is impossible
configuration.optionxform = lambda key: key # type: ignore configuration.optionxform = lambda key: key # type: ignore
@ -125,17 +119,15 @@ class Setup(Handler):
configuration.read(source) configuration.read(source)
# set our architecture now # set our architecture now
configuration.set("options", "Architecture", architecture) configuration.set_option("options", "Architecture", architecture)
# add multilib # add multilib
if not no_multilib: if not no_multilib:
configuration.add_section("multilib") configuration.set_option("multilib", "Include", str(Setup.MIRRORLIST_PATH))
configuration.set("multilib", "Include", str(Setup.MIRRORLIST_PATH))
# add repository itself # add repository itself
configuration.add_section(repository) configuration.set_option(repository, "SigLevel", "Optional TrustAll") # we don't care
configuration.set(repository, "SigLevel", "Optional TrustAll") # we don't care configuration.set_option(repository, "Server", f"file://{paths.repository}")
configuration.set(repository, "Server", f"file://{paths.repository}")
target = source.parent / f"pacman-{prefix}-{architecture}.conf" target = source.parent / f"pacman-{prefix}-{architecture}.conf"
with target.open("w") as devtools_configuration: with target.open("w") as devtools_configuration:

View File

@ -47,7 +47,7 @@ class Auth:
self.allowed_paths.update(self.ALLOWED_PATHS) self.allowed_paths.update(self.ALLOWED_PATHS)
self.allowed_paths_groups = set(configuration.getlist("auth", "allowed_paths_groups")) self.allowed_paths_groups = set(configuration.getlist("auth", "allowed_paths_groups"))
self.allowed_paths_groups.update(self.ALLOWED_PATHS_GROUPS) self.allowed_paths_groups.update(self.ALLOWED_PATHS_GROUPS)
self.enabled = configuration.getboolean("web", "auth", fallback=False) self.enabled = configuration.getboolean("auth", "enabled", fallback=False)
@classmethod @classmethod
def load(cls: Type[Auth], configuration: Configuration) -> Auth: def load(cls: Type[Auth], configuration: Configuration) -> Auth:
@ -56,7 +56,7 @@ class Auth:
:param configuration: configuration instance :param configuration: configuration instance
:return: authorization module according to current settings :return: authorization module according to current settings
""" """
if configuration.getboolean("web", "auth", fallback=False): if configuration.getboolean("auth", "enabled", fallback=False):
from ahriman.core.auth.mapping_auth import MappingAuth from ahriman.core.auth.mapping_auth import MappingAuth
return MappingAuth(configuration) return MappingAuth(configuration)
return cls(configuration) return cls(configuration)

View File

@ -21,6 +21,7 @@ from typing import Dict, Optional
from ahriman.core.auth.auth import Auth from ahriman.core.auth.auth import Auth
from ahriman.core.configuration import Configuration from ahriman.core.configuration import Configuration
from ahriman.core.exceptions import DuplicateUser
from ahriman.models.user import User from ahriman.models.user import User
from ahriman.models.user_access import UserAccess from ahriman.models.user_access import UserAccess
@ -29,7 +30,7 @@ class MappingAuth(Auth):
""" """
user authorization based on mapping from configuration file user authorization based on mapping from configuration file
:ivar salt: random generated string to salt passwords :ivar salt: random generated string to salt passwords
:ivar users: map of username to its descriptor :ivar _users: map of username to its descriptor
""" """
def __init__(self, configuration: Configuration) -> None: def __init__(self, configuration: Configuration) -> None:
@ -39,7 +40,7 @@ class MappingAuth(Auth):
""" """
Auth.__init__(self, configuration) Auth.__init__(self, configuration)
self.salt = configuration.get("auth", "salt") self.salt = configuration.get("auth", "salt")
self.users = self.get_users(configuration) self._users = self.get_users(configuration)
@staticmethod @staticmethod
def get_users(configuration: Configuration) -> Dict[str, User]: def get_users(configuration: Configuration) -> Dict[str, User]:
@ -54,7 +55,10 @@ class MappingAuth(Auth):
if not configuration.has_section(section): if not configuration.has_section(section):
continue continue
for user, password in configuration[section].items(): for user, password in configuration[section].items():
users[user] = User(user, password, role) normalized_user = user.lower()
if normalized_user in users:
raise DuplicateUser(normalized_user)
users[normalized_user] = User(normalized_user, password, role)
return users return users
def check_credentials(self, username: Optional[str], password: Optional[str]) -> bool: def check_credentials(self, username: Optional[str], password: Optional[str]) -> bool:
@ -66,7 +70,17 @@ class MappingAuth(Auth):
""" """
if username is None or password is None: if username is None or password is None:
return False # invalid data supplied return False # invalid data supplied
return self.known_username(username) and self.users[username].check_credentials(password, self.salt) user = self.get_user(username)
return user is not None and user.check_credentials(password, self.salt)
def get_user(self, username: str) -> Optional[User]:
"""
retrieve user from in-memory mapping
:param username: username
:return: user descriptor if username is known and None otherwise
"""
normalized_user = username.lower()
return self._users.get(normalized_user)
def known_username(self, username: str) -> bool: def known_username(self, username: str) -> bool:
""" """
@ -74,7 +88,7 @@ class MappingAuth(Auth):
:param username: username :param username: username
:return: True in case if user is known and can be authorized and False otherwise :return: True in case if user is known and can be authorized and False otherwise
""" """
return username in self.users return self.get_user(username) is not None
def verify_access(self, username: str, required: UserAccess) -> bool: def verify_access(self, username: str, required: UserAccess) -> bool:
""" """
@ -83,4 +97,5 @@ class MappingAuth(Auth):
:param required: required access level :param required: required access level
:return: True in case if user is allowed to do this request and False otherwise :return: True in case if user is allowed to do this request and False otherwise
""" """
return self.known_username(username) and self.users[username].verify_access(required) user = self.get_user(username)
return user is not None and user.verify_access(required)

View File

@ -170,18 +170,27 @@ class Configuration(configparser.RawConfigParser):
:param architecture: repository architecture :param architecture: repository architecture
""" """
for section in self.ARCHITECTURE_SPECIFIC_SECTIONS: for section in self.ARCHITECTURE_SPECIFIC_SECTIONS:
if not self.has_section(section):
self.add_section(section) # add section if not exists
# get overrides # get overrides
specific = self.section_name(section, architecture) specific = self.section_name(section, architecture)
if self.has_section(specific): if self.has_section(specific):
# if there is no such section it means that there is no overrides for this arch # if there is no such section it means that there is no overrides for this arch
# but we anyway will have to delete sections for others archs # but we anyway will have to delete sections for others archs
for key, value in self[specific].items(): for key, value in self[specific].items():
self.set(section, key, value) self.set_option(section, key, value)
# remove any arch specific section # remove any arch specific section
for foreign in self.sections(): for foreign in self.sections():
# we would like to use lambda filter here, but pylint is too dumb # we would like to use lambda filter here, but pylint is too dumb
if not foreign.startswith(f"{section}:"): if not foreign.startswith(f"{section}:"):
continue continue
self.remove_section(foreign) self.remove_section(foreign)
def set_option(self, section: str, option: str, value: Optional[str]) -> None:
"""
set option. Unlike default `configparser.RawConfigParser.set` it also creates section if it does not exist
:param section: section name
:param option: option name
:param value: option value as string in parsable format
"""
if not self.has_section(section):
self.add_section(section)
self.set(section, option, value)

View File

@ -45,6 +45,19 @@ class DuplicateRun(Exception):
Exception.__init__(self, "Another application instance is run") Exception.__init__(self, "Another application instance is run")
class DuplicateUser(Exception):
"""
exception which will be thrown in case if there are two users with different settings
"""
def __init__(self, username: str) -> None:
"""
default constructor
:param username: username with duplicates
"""
Exception.__init__(self, f"Found duplicate user with username {username}")
class InitializeException(Exception): class InitializeException(Exception):
""" """
base service initialization exception base service initialization exception

View File

@ -21,6 +21,7 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional, Type from typing import Optional, Type
from passlib.pwd import genword as generate_password # type: ignore
from passlib.handlers.sha2_crypt import sha512_crypt # type: ignore from passlib.handlers.sha2_crypt import sha512_crypt # type: ignore
from ahriman.models.user_access import UserAccess from ahriman.models.user_access import UserAccess
@ -52,6 +53,16 @@ class User:
return None return None
return cls(username, password, UserAccess.Status) return cls(username, password, UserAccess.Status)
@staticmethod
def generate_password(length: int) -> str:
"""
generate password with specified length
:param length: password length
:return: random string which contains letters and numbers
"""
password: str = generate_password(length=length)
return password
def check_credentials(self, password: str, salt: str) -> bool: def check_credentials(self, password: str, salt: str) -> bool:
""" """
validate user password validate user password
@ -62,7 +73,7 @@ class User:
verified: bool = self._HASHER.verify(password + salt, self.password) verified: bool = self._HASHER.verify(password + salt, self.password)
return verified return verified
def generate_password(self, password: str, salt: str) -> str: def hash_password(self, password: str, salt: str) -> str:
""" """
generate hashed password from plain text generate hashed password from plain text
:param password: entered password :param password: entered password

View File

@ -28,67 +28,60 @@ def test_run(args: argparse.Namespace, configuration: Configuration, mocker: Moc
must run command must run command
""" """
args = _default_args(args) args = _default_args(args)
get_auth_configuration_mock = mocker.patch("ahriman.application.handlers.CreateUser.get_auth_configuration")
create_configuration_mock = mocker.patch("ahriman.application.handlers.CreateUser.create_configuration") create_configuration_mock = mocker.patch("ahriman.application.handlers.CreateUser.create_configuration")
create_user = mocker.patch("ahriman.application.handlers.CreateUser.create_user") create_user = mocker.patch("ahriman.application.handlers.CreateUser.create_user")
get_salt_mock = mocker.patch("ahriman.application.handlers.CreateUser.get_salt") get_salt_mock = mocker.patch("ahriman.application.handlers.CreateUser.get_salt")
CreateUser.run(args, "x86_64", configuration) CreateUser.run(args, "x86_64", configuration)
get_auth_configuration_mock.assert_called_once()
create_configuration_mock.assert_called_once() create_configuration_mock.assert_called_once()
create_user.assert_called_once() create_user.assert_called_once()
get_salt_mock.assert_called_once() get_salt_mock.assert_called_once()
def test_create_configuration(user: User, mocker: MockerFixture) -> None: def test_create_configuration(configuration: Configuration, user: User, mocker: MockerFixture) -> None:
""" """
must correctly create configuration file must correctly create configuration file
""" """
section = Configuration.section_name("auth", user.access.value) section = Configuration.section_name("auth", user.access.value)
mocker.patch("pathlib.Path.open") mocker.patch("pathlib.Path.open")
add_section_mock = mocker.patch("configparser.RawConfigParser.add_section") set_mock = mocker.patch("ahriman.core.configuration.Configuration.set_option")
set_mock = mocker.patch("configparser.RawConfigParser.set") write_mock = mocker.patch("ahriman.core.configuration.Configuration.write")
write_mock = mocker.patch("configparser.RawConfigParser.write")
CreateUser.create_configuration(user, "salt", Path("path")) CreateUser.create_configuration(configuration, user, "salt")
write_mock.assert_called_once() write_mock.assert_called_once()
add_section_mock.assert_has_calls([mock.call("auth"), mock.call(section)])
set_mock.assert_has_calls([ set_mock.assert_has_calls([
mock.call("auth", "salt", pytest.helpers.anyvar(str)), mock.call("auth", "salt", pytest.helpers.anyvar(str)),
mock.call(section, user.username, user.password) mock.call(section, user.username, user.password)
]) ])
def test_create_configuration_not_loaded(configuration: Configuration, user: User, mocker: MockerFixture) -> None:
"""
must do nothing in case if configuration is not loaded
"""
configuration.path = None
mocker.patch("pathlib.Path.open")
write_mock = mocker.patch("ahriman.core.configuration.Configuration.write")
CreateUser.create_configuration(configuration, user, "salt")
write_mock.assert_not_called()
def test_create_configuration_user_exists(configuration: Configuration, user: User, mocker: MockerFixture) -> None: def test_create_configuration_user_exists(configuration: Configuration, user: User, mocker: MockerFixture) -> None:
""" """
must correctly update configuration file if user already exists must correctly update configuration file if user already exists
""" """
section = Configuration.section_name("auth", user.access.value) section = Configuration.section_name("auth", user.access.value)
configuration.add_section(section) configuration.set_option(section, user.username, "")
configuration.set(section, user.username, "")
mocker.patch("pathlib.Path.open") mocker.patch("pathlib.Path.open")
mocker.patch("configparser.ConfigParser", return_value=configuration) mocker.patch("ahriman.core.configuration.Configuration.write")
mocker.patch("configparser.RawConfigParser.write")
add_section_mock = mocker.patch("configparser.RawConfigParser.add_section")
CreateUser.create_configuration(user, "salt", Path("path")) CreateUser.create_configuration(configuration, user, "salt")
add_section_mock.assert_not_called()
assert configuration.get(section, user.username) == user.password assert configuration.get(section, user.username) == user.password
def test_create_configuration_file_exists(user: User, mocker: MockerFixture) -> None:
"""
must correctly update configuration file if file already exists
"""
mocker.patch("pathlib.Path.open")
mocker.patch("pathlib.Path.is_file", return_value=True)
mocker.patch("configparser.RawConfigParser.write")
read_mock = mocker.patch("configparser.RawConfigParser.read")
CreateUser.create_configuration(user, "salt", Path("path"))
read_mock.assert_called_once()
def test_create_user(args: argparse.Namespace, user: User) -> None: def test_create_user(args: argparse.Namespace, user: User) -> None:
""" """
must create user must create user
@ -130,3 +123,22 @@ def test_get_salt_generate(configuration: Configuration) -> None:
salt = CreateUser.get_salt(configuration, 16) salt = CreateUser.get_salt(configuration, 16)
assert salt assert salt
assert len(salt) == 16 assert len(salt) == 16
def test_get_auth_configuration() -> None:
"""
must load empty configuration
"""
assert CreateUser.get_auth_configuration(Path("path"))
def test_get_auth_configuration_exists(mocker: MockerFixture) -> None:
"""
must load configuration from filesystem
"""
mocker.patch("pathlib.Path.open")
mocker.patch("pathlib.Path.is_file", return_value=True)
read_mock = mocker.patch("ahriman.core.configuration.Configuration.read")
CreateUser.get_auth_configuration(Path("path"))
read_mock.assert_called_once()

View File

@ -62,19 +62,12 @@ def test_create_ahriman_configuration(args: argparse.Namespace, configuration: C
""" """
args = _default_args(args) args = _default_args(args)
mocker.patch("pathlib.Path.open") mocker.patch("pathlib.Path.open")
add_section_mock = mocker.patch("configparser.RawConfigParser.add_section") set_option_mock = mocker.patch("ahriman.core.configuration.Configuration.set_option")
set_mock = mocker.patch("configparser.RawConfigParser.set") write_mock = mocker.patch("ahriman.core.configuration.Configuration.write")
write_mock = mocker.patch("configparser.RawConfigParser.write")
command = Setup.build_command(args.build_command, "x86_64") command = Setup.build_command(args.build_command, "x86_64")
Setup.create_ahriman_configuration(args, "x86_64", args.repository, configuration.include) Setup.create_ahriman_configuration(args, "x86_64", args.repository, configuration.include)
add_section_mock.assert_has_calls([ set_option_mock.assert_has_calls([
mock.call(Configuration.section_name("build", "x86_64")),
mock.call("repository"),
mock.call(Configuration.section_name("sign", "x86_64")),
mock.call(Configuration.section_name("web", "x86_64")),
])
set_mock.assert_has_calls([
mock.call(Configuration.section_name("build", "x86_64"), "build_command", str(command)), mock.call(Configuration.section_name("build", "x86_64"), "build_command", str(command)),
mock.call("repository", "name", args.repository), mock.call("repository", "name", args.repository),
mock.call(Configuration.section_name("sign", "x86_64"), "target", mock.call(Configuration.section_name("sign", "x86_64"), "target",
@ -92,9 +85,9 @@ def test_create_devtools_configuration(args: argparse.Namespace, repository_path
""" """
args = _default_args(args) args = _default_args(args)
mocker.patch("pathlib.Path.open") mocker.patch("pathlib.Path.open")
mocker.patch("configparser.RawConfigParser.set") mocker.patch("ahriman.core.configuration.Configuration.set")
add_section_mock = mocker.patch("configparser.RawConfigParser.add_section") add_section_mock = mocker.patch("ahriman.core.configuration.Configuration.add_section")
write_mock = mocker.patch("configparser.RawConfigParser.write") write_mock = mocker.patch("ahriman.core.configuration.Configuration.write")
Setup.create_devtools_configuration(args.build_command, "x86_64", args.from_configuration, Setup.create_devtools_configuration(args.build_command, "x86_64", args.from_configuration,
args.no_multilib, args.repository, repository_paths) args.no_multilib, args.repository, repository_paths)
@ -112,13 +105,11 @@ def test_create_devtools_configuration_no_multilib(args: argparse.Namespace, rep
""" """
args = _default_args(args) args = _default_args(args)
mocker.patch("pathlib.Path.open") mocker.patch("pathlib.Path.open")
mocker.patch("configparser.RawConfigParser.set") mocker.patch("ahriman.core.configuration.Configuration.set")
add_section_mock = mocker.patch("configparser.RawConfigParser.add_section") write_mock = mocker.patch("ahriman.core.configuration.Configuration.write")
write_mock = mocker.patch("configparser.RawConfigParser.write")
Setup.create_devtools_configuration(args.build_command, "x86_64", args.from_configuration, Setup.create_devtools_configuration(args.build_command, "x86_64", args.from_configuration,
True, args.repository, repository_paths) True, args.repository, repository_paths)
add_section_mock.assert_called_once()
write_mock.assert_called_once() write_mock.assert_called_once()

View File

@ -10,5 +10,5 @@ def mapping_auth(configuration: Configuration) -> MappingAuth:
auth provider fixture auth provider fixture
:return: auth service instance :return: auth service instance
""" """
configuration.set("web", "auth", "yes") configuration.set_option("auth", "enabled", "yes")
return MappingAuth(configuration) return MappingAuth(configuration)

View File

@ -9,7 +9,7 @@ def test_load_dummy(configuration: Configuration) -> None:
""" """
must load dummy validator if authorization is not enabled must load dummy validator if authorization is not enabled
""" """
configuration.set("web", "auth", "no") configuration.set_option("auth", "enabled", "no")
auth = Auth.load(configuration) auth = Auth.load(configuration)
assert isinstance(auth, Auth) assert isinstance(auth, Auth)
@ -26,7 +26,7 @@ def test_load_mapping(configuration: Configuration) -> None:
""" """
must load mapping validator if option set must load mapping validator if option set
""" """
configuration.set("web", "auth", "yes") configuration.set_option("auth", "enabled", "yes")
auth = Auth.load(configuration) auth = Auth.load(configuration)
assert isinstance(auth, MappingAuth) assert isinstance(auth, MappingAuth)

View File

@ -1,5 +1,8 @@
import pytest
from ahriman.core.auth.mapping_auth import MappingAuth from ahriman.core.auth.mapping_auth import MappingAuth
from ahriman.core.configuration import Configuration from ahriman.core.configuration import Configuration
from ahriman.core.exceptions import DuplicateUser
from ahriman.models.user import User from ahriman.models.user import User
from ahriman.models.user_access import UserAccess from ahriman.models.user_access import UserAccess
@ -10,25 +13,53 @@ def test_get_users(mapping_auth: MappingAuth, configuration: Configuration) -> N
""" """
user_write = User("user_write", "pwd_write", UserAccess.Write) user_write = User("user_write", "pwd_write", UserAccess.Write)
write_section = Configuration.section_name("auth", user_write.access.value) write_section = Configuration.section_name("auth", user_write.access.value)
configuration.add_section(write_section) configuration.set_option(write_section, user_write.username, user_write.password)
configuration.set(write_section, user_write.username, user_write.password)
user_read = User("user_read", "pwd_read", UserAccess.Read) user_read = User("user_read", "pwd_read", UserAccess.Read)
read_section = Configuration.section_name("auth", user_read.access.value) read_section = Configuration.section_name("auth", user_read.access.value)
configuration.add_section(read_section) configuration.set_option(read_section, user_read.username, user_read.password)
configuration.set(read_section, user_read.username, user_read.password) user_read = User("user_read", "pwd_read", UserAccess.Read)
read_section = Configuration.section_name("auth", user_read.access.value)
configuration.set_option(read_section, user_read.username, user_read.password)
users = mapping_auth.get_users(configuration) users = mapping_auth.get_users(configuration)
expected = {user_write.username: user_write, user_read.username: user_read} expected = {user_write.username: user_write, user_read.username: user_read}
assert users == expected assert users == expected
def test_get_users_normalized(mapping_auth: MappingAuth, configuration: Configuration) -> None:
"""
must return user list with normalized usernames in keys
"""
user = User("UsEr", "pwd_read", UserAccess.Read)
read_section = Configuration.section_name("auth", user.access.value)
configuration.set_option(read_section, user.username, user.password)
users = mapping_auth.get_users(configuration)
expected = user.username.lower()
assert expected in users
assert users[expected].username == expected
def test_get_users_duplicate(mapping_auth: MappingAuth, configuration: Configuration, user: User) -> None:
"""
must raise exception on duplicate username
"""
write_section = Configuration.section_name("auth", UserAccess.Write.value)
configuration.set_option(write_section, user.username, user.password)
read_section = Configuration.section_name("auth", UserAccess.Read.value)
configuration.set_option(read_section, user.username, user.password)
with pytest.raises(DuplicateUser):
mapping_auth.get_users(configuration)
def test_check_credentials(mapping_auth: MappingAuth, user: User) -> None: def test_check_credentials(mapping_auth: MappingAuth, user: User) -> None:
""" """
must return true for valid credentials must return true for valid credentials
""" """
current_password = user.password current_password = user.password
user.password = user.generate_password(user.password, mapping_auth.salt) user.password = user.hash_password(user.password, mapping_auth.salt)
mapping_auth.users[user.username] = user mapping_auth._users[user.username] = user
assert mapping_auth.check_credentials(user.username, current_password) assert mapping_auth.check_credentials(user.username, current_password)
assert not mapping_auth.check_credentials(user.username, user.password) # here password is hashed so it is invalid assert not mapping_auth.check_credentials(user.username, user.password) # here password is hashed so it is invalid
@ -49,11 +80,34 @@ def test_check_credentials_unknown(mapping_auth: MappingAuth, user: User) -> Non
assert not mapping_auth.check_credentials(user.username, user.password) assert not mapping_auth.check_credentials(user.username, user.password)
def test_get_user(mapping_auth: MappingAuth, user: User) -> None:
"""
must return user from storage by username
"""
mapping_auth._users[user.username] = user
assert mapping_auth.get_user(user.username) == user
def test_get_user_normalized(mapping_auth: MappingAuth, user: User) -> None:
"""
must return user from storage by username case-insensitive
"""
mapping_auth._users[user.username] = user
assert mapping_auth.get_user(user.username.upper()) == user
def test_get_user_unknown(mapping_auth: MappingAuth, user: User) -> None:
"""
must return None in case if no user found
"""
assert mapping_auth.get_user(user.username) is None
def test_known_username(mapping_auth: MappingAuth, user: User) -> None: def test_known_username(mapping_auth: MappingAuth, user: User) -> None:
""" """
must allow only known users must allow only known users
""" """
mapping_auth.users[user.username] = user mapping_auth._users[user.username] = user
assert mapping_auth.known_username(user.username) assert mapping_auth.known_username(user.username)
assert not mapping_auth.known_username(user.password) assert not mapping_auth.known_username(user.password)
@ -62,6 +116,6 @@ def test_verify_access(mapping_auth: MappingAuth, user: User) -> None:
""" """
must verify user access must verify user access
""" """
mapping_auth.users[user.username] = user mapping_auth._users[user.username] = user
assert mapping_auth.verify_access(user.username, user.access) assert mapping_auth.verify_access(user.username, user.access)
assert not mapping_auth.verify_access(user.username, UserAccess.Write) assert not mapping_auth.verify_access(user.username, UserAccess.Write)

View File

@ -23,8 +23,8 @@ def test_send_auth(configuration: Configuration, mocker: MockerFixture) -> None:
""" """
must send an email with attachment with auth must send an email with attachment with auth
""" """
configuration.set("email", "user", "username") configuration.set_option("email", "user", "username")
configuration.set("email", "password", "password") configuration.set_option("email", "password", "password")
smtp_mock = mocker.patch("smtplib.SMTP") smtp_mock = mocker.patch("smtplib.SMTP")
report = Email("x86_64", configuration) report = Email("x86_64", configuration)
@ -36,7 +36,7 @@ def test_send_auth_no_password(configuration: Configuration, mocker: MockerFixtu
""" """
must send an email with attachment without auth if no password supplied must send an email with attachment without auth if no password supplied
""" """
configuration.set("email", "user", "username") configuration.set_option("email", "user", "username")
smtp_mock = mocker.patch("smtplib.SMTP") smtp_mock = mocker.patch("smtplib.SMTP")
report = Email("x86_64", configuration) report = Email("x86_64", configuration)
@ -48,7 +48,7 @@ def test_send_auth_no_user(configuration: Configuration, mocker: MockerFixture)
""" """
must send an email with attachment without auth if no user supplied must send an email with attachment without auth if no user supplied
""" """
configuration.set("email", "password", "password") configuration.set_option("email", "password", "password")
smtp_mock = mocker.patch("smtplib.SMTP") smtp_mock = mocker.patch("smtplib.SMTP")
report = Email("x86_64", configuration) report = Email("x86_64", configuration)
@ -60,7 +60,7 @@ def test_send_ssl_tls(configuration: Configuration, mocker: MockerFixture) -> No
""" """
must send an email with attachment with ssl/tls must send an email with attachment with ssl/tls
""" """
configuration.set("email", "ssl", "ssl") configuration.set_option("email", "ssl", "ssl")
smtp_mock = mocker.patch("smtplib.SMTP_SSL") smtp_mock = mocker.patch("smtplib.SMTP_SSL")
report = Email("x86_64", configuration) report = Email("x86_64", configuration)
@ -75,7 +75,7 @@ def test_send_starttls(configuration: Configuration, mocker: MockerFixture) -> N
""" """
must send an email with attachment with starttls must send an email with attachment with starttls
""" """
configuration.set("email", "ssl", "starttls") configuration.set_option("email", "ssl", "starttls")
smtp_mock = mocker.patch("smtplib.SMTP") smtp_mock = mocker.patch("smtplib.SMTP")
report = Email("x86_64", configuration) report = Email("x86_64", configuration)
@ -109,7 +109,7 @@ def test_generate_no_empty(configuration: Configuration, package_ahriman: Packag
""" """
must not generate report with built packages if no_empty_report is set must not generate report with built packages if no_empty_report is set
""" """
configuration.set("email", "no_empty_report", "yes") configuration.set_option("email", "no_empty_report", "yes")
send_mock = mocker.patch("ahriman.core.report.email.Email._send") send_mock = mocker.patch("ahriman.core.report.email.Email._send")
report = Email("x86_64", configuration) report = Email("x86_64", configuration)
@ -122,7 +122,7 @@ def test_generate_no_empty_with_built(configuration: Configuration, package_ahri
""" """
must generate report with built packages if no_empty_report is set must generate report with built packages if no_empty_report is set
""" """
configuration.set("email", "no_empty_report", "yes") configuration.set_option("email", "no_empty_report", "yes")
send_mock = mocker.patch("ahriman.core.report.email.Email._send") send_mock = mocker.patch("ahriman.core.report.email.Email._send")
report = Email("x86_64", configuration) report = Email("x86_64", configuration)

View File

@ -19,8 +19,8 @@ def test_load_full_client(configuration: Configuration) -> None:
""" """
must load full client if settings set must load full client if settings set
""" """
configuration.set("web", "host", "localhost") configuration.set_option("web", "host", "localhost")
configuration.set("web", "port", "8080") configuration.set_option("web", "port", "8080")
assert isinstance(Client.load(configuration), WebClient) assert isinstance(Client.load(configuration), WebClient)
@ -28,7 +28,7 @@ def test_load_full_client_from_address(configuration: Configuration) -> None:
""" """
must load full client if settings set must load full client if settings set
""" """
configuration.set("web", "address", "http://localhost:8080") configuration.set_option("web", "address", "http://localhost:8080")
assert isinstance(Client.load(configuration), WebClient) assert isinstance(Client.load(configuration), WebClient)

View File

@ -33,11 +33,11 @@ def test_parse_address(configuration: Configuration) -> None:
""" """
must extract address correctly must extract address correctly
""" """
configuration.set("web", "host", "localhost") configuration.set_option("web", "host", "localhost")
configuration.set("web", "port", "8080") configuration.set_option("web", "port", "8080")
assert WebClient.parse_address(configuration) == "http://localhost:8080" assert WebClient.parse_address(configuration) == "http://localhost:8080"
configuration.set("web", "address", "http://localhost:8081") configuration.set_option("web", "address", "http://localhost:8081")
assert WebClient.parse_address(configuration) == "http://localhost:8081" assert WebClient.parse_address(configuration) == "http://localhost:8081"

View File

@ -9,7 +9,7 @@ def test_from_path(mocker: MockerFixture) -> None:
""" """
must load configuration must load configuration
""" """
read_mock = mocker.patch("configparser.RawConfigParser.read") read_mock = mocker.patch("ahriman.core.configuration.Configuration.read")
load_includes_mock = mocker.patch("ahriman.core.configuration.Configuration.load_includes") load_includes_mock = mocker.patch("ahriman.core.configuration.Configuration.load_includes")
load_logging_mock = mocker.patch("ahriman.core.configuration.Configuration.load_logging") load_logging_mock = mocker.patch("ahriman.core.configuration.Configuration.load_logging")
path = Path("path") path = Path("path")
@ -33,7 +33,7 @@ def test_absolute_path_for_absolute(configuration: Configuration) -> None:
must not change path for absolute path in settings must not change path for absolute path in settings
""" """
path = Path("/a/b/c") path = Path("/a/b/c")
configuration.set("build", "path", str(path)) configuration.set_option("build", "path", str(path))
assert configuration.getpath("build", "path") == path assert configuration.getpath("build", "path") == path
@ -42,7 +42,7 @@ def test_absolute_path_for_relative(configuration: Configuration) -> None:
must prepend root path to relative path must prepend root path to relative path
""" """
path = Path("a") path = Path("a")
configuration.set("build", "path", str(path)) configuration.set_option("build", "path", str(path))
result = configuration.getpath("build", "path") result = configuration.getpath("build", "path")
assert result.is_absolute() assert result.is_absolute()
assert result.parent == configuration.path.parent assert result.parent == configuration.path.parent
@ -61,8 +61,7 @@ def test_dump_architecture_specific(configuration: Configuration) -> None:
dump must contain architecture specific settings dump must contain architecture specific settings
""" """
section = configuration.section_name("build", "x86_64") section = configuration.section_name("build", "x86_64")
configuration.add_section(section) configuration.set_option(section, "archbuild_flags", "hello flag")
configuration.set(section, "archbuild_flags", "hello flag")
configuration.merge_sections("x86_64") configuration.merge_sections("x86_64")
dump = configuration.dump() dump = configuration.dump()
@ -76,7 +75,7 @@ def test_getlist(configuration: Configuration) -> None:
""" """
must return list of string correctly must return list of string correctly
""" """
configuration.set("build", "test_list", "a b c") configuration.set_option("build", "test_list", "a b c")
assert configuration.getlist("build", "test_list") == ["a", "b", "c"] assert configuration.getlist("build", "test_list") == ["a", "b", "c"]
@ -85,7 +84,7 @@ def test_getlist_empty(configuration: Configuration) -> None:
must return list of string correctly for non-existing option must return list of string correctly for non-existing option
""" """
assert configuration.getlist("build", "test_list") == [] assert configuration.getlist("build", "test_list") == []
configuration.set("build", "test_list", "") configuration.set_option("build", "test_list", "")
assert configuration.getlist("build", "test_list") == [] assert configuration.getlist("build", "test_list") == []
@ -93,7 +92,7 @@ def test_getlist_single(configuration: Configuration) -> None:
""" """
must return list of strings for single string must return list of strings for single string
""" """
configuration.set("build", "test_list", "a") configuration.set_option("build", "test_list", "a")
assert configuration.getlist("build", "test_list") == ["a"] assert configuration.getlist("build", "test_list") == ["a"]
@ -101,7 +100,7 @@ def test_load_includes_missing(configuration: Configuration) -> None:
""" """
must not fail if not include directory found must not fail if not include directory found
""" """
configuration.set("settings", "include", "path") configuration.set_option("settings", "include", "path")
configuration.load_includes() configuration.load_includes()
@ -144,8 +143,23 @@ def test_merge_sections_missing(configuration: Configuration) -> None:
""" """
section = configuration.section_name("build", "x86_64") section = configuration.section_name("build", "x86_64")
configuration.remove_section("build") configuration.remove_section("build")
configuration.add_section(section) configuration.set_option(section, "key", "value")
configuration.set(section, "key", "value")
configuration.merge_sections("x86_64") configuration.merge_sections("x86_64")
assert configuration.get("build", "key") == "value" assert configuration.get("build", "key") == "value"
def test_set_option(configuration: Configuration) -> None:
"""
must set option correctly
"""
configuration.set_option("settings", "option", "value")
assert configuration.get("settings", "option") == "value"
def test_set_option_new_section(configuration: Configuration) -> None:
"""
must set option correctly even if no section found
"""
configuration.set_option("section", "option", "value")
assert configuration.get("section", "option") == "value"

View File

@ -21,17 +21,30 @@ def test_from_option_empty() -> None:
assert User.from_option(None, None) is None assert User.from_option(None, None) is None
def test_check_credentials_generate_password(user: User) -> None: def test_check_credentials_hash_password(user: User) -> None:
""" """
must generate and validate user password must generate and validate user password
""" """
current_password = user.password current_password = user.password
user.password = user.generate_password(current_password, "salt") user.password = user.hash_password(current_password, "salt")
assert user.check_credentials(current_password, "salt") assert user.check_credentials(current_password, "salt")
assert not user.check_credentials(current_password, "salt1") assert not user.check_credentials(current_password, "salt1")
assert not user.check_credentials(user.password, "salt") assert not user.check_credentials(user.password, "salt")
def test_generate_password() -> None:
"""
must generate password with specified length
"""
password = User.generate_password(16)
assert password
assert len(password) == 16
password = User.generate_password(42)
assert password
assert len(password) == 42
def test_verify_access_read(user: User) -> None: def test_verify_access_read(user: User) -> None:
""" """
user with read access must be able to only request read user with read access must be able to only request read

View File

@ -32,12 +32,12 @@ def application_with_auth(configuration: Configuration, user: User, mocker: Mock
:param mocker: mocker object :param mocker: mocker object
:return: application test instance :return: application test instance
""" """
configuration.set("web", "auth", "yes") configuration.set_option("auth", "enabled", "yes")
mocker.patch.object(ahriman.core.auth.helpers, "_has_aiohttp_security", True) mocker.patch.object(ahriman.core.auth.helpers, "_has_aiohttp_security", True)
mocker.patch("pathlib.Path.mkdir") mocker.patch("pathlib.Path.mkdir")
application = setup_service("x86_64", configuration) application = setup_service("x86_64", configuration)
generated = User(user.username, user.generate_password(user.password, application["validator"].salt), user.access) generated = User(user.username, user.hash_password(user.password, application["validator"].salt), user.access)
application["validator"].users[generated.username] = generated application["validator"]._users[generated.username] = generated
return application return application

View File

@ -25,8 +25,8 @@ def authorization_policy(configuration: Configuration, user: User) -> Authorizat
fixture for authorization policy fixture for authorization policy
:return: authorization policy fixture :return: authorization policy fixture
""" """
configuration.set("web", "auth", "yes") configuration.set_option("auth", "enabled", "yes")
validator = Auth.load(configuration) validator = Auth.load(configuration)
policy = AuthorizationPolicy(validator) policy = AuthorizationPolicy(validator)
policy.validator.users = {user.username: user} policy.validator._users = {user.username: user}
return policy return policy

View File

@ -35,7 +35,7 @@ def test_run(application: web.Application, mocker: MockerFixture) -> None:
must run application must run application
""" """
port = 8080 port = 8080
application["configuration"].set("web", "port", str(port)) application["configuration"].set_option("web", "port", str(port))
run_application_mock = mocker.patch("aiohttp.web.run_app") run_application_mock = mocker.patch("aiohttp.web.run_app")
run_server(application) run_server(application)
@ -48,7 +48,7 @@ def test_run_with_auth(application_with_auth: web.Application, mocker: MockerFix
must run application must run application
""" """
port = 8080 port = 8080
application_with_auth["configuration"].set("web", "port", str(port)) application_with_auth["configuration"].set_option("web", "port", str(port))
run_application_mock = mocker.patch("aiohttp.web.run_app") run_application_mock = mocker.patch("aiohttp.web.run_app")
run_server(application_with_auth) run_server(application_with_auth)