Compare commits

..

1 Commits

Author SHA1 Message Date
3da3caaaa5 in-house bytes pkgbuild parser 2025-06-02 13:55:14 +03:00
19 changed files with 761 additions and 630 deletions

File diff suppressed because it is too large Load Diff

View File

@ -2,7 +2,7 @@
pkgbase='ahriman'
pkgname=('ahriman' 'ahriman-core' 'ahriman-triggers' 'ahriman-web')
pkgver=2.18.1
pkgver=2.17.1
pkgrel=1
pkgdesc="ArcH linux ReposItory MANager"
arch=('any')

View File

@ -635,7 +635,6 @@ _set_new_action() {
# ${!x} -> ${hello} -> "world"
_shtab_ahriman() {
local completing_word="${COMP_WORDS[COMP_CWORD]}"
local previous_word="${COMP_WORDS[COMP_CWORD-1]}"
local completed_positional_actions
local current_action
local current_action_args_start_index
@ -692,10 +691,6 @@ _shtab_ahriman() {
if [[ $pos_only = 0 && "${completing_word}" == -* ]]; then
# optional argument started: use option strings
COMPREPLY=( $(compgen -W "${current_option_strings[*]}" -- "${completing_word}") )
elif [[ "${previous_word}" == ">" || "${previous_word}" == ">>" ||
"${previous_word}" =~ ^[12]">" || "${previous_word}" =~ ^[12]">>" ]]; then
# handle redirection operators
COMPREPLY=( $(compgen -f -- "${completing_word}") )
else
# use choices & compgen
local IFS=$'\n' # items may contain spaces, so delimit using newline

View File

@ -1,4 +1,4 @@
.TH AHRIMAN "1" "2025\-06\-16" "ahriman" "Generated Python Manual"
.TH AHRIMAN "1" "2025\-01\-05" "ahriman" "Generated Python Manual"
.SH NAME
ahriman
.SH SYNOPSIS

View File

@ -17,4 +17,4 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
__version__ = "2.18.1"
__version__ = "2.17.1"

View File

@ -0,0 +1,183 @@
#
# Copyright (c) 2021-2025 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import annotations
from collections import defaultdict
from collections.abc import Iterator
from dataclasses import dataclass
from enum import ReprEnum
from types import SimpleNamespace
from typing import Generator, IO, Self
from ahriman.models.pkgbuild_patch import PkgbuildPatch
class PkgbuildToken(bytes, ReprEnum):
Comment = b"#"
Assignment = b"="
SingleQuote = b"'"
DoubleQuote = b"\""
Space = b" "
NewLine = b"\n"
ParenthesisOpen = b"("
ParenthesisClose = b")"
FunctionStarts = b"function"
FunctionDeclaration = b"()"
BraceOpen = b"{"
BraceClose = b"}"
@dataclass
class PkgbuildWord:
word: bytes
quote: bytes | None
@property
def closing(self) -> PkgbuildToken | None:
if self.quote:
return None
match self.word:
case PkgbuildToken.ParenthesisOpen:
return PkgbuildToken.ParenthesisClose
case PkgbuildToken.BraceOpen:
return PkgbuildToken.BraceClose
return None
@property
def original(self) -> bytes:
quote = self.quote or b""
return quote + self.word + quote
def __bool__(self) -> bool:
return bool(self.original)
class BytesPkgbuildParser(Iterator[PkgbuildPatch]):
def __init__(self, stream: IO[bytes]) -> None:
self._io = stream
def _next(self, *, declaration: bool) -> bytes:
while not (token := self._next_token(declaration=declaration)):
continue
return token
def _next_token(self, *, declaration: bool) -> bytes:
buffer = b""
while word := self._next_word():
match word:
case PkgbuildWord(PkgbuildToken.Comment, None):
self._io.readline()
case PkgbuildWord(PkgbuildToken.NewLine, None):
if declaration:
buffer = b""
return buffer
case PkgbuildWord(PkgbuildToken.Assignment, None) if declaration:
return buffer
case PkgbuildWord(PkgbuildToken.Space, None) if declaration:
if buffer.endswith(PkgbuildToken.FunctionDeclaration):
return buffer
buffer = b""
continue
case PkgbuildWord(PkgbuildToken.Space, None):
return buffer
case PkgbuildWord(PkgbuildToken.ParenthesisOpen, None):
buffer += PkgbuildToken.ParenthesisOpen
buffer += b"".join(self._next_words_until(PkgbuildWord(PkgbuildToken.ParenthesisClose, None)))
case PkgbuildWord(PkgbuildToken.BraceOpen, None):
buffer += PkgbuildToken.BraceOpen
buffer += b"".join(self._next_words_until(PkgbuildWord(PkgbuildToken.BraceClose, None)))
case PkgbuildWord(token, _):
buffer += token
raise StopIteration
def _next_word(self) -> PkgbuildWord:
# pass SimpleNamespace as an argument to implement side effects
def generator(quote: SimpleNamespace) -> Generator[bytes, None, None]:
while token := self._io.read(1):
match token:
case (PkgbuildToken.SingleQuote | PkgbuildToken.DoubleQuote) if quote.open is None:
quote.open = token
case closing_quote if closing_quote == quote.open:
return
case part:
yield part
if quote.open is None:
return
if quote.open is not None:
raise ValueError("No closing quotation")
open_quote = SimpleNamespace(open=None)
value = b"".join(generator(open_quote))
return PkgbuildWord(value, open_quote.open)
def _next_words_until(self, ending: PkgbuildWord) -> Generator[bytes, None, None]:
braces = defaultdict(int)
while element := self._next_word():
yield element.original
match element:
case PkgbuildWord(token, None) if braces[token] > 0:
braces[token] -= 1
case with_closure if (closing := with_closure.closing) is not None:
braces[closing] += 1
case _ if element == ending:
return
if any(brace for brace in braces.values() if brace > 0):
raise ValueError("Unclosed parenthesis and/or braces found")
raise ValueError(f"No matching ending element {ending.word} found")
def parse(self) -> Generator[PkgbuildPatch, None, None]:
"""
parse source stream and yield parsed entries
Yields:
PkgbuildPatch: extracted a PKGBUILD node
"""
yield from self
def __iter__(self) -> Self:
"""
base iterator method
Returns:
Self: iterator instance
"""
return self
def __next__(self) -> PkgbuildPatch:
key = self._next(declaration=True)
value = self._next(declaration=False)
return PkgbuildPatch(key.decode(encoding="utf8"), value.decode(encoding="utf8"))

View File

@ -35,7 +35,7 @@ class Remote(SyncHttpClient):
>>> package = AUR.info("ahriman")
>>> search_result = Official.multisearch("pacman", "manager", pacman=pacman)
Difference between :func:`search()` and :func:`multisearch()` is that :func:`search()` passes all arguments to
Differnece between :func:`search()` and :func:`multisearch()` is that :func:`search()` passes all arguments to
underlying wrapper directly, whereas :func:`multisearch()` splits search one by one and finds intersection
between search results.
"""

View File

@ -17,7 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import contextlib
import sqlite3
from collections.abc import Callable
@ -88,12 +87,10 @@ class Operations(LazyLogging):
Returns:
T: result of the ``query`` call
"""
with contextlib.closing(sqlite3.connect(self.path, detect_types=sqlite3.PARSE_DECLTYPES)) as connection:
with sqlite3.connect(self.path, detect_types=sqlite3.PARSE_DECLTYPES) as connection:
connection.set_trace_callback(self.logger.debug)
connection.row_factory = self.factory
result = query(connection)
if commit:
connection.commit()
return result

View File

@ -40,7 +40,7 @@ class JinjaTemplate:
* homepage - link to homepage, string, optional
* last_update - report generation time, pretty printed datetime, required
* link_path - prefix of packages to download, string, required
* link_path - prefix fo packages to download, string, required
* has_package_signed - ``True`` in case if package sign enabled, ``False`` otherwise, required
* has_repo_signed - ``True`` in case if repository database sign enabled, ``False`` otherwise, required
* packages - sorted list of packages properties, required
@ -64,7 +64,7 @@ class JinjaTemplate:
Attributes:
default_pgp_key(str | None): default PGP key
homepage(str | None): homepage link if any (for footer)
link_path(str): prefix of packages to download
link_path(str): prefix fo packages to download
name(str): repository name
rss_url(str | None): link to the RSS feed
sign_targets(set[SignSettings]): targets to sign enabled in configuration

View File

@ -71,7 +71,7 @@ class EventLogger:
>>> with self.in_event(package_base, EventType.PackageUpdated):
>>> do_something()
Additional parameter ``failure`` can be set in order to emit an event on exception occurred. If none set
Additional parameter ``failure`` can be set in order to emit an event on exception occured. If none set
(default), then no event will be recorded on exception
"""
with MetricsTimer() as timer:

View File

@ -69,7 +69,7 @@ class Package(LazyLogging):
:attr:`ahriman.models.package_source.PackageSource.Archive`,
:attr:`ahriman.models.package_source.PackageSource.AUR`,
:attr:`ahriman.models.package_source.PackageSource.Local` and
:attr:`ahriman.models.package_source.PackageSource.Repository` respectively:
:attr:`ahriman.models.package_source.PackageSource.Repository` repsectively:
>>> ahriman_package = Package.from_aur("ahriman")
>>> pacman_package = Package.from_official("pacman", pacman)

View File

@ -39,7 +39,7 @@ class RemoteSchema(Schema):
"example": ".",
})
source = fields.Enum(PackageSource, by_value=True, required=True, metadata={
"description": "Package source",
"description": "Pacakge source",
})
web_url = fields.String(metadata={
"description": "Package repository page",

View File

@ -106,7 +106,7 @@ class PackageView(StatusViewGuard, BaseView):
@apidocs(
tags=["Packages"],
summary="Update package",
description="Update package status and set its descriptor optionally",
description="Update package status and set its descriptior optionally",
permission=POST_PERMISSION,
error_400_enabled=True,
error_404_description="Repository is unknown",

View File

@ -53,7 +53,7 @@ def test_remote_git_url(remote: Remote) -> None:
must raise NotImplemented for missing remote git url
"""
with pytest.raises(NotImplementedError):
remote.remote_git_url("package", "repositories")
remote.remote_git_url("package", "repositorys")
def test_remote_web_url(remote: Remote) -> None:

View File

@ -10,7 +10,7 @@ from ahriman.core.exceptions import PacmanError
def test_copy(mocker: MockerFixture) -> None:
"""
must copy local database file
must copy loca database file
"""
copy_mock = mocker.patch("shutil.copy")
PacmanDatabase.copy(Path("remote"), Path("local"))

View File

@ -1,4 +1,3 @@
import pytest
import sqlite3
from pytest_mock import MockerFixture
@ -25,29 +24,15 @@ def test_factory(database: SQLite) -> None:
def test_with_connection(database: SQLite, mocker: MockerFixture) -> None:
"""
must run query inside connection and close it at the end
must run query inside connection
"""
connection_mock = MagicMock()
connect_mock = mocker.patch("sqlite3.connect", return_value=connection_mock)
database.with_connection(lambda conn: conn.execute("select 1"))
connect_mock.assert_called_once_with(database.path, detect_types=sqlite3.PARSE_DECLTYPES)
connection_mock.set_trace_callback.assert_called_once_with(database.logger.debug)
connection_mock.commit.assert_not_called()
connection_mock.close.assert_called_once_with()
def test_with_connection_close(database: SQLite, mocker: MockerFixture) -> None:
"""
must close connection on errors
"""
connection_mock = MagicMock()
connection_mock.commit.side_effect = Exception
mocker.patch("sqlite3.connect", return_value=connection_mock)
with pytest.raises(Exception):
database.with_connection(lambda conn: conn.execute("select 1"), commit=True)
connection_mock.close.assert_called_once_with()
connection_mock.__enter__().set_trace_callback.assert_called_once_with(database.logger.debug)
connection_mock.__enter__().commit.assert_not_called()
def test_with_connection_with_commit(database: SQLite, mocker: MockerFixture) -> None:
@ -59,4 +44,4 @@ def test_with_connection_with_commit(database: SQLite, mocker: MockerFixture) ->
mocker.patch("sqlite3.connect", return_value=connection_mock)
database.with_connection(lambda conn: conn.execute("select 1"), commit=True)
connection_mock.commit.assert_called_once_with()
connection_mock.__enter__().commit.assert_called_once_with()

View File

@ -285,7 +285,7 @@ def test_set_unknown(client: Client, package_ahriman: Package, mocker: MockerFix
def test_set_unknown_skip(client: Client, package_ahriman: Package, mocker: MockerFixture) -> None:
"""
must skip unknown status update in case if package is already known
must skip unknown status update in case if pacakge is already known
"""
mocker.patch("ahriman.core.status.Client.package_get", return_value=[(package_ahriman, None)])
update_mock = mocker.patch("ahriman.core.status.Client.package_update")

View File

@ -73,7 +73,7 @@ def test_configuration_sections(configuration: Configuration) -> None:
def test_on_result(trigger: Trigger) -> None:
"""
must pass execution to run method
must pass execution nto run method
"""
trigger.on_result(Result(), [])

View File

@ -3,7 +3,7 @@ from ahriman.models.log_record_id import LogRecordId
def test_init() -> None:
"""
must correctly assign process identifier if not set
must correctly assign proces identifier if not set
"""
assert LogRecordId("1", "2").process_id == LogRecordId.DEFAULT_PROCESS_ID
assert LogRecordId("1", "2", "3").process_id == "3"