initial auth implementation

This commit is contained in:
Evgenii Alekseev 2021-08-26 03:11:07 +03:00
parent 3922c55464
commit 3ee5f7f13e
20 changed files with 574 additions and 72 deletions

View File

@ -43,5 +43,6 @@ command = rsync --archive --compress --partial --delete
chunk_size = 8388608
[web]
auth = no
host = 127.0.0.1
templates = /usr/share/ahriman

View File

@ -97,6 +97,10 @@ setup(
"Jinja2",
"aiohttp",
"aiohttp_jinja2",
"aiohttp_session",
"aiohttp_security",
"cryptography",
"passlib",
],
},

98
src/ahriman/core/auth.py Normal file
View File

@ -0,0 +1,98 @@
#
# Copyright (c) 2021 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import Dict, Optional, Set
from ahriman.core.configuration import Configuration
from ahriman.models.user import User
from ahriman.models.user_access import UserAccess
class Auth:
"""
helper to deal with user authorization
:ivar allowed_paths: URI paths which can be accessed without authorization
:ivar allowed_paths_groups: URI paths prefixes which can be accessed without authorization
:ivar salt: random generated string to salt passwords
:ivar users: map of username to its descriptor
:cvar ALLOWED_PATHS: URI paths which can be accessed without authorization, predefined
:cvar ALLOWED_PATHS_GROUPS: URI paths prefixes which can be accessed without authorization, predefined
"""
ALLOWED_PATHS = {"/", "/favicon.ico", "/login", "/logout"}
ALLOWED_PATHS_GROUPS: Set[str] = set()
def __init__(self, configuration: Configuration) -> None:
"""
default constructor
:param configuration: configuration instance
"""
self.salt = configuration.get("auth", "salt")
self.users = self.get_users(configuration)
self.allowed_paths = set(configuration.getlist("auth", "allowed_paths"))
self.allowed_paths.update(self.ALLOWED_PATHS)
self.allowed_paths_groups = set(configuration.getlist("auth", "allowed_paths_groups"))
self.allowed_paths_groups.update(self.ALLOWED_PATHS_GROUPS)
@staticmethod
def get_users(configuration: Configuration) -> Dict[str, User]:
"""
load users from settings
:param configuration: configuration instance
:return: map of username to its descriptor
"""
users: Dict[str, User] = {}
for role in UserAccess:
section = configuration.section_name("auth", role.value)
if not configuration.has_section(section):
continue
for user, password in configuration[section].items():
users[user] = User(user, password, role)
return users
def check_credentials(self, username: Optional[str], password: Optional[str]) -> bool:
"""
validate user password
:param username: username
:param password: entered password
:return: True in case if password matches, False otherwise
"""
if username is None or password is None:
return False # invalid data supplied
return username in self.users and self.users[username].check_credentials(password, self.salt)
def is_safe_request(self, uri: Optional[str]) -> bool:
"""
check if requested path are allowed without authorization
:param uri: request uri
:return: True in case if this URI can be requested without authorization and False otherwise
"""
if uri is None:
return False # request without context is not allowed
return uri in self.ALLOWED_PATHS or any(uri.startswith(path) for path in self.ALLOWED_PATHS_GROUPS)
def verify_access(self, username: str, required: UserAccess) -> bool:
"""
validate if user has access to requested resource
:param username: username
:param required: required access level
:return: True in case if user is allowed to do this request and False otherwise
"""
return username in self.users and self.users[username].verify_access(required)

View File

