mirror of
				https://github.com/arcan1s/ahriman.git
				synced 2025-11-04 07:43:42 +00:00 
			
		
		
		
	define permissions in views directly
This commit is contained in:
		@ -24,8 +24,6 @@ Base authorization settings. `OAuth` provider requires `aioauth-client` library
 | 
			
		||||
 | 
			
		||||
* `target` - specifies authorization provider, string, optional, default `disabled`. Allowed values are `disabled`, `configuration`, `oauth`.
 | 
			
		||||
* `allow_read_only` - allow requesting read only pages without authorization, boolean, required.
 | 
			
		||||
* `allowed_paths` - URI paths (exact match) which can be accessed without authorization, space separated list of strings, optional.
 | 
			
		||||
* `allowed_paths_groups` - URI paths prefixes which can be accessed without authorization, space separated list of strings, optional.
 | 
			
		||||
* `client_id` - OAuth2 application client ID, string, required in case if `oauth` is used.
 | 
			
		||||
* `client_secret` - OAuth2 application client secret key, string, required in case if `oauth` is used.
 | 
			
		||||
* `max_age` - parameter which controls both cookie expiration and token expiration inside the service, integer, optional, default is 7 days.
 | 
			
		||||
 | 
			
		||||
@ -33,16 +33,11 @@ from ahriman.models.user_access import UserAccess
 | 
			
		||||
class Auth:
 | 
			
		||||
    """
 | 
			
		||||
    helper to deal with user authorization
 | 
			
		||||
    :ivar allowed_paths: URI paths which can be accessed without authorization
 | 
			
		||||
    :ivar allowed_paths_groups: URI paths prefixes which can be accessed without authorization
 | 
			
		||||
    :ivar allow_read_only: allow read only access to the index page
 | 
			
		||||
    :ivar enabled: indicates if authorization is enabled
 | 
			
		||||
    :cvar ALLOWED_PATHS: URI paths which can be accessed without authorization, predefined
 | 
			
		||||
    :cvar ALLOWED_PATHS_GROUPS: URI paths prefixes which can be accessed without authorization, predefined
 | 
			
		||||
    :ivar max_age: session age in seconds. It will be used for both client side and server side checks
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    ALLOWED_PATHS = {"/", "/index.html"}
 | 
			
		||||
    ALLOWED_PATHS_GROUPS = {"/static", "/user-api"}
 | 
			
		||||
 | 
			
		||||
    def __init__(self, configuration: Configuration, provider: AuthSettings = AuthSettings.Disabled) -> None:
 | 
			
		||||
        """
 | 
			
		||||
        default constructor
 | 
			
		||||
@ -52,10 +47,7 @@ class Auth:
 | 
			
		||||
        self.logger = logging.getLogger("http")
 | 
			
		||||
 | 
			
		||||
        self.allow_read_only = configuration.getboolean("auth", "allow_read_only")
 | 
			
		||||
        self.allowed_paths = set(configuration.getlist("auth", "allowed_paths", fallback=[]))
 | 
			
		||||
        self.allowed_paths.update(self.ALLOWED_PATHS)
 | 
			
		||||
        self.allowed_paths_groups = set(configuration.getlist("auth", "allowed_paths_groups", fallback=[]))
 | 
			
		||||
        self.allowed_paths_groups.update(self.ALLOWED_PATHS_GROUPS)
 | 
			
		||||
 | 
			
		||||
        self.enabled = provider.is_enabled
 | 
			
		||||
        self.max_age = configuration.getint("auth", "max_age", fallback=7 * 24 * 3600)
 | 
			
		||||
 | 
			
		||||
@ -115,19 +107,6 @@ class Auth:
 | 
			
		||||
        del username, password
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
    async def is_safe_request(self, uri: Optional[str], required: UserAccess) -> bool:
 | 
			
		||||
        """
 | 
			
		||||
        check if requested path are allowed without authorization
 | 
			
		||||
        :param uri: request uri
 | 
			
		||||
        :param required: required access level
 | 
			
		||||
        :return: True in case if this URI can be requested without authorization and False otherwise
 | 
			
		||||
        """
 | 
			
		||||
        if required == UserAccess.Read and self.allow_read_only:
 | 
			
		||||
            return True  # in case if read right requested and allowed in options
 | 
			
		||||
        if not uri:
 | 
			
		||||
            return False  # request without context is not allowed
 | 
			
		||||
        return uri in self.allowed_paths or any(uri.startswith(path) for path in self.allowed_paths_groups)
 | 
			
		||||
 | 
			
		||||
    async def known_username(self, username: Optional[str]) -> bool:  # pylint: disable=no-self-use
 | 
			
		||||
        """
 | 
			
		||||
        check if user is known
 | 
			
		||||
 | 
			
		||||
@ -23,9 +23,11 @@ from enum import Enum
 | 
			
		||||
class UserAccess(Enum):
 | 
			
		||||
    """
 | 
			
		||||
    web user access enumeration
 | 
			
		||||
    :cvar Read: user can read status page
 | 
			
		||||
    :cvar Safe: user can access the page without authorization, should not be user for user configuration
 | 
			
		||||
    :cvar Read: user can read the page
 | 
			
		||||
    :cvar Write: user can modify task and package list
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    Safe = "safe"
 | 
			
		||||
    Read = "read"
 | 
			
		||||
    Write = "write"
 | 
			
		||||
 | 
			
		||||
