Compare commits

...

3 Commits

3 changed files with 212 additions and 33 deletions

View File

@ -0,0 +1,183 @@
#
# Copyright (c) 2021-2025 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import annotations
from collections import defaultdict
from collections.abc import Iterator
from dataclasses import dataclass
from enum import ReprEnum
from types import SimpleNamespace
from typing import Generator, IO, Self
from ahriman.models.pkgbuild_patch import PkgbuildPatch
class PkgbuildToken(bytes, ReprEnum):
Comment = b"#"
Assignment = b"="
SingleQuote = b"'"
DoubleQuote = b"\""
Space = b" "
NewLine = b"\n"
ParenthesisOpen = b"("
ParenthesisClose = b")"
FunctionStarts = b"function"
FunctionDeclaration = b"()"
BraceOpen = b"{"
BraceClose = b"}"
@dataclass
class PkgbuildWord:
word: bytes
quote: bytes | None
@property
def closing(self) -> PkgbuildToken | None:
if self.quote:
return None
match self.word:
case PkgbuildToken.ParenthesisOpen:
return PkgbuildToken.ParenthesisClose
case PkgbuildToken.BraceOpen:
return PkgbuildToken.BraceClose
return None
@property
def original(self) -> bytes:
quote = self.quote or b""
return quote + self.word + quote
def __bool__(self) -> bool:
return bool(self.original)
class BytesPkgbuildParser(Iterator[PkgbuildPatch]):
def __init__(self, stream: IO[bytes]) -> None:
self._io = stream
def _next(self, *, declaration: bool) -> bytes:
while not (token := self._next_token(declaration=declaration)):
continue
return token
def _next_token(self, *, declaration: bool) -> bytes:
buffer = b""
while word := self._next_word():
match word:
case PkgbuildWord(PkgbuildToken.Comment, None):
self._io.readline()
case PkgbuildWord(PkgbuildToken.NewLine, None):
if declaration:
buffer = b""
return buffer
case PkgbuildWord(PkgbuildToken.Assignment, None) if declaration:
return buffer
case PkgbuildWord(PkgbuildToken.Space, None) if declaration:
if buffer.endswith(PkgbuildToken.FunctionDeclaration):
return buffer
buffer = b""
continue
case PkgbuildWord(PkgbuildToken.Space, None):
return buffer
case PkgbuildWord(PkgbuildToken.ParenthesisOpen, None):
buffer += PkgbuildToken.ParenthesisOpen
buffer += b"".join(self._next_words_until(PkgbuildWord(PkgbuildToken.ParenthesisClose, None)))
case PkgbuildWord(PkgbuildToken.BraceOpen, None):
buffer += PkgbuildToken.BraceOpen
buffer += b"".join(self._next_words_until(PkgbuildWord(PkgbuildToken.BraceClose, None)))
case PkgbuildWord(token, _):
buffer += token
raise StopIteration
def _next_word(self) -> PkgbuildWord:
# pass SimpleNamespace as an argument to implement side effects
def generator(quote: SimpleNamespace) -> Generator[bytes, None, None]:
while token := self._io.read(1):
match token:
case (PkgbuildToken.SingleQuote | PkgbuildToken.DoubleQuote) if quote.open is None:
quote.open = token
case closing_quote if closing_quote == quote.open:
return
case part:
yield part
if quote.open is None:
return
if quote.open is not None:
raise ValueError("No closing quotation")
open_quote = SimpleNamespace(open=None)
value = b"".join(generator(open_quote))
return PkgbuildWord(value, open_quote.open)
def _next_words_until(self, ending: PkgbuildWord) -> Generator[bytes, None, None]:
braces = defaultdict(int)
while element := self._next_word():
yield element.original
match element:
case PkgbuildWord(token, None) if braces[token] > 0:
braces[token] -= 1
case with_closure if (closing := with_closure.closing) is not None:
braces[closing] += 1
case _ if element == ending:
return
if any(brace for brace in braces.values() if brace > 0):
raise ValueError("Unclosed parenthesis and/or braces found")
raise ValueError(f"No matching ending element {ending.word} found")
def parse(self) -> Generator[PkgbuildPatch, None, None]:
"""
parse source stream and yield parsed entries
Yields:
PkgbuildPatch: extracted a PKGBUILD node
"""
yield from self
def __iter__(self) -> Self:
"""
base iterator method
Returns:
Self: iterator instance
"""
return self
def __next__(self) -> PkgbuildPatch:
key = self._next(declaration=True)
value = self._next(declaration=False)
return PkgbuildPatch(key.decode(encoding="utf8"), value.decode(encoding="utf8"))

