mirror of
https://github.com/arcan1s/ahriman.git
synced 2025-07-28 21:29:56 +00:00
PEP-585 complaint: remove type aliases (#93)
This commit is contained in:
@ -20,7 +20,7 @@
|
||||
import aiohttp_apispec # type: ignore
|
||||
|
||||
from aiohttp.web import Application
|
||||
from typing import Any, Dict, List
|
||||
from typing import Any
|
||||
|
||||
from ahriman import version
|
||||
from ahriman.core.configuration import Configuration
|
||||
@ -29,12 +29,12 @@ from ahriman.core.configuration import Configuration
|
||||
__all__ = ["setup_apispec"]
|
||||
|
||||
|
||||
def _info() -> Dict[str, Any]:
|
||||
def _info() -> dict[str, Any]:
|
||||
"""
|
||||
create info object for swagger docs
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: info object as per openapi specification
|
||||
dict[str, Any]: info object as per openapi specification
|
||||
"""
|
||||
return {
|
||||
"title": "ahriman",
|
||||
@ -62,12 +62,12 @@ def _info() -> Dict[str, Any]:
|
||||
}
|
||||
|
||||
|
||||
def _security() -> List[Dict[str, Any]]:
|
||||
def _security() -> list[dict[str, Any]]:
|
||||
"""
|
||||
get security definitions
|
||||
|
||||
Returns:
|
||||
List[Dict[str, Any]]: generated security definition
|
||||
list[dict[str, Any]]: generated security definition
|
||||
"""
|
||||
return [{
|
||||
"token": {
|
||||
@ -78,7 +78,7 @@ def _security() -> List[Dict[str, Any]]:
|
||||
}]
|
||||
|
||||
|
||||
def _servers(application: Application) -> List[Dict[str, Any]]:
|
||||
def _servers(application: Application) -> list[dict[str, Any]]:
|
||||
"""
|
||||
get list of defined addresses for server
|
||||
|
||||
@ -86,7 +86,7 @@ def _servers(application: Application) -> List[Dict[str, Any]]:
|
||||
application(Application): web application instance
|
||||
|
||||
Returns:
|
||||
List[Dict[str, Any]]: list (actually only one) of defined web urls
|
||||
list[dict[str, Any]]: list (actually only one) of defined web urls
|
||||
"""
|
||||
configuration: Configuration = application["configuration"]
|
||||
address = configuration.get("web", "address", fallback=None)
|
||||
|
@ -18,7 +18,7 @@
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
from aiohttp.web import Request, StreamResponse
|
||||
from typing import Awaitable, Callable
|
||||
from collections.abc import Awaitable, Callable
|
||||
|
||||
|
||||
HandlerType = Callable[[Request], Awaitable[StreamResponse]]
|
||||
|
@ -25,7 +25,6 @@ from aiohttp.web import Application, Request, StaticResource, StreamResponse, mi
|
||||
from aiohttp_session import setup as setup_session
|
||||
from aiohttp_session.cookie_storage import EncryptedCookieStorage
|
||||
from cryptography import fernet
|
||||
from typing import Optional
|
||||
|
||||
from ahriman.core.auth import Auth
|
||||
from ahriman.core.configuration import Configuration
|
||||
@ -53,7 +52,7 @@ class _AuthorizationPolicy(aiohttp_security.AbstractAuthorizationPolicy):
|
||||
"""
|
||||
self.validator = validator
|
||||
|
||||
async def authorized_userid(self, identity: str) -> Optional[str]:
|
||||
async def authorized_userid(self, identity: str) -> str | None:
|
||||
"""
|
||||
retrieve authenticated username
|
||||
|
||||
@ -61,18 +60,18 @@ class _AuthorizationPolicy(aiohttp_security.AbstractAuthorizationPolicy):
|
||||
identity(str): username
|
||||
|
||||
Returns:
|
||||
Optional[str]: user identity (username) in case if user exists and None otherwise
|
||||
str | None: user identity (username) in case if user exists and None otherwise
|
||||
"""
|
||||
return identity if await self.validator.known_username(identity) else None
|
||||
|
||||
async def permits(self, identity: str, permission: UserAccess, context: Optional[str] = None) -> bool:
|
||||
async def permits(self, identity: str, permission: UserAccess, context: str | None = None) -> bool:
|
||||
"""
|
||||
check user permissions
|
||||
|
||||
Args:
|
||||
identity(str): username
|
||||
permission(UserAccess): requested permission level
|
||||
context(Optional[str], optional): URI request path (Default value = None)
|
||||
context(str | None, optional): URI request path (Default value = None)
|
||||
|
||||
Returns:
|
||||
bool: True in case if user is allowed to perform this request and False otherwise
|
||||
|
@ -19,7 +19,7 @@
|
||||
#
|
||||
import aiohttp_jinja2
|
||||
|
||||
from typing import Any, Dict
|
||||
from typing import Any
|
||||
|
||||
from ahriman.models.user_access import UserAccess
|
||||
from ahriman.web.views.base import BaseView
|
||||
@ -36,11 +36,11 @@ class DocsView(BaseView):
|
||||
GET_PERMISSION = UserAccess.Unauthorized
|
||||
|
||||
@aiohttp_jinja2.template("api.jinja2")
|
||||
async def get(self) -> Dict[str, Any]:
|
||||
async def get(self) -> dict[str, Any]:
|
||||
"""
|
||||
return static docs html
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: parameters for jinja template
|
||||
dict[str, Any]: parameters for jinja template
|
||||
"""
|
||||
return {}
|
||||
|
@ -18,7 +18,7 @@
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
from aiohttp.web import Response, json_response
|
||||
from typing import Callable, Dict
|
||||
from collections.abc import Callable
|
||||
|
||||
from ahriman.core.util import partition
|
||||
from ahriman.models.user_access import UserAccess
|
||||
@ -43,7 +43,7 @@ class SwaggerView(BaseView):
|
||||
Response: 200 with json api specification
|
||||
"""
|
||||
spec = self.request.app["swagger_dict"]
|
||||
is_body_parameter: Callable[[Dict[str, str]], bool] = lambda p: p["in"] == "body"
|
||||
is_body_parameter: Callable[[dict[str, str]], bool] = lambda p: p["in"] == "body"
|
||||
|
||||
# special workaround because it writes request body to parameters section
|
||||
paths = spec["paths"]
|
||||
|
@ -21,7 +21,8 @@ from __future__ import annotations
|
||||
|
||||
from aiohttp_cors import CorsViewMixin # type: ignore
|
||||
from aiohttp.web import Request, StreamResponse, View
|
||||
from typing import Any, Awaitable, Callable, Dict, List, Optional, Type, TypeVar
|
||||
from collections.abc import Awaitable, Callable
|
||||
from typing import Any, TypeVar
|
||||
|
||||
from ahriman.core.auth import Auth
|
||||
from ahriman.core.configuration import Configuration
|
||||
@ -29,7 +30,8 @@ from ahriman.core.spawn import Spawn
|
||||
from ahriman.core.status.watcher import Watcher
|
||||
from ahriman.models.user_access import UserAccess
|
||||
|
||||
T = TypeVar("T", str, List[str])
|
||||
|
||||
T = TypeVar("T", str, list[str])
|
||||
|
||||
|
||||
class BaseView(View, CorsViewMixin):
|
||||
@ -87,7 +89,7 @@ class BaseView(View, CorsViewMixin):
|
||||
return validator
|
||||
|
||||
@classmethod
|
||||
async def get_permission(cls: Type[BaseView], request: Request) -> UserAccess:
|
||||
async def get_permission(cls: type[BaseView], request: Request) -> UserAccess:
|
||||
"""
|
||||
retrieve user permission from the request
|
||||
|
||||
@ -102,12 +104,12 @@ class BaseView(View, CorsViewMixin):
|
||||
return permission
|
||||
|
||||
@staticmethod
|
||||
def get_non_empty(extractor: Callable[[str], Optional[T]], key: str) -> T:
|
||||
def get_non_empty(extractor: Callable[[str], T | None], key: str) -> T:
|
||||
"""
|
||||
get non-empty value from request parameters
|
||||
|
||||
Args:
|
||||
extractor(Callable[[str], T]): function to get value by the specified key
|
||||
extractor(Callable[[str], T | None]): function to get value by the specified key
|
||||
key(str): key to extract value
|
||||
|
||||
Returns:
|
||||
@ -124,19 +126,19 @@ class BaseView(View, CorsViewMixin):
|
||||
raise KeyError(f"Key {key} is missing or empty")
|
||||
return value
|
||||
|
||||
async def data_as_json(self, list_keys: List[str]) -> Dict[str, Any]:
|
||||
async def data_as_json(self, list_keys: list[str]) -> dict[str, Any]:
|
||||
"""
|
||||
extract form data and convert it to json object
|
||||
|
||||
Args:
|
||||
list_keys(List[str]): list of keys which must be forced to list from form data
|
||||
list_keys(list[str]): list of keys which must be forced to list from form data
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: form data converted to json. In case if a key is found multiple times
|
||||
dict[str, Any]: form data converted to json. In case if a key is found multiple times
|
||||
it will be returned as list
|
||||
"""
|
||||
raw = await self.request.post()
|
||||
json: Dict[str, Any] = {}
|
||||
json: dict[str, Any] = {}
|
||||
for key, value in raw.items():
|
||||
if key in json and isinstance(json[key], list):
|
||||
json[key].append(value)
|
||||
@ -148,19 +150,19 @@ class BaseView(View, CorsViewMixin):
|
||||
json[key] = value
|
||||
return json
|
||||
|
||||
async def extract_data(self, list_keys: Optional[List[str]] = None) -> Dict[str, Any]:
|
||||
async def extract_data(self, list_keys: list[str] | None = None) -> dict[str, Any]:
|
||||
"""
|
||||
extract json data from either json or form data
|
||||
|
||||
Args:
|
||||
list_keys(Optional[List[str]], optional): optional list of keys which must be forced to list from form data
|
||||
list_keys(list[str] | None, optional): optional list of keys which must be forced to list from form data
|
||||
(Default value = None)
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: raw json object or form data converted to json
|
||||
dict[str, Any]: raw json object or form data converted to json
|
||||
"""
|
||||
try:
|
||||
json: Dict[str, Any] = await self.request.json()
|
||||
json: dict[str, Any] = await self.request.json()
|
||||
return json
|
||||
except ValueError:
|
||||
return await self.data_as_json(list_keys or [])
|
||||
@ -173,7 +175,7 @@ class BaseView(View, CorsViewMixin):
|
||||
Raises:
|
||||
HTTPMethodNotAllowed: in case if there is no GET method implemented
|
||||
"""
|
||||
get_method: Optional[Callable[..., Awaitable[StreamResponse]]] = getattr(self, "get", None)
|
||||
get_method: Callable[..., Awaitable[StreamResponse]] | None = getattr(self, "get", None)
|
||||
# using if/else in order to suppress mypy warning which doesn't know that
|
||||
# ``_raise_allowed_methods`` raises exception
|
||||
if get_method is not None:
|
||||
|
@ -19,7 +19,7 @@
|
||||
#
|
||||
import aiohttp_jinja2
|
||||
|
||||
from typing import Any, Dict
|
||||
from typing import Any
|
||||
|
||||
from ahriman.core.auth.helpers import authorized_userid
|
||||
from ahriman.models.user_access import UserAccess
|
||||
@ -46,12 +46,12 @@ class IndexView(BaseView):
|
||||
GET_PERMISSION = UserAccess.Unauthorized
|
||||
|
||||
@aiohttp_jinja2.template("build-status.jinja2")
|
||||
async def get(self) -> Dict[str, Any]:
|
||||
async def get(self) -> dict[str, Any]:
|
||||
"""
|
||||
process get request. No parameters supported here
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: parameters for jinja template
|
||||
dict[str, Any]: parameters for jinja template
|
||||
"""
|
||||
auth_username = await authorized_userid(self.request)
|
||||
auth = {
|
||||
|
@ -20,7 +20,7 @@
|
||||
import aiohttp_apispec # type: ignore
|
||||
|
||||
from aiohttp.web import HTTPBadRequest, HTTPNotFound, Response, json_response
|
||||
from typing import Callable, List
|
||||
from collections.abc import Callable
|
||||
|
||||
from ahriman.core.alpm.remote import AUR
|
||||
from ahriman.models.aur_package import AURPackage
|
||||
@ -70,7 +70,7 @@ class SearchView(BaseView):
|
||||
HTTPNotFound: if no packages found
|
||||
"""
|
||||
try:
|
||||
search: List[str] = self.get_non_empty(lambda key: self.request.query.getall(key, default=[]), "for")
|
||||
search: list[str] = self.get_non_empty(lambda key: self.request.query.getall(key, default=[]), "for")
|
||||
packages = AUR.multisearch(*search, pacman=self.service.repository.pacman)
|
||||
except Exception as e:
|
||||
raise HTTPBadRequest(reason=str(e))
|
||||
|
@ -23,7 +23,6 @@ import logging
|
||||
import socket
|
||||
|
||||
from aiohttp.web import Application, normalize_path_middleware, run_app
|
||||
from typing import Optional
|
||||
|
||||
from ahriman.core.auth import Auth
|
||||
from ahriman.core.configuration import Configuration
|
||||
@ -41,7 +40,7 @@ from ahriman.web.routes import setup_routes
|
||||
__all__ = ["run_server", "setup_service"]
|
||||
|
||||
|
||||
def _create_socket(configuration: Configuration, application: Application) -> Optional[socket.socket]:
|
||||
def _create_socket(configuration: Configuration, application: Application) -> socket.socket | None:
|
||||
"""
|
||||
create unix socket based on configuration option
|
||||
|
||||
@ -50,7 +49,7 @@ def _create_socket(configuration: Configuration, application: Application) -> Op
|
||||
application(Application): web application instance
|
||||
|
||||
Returns:
|
||||
Optional[socket.socket]: unix socket object if set by option
|
||||
socket.socket | None: unix socket object if set by option
|
||||
"""
|
||||
unix_socket = configuration.getpath("web", "unix_socket", fallback=None)
|
||||
if unix_socket is None:
|
||||
|
Reference in New Issue
Block a user