mirror of
https://github.com/arcan1s/ahriman.git
synced 2025-07-22 18:29:56 +00:00
Compare commits
3 Commits
aad94d7b8a
...
3da3caaaa5
Author | SHA1 | Date | |
---|---|---|---|
3da3caaaa5 | |||
9653fc4f4a | |||
bcd46c66e8 |
183
src/ahriman/core/alpm/bytes_pkgbuild_parser.py
Normal file
183
src/ahriman/core/alpm/bytes_pkgbuild_parser.py
Normal file
@ -0,0 +1,183 @@
|
|||||||
|
#
|
||||||
|
# Copyright (c) 2021-2025 ahriman team.
|
||||||
|
#
|
||||||
|
# This file is part of ahriman
|
||||||
|
# (see https://github.com/arcan1s/ahriman).
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from collections import defaultdict
|
||||||
|
from collections.abc import Iterator
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from enum import ReprEnum
|
||||||
|
from types import SimpleNamespace
|
||||||
|
from typing import Generator, IO, Self
|
||||||
|
|
||||||
|
from ahriman.models.pkgbuild_patch import PkgbuildPatch
|
||||||
|
|
||||||
|
|
||||||
|
class PkgbuildToken(bytes, ReprEnum):
|
||||||
|
|
||||||
|
Comment = b"#"
|
||||||
|
Assignment = b"="
|
||||||
|
SingleQuote = b"'"
|
||||||
|
DoubleQuote = b"\""
|
||||||
|
Space = b" "
|
||||||
|
NewLine = b"\n"
|
||||||
|
|
||||||
|
ParenthesisOpen = b"("
|
||||||
|
ParenthesisClose = b")"
|
||||||
|
|
||||||
|
FunctionStarts = b"function"
|
||||||
|
FunctionDeclaration = b"()"
|
||||||
|
BraceOpen = b"{"
|
||||||
|
BraceClose = b"}"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class PkgbuildWord:
|
||||||
|
|
||||||
|
word: bytes
|
||||||
|
quote: bytes | None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def closing(self) -> PkgbuildToken | None:
|
||||||
|
if self.quote:
|
||||||
|
return None
|
||||||
|
match self.word:
|
||||||
|
case PkgbuildToken.ParenthesisOpen:
|
||||||
|
return PkgbuildToken.ParenthesisClose
|
||||||
|
case PkgbuildToken.BraceOpen:
|
||||||
|
return PkgbuildToken.BraceClose
|
||||||
|
return None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def original(self) -> bytes:
|
||||||
|
quote = self.quote or b""
|
||||||
|
return quote + self.word + quote
|
||||||
|
|
||||||
|
def __bool__(self) -> bool:
|
||||||
|
return bool(self.original)
|
||||||
|
|
||||||
|
|
||||||
|
class BytesPkgbuildParser(Iterator[PkgbuildPatch]):
|
||||||
|
|
||||||
|
def __init__(self, stream: IO[bytes]) -> None:
|
||||||
|
self._io = stream
|
||||||
|
|
||||||
|
def _next(self, *, declaration: bool) -> bytes:
|
||||||
|
while not (token := self._next_token(declaration=declaration)):
|
||||||
|
continue
|
||||||
|
return token
|
||||||
|
|
||||||
|
def _next_token(self, *, declaration: bool) -> bytes:
|
||||||
|
buffer = b""
|
||||||
|
while word := self._next_word():
|
||||||
|
match word:
|
||||||
|
case PkgbuildWord(PkgbuildToken.Comment, None):
|
||||||
|
self._io.readline()
|
||||||
|
|
||||||
|
case PkgbuildWord(PkgbuildToken.NewLine, None):
|
||||||
|
if declaration:
|
||||||
|
buffer = b""
|
||||||
|
return buffer
|
||||||
|
|
||||||
|
case PkgbuildWord(PkgbuildToken.Assignment, None) if declaration:
|
||||||
|
return buffer
|
||||||
|
|
||||||
|
case PkgbuildWord(PkgbuildToken.Space, None) if declaration:
|
||||||
|
if buffer.endswith(PkgbuildToken.FunctionDeclaration):
|
||||||
|
return buffer
|
||||||
|
buffer = b""
|
||||||
|
continue
|
||||||
|
|
||||||
|
case PkgbuildWord(PkgbuildToken.Space, None):
|
||||||
|
return buffer
|
||||||
|
|
||||||
|
case PkgbuildWord(PkgbuildToken.ParenthesisOpen, None):
|
||||||
|
buffer += PkgbuildToken.ParenthesisOpen
|
||||||
|
buffer += b"".join(self._next_words_until(PkgbuildWord(PkgbuildToken.ParenthesisClose, None)))
|
||||||
|
|
||||||
|
case PkgbuildWord(PkgbuildToken.BraceOpen, None):
|
||||||
|
buffer += PkgbuildToken.BraceOpen
|
||||||
|
buffer += b"".join(self._next_words_until(PkgbuildWord(PkgbuildToken.BraceClose, None)))
|
||||||
|
|
||||||
|
case PkgbuildWord(token, _):
|
||||||
|
buffer += token
|
||||||
|
|
||||||
|
raise StopIteration
|
||||||
|
|
||||||
|
def _next_word(self) -> PkgbuildWord:
|
||||||
|
# pass SimpleNamespace as an argument to implement side effects
|
||||||
|
def generator(quote: SimpleNamespace) -> Generator[bytes, None, None]:
|
||||||
|
while token := self._io.read(1):
|
||||||
|
match token:
|
||||||
|
case (PkgbuildToken.SingleQuote | PkgbuildToken.DoubleQuote) if quote.open is None:
|
||||||
|
quote.open = token
|
||||||
|
case closing_quote if closing_quote == quote.open:
|
||||||
|
return
|
||||||
|
case part:
|
||||||
|
yield part
|
||||||
|
if quote.open is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
if quote.open is not None:
|
||||||
|
raise ValueError("No closing quotation")
|
||||||
|
|
||||||
|
open_quote = SimpleNamespace(open=None)
|
||||||
|
value = b"".join(generator(open_quote))
|
||||||
|
|
||||||
|
return PkgbuildWord(value, open_quote.open)
|
||||||
|
|
||||||
|
def _next_words_until(self, ending: PkgbuildWord) -> Generator[bytes, None, None]:
|
||||||
|
braces = defaultdict(int)
|
||||||
|
while element := self._next_word():
|
||||||
|
yield element.original
|
||||||
|
match element:
|
||||||
|
case PkgbuildWord(token, None) if braces[token] > 0:
|
||||||
|
braces[token] -= 1
|
||||||
|
case with_closure if (closing := with_closure.closing) is not None:
|
||||||
|
braces[closing] += 1
|
||||||
|
case _ if element == ending:
|
||||||
|
return
|
||||||
|
|
||||||
|
if any(brace for brace in braces.values() if brace > 0):
|
||||||
|
raise ValueError("Unclosed parenthesis and/or braces found")
|
||||||
|
raise ValueError(f"No matching ending element {ending.word} found")
|
||||||
|
|
||||||
|
def parse(self) -> Generator[PkgbuildPatch, None, None]:
|
||||||
|
"""
|
||||||
|
parse source stream and yield parsed entries
|
||||||
|
|
||||||
|
Yields:
|
||||||
|
PkgbuildPatch: extracted a PKGBUILD node
|
||||||
|
"""
|
||||||
|
yield from self
|
||||||
|
|
||||||
|
def __iter__(self) -> Self:
|
||||||
|
"""
|
||||||
|
base iterator method
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Self: iterator instance
|
||||||
|
"""
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __next__(self) -> PkgbuildPatch:
|
||||||
|
key = self._next(declaration=True)
|
||||||
|
value = self._next(declaration=False)
|
||||||
|
|
||||||
|
return PkgbuildPatch(key.decode(encoding="utf8"), value.decode(encoding="utf8"))
|
@ -23,7 +23,7 @@ import sys
|
|||||||
|
|
||||||
from collections.abc import Generator, Mapping, MutableMapping
|
from collections.abc import Generator, Mapping, MutableMapping
|
||||||
from string import Template
|
from string import Template
|
||||||
from typing import ClassVar
|
from typing import Any, ClassVar
|
||||||
|
|
||||||
from ahriman.core.configuration.shell_template import ShellTemplate
|
from ahriman.core.configuration.shell_template import ShellTemplate
|
||||||
|
|
||||||
@ -85,7 +85,7 @@ class ShellInterpolator(configparser.Interpolation):
|
|||||||
"prefix": sys.prefix,
|
"prefix": sys.prefix,
|
||||||
}
|
}
|
||||||
|
|
||||||
def before_get(self, parser: MutableMapping[str, Mapping[str, str]], section: str, option: str, value: str,
|
def before_get(self, parser: MutableMapping[str, Mapping[str, str]], section: Any, option: Any, value: str,
|
||||||
defaults: Mapping[str, str]) -> str:
|
defaults: Mapping[str, str]) -> str:
|
||||||
"""
|
"""
|
||||||
interpolate option value
|
interpolate option value
|
||||||
@ -100,8 +100,8 @@ class ShellInterpolator(configparser.Interpolation):
|
|||||||
|
|
||||||
Args:
|
Args:
|
||||||
parser(MutableMapping[str, Mapping[str, str]]): option parser
|
parser(MutableMapping[str, Mapping[str, str]]): option parser
|
||||||
section(str): section name
|
section(Any): section name
|
||||||
option(str): option name
|
option(Any): option name
|
||||||
value(str): source (not-converted) value
|
value(str): source (not-converted) value
|
||||||
defaults(Mapping[str, str]): default values
|
defaults(Mapping[str, str]): default values
|
||||||
|
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
import pytest
|
import pytest
|
||||||
|
import pytest_asyncio
|
||||||
|
|
||||||
from aiohttp.test_utils import TestClient
|
from aiohttp.test_utils import TestClient
|
||||||
from aiohttp.web import Application, Resource, UrlMappingMatchInfo
|
from aiohttp.web import Application, Resource, UrlMappingMatchInfo
|
||||||
from asyncio import BaseEventLoop
|
|
||||||
from collections.abc import Awaitable, Callable
|
from collections.abc import Awaitable, Callable
|
||||||
from marshmallow import Schema
|
from marshmallow import Schema
|
||||||
from pytest_mock import MockerFixture
|
from pytest_mock import MockerFixture
|
||||||
@ -164,15 +164,13 @@ def application_with_auth(configuration: Configuration, user: User, spawner: Spa
|
|||||||
return application
|
return application
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest_asyncio.fixture
|
||||||
def client(application: Application, event_loop: BaseEventLoop, aiohttp_client: Any,
|
async def client(application: Application, aiohttp_client: Any, mocker: MockerFixture) -> TestClient:
|
||||||
mocker: MockerFixture) -> TestClient:
|
|
||||||
"""
|
"""
|
||||||
web client fixture
|
web client fixture
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
application(Application): application fixture
|
application(Application): application fixture
|
||||||
event_loop(BaseEventLoop): context event loop
|
|
||||||
aiohttp_client(Any): aiohttp client fixture
|
aiohttp_client(Any): aiohttp client fixture
|
||||||
mocker(MockerFixture): mocker object
|
mocker(MockerFixture): mocker object
|
||||||
|
|
||||||
@ -180,18 +178,17 @@ def client(application: Application, event_loop: BaseEventLoop, aiohttp_client:
|
|||||||
TestClient: web client test instance
|
TestClient: web client test instance
|
||||||
"""
|
"""
|
||||||
mocker.patch("pathlib.Path.iterdir", return_value=[])
|
mocker.patch("pathlib.Path.iterdir", return_value=[])
|
||||||
return event_loop.run_until_complete(aiohttp_client(application))
|
return await aiohttp_client(application)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest_asyncio.fixture
|
||||||
def client_with_auth(application_with_auth: Application, event_loop: BaseEventLoop, aiohttp_client: Any,
|
async def client_with_auth(application_with_auth: Application, aiohttp_client: Any,
|
||||||
mocker: MockerFixture) -> TestClient:
|
mocker: MockerFixture) -> TestClient:
|
||||||
"""
|
"""
|
||||||
web client fixture with full authorization functions
|
web client fixture with full authorization functions
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
application_with_auth(Application): application fixture
|
application_with_auth(Application): application fixture
|
||||||
event_loop(BaseEventLoop): context event loop
|
|
||||||
aiohttp_client(Any): aiohttp client fixture
|
aiohttp_client(Any): aiohttp client fixture
|
||||||
mocker(MockerFixture): mocker object
|
mocker(MockerFixture): mocker object
|
||||||
|
|
||||||
@ -199,18 +196,17 @@ def client_with_auth(application_with_auth: Application, event_loop: BaseEventLo
|
|||||||
TestClient: web client test instance
|
TestClient: web client test instance
|
||||||
"""
|
"""
|
||||||
mocker.patch("pathlib.Path.iterdir", return_value=[])
|
mocker.patch("pathlib.Path.iterdir", return_value=[])
|
||||||
return event_loop.run_until_complete(aiohttp_client(application_with_auth))
|
return await aiohttp_client(application_with_auth)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest_asyncio.fixture
|
||||||
def client_with_oauth_auth(application_with_auth: Application, event_loop: BaseEventLoop, aiohttp_client: Any,
|
async def client_with_oauth_auth(application_with_auth: Application, aiohttp_client: Any,
|
||||||
mocker: MockerFixture) -> TestClient:
|
mocker: MockerFixture) -> TestClient:
|
||||||
"""
|
"""
|
||||||
web client fixture with full authorization functions
|
web client fixture with full authorization functions
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
application_with_auth(Application): application fixture
|
application_with_auth(Application): application fixture
|
||||||
event_loop(BaseEventLoop): context event loop
|
|
||||||
aiohttp_client(Any): aiohttp client fixture
|
aiohttp_client(Any): aiohttp client fixture
|
||||||
mocker(MockerFixture): mocker object
|
mocker(MockerFixture): mocker object
|
||||||
|
|
||||||
@ -219,4 +215,4 @@ def client_with_oauth_auth(application_with_auth: Application, event_loop: BaseE
|
|||||||
"""
|
"""
|
||||||
mocker.patch("pathlib.Path.iterdir", return_value=[])
|
mocker.patch("pathlib.Path.iterdir", return_value=[])
|
||||||
application_with_auth[AuthKey] = MagicMock(spec=OAuth)
|
application_with_auth[AuthKey] = MagicMock(spec=OAuth)
|
||||||
return event_loop.run_until_complete(aiohttp_client(application_with_auth))
|
return await aiohttp_client(application_with_auth)
|
||||||
|
Reference in New Issue
Block a user