@ -19,10 +19,12 @@
 | 
			
		||||
#
 | 
			
		||||
import aiohttp_security  # type: ignore
 | 
			
		||||
import base64
 | 
			
		||||
import types
 | 
			
		||||
 | 
			
		||||
from aiohttp import web
 | 
			
		||||
from aiohttp.web import middleware, Request
 | 
			
		||||
from aiohttp.web_response import StreamResponse
 | 
			
		||||
from aiohttp.web_urldispatcher import StaticResource
 | 
			
		||||
from aiohttp_session import setup as setup_session  # type: ignore
 | 
			
		||||
from aiohttp_session.cookie_storage import EncryptedCookieStorage  # type: ignore
 | 
			
		||||
from cryptography import fernet
 | 
			
		||||
@ -72,20 +74,22 @@ class AuthorizationPolicy(aiohttp_security.AbstractAuthorizationPolicy):  # type
 | 
			
		||||
        return await self.validator.verify_access(user.username, permission, context)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def auth_handler(validator: Auth) -> MiddlewareType:
 | 
			
		||||
def auth_handler() -> MiddlewareType:
 | 
			
		||||
    """
 | 
			
		||||
    authorization and authentication middleware
 | 
			
		||||
    :param validator: authorization module instance
 | 
			
		||||
    :return: built middleware
 | 
			
		||||
    """
 | 
			
		||||
    @middleware
 | 
			
		||||
    async def handle(request: Request, handler: HandlerType) -> StreamResponse:
 | 
			
		||||
        if request.method in ("GET", "HEAD", "OPTIONS"):
 | 
			
		||||
            permission = UserAccess.Read
 | 
			
		||||
        permission_method = getattr(handler, "get_permission", None)
 | 
			
		||||
        if permission_method is not None:
 | 
			
		||||
            permission = await permission_method(request)
 | 
			
		||||
        elif isinstance(handler, types.MethodType):  # additional wrapper for static resources
 | 
			
		||||
            handler_instance = getattr(handler, "__self__", None)
 | 
			
		||||
            permission = UserAccess.Safe if isinstance(handler_instance, StaticResource) else UserAccess.Write
 | 
			
		||||
        else:
 | 
			
		||||
            permission = UserAccess.Write
 | 
			
		||||
 | 
			
		||||
        if not await validator.is_safe_request(request.path, permission):
 | 
			
		||||
        if permission != UserAccess.Safe:
 | 
			
		||||
            await aiohttp_security.check_permission(request, permission, request.path)
 | 
			
		||||
 | 
			
		||||
        return await handler(request)
 | 
			
		||||
@ -109,6 +113,6 @@ def setup_auth(application: web.Application, validator: Auth) -> web.Application
 | 
			
		||||
    identity_policy = aiohttp_security.SessionIdentityPolicy()
 | 
			
		||||
 | 
			
		||||
    aiohttp_security.setup(application, identity_policy, authorization_policy)
 | 
			
		||||
    application.middlewares.append(auth_handler(validator))
 | 
			
		||||
    application.middlewares.append(auth_handler())
 | 
			
		||||
 | 
			
		||||
    return application
 | 
			
		||||
 | 
			
		||||
@ -17,13 +17,16 @@
 | 
			
		||||
# You should have received a copy of the GNU General Public License
 | 
			
		||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
#
 | 
			
		||||
from aiohttp.web import View
 | 
			
		||||
from typing import Any, Dict, List, Optional
 | 
			
		||||
from __future__ import annotations
 | 
			
		||||
 | 
			
		||||
from aiohttp.web import Request, View
 | 
			
		||||
from typing import Any, Dict, List, Optional, Type
 | 
			
		||||
 | 
			
		||||
from ahriman.core.auth.auth import Auth
 | 
			
		||||
from ahriman.core.configuration import Configuration
 | 
			
		||||
from ahriman.core.spawn import Spawn
 | 
			
		||||
