add tests

This commit is contained in:
2021-08-27 03:56:17 +03:00
parent 3e044fd006
commit 89aa8f0b45
23 changed files with 684 additions and 34 deletions

View File

@ -19,8 +19,10 @@
# #
import argparse import argparse
import configparser import configparser
import getpass
import random
import string
from getpass import getpass
from pathlib import Path from pathlib import Path
from typing import Type from typing import Type
@ -42,38 +44,60 @@ class CreateUser(Handler):
:param architecture: repository architecture :param architecture: repository architecture
:param configuration: configuration instance :param configuration: configuration instance
""" """
user = CreateUser.create_user(args, configuration) salt = CreateUser.get_salt(configuration)
CreateUser.create_configuration(user, configuration.include) user = CreateUser.create_user(args, salt)
CreateUser.create_configuration(user, salt, configuration.include)
@staticmethod @staticmethod
def create_configuration(user: User, include_path: Path) -> None: def create_configuration(user: User, salt: str, include_path: Path) -> None:
""" """
put new user to configuration put new user to configuration
:param user: user descriptor :param user: user descriptor
:param salt: password hash salt
:param include_path: path to directory with configuration includes :param include_path: path to directory with configuration includes
""" """
def set_option(section_name: str, name: str, value: str) -> None:
if section_name not in configuration.sections():
configuration.add_section(section_name)
configuration.set(section_name, name, value)
target = include_path / "auth.ini" target = include_path / "auth.ini"
configuration = configparser.ConfigParser() configuration = configparser.ConfigParser()
configuration.read(target) if target.is_file(): # load current configuration in case if it exists
configuration.read(target)
section = Configuration.section_name("auth", user.access.value) section = Configuration.section_name("auth", user.access.value)
configuration.add_section(section) set_option("auth", "salt", salt)
configuration.set(section, user.username, user.password) set_option(section, user.username, user.password)
with target.open("w") as ahriman_configuration: with target.open("w") as ahriman_configuration:
configuration.write(ahriman_configuration) configuration.write(ahriman_configuration)
@staticmethod @staticmethod
def create_user(args: argparse.Namespace, configuration: Configuration) -> User: def create_user(args: argparse.Namespace, salt: str) -> User:
""" """
create user descriptor from arguments create user descriptor from arguments
:param args: command line args :param args: command line args
:param configuration: configuration instance :param salt: password hash salt
:return: built user descriptor :return: built user descriptor
""" """
user = User(args.username, args.password, args.role) user = User(args.username, args.password, args.role)
if user.password is None: if user.password is None:
user.password = getpass() user.password = getpass.getpass()
user.password = user.generate_password(user.password, configuration.get("auth", "salt")) user.password = user.generate_password(user.password, salt)
return user return user
@staticmethod
def get_salt(configuration: Configuration, salt_length: int = 20) -> str:
"""
get salt from configuration or create new string
:param configuration: configuration instance
:param salt_length: salt length
:return: current salt
"""
salt = configuration.get("auth", "salt", fallback=None)
if salt:
return salt
return "".join(random.choices(string.ascii_letters + string.digits, k=salt_length))

View File

@ -35,7 +35,7 @@ class Auth:
:cvar ALLOWED_PATHS_GROUPS: URI paths prefixes which can be accessed without authorization, predefined :cvar ALLOWED_PATHS_GROUPS: URI paths prefixes which can be accessed without authorization, predefined
""" """
ALLOWED_PATHS = {"/", "/favicon.ico", "/login", "/logout"} ALLOWED_PATHS = {"/favicon.ico", "/login", "/logout"}
ALLOWED_PATHS_GROUPS: Set[str] = set() ALLOWED_PATHS_GROUPS: Set[str] = set()
def __init__(self, configuration: Configuration) -> None: def __init__(self, configuration: Configuration) -> None:
@ -84,9 +84,9 @@ class Auth:
:param uri: request uri :param uri: request uri
:return: True in case if this URI can be requested without authorization and False otherwise :return: True in case if this URI can be requested without authorization and False otherwise
""" """
if uri is None: if not uri:
return False # request without context is not allowed return False # request without context is not allowed
return uri in self.ALLOWED_PATHS or any(uri.startswith(path) for path in self.ALLOWED_PATHS_GROUPS) return uri in self.allowed_paths or any(uri.startswith(path) for path in self.allowed_paths_groups)
def verify_access(self, username: str, required: UserAccess) -> bool: def verify_access(self, username: str, required: UserAccess) -> bool:
""" """