@ -78,14 +78,14 @@ class Configuration(configparser.RawConfigParser):
return config
@staticmethod
def section_name(section: str, architecture: str) -> str:
def section_name(section: str, suffix: str) -> str:
"""
generate section name for architecture specific sections
generate section name for sections which depends on context
:param section: section name
:param architecture: repository architecture
:param suffix: session suffix, e.g. repository architecture
:return: correct section name for repository specific section
"""
return f"{section}:{architecture}"
return f"{section}:{suffix}"
def dump(self) -> Dict[str, Dict[str, str]]:
"""

View File

@ -43,7 +43,7 @@ class Client:
port = configuration.getint("web", "port", fallback=None)
if host is not None and port is not None:
from ahriman.core.status.web_client import WebClient
return WebClient(host, port)
return WebClient(configuration)
return cls()
def add(self, package: Package, status: BuildStatusEnum) -> None:

View File

@ -22,37 +22,90 @@ import requests
from typing import List, Optional, Tuple
from ahriman.core.configuration import Configuration
from ahriman.core.status.client import Client
from ahriman.core.util import exception_response_text
from ahriman.models.build_status import BuildStatusEnum, BuildStatus
from ahriman.models.internal_status import InternalStatus
from ahriman.models.package import Package
from ahriman.models.user import User
class WebClient(Client):
"""
build status reporter web client
:ivar host: host of web service
:ivar address: address of the web service
:ivar logger: class logger
:ivar port: port of web service
:ivar user: web service user descriptor
"""
def __init__(self, host: str, port: int) -> None:
def __init__(self, configuration: Configuration) -> None:
"""
default constructor
:param host: host of web service
:param port: port of web service
:param configuration: configuration instance
"""
self.logger = logging.getLogger("http")
self.host = host
self.port = port
self.address = self.parse_address(configuration)
self.user = User.from_option(
configuration.get("web", "username", fallback=None),
configuration.get("web", "password", fallback=None))
self.__session = requests.session()
@property
def _ahriman_url(self) -> str:
"""
url generator
:return: full url for web service for ahriman service itself
"""
return f"http://{self.host}:{self.port}/api/v1/ahriman"
return f"{self.address}/api/v1/ahriman"
@property
def _login_url(self) -> str:
"""
:return: full url for web service to login
"""
return f"{self.address}/login"
@property
def _status_url(self) -> str:
"""
:return: full url for web service for status
"""
return f"{self.address}/api/v1/status"
@staticmethod
def parse_address(configuration: Configuration) -> str:
"""
parse address from configuration
:param configuration: configuration instance
:return: valid http address
"""
address = configuration.get("web", "address", fallback=None)
if not address:
# build address from host and port directly
host = configuration.get("web", "host")
port = configuration.getint("web", "port")
address = f"http://{host}:{port}"
return address
def login(self) -> None:
"""
process login to the service
"""
if self.user is None:
return # no auth configured
payload = {
"username": self.user.username,
"password": self.user.password
}
try:
response = self.__session.post(self._login_url, json=payload)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
self.logger.exception("could not login as %s: %s", self.user, exception_response_text(e))
except Exception:
self.logger.exception("could not login as %s", self.user)
def _package_url(self, base: str = "") -> str:
"""
@ -60,14 +113,7 @@ class WebClient(Client):
:param base: package base to generate url
:return: full url of web service for specific package base
"""
return f"http://{self.host}:{self.port}/api/v1/packages/{base}"
def _status_url(self) -> str:
"""
url generator
:return: full url for web service for status
"""
return f"http://{self.host}:{self.port}/api/v1/status"
return f"{self.address}/api/v1/packages/{base}"
def add(self, package: Package, status: BuildStatusEnum) -> None:
"""
@ -81,7 +127,7 @@ class WebClient(Client):
}
try:
response = requests.post(self._package_url(package.base), json=payload)
response = self.__session.post(self._package_url(package.base), json=payload)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
self.logger.exception("could not add %s: %s", package.base, exception_response_text(e))
@ -95,7 +141,7 @@ class WebClient(Client):
:return: list of current package description and status if it has been found
"""
try:
response = requests.get(self._package_url(base or ""))
response = self.__session.get(self._package_url(base or ""))
response.raise_for_status()
status_json = response.json()
@ -115,7 +161,7 @@ class WebClient(Client):
:return: current internal (web) service status
"""
try:
response = requests.get(self._status_url())
response = self.__session.get(self._status_url)
response.raise_for_status()
status_json = response.json()
@ -132,7 +178,7 @@ class WebClient(Client):
:return: current ahriman status
"""
try:
response = requests.get(self._ahriman_url())
response = self.__session.get(self._ahriman_url)
response.raise_for_status()
status_json = response.json()
@ -149,7 +195,7 @@ class WebClient(Client):
:param base: basename to remove
"""
try:
response = requests.delete(self._package_url(base))
response = self.__session.delete(self._package_url(base))
response.raise_for_status()
except requests.exceptions.HTTPError as e:
self.logger.exception("could not delete %s: %s", base, exception_response_text(e))
@ -165,7 +211,7 @@ class WebClient(Client):
payload = {"status": status.value}
try:
response = requests.post(self._package_url(base), json=payload)
response = self.__session.post(self._package_url(base), json=payload)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
self.logger.exception("could not update %s: %s", base, exception_response_text(e))
@ -180,7 +226,7 @@ class WebClient(Client):
payload = {"status": status.value}
try:
response = requests.post(self._ahriman_url(), json=payload)
response = self.__session.post(self._ahriman_url, json=payload)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
self.logger.exception("could not update service status: %s", exception_response_text(e))