from ahriman.core.status.watcher import Watcher
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class BaseView(View):
 | 
			
		||||
@ -63,6 +66,16 @@ class BaseView(View):
 | 
			
		||||
        validator: Auth = self.request.app["validator"]
 | 
			
		||||
        return validator
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    async def get_permission(cls: Type[BaseView], request: Request) -> UserAccess:
 | 
			
		||||
        """
 | 
			
		||||
        retrieve user permission from the request
 | 
			
		||||
        :param request: request object
 | 
			
		||||
        :return: extracted permission
 | 
			
		||||
        """
 | 
			
		||||
        permission: UserAccess = getattr(cls, f"{request.method.upper()}_PERMISSION", UserAccess.Write)
 | 
			
		||||
        return permission
 | 
			
		||||
 | 
			
		||||
    async def extract_data(self, list_keys: Optional[List[str]] = None) -> Dict[str, Any]:
 | 
			
		||||
        """
 | 
			
		||||
        extract json data from either json or form data
 | 
			
		||||
 | 
			
		||||
@ -24,12 +24,15 @@ from typing import Any, Dict
 | 
			
		||||
from ahriman import version
 | 
			
		||||
from ahriman.core.auth.helpers import authorized_userid
 | 
			
		||||
from ahriman.core.util import pretty_datetime
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.web.views.base import BaseView
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class IndexView(BaseView):
 | 
			
		||||
    """
 | 
			
		||||
    root view
 | 
			
		||||
    :cvar GET_PERMISSION: get permissions of self
 | 
			
		||||
    :cvar HEAD_PERMISSION: head permissions of self
 | 
			
		||||
 | 
			
		||||
    It uses jinja2 templates for report generation, the following variables are allowed:
 | 
			
		||||
 | 
			
		||||