View File

@ -39,9 +39,10 @@ class Client:
:param configuration: configuration instance :param configuration: configuration instance
:return: client according to current settings :return: client according to current settings
""" """
address = configuration.get("web", "address", fallback=None)
host = configuration.get("web", "host", fallback=None) host = configuration.get("web", "host", fallback=None)
port = configuration.getint("web", "port", fallback=None) port = configuration.getint("web", "port", fallback=None)
if host is not None and port is not None: if address or (host and port):
from ahriman.core.status.web_client import WebClient from ahriman.core.status.web_client import WebClient
return WebClient(configuration) return WebClient(configuration)
return cls() return cls()

View File

@ -51,7 +51,7 @@ class WebClient(Client):
configuration.get("web", "password", fallback=None)) configuration.get("web", "password", fallback=None))
self.__session = requests.session() self.__session = requests.session()
self.login() self._login()
@property @property
def _ahriman_url(self) -> str: def _ahriman_url(self) -> str:
@ -89,7 +89,7 @@ class WebClient(Client):
address = f"http://{host}:{port}" address = f"http://{host}:{port}"
return address return address
def login(self) -> None: def _login(self) -> None:
""" """
process login to the service process login to the service
""" """

View File

@ -17,11 +17,11 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
# #
import aiohttp_security # type: ignore
from aiohttp import web from aiohttp import web
from aiohttp.web import middleware, Request from aiohttp.web import middleware, Request
from aiohttp.web_response import StreamResponse from aiohttp.web_response import StreamResponse
from aiohttp_security import setup as setup_security # type: ignore
from aiohttp_security import AbstractAuthorizationPolicy, SessionIdentityPolicy, check_permission
from typing import Optional from typing import Optional
from ahriman.core.auth import Auth from ahriman.core.auth import Auth
@ -30,7 +30,7 @@ from ahriman.models.user_access import UserAccess
from ahriman.web.middlewares import HandlerType, MiddlewareType from ahriman.web.middlewares import HandlerType, MiddlewareType
class AuthorizationPolicy(AbstractAuthorizationPolicy): # type: ignore class AuthorizationPolicy(aiohttp_security.AbstractAuthorizationPolicy): # type: ignore
""" """
authorization policy implementation authorization policy implementation
:ivar validator: validator instance :ivar validator: validator instance
@ -71,13 +71,14 @@ def auth_handler() -> MiddlewareType:
""" """
@middleware @middleware
async def handle(request: Request, handler: HandlerType) -> StreamResponse: async def handle(request: Request, handler: HandlerType) -> StreamResponse:
print(request)
if request.path.startswith("/api"): if request.path.startswith("/api"):
permission = UserAccess.Status permission = UserAccess.Status
elif request.method in ("HEAD", "GET", "OPTIONS"): elif request.method in ("GET", "HEAD", "OPTIONS"):
permission = UserAccess.Read permission = UserAccess.Read
else: else:
permission = UserAccess.Write permission = UserAccess.Write
await check_permission(request, permission, request.path) await aiohttp_security.check_permission(request, permission, request.path)
return await handler(request) return await handler(request)
@ -92,10 +93,10 @@ def setup_auth(application: web.Application, configuration: Configuration) -> we
:return: configured web application :return: configured web application
""" """
authorization_policy = AuthorizationPolicy(configuration) authorization_policy = AuthorizationPolicy(configuration)
identity_policy = SessionIdentityPolicy() identity_policy = aiohttp_security.SessionIdentityPolicy()
application["validator"] = authorization_policy.validator application["validator"] = authorization_policy.validator
setup_security(application, identity_policy, authorization_policy) aiohttp_security.setup(application, identity_policy, authorization_policy)
application.middlewares.append(auth_handler()) application.middlewares.append(auth_handler())
return application return application

View File

@ -17,8 +17,9 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
# #
import aiohttp_security # type: ignore
from aiohttp.web import HTTPFound, HTTPUnauthorized, Response from aiohttp.web import HTTPFound, HTTPUnauthorized, Response
from aiohttp_security import remember # type: ignore
from ahriman.web.views.base import BaseView from ahriman.web.views.base import BaseView
@ -44,8 +45,11 @@ class LoginView(BaseView):
username = data.get("username") username = data.get("username")
response = HTTPFound("/") response = HTTPFound("/")
if self.validator.check_credentials(username, data.get("password")): try:
await remember(self.request, response, username) if self.validator.check_credentials(username, data.get("password")):
await aiohttp_security.remember(self.request, response, username)
return response
except KeyError:
return response return response
raise HTTPUnauthorized() raise HTTPUnauthorized()