View File

@ -23,7 +23,7 @@ import sys
from collections.abc import Generator, Mapping, MutableMapping
from string import Template
from typing import ClassVar
from typing import Any, ClassVar
from ahriman.core.configuration.shell_template import ShellTemplate
@ -85,7 +85,7 @@ class ShellInterpolator(configparser.Interpolation):
"prefix": sys.prefix,
}
def before_get(self, parser: MutableMapping[str, Mapping[str, str]], section: str, option: str, value: str,
def before_get(self, parser: MutableMapping[str, Mapping[str, str]], section: Any, option: Any, value: str,
defaults: Mapping[str, str]) -> str:
"""
interpolate option value
@ -100,8 +100,8 @@ class ShellInterpolator(configparser.Interpolation):
Args:
parser(MutableMapping[str, Mapping[str, str]]): option parser
section(str): section name
option(str): option name
section(Any): section name
option(Any): option name
value(str): source (not-converted) value
defaults(Mapping[str, str]): default values

View File

@ -1,8 +1,8 @@
import pytest
import pytest_asyncio
from aiohttp.test_utils import TestClient
from aiohttp.web import Application, Resource, UrlMappingMatchInfo
from asyncio import BaseEventLoop
from collections.abc import Awaitable, Callable
from marshmallow import Schema
from pytest_mock import MockerFixture
@ -164,15 +164,13 @@ def application_with_auth(configuration: Configuration, user: User, spawner: Spa
return application
@pytest.fixture
def client(application: Application, event_loop: BaseEventLoop, aiohttp_client: Any,
mocker: MockerFixture) -> TestClient:
@pytest_asyncio.fixture
async def client(application: Application, aiohttp_client: Any, mocker: MockerFixture) -> TestClient:
"""
web client fixture
Args:
application(Application): application fixture
event_loop(BaseEventLoop): context event loop
aiohttp_client(Any): aiohttp client fixture
mocker(MockerFixture): mocker object
@ -180,37 +178,35 @@ def client(application: Application, event_loop: BaseEventLoop, aiohttp_client:
TestClient: web client test instance
"""
mocker.patch("pathlib.Path.iterdir", return_value=[])
return event_loop.run_until_complete(aiohttp_client(application))
return await aiohttp_client(application)
@pytest.fixture
def client_with_auth(application_with_auth: Application, event_loop: BaseEventLoop, aiohttp_client: Any,
mocker: MockerFixture) -> TestClient:
"""
web client fixture with full authorization functions
Args:
application_with_auth(Application): application fixture
event_loop(BaseEventLoop): context event loop
aiohttp_client(Any): aiohttp client fixture
mocker(MockerFixture): mocker object
Returns:
TestClient: web client test instance
"""
mocker.patch("pathlib.Path.iterdir", return_value=[])
return event_loop.run_until_complete(aiohttp_client(application_with_auth))
@pytest.fixture
def client_with_oauth_auth(application_with_auth: Application, event_loop: BaseEventLoop, aiohttp_client: Any,
@pytest_asyncio.fixture
async def client_with_auth(application_with_auth: Application, aiohttp_client: Any,
mocker: MockerFixture) -> TestClient:
"""
web client fixture with full authorization functions
Args:
application_with_auth(Application): application fixture
event_loop(BaseEventLoop): context event loop
aiohttp_client(Any): aiohttp client fixture
mocker(MockerFixture): mocker object
Returns:
TestClient: web client test instance
"""
mocker.patch("pathlib.Path.iterdir", return_value=[])
return await aiohttp_client(application_with_auth)
@pytest_asyncio.fixture
async def client_with_oauth_auth(application_with_auth: Application, aiohttp_client: Any,
mocker: MockerFixture) -> TestClient:
"""
web client fixture with full authorization functions
Args:
application_with_auth(Application): application fixture
aiohttp_client(Any): aiohttp client fixture
mocker(MockerFixture): mocker object
@ -219,4 +215,4 @@ def client_with_oauth_auth(application_with_auth: Application, event_loop: BaseE
"""
mocker.patch("pathlib.Path.iterdir", return_value=[])
application_with_auth[AuthKey] = MagicMock(spec=OAuth)
return event_loop.run_until_complete(aiohttp_client(application_with_auth))
return await aiohttp_client(application_with_auth)