@ -58,6 +61,8 @@ class IndexView(BaseView):
 | 
			
		||||
        version - ahriman version, string, required
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    GET_PERMISSION = HEAD_PERMISSION = UserAccess.Safe
 | 
			
		||||
 | 
			
		||||
    @aiohttp_jinja2.template("build-status.jinja2")
 | 
			
		||||
    async def get(self) -> Dict[str, Any]:
 | 
			
		||||
        """
 | 
			
		||||
 | 
			
		||||
@ -19,14 +19,18 @@
 | 
			
		||||
#
 | 
			
		||||
from aiohttp.web import HTTPFound, Response, json_response
 | 
			
		||||
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.web.views.base import BaseView
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class AddView(BaseView):
 | 
			
		||||
    """
 | 
			
		||||
    add package web view
 | 
			
		||||
    :cvar POST_PERMISSION: post permissions of self
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    POST_PERMISSION = UserAccess.Write
 | 
			
		||||
 | 
			
		||||
    async def post(self) -> Response:
 | 
			
		||||
        """
 | 
			
		||||
        add new package
 | 
			
		||||
 | 
			
		||||
@ -17,18 +17,21 @@
 | 
			
		||||
# You should have received a copy of the GNU General Public License
 | 
			
		||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
#
 | 
			
		||||
from aiohttp.web import Response
 | 
			
		||||
from aiohttp.web_exceptions import HTTPNoContent
 | 
			
		||||
from aiohttp.web import HTTPNoContent, Response
 | 
			
		||||
 | 
			
		||||
from ahriman.core.auth.auth import Auth
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.web.views.base import BaseView
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ReloadAuthView(BaseView):
 | 
			
		||||
    """
 | 
			
		||||
    reload authentication module web view
 | 
			
		||||
    :cvar POST_PERMISSION: post permissions of self
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    POST_PERMISSION = UserAccess.Write
 | 
			
		||||
 | 
			
		||||
    async def post(self) -> Response:
 | 
			
		||||
        """
 | 
			
		||||
        reload authentication module. No parameters supported here
 | 
			
		||||
 | 
			
		||||
@ -19,14 +19,18 @@
 | 
			
		||||
#
 | 
			
		||||
from aiohttp.web import HTTPFound, Response, json_response
 | 
			
		||||
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.web.views.base import BaseView
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class RemoveView(BaseView):
 | 
			
		||||
    """
 | 
			
		||||
    remove package web view
 | 
			
		||||
    :cvar POST_PERMISSION: post permissions of self
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    POST_PERMISSION = UserAccess.Write
 | 
			
		||||
 | 
			
		||||
    async def post(self) -> Response:
 | 
			
		||||
        """
 | 
			
		||||
        remove existing packages
 | 
			
		||||
 | 
			
		||||
@ -22,14 +22,19 @@ import aur  # type: ignore
 | 
			
		||||
from aiohttp.web import Response, json_response
 | 
			
		||||
from typing import Callable, Iterator
 | 
			
		||||
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.web.views.base import BaseView
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SearchView(BaseView):
 | 
			
		||||
    """
 | 
			
		||||
    AUR search web view
 | 
			
		||||
    :cvar GET_PERMISSION: get permissions of self
 | 
			
		||||
    :cvar HEAD_PERMISSION: head permissions of self
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    GET_PERMISSION = HEAD_PERMISSION = UserAccess.Read
 | 
			
		||||
 | 
			
		||||
    async def get(self) -> Response:
 | 
			
		||||
        """
 | 
			
		||||
        search packages in AUR
 | 
			
		||||
 | 
			
		||||
@ -20,14 +20,21 @@
 | 
			
		||||
from aiohttp.web import HTTPNoContent, Response, json_response
 | 
			
		||||
 | 
			
		||||
from ahriman.models.build_status import BuildStatusEnum
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.web.views.base import BaseView
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class AhrimanView(BaseView):
 | 
			
		||||
    """
 | 
			
		||||
    service status web view
 | 
			
		||||
    :cvar GET_PERMISSION: get permissions of self
 | 
			
		||||
    :cvar HEAD_PERMISSION: head permissions of self
 | 
			
		||||
    :cvar POST_PERMISSION: post permissions of self
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    GET_PERMISSION = HEAD_PERMISSION = UserAccess.Read
 | 
			
		||||
    POST_PERMISSION = UserAccess.Write
 | 
			
		||||
 | 
			
		||||
    async def get(self) -> Response:
 | 
			
		||||
        """
 | 
			
		||||
        get current service status
 | 
			
		||||
 | 
			
		||||
@ -22,14 +22,22 @@ from aiohttp.web import HTTPNoContent, HTTPNotFound, Response, json_response
 | 
			
		||||
from ahriman.core.exceptions import UnknownPackage
 | 
			
		||||
from ahriman.models.build_status import BuildStatusEnum
 | 
			
		||||
from ahriman.models.package import Package
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.web.views.base import BaseView
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PackageView(BaseView):
 | 
			
		||||
    """
 | 
			
		||||
    package base specific web view
 | 
			
		||||
    :cvar DELETE_PERMISSION: delete permissions of self
 | 
			
		||||
    :cvar GET_PERMISSION: get permissions of self
 | 
			
		||||
    :cvar HEAD_PERMISSION: head permissions of self
 | 
			
		||||
    :cvar POST_PERMISSION: post permissions of self
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    DELETE_PERMISSION = POST_PERMISSION = UserAccess.Write
 | 
			
		||||
    GET_PERMISSION = HEAD_PERMISSION = UserAccess.Read
 | 
			
		||||
 | 
			
		||||
    async def get(self) -> Response:
 | 
			
		||||
        """
 | 
			
		||||
        get current package base status
 | 
			
		||||
 | 
			
		||||
@ -19,14 +19,21 @@
 | 
			
		||||
#
 | 
			
		||||
from aiohttp.web import HTTPNoContent, Response, json_response
 | 
			
		||||
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.web.views.base import BaseView
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PackagesView(BaseView):
 | 
			
		||||
    """
 | 
			
		||||
    global watcher view
 | 
			
		||||
    :cvar GET_PERMISSION: get permissions of self
 | 
			
		||||
    :cvar HEAD_PERMISSION: head permissions of self
 | 
			
		||||
    :cvar POST_PERMISSION: post permissions of self
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    GET_PERMISSION = HEAD_PERMISSION = UserAccess.Read
 | 
			
		||||
    POST_PERMISSION = UserAccess.Write
 | 
			
		||||
 | 
			
		||||
    async def get(self) -> Response:
 | 
			
		||||
        """
 | 
			
		||||
        get current packages status
 | 
			
		||||
 | 
			
		||||
@ -22,14 +22,19 @@ from aiohttp.web import Response, json_response
 | 
			
		||||
from ahriman import version
 | 
			
		||||
from ahriman.models.counters import Counters
 | 
			
		||||
from ahriman.models.internal_status import InternalStatus
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.web.views.base import BaseView
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class StatusView(BaseView):
 | 
			
		||||
    """
 | 
			
		||||
    web service status web view
 | 
			
		||||
    :cvar GET_PERMISSION: get permissions of self
 | 
			
		||||
    :cvar HEAD_PERMISSION: head permissions of self
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    GET_PERMISSION = HEAD_PERMISSION = UserAccess.Read
 | 
			
		||||
 | 
			
		||||
    async def get(self) -> Response:
 | 
			
		||||
        """
 | 
			
		||||
        get current service status
 | 
			
		||||
 | 
			
		||||
@ -20,6 +20,7 @@
 | 
			
		||||
from aiohttp.web import HTTPFound, HTTPMethodNotAllowed, HTTPUnauthorized, Response
 | 
			
		||||
 | 
			
		||||
from ahriman.core.auth.helpers import remember
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.models.user_identity import UserIdentity
 | 
			
		||||
from ahriman.web.views.base import BaseView
 | 
			
		||||
 | 
			
		||||
@ -27,8 +28,12 @@ from ahriman.web.views.base import BaseView
 | 
			
		||||
class LoginView(BaseView):
 | 
			
		||||
    """
 | 
			
		||||
    login endpoint view
 | 
			
		||||
    :cvar GET_PERMISSION: get permissions of self
 | 
			
		||||
    :cvar POST_PERMISSION: post permissions of self
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    GET_PERMISSION = POST_PERMISSION = UserAccess.Safe
 | 
			
		||||
 | 
			
		||||
    async def get(self) -> Response:
 | 
			
		||||
        """
 | 
			
		||||
        OAuth2 response handler
 | 
			
		||||
 | 
			
		||||
@ -20,14 +20,18 @@
 | 
			
		||||
from aiohttp.web import HTTPFound, Response
 | 
			
		||||
 | 
			
		||||
from ahriman.core.auth.helpers import check_authorized, forget
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.web.views.base import BaseView
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class LogoutView(BaseView):
 | 
			
		||||
    """
 | 
			
		||||
    logout endpoint view
 | 
			
		||||
    :cvar POST_PERMISSION: post permissions of self
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    POST_PERMISSION = UserAccess.Safe
 | 
			
		||||
 | 
			
		||||
    async def post(self) -> Response:
 | 
			
		||||
        """
 | 
			
		||||
        logout user from the service. No parameters supported here
 | 
			
		||||
 | 
			
		||||
@ -109,40 +109,6 @@ async def test_check_credentials(auth: Auth, user: User) -> None:
 | 
			
		||||
    assert await auth.check_credentials(None, None)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_is_safe_request(auth: Auth) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must validate safe request
 | 
			
		||||
    """
 | 
			
		||||
    # login and logout are always safe
 | 
			
		||||
    assert await auth.is_safe_request("/user-api/v1/login", UserAccess.Write)
 | 
			
		||||
    assert await auth.is_safe_request("/user-api/v1/logout", UserAccess.Write)
 | 
			
		||||
 | 
			
		||||
    auth.allowed_paths.add("/safe")
 | 
			
		||||
    auth.allowed_paths_groups.add("/unsafe/safe")
 | 
			
		||||
 | 
			
		||||
    assert await auth.is_safe_request("/safe", UserAccess.Write)
 | 
			
		||||
    assert not await auth.is_safe_request("/unsafe", UserAccess.Write)
 | 
			
		||||
    assert await auth.is_safe_request("/unsafe/safe", UserAccess.Write)
 | 
			
		||||
    assert await auth.is_safe_request("/unsafe/safe/suffix", UserAccess.Write)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_is_safe_request_empty(auth: Auth) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must not allow requests without path
 | 
			
		||||
    """
 | 
			
		||||
    assert not await auth.is_safe_request(None, UserAccess.Read)
 | 
			
		||||
    assert not await auth.is_safe_request("", UserAccess.Read)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_is_safe_request_read_only(auth: Auth) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must allow read-only requests if it is set in settings
 | 
			
		||||
    """
 | 
			
		||||
    assert await auth.is_safe_request("/", UserAccess.Read)
 | 
			
		||||
    auth.allow_read_only = True
 | 
			
		||||
    assert await auth.is_safe_request("/unsafe", UserAccess.Read)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_known_username(auth: Auth, user: User) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must allow any username
 | 
			
		||||
 | 
			
		||||