View File

@ -17,8 +17,9 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
# #
import aiohttp_security # type: ignore
from aiohttp.web import HTTPFound, Response from aiohttp.web import HTTPFound, Response
from aiohttp_security import check_authorized, forget # type: ignore
from ahriman.web.views.base import BaseView from ahriman.web.views.base import BaseView
@ -33,9 +34,9 @@ class LogoutView(BaseView):
logout user from the service. No parameters supported here logout user from the service. No parameters supported here
:return: redirect to main page :return: redirect to main page
""" """
await check_authorized(self.request) await aiohttp_security.check_authorized(self.request)
response = HTTPFound("/") response = HTTPFound("/")
await forget(self.request, response) await aiohttp_security.forget(self.request, response)
return response return response

View File

@ -0,0 +1,132 @@
import argparse
import pytest
from pathlib import Path
from pytest_mock import MockerFixture
from unittest import mock
from ahriman.application.handlers import CreateUser
from ahriman.core.configuration import Configuration
from ahriman.models.user import User
from ahriman.models.user_access import UserAccess
def _default_args(args: argparse.Namespace) -> argparse.Namespace:
"""
default arguments for these test cases
:param args: command line arguments fixture
:return: generated arguments for these test cases
"""
args.username = "user"
args.password = "pa55w0rd"
args.role = UserAccess.Status
return args
def test_run(args: argparse.Namespace, configuration: Configuration, mocker: MockerFixture) -> None:
"""
must run command
"""
args = _default_args(args)
create_configuration_mock = mocker.patch("ahriman.application.handlers.CreateUser.create_configuration")
create_user = mocker.patch("ahriman.application.handlers.CreateUser.create_user")
get_salt_mock = mocker.patch("ahriman.application.handlers.CreateUser.get_salt")
CreateUser.run(args, "x86_64", configuration)
create_configuration_mock.assert_called_once()
create_user.assert_called_once()
get_salt_mock.assert_called_once()
def test_create_configuration(user: User, mocker: MockerFixture) -> None:
"""
must correctly create configuration file
"""
section = Configuration.section_name("auth", user.access.value)
mocker.patch("pathlib.Path.open")
add_section_mock = mocker.patch("configparser.RawConfigParser.add_section")
set_mock = mocker.patch("configparser.RawConfigParser.set")
write_mock = mocker.patch("configparser.RawConfigParser.write")
CreateUser.create_configuration(user, "salt", Path("path"))
write_mock.assert_called_once()
add_section_mock.assert_has_calls([mock.call("auth"), mock.call(section)])
set_mock.assert_has_calls([
mock.call("auth", "salt", pytest.helpers.anyvar(str)),
mock.call(section, user.username, user.password)
])
def test_create_configuration_user_exists(configuration: Configuration, user: User, mocker: MockerFixture) -> None:
"""
must correctly update configuration file if user already exists
"""
section = Configuration.section_name("auth", user.access.value)
configuration.add_section(section)
configuration.set(section, user.username, "")
mocker.patch("pathlib.Path.open")
mocker.patch("configparser.ConfigParser", return_value=configuration)
mocker.patch("configparser.RawConfigParser.write")
add_section_mock = mocker.patch("configparser.RawConfigParser.add_section")
CreateUser.create_configuration(user, "salt", Path("path"))
add_section_mock.assert_not_called()
assert configuration.get(section, user.username) == user.password
def test_create_configuration_file_exists(user: User, mocker: MockerFixture) -> None:
"""
must correctly update configuration file if file already exists
"""
mocker.patch("pathlib.Path.open")
mocker.patch("pathlib.Path.is_file", return_value=True)
mocker.patch("configparser.RawConfigParser.write")
read_mock = mocker.patch("configparser.RawConfigParser.read")
CreateUser.create_configuration(user, "salt", Path("path"))
read_mock.assert_called_once()
def test_create_user(args: argparse.Namespace, user: User) -> None:
"""
must create user
"""
args = _default_args(args)
generated = CreateUser.create_user(args, "salt")
assert generated.username == user.username
assert generated.check_credentials(user.password, "salt")
assert generated.access == user.access
def test_create_user_getpass(args: argparse.Namespace, mocker: MockerFixture) -> None:
"""
must create user and get password from command line
"""
args = _default_args(args)
args.password = None
getpass_mock = mocker.patch("getpass.getpass", return_value="password")
generated = CreateUser.create_user(args, "salt")
getpass_mock.assert_called_once()
assert generated.check_credentials("password", "salt")
def test_get_salt_read(configuration: Configuration) -> None:
"""
must read salt from configuration
"""
assert CreateUser.get_salt(configuration) == "salt"
def test_get_salt_generate(configuration: Configuration) -> None:
"""
must generate salt if it does not exist
"""
configuration.remove_option("auth", "salt")
salt = CreateUser.get_salt(configuration, 16)
assert salt
assert len(salt) == 16