View File

@ -0,0 +1,78 @@
#
# Copyright (c) 2021 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional, Type
from passlib.handlers.sha2_crypt import sha512_crypt # type: ignore
from ahriman.models.user_access import UserAccess
@dataclass
class User:
"""
authorized web user model
:ivar username: username
:ivar password: hashed user password with salt
:ivar access: user role
"""
username: str
password: str
access: UserAccess
@classmethod
def from_option(cls: Type[User], username: Optional[str], password: Optional[str]) -> Optional[User]:
"""
build user descriptor from configuration options
:param username: username
:param password: password as string
:return: generated user descriptor if all options are supplied and None otherwise
"""
if username is None or password is None:
return None
return cls(username, password, UserAccess.Status)
def check_credentials(self, password: str, salt: str) -> bool:
"""
validate user password
:param password: entered password
:param salt: salt for hashed password
:return: True in case if password matches, False otherwise
"""
verified: bool = sha512_crypt.verify(password + salt, self.password)
return verified
def verify_access(self, required: UserAccess) -> bool:
"""
validate if user has access to requested resource
:param required: required access level
:return: True in case if user is allowed to do this request and False otherwise
"""
if self.access == UserAccess.Write:
return True # everything is allowed
return self.access == required
def __repr__(self) -> str:
"""
generate string representation of object
:return: unique string representation
"""
return f"User(username={self.username}, access={self.access})"

View File

@ -0,0 +1,33 @@
#
# Copyright (c) 2021 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from enum import Enum
class UserAccess(Enum):
"""
web user access enumeration
:cvar Read: user can read status page
:cvar Write: user can modify task and package list
:cvar Status: user can update statuses via API
"""
Read = "read"
Write = "write"
Status = "status"

View File

@ -17,3 +17,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from aiohttp.web import Request
from aiohttp.web_response import StreamResponse
from typing import Awaitable, Callable
HandlerType = Callable[[Request], Awaitable[StreamResponse]]
MiddlewareType = Callable[[Request, HandlerType], Awaitable[StreamResponse]]

View File

@ -0,0 +1,101 @@
#
# Copyright (c) 2021 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from aiohttp import web
from aiohttp.web import middleware, Request
from aiohttp.web_response import StreamResponse
from aiohttp_security import setup as setup_security # type: ignore
from aiohttp_security import AbstractAuthorizationPolicy, SessionIdentityPolicy, check_permission
from typing import Optional
from ahriman.core.auth import Auth
from ahriman.core.configuration import Configuration
from ahriman.models.user_access import UserAccess
from ahriman.web.middlewares import HandlerType, MiddlewareType
class AuthorizationPolicy(AbstractAuthorizationPolicy): # type: ignore
"""
authorization policy implementation
:ivar validator: validator instance
"""
def __init__(self, configuration: Configuration) -> None:
"""
default constructor
:param configuration: configuration instance
"""
self.validator = Auth(configuration)
async def authorized_userid(self, identity: str) -> Optional[str]:
"""
retrieve authorized username
:param identity: username
:return: user identity (username) in case if user exists and None otherwise
"""
return identity if identity in self.validator.users else None
async def permits(self, identity: str, permission: UserAccess, context: Optional[str] = None) -> bool:
"""
check user permissions
:param identity: username
:param permission: requested permission level
:param context: URI request path
:return: True in case if user is allowed to perform this request and False otherwise
"""
if self.validator.is_safe_request(context):
return True
return self.validator.verify_access(identity, permission)
def auth_handler() -> MiddlewareType:
"""
authorization and authentication middleware
:return: built middleware
"""
@middleware
async def handle(request: Request, handler: HandlerType) -> StreamResponse:
if request.path.startswith("/api"):
permission = UserAccess.Status
elif request.method in ("HEAD", "GET", "OPTIONS"):
permission = UserAccess.Read
else:
permission = UserAccess.Write
await check_permission(request, permission, request.path)
return await handler(request)
return handle
def setup_auth(application: web.Application, configuration: Configuration) -> web.Application:
"""
setup authorization policies for the application
:param application: web application instance
:param configuration: configuration instance
:return: configured web application
"""
authorization_policy = AuthorizationPolicy(configuration)
identity_policy = SessionIdentityPolicy()
application["validator"] = authorization_policy.validator
setup_security(application, identity_policy, authorization_policy)
application.middlewares.append(auth_handler())
return application

