mirror of
https://github.com/arcan1s/ahriman.git
synced 2025-07-23 10:49:55 +00:00
add login annd logout to index also improve auth
This commit is contained in:
@ -1,11 +1,10 @@
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from pathlib import Path
|
||||
from pytest_mock import MockerFixture
|
||||
from typing import Any, Type, TypeVar
|
||||
|
||||
from ahriman.core.auth.auth import Auth
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.status.watcher import Watcher
|
||||
from ahriman.models.package import Package
|
||||
@ -44,6 +43,15 @@ def anyvar(cls: Type[T], strict: bool = False) -> T:
|
||||
|
||||
|
||||
# generic fixtures
|
||||
@pytest.fixture
|
||||
def auth(configuration: Configuration) -> Auth:
|
||||
"""
|
||||
auth provider fixture
|
||||
:return: auth service instance
|
||||
"""
|
||||
return Auth(configuration)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def configuration(resource_path_root: Path) -> Configuration:
|
||||
"""
|
||||
|
14
tests/ahriman/core/auth/conftest.py
Normal file
14
tests/ahriman/core/auth/conftest.py
Normal file
@ -0,0 +1,14 @@
|
||||
import pytest
|
||||
|
||||
from ahriman.core.auth.mapping_auth import MappingAuth
|
||||
from ahriman.core.configuration import Configuration
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mapping_auth(configuration: Configuration) -> MappingAuth:
|
||||
"""
|
||||
auth provider fixture
|
||||
:return: auth service instance
|
||||
"""
|
||||
configuration.set("web", "auth", "yes")
|
||||
return MappingAuth(configuration)
|
81
tests/ahriman/core/auth/test_auth.py
Normal file
81
tests/ahriman/core/auth/test_auth.py
Normal file
@ -0,0 +1,81 @@
|
||||
from ahriman.core.auth.auth import Auth
|
||||
from ahriman.core.auth.mapping_auth import MappingAuth
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.models.user import User
|
||||
from ahriman.models.user_access import UserAccess
|
||||
|
||||
|
||||
def test_load_dummy(configuration: Configuration) -> None:
|
||||
"""
|
||||
must load dummy validator if authorization is not enabled
|
||||
"""
|
||||
configuration.set("web", "auth", "no")
|
||||
auth = Auth.load(configuration)
|
||||
assert isinstance(auth, Auth)
|
||||
|
||||
|
||||
def test_load_dummy_empty(configuration: Configuration) -> None:
|
||||
"""
|
||||
must load dummy validator if no option set
|
||||
"""
|
||||
auth = Auth.load(configuration)
|
||||
assert isinstance(auth, Auth)
|
||||
|
||||
|
||||
def test_load_mapping(configuration: Configuration) -> None:
|
||||
"""
|
||||
must load mapping validator if option set
|
||||
"""
|
||||
configuration.set("web", "auth", "yes")
|
||||
auth = Auth.load(configuration)
|
||||
assert isinstance(auth, MappingAuth)
|
||||
|
||||
|
||||
def test_check_credentials(auth: Auth, user: User) -> None:
|
||||
"""
|
||||
must pass any credentials
|
||||
"""
|
||||
assert auth.check_credentials(user.username, user.password)
|
||||
assert auth.check_credentials(None, "")
|
||||
assert auth.check_credentials("", None)
|
||||
assert auth.check_credentials(None, None)
|
||||
|
||||
|
||||
def test_is_safe_request(auth: Auth) -> None:
|
||||
"""
|
||||
must validate safe request
|
||||
"""
|
||||
# login and logout are always safe
|
||||
assert auth.is_safe_request("/login")
|
||||
assert auth.is_safe_request("/logout")
|
||||
|
||||
auth.allowed_paths.add("/safe")
|
||||
auth.allowed_paths_groups.add("/unsafe/safe")
|
||||
|
||||
assert auth.is_safe_request("/safe")
|
||||
assert not auth.is_safe_request("/unsafe")
|
||||
assert auth.is_safe_request("/unsafe/safe")
|
||||
assert auth.is_safe_request("/unsafe/safe/suffix")
|
||||
|
||||
|
||||
def test_is_safe_request_empty(auth: Auth) -> None:
|
||||
"""
|
||||
must not allow requests without path
|
||||
"""
|
||||
assert not auth.is_safe_request(None)
|
||||
assert not auth.is_safe_request("")
|
||||
|
||||
|
||||
def test_known_username(auth: Auth, user: User) -> None:
|
||||
"""
|
||||
must allow any username
|
||||
"""
|
||||
assert auth.known_username(user.username)
|
||||
|
||||
|
||||
def test_verify_access(auth: Auth, user: User) -> None:
|
||||
"""
|
||||
must allow any access
|
||||
"""
|
||||
assert auth.verify_access(user.username, user.access)
|
||||
assert auth.verify_access(user.username, UserAccess.Write)
|
102
tests/ahriman/core/auth/test_helpers.py
Normal file
102
tests/ahriman/core/auth/test_helpers.py
Normal file
@ -0,0 +1,102 @@
|
||||
import importlib
|
||||
import sys
|
||||
|
||||
import ahriman.core.auth.helpers as helpers
|
||||
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
|
||||
def test_import_aiohttp_security() -> None:
|
||||
"""
|
||||
must import aiohttp_security correctly
|
||||
"""
|
||||
assert helpers._has_aiohttp_security
|
||||
|
||||
|
||||
def test_import_aiohttp_security_missing(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must set missing flag if no aiohttp_security module found
|
||||
"""
|
||||
mocker.patch.dict(sys.modules, {"aiohttp_security": None})
|
||||
importlib.reload(helpers)
|
||||
assert not helpers._has_aiohttp_security
|
||||
|
||||
|
||||
async def test_authorized_userid_dummy(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must not call authorized_userid from library if not enabled
|
||||
"""
|
||||
mocker.patch.object(helpers, "_has_aiohttp_security", False)
|
||||
authorized_userid_mock = mocker.patch("aiohttp_security.authorized_userid")
|
||||
await helpers.authorized_userid()
|
||||
authorized_userid_mock.assert_not_called()
|
||||
|
||||
|
||||
async def test_authorized_userid_library(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must call authorized_userid from library if enabled
|
||||
"""
|
||||
mocker.patch.object(helpers, "_has_aiohttp_security", True)
|
||||
authorized_userid_mock = mocker.patch("aiohttp_security.authorized_userid")
|
||||
await helpers.authorized_userid()
|
||||
authorized_userid_mock.assert_called_once()
|
||||
|
||||
|
||||
async def test_check_authorized_dummy(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must not call check_authorized from library if not enabled
|
||||
"""
|
||||
mocker.patch.object(helpers, "_has_aiohttp_security", False)
|
||||
check_authorized_mock = mocker.patch("aiohttp_security.check_authorized")
|
||||
await helpers.check_authorized()
|
||||
check_authorized_mock.assert_not_called()
|
||||
|
||||
|
||||
async def test_check_authorized_library(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must call check_authorized from library if enabled
|
||||
"""
|
||||
mocker.patch.object(helpers, "_has_aiohttp_security", True)
|
||||
check_authorized_mock = mocker.patch("aiohttp_security.check_authorized")
|
||||
await helpers.check_authorized()
|
||||
check_authorized_mock.assert_called_once()
|
||||
|
||||
|
||||
async def test_forget_dummy(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must not call forget from library if not enabled
|
||||
"""
|
||||
mocker.patch.object(helpers, "_has_aiohttp_security", False)
|
||||
forget_mock = mocker.patch("aiohttp_security.forget")
|
||||
await helpers.forget()
|
||||
forget_mock.assert_not_called()
|
||||
|
||||
|
||||
async def test_forget_library(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must call forget from library if enabled
|
||||
"""
|
||||
mocker.patch.object(helpers, "_has_aiohttp_security", True)
|
||||
forget_mock = mocker.patch("aiohttp_security.forget")
|
||||
await helpers.forget()
|
||||
forget_mock.assert_called_once()
|
||||
|
||||
|
||||
async def test_remember_dummy(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must not call remember from library if not enabled
|
||||
"""
|
||||
mocker.patch.object(helpers, "_has_aiohttp_security", False)
|
||||
remember_mock = mocker.patch("aiohttp_security.remember")
|
||||
await helpers.remember()
|
||||
remember_mock.assert_not_called()
|
||||
|
||||
|
||||
async def test_remember_library(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must call remember from library if enabled
|
||||
"""
|
||||
mocker.patch.object(helpers, "_has_aiohttp_security", True)
|
||||
remember_mock = mocker.patch("aiohttp_security.remember")
|
||||
await helpers.remember()
|
||||
remember_mock.assert_called_once()
|
67
tests/ahriman/core/auth/test_mapping_auth.py
Normal file
67
tests/ahriman/core/auth/test_mapping_auth.py
Normal file
@ -0,0 +1,67 @@
|
||||
from ahriman.core.auth.mapping_auth import MappingAuth
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.models.user import User
|
||||
from ahriman.models.user_access import UserAccess
|
||||
|
||||
|
||||
def test_get_users(mapping_auth: MappingAuth, configuration: Configuration) -> None:
|
||||
"""
|
||||
must return valid user list
|
||||
"""
|
||||
user_write = User("user_write", "pwd_write", UserAccess.Write)
|
||||
write_section = Configuration.section_name("auth", user_write.access.value)
|
||||
configuration.add_section(write_section)
|
||||
configuration.set(write_section, user_write.username, user_write.password)
|
||||
user_read = User("user_read", "pwd_read", UserAccess.Read)
|
||||
read_section = Configuration.section_name("auth", user_read.access.value)
|
||||
configuration.add_section(read_section)
|
||||
configuration.set(read_section, user_read.username, user_read.password)
|
||||
|
||||
users = mapping_auth.get_users(configuration)
|
||||
expected = {user_write.username: user_write, user_read.username: user_read}
|
||||
assert users == expected
|
||||
|
||||
|
||||
def test_check_credentials(mapping_auth: MappingAuth, user: User) -> None:
|
||||
"""
|
||||
must return true for valid credentials
|
||||
"""
|
||||
current_password = user.password
|
||||
user.password = user.generate_password(user.password, mapping_auth.salt)
|
||||
mapping_auth.users[user.username] = user
|
||||
assert mapping_auth.check_credentials(user.username, current_password)
|
||||
assert not mapping_auth.check_credentials(user.username, user.password) # here password is hashed so it is invalid
|
||||
|
||||
|
||||
def test_check_credentials_empty(mapping_auth: MappingAuth) -> None:
|
||||
"""
|
||||
must reject on empty credentials
|
||||
"""
|
||||
assert not mapping_auth.check_credentials(None, "")
|
||||
assert not mapping_auth.check_credentials("", None)
|
||||
assert not mapping_auth.check_credentials(None, None)
|
||||
|
||||
|
||||
def test_check_credentials_unknown(mapping_auth: MappingAuth, user: User) -> None:
|
||||
"""
|
||||
must reject on unknown user
|
||||
"""
|
||||
assert not mapping_auth.check_credentials(user.username, user.password)
|
||||
|
||||
|
||||
def test_known_username(mapping_auth: MappingAuth, user: User) -> None:
|
||||
"""
|
||||
must allow only known users
|
||||
"""
|
||||
mapping_auth.users[user.username] = user
|
||||
assert mapping_auth.known_username(user.username)
|
||||
assert not mapping_auth.known_username(user.password)
|
||||
|
||||
|
||||
def test_verify_access(mapping_auth: MappingAuth, user: User) -> None:
|
||||
"""
|
||||
must verify user access
|
||||
"""
|
||||
mapping_auth.users[user.username] = user
|
||||
assert mapping_auth.verify_access(user.username, user.access)
|
||||
assert not mapping_auth.verify_access(user.username, UserAccess.Write)
|
@ -2,7 +2,7 @@ import pytest
|
||||
|
||||
from ahriman.core.alpm.pacman import Pacman
|
||||
from ahriman.core.alpm.repo import Repo
|
||||
from ahriman.core.auth import Auth
|
||||
from ahriman.core.auth.auth import Auth
|
||||
from ahriman.core.build_tools.task import Task
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.tree import Leaf
|
||||
@ -30,15 +30,6 @@ def leaf_python_schedule(package_python_schedule: Package) -> Leaf:
|
||||
return Leaf(package_python_schedule, set())
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def auth(configuration: Configuration) -> Auth:
|
||||
"""
|
||||
auth provider fixture
|
||||
:return: auth service instance
|
||||
"""
|
||||
return Auth(configuration)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pacman(configuration: Configuration) -> Pacman:
|
||||
"""
|
||||
|
@ -1,83 +0,0 @@
|
||||
from ahriman.core.auth import Auth
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.models.user import User
|
||||
from ahriman.models.user_access import UserAccess
|
||||
|
||||
|
||||
def test_get_users(auth: Auth, configuration: Configuration) -> None:
|
||||
"""
|
||||
must return valid user list
|
||||
"""
|
||||
user_write = User("user_write", "pwd_write", UserAccess.Write)
|
||||
write_section = Configuration.section_name("auth", user_write.access.value)
|
||||
configuration.add_section(write_section)
|
||||
configuration.set(write_section, user_write.username, user_write.password)
|
||||
user_read = User("user_read", "pwd_read", UserAccess.Read)
|
||||
read_section = Configuration.section_name("auth", user_read.access.value)
|
||||
configuration.add_section(read_section)
|
||||
configuration.set(read_section, user_read.username, user_read.password)
|
||||
|
||||
users = auth.get_users(configuration)
|
||||
expected = {user_write.username: user_write, user_read.username: user_read}
|
||||
assert users == expected
|
||||
|
||||
|
||||
def test_check_credentials(auth: Auth, user: User) -> None:
|
||||
"""
|
||||
must return true for valid credentials
|
||||
"""
|
||||
current_password = user.password
|
||||
user.password = user.generate_password(user.password, auth.salt)
|
||||
auth.users[user.username] = user
|
||||
assert auth.check_credentials(user.username, current_password)
|
||||
assert not auth.check_credentials(user.username, user.password) # here password is hashed so it is invalid
|
||||
|
||||
|
||||
def test_check_credentials_empty(auth: Auth) -> None:
|
||||
"""
|
||||
must reject on empty credentials
|
||||
"""
|
||||
assert not auth.check_credentials(None, "")
|
||||
assert not auth.check_credentials("", None)
|
||||
assert not auth.check_credentials(None, None)
|
||||
|
||||
|
||||
def test_check_credentials_unknown(auth: Auth, user: User) -> None:
|
||||
"""
|
||||
must reject on unknown user
|
||||
"""
|
||||
assert not auth.check_credentials(user.username, user.password)
|
||||
|
||||
|
||||
def test_is_safe_request(auth: Auth) -> None:
|
||||
"""
|
||||
must validate safe request
|
||||
"""
|
||||
# login and logout are always safe
|
||||
assert auth.is_safe_request("/login")
|
||||
assert auth.is_safe_request("/logout")
|
||||
|
||||
auth.allowed_paths.add("/safe")
|
||||
auth.allowed_paths_groups.add("/unsafe/safe")
|
||||
|
||||
assert auth.is_safe_request("/safe")
|
||||
assert not auth.is_safe_request("/unsafe")
|
||||
assert auth.is_safe_request("/unsafe/safe")
|
||||
assert auth.is_safe_request("/unsafe/safe/suffix")
|
||||
|
||||
|
||||
def test_is_safe_request_empty(auth: Auth) -> None:
|
||||
"""
|
||||
must not allow requests without path
|
||||
"""
|
||||
assert not auth.is_safe_request(None)
|
||||
assert not auth.is_safe_request("")
|
||||
|
||||
|
||||
def test_verify_access(auth: Auth, user: User) -> None:
|
||||
"""
|
||||
must verify user access
|
||||
"""
|
||||
auth.users[user.username] = user
|
||||
assert auth.verify_access(user.username, user.access)
|
||||
assert not auth.verify_access(user.username, UserAccess.Write)
|
@ -61,6 +61,8 @@ def test_get_local_files(s3: S3, resource_path_root: Path) -> None:
|
||||
Path("models/package_yay_srcinfo"),
|
||||
Path("web/templates/search-line.jinja2"),
|
||||
Path("web/templates/build-status.jinja2"),
|
||||
Path("web/templates/login-form.jinja2"),
|
||||
Path("web/templates/login-form-hide.jinja2"),
|
||||
Path("web/templates/repo-index.jinja2"),
|
||||
Path("web/templates/sorttable.jinja2"),
|
||||
Path("web/templates/style.jinja2"),
|
||||
|
@ -3,7 +3,10 @@ import pytest
|
||||
from aiohttp import web
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
import ahriman.core.auth.helpers
|
||||
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.models.user import User
|
||||
from ahriman.web.web import setup_service
|
||||
|
||||
|
||||
@ -15,18 +18,26 @@ def application(configuration: Configuration, mocker: MockerFixture) -> web.Appl
|
||||
:param mocker: mocker object
|
||||
:return: application test instance
|
||||
"""
|
||||
mocker.patch.object(ahriman.core.auth.helpers, "_has_aiohttp_security", False)
|
||||
mocker.patch("pathlib.Path.mkdir")
|
||||
return setup_service("x86_64", configuration)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def application_with_auth(configuration: Configuration, mocker: MockerFixture) -> web.Application:
|
||||
def application_with_auth(configuration: Configuration, user: User, mocker: MockerFixture) -> web.Application:
|
||||
"""
|
||||
application fixture with auth enabled
|
||||
:param configuration: configuration fixture
|
||||
:param user: user descriptor fixture
|
||||
:param mocker: mocker object
|
||||
:return: application test instance
|
||||
"""
|
||||
configuration.set("web", "auth", "yes")
|
||||
mocker.patch.object(ahriman.core.auth.helpers, "_has_aiohttp_security", True)
|
||||
mocker.patch("pathlib.Path.mkdir")
|
||||
return setup_service("x86_64", configuration)
|
||||
application = setup_service("x86_64", configuration)
|
||||
|
||||
generated = User(user.username, user.generate_password(user.password, application["validator"].salt), user.access)
|
||||
application["validator"].users[generated.username] = generated
|
||||
|
||||
return application
|
||||
|
@ -2,6 +2,7 @@ import pytest
|
||||
|
||||
from collections import namedtuple
|
||||
|
||||
from ahriman.core.auth.auth import Auth
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.models.user import User
|
||||
from ahriman.web.middlewares.auth_handler import AuthorizationPolicy
|
||||
@ -24,6 +25,8 @@ def authorization_policy(configuration: Configuration, user: User) -> Authorizat
|
||||
fixture for authorization policy
|
||||
:return: authorization policy fixture
|
||||
"""
|
||||
policy = AuthorizationPolicy(configuration)
|
||||
configuration.set("web", "auth", "yes")
|
||||
validator = Auth.load(configuration)
|
||||
policy = AuthorizationPolicy(validator)
|
||||
policy.validator.users = {user.username: user}
|
||||
return policy
|
||||
|
@ -1,8 +1,9 @@
|
||||
from aiohttp import web
|
||||
from pytest_mock import MockerFixture
|
||||
from typing import Any
|
||||
from unittest.mock import AsyncMock
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
from ahriman.core.auth.auth import Auth
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.models.user import User
|
||||
from ahriman.models.user_access import UserAccess
|
||||
@ -17,89 +18,83 @@ async def test_authorized_userid(authorization_policy: AuthorizationPolicy, user
|
||||
assert await authorization_policy.authorized_userid("some random name") is None
|
||||
|
||||
|
||||
async def test_permits(authorization_policy: AuthorizationPolicy, user: User, mocker: MockerFixture) -> None:
|
||||
async def test_permits(authorization_policy: AuthorizationPolicy, user: User) -> None:
|
||||
"""
|
||||
must call validator check
|
||||
"""
|
||||
safe_request_mock = mocker.patch("ahriman.core.auth.Auth.is_safe_request", return_value=False)
|
||||
verify_access_mock = mocker.patch("ahriman.core.auth.Auth.verify_access", return_value=True)
|
||||
authorization_policy.validator = MagicMock()
|
||||
authorization_policy.validator.verify_access.return_value = True
|
||||
|
||||
assert await authorization_policy.permits(user.username, user.access, "/endpoint")
|
||||
safe_request_mock.assert_called_with("/endpoint")
|
||||
verify_access_mock.assert_called_with(user.username, user.access)
|
||||
authorization_policy.validator.verify_access.assert_called_with(user.username, user.access)
|
||||
|
||||
|
||||
async def test_permits_safe(authorization_policy: AuthorizationPolicy, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must call validator check
|
||||
"""
|
||||
safe_request_mock = mocker.patch("ahriman.core.auth.Auth.is_safe_request", return_value=True)
|
||||
verify_access_mock = mocker.patch("ahriman.core.auth.Auth.verify_access")
|
||||
|
||||
assert await authorization_policy.permits(user.username, user.access, "/endpoint")
|
||||
safe_request_mock.assert_called_with("/endpoint")
|
||||
verify_access_mock.assert_not_called()
|
||||
|
||||
|
||||
async def test_auth_handler_api(aiohttp_request: Any, mocker: MockerFixture) -> None:
|
||||
async def test_auth_handler_api(aiohttp_request: Any, auth: Auth, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must ask for status permission for api calls
|
||||
"""
|
||||
aiohttp_request = aiohttp_request._replace(path="/api")
|
||||
request_handler = AsyncMock()
|
||||
mocker.patch("ahriman.core.auth.auth.Auth.is_safe_request", return_value=False)
|
||||
check_permission_mock = mocker.patch("aiohttp_security.check_permission")
|
||||
|
||||
handler = auth_handler()
|
||||
handler = auth_handler(auth)
|
||||
await handler(aiohttp_request, request_handler)
|
||||
check_permission_mock.assert_called_with(aiohttp_request, UserAccess.Status, aiohttp_request.path)
|
||||
|
||||
|
||||
async def test_auth_handler_api_post(aiohttp_request: Any, mocker: MockerFixture) -> None:
|
||||
async def test_auth_handler_api_post(aiohttp_request: Any, auth: Auth, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must ask for status permission for api calls with POST
|
||||
"""
|
||||
aiohttp_request = aiohttp_request._replace(path="/api", method="POST")
|
||||
request_handler = AsyncMock()
|
||||
mocker.patch("ahriman.core.auth.auth.Auth.is_safe_request", return_value=False)
|
||||
check_permission_mock = mocker.patch("aiohttp_security.check_permission")
|
||||
|
||||
handler = auth_handler()
|
||||
handler = auth_handler(auth)
|
||||
await handler(aiohttp_request, request_handler)
|
||||
check_permission_mock.assert_called_with(aiohttp_request, UserAccess.Status, aiohttp_request.path)
|
||||
|
||||
|
||||
async def test_auth_handler_read(aiohttp_request: Any, mocker: MockerFixture) -> None:
|
||||
async def test_auth_handler_read(aiohttp_request: Any, auth: Auth, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must ask for read permission for api calls with GET
|
||||
"""
|
||||
for method in ("GET", "HEAD", "OPTIONS"):
|
||||
aiohttp_request = aiohttp_request._replace(method=method)
|
||||
request_handler = AsyncMock()
|
||||
mocker.patch("ahriman.core.auth.auth.Auth.is_safe_request", return_value=False)
|
||||
check_permission_mock = mocker.patch("aiohttp_security.check_permission")
|
||||
|
||||
handler = auth_handler()
|
||||
handler = auth_handler(auth)
|
||||
await handler(aiohttp_request, request_handler)
|
||||
check_permission_mock.assert_called_with(aiohttp_request, UserAccess.Read, aiohttp_request.path)
|
||||
|
||||
|
||||
async def test_auth_handler_write(aiohttp_request: Any, mocker: MockerFixture) -> None:
|
||||
async def test_auth_handler_write(aiohttp_request: Any, auth: Auth, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must ask for read permission for api calls with POST
|
||||
"""
|
||||
for method in ("CONNECT", "DELETE", "PATCH", "POST", "PUT", "TRACE"):
|
||||
aiohttp_request = aiohttp_request._replace(method=method)
|
||||
request_handler = AsyncMock()
|
||||
mocker.patch("ahriman.core.auth.auth.Auth.is_safe_request", return_value=False)
|
||||
check_permission_mock = mocker.patch("aiohttp_security.check_permission")
|
||||
|
||||
handler = auth_handler()
|
||||
handler = auth_handler(auth)
|
||||
await handler(aiohttp_request, request_handler)
|
||||
check_permission_mock.assert_called_with(aiohttp_request, UserAccess.Write, aiohttp_request.path)
|
||||
|
||||
|
||||
def test_setup_auth(application: web.Application, configuration: Configuration, mocker: MockerFixture) -> None:
|
||||
def test_setup_auth(
|
||||
application_with_auth: web.Application,
|
||||
configuration: Configuration,
|
||||
mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must setup authorization
|
||||
"""
|
||||
aiohttp_security_setup_mock = mocker.patch("aiohttp_security.setup")
|
||||
application = setup_auth(application, configuration)
|
||||
application = setup_auth(application_with_auth, configuration)
|
||||
assert application.get("validator") is not None
|
||||
aiohttp_security_setup_mock.assert_called_once()
|
||||
|
@ -20,3 +20,18 @@ def client(application: web.Application, loop: BaseEventLoop,
|
||||
"""
|
||||
mocker.patch("pathlib.Path.iterdir", return_value=[])
|
||||
return loop.run_until_complete(aiohttp_client(application))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client_with_auth(application_with_auth: web.Application, loop: BaseEventLoop,
|
||||
aiohttp_client: Any, mocker: MockerFixture) -> TestClient:
|
||||
"""
|
||||
web client fixture with full authorization functions
|
||||
:param application_with_auth: application fixture
|
||||
:param loop: context event loop
|
||||
:param aiohttp_client: aiohttp client fixture
|
||||
:param mocker: mocker object
|
||||
:return: web client test instance
|
||||
"""
|
||||
mocker.patch("pathlib.Path.iterdir", return_value=[])
|
||||
return loop.run_until_complete(aiohttp_client(application_with_auth))
|
||||
|
@ -1,19 +1,28 @@
|
||||
from pytest_aiohttp import TestClient
|
||||
|
||||
|
||||
async def test_get(client: TestClient) -> None:
|
||||
async def test_get(client_with_auth: TestClient) -> None:
|
||||
"""
|
||||
must generate status page correctly (/)
|
||||
"""
|
||||
response = await client_with_auth.get("/")
|
||||
assert response.status == 200
|
||||
assert await response.text()
|
||||
|
||||
|
||||
async def test_get_index(client_with_auth: TestClient) -> None:
|
||||
"""
|
||||
must generate status page correctly (/index.html)
|
||||
"""
|
||||
response = await client_with_auth.get("/index.html")
|
||||
assert response.status == 200
|
||||
assert await response.text()
|
||||
|
||||
|
||||
async def test_get_without_auth(client: TestClient) -> None:
|
||||
"""
|
||||
must use dummy authorized_userid function in case if no security library installed
|
||||
"""
|
||||
response = await client.get("/")
|
||||
assert response.status == 200
|
||||
assert await response.text()
|
||||
|
||||
|
||||
async def test_get_index(client: TestClient) -> None:
|
||||
"""
|
||||
must generate status page correctly (/index.html)
|
||||
"""
|
||||
response = await client.get("/index.html")
|
||||
assert response.status == 200
|
||||
assert await response.text()
|
||||
|
@ -1,48 +1,41 @@
|
||||
from aiohttp.test_utils import TestClient
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
from ahriman.core.auth import Auth
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.models.user import User
|
||||
|
||||
|
||||
async def test_post(client: TestClient, configuration: Configuration, user: User, mocker: MockerFixture) -> None:
|
||||
async def test_post(client_with_auth: TestClient, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must login user correctly
|
||||
"""
|
||||
client.app["validator"] = Auth(configuration)
|
||||
payload = {"username": user.username, "password": user.password}
|
||||
remember_patch = mocker.patch("aiohttp_security.remember")
|
||||
mocker.patch("ahriman.core.auth.Auth.check_credentials", return_value=True)
|
||||
remember_mock = mocker.patch("aiohttp_security.remember")
|
||||
|
||||
post_response = await client.post("/login", json=payload)
|
||||
post_response = await client_with_auth.post("/login", json=payload)
|
||||
assert post_response.status == 200
|
||||
|
||||
post_response = await client.post("/login", data=payload)
|
||||
post_response = await client_with_auth.post("/login", data=payload)
|
||||
assert post_response.status == 200
|
||||
|
||||
remember_patch.assert_called()
|
||||
remember_mock.assert_called()
|
||||
|
||||
|
||||
async def test_post_skip(client: TestClient, user: User, mocker: MockerFixture) -> None:
|
||||
async def test_post_skip(client: TestClient, user: User) -> None:
|
||||
"""
|
||||
must process if no auth configured
|
||||
"""
|
||||
payload = {"username": user.username, "password": user.password}
|
||||
post_response = await client.post("/login", json=payload)
|
||||
remember_patch = mocker.patch("aiohttp_security.remember")
|
||||
assert post_response.status == 200
|
||||
remember_patch.assert_not_called()
|
||||
|
||||
|
||||
async def test_post_unauthorized(client: TestClient, configuration: Configuration, user: User,
|
||||
mocker: MockerFixture) -> None:
|
||||
async def test_post_unauthorized(client_with_auth: TestClient, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must return unauthorized on invalid auth
|
||||
"""
|
||||
client.app["validator"] = Auth(configuration)
|
||||
payload = {"username": user.username, "password": user.password}
|
||||
mocker.patch("ahriman.core.auth.Auth.check_credentials", return_value=False)
|
||||
payload = {"username": user.username, "password": ""}
|
||||
remember_mock = mocker.patch("aiohttp_security.remember")
|
||||
|
||||
post_response = await client.post("/login", json=payload)
|
||||
post_response = await client_with_auth.post("/login", json=payload)
|
||||
assert post_response.status == 401
|
||||
remember_mock.assert_not_called()
|
||||
|
@ -3,36 +3,33 @@ from aiohttp.web import HTTPUnauthorized
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
|
||||
async def test_post(client: TestClient, mocker: MockerFixture) -> None:
|
||||
async def test_post(client_with_auth: TestClient, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must logout user correctly
|
||||
"""
|
||||
mocker.patch("aiohttp_security.check_authorized")
|
||||
forget_patch = mocker.patch("aiohttp_security.forget")
|
||||
forget_mock = mocker.patch("aiohttp_security.forget")
|
||||
|
||||
post_response = await client.post("/logout")
|
||||
post_response = await client_with_auth.post("/logout")
|
||||
assert post_response.status == 200
|
||||
forget_patch.assert_called_once()
|
||||
forget_mock.assert_called_once()
|
||||
|
||||
|
||||
async def test_post_unauthorized(client: TestClient, mocker: MockerFixture) -> None:
|
||||
async def test_post_unauthorized(client_with_auth: TestClient, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must raise exception if unauthorized
|
||||
"""
|
||||
mocker.patch("aiohttp_security.check_authorized", side_effect=HTTPUnauthorized())
|
||||
forget_patch = mocker.patch("aiohttp_security.forget")
|
||||
forget_mock = mocker.patch("aiohttp_security.forget")
|
||||
|
||||
post_response = await client.post("/logout")
|
||||
post_response = await client_with_auth.post("/logout")
|
||||
assert post_response.status == 401
|
||||
forget_patch.assert_not_called()
|
||||
forget_mock.assert_not_called()
|
||||
|
||||
|
||||
async def test_post_disabled(client: TestClient, mocker: MockerFixture) -> None:
|
||||
async def test_post_disabled(client: TestClient) -> None:
|
||||
"""
|
||||
must raise exception if auth is disabled
|
||||
"""
|
||||
forget_patch = mocker.patch("aiohttp_security.forget")
|
||||
|
||||
post_response = await client.post("/logout")
|
||||
assert post_response.status == 401
|
||||
forget_patch.assert_not_called()
|
||||
assert post_response.status == 200
|
||||
|
@ -57,6 +57,5 @@ region = eu-central-1
|
||||
secret_key =
|
||||
|
||||
[web]
|
||||
auth = no
|
||||
host = 127.0.0.1
|
||||
templates = ../web/templates
|
Reference in New Issue
Block a user