View File

@ -6,6 +6,7 @@ from pytest_mock import MockerFixture
from ahriman.application.handlers import Handler from ahriman.application.handlers import Handler
from ahriman.models.build_status import BuildStatusEnum from ahriman.models.build_status import BuildStatusEnum
from ahriman.models.sign_settings import SignSettings from ahriman.models.sign_settings import SignSettings
from ahriman.models.user_access import UserAccess
def test_parser(parser: argparse.ArgumentParser) -> None: def test_parser(parser: argparse.ArgumentParser) -> None:
@ -83,6 +84,28 @@ def test_subparsers_config(parser: argparse.ArgumentParser) -> None:
assert args.unsafe assert args.unsafe
def test_subparsers_create_user(parser: argparse.ArgumentParser) -> None:
"""
create-user command must imply architecture, lock, no-log, no-report and unsafe
"""
args = parser.parse_args(["create-user", "username"])
assert args.architecture == [""]
assert args.lock is None
assert args.no_log
assert args.no_report
assert args.unsafe
def test_subparsers_create_user_option_role(parser: argparse.ArgumentParser) -> None:
"""
create-user command must convert role option to useraccess instance
"""
args = parser.parse_args(["create-user", "username"])
assert isinstance(args.role, UserAccess)
args = parser.parse_args(["create-user", "username", "--role", "write"])
assert isinstance(args.role, UserAccess)
def test_subparsers_init(parser: argparse.ArgumentParser) -> None: def test_subparsers_init(parser: argparse.ArgumentParser) -> None:
""" """
init command must imply no_report init command must imply no_report

View File

@ -11,7 +11,8 @@ from ahriman.core.status.watcher import Watcher
from ahriman.models.package import Package from ahriman.models.package import Package
from ahriman.models.package_description import PackageDescription from ahriman.models.package_description import PackageDescription
from ahriman.models.repository_paths import RepositoryPaths from ahriman.models.repository_paths import RepositoryPaths
from ahriman.models.user import User
from ahriman.models.user_access import UserAccess
T = TypeVar("T") T = TypeVar("T")
@ -158,6 +159,15 @@ def repository_paths(configuration: Configuration) -> RepositoryPaths:
root=configuration.getpath("repository", "root")) root=configuration.getpath("repository", "root"))
@pytest.fixture
def user() -> User:
"""
fixture for user descriptor
:return: user descriptor instance
"""
return User("user", "pa55w0rd", UserAccess.Status)
@pytest.fixture @pytest.fixture
def watcher(configuration: Configuration, mocker: MockerFixture) -> Watcher: def watcher(configuration: Configuration, mocker: MockerFixture) -> Watcher:
""" """

View File

@ -2,6 +2,7 @@ import pytest
from ahriman.core.alpm.pacman import Pacman from ahriman.core.alpm.pacman import Pacman
from ahriman.core.alpm.repo import Repo from ahriman.core.alpm.repo import Repo
from ahriman.core.auth import Auth
from ahriman.core.build_tools.task import Task from ahriman.core.build_tools.task import Task
from ahriman.core.configuration import Configuration from ahriman.core.configuration import Configuration
from ahriman.core.tree import Leaf from ahriman.core.tree import Leaf
@ -29,6 +30,15 @@ def leaf_python_schedule(package_python_schedule: Package) -> Leaf:
return Leaf(package_python_schedule, set()) return Leaf(package_python_schedule, set())
@pytest.fixture
def auth(configuration: Configuration) -> Auth:
"""
auth provider fixture
:return: auth service instance
"""
return Auth(configuration)
@pytest.fixture @pytest.fixture
def pacman(configuration: Configuration) -> Pacman: def pacman(configuration: Configuration) -> Pacman:
""" """

View File