View File

@ -21,13 +21,11 @@ from aiohttp.web import middleware, Request
from aiohttp.web_exceptions import HTTPClientError
from aiohttp.web_response import StreamResponse
from logging import Logger
from typing import Awaitable, Callable
from ahriman.web.middlewares import HandlerType, MiddlewareType
HandlerType = Callable[[Request], Awaitable[StreamResponse]]
def exception_handler(logger: Logger) -> Callable[[Request, HandlerType], Awaitable[StreamResponse]]:
def exception_handler(logger: Logger) -> MiddlewareType:
"""
exception handler middleware. Just log any exception (except for client ones)
:param logger: class logger

View File

@ -21,6 +21,8 @@ from aiohttp.web import Application
from ahriman.web.views.ahriman import AhrimanView
from ahriman.web.views.index import IndexView
from ahriman.web.views.login import LoginView
from ahriman.web.views.logout import LogoutView
from ahriman.web.views.package import PackageView
from ahriman.web.views.packages import PackagesView
from ahriman.web.views.status import StatusView
@ -35,6 +37,9 @@ def setup_routes(application: Application) -> None:
GET / get build status page
GET /index.html same as above
POST /login login to service
POST /logout logout from service
GET /api/v1/ahriman get current service status
POST /api/v1/ahriman update service status
@ -52,6 +57,9 @@ def setup_routes(application: Application) -> None:
application.router.add_get("/", IndexView)
application.router.add_get("/index.html", IndexView)
application.router.add_post("/login", LoginView)
application.router.add_post("/logout", LogoutView)
application.router.add_get("/api/v1/ahriman", AhrimanView)
application.router.add_post("/api/v1/ahriman", AhrimanView)

View File

@ -46,7 +46,7 @@ class AhrimanView(BaseView):
:return: 204 on success
"""
data = await self.request.json()
data = await self.extract_data()
try:
status = BuildStatusEnum(data["status"])

View File

@ -18,7 +18,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from aiohttp.web import View
from typing import Any, Dict
from ahriman.core.auth import Auth
from ahriman.core.status.watcher import Watcher
@ -34,3 +36,22 @@ class BaseView(View):
"""
watcher: Watcher = self.request.app["watcher"]
return watcher
@property
def validator(self) -> Auth:
"""
:return: authorization service instance
"""
validator: Auth = self.request.app["validator"]
return validator
async def extract_data(self) -> Dict[str, Any]:
"""
extract json data from either json or form data
:return: raw json object or form data converted to json
"""
try:
json: Dict[str, Any] = await self.request.json()
return json
except ValueError:
return dict(await self.request.post())

View File

@ -0,0 +1,51 @@
#
# Copyright (c) 2021 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from aiohttp.web import HTTPFound, HTTPUnauthorized, Response
from aiohttp_security import remember # type: ignore
from ahriman.web.views.base import BaseView
class LoginView(BaseView):
"""
login endpoint view
"""
async def post(self) -> Response:
"""
login user to service
either JSON body or form data must be supplied the following fields are required:
{
"username": "username" # username to use for login
"password": "pa55w0rd" # password to use for login
}
:return: redirect to main page
"""
data = await self.extract_data()
username = data.get("username")
response = HTTPFound("/")
if self.validator.check_credentials(username, data.get("password")):
await remember(self.request, response, username)
return response
raise HTTPUnauthorized()

View File

@ -0,0 +1,41 @@
#
# Copyright (c) 2021 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from aiohttp.web import HTTPFound, Response
from aiohttp_security import check_authorized, forget # type: ignore
from ahriman.web.views.base import BaseView
class LogoutView(BaseView):
"""
logout endpoint view
"""
async def post(self) -> Response:
"""
logout user from the service. No parameters supported here
:return: redirect to main page
"""
await check_authorized(self.request)
response = HTTPFound("/")
await forget(self.request, response)
return response

View File

@ -74,7 +74,7 @@ class PackageView(BaseView):
:return: 204 on success
"""
base = self.request.match_info["package"]
data = await self.request.json()
data = await self.extract_data()
try:
package = Package.from_json(data["package"]) if "package" in data else None

