mirror of
https://github.com/arcan1s/ahriman.git
synced 2025-04-24 15:27:17 +00:00
type: update to the typed aiohttp release
This commit is contained in:
parent
18d17d4d52
commit
0991dbb59c
@ -20,7 +20,7 @@
|
|||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import aiohttp_security # type: ignore[import-untyped]
|
import aiohttp_security
|
||||||
_has_aiohttp_security = True
|
_has_aiohttp_security = True
|
||||||
except ImportError:
|
except ImportError:
|
||||||
_has_aiohttp_security = False
|
_has_aiohttp_security = False
|
||||||
|
@ -17,7 +17,7 @@
|
|||||||
# You should have received a copy of the GNU General Public License
|
# You should have received a copy of the GNU General Public License
|
||||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
#
|
#
|
||||||
import aiohttp_security # type: ignore[import-untyped]
|
import aiohttp_security
|
||||||
import socket
|
import socket
|
||||||
import types
|
import types
|
||||||
|
|
||||||
@ -25,6 +25,7 @@ from aiohttp.web import Application, Request, StaticResource, StreamResponse, mi
|
|||||||
from aiohttp_session import setup as setup_session
|
from aiohttp_session import setup as setup_session
|
||||||
from aiohttp_session.cookie_storage import EncryptedCookieStorage
|
from aiohttp_session.cookie_storage import EncryptedCookieStorage
|
||||||
from cryptography import fernet
|
from cryptography import fernet
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
from ahriman.core.auth import Auth
|
from ahriman.core.auth import Auth
|
||||||
from ahriman.core.configuration import Configuration
|
from ahriman.core.configuration import Configuration
|
||||||
@ -50,6 +51,7 @@ class _AuthorizationPolicy(aiohttp_security.AbstractAuthorizationPolicy):
|
|||||||
Args:
|
Args:
|
||||||
validator(Auth): authorization module instance
|
validator(Auth): authorization module instance
|
||||||
"""
|
"""
|
||||||
|
aiohttp_security.AbstractAuthorizationPolicy.__init__(self)
|
||||||
self.validator = validator
|
self.validator = validator
|
||||||
|
|
||||||
async def authorized_userid(self, identity: str) -> str | None:
|
async def authorized_userid(self, identity: str) -> str | None:
|
||||||
@ -64,18 +66,21 @@ class _AuthorizationPolicy(aiohttp_security.AbstractAuthorizationPolicy):
|
|||||||
"""
|
"""
|
||||||
return identity if await self.validator.known_username(identity) else None
|
return identity if await self.validator.known_username(identity) else None
|
||||||
|
|
||||||
async def permits(self, identity: str, permission: UserAccess, context: str | None = None) -> bool:
|
async def permits(self, identity: str | None, permission: str | Enum, context: str | None = None) -> bool:
|
||||||
"""
|
"""
|
||||||
check user permissions
|
check user permissions
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
identity(str): username
|
identity(str | None): username
|
||||||
permission(UserAccess): requested permission level
|
permission(str | Enum): requested permission level
|
||||||
context(str | None, optional): URI request path (Default value = None)
|
context(str | None, optional): URI request path (Default value = None)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
bool: True in case if user is allowed to perform this request and False otherwise
|
bool: True in case if user is allowed to perform this request and False otherwise
|
||||||
"""
|
"""
|
||||||
|
# some methods for type checking and parent class compatibility
|
||||||
|
if identity is None or not isinstance(permission, UserAccess):
|
||||||
|
return False # no identity provided or invalid access rights requested
|
||||||
return await self.validator.verify_access(identity, permission, context)
|
return await self.validator.verify_access(identity, permission, context)
|
||||||
|
|
||||||
|
|
||||||
|
@ -139,7 +139,7 @@ class BaseView(View, CorsViewMixin):
|
|||||||
return value
|
return value
|
||||||
|
|
||||||
# pylint: disable=not-callable,protected-access
|
# pylint: disable=not-callable,protected-access
|
||||||
async def head(self) -> StreamResponse: # type: ignore[return]
|
async def head(self) -> StreamResponse:
|
||||||
"""
|
"""
|
||||||
HEAD method implementation based on the result of GET method
|
HEAD method implementation based on the result of GET method
|
||||||
|
|
||||||
|
@ -9,6 +9,7 @@ from unittest.mock import AsyncMock, call as MockCall
|
|||||||
|
|
||||||
from ahriman.core.auth import Auth
|
from ahriman.core.auth import Auth
|
||||||
from ahriman.core.configuration import Configuration
|
from ahriman.core.configuration import Configuration
|
||||||
|
from ahriman.models.build_status import BuildStatusEnum
|
||||||
from ahriman.models.user import User
|
from ahriman.models.user import User
|
||||||
from ahriman.models.user_access import UserAccess
|
from ahriman.models.user_access import UserAccess
|
||||||
from ahriman.web.middlewares.auth_handler import _AuthorizationPolicy, _auth_handler, _cookie_secret_key, setup_auth
|
from ahriman.web.middlewares.auth_handler import _AuthorizationPolicy, _auth_handler, _cookie_secret_key, setup_auth
|
||||||
@ -39,6 +40,9 @@ async def test_permits(authorization_policy: _AuthorizationPolicy, user: User) -
|
|||||||
|
|
||||||
assert await authorization_policy.permits(user.username, user.access, "/endpoint")
|
assert await authorization_policy.permits(user.username, user.access, "/endpoint")
|
||||||
assert not await authorization_policy.permits("somerandomname", user.access, "/endpoint")
|
assert not await authorization_policy.permits("somerandomname", user.access, "/endpoint")
|
||||||
|
assert not await authorization_policy.permits(None, user.access, "/endpoint")
|
||||||
|
assert not await authorization_policy.permits(user.username, "random", "/endpoint")
|
||||||
|
assert not await authorization_policy.permits(None, BuildStatusEnum.Building, "/endpoint")
|
||||||
|
|
||||||
authorization_policy.validator.verify_access.assert_has_calls([
|
authorization_policy.validator.verify_access.assert_has_calls([
|
||||||
MockCall(user.username, user.access, "/endpoint"),
|
MockCall(user.username, user.access, "/endpoint"),
|
||||||
|
Loading…
Reference in New Issue
Block a user