@ -17,13 +17,21 @@ def test_load_dummy_client(configuration: Configuration) -> None:
def test_load_full_client(configuration: Configuration) -> None: def test_load_full_client(configuration: Configuration) -> None:
""" """
must load full client if no settings set must load full client if settings set
""" """
configuration.set("web", "host", "localhost") configuration.set("web", "host", "localhost")
configuration.set("web", "port", "8080") configuration.set("web", "port", "8080")
assert isinstance(Client.load(configuration), WebClient) assert isinstance(Client.load(configuration), WebClient)
def test_load_full_client_from_address(configuration: Configuration) -> None:
"""
must load full client if settings set
"""
configuration.set("web", "address", "http://localhost:8080")
assert isinstance(Client.load(configuration), WebClient)
def test_add(client: Client, package_ahriman: Package) -> None: def test_add(client: Client, package_ahriman: Package) -> None:
""" """
must process package addition without errors must process package addition without errors

View File

@ -5,10 +5,12 @@ import requests
from pytest_mock import MockerFixture from pytest_mock import MockerFixture
from requests import Response from requests import Response
from ahriman.core.configuration import Configuration
from ahriman.core.status.web_client import WebClient from ahriman.core.status.web_client import WebClient
from ahriman.models.build_status import BuildStatus, BuildStatusEnum from ahriman.models.build_status import BuildStatus, BuildStatusEnum
from ahriman.models.internal_status import InternalStatus from ahriman.models.internal_status import InternalStatus
from ahriman.models.package import Package from ahriman.models.package import Package
from ahriman.models.user import User
def test_ahriman_url(web_client: WebClient) -> None: def test_ahriman_url(web_client: WebClient) -> None:
@ -27,6 +29,60 @@ def test_status_url(web_client: WebClient) -> None:
assert web_client._status_url.endswith("/api/v1/status") assert web_client._status_url.endswith("/api/v1/status")
def test_parse_address(configuration: Configuration) -> None:
"""
must extract address correctly
"""
configuration.set("web", "host", "localhost")
configuration.set("web", "port", "8080")
assert WebClient.parse_address(configuration) == "http://localhost:8080"
configuration.set("web", "address", "http://localhost:8081")
assert WebClient.parse_address(configuration) == "http://localhost:8081"
def test_login(web_client: WebClient, user: User, mocker: MockerFixture) -> None:
"""
must login user
"""
web_client.user = user
requests_mock = mocker.patch("requests.Session.post")
payload = {
"username": user.username,
"password": user.password
}
web_client._login()
requests_mock.assert_called_with(pytest.helpers.anyvar(str, True), json=payload)
def test_login_failed(web_client: WebClient, user: User, mocker: MockerFixture) -> None:
"""
must suppress any exception happened during login
"""
web_client.user = user
mocker.patch("requests.Session.post", side_effect=Exception())
web_client._login()
def test_login_failed_http_error(web_client: WebClient, user: User, mocker: MockerFixture) -> None:
"""
must suppress any exception happened during login
"""
web_client.user = user
mocker.patch("requests.Session.post", side_effect=requests.exceptions.HTTPError())
web_client._login()
def test_login_skip(web_client: WebClient, mocker: MockerFixture) -> None:
"""
must skip login if no user set
"""
requests_mock = mocker.patch("requests.Session.post")
web_client._login()
requests_mock.assert_not_called()
def test_package_url(web_client: WebClient, package_ahriman: Package) -> None: def test_package_url(web_client: WebClient, package_ahriman: Package) -> None:
""" """
must generate package status correctly must generate package status correctly

View File