View File

@ -18,14 +18,19 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import aiohttp_jinja2
import base64
import jinja2
import logging
from aiohttp import web
from aiohttp_session import setup as setup_session # type: ignore
from aiohttp_session.cookie_storage import EncryptedCookieStorage # type: ignore
from cryptography import fernet
from ahriman.core.configuration import Configuration
from ahriman.core.exceptions import InitializeException
from ahriman.core.status.watcher import Watcher
from ahriman.web.middlewares.auth_handler import setup_auth
from ahriman.web.middlewares.exception_handler import exception_handler
from ahriman.web.routes import setup_routes
@ -92,4 +97,12 @@ def setup_service(architecture: str, configuration: Configuration) -> web.Applic
application.logger.info("setup watcher")
application["watcher"] = Watcher(architecture, configuration)
fernet_key = fernet.Fernet.generate_key()
secret_key = base64.urlsafe_b64decode(fernet_key)
storage = EncryptedCookieStorage(secret_key, cookie_name='API_SESSION')
setup_session(application, storage)
if configuration.getboolean("web", "auth", fallback=False):
setup_auth(application, configuration)
return application

View File

@ -2,6 +2,7 @@ import pytest
from typing import Any, Dict
from ahriman.core.configuration import Configuration
from ahriman.core.status.client import Client
from ahriman.core.status.web_client import WebClient
from ahriman.models.build_status import BuildStatus, BuildStatusEnum
@ -40,9 +41,10 @@ def client() -> Client:
@pytest.fixture
def web_client() -> WebClient:
def web_client(configuration: Configuration) -> WebClient:
"""
fixture for web client
:return: web client test instance
"""
return WebClient("localhost", 8080)
configuration.set("web", "port", 8080)
return WebClient(configuration)

View File

