mirror of
https://github.com/arcan1s/ahriman.git
synced 2025-07-23 10:49:55 +00:00
Auth support (#25)
* initial auth implementation * add create user parser * add tests * update dependencies list * add login annd logout to index also improve auth * realworld fixes * add method set_option to Configuration and also use it everywhere * split CreateUser handler to additional read method * check user duplicate on auth mapping read * generate salt by using passlib instead of random.choice * case-insensetive usernames * update dependencies * update configuration reference * improve tests * fix codefactor errors * hide fields if authorization is enabled, but no auth supplied * add settings object for auth provider * readme update
This commit is contained in:
@ -3,7 +3,10 @@ import pytest
|
||||
from aiohttp import web
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
import ahriman.core.auth.helpers
|
||||
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.models.user import User
|
||||
from ahriman.web.web import setup_service
|
||||
|
||||
|
||||
@ -15,5 +18,26 @@ def application(configuration: Configuration, mocker: MockerFixture) -> web.Appl
|
||||
:param mocker: mocker object
|
||||
:return: application test instance
|
||||
"""
|
||||
mocker.patch.object(ahriman.core.auth.helpers, "_has_aiohttp_security", False)
|
||||
mocker.patch("pathlib.Path.mkdir")
|
||||
return setup_service("x86_64", configuration)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def application_with_auth(configuration: Configuration, user: User, mocker: MockerFixture) -> web.Application:
|
||||
"""
|
||||
application fixture with auth enabled
|
||||
:param configuration: configuration fixture
|
||||
:param user: user descriptor fixture
|
||||
:param mocker: mocker object
|
||||
:return: application test instance
|
||||
"""
|
||||
configuration.set_option("auth", "target", "configuration")
|
||||
mocker.patch.object(ahriman.core.auth.helpers, "_has_aiohttp_security", True)
|
||||
mocker.patch("pathlib.Path.mkdir")
|
||||
application = setup_service("x86_64", configuration)
|
||||
|
||||
generated = User(user.username, user.hash_password(user.password, application["validator"].salt), user.access)
|
||||
application["validator"]._users[generated.username] = generated
|
||||
|
||||
return application
|
||||
|
@ -2,8 +2,12 @@ import pytest
|
||||
|
||||
from collections import namedtuple
|
||||
|
||||
from ahriman.core.auth.auth import Auth
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.models.user import User
|
||||
from ahriman.web.middlewares.auth_handler import AuthorizationPolicy
|
||||
|
||||
_request = namedtuple("_request", ["path"])
|
||||
_request = namedtuple("_request", ["path", "method"])
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@ -12,4 +16,17 @@ def aiohttp_request() -> _request:
|
||||
fixture for aiohttp like object
|
||||
:return: aiohttp like request test instance
|
||||
"""
|
||||
return _request("path")
|
||||
return _request("path", "GET")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def authorization_policy(configuration: Configuration, user: User) -> AuthorizationPolicy:
|
||||
"""
|
||||
fixture for authorization policy
|
||||
:return: authorization policy fixture
|
||||
"""
|
||||
configuration.set_option("auth", "target", "configuration")
|
||||
validator = Auth.load(configuration)
|
||||
policy = AuthorizationPolicy(validator)
|
||||
policy.validator._users = {user.username: user}
|
||||
return policy
|
||||
|
100
tests/ahriman/web/middlewares/test_auth_handler.py
Normal file
100
tests/ahriman/web/middlewares/test_auth_handler.py
Normal file
@ -0,0 +1,100 @@
|
||||
from aiohttp import web
|
||||
from pytest_mock import MockerFixture
|
||||
from typing import Any
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
from ahriman.core.auth.auth import Auth
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.models.user import User
|
||||
from ahriman.models.user_access import UserAccess
|
||||
from ahriman.web.middlewares.auth_handler import auth_handler, AuthorizationPolicy, setup_auth
|
||||
|
||||
|
||||
async def test_authorized_userid(authorization_policy: AuthorizationPolicy, user: User) -> None:
|
||||
"""
|
||||
must return authorized user id
|
||||
"""
|
||||
assert await authorization_policy.authorized_userid(user.username) == user.username
|
||||
assert await authorization_policy.authorized_userid("some random name") is None
|
||||
|
||||
|
||||
async def test_permits(authorization_policy: AuthorizationPolicy, user: User) -> None:
|
||||
"""
|
||||
must call validator check
|
||||
"""
|
||||
authorization_policy.validator = MagicMock()
|
||||
authorization_policy.validator.verify_access.return_value = True
|
||||
|
||||
assert await authorization_policy.permits(user.username, user.access, "/endpoint")
|
||||
authorization_policy.validator.verify_access.assert_called_with(user.username, user.access, "/endpoint")
|
||||
|
||||
|
||||
async def test_auth_handler_api(aiohttp_request: Any, auth: Auth, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must ask for status permission for api calls
|
||||
"""
|
||||
aiohttp_request = aiohttp_request._replace(path="/api")
|
||||
request_handler = AsyncMock()
|
||||
mocker.patch("ahriman.core.auth.auth.Auth.is_safe_request", return_value=False)
|
||||
check_permission_mock = mocker.patch("aiohttp_security.check_permission")
|
||||
|
||||
handler = auth_handler(auth)
|
||||
await handler(aiohttp_request, request_handler)
|
||||
check_permission_mock.assert_called_with(aiohttp_request, UserAccess.Status, aiohttp_request.path)
|
||||
|
||||
|
||||
async def test_auth_handler_api_post(aiohttp_request: Any, auth: Auth, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must ask for status permission for api calls with POST
|
||||
"""
|
||||
aiohttp_request = aiohttp_request._replace(path="/api", method="POST")
|
||||
request_handler = AsyncMock()
|
||||
mocker.patch("ahriman.core.auth.auth.Auth.is_safe_request", return_value=False)
|
||||
check_permission_mock = mocker.patch("aiohttp_security.check_permission")
|
||||
|
||||
handler = auth_handler(auth)
|
||||
await handler(aiohttp_request, request_handler)
|
||||
check_permission_mock.assert_called_with(aiohttp_request, UserAccess.Status, aiohttp_request.path)
|
||||
|
||||
|
||||
async def test_auth_handler_read(aiohttp_request: Any, auth: Auth, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must ask for read permission for api calls with GET
|
||||
"""
|
||||
for method in ("GET", "HEAD", "OPTIONS"):
|
||||
aiohttp_request = aiohttp_request._replace(method=method)
|
||||
request_handler = AsyncMock()
|
||||
mocker.patch("ahriman.core.auth.auth.Auth.is_safe_request", return_value=False)
|
||||
check_permission_mock = mocker.patch("aiohttp_security.check_permission")
|
||||
|
||||
handler = auth_handler(auth)
|
||||
await handler(aiohttp_request, request_handler)
|
||||
check_permission_mock.assert_called_with(aiohttp_request, UserAccess.Read, aiohttp_request.path)
|
||||
|
||||
|
||||
async def test_auth_handler_write(aiohttp_request: Any, auth: Auth, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must ask for read permission for api calls with POST
|
||||
"""
|
||||
for method in ("CONNECT", "DELETE", "PATCH", "POST", "PUT", "TRACE"):
|
||||
aiohttp_request = aiohttp_request._replace(method=method)
|
||||
request_handler = AsyncMock()
|
||||
mocker.patch("ahriman.core.auth.auth.Auth.is_safe_request", return_value=False)
|
||||
check_permission_mock = mocker.patch("aiohttp_security.check_permission")
|
||||
|
||||
handler = auth_handler(auth)
|
||||
await handler(aiohttp_request, request_handler)
|
||||
check_permission_mock.assert_called_with(aiohttp_request, UserAccess.Write, aiohttp_request.path)
|
||||
|
||||
|
||||
def test_setup_auth(
|
||||
application_with_auth: web.Application,
|
||||
configuration: Configuration,
|
||||
mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must setup authorization
|
||||
"""
|
||||
aiohttp_security_setup_mock = mocker.patch("aiohttp_security.setup")
|
||||
application = setup_auth(application_with_auth, configuration)
|
||||
assert application.get("validator") is not None
|
||||
aiohttp_security_setup_mock.assert_called_once()
|
@ -35,9 +35,22 @@ def test_run(application: web.Application, mocker: MockerFixture) -> None:
|
||||
must run application
|
||||
"""
|
||||
port = 8080
|
||||
application["configuration"].set("web", "port", str(port))
|
||||
application["configuration"].set_option("web", "port", str(port))
|
||||
run_application_mock = mocker.patch("aiohttp.web.run_app")
|
||||
|
||||
run_server(application)
|
||||
run_application_mock.assert_called_with(application, host="127.0.0.1", port=port,
|
||||
handle_signals=False, access_log=pytest.helpers.anyvar(int))
|
||||
|
||||
|
||||
def test_run_with_auth(application_with_auth: web.Application, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must run application
|
||||
"""
|
||||
port = 8080
|
||||
application_with_auth["configuration"].set_option("web", "port", str(port))
|
||||
run_application_mock = mocker.patch("aiohttp.web.run_app")
|
||||
|
||||
run_server(application_with_auth)
|
||||
run_application_mock.assert_called_with(application_with_auth, host="127.0.0.1", port=port,
|
||||
handle_signals=False, access_log=pytest.helpers.anyvar(int))
|
||||
|
@ -20,3 +20,18 @@ def client(application: web.Application, loop: BaseEventLoop,
|
||||
"""
|
||||
mocker.patch("pathlib.Path.iterdir", return_value=[])
|
||||
return loop.run_until_complete(aiohttp_client(application))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client_with_auth(application_with_auth: web.Application, loop: BaseEventLoop,
|
||||
aiohttp_client: Any, mocker: MockerFixture) -> TestClient:
|
||||
"""
|
||||
web client fixture with full authorization functions
|
||||
:param application_with_auth: application fixture
|
||||
:param loop: context event loop
|
||||
:param aiohttp_client: aiohttp client fixture
|
||||
:param mocker: mocker object
|
||||
:return: web client test instance
|
||||
"""
|
||||
mocker.patch("pathlib.Path.iterdir", return_value=[])
|
||||
return loop.run_until_complete(aiohttp_client(application_with_auth))
|
||||
|
@ -1,19 +1,28 @@
|
||||
from pytest_aiohttp import TestClient
|
||||
|
||||
|
||||
async def test_get(client: TestClient) -> None:
|
||||
async def test_get(client_with_auth: TestClient) -> None:
|
||||
"""
|
||||
must generate status page correctly (/)
|
||||
"""
|
||||
response = await client_with_auth.get("/")
|
||||
assert response.status == 200
|
||||
assert await response.text()
|
||||
|
||||
|
||||
async def test_get_index(client_with_auth: TestClient) -> None:
|
||||
"""
|
||||
must generate status page correctly (/index.html)
|
||||
"""
|
||||
response = await client_with_auth.get("/index.html")
|
||||
assert response.status == 200
|
||||
assert await response.text()
|
||||
|
||||
|
||||
async def test_get_without_auth(client: TestClient) -> None:
|
||||
"""
|
||||
must use dummy authorized_userid function in case if no security library installed
|
||||
"""
|
||||
response = await client.get("/")
|
||||
assert response.status == 200
|
||||
assert await response.text()
|
||||
|
||||
|
||||
async def test_get_index(client: TestClient) -> None:
|
||||
"""
|
||||
must generate status page correctly (/index.html)
|
||||
"""
|
||||
response = await client.get("/index.html")
|
||||
assert response.status == 200
|
||||
assert await response.text()
|
||||
|
41
tests/ahriman/web/views/test_view_login.py
Normal file
41
tests/ahriman/web/views/test_view_login.py
Normal file
@ -0,0 +1,41 @@
|
||||
from aiohttp.test_utils import TestClient
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
from ahriman.models.user import User
|
||||
|
||||
|
||||
async def test_post(client_with_auth: TestClient, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must login user correctly
|
||||
"""
|
||||
payload = {"username": user.username, "password": user.password}
|
||||
remember_mock = mocker.patch("aiohttp_security.remember")
|
||||
|
||||
post_response = await client_with_auth.post("/login", json=payload)
|
||||
assert post_response.status == 200
|
||||
|
||||
post_response = await client_with_auth.post("/login", data=payload)
|
||||
assert post_response.status == 200
|
||||
|
||||
remember_mock.assert_called()
|
||||
|
||||
|
||||
async def test_post_skip(client: TestClient, user: User) -> None:
|
||||
"""
|
||||
must process if no auth configured
|
||||
"""
|
||||
payload = {"username": user.username, "password": user.password}
|
||||
post_response = await client.post("/login", json=payload)
|
||||
assert post_response.status == 200
|
||||
|
||||
|
||||
async def test_post_unauthorized(client_with_auth: TestClient, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must return unauthorized on invalid auth
|
||||
"""
|
||||
payload = {"username": user.username, "password": ""}
|
||||
remember_mock = mocker.patch("aiohttp_security.remember")
|
||||
|
||||
post_response = await client_with_auth.post("/login", json=payload)
|
||||
assert post_response.status == 401
|
||||
remember_mock.assert_not_called()
|
35
tests/ahriman/web/views/test_view_logout.py
Normal file
35
tests/ahriman/web/views/test_view_logout.py
Normal file
@ -0,0 +1,35 @@
|
||||
from aiohttp.test_utils import TestClient
|
||||
from aiohttp.web import HTTPUnauthorized
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
|
||||
async def test_post(client_with_auth: TestClient, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must logout user correctly
|
||||
"""
|
||||
mocker.patch("aiohttp_security.check_authorized")
|
||||
forget_mock = mocker.patch("aiohttp_security.forget")
|
||||
|
||||
post_response = await client_with_auth.post("/logout")
|
||||
assert post_response.status == 200
|
||||
forget_mock.assert_called_once()
|
||||
|
||||
|
||||
async def test_post_unauthorized(client_with_auth: TestClient, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must raise exception if unauthorized
|
||||
"""
|
||||
mocker.patch("aiohttp_security.check_authorized", side_effect=HTTPUnauthorized())
|
||||
forget_mock = mocker.patch("aiohttp_security.forget")
|
||||
|
||||
post_response = await client_with_auth.post("/logout")
|
||||
assert post_response.status == 401
|
||||
forget_mock.assert_not_called()
|
||||
|
||||
|
||||
async def test_post_disabled(client: TestClient) -> None:
|
||||
"""
|
||||
must raise exception if auth is disabled
|
||||
"""
|
||||
post_response = await client.post("/logout")
|
||||
assert post_response.status == 200
|
Reference in New Issue
Block a user