@ -0,0 +1,83 @@
from ahriman.core.auth import Auth
from ahriman.core.configuration import Configuration
from ahriman.models.user import User
from ahriman.models.user_access import UserAccess
def test_get_users(auth: Auth, configuration: Configuration) -> None:
"""
must return valid user list
"""
user_write = User("user_write", "pwd_write", UserAccess.Write)
write_section = Configuration.section_name("auth", user_write.access.value)
configuration.add_section(write_section)
configuration.set(write_section, user_write.username, user_write.password)
user_read = User("user_read", "pwd_read", UserAccess.Read)
read_section = Configuration.section_name("auth", user_read.access.value)
configuration.add_section(read_section)
configuration.set(read_section, user_read.username, user_read.password)
users = auth.get_users(configuration)
expected = {user_write.username: user_write, user_read.username: user_read}
assert users == expected
def test_check_credentials(auth: Auth, user: User) -> None:
"""
must return true for valid credentials
"""
current_password = user.password
user.password = user.generate_password(user.password, auth.salt)
auth.users[user.username] = user
assert auth.check_credentials(user.username, current_password)
assert not auth.check_credentials(user.username, user.password) # here password is hashed so it is invalid
def test_check_credentials_empty(auth: Auth) -> None:
"""
must reject on empty credentials
"""
assert not auth.check_credentials(None, "")
assert not auth.check_credentials("", None)
assert not auth.check_credentials(None, None)
def test_check_credentials_unknown(auth: Auth, user: User) -> None:
"""
must reject on unknown user
"""
assert not auth.check_credentials(user.username, user.password)
def test_is_safe_request(auth: Auth) -> None:
"""
must validate safe request
"""
# login and logout are always safe
assert auth.is_safe_request("/login")
assert auth.is_safe_request("/logout")
auth.allowed_paths.add("/safe")
auth.allowed_paths_groups.add("/unsafe/safe")
assert auth.is_safe_request("/safe")
assert not auth.is_safe_request("/unsafe")
assert auth.is_safe_request("/unsafe/safe")
assert auth.is_safe_request("/unsafe/safe/suffix")
def test_is_safe_request_empty(auth: Auth) -> None:
"""
must not allow requests without path
"""
assert not auth.is_safe_request(None)
assert not auth.is_safe_request("")
def test_verify_access(auth: Auth, user: User) -> None:
"""
must verify user access
"""
auth.users[user.username] = user
assert auth.verify_access(user.username, user.access)
assert not auth.verify_access(user.username, UserAccess.Write)

View File

@ -0,0 +1,62 @@
from ahriman.models.user import User
from ahriman.models.user_access import UserAccess
def test_from_option(user: User) -> None:
"""
must generate user from options
"""
assert User.from_option(user.username, user.password) == user
# default is status access
user.access = UserAccess.Write
assert User.from_option(user.username, user.password) != user
def test_from_option_empty() -> None:
"""
must return nothing if settings are missed
"""
assert User.from_option(None, "") is None
assert User.from_option("", None) is None
assert User.from_option(None, None) is None
def test_check_credentials_generate_password(user: User) -> None:
"""
must generate and validate user password
"""
current_password = user.password
user.password = user.generate_password(current_password, "salt")
assert user.check_credentials(current_password, "salt")
assert not user.check_credentials(current_password, "salt1")
assert not user.check_credentials(user.password, "salt")
def test_verify_access_read(user: User) -> None:
"""
user with read access must be able to only request read
"""
user.access = UserAccess.Read
assert user.verify_access(UserAccess.Read)
assert not user.verify_access(UserAccess.Write)
assert not user.verify_access(UserAccess.Status)
def test_verify_access_status(user: User) -> None:
"""
user with status access must be able to only request status
"""
user.access = UserAccess.Status
assert not user.verify_access(UserAccess.Read)
assert not user.verify_access(UserAccess.Write)
assert user.verify_access(UserAccess.Status)
def test_verify_access_write(user: User) -> None:
"""
user with write access must be able to do anything
"""
user.access = UserAccess.Write
assert user.verify_access(UserAccess.Read)
assert user.verify_access(UserAccess.Write)
assert user.verify_access(UserAccess.Status)

View File

View File

@ -17,3 +17,16 @@ def application(configuration: Configuration, mocker: MockerFixture) -> web.Appl
""" """
mocker.patch("pathlib.Path.mkdir") mocker.patch("pathlib.Path.mkdir")
return setup_service("x86_64", configuration) return setup_service("x86_64", configuration)
@pytest.fixture
def application_with_auth(configuration: Configuration, mocker: MockerFixture) -> web.Application:
"""
application fixture with auth enabled
:param configuration: configuration fixture
:param mocker: mocker object
:return: application test instance
"""
configuration.set("web", "auth", "yes")
mocker.patch("pathlib.Path.mkdir")
return setup_service("x86_64", configuration)

View File