@ -15,31 +15,31 @@ def test_ahriman_url(web_client: WebClient) -> None:
"""
must generate service status url correctly
"""
assert web_client._ahriman_url().startswith(f"http://{web_client.host}:{web_client.port}")
assert web_client._ahriman_url().endswith("/api/v1/ahriman")
def test_package_url(web_client: WebClient, package_ahriman: Package) -> None:
"""
must generate package status correctly
"""
assert web_client._package_url(package_ahriman.base).startswith(f"http://{web_client.host}:{web_client.port}")
assert web_client._package_url(package_ahriman.base).endswith(f"/api/v1/packages/{package_ahriman.base}")
assert web_client._ahriman_url.startswith(web_client.address)
assert web_client._ahriman_url.endswith("/api/v1/ahriman")
def test_status_url(web_client: WebClient) -> None:
"""
must generate service status url correctly
"""
assert web_client._status_url().startswith(f"http://{web_client.host}:{web_client.port}")
assert web_client._status_url().endswith("/api/v1/status")
assert web_client._status_url.startswith(web_client.address)
assert web_client._status_url.endswith("/api/v1/status")
def test_package_url(web_client: WebClient, package_ahriman: Package) -> None:
"""
must generate package status correctly
"""
assert web_client._package_url(package_ahriman.base).startswith(web_client.address)
assert web_client._package_url(package_ahriman.base).endswith(f"/api/v1/packages/{package_ahriman.base}")
def test_add(web_client: WebClient, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must process package addition
"""
requests_mock = mocker.patch("requests.post")
requests_mock = mocker.patch("requests.Session.post")
payload = pytest.helpers.get_package_status(package_ahriman)
web_client.add(package_ahriman, BuildStatusEnum.Unknown)
@ -50,7 +50,7 @@ def test_add_failed(web_client: WebClient, package_ahriman: Package, mocker: Moc
"""
must suppress any exception happened during addition
"""
mocker.patch("requests.post", side_effect=Exception())
mocker.patch("requests.Session.post", side_effect=Exception())
web_client.add(package_ahriman, BuildStatusEnum.Unknown)
@ -58,7 +58,7 @@ def test_add_failed_http_error(web_client: WebClient, package_ahriman: Package,
"""
must suppress any exception happened during addition
"""
mocker.patch("requests.post", side_effect=requests.exceptions.HTTPError())
mocker.patch("requests.Session.post", side_effect=requests.exceptions.HTTPError())
web_client.add(package_ahriman, BuildStatusEnum.Unknown)
@ -71,7 +71,7 @@ def test_get_all(web_client: WebClient, package_ahriman: Package, mocker: Mocker
response_obj._content = json.dumps(response).encode("utf8")
response_obj.status_code = 200
requests_mock = mocker.patch("requests.get", return_value=response_obj)
requests_mock = mocker.patch("requests.Session.get", return_value=response_obj)
result = web_client.get(None)
requests_mock.assert_called_once()
@ -83,7 +83,7 @@ def test_get_failed(web_client: WebClient, mocker: MockerFixture) -> None:
"""
must suppress any exception happened during status getting
"""
mocker.patch("requests.get", side_effect=Exception())
mocker.patch("requests.Session.get", side_effect=Exception())
assert web_client.get(None) == []
@ -91,7 +91,7 @@ def test_get_failed_http_error(web_client: WebClient, mocker: MockerFixture) ->
"""
must suppress any exception happened during status getting
"""
mocker.patch("requests.get", side_effect=requests.exceptions.HTTPError())
mocker.patch("requests.Session.get", side_effect=requests.exceptions.HTTPError())
assert web_client.get(None) == []
@ -104,7 +104,7 @@ def test_get_single(web_client: WebClient, package_ahriman: Package, mocker: Moc
response_obj._content = json.dumps(response).encode("utf8")
response_obj.status_code = 200
requests_mock = mocker.patch("requests.get", return_value=response_obj)
requests_mock = mocker.patch("requests.Session.get", return_value=response_obj)
result = web_client.get(package_ahriman.base)
requests_mock.assert_called_once()
@ -120,7 +120,7 @@ def test_get_internal(web_client: WebClient, mocker: MockerFixture) -> None:
response_obj._content = json.dumps(InternalStatus(architecture="x86_64").view()).encode("utf8")
response_obj.status_code = 200
requests_mock = mocker.patch("requests.get", return_value=response_obj)
requests_mock = mocker.patch("requests.Session.get", return_value=response_obj)
result = web_client.get_internal()
requests_mock.assert_called_once()
@ -131,7 +131,7 @@ def test_get_internal_failed(web_client: WebClient, mocker: MockerFixture) -> No
"""
must suppress any exception happened during web service status getting
"""
mocker.patch("requests.get", side_effect=Exception())
mocker.patch("requests.Session.get", side_effect=Exception())
assert web_client.get_internal() == InternalStatus()
@ -139,7 +139,7 @@ def test_get_internal_failed_http_error(web_client: WebClient, mocker: MockerFix
"""
must suppress any exception happened during web service status getting
"""
mocker.patch("requests.get", side_effect=requests.exceptions.HTTPError())
mocker.patch("requests.Session.get", side_effect=requests.exceptions.HTTPError())
assert web_client.get_internal() == InternalStatus()
@ -151,7 +151,7 @@ def test_get_self(web_client: WebClient, mocker: MockerFixture) -> None:
response_obj._content = json.dumps(BuildStatus().view()).encode("utf8")
response_obj.status_code = 200
requests_mock = mocker.patch("requests.get", return_value=response_obj)
requests_mock = mocker.patch("requests.Session.get", return_value=response_obj)
result = web_client.get_self()
requests_mock.assert_called_once()
@ -162,7 +162,7 @@ def test_get_self_failed(web_client: WebClient, mocker: MockerFixture) -> None:
"""
must suppress any exception happened during service status getting
"""
mocker.patch("requests.get", side_effect=Exception())
mocker.patch("requests.Session.get", side_effect=Exception())
assert web_client.get_self().status == BuildStatusEnum.Unknown
@ -170,7 +170,7 @@ def test_get_self_failed_http_error(web_client: WebClient, mocker: MockerFixture
"""
must suppress any exception happened during service status getting
"""
mocker.patch("requests.get", side_effect=requests.exceptions.HTTPError())
mocker.patch("requests.Session.get", side_effect=requests.exceptions.HTTPError())
assert web_client.get_self().status == BuildStatusEnum.Unknown
@ -178,7 +178,7 @@ def test_remove(web_client: WebClient, package_ahriman: Package, mocker: MockerF
"""
must process package removal
"""
requests_mock = mocker.patch("requests.delete")
requests_mock = mocker.patch("requests.Session.delete")
web_client.remove(package_ahriman.base)
requests_mock.assert_called_with(pytest.helpers.anyvar(str, True))
@ -188,7 +188,7 @@ def test_remove_failed(web_client: WebClient, package_ahriman: Package, mocker:
"""
must suppress any exception happened during removal
"""
mocker.patch("requests.delete", side_effect=Exception())
mocker.patch("requests.Session.delete", side_effect=Exception())
web_client.remove(package_ahriman.base)
@ -196,7 +196,7 @@ def test_remove_failed_http_error(web_client: WebClient, package_ahriman: Packag
"""
must suppress any exception happened during removal
"""
mocker.patch("requests.delete", side_effect=requests.exceptions.HTTPError())
mocker.patch("requests.Session.delete", side_effect=requests.exceptions.HTTPError())
web_client.remove(package_ahriman.base)
@ -204,7 +204,7 @@ def test_update(web_client: WebClient, package_ahriman: Package, mocker: MockerF
"""
must process package update
"""
requests_mock = mocker.patch("requests.post")
requests_mock = mocker.patch("requests.Session.post")
web_client.update(package_ahriman.base, BuildStatusEnum.Unknown)
requests_mock.assert_called_with(pytest.helpers.anyvar(str, True), json={"status": BuildStatusEnum.Unknown.value})
@ -214,7 +214,7 @@ def test_update_failed(web_client: WebClient, package_ahriman: Package, mocker:
"""
must suppress any exception happened during update
"""
mocker.patch("requests.post", side_effect=Exception())
mocker.patch("requests.Session.post", side_effect=Exception())
web_client.update(package_ahriman.base, BuildStatusEnum.Unknown)
@ -222,7 +222,7 @@ def test_update_failed_http_error(web_client: WebClient, package_ahriman: Packag
"""
must suppress any exception happened during update
"""
mocker.patch("requests.post", side_effect=requests.exceptions.HTTPError())
mocker.patch("requests.Session.post", side_effect=requests.exceptions.HTTPError())
web_client.update(package_ahriman.base, BuildStatusEnum.Unknown)
@ -230,7 +230,7 @@ def test_update_self(web_client: WebClient, mocker: MockerFixture) -> None:
"""
must process service update
"""
requests_mock = mocker.patch("requests.post")
requests_mock = mocker.patch("requests.Session.post")
web_client.update_self(BuildStatusEnum.Unknown)
requests_mock.assert_called_with(pytest.helpers.anyvar(str, True), json={"status": BuildStatusEnum.Unknown.value})
@ -240,7 +240,7 @@ def test_update_self_failed(web_client: WebClient, mocker: MockerFixture) -> Non
"""
must suppress any exception happened during service update
"""
mocker.patch("requests.post", side_effect=Exception())
mocker.patch("requests.Session.post", side_effect=Exception())
web_client.update_self(BuildStatusEnum.Unknown)
@ -248,5 +248,5 @@ def test_update_self_failed_http_error(web_client: WebClient, mocker: MockerFixt
"""
must suppress any exception happened during service update
"""
mocker.patch("requests.post", side_effect=requests.exceptions.HTTPError())
mocker.patch("requests.Session.post", side_effect=requests.exceptions.HTTPError())
web_client.update_self(BuildStatusEnum.Unknown)