mirror of
https://github.com/arcan1s/ahriman.git
synced 2025-06-28 06:41:43 +00:00
Auth support (#25)
* initial auth implementation * add create user parser * add tests * update dependencies list * add login annd logout to index also improve auth * realworld fixes * add method set_option to Configuration and also use it everywhere * split CreateUser handler to additional read method * check user duplicate on auth mapping read * generate salt by using passlib instead of random.choice * case-insensetive usernames * update dependencies * update configuration reference * improve tests * fix codefactor errors * hide fields if authorization is enabled, but no auth supplied * add settings object for auth provider * readme update
This commit is contained in:
144
tests/ahriman/application/handlers/test_handler_create_user.py
Normal file
144
tests/ahriman/application/handlers/test_handler_create_user.py
Normal file
@ -0,0 +1,144 @@
|
||||
import argparse
|
||||
import pytest
|
||||
|
||||
from pathlib import Path
|
||||
from pytest_mock import MockerFixture
|
||||
from unittest import mock
|
||||
|
||||
from ahriman.application.handlers import CreateUser
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.models.user import User
|
||||
from ahriman.models.user_access import UserAccess
|
||||
|
||||
|
||||
def _default_args(args: argparse.Namespace) -> argparse.Namespace:
|
||||
"""
|
||||
default arguments for these test cases
|
||||
:param args: command line arguments fixture
|
||||
:return: generated arguments for these test cases
|
||||
"""
|
||||
args.username = "user"
|
||||
args.password = "pa55w0rd"
|
||||
args.role = UserAccess.Status
|
||||
return args
|
||||
|
||||
|
||||
def test_run(args: argparse.Namespace, configuration: Configuration, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must run command
|
||||
"""
|
||||
args = _default_args(args)
|
||||
get_auth_configuration_mock = mocker.patch("ahriman.application.handlers.CreateUser.get_auth_configuration")
|
||||
create_configuration_mock = mocker.patch("ahriman.application.handlers.CreateUser.create_configuration")
|
||||
create_user = mocker.patch("ahriman.application.handlers.CreateUser.create_user")
|
||||
get_salt_mock = mocker.patch("ahriman.application.handlers.CreateUser.get_salt")
|
||||
|
||||
CreateUser.run(args, "x86_64", configuration)
|
||||
get_auth_configuration_mock.assert_called_once()
|
||||
create_configuration_mock.assert_called_once()
|
||||
create_user.assert_called_once()
|
||||
get_salt_mock.assert_called_once()
|
||||
|
||||
|
||||
def test_create_configuration(configuration: Configuration, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must correctly create configuration file
|
||||
"""
|
||||
section = Configuration.section_name("auth", user.access.value)
|
||||
mocker.patch("pathlib.Path.open")
|
||||
set_mock = mocker.patch("ahriman.core.configuration.Configuration.set_option")
|
||||
write_mock = mocker.patch("ahriman.core.configuration.Configuration.write")
|
||||
|
||||
CreateUser.create_configuration(configuration, user, "salt")
|
||||
write_mock.assert_called_once()
|
||||
set_mock.assert_has_calls([
|
||||
mock.call("auth", "salt", pytest.helpers.anyvar(str)),
|
||||
mock.call(section, user.username, user.password)
|
||||
])
|
||||
|
||||
|
||||
def test_create_configuration_not_loaded(configuration: Configuration, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must do nothing in case if configuration is not loaded
|
||||
"""
|
||||
configuration.path = None
|
||||
mocker.patch("pathlib.Path.open")
|
||||
write_mock = mocker.patch("ahriman.core.configuration.Configuration.write")
|
||||
|
||||
CreateUser.create_configuration(configuration, user, "salt")
|
||||
write_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_create_configuration_user_exists(configuration: Configuration, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must correctly update configuration file if user already exists
|
||||
"""
|
||||
section = Configuration.section_name("auth", user.access.value)
|
||||
configuration.set_option(section, user.username, "")
|
||||
mocker.patch("pathlib.Path.open")
|
||||
mocker.patch("ahriman.core.configuration.Configuration.write")
|
||||
|
||||
CreateUser.create_configuration(configuration, user, "salt")
|
||||
assert configuration.get(section, user.username) == user.password
|
||||
|
||||
|
||||
def test_create_user(args: argparse.Namespace, user: User) -> None:
|
||||
"""
|
||||
must create user
|
||||
"""
|
||||
args = _default_args(args)
|
||||
generated = CreateUser.create_user(args, "salt")
|
||||
assert generated.username == user.username
|
||||
assert generated.check_credentials(user.password, "salt")
|
||||
assert generated.access == user.access
|
||||
|
||||
|
||||
def test_create_user_getpass(args: argparse.Namespace, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must create user and get password from command line
|
||||
"""
|
||||
args = _default_args(args)
|
||||
args.password = None
|
||||
|
||||
getpass_mock = mocker.patch("getpass.getpass", return_value="password")
|
||||
generated = CreateUser.create_user(args, "salt")
|
||||
|
||||
getpass_mock.assert_called_once()
|
||||
assert generated.check_credentials("password", "salt")
|
||||
|
||||
|
||||
def test_get_salt_read(configuration: Configuration) -> None:
|
||||
"""
|
||||
must read salt from configuration
|
||||
"""
|
||||
assert CreateUser.get_salt(configuration) == "salt"
|
||||
|
||||
|
||||
def test_get_salt_generate(configuration: Configuration) -> None:
|
||||
"""
|
||||
must generate salt if it does not exist
|
||||
"""
|
||||
configuration.remove_option("auth", "salt")
|
||||
|
||||
salt = CreateUser.get_salt(configuration, 16)
|
||||
assert salt
|
||||
assert len(salt) == 16
|
||||
|
||||
|
||||
def test_get_auth_configuration() -> None:
|
||||
"""
|
||||
must load empty configuration
|
||||
"""
|
||||
assert CreateUser.get_auth_configuration(Path("path"))
|
||||
|
||||
|
||||
def test_get_auth_configuration_exists(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must load configuration from filesystem
|
||||
"""
|
||||
mocker.patch("pathlib.Path.open")
|
||||
mocker.patch("pathlib.Path.is_file", return_value=True)
|
||||
read_mock = mocker.patch("ahriman.core.configuration.Configuration.read")
|
||||
|
||||
CreateUser.get_auth_configuration(Path("path"))
|
||||
read_mock.assert_called_once()
|
@ -62,19 +62,12 @@ def test_create_ahriman_configuration(args: argparse.Namespace, configuration: C
|
||||
"""
|
||||
args = _default_args(args)
|
||||
mocker.patch("pathlib.Path.open")
|
||||
add_section_mock = mocker.patch("configparser.RawConfigParser.add_section")
|
||||
set_mock = mocker.patch("configparser.RawConfigParser.set")
|
||||
write_mock = mocker.patch("configparser.RawConfigParser.write")
|
||||
set_option_mock = mocker.patch("ahriman.core.configuration.Configuration.set_option")
|
||||
write_mock = mocker.patch("ahriman.core.configuration.Configuration.write")
|
||||
|
||||
command = Setup.build_command(args.build_command, "x86_64")
|
||||
Setup.create_ahriman_configuration(args, "x86_64", args.repository, configuration.include)
|
||||
add_section_mock.assert_has_calls([
|
||||
mock.call(Configuration.section_name("build", "x86_64")),
|
||||
mock.call("repository"),
|
||||
mock.call(Configuration.section_name("sign", "x86_64")),
|
||||
mock.call(Configuration.section_name("web", "x86_64")),
|
||||
])
|
||||
set_mock.assert_has_calls([
|
||||
set_option_mock.assert_has_calls([
|
||||
mock.call(Configuration.section_name("build", "x86_64"), "build_command", str(command)),
|
||||
mock.call("repository", "name", args.repository),
|
||||
mock.call(Configuration.section_name("sign", "x86_64"), "target",
|
||||
@ -92,9 +85,9 @@ def test_create_devtools_configuration(args: argparse.Namespace, repository_path
|
||||
"""
|
||||
args = _default_args(args)
|
||||
mocker.patch("pathlib.Path.open")
|
||||
mocker.patch("configparser.RawConfigParser.set")
|
||||
add_section_mock = mocker.patch("configparser.RawConfigParser.add_section")
|
||||
write_mock = mocker.patch("configparser.RawConfigParser.write")
|
||||
mocker.patch("ahriman.core.configuration.Configuration.set")
|
||||
add_section_mock = mocker.patch("ahriman.core.configuration.Configuration.add_section")
|
||||
write_mock = mocker.patch("ahriman.core.configuration.Configuration.write")
|
||||
|
||||
Setup.create_devtools_configuration(args.build_command, "x86_64", args.from_configuration,
|
||||
args.no_multilib, args.repository, repository_paths)
|
||||
@ -112,13 +105,11 @@ def test_create_devtools_configuration_no_multilib(args: argparse.Namespace, rep
|
||||
"""
|
||||
args = _default_args(args)
|
||||
mocker.patch("pathlib.Path.open")
|
||||
mocker.patch("configparser.RawConfigParser.set")
|
||||
add_section_mock = mocker.patch("configparser.RawConfigParser.add_section")
|
||||
write_mock = mocker.patch("configparser.RawConfigParser.write")
|
||||
mocker.patch("ahriman.core.configuration.Configuration.set")
|
||||
write_mock = mocker.patch("ahriman.core.configuration.Configuration.write")
|
||||
|
||||
Setup.create_devtools_configuration(args.build_command, "x86_64", args.from_configuration,
|
||||
True, args.repository, repository_paths)
|
||||
add_section_mock.assert_called_once()
|
||||
write_mock.assert_called_once()
|
||||
|
||||
|
||||
|
@ -6,6 +6,7 @@ from pytest_mock import MockerFixture
|
||||
from ahriman.application.handlers import Handler
|
||||
from ahriman.models.build_status import BuildStatusEnum
|
||||
from ahriman.models.sign_settings import SignSettings
|
||||
from ahriman.models.user_access import UserAccess
|
||||
|
||||
|
||||
def test_parser(parser: argparse.ArgumentParser) -> None:
|
||||
@ -83,6 +84,28 @@ def test_subparsers_config(parser: argparse.ArgumentParser) -> None:
|
||||
assert args.unsafe
|
||||
|
||||
|
||||
def test_subparsers_create_user(parser: argparse.ArgumentParser) -> None:
|
||||
"""
|
||||
create-user command must imply architecture, lock, no-log, no-report and unsafe
|
||||
"""
|
||||
args = parser.parse_args(["create-user", "username"])
|
||||
assert args.architecture == [""]
|
||||
assert args.lock is None
|
||||
assert args.no_log
|
||||
assert args.no_report
|
||||
assert args.unsafe
|
||||
|
||||
|
||||
def test_subparsers_create_user_option_role(parser: argparse.ArgumentParser) -> None:
|
||||
"""
|
||||
create-user command must convert role option to useraccess instance
|
||||
"""
|
||||
args = parser.parse_args(["create-user", "username"])
|
||||
assert isinstance(args.role, UserAccess)
|
||||
args = parser.parse_args(["create-user", "username", "--role", "write"])
|
||||
assert isinstance(args.role, UserAccess)
|
||||
|
||||
|
||||
def test_subparsers_init(parser: argparse.ArgumentParser) -> None:
|
||||
"""
|
||||
init command must imply no_report
|
||||
|
@ -1,17 +1,17 @@
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from pathlib import Path
|
||||
from pytest_mock import MockerFixture
|
||||
from typing import Any, Type, TypeVar
|
||||
|
||||
from ahriman.core.auth.auth import Auth
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.status.watcher import Watcher
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.package_description import PackageDescription
|
||||
from ahriman.models.repository_paths import RepositoryPaths
|
||||
|
||||
from ahriman.models.user import User
|
||||
from ahriman.models.user_access import UserAccess
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
@ -43,6 +43,15 @@ def anyvar(cls: Type[T], strict: bool = False) -> T:
|
||||
|
||||
|
||||
# generic fixtures
|
||||
@pytest.fixture
|
||||
def auth(configuration: Configuration) -> Auth:
|
||||
"""
|
||||
auth provider fixture
|
||||
:return: auth service instance
|
||||
"""
|
||||
return Auth(configuration)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def configuration(resource_path_root: Path) -> Configuration:
|
||||
"""
|
||||
@ -158,6 +167,15 @@ def repository_paths(configuration: Configuration) -> RepositoryPaths:
|
||||
root=configuration.getpath("repository", "root"))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def user() -> User:
|
||||
"""
|
||||
fixture for user descriptor
|
||||
:return: user descriptor instance
|
||||
"""
|
||||
return User("user", "pa55w0rd", UserAccess.Status)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def watcher(configuration: Configuration, mocker: MockerFixture) -> Watcher:
|
||||
"""
|
||||
|
13
tests/ahriman/core/auth/conftest.py
Normal file
13
tests/ahriman/core/auth/conftest.py
Normal file
@ -0,0 +1,13 @@
|
||||
import pytest
|
||||
|
||||
from ahriman.core.auth.mapping_auth import MappingAuth
|
||||
from ahriman.core.configuration import Configuration
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mapping_auth(configuration: Configuration) -> MappingAuth:
|
||||
"""
|
||||
auth provider fixture
|
||||
:return: auth service instance
|
||||
"""
|
||||
return MappingAuth(configuration)
|
81
tests/ahriman/core/auth/test_auth.py
Normal file
81
tests/ahriman/core/auth/test_auth.py
Normal file
@ -0,0 +1,81 @@
|
||||
from ahriman.core.auth.auth import Auth
|
||||
from ahriman.core.auth.mapping_auth import MappingAuth
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.models.user import User
|
||||
from ahriman.models.user_access import UserAccess
|
||||
|
||||
|
||||
def test_load_dummy(configuration: Configuration) -> None:
|
||||
"""
|
||||
must load dummy validator if authorization is not enabled
|
||||
"""
|
||||
configuration.set_option("auth", "target", "disabled")
|
||||
auth = Auth.load(configuration)
|
||||
assert isinstance(auth, Auth)
|
||||
|
||||
|
||||
def test_load_dummy_empty(configuration: Configuration) -> None:
|
||||
"""
|
||||
must load dummy validator if no option set
|
||||
"""
|
||||
auth = Auth.load(configuration)
|
||||
assert isinstance(auth, Auth)
|
||||
|
||||
|
||||
def test_load_mapping(configuration: Configuration) -> None:
|
||||
"""
|
||||
must load mapping validator if option set
|
||||
"""
|
||||
configuration.set_option("auth", "target", "configuration")
|
||||
auth = Auth.load(configuration)
|
||||
assert isinstance(auth, MappingAuth)
|
||||
|
||||
|
||||
def test_check_credentials(auth: Auth, user: User) -> None:
|
||||
"""
|
||||
must pass any credentials
|
||||
"""
|
||||
assert auth.check_credentials(user.username, user.password)
|
||||
assert auth.check_credentials(None, "")
|
||||
assert auth.check_credentials("", None)
|
||||
assert auth.check_credentials(None, None)
|
||||
|
||||
|
||||
def test_is_safe_request(auth: Auth) -> None:
|
||||
"""
|
||||
must validate safe request
|
||||
"""
|
||||
# login and logout are always safe
|
||||
assert auth.is_safe_request("/login")
|
||||
assert auth.is_safe_request("/logout")
|
||||
|
||||
auth.allowed_paths.add("/safe")
|
||||
auth.allowed_paths_groups.add("/unsafe/safe")
|
||||
|
||||
assert auth.is_safe_request("/safe")
|
||||
assert not auth.is_safe_request("/unsafe")
|
||||
assert auth.is_safe_request("/unsafe/safe")
|
||||
assert auth.is_safe_request("/unsafe/safe/suffix")
|
||||
|
||||
|
||||
def test_is_safe_request_empty(auth: Auth) -> None:
|
||||
"""
|
||||
must not allow requests without path
|
||||
"""
|
||||
assert not auth.is_safe_request(None)
|
||||
assert not auth.is_safe_request("")
|
||||
|
||||
|
||||
def test_known_username(auth: Auth, user: User) -> None:
|
||||
"""
|
||||
must allow any username
|
||||
"""
|
||||
assert auth.known_username(user.username)
|
||||
|
||||
|
||||
def test_verify_access(auth: Auth, user: User) -> None:
|
||||
"""
|
||||
must allow any access
|
||||
"""
|
||||
assert auth.verify_access(user.username, user.access, None)
|
||||
assert auth.verify_access(user.username, UserAccess.Write, None)
|
102
tests/ahriman/core/auth/test_helpers.py
Normal file
102
tests/ahriman/core/auth/test_helpers.py
Normal file
@ -0,0 +1,102 @@
|
||||
import importlib
|
||||
import sys
|
||||
|
||||
import ahriman.core.auth.helpers as helpers
|
||||
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
|
||||
def test_import_aiohttp_security() -> None:
|
||||
"""
|
||||
must import aiohttp_security correctly
|
||||
"""
|
||||
assert helpers._has_aiohttp_security
|
||||
|
||||
|
||||
def test_import_aiohttp_security_missing(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must set missing flag if no aiohttp_security module found
|
||||
"""
|
||||
mocker.patch.dict(sys.modules, {"aiohttp_security": None})
|
||||
importlib.reload(helpers)
|
||||
assert not helpers._has_aiohttp_security
|
||||
|
||||
|
||||
async def test_authorized_userid_dummy(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must not call authorized_userid from library if not enabled
|
||||
"""
|
||||
mocker.patch.object(helpers, "_has_aiohttp_security", False)
|
||||
authorized_userid_mock = mocker.patch("aiohttp_security.authorized_userid")
|
||||
await helpers.authorized_userid()
|
||||
authorized_userid_mock.assert_not_called()
|
||||
|
||||
|
||||
async def test_authorized_userid_library(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must call authorized_userid from library if enabled
|
||||
"""
|
||||
mocker.patch.object(helpers, "_has_aiohttp_security", True)
|
||||
authorized_userid_mock = mocker.patch("aiohttp_security.authorized_userid")
|
||||
await helpers.authorized_userid()
|
||||
authorized_userid_mock.assert_called_once()
|
||||
|
||||
|
||||
async def test_check_authorized_dummy(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must not call check_authorized from library if not enabled
|
||||
"""
|
||||
mocker.patch.object(helpers, "_has_aiohttp_security", False)
|
||||
check_authorized_mock = mocker.patch("aiohttp_security.check_authorized")
|
||||
await helpers.check_authorized()
|
||||
check_authorized_mock.assert_not_called()
|
||||
|
||||
|
||||
async def test_check_authorized_library(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must call check_authorized from library if enabled
|
||||
"""
|
||||
mocker.patch.object(helpers, "_has_aiohttp_security", True)
|
||||
check_authorized_mock = mocker.patch("aiohttp_security.check_authorized")
|
||||
await helpers.check_authorized()
|
||||
check_authorized_mock.assert_called_once()
|
||||
|
||||
|
||||
async def test_forget_dummy(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must not call forget from library if not enabled
|
||||
"""
|
||||
mocker.patch.object(helpers, "_has_aiohttp_security", False)
|
||||
forget_mock = mocker.patch("aiohttp_security.forget")
|
||||
await helpers.forget()
|
||||
forget_mock.assert_not_called()
|
||||
|
||||
|
||||
async def test_forget_library(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must call forget from library if enabled
|
||||
"""
|
||||
mocker.patch.object(helpers, "_has_aiohttp_security", True)
|
||||
forget_mock = mocker.patch("aiohttp_security.forget")
|
||||
await helpers.forget()
|
||||
forget_mock.assert_called_once()
|
||||
|
||||
|
||||
async def test_remember_dummy(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must not call remember from library if not enabled
|
||||
"""
|
||||
mocker.patch.object(helpers, "_has_aiohttp_security", False)
|
||||
remember_mock = mocker.patch("aiohttp_security.remember")
|
||||
await helpers.remember()
|
||||
remember_mock.assert_not_called()
|
||||
|
||||
|
||||
async def test_remember_library(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must call remember from library if enabled
|
||||
"""
|
||||
mocker.patch.object(helpers, "_has_aiohttp_security", True)
|
||||
remember_mock = mocker.patch("aiohttp_security.remember")
|
||||
await helpers.remember()
|
||||
remember_mock.assert_called_once()
|
121
tests/ahriman/core/auth/test_mapping_auth.py
Normal file
121
tests/ahriman/core/auth/test_mapping_auth.py
Normal file
@ -0,0 +1,121 @@
|
||||
import pytest
|
||||
|
||||
from ahriman.core.auth.mapping_auth import MappingAuth
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.exceptions import DuplicateUser
|
||||
from ahriman.models.user import User
|
||||
from ahriman.models.user_access import UserAccess
|
||||
|
||||
|
||||
def test_get_users(mapping_auth: MappingAuth, configuration: Configuration) -> None:
|
||||
"""
|
||||
must return valid user list
|
||||
"""
|
||||
user_write = User("user_write", "pwd_write", UserAccess.Write)
|
||||
write_section = Configuration.section_name("auth", user_write.access.value)
|
||||
configuration.set_option(write_section, user_write.username, user_write.password)
|
||||
user_read = User("user_read", "pwd_read", UserAccess.Read)
|
||||
read_section = Configuration.section_name("auth", user_read.access.value)
|
||||
configuration.set_option(read_section, user_read.username, user_read.password)
|
||||
user_read = User("user_read", "pwd_read", UserAccess.Read)
|
||||
read_section = Configuration.section_name("auth", user_read.access.value)
|
||||
configuration.set_option(read_section, user_read.username, user_read.password)
|
||||
|
||||
users = mapping_auth.get_users(configuration)
|
||||
expected = {user_write.username: user_write, user_read.username: user_read}
|
||||
assert users == expected
|
||||
|
||||
|
||||
def test_get_users_normalized(mapping_auth: MappingAuth, configuration: Configuration) -> None:
|
||||
"""
|
||||
must return user list with normalized usernames in keys
|
||||
"""
|
||||
user = User("UsEr", "pwd_read", UserAccess.Read)
|
||||
read_section = Configuration.section_name("auth", user.access.value)
|
||||
configuration.set_option(read_section, user.username, user.password)
|
||||
|
||||
users = mapping_auth.get_users(configuration)
|
||||
expected = user.username.lower()
|
||||
assert expected in users
|
||||
assert users[expected].username == expected
|
||||
|
||||
|
||||
def test_get_users_duplicate(mapping_auth: MappingAuth, configuration: Configuration, user: User) -> None:
|
||||
"""
|
||||
must raise exception on duplicate username
|
||||
"""
|
||||
write_section = Configuration.section_name("auth", UserAccess.Write.value)
|
||||
configuration.set_option(write_section, user.username, user.password)
|
||||
read_section = Configuration.section_name("auth", UserAccess.Read.value)
|
||||
configuration.set_option(read_section, user.username, user.password)
|
||||
|
||||
with pytest.raises(DuplicateUser):
|
||||
mapping_auth.get_users(configuration)
|
||||
|
||||
|
||||
def test_check_credentials(mapping_auth: MappingAuth, user: User) -> None:
|
||||
"""
|
||||
must return true for valid credentials
|
||||
"""
|
||||
current_password = user.password
|
||||
user.password = user.hash_password(user.password, mapping_auth.salt)
|
||||
mapping_auth._users[user.username] = user
|
||||
assert mapping_auth.check_credentials(user.username, current_password)
|
||||
assert not mapping_auth.check_credentials(user.username, user.password) # here password is hashed so it is invalid
|
||||
|
||||
|
||||
def test_check_credentials_empty(mapping_auth: MappingAuth) -> None:
|
||||
"""
|
||||
must reject on empty credentials
|
||||
"""
|
||||
assert not mapping_auth.check_credentials(None, "")
|
||||
assert not mapping_auth.check_credentials("", None)
|
||||
assert not mapping_auth.check_credentials(None, None)
|
||||
|
||||
|
||||
def test_check_credentials_unknown(mapping_auth: MappingAuth, user: User) -> None:
|
||||
"""
|
||||
must reject on unknown user
|
||||
"""
|
||||
assert not mapping_auth.check_credentials(user.username, user.password)
|
||||
|
||||
|
||||
def test_get_user(mapping_auth: MappingAuth, user: User) -> None:
|
||||
"""
|
||||
must return user from storage by username
|
||||
"""
|
||||
mapping_auth._users[user.username] = user
|
||||
assert mapping_auth.get_user(user.username) == user
|
||||
|
||||
|
||||
def test_get_user_normalized(mapping_auth: MappingAuth, user: User) -> None:
|
||||
"""
|
||||
must return user from storage by username case-insensitive
|
||||
"""
|
||||
mapping_auth._users[user.username] = user
|
||||
assert mapping_auth.get_user(user.username.upper()) == user
|
||||
|
||||
|
||||
def test_get_user_unknown(mapping_auth: MappingAuth, user: User) -> None:
|
||||
"""
|
||||
must return None in case if no user found
|
||||
"""
|
||||
assert mapping_auth.get_user(user.username) is None
|
||||
|
||||
|
||||
def test_known_username(mapping_auth: MappingAuth, user: User) -> None:
|
||||
"""
|
||||
must allow only known users
|
||||
"""
|
||||
mapping_auth._users[user.username] = user
|
||||
assert mapping_auth.known_username(user.username)
|
||||
assert not mapping_auth.known_username(user.password)
|
||||
|
||||
|
||||
def test_verify_access(mapping_auth: MappingAuth, user: User) -> None:
|
||||
"""
|
||||
must verify user access
|
||||
"""
|
||||
mapping_auth._users[user.username] = user
|
||||
assert mapping_auth.verify_access(user.username, user.access, None)
|
||||
assert not mapping_auth.verify_access(user.username, UserAccess.Write, None)
|
@ -2,6 +2,7 @@ import pytest
|
||||
|
||||
from ahriman.core.alpm.pacman import Pacman
|
||||
from ahriman.core.alpm.repo import Repo
|
||||
from ahriman.core.auth.auth import Auth
|
||||
from ahriman.core.build_tools.task import Task
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.tree import Leaf
|
||||
|
@ -23,8 +23,8 @@ def test_send_auth(configuration: Configuration, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must send an email with attachment with auth
|
||||
"""
|
||||
configuration.set("email", "user", "username")
|
||||
configuration.set("email", "password", "password")
|
||||
configuration.set_option("email", "user", "username")
|
||||
configuration.set_option("email", "password", "password")
|
||||
smtp_mock = mocker.patch("smtplib.SMTP")
|
||||
|
||||
report = Email("x86_64", configuration)
|
||||
@ -36,7 +36,7 @@ def test_send_auth_no_password(configuration: Configuration, mocker: MockerFixtu
|
||||
"""
|
||||
must send an email with attachment without auth if no password supplied
|
||||
"""
|
||||
configuration.set("email", "user", "username")
|
||||
configuration.set_option("email", "user", "username")
|
||||
smtp_mock = mocker.patch("smtplib.SMTP")
|
||||
|
||||
report = Email("x86_64", configuration)
|
||||
@ -48,7 +48,7 @@ def test_send_auth_no_user(configuration: Configuration, mocker: MockerFixture)
|
||||
"""
|
||||
must send an email with attachment without auth if no user supplied
|
||||
"""
|
||||
configuration.set("email", "password", "password")
|
||||
configuration.set_option("email", "password", "password")
|
||||
smtp_mock = mocker.patch("smtplib.SMTP")
|
||||
|
||||
report = Email("x86_64", configuration)
|
||||
@ -60,7 +60,7 @@ def test_send_ssl_tls(configuration: Configuration, mocker: MockerFixture) -> No
|
||||
"""
|
||||
must send an email with attachment with ssl/tls
|
||||
"""
|
||||
configuration.set("email", "ssl", "ssl")
|
||||
configuration.set_option("email", "ssl", "ssl")
|
||||
smtp_mock = mocker.patch("smtplib.SMTP_SSL")
|
||||
|
||||
report = Email("x86_64", configuration)
|
||||
@ -75,7 +75,7 @@ def test_send_starttls(configuration: Configuration, mocker: MockerFixture) -> N
|
||||
"""
|
||||
must send an email with attachment with starttls
|
||||
"""
|
||||
configuration.set("email", "ssl", "starttls")
|
||||
configuration.set_option("email", "ssl", "starttls")
|
||||
smtp_mock = mocker.patch("smtplib.SMTP")
|
||||
|
||||
report = Email("x86_64", configuration)
|
||||
@ -109,7 +109,7 @@ def test_generate_no_empty(configuration: Configuration, package_ahriman: Packag
|
||||
"""
|
||||
must not generate report with built packages if no_empty_report is set
|
||||
"""
|
||||
configuration.set("email", "no_empty_report", "yes")
|
||||
configuration.set_option("email", "no_empty_report", "yes")
|
||||
send_mock = mocker.patch("ahriman.core.report.email.Email._send")
|
||||
|
||||
report = Email("x86_64", configuration)
|
||||
@ -122,7 +122,7 @@ def test_generate_no_empty_with_built(configuration: Configuration, package_ahri
|
||||
"""
|
||||
must generate report with built packages if no_empty_report is set
|
||||
"""
|
||||
configuration.set("email", "no_empty_report", "yes")
|
||||
configuration.set_option("email", "no_empty_report", "yes")
|
||||
send_mock = mocker.patch("ahriman.core.report.email.Email._send")
|
||||
|
||||
report = Email("x86_64", configuration)
|
||||
|
@ -2,6 +2,7 @@ import pytest
|
||||
|
||||
from typing import Any, Dict
|
||||
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.status.client import Client
|
||||
from ahriman.core.status.web_client import WebClient
|
||||
from ahriman.models.build_status import BuildStatus, BuildStatusEnum
|
||||
@ -40,9 +41,11 @@ def client() -> Client:
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def web_client() -> WebClient:
|
||||
def web_client(configuration: Configuration) -> WebClient:
|
||||
"""
|
||||
fixture for web client
|
||||
:param configuration: configuration fixture
|
||||
:return: web client test instance
|
||||
"""
|
||||
return WebClient("localhost", 8080)
|
||||
configuration.set("web", "port", 8080)
|
||||
return WebClient(configuration)
|
||||
|
@ -17,10 +17,18 @@ def test_load_dummy_client(configuration: Configuration) -> None:
|
||||
|
||||
def test_load_full_client(configuration: Configuration) -> None:
|
||||
"""
|
||||
must load full client if no settings set
|
||||
must load full client if settings set
|
||||
"""
|
||||
configuration.set("web", "host", "localhost")
|
||||
configuration.set("web", "port", "8080")
|
||||
configuration.set_option("web", "host", "localhost")
|
||||
configuration.set_option("web", "port", "8080")
|
||||
assert isinstance(Client.load(configuration), WebClient)
|
||||
|
||||
|
||||
def test_load_full_client_from_address(configuration: Configuration) -> None:
|
||||
"""
|
||||
must load full client if settings set
|
||||
"""
|
||||
configuration.set_option("web", "address", "http://localhost:8080")
|
||||
assert isinstance(Client.load(configuration), WebClient)
|
||||
|
||||
|
||||
|
@ -5,41 +5,97 @@ import requests
|
||||
from pytest_mock import MockerFixture
|
||||
from requests import Response
|
||||
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.status.web_client import WebClient
|
||||
from ahriman.models.build_status import BuildStatus, BuildStatusEnum
|
||||
from ahriman.models.internal_status import InternalStatus
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.user import User
|
||||
|
||||
|
||||
def test_ahriman_url(web_client: WebClient) -> None:
|
||||
"""
|
||||
must generate service status url correctly
|
||||
"""
|
||||
assert web_client._ahriman_url().startswith(f"http://{web_client.host}:{web_client.port}")
|
||||
assert web_client._ahriman_url().endswith("/api/v1/ahriman")
|
||||
|
||||
|
||||
def test_package_url(web_client: WebClient, package_ahriman: Package) -> None:
|
||||
"""
|
||||
must generate package status correctly
|
||||
"""
|
||||
assert web_client._package_url(package_ahriman.base).startswith(f"http://{web_client.host}:{web_client.port}")
|
||||
assert web_client._package_url(package_ahriman.base).endswith(f"/api/v1/packages/{package_ahriman.base}")
|
||||
assert web_client._ahriman_url.startswith(web_client.address)
|
||||
assert web_client._ahriman_url.endswith("/api/v1/ahriman")
|
||||
|
||||
|
||||
def test_status_url(web_client: WebClient) -> None:
|
||||
"""
|
||||
must generate service status url correctly
|
||||
"""
|
||||
assert web_client._status_url().startswith(f"http://{web_client.host}:{web_client.port}")
|
||||
assert web_client._status_url().endswith("/api/v1/status")
|
||||
assert web_client._status_url.startswith(web_client.address)
|
||||
assert web_client._status_url.endswith("/api/v1/status")
|
||||
|
||||
|
||||
def test_parse_address(configuration: Configuration) -> None:
|
||||
"""
|
||||
must extract address correctly
|
||||
"""
|
||||
configuration.set_option("web", "host", "localhost")
|
||||
configuration.set_option("web", "port", "8080")
|
||||
assert WebClient.parse_address(configuration) == "http://localhost:8080"
|
||||
|
||||
configuration.set_option("web", "address", "http://localhost:8081")
|
||||
assert WebClient.parse_address(configuration) == "http://localhost:8081"
|
||||
|
||||
|
||||
def test_login(web_client: WebClient, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must login user
|
||||
"""
|
||||
web_client.user = user
|
||||
requests_mock = mocker.patch("requests.Session.post")
|
||||
payload = {
|
||||
"username": user.username,
|
||||
"password": user.password
|
||||
}
|
||||
|
||||
web_client._login()
|
||||
requests_mock.assert_called_with(pytest.helpers.anyvar(str, True), json=payload)
|
||||
|
||||
|
||||
def test_login_failed(web_client: WebClient, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must suppress any exception happened during login
|
||||
"""
|
||||
web_client.user = user
|
||||
mocker.patch("requests.Session.post", side_effect=Exception())
|
||||
web_client._login()
|
||||
|
||||
|
||||
def test_login_failed_http_error(web_client: WebClient, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must suppress any exception happened during login
|
||||
"""
|
||||
web_client.user = user
|
||||
mocker.patch("requests.Session.post", side_effect=requests.exceptions.HTTPError())
|
||||
web_client._login()
|
||||
|
||||
|
||||
def test_login_skip(web_client: WebClient, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must skip login if no user set
|
||||
"""
|
||||
requests_mock = mocker.patch("requests.Session.post")
|
||||
web_client._login()
|
||||
requests_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_package_url(web_client: WebClient, package_ahriman: Package) -> None:
|
||||
"""
|
||||
must generate package status correctly
|
||||
"""
|
||||
assert web_client._package_url(package_ahriman.base).startswith(web_client.address)
|
||||
assert web_client._package_url(package_ahriman.base).endswith(f"/api/v1/packages/{package_ahriman.base}")
|
||||
|
||||
|
||||
def test_add(web_client: WebClient, package_ahriman: Package, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must process package addition
|
||||
"""
|
||||
requests_mock = mocker.patch("requests.post")
|
||||
requests_mock = mocker.patch("requests.Session.post")
|
||||
payload = pytest.helpers.get_package_status(package_ahriman)
|
||||
|
||||
web_client.add(package_ahriman, BuildStatusEnum.Unknown)
|
||||
@ -50,7 +106,7 @@ def test_add_failed(web_client: WebClient, package_ahriman: Package, mocker: Moc
|
||||
"""
|
||||
must suppress any exception happened during addition
|
||||
"""
|
||||
mocker.patch("requests.post", side_effect=Exception())
|
||||
mocker.patch("requests.Session.post", side_effect=Exception())
|
||||
web_client.add(package_ahriman, BuildStatusEnum.Unknown)
|
||||
|
||||
|
||||
@ -58,7 +114,7 @@ def test_add_failed_http_error(web_client: WebClient, package_ahriman: Package,
|
||||
"""
|
||||
must suppress any exception happened during addition
|
||||
"""
|
||||
mocker.patch("requests.post", side_effect=requests.exceptions.HTTPError())
|
||||
mocker.patch("requests.Session.post", side_effect=requests.exceptions.HTTPError())
|
||||
web_client.add(package_ahriman, BuildStatusEnum.Unknown)
|
||||
|
||||
|
||||
@ -71,7 +127,7 @@ def test_get_all(web_client: WebClient, package_ahriman: Package, mocker: Mocker
|
||||
response_obj._content = json.dumps(response).encode("utf8")
|
||||
response_obj.status_code = 200
|
||||
|
||||
requests_mock = mocker.patch("requests.get", return_value=response_obj)
|
||||
requests_mock = mocker.patch("requests.Session.get", return_value=response_obj)
|
||||
|
||||
result = web_client.get(None)
|
||||
requests_mock.assert_called_once()
|
||||
@ -83,7 +139,7 @@ def test_get_failed(web_client: WebClient, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must suppress any exception happened during status getting
|
||||
"""
|
||||
mocker.patch("requests.get", side_effect=Exception())
|
||||
mocker.patch("requests.Session.get", side_effect=Exception())
|
||||
assert web_client.get(None) == []
|
||||
|
||||
|
||||
@ -91,7 +147,7 @@ def test_get_failed_http_error(web_client: WebClient, mocker: MockerFixture) ->
|
||||
"""
|
||||
must suppress any exception happened during status getting
|
||||
"""
|
||||
mocker.patch("requests.get", side_effect=requests.exceptions.HTTPError())
|
||||
mocker.patch("requests.Session.get", side_effect=requests.exceptions.HTTPError())
|
||||
assert web_client.get(None) == []
|
||||
|
||||
|
||||
@ -104,7 +160,7 @@ def test_get_single(web_client: WebClient, package_ahriman: Package, mocker: Moc
|
||||
response_obj._content = json.dumps(response).encode("utf8")
|
||||
response_obj.status_code = 200
|
||||
|
||||
requests_mock = mocker.patch("requests.get", return_value=response_obj)
|
||||
requests_mock = mocker.patch("requests.Session.get", return_value=response_obj)
|
||||
|
||||
result = web_client.get(package_ahriman.base)
|
||||
requests_mock.assert_called_once()
|
||||
@ -120,7 +176,7 @@ def test_get_internal(web_client: WebClient, mocker: MockerFixture) -> None:
|
||||
response_obj._content = json.dumps(InternalStatus(architecture="x86_64").view()).encode("utf8")
|
||||
response_obj.status_code = 200
|
||||
|
||||
requests_mock = mocker.patch("requests.get", return_value=response_obj)
|
||||
requests_mock = mocker.patch("requests.Session.get", return_value=response_obj)
|
||||
|
||||
result = web_client.get_internal()
|
||||
requests_mock.assert_called_once()
|
||||
@ -131,7 +187,7 @@ def test_get_internal_failed(web_client: WebClient, mocker: MockerFixture) -> No
|
||||
"""
|
||||
must suppress any exception happened during web service status getting
|
||||
"""
|
||||
mocker.patch("requests.get", side_effect=Exception())
|
||||
mocker.patch("requests.Session.get", side_effect=Exception())
|
||||
assert web_client.get_internal() == InternalStatus()
|
||||
|
||||
|
||||
@ -139,7 +195,7 @@ def test_get_internal_failed_http_error(web_client: WebClient, mocker: MockerFix
|
||||
"""
|
||||
must suppress any exception happened during web service status getting
|
||||
"""
|
||||
mocker.patch("requests.get", side_effect=requests.exceptions.HTTPError())
|
||||
mocker.patch("requests.Session.get", side_effect=requests.exceptions.HTTPError())
|
||||
assert web_client.get_internal() == InternalStatus()
|
||||
|
||||
|
||||
@ -151,7 +207,7 @@ def test_get_self(web_client: WebClient, mocker: MockerFixture) -> None:
|
||||
response_obj._content = json.dumps(BuildStatus().view()).encode("utf8")
|
||||
response_obj.status_code = 200
|
||||
|
||||
requests_mock = mocker.patch("requests.get", return_value=response_obj)
|
||||
requests_mock = mocker.patch("requests.Session.get", return_value=response_obj)
|
||||
|
||||
result = web_client.get_self()
|
||||
requests_mock.assert_called_once()
|
||||
@ -162,7 +218,7 @@ def test_get_self_failed(web_client: WebClient, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must suppress any exception happened during service status getting
|
||||
"""
|
||||
mocker.patch("requests.get", side_effect=Exception())
|
||||
mocker.patch("requests.Session.get", side_effect=Exception())
|
||||
assert web_client.get_self().status == BuildStatusEnum.Unknown
|
||||
|
||||
|
||||
@ -170,7 +226,7 @@ def test_get_self_failed_http_error(web_client: WebClient, mocker: MockerFixture
|
||||
"""
|
||||
must suppress any exception happened during service status getting
|
||||
"""
|
||||
mocker.patch("requests.get", side_effect=requests.exceptions.HTTPError())
|
||||
mocker.patch("requests.Session.get", side_effect=requests.exceptions.HTTPError())
|
||||
assert web_client.get_self().status == BuildStatusEnum.Unknown
|
||||
|
||||
|
||||
@ -178,7 +234,7 @@ def test_remove(web_client: WebClient, package_ahriman: Package, mocker: MockerF
|
||||
"""
|
||||
must process package removal
|
||||
"""
|
||||
requests_mock = mocker.patch("requests.delete")
|
||||
requests_mock = mocker.patch("requests.Session.delete")
|
||||
|
||||
web_client.remove(package_ahriman.base)
|
||||
requests_mock.assert_called_with(pytest.helpers.anyvar(str, True))
|
||||
@ -188,7 +244,7 @@ def test_remove_failed(web_client: WebClient, package_ahriman: Package, mocker:
|
||||
"""
|
||||
must suppress any exception happened during removal
|
||||
"""
|
||||
mocker.patch("requests.delete", side_effect=Exception())
|
||||
mocker.patch("requests.Session.delete", side_effect=Exception())
|
||||
web_client.remove(package_ahriman.base)
|
||||
|
||||
|
||||
@ -196,7 +252,7 @@ def test_remove_failed_http_error(web_client: WebClient, package_ahriman: Packag
|
||||
"""
|
||||
must suppress any exception happened during removal
|
||||
"""
|
||||
mocker.patch("requests.delete", side_effect=requests.exceptions.HTTPError())
|
||||
mocker.patch("requests.Session.delete", side_effect=requests.exceptions.HTTPError())
|
||||
web_client.remove(package_ahriman.base)
|
||||
|
||||
|
||||
@ -204,7 +260,7 @@ def test_update(web_client: WebClient, package_ahriman: Package, mocker: MockerF
|
||||
"""
|
||||
must process package update
|
||||
"""
|
||||
requests_mock = mocker.patch("requests.post")
|
||||
requests_mock = mocker.patch("requests.Session.post")
|
||||
|
||||
web_client.update(package_ahriman.base, BuildStatusEnum.Unknown)
|
||||
requests_mock.assert_called_with(pytest.helpers.anyvar(str, True), json={"status": BuildStatusEnum.Unknown.value})
|
||||
@ -214,7 +270,7 @@ def test_update_failed(web_client: WebClient, package_ahriman: Package, mocker:
|
||||
"""
|
||||
must suppress any exception happened during update
|
||||
"""
|
||||
mocker.patch("requests.post", side_effect=Exception())
|
||||
mocker.patch("requests.Session.post", side_effect=Exception())
|
||||
web_client.update(package_ahriman.base, BuildStatusEnum.Unknown)
|
||||
|
||||
|
||||
@ -222,7 +278,7 @@ def test_update_failed_http_error(web_client: WebClient, package_ahriman: Packag
|
||||
"""
|
||||
must suppress any exception happened during update
|
||||
"""
|
||||
mocker.patch("requests.post", side_effect=requests.exceptions.HTTPError())
|
||||
mocker.patch("requests.Session.post", side_effect=requests.exceptions.HTTPError())
|
||||
web_client.update(package_ahriman.base, BuildStatusEnum.Unknown)
|
||||
|
||||
|
||||
@ -230,7 +286,7 @@ def test_update_self(web_client: WebClient, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must process service update
|
||||
"""
|
||||
requests_mock = mocker.patch("requests.post")
|
||||
requests_mock = mocker.patch("requests.Session.post")
|
||||
|
||||
web_client.update_self(BuildStatusEnum.Unknown)
|
||||
requests_mock.assert_called_with(pytest.helpers.anyvar(str, True), json={"status": BuildStatusEnum.Unknown.value})
|
||||
@ -240,7 +296,7 @@ def test_update_self_failed(web_client: WebClient, mocker: MockerFixture) -> Non
|
||||
"""
|
||||
must suppress any exception happened during service update
|
||||
"""
|
||||
mocker.patch("requests.post", side_effect=Exception())
|
||||
mocker.patch("requests.Session.post", side_effect=Exception())
|
||||
web_client.update_self(BuildStatusEnum.Unknown)
|
||||
|
||||
|
||||
@ -248,5 +304,5 @@ def test_update_self_failed_http_error(web_client: WebClient, mocker: MockerFixt
|
||||
"""
|
||||
must suppress any exception happened during service update
|
||||
"""
|
||||
mocker.patch("requests.post", side_effect=requests.exceptions.HTTPError())
|
||||
mocker.patch("requests.Session.post", side_effect=requests.exceptions.HTTPError())
|
||||
web_client.update_self(BuildStatusEnum.Unknown)
|
||||
|
@ -9,7 +9,7 @@ def test_from_path(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must load configuration
|
||||
"""
|
||||
read_mock = mocker.patch("configparser.RawConfigParser.read")
|
||||
read_mock = mocker.patch("ahriman.core.configuration.Configuration.read")
|
||||
load_includes_mock = mocker.patch("ahriman.core.configuration.Configuration.load_includes")
|
||||
load_logging_mock = mocker.patch("ahriman.core.configuration.Configuration.load_logging")
|
||||
path = Path("path")
|
||||
@ -33,7 +33,7 @@ def test_absolute_path_for_absolute(configuration: Configuration) -> None:
|
||||
must not change path for absolute path in settings
|
||||
"""
|
||||
path = Path("/a/b/c")
|
||||
configuration.set("build", "path", str(path))
|
||||
configuration.set_option("build", "path", str(path))
|
||||
assert configuration.getpath("build", "path") == path
|
||||
|
||||
|
||||
@ -42,7 +42,7 @@ def test_absolute_path_for_relative(configuration: Configuration) -> None:
|
||||
must prepend root path to relative path
|
||||
"""
|
||||
path = Path("a")
|
||||
configuration.set("build", "path", str(path))
|
||||
configuration.set_option("build", "path", str(path))
|
||||
result = configuration.getpath("build", "path")
|
||||
assert result.is_absolute()
|
||||
assert result.parent == configuration.path.parent
|
||||
@ -61,8 +61,7 @@ def test_dump_architecture_specific(configuration: Configuration) -> None:
|
||||
dump must contain architecture specific settings
|
||||
"""
|
||||
section = configuration.section_name("build", "x86_64")
|
||||
configuration.add_section(section)
|
||||
configuration.set(section, "archbuild_flags", "hello flag")
|
||||
configuration.set_option(section, "archbuild_flags", "hello flag")
|
||||
configuration.merge_sections("x86_64")
|
||||
|
||||
dump = configuration.dump()
|
||||
@ -76,7 +75,7 @@ def test_getlist(configuration: Configuration) -> None:
|
||||
"""
|
||||
must return list of string correctly
|
||||
"""
|
||||
configuration.set("build", "test_list", "a b c")
|
||||
configuration.set_option("build", "test_list", "a b c")
|
||||
assert configuration.getlist("build", "test_list") == ["a", "b", "c"]
|
||||
|
||||
|
||||
@ -85,7 +84,7 @@ def test_getlist_empty(configuration: Configuration) -> None:
|
||||
must return list of string correctly for non-existing option
|
||||
"""
|
||||
assert configuration.getlist("build", "test_list") == []
|
||||
configuration.set("build", "test_list", "")
|
||||
configuration.set_option("build", "test_list", "")
|
||||
assert configuration.getlist("build", "test_list") == []
|
||||
|
||||
|
||||
@ -93,7 +92,7 @@ def test_getlist_single(configuration: Configuration) -> None:
|
||||
"""
|
||||
must return list of strings for single string
|
||||
"""
|
||||
configuration.set("build", "test_list", "a")
|
||||
configuration.set_option("build", "test_list", "a")
|
||||
assert configuration.getlist("build", "test_list") == ["a"]
|
||||
|
||||
|
||||
@ -101,7 +100,7 @@ def test_load_includes_missing(configuration: Configuration) -> None:
|
||||
"""
|
||||
must not fail if not include directory found
|
||||
"""
|
||||
configuration.set("settings", "include", "path")
|
||||
configuration.set_option("settings", "include", "path")
|
||||
configuration.load_includes()
|
||||
|
||||
|
||||
@ -144,8 +143,23 @@ def test_merge_sections_missing(configuration: Configuration) -> None:
|
||||
"""
|
||||
section = configuration.section_name("build", "x86_64")
|
||||
configuration.remove_section("build")
|
||||
configuration.add_section(section)
|
||||
configuration.set(section, "key", "value")
|
||||
configuration.set_option(section, "key", "value")
|
||||
|
||||
configuration.merge_sections("x86_64")
|
||||
assert configuration.get("build", "key") == "value"
|
||||
|
||||
|
||||
def test_set_option(configuration: Configuration) -> None:
|
||||
"""
|
||||
must set option correctly
|
||||
"""
|
||||
configuration.set_option("settings", "option", "value")
|
||||
assert configuration.get("settings", "option") == "value"
|
||||
|
||||
|
||||
def test_set_option_new_section(configuration: Configuration) -> None:
|
||||
"""
|
||||
must set option correctly even if no section found
|
||||
"""
|
||||
configuration.set_option("section", "option", "value")
|
||||
assert configuration.get("section", "option") == "value"
|
||||
|
@ -61,6 +61,8 @@ def test_get_local_files(s3: S3, resource_path_root: Path) -> None:
|
||||
Path("models/package_yay_srcinfo"),
|
||||
Path("web/templates/search-line.jinja2"),
|
||||
Path("web/templates/build-status.jinja2"),
|
||||
Path("web/templates/login-form.jinja2"),
|
||||
Path("web/templates/login-form-hide.jinja2"),
|
||||
Path("web/templates/repo-index.jinja2"),
|
||||
Path("web/templates/sorttable.jinja2"),
|
||||
Path("web/templates/style.jinja2"),
|
||||
|
36
tests/ahriman/models/test_auth_settings.py
Normal file
36
tests/ahriman/models/test_auth_settings.py
Normal file
@ -0,0 +1,36 @@
|
||||
import pytest
|
||||
|
||||
from ahriman.core.exceptions import InvalidOption
|
||||
from ahriman.models.auth_settings import AuthSettings
|
||||
|
||||
|
||||
def test_from_option_invalid() -> None:
|
||||
"""
|
||||
must raise exception on invalid option
|
||||
"""
|
||||
with pytest.raises(InvalidOption, match=".* `invalid`$"):
|
||||
AuthSettings.from_option("invalid")
|
||||
|
||||
|
||||
def test_from_option_valid() -> None:
|
||||
"""
|
||||
must return value from valid options
|
||||
"""
|
||||
assert AuthSettings.from_option("disabled") == AuthSettings.Disabled
|
||||
assert AuthSettings.from_option("DISABLED") == AuthSettings.Disabled
|
||||
assert AuthSettings.from_option("no") == AuthSettings.Disabled
|
||||
assert AuthSettings.from_option("NO") == AuthSettings.Disabled
|
||||
|
||||
assert AuthSettings.from_option("configuration") == AuthSettings.Configuration
|
||||
assert AuthSettings.from_option("ConFigUration") == AuthSettings.Configuration
|
||||
assert AuthSettings.from_option("mapping") == AuthSettings.Configuration
|
||||
assert AuthSettings.from_option("MAPPing") == AuthSettings.Configuration
|
||||
|
||||
|
||||
def test_is_enabled() -> None:
|
||||
"""
|
||||
must mark as disabled authorization for disabled and enabled otherwise
|
||||
"""
|
||||
assert not AuthSettings.Disabled.is_enabled
|
||||
for option in filter(lambda o: o != AuthSettings.Disabled, AuthSettings):
|
||||
assert option.is_enabled
|
75
tests/ahriman/models/test_user.py
Normal file
75
tests/ahriman/models/test_user.py
Normal file
@ -0,0 +1,75 @@
|
||||
from ahriman.models.user import User
|
||||
from ahriman.models.user_access import UserAccess
|
||||
|
||||
|
||||
def test_from_option(user: User) -> None:
|
||||
"""
|
||||
must generate user from options
|
||||
"""
|
||||
assert User.from_option(user.username, user.password) == user
|
||||
# default is status access
|
||||
user.access = UserAccess.Write
|
||||
assert User.from_option(user.username, user.password) != user
|
||||
|
||||
|
||||
def test_from_option_empty() -> None:
|
||||
"""
|
||||
must return nothing if settings are missed
|
||||
"""
|
||||
assert User.from_option(None, "") is None
|
||||
assert User.from_option("", None) is None
|
||||
assert User.from_option(None, None) is None
|
||||
|
||||
|
||||
def test_check_credentials_hash_password(user: User) -> None:
|
||||
"""
|
||||
must generate and validate user password
|
||||
"""
|
||||
current_password = user.password
|
||||
user.password = user.hash_password(current_password, "salt")
|
||||
assert user.check_credentials(current_password, "salt")
|
||||
assert not user.check_credentials(current_password, "salt1")
|
||||
assert not user.check_credentials(user.password, "salt")
|
||||
|
||||
|
||||
def test_generate_password() -> None:
|
||||
"""
|
||||
must generate password with specified length
|
||||
"""
|
||||
password = User.generate_password(16)
|
||||
assert password
|
||||
assert len(password) == 16
|
||||
|
||||
password = User.generate_password(42)
|
||||
assert password
|
||||
assert len(password) == 42
|
||||
|
||||
|
||||
def test_verify_access_read(user: User) -> None:
|
||||
"""
|
||||
user with read access must be able to only request read
|
||||
"""
|
||||
user.access = UserAccess.Read
|
||||
assert user.verify_access(UserAccess.Read)
|
||||
assert not user.verify_access(UserAccess.Write)
|
||||
assert not user.verify_access(UserAccess.Status)
|
||||
|
||||
|
||||
def test_verify_access_status(user: User) -> None:
|
||||
"""
|
||||
user with status access must be able to only request status
|
||||
"""
|
||||
user.access = UserAccess.Status
|
||||
assert not user.verify_access(UserAccess.Read)
|
||||
assert not user.verify_access(UserAccess.Write)
|
||||
assert user.verify_access(UserAccess.Status)
|
||||
|
||||
|
||||
def test_verify_access_write(user: User) -> None:
|
||||
"""
|
||||
user with write access must be able to do anything
|
||||
"""
|
||||
user.access = UserAccess.Write
|
||||
assert user.verify_access(UserAccess.Read)
|
||||
assert user.verify_access(UserAccess.Write)
|
||||
assert user.verify_access(UserAccess.Status)
|
0
tests/ahriman/models/test_user_access.py
Normal file
0
tests/ahriman/models/test_user_access.py
Normal file
@ -3,7 +3,10 @@ import pytest
|
||||
from aiohttp import web
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
import ahriman.core.auth.helpers
|
||||
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.models.user import User
|
||||
from ahriman.web.web import setup_service
|
||||
|
||||
|
||||
@ -15,5 +18,26 @@ def application(configuration: Configuration, mocker: MockerFixture) -> web.Appl
|
||||
:param mocker: mocker object
|
||||
:return: application test instance
|
||||
"""
|
||||
mocker.patch.object(ahriman.core.auth.helpers, "_has_aiohttp_security", False)
|
||||
mocker.patch("pathlib.Path.mkdir")
|
||||
return setup_service("x86_64", configuration)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def application_with_auth(configuration: Configuration, user: User, mocker: MockerFixture) -> web.Application:
|
||||
"""
|
||||
application fixture with auth enabled
|
||||
:param configuration: configuration fixture
|
||||
:param user: user descriptor fixture
|
||||
:param mocker: mocker object
|
||||
:return: application test instance
|
||||
"""
|
||||
configuration.set_option("auth", "target", "configuration")
|
||||
mocker.patch.object(ahriman.core.auth.helpers, "_has_aiohttp_security", True)
|
||||
mocker.patch("pathlib.Path.mkdir")
|
||||
application = setup_service("x86_64", configuration)
|
||||
|
||||
generated = User(user.username, user.hash_password(user.password, application["validator"].salt), user.access)
|
||||
application["validator"]._users[generated.username] = generated
|
||||
|
||||
return application
|
||||
|
@ -2,8 +2,12 @@ import pytest
|
||||
|
||||
from collections import namedtuple
|
||||
|
||||
from ahriman.core.auth.auth import Auth
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.models.user import User
|
||||
from ahriman.web.middlewares.auth_handler import AuthorizationPolicy
|
||||
|
||||
_request = namedtuple("_request", ["path"])
|
||||
_request = namedtuple("_request", ["path", "method"])
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@ -12,4 +16,17 @@ def aiohttp_request() -> _request:
|
||||
fixture for aiohttp like object
|
||||
:return: aiohttp like request test instance
|
||||
"""
|
||||
return _request("path")
|
||||
return _request("path", "GET")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def authorization_policy(configuration: Configuration, user: User) -> AuthorizationPolicy:
|
||||
"""
|
||||
fixture for authorization policy
|
||||
:return: authorization policy fixture
|
||||
"""
|
||||
configuration.set_option("auth", "target", "configuration")
|
||||
validator = Auth.load(configuration)
|
||||
policy = AuthorizationPolicy(validator)
|
||||
policy.validator._users = {user.username: user}
|
||||
return policy
|
||||
|
100
tests/ahriman/web/middlewares/test_auth_handler.py
Normal file
100
tests/ahriman/web/middlewares/test_auth_handler.py
Normal file
@ -0,0 +1,100 @@
|
||||
from aiohttp import web
|
||||
from pytest_mock import MockerFixture
|
||||
from typing import Any
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
from ahriman.core.auth.auth import Auth
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.models.user import User
|
||||
from ahriman.models.user_access import UserAccess
|
||||
from ahriman.web.middlewares.auth_handler import auth_handler, AuthorizationPolicy, setup_auth
|
||||
|
||||
|
||||
async def test_authorized_userid(authorization_policy: AuthorizationPolicy, user: User) -> None:
|
||||
"""
|
||||
must return authorized user id
|
||||
"""
|
||||
assert await authorization_policy.authorized_userid(user.username) == user.username
|
||||
assert await authorization_policy.authorized_userid("some random name") is None
|
||||
|
||||
|
||||
async def test_permits(authorization_policy: AuthorizationPolicy, user: User) -> None:
|
||||
"""
|
||||
must call validator check
|
||||
"""
|
||||
authorization_policy.validator = MagicMock()
|
||||
authorization_policy.validator.verify_access.return_value = True
|
||||
|
||||
assert await authorization_policy.permits(user.username, user.access, "/endpoint")
|
||||
authorization_policy.validator.verify_access.assert_called_with(user.username, user.access, "/endpoint")
|
||||
|
||||
|
||||
async def test_auth_handler_api(aiohttp_request: Any, auth: Auth, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must ask for status permission for api calls
|
||||
"""
|
||||
aiohttp_request = aiohttp_request._replace(path="/api")
|
||||
request_handler = AsyncMock()
|
||||
mocker.patch("ahriman.core.auth.auth.Auth.is_safe_request", return_value=False)
|
||||
check_permission_mock = mocker.patch("aiohttp_security.check_permission")
|
||||
|
||||
handler = auth_handler(auth)
|
||||
await handler(aiohttp_request, request_handler)
|
||||
check_permission_mock.assert_called_with(aiohttp_request, UserAccess.Status, aiohttp_request.path)
|
||||
|
||||
|
||||
async def test_auth_handler_api_post(aiohttp_request: Any, auth: Auth, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must ask for status permission for api calls with POST
|
||||
"""
|
||||
aiohttp_request = aiohttp_request._replace(path="/api", method="POST")
|
||||
request_handler = AsyncMock()
|
||||
mocker.patch("ahriman.core.auth.auth.Auth.is_safe_request", return_value=False)
|
||||
check_permission_mock = mocker.patch("aiohttp_security.check_permission")
|
||||
|
||||
handler = auth_handler(auth)
|
||||
await handler(aiohttp_request, request_handler)
|
||||
check_permission_mock.assert_called_with(aiohttp_request, UserAccess.Status, aiohttp_request.path)
|
||||
|
||||
|
||||
async def test_auth_handler_read(aiohttp_request: Any, auth: Auth, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must ask for read permission for api calls with GET
|
||||
"""
|
||||
for method in ("GET", "HEAD", "OPTIONS"):
|
||||
aiohttp_request = aiohttp_request._replace(method=method)
|
||||
request_handler = AsyncMock()
|
||||
mocker.patch("ahriman.core.auth.auth.Auth.is_safe_request", return_value=False)
|
||||
check_permission_mock = mocker.patch("aiohttp_security.check_permission")
|
||||
|
||||
handler = auth_handler(auth)
|
||||
await handler(aiohttp_request, request_handler)
|
||||
check_permission_mock.assert_called_with(aiohttp_request, UserAccess.Read, aiohttp_request.path)
|
||||
|
||||
|
||||
async def test_auth_handler_write(aiohttp_request: Any, auth: Auth, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must ask for read permission for api calls with POST
|
||||
"""
|
||||
for method in ("CONNECT", "DELETE", "PATCH", "POST", "PUT", "TRACE"):
|
||||
aiohttp_request = aiohttp_request._replace(method=method)
|
||||
request_handler = AsyncMock()
|
||||
mocker.patch("ahriman.core.auth.auth.Auth.is_safe_request", return_value=False)
|
||||
check_permission_mock = mocker.patch("aiohttp_security.check_permission")
|
||||
|
||||
handler = auth_handler(auth)
|
||||
await handler(aiohttp_request, request_handler)
|
||||
check_permission_mock.assert_called_with(aiohttp_request, UserAccess.Write, aiohttp_request.path)
|
||||
|
||||
|
||||
def test_setup_auth(
|
||||
application_with_auth: web.Application,
|
||||
configuration: Configuration,
|
||||
mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must setup authorization
|
||||
"""
|
||||
aiohttp_security_setup_mock = mocker.patch("aiohttp_security.setup")
|
||||
application = setup_auth(application_with_auth, configuration)
|
||||
assert application.get("validator") is not None
|
||||
aiohttp_security_setup_mock.assert_called_once()
|
@ -35,9 +35,22 @@ def test_run(application: web.Application, mocker: MockerFixture) -> None:
|
||||
must run application
|
||||
"""
|
||||
port = 8080
|
||||
application["configuration"].set("web", "port", str(port))
|
||||
application["configuration"].set_option("web", "port", str(port))
|
||||
run_application_mock = mocker.patch("aiohttp.web.run_app")
|
||||
|
||||
run_server(application)
|
||||
run_application_mock.assert_called_with(application, host="127.0.0.1", port=port,
|
||||
handle_signals=False, access_log=pytest.helpers.anyvar(int))
|
||||
|
||||
|
||||
def test_run_with_auth(application_with_auth: web.Application, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must run application
|
||||
"""
|
||||
port = 8080
|
||||
application_with_auth["configuration"].set_option("web", "port", str(port))
|
||||
run_application_mock = mocker.patch("aiohttp.web.run_app")
|
||||
|
||||
run_server(application_with_auth)
|
||||
run_application_mock.assert_called_with(application_with_auth, host="127.0.0.1", port=port,
|
||||
handle_signals=False, access_log=pytest.helpers.anyvar(int))
|
||||
|
@ -20,3 +20,18 @@ def client(application: web.Application, loop: BaseEventLoop,
|
||||
"""
|
||||
mocker.patch("pathlib.Path.iterdir", return_value=[])
|
||||
return loop.run_until_complete(aiohttp_client(application))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client_with_auth(application_with_auth: web.Application, loop: BaseEventLoop,
|
||||
aiohttp_client: Any, mocker: MockerFixture) -> TestClient:
|
||||
"""
|
||||
web client fixture with full authorization functions
|
||||
:param application_with_auth: application fixture
|
||||
:param loop: context event loop
|
||||
:param aiohttp_client: aiohttp client fixture
|
||||
:param mocker: mocker object
|
||||
:return: web client test instance
|
||||
"""
|
||||
mocker.patch("pathlib.Path.iterdir", return_value=[])
|
||||
return loop.run_until_complete(aiohttp_client(application_with_auth))
|
||||
|
@ -1,19 +1,28 @@
|
||||
from pytest_aiohttp import TestClient
|
||||
|
||||
|
||||
async def test_get(client: TestClient) -> None:
|
||||
async def test_get(client_with_auth: TestClient) -> None:
|
||||
"""
|
||||
must generate status page correctly (/)
|
||||
"""
|
||||
response = await client_with_auth.get("/")
|
||||
assert response.status == 200
|
||||
assert await response.text()
|
||||
|
||||
|
||||
async def test_get_index(client_with_auth: TestClient) -> None:
|
||||
"""
|
||||
must generate status page correctly (/index.html)
|
||||
"""
|
||||
response = await client_with_auth.get("/index.html")
|
||||
assert response.status == 200
|
||||
assert await response.text()
|
||||
|
||||
|
||||
async def test_get_without_auth(client: TestClient) -> None:
|
||||
"""
|
||||
must use dummy authorized_userid function in case if no security library installed
|
||||
"""
|
||||
response = await client.get("/")
|
||||
assert response.status == 200
|
||||
assert await response.text()
|
||||
|
||||
|
||||
async def test_get_index(client: TestClient) -> None:
|
||||
"""
|
||||
must generate status page correctly (/index.html)
|
||||
"""
|
||||
response = await client.get("/index.html")
|
||||
assert response.status == 200
|
||||
assert await response.text()
|
||||
|
41
tests/ahriman/web/views/test_view_login.py
Normal file
41
tests/ahriman/web/views/test_view_login.py
Normal file
@ -0,0 +1,41 @@
|
||||
from aiohttp.test_utils import TestClient
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
from ahriman.models.user import User
|
||||
|
||||
|
||||
async def test_post(client_with_auth: TestClient, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must login user correctly
|
||||
"""
|
||||
payload = {"username": user.username, "password": user.password}
|
||||
remember_mock = mocker.patch("aiohttp_security.remember")
|
||||
|
||||
post_response = await client_with_auth.post("/login", json=payload)
|
||||
assert post_response.status == 200
|
||||
|
||||
post_response = await client_with_auth.post("/login", data=payload)
|
||||
assert post_response.status == 200
|
||||
|
||||
remember_mock.assert_called()
|
||||
|
||||
|
||||
async def test_post_skip(client: TestClient, user: User) -> None:
|
||||
"""
|
||||
must process if no auth configured
|
||||
"""
|
||||
payload = {"username": user.username, "password": user.password}
|
||||
post_response = await client.post("/login", json=payload)
|
||||
assert post_response.status == 200
|
||||
|
||||
|
||||
async def test_post_unauthorized(client_with_auth: TestClient, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must return unauthorized on invalid auth
|
||||
"""
|
||||
payload = {"username": user.username, "password": ""}
|
||||
remember_mock = mocker.patch("aiohttp_security.remember")
|
||||
|
||||
post_response = await client_with_auth.post("/login", json=payload)
|
||||
assert post_response.status == 401
|
||||
remember_mock.assert_not_called()
|
35
tests/ahriman/web/views/test_view_logout.py
Normal file
35
tests/ahriman/web/views/test_view_logout.py
Normal file
@ -0,0 +1,35 @@
|
||||
from aiohttp.test_utils import TestClient
|
||||
from aiohttp.web import HTTPUnauthorized
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
|
||||
async def test_post(client_with_auth: TestClient, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must logout user correctly
|
||||
"""
|
||||
mocker.patch("aiohttp_security.check_authorized")
|
||||
forget_mock = mocker.patch("aiohttp_security.forget")
|
||||
|
||||
post_response = await client_with_auth.post("/logout")
|
||||
assert post_response.status == 200
|
||||
forget_mock.assert_called_once()
|
||||
|
||||
|
||||
async def test_post_unauthorized(client_with_auth: TestClient, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must raise exception if unauthorized
|
||||
"""
|
||||
mocker.patch("aiohttp_security.check_authorized", side_effect=HTTPUnauthorized())
|
||||
forget_mock = mocker.patch("aiohttp_security.forget")
|
||||
|
||||
post_response = await client_with_auth.post("/logout")
|
||||
assert post_response.status == 401
|
||||
forget_mock.assert_not_called()
|
||||
|
||||
|
||||
async def test_post_disabled(client: TestClient) -> None:
|
||||
"""
|
||||
must raise exception if auth is disabled
|
||||
"""
|
||||
post_response = await client.post("/logout")
|
||||
assert post_response.status == 200
|
@ -8,6 +8,9 @@ database = /var/lib/pacman
|
||||
repositories = core extra community multilib
|
||||
root = /
|
||||
|
||||
[auth]
|
||||
salt = salt
|
||||
|
||||
[build]
|
||||
archbuild_flags =
|
||||
build_command = extra-x86_64-build
|
||||
|
Reference in New Issue
Block a user