@ -2,8 +2,11 @@ import pytest
from collections import namedtuple from collections import namedtuple
from ahriman.core.configuration import Configuration
from ahriman.models.user import User
from ahriman.web.middlewares.auth_handler import AuthorizationPolicy
_request = namedtuple("_request", ["path"]) _request = namedtuple("_request", ["path", "method"])
@pytest.fixture @pytest.fixture
@ -12,4 +15,15 @@ def aiohttp_request() -> _request:
fixture for aiohttp like object fixture for aiohttp like object
:return: aiohttp like request test instance :return: aiohttp like request test instance
""" """
return _request("path") return _request("path", "GET")
@pytest.fixture
def authorization_policy(configuration: Configuration, user: User) -> AuthorizationPolicy:
"""
fixture for authorization policy
:return: authorization policy fixture
"""
policy = AuthorizationPolicy(configuration)
policy.validator.users = {user.username: user}
return policy

View File

@ -0,0 +1,105 @@
from aiohttp import web
from pytest_mock import MockerFixture
from typing import Any
from unittest.mock import AsyncMock
from ahriman.core.configuration import Configuration
from ahriman.models.user import User
from ahriman.models.user_access import UserAccess
from ahriman.web.middlewares.auth_handler import auth_handler, AuthorizationPolicy, setup_auth
async def test_authorized_userid(authorization_policy: AuthorizationPolicy, user: User) -> None:
"""
must return authorized user id
"""
assert await authorization_policy.authorized_userid(user.username) == user.username
assert await authorization_policy.authorized_userid("some random name") is None
async def test_permits(authorization_policy: AuthorizationPolicy, user: User, mocker: MockerFixture) -> None:
"""
must call validator check
"""
safe_request_mock = mocker.patch("ahriman.core.auth.Auth.is_safe_request", return_value=False)
verify_access_mock = mocker.patch("ahriman.core.auth.Auth.verify_access", return_value=True)
assert await authorization_policy.permits(user.username, user.access, "/endpoint")
safe_request_mock.assert_called_with("/endpoint")
verify_access_mock.assert_called_with(user.username, user.access)
async def test_permits_safe(authorization_policy: AuthorizationPolicy, user: User, mocker: MockerFixture) -> None:
"""
must call validator check
"""
safe_request_mock = mocker.patch("ahriman.core.auth.Auth.is_safe_request", return_value=True)
verify_access_mock = mocker.patch("ahriman.core.auth.Auth.verify_access")
assert await authorization_policy.permits(user.username, user.access, "/endpoint")
safe_request_mock.assert_called_with("/endpoint")
verify_access_mock.assert_not_called()
async def test_auth_handler_api(aiohttp_request: Any, mocker: MockerFixture) -> None:
"""
must ask for status permission for api calls
"""
aiohttp_request = aiohttp_request._replace(path="/api")
request_handler = AsyncMock()
check_permission_mock = mocker.patch("aiohttp_security.check_permission")
handler = auth_handler()
await handler(aiohttp_request, request_handler)
check_permission_mock.assert_called_with(aiohttp_request, UserAccess.Status, aiohttp_request.path)
async def test_auth_handler_api_post(aiohttp_request: Any, mocker: MockerFixture) -> None:
"""
must ask for status permission for api calls with POST
"""
aiohttp_request = aiohttp_request._replace(path="/api", method="POST")
request_handler = AsyncMock()
check_permission_mock = mocker.patch("aiohttp_security.check_permission")
handler = auth_handler()
await handler(aiohttp_request, request_handler)
check_permission_mock.assert_called_with(aiohttp_request, UserAccess.Status, aiohttp_request.path)
async def test_auth_handler_read(aiohttp_request: Any, mocker: MockerFixture) -> None:
"""
must ask for read permission for api calls with GET
"""
for method in ("GET", "HEAD", "OPTIONS"):
aiohttp_request = aiohttp_request._replace(method=method)
request_handler = AsyncMock()
check_permission_mock = mocker.patch("aiohttp_security.check_permission")
handler = auth_handler()
await handler(aiohttp_request, request_handler)
check_permission_mock.assert_called_with(aiohttp_request, UserAccess.Read, aiohttp_request.path)
async def test_auth_handler_write(aiohttp_request: Any, mocker: MockerFixture) -> None:
"""
must ask for read permission for api calls with POST
"""
for method in ("CONNECT", "DELETE", "PATCH", "POST", "PUT", "TRACE"):
aiohttp_request = aiohttp_request._replace(method=method)
request_handler = AsyncMock()
check_permission_mock = mocker.patch("aiohttp_security.check_permission")
handler = auth_handler()
await handler(aiohttp_request, request_handler)
check_permission_mock.assert_called_with(aiohttp_request, UserAccess.Write, aiohttp_request.path)
def test_setup_auth(application: web.Application, configuration: Configuration, mocker: MockerFixture) -> None:
"""
must setup authorization
"""
aiohttp_security_setup_mock = mocker.patch("aiohttp_security.setup")
application = setup_auth(application, configuration)
assert application.get("validator") is not None
aiohttp_security_setup_mock.assert_called_once()