@ -43,60 +43,74 @@ async def test_permits(authorization_policy: AuthorizationPolicy, user: User) ->
 | 
			
		||||
    assert not await authorization_policy.permits(user.username, user.access, "/endpoint")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_auth_handler_api(auth: Auth, mocker: MockerFixture) -> None:
 | 
			
		||||
async def test_auth_handler_api(mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must ask for status permission for api calls
 | 
			
		||||
    """
 | 
			
		||||
    aiohttp_request = pytest.helpers.request("", "/status-api", "GET")
 | 
			
		||||
    request_handler = AsyncMock()
 | 
			
		||||
    mocker.patch("ahriman.core.auth.auth.Auth.is_safe_request", return_value=False)
 | 
			
		||||
    request_handler.get_permission.return_value = UserAccess.Read
 | 
			
		||||
    check_permission_mock = mocker.patch("aiohttp_security.check_permission")
 | 
			
		||||
 | 
			
		||||
    handler = auth_handler(auth)
 | 
			
		||||
    handler = auth_handler()
 | 
			
		||||
    await handler(aiohttp_request, request_handler)
 | 
			
		||||
    check_permission_mock.assert_called_with(aiohttp_request, UserAccess.Read, aiohttp_request.path)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_auth_handler_api_post(auth: Auth, mocker: MockerFixture) -> None:
 | 
			
		||||
async def test_auth_handler_api_no_method(mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must ask for write permission if handler does not have get_permission method
 | 
			
		||||
    """
 | 
			
		||||
    aiohttp_request = pytest.helpers.request("", "/status-api", "GET")
 | 
			
		||||
    request_handler = AsyncMock()
 | 
			
		||||
    request_handler.get_permission = None
 | 
			
		||||
    check_permission_mock = mocker.patch("aiohttp_security.check_permission")
 | 
			
		||||
 | 
			
		||||
    handler = auth_handler()
 | 
			
		||||
    await handler(aiohttp_request, request_handler)
 | 
			
		||||
    check_permission_mock.assert_called_with(aiohttp_request, UserAccess.Write, aiohttp_request.path)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_auth_handler_api_post(mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must ask for status permission for api calls with POST
 | 
			
		||||
    """
 | 
			
		||||
    aiohttp_request = pytest.helpers.request("", "/status-api", "POST")
 | 
			
		||||
    request_handler = AsyncMock()
 | 
			
		||||
    mocker.patch("ahriman.core.auth.auth.Auth.is_safe_request", return_value=False)
 | 
			
		||||
    request_handler.get_permission.return_value = UserAccess.Write
 | 
			
		||||
    check_permission_mock = mocker.patch("aiohttp_security.check_permission")
 | 
			
		||||
 | 
			
		||||
    handler = auth_handler(auth)
 | 
			
		||||
    handler = auth_handler()
 | 
			
		||||
    await handler(aiohttp_request, request_handler)
 | 
			
		||||
    check_permission_mock.assert_called_with(aiohttp_request, UserAccess.Write, aiohttp_request.path)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_auth_handler_read(auth: Auth, mocker: MockerFixture) -> None:
 | 
			
		||||
async def test_auth_handler_read(mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must ask for read permission for api calls with GET
 | 
			
		||||
    """
 | 
			
		||||
    for method in ("GET", "HEAD", "OPTIONS"):
 | 
			
		||||
        aiohttp_request = pytest.helpers.request("", "", method)
 | 
			
		||||
        request_handler = AsyncMock()
 | 
			
		||||
        mocker.patch("ahriman.core.auth.auth.Auth.is_safe_request", return_value=False)
 | 
			
		||||
        request_handler.get_permission.return_value = UserAccess.Read
 | 
			
		||||
        check_permission_mock = mocker.patch("aiohttp_security.check_permission")
 | 
			
		||||
 | 
			
		||||
        handler = auth_handler(auth)
 | 
			
		||||
        handler = auth_handler()
 | 
			
		||||
        await handler(aiohttp_request, request_handler)
 | 
			
		||||
        check_permission_mock.assert_called_with(aiohttp_request, UserAccess.Read, aiohttp_request.path)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_auth_handler_write(auth: Auth, mocker: MockerFixture) -> None:
 | 
			
		||||
async def test_auth_handler_write(mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must ask for read permission for api calls with POST
 | 
			
		||||
    """
 | 
			
		||||
    for method in ("CONNECT", "DELETE", "PATCH", "POST", "PUT", "TRACE"):
 | 
			
		||||
        aiohttp_request = pytest.helpers.request("", "", method)
 | 
			
		||||
        request_handler = AsyncMock()
 | 
			
		||||
        mocker.patch("ahriman.core.auth.auth.Auth.is_safe_request", return_value=False)
 | 
			
		||||
        request_handler.get_permission.return_value = UserAccess.Write
 | 
			
		||||
        check_permission_mock = mocker.patch("aiohttp_security.check_permission")
 | 
			
		||||
 | 
			
		||||
        handler = auth_handler(auth)
 | 
			
		||||
        handler = auth_handler()
 | 
			
		||||
        await handler(aiohttp_request, request_handler)
 | 
			
		||||
        check_permission_mock.assert_called_with(aiohttp_request, UserAccess.Write, aiohttp_request.path)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1,6 +1,20 @@
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
from aiohttp.test_utils import TestClient
 | 
			
		||||
from pytest_mock import MockerFixture
 | 
			
		||||
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.web.views.service.add import AddView
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_get_permission() -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must return correct permission for the request
 | 
			
		||||
    """
 | 
			
		||||
    for method in ("POST",):
 | 
			
		||||
        request = pytest.helpers.request("", "", method)
 | 
			
		||||
        assert await AddView.get_permission(request) == UserAccess.Write
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_post(client: TestClient, mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
@ -1,6 +1,20 @@
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
from aiohttp.test_utils import TestClient
 | 
			
		||||
from pytest_mock import MockerFixture
 | 
			
		||||
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.web.views.service.reload_auth import ReloadAuthView
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_get_permission() -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must return correct permission for the request
 | 
			
		||||
    """
 | 
			
		||||
    for method in ("POST",):
 | 
			
		||||
        request = pytest.helpers.request("", "", method)
 | 
			
		||||
        assert await ReloadAuthView.get_permission(request) == UserAccess.Write
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_post(client_with_auth: TestClient, mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
@ -1,6 +1,20 @@
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
from aiohttp.test_utils import TestClient
 | 
			
		||||
from pytest_mock import MockerFixture
 | 
			
		||||
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.web.views.service.remove import RemoveView
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_get_permission() -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must return correct permission for the request
 | 
			
		||||
    """
 | 
			
		||||
    for method in ("POST",):
 | 
			
		||||
        request = pytest.helpers.request("", "", method)
 | 
			
		||||
        assert await RemoveView.get_permission(request) == UserAccess.Write
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_post(client: TestClient, mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
@ -1,8 +1,21 @@
 | 
			
		||||
import aur
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
from aiohttp.test_utils import TestClient
 | 
			
		||||
from pytest_mock import MockerFixture
 | 
			
		||||
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.web.views.service.search import SearchView
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_get_permission() -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must return correct permission for the request
 | 
			
		||||
    """
 | 
			
		||||
    for method in ("GET", "HEAD"):
 | 
			
		||||
        request = pytest.helpers.request("", "", method)
 | 
			
		||||
        assert await SearchView.get_permission(request) == UserAccess.Read
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_get(client: TestClient, aur_package_ahriman: aur.Package, mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
@ -1,7 +1,23 @@
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
from aiohttp.test_utils import TestClient
 | 
			
		||||
from pytest_mock import MockerFixture
 | 
			
		||||
 | 
			
		||||
from ahriman.models.build_status import BuildStatus, BuildStatusEnum
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.web.views.status.ahriman import AhrimanView
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_get_permission() -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must return correct permission for the request
 | 
			
		||||
    """
 | 
			
		||||
    for method in ("GET", "HEAD"):
 | 
			
		||||
        request = pytest.helpers.request("", "", method)
 | 
			
		||||
        assert await AhrimanView.get_permission(request) == UserAccess.Read
 | 
			
		||||
    for method in ("POST",):
 | 
			
		||||
        request = pytest.helpers.request("", "", method)
 | 
			
		||||
        assert await AhrimanView.get_permission(request) == UserAccess.Write
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_get(client: TestClient) -> None:
 | 
			
		||||
 | 
			
		||||
@ -1,7 +1,23 @@
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
from pytest_aiohttp import TestClient
 | 
			
		||||
 | 
			
		||||
from ahriman.models.build_status import BuildStatus, BuildStatusEnum
 | 
			
		||||
from ahriman.models.package import Package
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.web.views.status.package import PackageView
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_get_permission() -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must return correct permission for the request
 | 
			
		||||
    """
 | 
			
		||||
    for method in ("GET", "HEAD"):
 | 
			
		||||
        request = pytest.helpers.request("", "", method)
 | 
			
		||||
        assert await PackageView.get_permission(request) == UserAccess.Read
 | 
			
		||||
    for method in ("DELETE", "POST"):
 | 
			
		||||
        request = pytest.helpers.request("", "", method)
 | 
			
		||||
        assert await PackageView.get_permission(request) == UserAccess.Write
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_get(client: TestClient, package_ahriman: Package, package_python_schedule: Package) -> None:
 | 
			
		||||
 | 
			
		||||
@ -1,8 +1,24 @@
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
from pytest_aiohttp import TestClient
 | 
			
		||||
from pytest_mock import MockerFixture
 | 
			
		||||
 | 
			
		||||
from ahriman.models.build_status import BuildStatusEnum
 | 
			
		||||
from ahriman.models.package import Package
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.web.views.status.packages import PackagesView
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_get_permission() -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must return correct permission for the request
 | 
			
		||||
    """
 | 
			
		||||
    for method in ("GET", "HEAD"):
 | 
			
		||||
        request = pytest.helpers.request("", "", method)
 | 
			
		||||
        assert await PackagesView.get_permission(request) == UserAccess.Read
 | 
			
		||||
    for method in ("POST",):
 | 
			
		||||
        request = pytest.helpers.request("", "", method)
 | 
			
		||||
        assert await PackagesView.get_permission(request) == UserAccess.Write
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_get(client: TestClient, package_ahriman: Package, package_python_schedule: Package) -> None:
 | 
			
		||||
 | 
			
		||||
@ -1,9 +1,22 @@
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
from pytest_aiohttp import TestClient
 | 
			
		||||
 | 
			
		||||
import ahriman.version as version
 | 
			
		||||
 | 
			
		||||
from ahriman.models.build_status import BuildStatusEnum
 | 
			
		||||
from ahriman.models.package import Package
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.web.views.status.status import StatusView
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_get_permission() -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must return correct permission for the request
 | 
			
		||||
    """
 | 
			
		||||
    for method in ("GET", "HEAD"):
 | 
			
		||||
        request = pytest.helpers.request("", "", method)
 | 
			
		||||
        assert await StatusView.get_permission(request) == UserAccess.Read
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_get(client: TestClient, package_ahriman: Package) -> None:
 | 
			
		||||
 | 
			
		||||
@ -33,6 +33,16 @@ def test_validator(base: BaseView) -> None:
 | 
			
		||||
    assert base.validator
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_get_permission(base: BaseView) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must search for permission attribute in class
 | 
			
		||||
    """
 | 
			
		||||
    for method in ("DELETE", "GET", "HEAD", "POST"):
 | 
			
		||||
        request = pytest.helpers.request(base.request.app, "", method)
 | 
			
		||||
        setattr(BaseView, f"{method.upper()}_PERMISSION", "permission")
 | 
			
		||||
        assert await base.get_permission(request) == "permission"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_extract_data_json(base: BaseView) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must parse and return json
 | 
			
		||||
 | 
			
		||||
@ -1,5 +1,19 @@
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
from pytest_aiohttp import TestClient
 | 
			
		||||
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.web.views.index import IndexView
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_get_permission() -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must return correct permission for the request
 | 
			
		||||
    """
 | 
			
		||||
    for method in ("GET", "HEAD"):
 | 
			
		||||
        request = pytest.helpers.request("", "", method)
 | 
			
		||||
        assert await IndexView.get_permission(request) == UserAccess.Safe
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_get(client_with_auth: TestClient) -> None:
 | 
			
		||||
    """
 | 
			
		||||
@ -34,3 +48,11 @@ async def test_get_static(client: TestClient) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    response = await client.get("/static/favicon.ico")
 | 
			
		||||
    assert response.ok
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_get_static_with_auth(client_with_auth: TestClient) -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must return static files
 | 
			
		||||
    """
 | 
			
		||||
    response = await client_with_auth.get("/static/favicon.ico")
 | 
			
		||||
    assert response.ok
 | 
			
		||||
 | 
			
		||||
@ -1,9 +1,21 @@
 | 
			
		||||
import pytest
 | 
			
		||||
from aiohttp.test_utils import TestClient
 | 
			
		||||
from pytest_mock import MockerFixture
 | 
			
		||||
from unittest.mock import MagicMock
 | 
			
		||||
 | 
			
		||||
from ahriman.core.auth.oauth import OAuth
 | 
			
		||||
from ahriman.models.user import User
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.web.views.user.login import LoginView
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_get_permission() -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must return correct permission for the request
 | 
			
		||||
    """
 | 
			
		||||
    for method in ("GET", "POST"):
 | 
			
		||||
        request = pytest.helpers.request("", "", method)
 | 
			
		||||
        assert await LoginView.get_permission(request) == UserAccess.Safe
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_get_default_validator(client_with_auth: TestClient) -> None:
 | 
			
		||||
 | 
			
		||||
@ -1,7 +1,21 @@
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
from aiohttp.test_utils import TestClient
 | 
			
		||||
from aiohttp.web import HTTPUnauthorized
 | 
			
		||||
from pytest_mock import MockerFixture
 | 
			
		||||
 | 
			
		||||
from ahriman.models.user_access import UserAccess
 | 
			
		||||
from ahriman.web.views.user.logout import LogoutView
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_get_permission() -> None:
 | 
			
		||||
    """
 | 
			
		||||
    must return correct permission for the request
 | 
			
		||||
    """
 | 
			
		||||
    for method in ("POST",):
 | 
			
		||||
        request = pytest.helpers.request("", "", method)
 | 
			
		||||
        assert await LogoutView.get_permission(request) == UserAccess.Safe
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def test_post(client_with_auth: TestClient, mocker: MockerFixture) -> None:
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user