mirror of
https://github.com/arcan1s/ahriman.git
synced 2025-06-28 06:41:43 +00:00
implement support of unix socket for server
This feature can be used for unauthorized access to apis - e.g. for reporting service if it is run on the same machine. Since now it becomes recommended way for the interprocess communication, thus some options (e.g. creating user with as-service flag) are no longer available now
This commit is contained in:
@ -1,17 +1,16 @@
|
||||
import pytest
|
||||
|
||||
from collections import namedtuple
|
||||
|
||||
|
||||
_passwd = namedtuple("passwd", ["pw_dir"])
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def passwd() -> _passwd:
|
||||
def passwd() -> MagicMock:
|
||||
"""
|
||||
get passwd structure for the user
|
||||
|
||||
Returns:
|
||||
_passwd: passwd structure test instance
|
||||
MagicMock: passwd structure test instance
|
||||
"""
|
||||
return _passwd("home")
|
||||
passwd = MagicMock()
|
||||
passwd.pw_dir = "home"
|
||||
return passwd
|
||||
|
@ -31,6 +31,7 @@ def _default_args(args: argparse.Namespace) -> argparse.Namespace:
|
||||
args.sign_key = "key"
|
||||
args.sign_target = [SignSettings.Packages]
|
||||
args.web_port = 8080
|
||||
args.web_unix_socket = Path("/var/lib/ahriman/ahriman-web.sock")
|
||||
return args
|
||||
|
||||
|
||||
@ -91,6 +92,7 @@ def test_configuration_create_ahriman(args: argparse.Namespace, configuration: C
|
||||
" ".join([target.name.lower() for target in args.sign_target])),
|
||||
MockCall(Configuration.section_name("sign", "x86_64"), "key", args.sign_key),
|
||||
MockCall(Configuration.section_name("web", "x86_64"), "port", str(args.web_port)),
|
||||
MockCall(Configuration.section_name("web", "x86_64"), "unix_socket", str(args.web_unix_socket)),
|
||||
])
|
||||
write_mock.assert_called_once_with(pytest.helpers.anyvar(int))
|
||||
|
||||
|
@ -26,7 +26,6 @@ def _default_args(args: argparse.Namespace) -> argparse.Namespace:
|
||||
"""
|
||||
args.username = "user"
|
||||
args.action = Action.Update
|
||||
args.as_service = False
|
||||
args.exit_code = False
|
||||
args.password = "pa55w0rd"
|
||||
args.role = UserAccess.Reporter
|
||||
@ -73,33 +72,8 @@ def test_run_empty_salt(args: argparse.Namespace, configuration: Configuration,
|
||||
|
||||
Users.run(args, "x86_64", configuration, report=False, unsafe=False)
|
||||
get_auth_configuration_mock.assert_called_once_with(configuration.include)
|
||||
create_configuration_mock.assert_called_once_with(pytest.helpers.anyvar(int), pytest.helpers.anyvar(int),
|
||||
pytest.helpers.anyvar(int), args.as_service, args.secure)
|
||||
create_user_mock.assert_called_once_with(args)
|
||||
get_salt_mock.assert_called_once_with(configuration)
|
||||
update_mock.assert_called_once_with(user)
|
||||
|
||||
|
||||
def test_run_service_user(args: argparse.Namespace, configuration: Configuration, database: SQLite,
|
||||
mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must create configuration if as service argument is provided
|
||||
"""
|
||||
args = _default_args(args)
|
||||
args.as_service = True
|
||||
user = User(username=args.username, password=args.password, access=args.role)
|
||||
mocker.patch("ahriman.core.database.SQLite.load", return_value=database)
|
||||
mocker.patch("ahriman.models.user.User.hash_password", return_value=user)
|
||||
get_auth_configuration_mock = mocker.patch("ahriman.application.handlers.Users.configuration_get")
|
||||
create_configuration_mock = mocker.patch("ahriman.application.handlers.Users.configuration_create")
|
||||
create_user_mock = mocker.patch("ahriman.application.handlers.Users.user_create", return_value=user)
|
||||
get_salt_mock = mocker.patch("ahriman.application.handlers.Users.get_salt", return_value=("salt", "salt"))
|
||||
update_mock = mocker.patch("ahriman.core.database.SQLite.user_update")
|
||||
|
||||
Users.run(args, "x86_64", configuration, report=False, unsafe=False)
|
||||
get_auth_configuration_mock.assert_called_once_with(configuration.include)
|
||||
create_configuration_mock.assert_called_once_with(pytest.helpers.anyvar(int), pytest.helpers.anyvar(int),
|
||||
pytest.helpers.anyvar(int), args.as_service, args.secure)
|
||||
create_configuration_mock.assert_called_once_with(
|
||||
pytest.helpers.anyvar(int), pytest.helpers.anyvar(int), args.secure)
|
||||
create_user_mock.assert_called_once_with(args)
|
||||
get_salt_mock.assert_called_once_with(configuration)
|
||||
update_mock.assert_called_once_with(user)
|
||||
@ -151,7 +125,7 @@ def test_run_remove(args: argparse.Namespace, configuration: Configuration, data
|
||||
remove_mock.assert_called_once_with(args.username)
|
||||
|
||||
|
||||
def test_configuration_create(configuration: Configuration, user: User, mocker: MockerFixture) -> None:
|
||||
def test_configuration_create(configuration: Configuration, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must correctly create configuration file
|
||||
"""
|
||||
@ -159,26 +133,11 @@ def test_configuration_create(configuration: Configuration, user: User, mocker:
|
||||
set_mock = mocker.patch("ahriman.core.configuration.Configuration.set_option")
|
||||
write_mock = mocker.patch("ahriman.application.handlers.Users.configuration_write")
|
||||
|
||||
Users.configuration_create(configuration, user, "salt", False, False)
|
||||
Users.configuration_create(configuration, "salt", False)
|
||||
set_mock.assert_called_once_with("auth", "salt", pytest.helpers.anyvar(int))
|
||||
write_mock.assert_called_once_with(configuration, False)
|
||||
|
||||
|
||||
def test_configuration_create_with_plain_password(configuration: Configuration, user: User,
|
||||
mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must set plain text password and user for the service
|
||||
"""
|
||||
mocker.patch("pathlib.Path.open")
|
||||
|
||||
Users.configuration_create(configuration, user, "salt", True, False)
|
||||
|
||||
generated = User.from_option(user.username, user.password).hash_password("salt")
|
||||
service = User.from_option(configuration.get("web", "username"), configuration.get("web", "password"))
|
||||
assert generated.username == service.username
|
||||
assert generated.check_credentials(service.password, configuration.get("auth", "salt"))
|
||||
|
||||
|
||||
def test_configuration_get(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must load configuration from filesystem
|
||||
|
@ -43,6 +43,14 @@ def test_load_full_client_from_address(configuration: Configuration) -> None:
|
||||
assert isinstance(Client.load(configuration, report=True), WebClient)
|
||||
|
||||
|
||||
def test_load_full_client_from_unix_socket(configuration: Configuration) -> None:
|
||||
"""
|
||||
must load full client by using unix socket
|
||||
"""
|
||||
configuration.set_option("web", "unix_socket", "/var/lib/ahriman/ahriman-web.sock")
|
||||
assert isinstance(Client.load(configuration, report=True), WebClient)
|
||||
|
||||
|
||||
def test_add(client: Client, package_ahriman: Package) -> None:
|
||||
"""
|
||||
must process package addition without errors
|
||||
|
@ -2,6 +2,7 @@ import json
|
||||
import logging
|
||||
import pytest
|
||||
import requests
|
||||
import requests_unixsocket
|
||||
|
||||
from pytest_mock import MockerFixture
|
||||
from requests import Response
|
||||
@ -36,10 +37,36 @@ def test_parse_address(configuration: Configuration) -> None:
|
||||
"""
|
||||
configuration.set_option("web", "host", "localhost")
|
||||
configuration.set_option("web", "port", "8080")
|
||||
assert WebClient.parse_address(configuration) == "http://localhost:8080"
|
||||
assert WebClient.parse_address(configuration) == ("http://localhost:8080", False)
|
||||
|
||||
configuration.set_option("web", "address", "http://localhost:8081")
|
||||
assert WebClient.parse_address(configuration) == "http://localhost:8081"
|
||||
assert WebClient.parse_address(configuration) == ("http://localhost:8081", False)
|
||||
|
||||
configuration.set_option("web", "unix_socket", "/run/ahriman.sock")
|
||||
assert WebClient.parse_address(configuration) == ("http+unix://%2Frun%2Fahriman.sock", True)
|
||||
|
||||
|
||||
def test_create_session(web_client: WebClient, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must create normal requests session
|
||||
"""
|
||||
login_mock = mocker.patch("ahriman.core.status.web_client.WebClient._login")
|
||||
|
||||
session = web_client._create_session(use_unix_socket=False)
|
||||
assert isinstance(session, requests.Session)
|
||||
assert not isinstance(session, requests_unixsocket.Session)
|
||||
login_mock.assert_called_once_with()
|
||||
|
||||
|
||||
def test_create_session_unix_socket(web_client: WebClient, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must create unix socket session
|
||||
"""
|
||||
login_mock = mocker.patch("ahriman.core.status.web_client.WebClient._login")
|
||||
|
||||
session = web_client._create_session(use_unix_socket=True)
|
||||
assert isinstance(session, requests_unixsocket.Session)
|
||||
login_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_login(web_client: WebClient, user: User, mocker: MockerFixture) -> None:
|
||||
|
@ -1,6 +1,5 @@
|
||||
import pytest
|
||||
|
||||
from collections import namedtuple
|
||||
from typing import Any, Dict, List
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
@ -10,9 +9,6 @@ from ahriman.core.upload.rsync import Rsync
|
||||
from ahriman.core.upload.s3 import S3
|
||||
|
||||
|
||||
_s3_object = namedtuple("s3_object", ["key", "e_tag", "delete"])
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def github(configuration: Configuration) -> Github:
|
||||
"""
|
||||
@ -78,12 +74,22 @@ def s3(configuration: Configuration) -> S3:
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def s3_remote_objects() -> List[_s3_object]:
|
||||
def s3_remote_objects() -> List[MagicMock]:
|
||||
"""
|
||||
fixture for boto3 like S3 objects
|
||||
|
||||
Returns:
|
||||
List[_s3_object]: boto3 like S3 objects test instance
|
||||
List[MagicMock]: boto3 like S3 objects test instance
|
||||
"""
|
||||
delete_mock = MagicMock()
|
||||
return list(map(lambda item: _s3_object(f"x86_64/{item}", f"\"{item}\"", delete_mock), ["a", "b", "c"]))
|
||||
|
||||
result = []
|
||||
for item in ["a", "b", "c"]:
|
||||
s3_object = MagicMock()
|
||||
s3_object.key = f"x86_64/{item}"
|
||||
s3_object.e_tag = f"\"{item}\""
|
||||
s3_object.delete = delete_mock
|
||||
|
||||
result.append(s3_object)
|
||||
|
||||
return result
|
||||
|
@ -3,9 +3,8 @@ import pytest
|
||||
from asyncio import BaseEventLoop
|
||||
from aiohttp import web
|
||||
from aiohttp.test_utils import TestClient
|
||||
from collections import namedtuple
|
||||
from pytest_mock import MockerFixture
|
||||
from typing import Any
|
||||
from typing import Any, Dict, Optional
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import ahriman.core.auth.helpers
|
||||
@ -18,11 +17,9 @@ from ahriman.models.user import User
|
||||
from ahriman.web.web import setup_service
|
||||
|
||||
|
||||
_request = namedtuple("_request", ["app", "path", "method", "json", "post"])
|
||||
|
||||
|
||||
@pytest.helpers.register
|
||||
def request(app: web.Application, path: str, method: str, json: Any = None, data: Any = None) -> _request:
|
||||
def request(app: web.Application, path: str, method: str, json: Any = None, data: Any = None,
|
||||
extra: Optional[Dict[str, Any]] = None) -> MagicMock:
|
||||
"""
|
||||
request generator helper
|
||||
|
||||
@ -32,11 +29,22 @@ def request(app: web.Application, path: str, method: str, json: Any = None, data
|
||||
method(str): method for the request
|
||||
json(Any, optional): json payload of the request (Default value = None)
|
||||
data(Any, optional): form data payload of the request (Default value = None)
|
||||
extra(Optional[Dict[str, Any]], optional): extra info which will be injected for ``get_extra_info`` command
|
||||
|
||||
Returns:
|
||||
_request: dummy request object
|
||||
MagicMock: dummy request mock
|
||||
"""
|
||||
return _request(app, path, method, json, data)
|
||||
request_mock = MagicMock()
|
||||
request_mock.app = app
|
||||
request_mock.path = path
|
||||
request_mock.method = method
|
||||
request_mock.json = json
|
||||
request_mock.post = data
|
||||
|
||||
extra = extra or {}
|
||||
request_mock.get_extra_info.side_effect = lambda key: extra.get(key)
|
||||
|
||||
return request_mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
@ -1,4 +1,5 @@
|
||||
import pytest
|
||||
import socket
|
||||
|
||||
from aiohttp import web
|
||||
from aiohttp.test_utils import TestClient
|
||||
@ -55,6 +56,21 @@ async def test_permits(authorization_policy: AuthorizationPolicy, user: User) ->
|
||||
assert not await authorization_policy.permits(user.username, user.access, "/endpoint")
|
||||
|
||||
|
||||
async def test_auth_handler_unix_socket(client_with_auth: TestClient, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must allow calls via unix sockets
|
||||
"""
|
||||
aiohttp_request = pytest.helpers.request(
|
||||
"", "/api/v1/status", "GET", extra={"socket": socket.socket(socket.AF_UNIX)})
|
||||
request_handler = AsyncMock()
|
||||
request_handler.get_permission.return_value = UserAccess.Full
|
||||
check_permission_mock = mocker.patch("aiohttp_security.check_permission")
|
||||
|
||||
handler = auth_handler(allow_read_only=False)
|
||||
await handler(aiohttp_request, request_handler)
|
||||
check_permission_mock.assert_not_called()
|
||||
|
||||
|
||||
async def test_auth_handler_api(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must ask for status permission for api calls
|
||||
|
@ -50,7 +50,7 @@ def test_run(application: web.Application, mocker: MockerFixture) -> None:
|
||||
|
||||
run_server(application)
|
||||
run_application_mock.assert_called_once_with(
|
||||
application, host="127.0.0.1", port=port, handle_signals=False,
|
||||
application, host="127.0.0.1", port=port, path=None, handle_signals=False,
|
||||
access_log=pytest.helpers.anyvar(int), access_log_class=FilteredAccessLogger
|
||||
)
|
||||
|
||||
@ -65,7 +65,7 @@ def test_run_with_auth(application_with_auth: web.Application, mocker: MockerFix
|
||||
|
||||
run_server(application_with_auth)
|
||||
run_application_mock.assert_called_once_with(
|
||||
application_with_auth, host="127.0.0.1", port=port, handle_signals=False,
|
||||
application_with_auth, host="127.0.0.1", port=port, path=None, handle_signals=False,
|
||||
access_log=pytest.helpers.anyvar(int), access_log_class=FilteredAccessLogger
|
||||
)
|
||||
|
||||
@ -80,6 +80,23 @@ def test_run_with_debug(application_with_debug: web.Application, mocker: MockerF
|
||||
|
||||
run_server(application_with_debug)
|
||||
run_application_mock.assert_called_once_with(
|
||||
application_with_debug, host="127.0.0.1", port=port, handle_signals=False,
|
||||
application_with_debug, host="127.0.0.1", port=port, path=None, handle_signals=False,
|
||||
access_log=pytest.helpers.anyvar(int), access_log_class=FilteredAccessLogger
|
||||
)
|
||||
|
||||
|
||||
def test_run_with_socket(application: web.Application, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must run application
|
||||
"""
|
||||
port = 8080
|
||||
socket = "/run/ahriman.sock"
|
||||
application["configuration"].set_option("web", "port", str(port))
|
||||
application["configuration"].set_option("web", "unix_socket", socket)
|
||||
run_application_mock = mocker.patch("aiohttp.web.run_app")
|
||||
|
||||
run_server(application)
|
||||
run_application_mock.assert_called_once_with(
|
||||
application, host="127.0.0.1", port=port, path=socket, handle_signals=False,
|
||||
access_log=pytest.helpers.anyvar(int), access_log_class=FilteredAccessLogger
|
||||
)
|
||||
|
Reference in New Issue
Block a user