View File

@ -41,3 +41,16 @@ def test_run(application: web.Application, mocker: MockerFixture) -> None:
run_server(application) run_server(application)
run_application_mock.assert_called_with(application, host="127.0.0.1", port=port, run_application_mock.assert_called_with(application, host="127.0.0.1", port=port,
handle_signals=False, access_log=pytest.helpers.anyvar(int)) handle_signals=False, access_log=pytest.helpers.anyvar(int))
def test_run_with_auth(application_with_auth: web.Application, mocker: MockerFixture) -> None:
"""
must run application
"""
port = 8080
application_with_auth["configuration"].set("web", "port", str(port))
run_application_mock = mocker.patch("aiohttp.web.run_app")
run_server(application_with_auth)
run_application_mock.assert_called_with(application_with_auth, host="127.0.0.1", port=port,
handle_signals=False, access_log=pytest.helpers.anyvar(int))

View File

@ -0,0 +1,48 @@
from aiohttp.test_utils import TestClient
from pytest_mock import MockerFixture
from ahriman.core.auth import Auth
from ahriman.core.configuration import Configuration
from ahriman.models.user import User
async def test_post(client: TestClient, configuration: Configuration, user: User, mocker: MockerFixture) -> None:
"""
must login user correctly
"""
client.app["validator"] = Auth(configuration)
payload = {"username": user.username, "password": user.password}
remember_patch = mocker.patch("aiohttp_security.remember")
mocker.patch("ahriman.core.auth.Auth.check_credentials", return_value=True)
post_response = await client.post("/login", json=payload)
assert post_response.status == 200
post_response = await client.post("/login", data=payload)
assert post_response.status == 200
remember_patch.assert_called()
async def test_post_skip(client: TestClient, user: User, mocker: MockerFixture) -> None:
"""
must process if no auth configured
"""
payload = {"username": user.username, "password": user.password}
post_response = await client.post("/login", json=payload)
remember_patch = mocker.patch("aiohttp_security.remember")
assert post_response.status == 200
remember_patch.assert_not_called()
async def test_post_unauthorized(client: TestClient, configuration: Configuration, user: User,
mocker: MockerFixture) -> None:
"""
must return unauthorized on invalid auth
"""
client.app["validator"] = Auth(configuration)
payload = {"username": user.username, "password": user.password}
mocker.patch("ahriman.core.auth.Auth.check_credentials", return_value=False)
post_response = await client.post("/login", json=payload)
assert post_response.status == 401

View File

@ -0,0 +1,38 @@
from aiohttp.test_utils import TestClient
from aiohttp.web import HTTPUnauthorized
from pytest_mock import MockerFixture
async def test_post(client: TestClient, mocker: MockerFixture) -> None:
"""
must logout user correctly
"""
mocker.patch("aiohttp_security.check_authorized")
forget_patch = mocker.patch("aiohttp_security.forget")
post_response = await client.post("/logout")
assert post_response.status == 200
forget_patch.assert_called_once()
async def test_post_unauthorized(client: TestClient, mocker: MockerFixture) -> None:
"""
must raise exception if unauthorized
"""
mocker.patch("aiohttp_security.check_authorized", side_effect=HTTPUnauthorized())
forget_patch = mocker.patch("aiohttp_security.forget")
post_response = await client.post("/logout")
assert post_response.status == 401
forget_patch.assert_not_called()
async def test_post_disabled(client: TestClient, mocker: MockerFixture) -> None:
"""
must raise exception if auth is disabled
"""
forget_patch = mocker.patch("aiohttp_security.forget")
post_response = await client.post("/logout")
assert post_response.status == 401
forget_patch.assert_not_called()

View File

@ -8,6 +8,9 @@ database = /var/lib/pacman
repositories = core extra community multilib repositories = core extra community multilib
root = / root = /
[auth]
salt = salt
[build] [build]
archbuild_flags = archbuild_flags =
build_command = extra-x86_64-build build_command = extra-x86_64-build
@ -54,5 +57,6 @@ region = eu-central-1
secret_key = secret_key =
[web] [web]
auth = no
host = 127.0.0.1 host = 127.0.0.1
templates = ../web/templates templates = ../web/templates