Compare commits

..

12 Commits

Author SHA1 Message Date
aac3ccfc30 migration to pkgctl 2023-05-09 15:31:50 +03:00
8c55438140 disable debugpanel for now 2023-05-04 14:30:27 +03:00
277d40d231 do not assign path in context manager as it is deprectated 2023-05-04 14:29:39 +03:00
2ff56965d9 PEP-673 use Self as return type for classmethods (#94)
* PEP-673 use Self as return type for classmethods

* add dummy test file

* remove python3.10 compat
2023-05-04 03:28:08 +03:00
9dc6d56a8d fix flapping tests for oauth
Original implementation sends requests to httpbin which sometimes might
not be available. With proposed changes we are blocking redirects and
just check request itself
2023-05-03 14:53:35 +03:00
d59a0629c5 PEP-561 complaint: add py.typed marker 2023-04-17 18:56:49 +03:00
31f47b8069 fix some pycharm warnings 2023-04-17 02:23:50 +03:00
e42ca95789 suppress status errors also if option is set 2023-04-15 05:05:55 +03:00
ff917281a2 add referencne to api docs to docs 2023-04-14 05:06:53 +03:00
f7e260e1f4 add reference to api documentation from main page 2023-04-12 04:22:49 +03:00
abf29083c4 PEP-585 complaint: remove type aliases (#93) 2023-04-12 04:09:28 +03:00
54d36ccf36 docs update 2023-04-10 21:13:43 +03:00
184 changed files with 1422 additions and 1326 deletions

View File

@ -10,7 +10,7 @@ echo -e '[arcanisrepo]\nServer = http://repo.arcanis.me/$arch\nSigLevel = Never'
# refresh the image
pacman --noconfirm -Syu
# main dependencies
pacman --noconfirm -Sy base-devel devtools git pyalpm python-cerberus python-inflection python-passlib python-requests python-setuptools python-srcinfo sudo
pacman --noconfirm -Sy base-devel devtools-git-poc git pyalpm python-cerberus python-inflection python-passlib python-requests python-srcinfo sudo
# make dependencies
pacman --noconfirm -Sy python-build python-installer python-wheel
# optional dependencies
@ -52,9 +52,9 @@ if [[ -z $MINIMAL_INSTALL ]]; then
fi
# add the first package
# the build itself does not really work in the container
sudo -u ahriman -- ahriman package-add --now yay
sudo -u ahriman -- ahriman package-add --now ahriman
# check if package was actually installed
test -n "$(find "/var/lib/ahriman/repository/x86_64" -name "yay*pkg*")"
test -n "$(find "/var/lib/ahriman/repository/x86_64" -name "ahriman*pkg*")"
# run package check
sudo -u ahriman -- ahriman repo-update
# stop web service lol

View File

@ -4,7 +4,7 @@
set -ex
# install dependencies
pacman --noconfirm -Syu base-devel python-pip python-setuptools python-tox
pacman --noconfirm -Syu base-devel python-setuptools python-tox
# run test and check targets
make check tests

View File

@ -90,7 +90,9 @@ Again, the most checks can be performed by `make check` command, though some add
self.instance_attribute = ""
```
* Type annotations are the must, even for local functions.
* Type annotations are the must, even for local functions. For the function argument `self` (for instance methods) and `cls` (for class methods) should not be annotated.
* For collection types built-in classes must be used if possible (e.g. `dict` instead of `typing.Dict`, `tuple` instead of `typing.Tuple`). In case if built-in type is not available, but `collections.abc` provides interface, it must be used (e.g. `collections.abc.Awaitable` instead of `typing.Awaitable`, `collections.abc.Iterable` instead of `typing.Iterable`). For union classes, the bar operator (`|`) must be used (e.g. `float | int` instead of `typing.Union[float, int]`), which also includes `typinng.Optional` (e.g. `str | None` instead of `Optional[str]`).
* `classmethod` should always return `Self`. In case of mypy warning (e.g. if there is a branch in which function doesn't return the instance of `cls`) consider using `staticmethod` instead.
* Recommended order of function definitions in class:
```python
@ -102,7 +104,7 @@ Again, the most checks can be performed by `make check` command, though some add
def property(self) -> Any: ...
@classmethod
def class_method(cls: Type[Clazz]) -> Clazz: ...
def class_method(cls) -> Self: ...
@staticmethod
def static_method() -> Any: ...

View File

@ -28,7 +28,7 @@ RUN useradd -m -d "/home/build" -s "/usr/bin/nologin" build && \
COPY "docker/install-aur-package.sh" "/usr/local/bin/install-aur-package"
## install package dependencies
## darcs is not installed by reasons, because it requires a lot haskell packages which dramatically increase image size
RUN pacman --noconfirm -Sy devtools git pyalpm python-cerberus python-inflection python-passlib python-requests python-setuptools python-srcinfo && \
RUN pacman --noconfirm -Sy devtools-git-poc git pyalpm python-cerberus python-inflection python-passlib python-requests python-srcinfo && \
pacman --noconfirm -Sy python-build python-installer python-wheel && \
pacman --noconfirm -Sy breezy mercurial python-aiohttp python-aiohttp-cors python-boto3 python-cryptography python-jinja python-requests-unixsocket rsync subversion && \
runuser -u build -- install-aur-package python-aioauth-client python-aiohttp-apispec-git python-aiohttp-jinja2 \

View File

@ -36,7 +36,6 @@ AHRIMAN_GNUPG_HOME="$(getent passwd "$AHRIMAN_USER" | cut -d : -f 6)/.gnupg"
chown "$AHRIMAN_USER":"$AHRIMAN_USER" "$AHRIMAN_GNUPG_HOME"
# run built-in setup command
AHRIMAN_SETUP_ARGS=("--build-as-user" "$AHRIMAN_USER")
AHRIMAN_SETUP_ARGS+=("--packager" "$AHRIMAN_PACKAGER")
AHRIMAN_SETUP_ARGS+=("--repository" "$AHRIMAN_REPOSITORY")
if [ -z "$AHRIMAN_MULTILIB" ]; then

View File

@ -8,11 +8,28 @@ Subpackages
:maxdepth: 4
ahriman.web.middlewares
ahriman.web.schemas
ahriman.web.views
Submodules
----------
ahriman.web.apispec module
--------------------------
.. automodule:: ahriman.web.apispec
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.cors module
-----------------------
.. automodule:: ahriman.web.cors
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.routes module
-------------------------

View File

@ -0,0 +1,165 @@
ahriman.web.schemas package
===========================
Submodules
----------
ahriman.web.schemas.aur\_package\_schema module
-----------------------------------------------
.. automodule:: ahriman.web.schemas.aur_package_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.auth\_schema module
---------------------------------------
.. automodule:: ahriman.web.schemas.auth_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.counters\_schema module
-------------------------------------------
.. automodule:: ahriman.web.schemas.counters_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.error\_schema module
----------------------------------------
.. automodule:: ahriman.web.schemas.error_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.internal\_status\_schema module
---------------------------------------------------
.. automodule:: ahriman.web.schemas.internal_status_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.log\_schema module
--------------------------------------
.. automodule:: ahriman.web.schemas.log_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.login\_schema module
----------------------------------------
.. automodule:: ahriman.web.schemas.login_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.logs\_schema module
---------------------------------------
.. automodule:: ahriman.web.schemas.logs_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.oauth2\_schema module
-----------------------------------------
.. automodule:: ahriman.web.schemas.oauth2_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.package\_name\_schema module
------------------------------------------------
.. automodule:: ahriman.web.schemas.package_name_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.package\_names\_schema module
-------------------------------------------------
.. automodule:: ahriman.web.schemas.package_names_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.package\_properties\_schema module
------------------------------------------------------
.. automodule:: ahriman.web.schemas.package_properties_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.package\_schema module
------------------------------------------
.. automodule:: ahriman.web.schemas.package_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.package\_status\_schema module
--------------------------------------------------
.. automodule:: ahriman.web.schemas.package_status_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.pgp\_key\_id\_schema module
-----------------------------------------------
.. automodule:: ahriman.web.schemas.pgp_key_id_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.pgp\_key\_schema module
-------------------------------------------
.. automodule:: ahriman.web.schemas.pgp_key_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.remote\_schema module
-----------------------------------------
.. automodule:: ahriman.web.schemas.remote_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.search\_schema module
-----------------------------------------
.. automodule:: ahriman.web.schemas.search_schema
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.schemas.status\_schema module
-----------------------------------------
.. automodule:: ahriman.web.schemas.status_schema
:members:
:no-undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: ahriman.web.schemas
:members:
:no-undoc-members:
:show-inheritance:

View File

@ -0,0 +1,29 @@
ahriman.web.views.api package
=============================
Submodules
----------
ahriman.web.views.api.docs module
---------------------------------
.. automodule:: ahriman.web.views.api.docs
:members:
:no-undoc-members:
:show-inheritance:
ahriman.web.views.api.swagger module
------------------------------------
.. automodule:: ahriman.web.views.api.swagger
:members:
:no-undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: ahriman.web.views.api
:members:
:no-undoc-members:
:show-inheritance:

View File

@ -7,6 +7,7 @@ Subpackages
.. toctree::
:maxdepth: 4
ahriman.web.views.api
ahriman.web.views.service
ahriman.web.views.status
ahriman.web.views.user

View File

@ -121,7 +121,7 @@ Type conversions
By default, it parses rows into python dictionary. In addition, the following pseudo-types are supported:
* ``Dict[str, Any]``, ``List[Any]`` - for storing JSON data structures in database (technically there is no restriction on types for dictionary keys and values, but it is recommended to use only string keys). The type is stored as ``json`` data type and ``json.loads`` and ``json.dumps`` methods are used in order to read and write from/to database respectively.
* ``dict[str, Any]``, ``list[Any]`` - for storing JSON data structures in database (technically there is no restriction on types for dictionary keys and values, but it is recommended to use only string keys). The type is stored as ``json`` data type and ``json.loads`` and ``json.dumps`` methods are used in order to read and write from/to database respectively.
Basic flows
-----------
@ -235,7 +235,7 @@ Triggers are extensions which can be used in order to perform any actions on app
The main idea is to load classes by their full path (e.g. ``ahriman.core.upload.UploadTrigger``) by using ``importlib``: get the last part of the import and treat it as class name, join remain part by ``.`` and interpret as module path, import module and extract attribute from it.
The loaded triggers will be called with ``ahriman.models.result.Result`` and ``List[Packages]`` arguments, which describes the process result and current repository packages respectively. Any exception raised will be suppressed and will generate an exception message in logs.
The loaded triggers will be called with ``ahriman.models.result.Result`` and ``list[Packages]`` arguments, which describes the process result and current repository packages respectively. Any exception raised will be suppressed and will generate an exception message in logs.
In addition triggers can implement ``on_start`` and ``on_stop`` actions which will be called on the application start and right before the application exit. The ``on_start`` action is usually being called from handlers directly in order to make sure that no trigger will be run when it is not required (e.g. on user management). As soon as ``on_start`` action is called, the additional flag will be set; ``ahriman.core.triggers.TriggerLoader`` class implements ``__del__`` method in which, if the flag is set, the ``on_stop`` actions will be called.

View File

@ -63,11 +63,9 @@ Authorized users are stored inside internal database, if any of external provide
Build related configuration. Group name can refer to architecture, e.g. ``build:x86_64`` can be used for x86_64 architecture specific settings.
* ``archbuild_flags`` - additional flags passed to ``archbuild`` command, space separated list of strings, optional.
* ``build_command`` - default build command, string, required.
* ``build_flags`` - additional ``pkgctl`` flags, space separated list of strings, optional.
* ``ignore_packages`` - list packages to ignore during a regular update (manual update will still work), space separated list of strings, optional.
* ``makepkg_flags`` - additional flags passed to ``makepkg`` command, space separated list of strings, optional.
* ``makechrootpkg_flags`` - additional flags passed to ``makechrootpkg`` command, space separated list of strings, optional.
* ``triggers`` - list of ``ahriman.core.triggers.Trigger`` class implementation (e.g. ``ahriman.core.report.ReportTrigger ahriman.core.upload.UploadTrigger``) which will be loaded and run at the end of processing, space separated list of strings, optional. You can also specify triggers by their paths, e.g. ``/usr/lib/python3.10/site-packages/ahriman/core/report/report.py.ReportTrigger``. Triggers are run in the order of mention.
* ``vcs_allowed_age`` - maximal age in seconds of the VCS packages before their version will be updated with its remote source, int, optional, default ``604800``.

View File

@ -19,7 +19,7 @@ Features
Live demos
----------
* `Build status page <https://ahriman-demo.arcanis.me>`_. You can login as ``demo`` user by using ``demo`` password. Note, however, you will not be able to run tasks.
* `Build status page <https://ahriman-demo.arcanis.me>`_. You can login as ``demo`` user by using ``demo`` password. Note, however, you will not be able to run tasks. `HTTP API documentation <https://ahriman-demo.arcanis.me/api-docs>`_ is also available.
* `Repository index <http://repo.arcanis.me/x86_64/index.html>`_.
* `Telegram feed <https://t.me/arcanisrepo>`_.

View File

@ -24,46 +24,31 @@ Initial setup
#.
Configure build tools (it is required for correct dependency management system):
#.
Create build command (you can choose any name for command, basically it should be ``{name}-{arch}-build``):
#.
Create configuration file ``{repository}.conf``:
.. code-block:: shell
ln -s /usr/bin/archbuild /usr/local/bin/ahriman-x86_64-build
#.
Create configuration file (same as previous ``pacman-{name}.conf``):
.. code-block:: shell
cp /usr/share/devtools/pacman-{extra,ahriman}.conf
cp /usr/local/share/devtools-git-poc/pacman.conf.d/{extra,aur-clone}.conf
#.
Change configuration file, add your own repository, add multilib repository etc:
.. code-block:: shell
echo '[multilib]' | tee -a /usr/share/devtools/pacman-ahriman.conf
echo 'Include = /etc/pacman.d/mirrorlist' | tee -a /usr/share/devtools/pacman-ahriman.conf
echo '[multilib]' | tee -a /usr/local/share/devtools-git-poc/pacman.conf.d/aur-clone.conf
echo 'Include = /etc/pacman.d/mirrorlist' | tee -a /usr/local/share/devtools-git-poc/pacman.conf.d/aur-clone.conf
echo '[aur-clone]' | tee -a /usr/share/devtools/pacman-ahriman.conf
echo 'SigLevel = Optional TrustAll' | tee -a /usr/share/devtools/pacman-ahriman.conf
echo 'Server = file:///var/lib/ahriman/repository/$arch' | tee -a /usr/share/devtools/pacman-ahriman.conf
#.
Set ``build_command`` option to point to your command:
.. code-block:: shell
echo '[build]' | tee -a /etc/ahriman.ini.d/build.ini
echo 'build_command = ahriman-x86_64-build' | tee -a /etc/ahriman.ini.d/build.ini
echo '[aur-clone]' | tee -a /usr/local/share/devtools-git-poc/pacman.conf.d/aur-clone.conf
echo 'SigLevel = Optional TrustAll' | tee -a /usr/local/share/devtools-git-poc/pacman.conf.d/aur-clone.conf
echo 'Server = file:///var/lib/ahriman/repository/$arch' | tee -a /usr/local/share/devtools-git-poc/pacman.conf.d/aur-clone.conf
#.
Configure ``/etc/sudoers.d/ahriman`` to allow running command without a password:
Configure ``/etc/sudoers.d/ahriman`` to allow running devtools command without a password:
.. code-block:: shell
echo 'Cmnd_Alias CARCHBUILD_CMD = /usr/local/bin/ahriman-x86_64-build *' | tee -a /etc/sudoers.d/ahriman
echo 'Cmnd_Alias CARCHBUILD_CMD = /usr/bin/pkgctl build *' | tee -a /etc/sudoers.d/ahriman
echo 'ahriman ALL=(ALL) NOPASSWD: CARCHBUILD_CMD' | tee -a /etc/sudoers.d/ahriman
chmod 400 /etc/sudoers.d/ahriman

View File

@ -7,7 +7,7 @@ pkgdesc="ArcH linux ReposItory MANager"
arch=('any')
url="https://github.com/arcan1s/ahriman"
license=('GPL3')
depends=('devtools' 'git' 'pyalpm' 'python-cerberus' 'python-inflection' 'python-passlib' 'python-requests' 'python-setuptools' 'python-srcinfo')
depends=('devtools-git-poc' 'git' 'pyalpm' 'python-cerberus' 'python-inflection' 'python-passlib' 'python-requests' 'python-srcinfo')
makedepends=('python-build' 'python-installer' 'python-wheel')
optdepends=('breezy: -bzr packages support'
'darcs: -darcs packages support'

View File

@ -19,11 +19,9 @@ oauth_scopes = https://www.googleapis.com/auth/userinfo.email
allow_read_only = yes
[build]
archbuild_flags =
build_command = extra-x86_64-build
build_command = pkgctl
ignore_packages =
makechrootpkg_flags =
makepkg_flags = --nocolor --ignorearch
pkgctl_flags =
triggers = ahriman.core.gitremote.RemotePullTrigger ahriman.core.report.ReportTrigger ahriman.core.upload.UploadTrigger ahriman.core.gitremote.RemotePushTrigger
vcs_allowed_age = 604800

View File

@ -100,6 +100,7 @@
<li><a id="badge-version" class="nav-link" href="https://github.com/arcan1s/ahriman" title="sources"><i class="bi bi-github"></i> ahriman</a></li>
<li><a class="nav-link" href="https://github.com/arcan1s/ahriman/releases" title="releases list">releases</a></li>
<li><a class="nav-link" href="https://github.com/arcan1s/ahriman/issues" title="issues tracker">report a bug</a></li>
<li><a class="nav-link" href="/api-docs" title="API documentation">api</a></li>
</ul>
{% if index_url is not none %}

View File

@ -1,10 +1,10 @@
from pathlib import Path
from setuptools import find_packages, setup
from typing import Any, Dict
from typing import Any
metadata_path = Path(__file__).resolve().parent / "src/ahriman/version.py"
metadata: Dict[str, Any] = {}
metadata: dict[str, Any] = {}
with metadata_path.open() as metadata_file:
exec(metadata_file.read(), metadata) # pylint: disable=exec-used
@ -25,6 +25,7 @@ setup(
packages=find_packages("src"),
package_dir={"": "src"},
package_data={"": ["py.typed"]},
dependency_links=[
],
@ -33,7 +34,6 @@ setup(
"inflection",
"passlib",
"requests",
"setuptools",
"srcinfo",
],
setup_requires=[

View File

@ -23,7 +23,7 @@ import sys
import tempfile
from pathlib import Path
from typing import List, TypeVar
from typing import TypeVar
from ahriman import version
from ahriman.application import handlers
@ -35,7 +35,7 @@ from ahriman.models.sign_settings import SignSettings
from ahriman.models.user_access import UserAccess
__all__: List[str] = []
__all__: list[str] = []
# this workaround is for several things
@ -70,7 +70,8 @@ def _parser() -> argparse.ArgumentParser:
fromfile_prefix_chars="@", formatter_class=_formatter)
parser.add_argument("-a", "--architecture", help="target architectures. For several subcommands it can be used "
"multiple times", action="append")
parser.add_argument("-c", "--configuration", help="configuration path", type=Path, default=Path("/etc/ahriman.ini"))
parser.add_argument("-c", "--configuration", help="configuration path", type=Path,
default=Path("/etc") / "ahriman.ini")
parser.add_argument("--force", help="force run, remove file lock", action="store_true")
parser.add_argument("-l", "--lock", help="lock file", type=Path,
default=Path(tempfile.gettempdir()) / "ahriman.lock")
@ -819,10 +820,9 @@ def _set_service_setup_parser(root: SubParserAction) -> argparse.ArgumentParser:
description="create initial service configuration, requires root",
epilog="Create _minimal_ configuration for the service according to provided options.",
formatter_class=_formatter)
parser.add_argument("--build-as-user", help="force makepkg user to the specific one")
parser.add_argument("--build-command", help="build command prefix", default="ahriman")
parser.add_argument("--from-configuration", help="path to default devtools pacman configuration",
type=Path, default=Path("/usr/share/devtools/pacman-extra.conf"))
parser.add_argument("--build-command", help="path to build command", default=Path("/usr") / "bin" / "pkgctl")
parser.add_argument("--from-configuration", help="path to default devtools pacman configuration", type=Path,
default=Path("/usr") / "local" / "share" / "devtools-git-poc" / "pacman.conf.d" / "extra.conf")
parser.add_argument("--makeflags-jobs", help="append MAKEFLAGS variable with parallelism set to number of cores",
action=argparse.BooleanOptionalAction, default=True)
parser.add_argument("--mirror", help="use the specified explicitly mirror instead of including mirrorlist")

View File

@ -17,7 +17,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import Iterable, List, Set
from collections.abc import Iterable
from ahriman.application.application.application_packages import ApplicationPackages
from ahriman.application.application.application_repository import ApplicationRepository
@ -50,14 +50,14 @@ class Application(ApplicationPackages, ApplicationRepository):
be used instead.
"""
def _known_packages(self) -> Set[str]:
def _known_packages(self) -> set[str]:
"""
load packages from repository and pacman repositories
Returns:
Set[str]: list of known packages
set[str]: list of known packages
"""
known_packages: Set[str] = set()
known_packages: set[str] = set()
# local set
for base in self.repository.packages():
for package, properties in base.packages.items():
@ -89,15 +89,15 @@ class Application(ApplicationPackages, ApplicationRepository):
"""
self.repository.triggers.on_stop()
def with_dependencies(self, packages: List[Package], *, process_dependencies: bool) -> List[Package]:
def with_dependencies(self, packages: list[Package], *, process_dependencies: bool) -> list[Package]:
"""
add missing dependencies to list of packages
Args:
packages(List[Package]): list of source packages of which dependencies have to be processed
packages(list[Package]): list of source packages of which dependencies have to be processed
process_dependencies(bool): if no set, dependencies will not be processed
"""
def missing_dependencies(source: Iterable[Package]) -> Set[str]:
def missing_dependencies(source: Iterable[Package]) -> set[str]:
# build initial list of dependencies
result = set()
for package in source:

View File

@ -20,8 +20,9 @@
import requests
import shutil
from collections.abc import Iterable
from pathlib import Path
from typing import Any, Iterable
from typing import Any
from ahriman.application.application.application_properties import ApplicationProperties
from ahriman.core.build_tools.sources import Sources

View File

@ -21,6 +21,7 @@ from ahriman.core.configuration import Configuration
from ahriman.core.database import SQLite
from ahriman.core.log import LazyLogging
from ahriman.core.repository import Repository
from ahriman.models.pacman_synchronization import PacmanSynchronization
class ApplicationProperties(LazyLogging):
@ -34,8 +35,8 @@ class ApplicationProperties(LazyLogging):
repository(Repository): repository instance
"""
def __init__(self, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool, refresh_pacman_database: int = 0) -> None:
def __init__(self, architecture: str, configuration: Configuration, *, report: bool, unsafe: bool,
refresh_pacman_database: PacmanSynchronization = PacmanSynchronization.Disabled) -> None:
"""
default constructor
@ -44,8 +45,8 @@ class ApplicationProperties(LazyLogging):
configuration(Configuration): configuration instance
report(bool): force enable or disable reporting
unsafe(bool): if set no user check will be performed before path creation
refresh_pacman_database(int, optional): pacman database syncronization level, ``0`` is disabled
(Default value = 0)
refresh_pacman_database(PacmanSynchronization, optional): pacman database synchronization level
(Default value = PacmanSynchronization.Disabled)
"""
self.configuration = configuration
self.architecture = architecture

View File

@ -17,8 +17,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from collections.abc import Callable, Iterable
from pathlib import Path
from typing import Callable, Iterable, List
from ahriman.application.application.application_properties import ApplicationProperties
from ahriman.core.build_tools.sources import Sources
@ -89,19 +89,19 @@ class ApplicationRepository(ApplicationProperties):
# process triggers
self.on_result(Result())
def unknown(self) -> List[str]:
def unknown(self) -> list[str]:
"""
get packages which were not found in AUR
Returns:
List[str]: unknown package archive list
list[str]: unknown package archive list
"""
def has_local(probe: Package) -> bool:
cache_dir = self.repository.paths.cache_for(probe.base)
return cache_dir.is_dir() and not Sources.has_remotes(cache_dir)
def unknown_aur(probe: Package) -> List[str]:
packages: List[str] = []
def unknown_aur(probe: Package) -> list[str]:
packages: list[str] = []
for single in probe.packages:
try:
_ = Package.from_aur(single, self.repository.pacman)
@ -109,7 +109,7 @@ class ApplicationRepository(ApplicationProperties):
packages.append(single)
return packages
def unknown_local(probe: Package) -> List[str]:
def unknown_local(probe: Package) -> list[str]:
cache_dir = self.repository.paths.cache_for(probe.base)
local = Package.from_build(cache_dir, self.architecture)
packages = set(probe.packages.keys()).difference(local.packages.keys())
@ -155,7 +155,7 @@ class ApplicationRepository(ApplicationProperties):
return build_result
def updates(self, filter_packages: Iterable[str], *,
aur: bool, local: bool, manual: bool, vcs: bool, log_fn: Callable[[str], None]) -> List[Package]:
aur: bool, local: bool, manual: bool, vcs: bool, log_fn: Callable[[str], None]) -> list[Package]:
"""
get list of packages to run update process
@ -168,7 +168,7 @@ class ApplicationRepository(ApplicationProperties):
log_fn(Callable[[str], None]): logger function to log updates
Returns:
List[Package]: list of out-of-dated packages
list[Package]: list of out-of-dated packages
"""
updates = {}

View File

@ -19,8 +19,6 @@
#
import argparse
from typing import Type
from ahriman.application.application import Application
from ahriman.application.handlers import Handler
from ahriman.core.configuration import Configuration
@ -32,7 +30,7 @@ class Add(Handler):
"""
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line

View File

@ -22,7 +22,6 @@ import pwd
from pathlib import Path
from tarfile import TarFile
from typing import Set, Type
from ahriman.application.handlers import Handler
from ahriman.core.configuration import Configuration
@ -37,7 +36,7 @@ class Backup(Handler):
ALLOW_AUTO_ARCHITECTURE_RUN = False # it should be called only as "no-architecture"
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line
@ -55,7 +54,7 @@ class Backup(Handler):
archive.add(backup_path)
@staticmethod
def get_paths(configuration: Configuration) -> Set[Path]:
def get_paths(configuration: Configuration) -> set[Path]:
"""
extract paths to back up
@ -63,7 +62,7 @@ class Backup(Handler):
configuration(Configuration): configuration instance
Returns:
Set[Path]: map of the filesystem paths
set[Path]: map of the filesystem paths
"""
paths = set(configuration.include.glob("*.ini"))

View File

@ -19,8 +19,6 @@
#
import argparse
from typing import Type
from ahriman.application.application import Application
from ahriman.application.handlers import Handler
from ahriman.core.configuration import Configuration
@ -32,7 +30,7 @@ class Clean(Handler):
"""
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line

View File

@ -20,8 +20,6 @@
import argparse
import threading
from typing import Type
from ahriman.application.handlers import Handler
from ahriman.core.configuration import Configuration
@ -32,7 +30,7 @@ class Daemon(Handler):
"""
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line

View File

@ -19,8 +19,6 @@
#
import argparse
from typing import Type
from ahriman.application.handlers import Handler
from ahriman.core.configuration import Configuration
from ahriman.core.formatters import ConfigurationPrinter
@ -34,7 +32,7 @@ class Dump(Handler):
ALLOW_AUTO_ARCHITECTURE_RUN = False
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line

View File

@ -17,13 +17,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import annotations
import argparse
import logging
from multiprocessing import Pool
from typing import List, Type
from ahriman.application.lock import Lock
from ahriman.core.configuration import Configuration
@ -53,7 +50,7 @@ class Handler:
ALLOW_MULTI_ARCHITECTURE_RUN = True
@classmethod
def architectures_extract(cls: Type[Handler], args: argparse.Namespace) -> List[str]:
def architectures_extract(cls, args: argparse.Namespace) -> list[str]:
"""
get known architectures
@ -61,7 +58,7 @@ class Handler:
args(argparse.Namespace): command line args
Returns:
List[str]: list of architectures for which tree is created
list[str]: list of architectures for which tree is created
Raises:
MissingArchitecture: if no architecture set and automatic detection is not allowed or failed
@ -84,7 +81,7 @@ class Handler:
return sorted(architectures)
@classmethod
def call(cls: Type[Handler], args: argparse.Namespace, architecture: str) -> bool:
def call(cls, args: argparse.Namespace, architecture: str) -> bool:
"""
additional function to wrap all calls for multiprocessing library
@ -109,7 +106,7 @@ class Handler:
return False
@classmethod
def execute(cls: Type[Handler], args: argparse.Namespace) -> int:
def execute(cls, args: argparse.Namespace) -> int:
"""
execute function for all aru
@ -138,7 +135,7 @@ class Handler:
return 0 if all(result) else 1
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line

View File

@ -19,8 +19,6 @@
#
import argparse
from typing import Type
from ahriman.application.handlers import Handler
from ahriman.core.configuration import Configuration
@ -33,7 +31,7 @@ class Help(Handler):
ALLOW_AUTO_ARCHITECTURE_RUN = False # it should be called only as "no-architecture"
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line

View File

@ -19,8 +19,6 @@
#
import argparse
from typing import Type
from ahriman.application.application import Application
from ahriman.application.handlers import Handler
from ahriman.core.configuration import Configuration
@ -34,7 +32,7 @@ class KeyImport(Handler):
ALLOW_AUTO_ARCHITECTURE_RUN = False # it should be called only as "no-architecture"
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line

View File

@ -21,7 +21,6 @@ import argparse
import sys
from pathlib import Path
from typing import List, Optional, Tuple, Type
from ahriman.application.application import Application
from ahriman.application.handlers import Handler
@ -39,7 +38,7 @@ class Patch(Handler):
"""
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line
@ -66,17 +65,17 @@ class Patch(Handler):
Patch.patch_set_remove(application, args.package, args.variable)
@staticmethod
def patch_create_from_diff(sources_dir: Path, architecture: str, track: List[str]) -> Tuple[str, PkgbuildPatch]:
def patch_create_from_diff(sources_dir: Path, architecture: str, track: list[str]) -> tuple[str, PkgbuildPatch]:
"""
create PKGBUILD plain diff patches from sources directory
Args:
sources_dir(Path): path to directory with the package sources
architecture(str): repository architecture
track(List[str]): track files which match the glob before creating the patch
track(list[str]): track files which match the glob before creating the patch
Returns:
Tuple[str, PkgbuildPatch]: package base and created PKGBUILD patch based on the diff from master HEAD
tuple[str, PkgbuildPatch]: package base and created PKGBUILD patch based on the diff from master HEAD
to current files
"""
package = Package.from_build(sources_dir, architecture)
@ -84,13 +83,13 @@ class Patch(Handler):
return package.base, PkgbuildPatch(None, patch)
@staticmethod
def patch_create_from_function(variable: str, patch_path: Optional[Path]) -> PkgbuildPatch:
def patch_create_from_function(variable: str, patch_path: Path | None) -> PkgbuildPatch:
"""
create single-function patch set for the package base
Args:
variable(str): function or variable name inside PKGBUILD
patch_path(Path): optional path to patch content. If not set, it will be read from stdin
patch_path(Path | None): optional path to patch content. If not set, it will be read from stdin
Returns:
PkgbuildPatch: created patch for the PKGBUILD function
@ -116,15 +115,15 @@ class Patch(Handler):
application.database.patches_insert(package_base, patch)
@staticmethod
def patch_set_list(application: Application, package_base: Optional[str], variables: List[str],
def patch_set_list(application: Application, package_base: str | None, variables: list[str],
exit_code: bool) -> None:
"""
list patches available for the package base
Args:
application(Application): application instance
package_base(Optional[str]): package base
variables(List[str]): extract patches only for specified PKGBUILD variables
package_base(str | None): package base
variables(list[str]): extract patches only for specified PKGBUILD variables
exit_code(bool): exit with error on empty search result
"""
patches = application.database.patches_list(package_base, variables)
@ -134,13 +133,13 @@ class Patch(Handler):
PatchPrinter(base, patch).print(verbose=True, separator=" = ")
@staticmethod
def patch_set_remove(application: Application, package_base: str, variables: List[str]) -> None:
def patch_set_remove(application: Application, package_base: str, variables: list[str]) -> None:
"""
remove patch set for the package base
Args:
application(Application): application instance
package_base(str): package base
variables(List[str]): remove patches only for specified PKGBUILD variables
variables(list[str]): remove patches only for specified PKGBUILD variables
"""
application.database.patches_remove(package_base, variables)

View File

@ -19,8 +19,6 @@
#
import argparse
from typing import List, Type
from ahriman.application.application import Application
from ahriman.application.handlers import Handler
from ahriman.core.configuration import Configuration
@ -34,7 +32,7 @@ class Rebuild(Handler):
"""
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line
@ -46,13 +44,11 @@ class Rebuild(Handler):
report(bool): force enable or disable reporting
unsafe(bool): if set no user check will be performed before path creation
"""
depends_on = set(args.depends_on) if args.depends_on else None
application = Application(architecture, configuration, report=report, unsafe=unsafe)
application.on_start()
packages = Rebuild.extract_packages(application, from_database=args.from_database)
updates = application.repository.packages_depend_on(packages, depends_on)
updates = application.repository.packages_depend_on(packages, args.depends_on or None)
Rebuild.check_if_empty(args.exit_code, not updates)
if args.dry_run:
@ -64,7 +60,7 @@ class Rebuild(Handler):
Rebuild.check_if_empty(args.exit_code, result.is_empty)
@staticmethod
def extract_packages(application: Application, *, from_database: bool) -> List[Package]:
def extract_packages(application: Application, *, from_database: bool) -> list[Package]:
"""
extract packages from database file
@ -73,7 +69,7 @@ class Rebuild(Handler):
from_database(bool): extract packages from database instead of repository filesystem
Returns:
List[Package]: list of packages which were stored in database
list[Package]: list of packages which were stored in database
"""
if from_database:
return [package for (package, _) in application.database.packages_get()]

View File

@ -19,8 +19,6 @@
#
import argparse
from typing import Type
from ahriman.application.application import Application
from ahriman.application.handlers import Handler
from ahriman.core.configuration import Configuration
@ -32,7 +30,7 @@ class Remove(Handler):
"""
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line

View File

@ -19,8 +19,6 @@
#
import argparse
from typing import Type
from ahriman.application.application import Application
from ahriman.application.handlers import Handler
from ahriman.core.configuration import Configuration
@ -33,7 +31,7 @@ class RemoveUnknown(Handler):
"""
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line

View File

@ -19,7 +19,6 @@
#
import argparse
from typing import Type
from tarfile import TarFile
from ahriman.application.handlers import Handler
@ -34,7 +33,7 @@ class Restore(Handler):
ALLOW_AUTO_ARCHITECTURE_RUN = False # it should be called only as "no-architecture"
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line

View File

@ -20,7 +20,7 @@
import argparse
from dataclasses import fields
from typing import Callable, Iterable, List, Tuple, Type
from collections.abc import Callable, Iterable
from ahriman.application.application import Application
from ahriman.application.handlers import Handler
@ -36,14 +36,18 @@ class Search(Handler):
packages search handler
Attributes:
SORT_FIELDS(Set[str]): (class attribute) allowed fields to sort the package list
SORT_FIELDS(set[str]): (class attribute) allowed fields to sort the package list
"""
ALLOW_AUTO_ARCHITECTURE_RUN = False # it should be called only as "no-architecture"
SORT_FIELDS = {field.name for field in fields(AURPackage) if field.default_factory is not list} # type: ignore
SORT_FIELDS = {
field.name
for field in fields(AURPackage)
if field.default_factory is not list # type: ignore[comparison-overlap]
}
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line
@ -67,7 +71,7 @@ class Search(Handler):
AurPrinter(package).print(verbose=args.info)
@staticmethod
def sort(packages: Iterable[AURPackage], sort_by: str) -> List[AURPackage]:
def sort(packages: Iterable[AURPackage], sort_by: str) -> list[AURPackage]:
"""
sort package list by specified field
@ -76,7 +80,7 @@ class Search(Handler):
sort_by(str): AUR package field name to sort by
Returns:
List[AURPackage]: sorted list for packages
list[AURPackage]: sorted list for packages
Raises:
InvalidOption: if search fields is not in list of allowed ones
@ -85,6 +89,6 @@ class Search(Handler):
raise OptionError(sort_by)
# always sort by package name at the last
# well technically it is not a string, but we can deal with it
comparator: Callable[[AURPackage], Tuple[str, str]] =\
comparator: Callable[[AURPackage], tuple[str, str]] =\
lambda package: (getattr(package, sort_by), package.name)
return sorted(packages, key=comparator)

View File

@ -19,8 +19,6 @@
#
import argparse
from typing import Type
from ahriman import version
from ahriman.application.application import Application
from ahriman.application.handlers import Handler
@ -37,7 +35,7 @@ class ServiceUpdates(Handler):
ALLOW_AUTO_ARCHITECTURE_RUN = False # it should be called only as "no-architecture"
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line

View File

@ -21,7 +21,6 @@ import argparse
from pathlib import Path
from pwd import getpwuid
from typing import Optional, Type
from ahriman.application.application import Application
from ahriman.application.handlers import Handler
@ -34,19 +33,17 @@ class Setup(Handler):
setup handler
Attributes:
ARCHBUILD_COMMAND_PATH(Path): (class attribute) default devtools command
MIRRORLIST_PATH(Path): (class attribute) path to pacman default mirrorlist (used by multilib repository)
SUDOERS_DIR_PATH(Path): (class attribute) path to sudoers.d includes directory
"""
ALLOW_AUTO_ARCHITECTURE_RUN = False
ARCHBUILD_COMMAND_PATH = Path("/usr/bin/archbuild")
MIRRORLIST_PATH = Path("/etc/pacman.d/mirrorlist")
SUDOERS_DIR_PATH = Path("/etc/sudoers.d")
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line
@ -64,30 +61,14 @@ class Setup(Handler):
application = Application(architecture, configuration, report=report, unsafe=unsafe)
Setup.configuration_create_makepkg(args.packager, args.makeflags_jobs, application.repository.paths)
Setup.executable_create(application.repository.paths, args.build_command, architecture)
Setup.configuration_create_devtools(args.build_command, architecture, args.from_configuration, args.mirror,
Setup.configuration_create_devtools(architecture, args.from_configuration, args.mirror,
args.multilib, args.repository, application.repository.paths)
Setup.configuration_create_sudo(application.repository.paths, args.build_command, architecture)
Setup.configuration_create_sudo(args.build_command)
application.repository.repo.init()
# lazy database sync
application.repository.pacman.handle # pylint: disable=pointless-statement
@staticmethod
def build_command(root: Path, prefix: str, architecture: str) -> Path:
"""
generate build command name
Args:
root(Path): root directory for the build command (must be root of the repository)
prefix(str): command prefix in {prefix}-{architecture}-build
architecture(str): repository architecture
Returns:
Path: valid devtools command name
"""
return root / f"{prefix}-{architecture}-build"
@staticmethod
def configuration_create_ahriman(args: argparse.Namespace, architecture: str, repository: str,
root: Configuration) -> None:
@ -102,12 +83,10 @@ class Setup(Handler):
"""
configuration = Configuration()
section = Configuration.section_name("build", architecture)
build_command = Setup.build_command(root.repository_paths.root, args.build_command, architecture)
configuration.set_option(section, "build_command", str(build_command))
configuration.set_option("repository", "name", repository)
if args.build_as_user is not None:
configuration.set_option(section, "makechrootpkg_flags", f"-U {args.build_as_user}")
section = Configuration.section_name("build", architecture)
configuration.set_option(section, "build_command", str(args.build_command))
section = Configuration.section_name("alpm", architecture)
if args.mirror is not None:
@ -132,7 +111,7 @@ class Setup(Handler):
configuration.write(ahriman_configuration)
@staticmethod
def configuration_create_devtools(prefix: str, architecture: str, source: Path, mirror: Optional[str],
def configuration_create_devtools(architecture: str, source: Path, mirror: str | None,
multilib: bool, repository: str, paths: RepositoryPaths) -> None:
"""
create configuration for devtools based on ``source`` configuration
@ -141,10 +120,9 @@ class Setup(Handler):
devtools does not allow to specify the pacman configuration, thus we still have to use configuration in /usr
Args:
prefix(str): command prefix in {prefix}-{architecture}-build
architecture(str): repository architecture
source(Path): path to source configuration file
mirror(Optional[str]): link to package server mirror
mirror(str | None): link to package server mirror
multilib(bool): add or do not multilib repository to the configuration
repository(str): repository name
paths(RepositoryPaths): repository paths instance
@ -154,7 +132,7 @@ class Setup(Handler):
configuration = Configuration(allow_no_value=True)
# preserve case
# stupid mypy thinks that it is impossible
configuration.optionxform = lambda key: key # type: ignore
configuration.optionxform = lambda key: key # type: ignore[method-assign]
# load default configuration first
# we cannot use Include here because it will be copied to new chroot, thus no includes there
@ -179,7 +157,7 @@ class Setup(Handler):
configuration.set_option(repository, "SigLevel", "Optional TrustAll") # we don't care
configuration.set_option(repository, "Server", f"file://{paths.repository}")
target = source.parent / f"pacman-{prefix}-{architecture}.conf"
target = source.parent / f"{repository}.conf"
with target.open("w") as devtools_configuration:
configuration.write(devtools_configuration)
@ -203,31 +181,13 @@ class Setup(Handler):
(home_dir / ".makepkg.conf").write_text(content, encoding="utf8")
@staticmethod
def configuration_create_sudo(paths: RepositoryPaths, prefix: str, architecture: str) -> None:
def configuration_create_sudo(build_command: Path) -> None:
"""
create configuration to run build command with sudo without password
Args:
paths(RepositoryPaths): repository paths instance
prefix(str): command prefix in {prefix}-{architecture}-build
architecture(str): repository architecture
build_command(Path): path to build command
"""
command = Setup.build_command(paths.root, prefix, architecture)
sudoers_file = Setup.build_command(Setup.SUDOERS_DIR_PATH, prefix, architecture)
sudoers_file.write_text(f"ahriman ALL=(ALL) NOPASSWD: {command} *\n", encoding="utf8")
sudoers_file = Setup.SUDOERS_DIR_PATH / f"ahriman-{build_command.name}"
sudoers_file.write_text(f"ahriman ALL=(ALL) NOPASSWD: {build_command} build *\n", encoding="utf8")
sudoers_file.chmod(0o400) # security!
@staticmethod
def executable_create(paths: RepositoryPaths, prefix: str, architecture: str) -> None:
"""
create executable for the service
Args:
paths(RepositoryPaths): repository paths instance
prefix(str): command prefix in {prefix}-{architecture}-build
architecture(str): repository architecture
"""
command = Setup.build_command(paths.root, prefix, architecture)
command.unlink(missing_ok=True)
command.symlink_to(Setup.ARCHBUILD_COMMAND_PATH)
paths.chown(command) # we would like to keep owner inside ahriman's home

View File

@ -22,7 +22,6 @@ import code
import sys
from pathlib import Path
from typing import Type
from ahriman.application.application import Application
from ahriman.application.handlers import Handler
@ -38,7 +37,7 @@ class Shell(Handler):
ALLOW_MULTI_ARCHITECTURE_RUN = False
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line
@ -56,7 +55,7 @@ class Shell(Handler):
# licensed by https://creativecommons.org/licenses/by-sa/3.0
path = Path(sys.prefix) / "share" / "ahriman" / "templates" / "shell"
StringPrinter(path.read_text(encoding="utf8")).print(verbose=False)
# we only want to pass application isntance inside
# we only want to pass application instance inside
if args.code is None:
code.interact(local={"application": application})
else:

View File

@ -19,8 +19,6 @@
#
import argparse
from typing import Type
from ahriman.application.application import Application
from ahriman.application.handlers import Handler
from ahriman.core.configuration import Configuration
@ -32,7 +30,7 @@ class Sign(Handler):
"""
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line

View File

@ -19,7 +19,7 @@
#
import argparse
from typing import Callable, Iterable, Tuple, Type
from collections.abc import Callable
from ahriman.application.application import Application
from ahriman.application.handlers import Handler
@ -37,7 +37,7 @@ class Status(Handler):
ALLOW_AUTO_ARCHITECTURE_RUN = False
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line
@ -55,7 +55,7 @@ class Status(Handler):
service_status = client.get_internal()
StatusPrinter(service_status.status).print(verbose=args.info)
if args.package:
packages: Iterable[Tuple[Package, BuildStatus]] = sum(
packages: list[tuple[Package, BuildStatus]] = sum(
(client.get(base) for base in args.package),
start=[])
else:
@ -63,8 +63,8 @@ class Status(Handler):
Status.check_if_empty(args.exit_code, not packages)
comparator: Callable[[Tuple[Package, BuildStatus]], str] = lambda item: item[0].base
filter_fn: Callable[[Tuple[Package, BuildStatus]], bool] =\
comparator: Callable[[tuple[Package, BuildStatus]], str] = lambda item: item[0].base
filter_fn: Callable[[tuple[Package, BuildStatus]], bool] =\
lambda item: args.status is None or item[1].status == args.status
for package, package_status in sorted(filter(filter_fn, packages), key=comparator):
PackagePrinter(package, package_status).print(verbose=args.info)

View File

@ -19,8 +19,6 @@
#
import argparse
from typing import Type
from ahriman.application.application import Application
from ahriman.application.handlers import Handler
from ahriman.core.configuration import Configuration
@ -35,7 +33,7 @@ class StatusUpdate(Handler):
ALLOW_AUTO_ARCHITECTURE_RUN = False
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line

View File

@ -19,8 +19,6 @@
#
import argparse
from typing import Type
from ahriman.application.application import Application
from ahriman.application.handlers import Handler
from ahriman.core.configuration import Configuration
@ -36,7 +34,7 @@ class Structure(Handler):
ALLOW_AUTO_ARCHITECTURE_RUN = False
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line

View File

@ -19,8 +19,6 @@
#
import argparse
from typing import Type
from ahriman.application.application import Application
from ahriman.application.handlers import Handler
from ahriman.core.configuration import Configuration
@ -33,7 +31,7 @@ class Triggers(Handler):
"""
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line

View File

@ -20,8 +20,6 @@
import argparse
import shlex
from typing import List, Type
from ahriman.application.handlers import Handler
from ahriman.core.configuration import Configuration
from ahriman.core.formatters import StringPrinter
@ -35,7 +33,7 @@ class UnsafeCommands(Handler):
ALLOW_AUTO_ARCHITECTURE_RUN = False # it should be called only as "no-architecture"
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line
@ -56,20 +54,20 @@ class UnsafeCommands(Handler):
UnsafeCommands.check_unsafe(args.command, unsafe_commands, parser)
@staticmethod
def check_unsafe(command: str, unsafe_commands: List[str], parser: argparse.ArgumentParser) -> None:
def check_unsafe(command: str, unsafe_commands: list[str], parser: argparse.ArgumentParser) -> None:
"""
check if command is unsafe
Args:
command(str): command to check
unsafe_commands(List[str]): list of unsafe commands
unsafe_commands(list[str]): list of unsafe commands
parser(argparse.ArgumentParser): generated argument parser
"""
args = parser.parse_args(shlex.split(command))
UnsafeCommands.check_if_empty(True, args.command in unsafe_commands)
@staticmethod
def get_unsafe_commands(parser: argparse.ArgumentParser) -> List[str]:
def get_unsafe_commands(parser: argparse.ArgumentParser) -> list[str]:
"""
extract unsafe commands from argument parser
@ -77,7 +75,7 @@ class UnsafeCommands(Handler):
parser(argparse.ArgumentParser): generated argument parser
Returns:
List[str]: list of commands with default unsafe flag
list[str]: list of commands with default unsafe flag
"""
# should never fail
# pylint: disable=protected-access

View File

@ -19,7 +19,7 @@
#
import argparse
from typing import Callable, Type
from collections.abc import Callable
from ahriman.application.application import Application
from ahriman.application.handlers import Handler
@ -32,7 +32,7 @@ class Update(Handler):
"""
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line

View File

@ -21,7 +21,6 @@ import argparse
import getpass
from pathlib import Path
from typing import Optional, Tuple, Type
from ahriman.application.handlers import Handler
from ahriman.core.configuration import Configuration
@ -40,7 +39,7 @@ class Users(Handler):
ALLOW_AUTO_ARCHITECTURE_RUN = False # it should be called only as "no-architecture"
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line
@ -119,7 +118,7 @@ class Users(Handler):
path.chmod(0o600)
@staticmethod
def get_salt(configuration: Configuration, salt_length: int = 20) -> Tuple[Optional[str], str]:
def get_salt(configuration: Configuration, salt_length: int = 20) -> tuple[str | None, str]:
"""
get salt from configuration or create new string
@ -128,7 +127,7 @@ class Users(Handler):
salt_length(int, optional): salt length (Default value = 20)
Returns:
Tuple[Optional[str], str]: tuple containing salt from configuration if any and actual salt which must be
tuple[str | None, str]: tuple containing salt from configuration if any and actual salt which must be
used for password hash
"""
if salt := configuration.get("auth", "salt", fallback=None):

View File

@ -20,7 +20,7 @@
import argparse
import copy
from typing import Any, Dict, Type
from typing import Any
from ahriman.application.handlers import Handler
from ahriman.core.configuration import Configuration
@ -39,7 +39,7 @@ class Validate(Handler):
ALLOW_AUTO_ARCHITECTURE_RUN = False
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line
@ -112,17 +112,17 @@ class Validate(Handler):
return schema
@staticmethod
def schema_merge(source: Dict[str, Any], schema: Dict[str, Any]) -> Dict[str, Any]:
def schema_merge(source: dict[str, Any], schema: dict[str, Any]) -> dict[str, Any]:
"""
merge child schema into source. In case if source already contains values, new keys will be added
(possibly with overrides - in case if such key already set also)
Args:
source(Dict[str, Any]): source (current) schema into which will be merged
schema(Dict[str, Any]): new schema to be merged
source(dict[str, Any]): source (current) schema into which will be merged
schema(dict[str, Any]): new schema to be merged
Returns:
Dict[str, Any]: schema with added elements from source schema if they were set before and not presented
dict[str, Any]: schema with added elements from source schema if they were set before and not presented
in the new one. Note, that schema will be modified in-place
"""
for key, value in source.items():

View File

@ -18,10 +18,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import argparse
import pkg_resources
import re
import sys
from typing import Dict, List, Tuple, Type
from collections.abc import Generator
from importlib import metadata
from ahriman import version
from ahriman.application.handlers import Handler
@ -32,12 +33,16 @@ from ahriman.core.formatters import VersionPrinter
class Versions(Handler):
"""
version handler
Attributes:
PEP423_PACKAGE_NAME(str): (class attribute) special regex for valid PEP423 package name
"""
ALLOW_AUTO_ARCHITECTURE_RUN = False # it should be called only as "no-architecture"
PEP423_PACKAGE_NAME = re.compile(r"^[A-Za-z0-9._-]+")
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line
@ -51,37 +56,43 @@ class Versions(Handler):
"""
VersionPrinter(f"Module version {version.__version__}",
{"Python": sys.version}).print(verbose=False, separator=" ")
packages = Versions.package_dependencies("ahriman", ("pacman", "s3", "web"))
VersionPrinter("Installed packages", packages).print(verbose=False, separator=" ")
packages = Versions.package_dependencies("ahriman")
VersionPrinter("Installed packages", dict(packages)).print(verbose=False, separator=" ")
@staticmethod
def package_dependencies(root: str, root_extras: Tuple[str, ...] = ()) -> Dict[str, str]:
def package_dependencies(root: str) -> Generator[tuple[str, str], None, None]:
"""
extract list of ahriman package dependencies installed into system with their versions
Args:
root(str): root package name
root_extras(Tuple[str, ...], optional): extras for the root package (Default value = ())
Returns:
Dict[str, str]: map of installed dependency to its version
Generator[tuple[str, str], None, None]: map of installed dependency to its version
"""
resources: Dict[str, pkg_resources.Distribution] = pkg_resources.working_set.by_key # type: ignore
def dependencies_by_key(key: str) -> Generator[str, None, None]:
# in importlib it returns requires in the following format
# ["pytest (>=3.0.0) ; extra == 'test'", "pytest-cov ; extra == 'test'"]
try:
requires = metadata.requires(key)
except metadata.PackageNotFoundError:
return
for entry in requires or []:
yield from Versions.PEP423_PACKAGE_NAME.findall(entry)
def dependencies_by_key(key: str, extras: Tuple[str, ...] = ()) -> List[str]:
return [entry.key for entry in resources[key].requires(extras)]
keys: List[str] = []
portion = {key for key in dependencies_by_key(root, root_extras) if key in resources}
keys: list[str] = []
portion = set(dependencies_by_key(root))
while portion:
keys.extend(portion)
portion = {
key
for key in sum((dependencies_by_key(key) for key in portion), start=[])
if key not in keys and key in resources
for key in sum((list(dependencies_by_key(key)) for key in portion), start=[])
if key not in keys
}
return {
resource.project_name: resource.version
for resource in map(lambda key: resources[key], keys)
}
for key in keys:
try:
distribution = metadata.distribution(key)
yield distribution.name, distribution.version
except metadata.PackageNotFoundError:
continue

View File

@ -19,8 +19,6 @@
#
import argparse
from typing import Type
from ahriman.application.handlers import Handler
from ahriman.core.configuration import Configuration
from ahriman.core.spawn import Spawn
@ -35,7 +33,7 @@ class Web(Handler):
ALLOW_MULTI_ARCHITECTURE_RUN = False # required to be able to spawn external processes
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration, *,
def run(cls, args: argparse.Namespace, architecture: str, configuration: Configuration, *,
report: bool, unsafe: bool) -> None:
"""
callback for command line

View File

@ -17,12 +17,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import annotations
import argparse
from types import TracebackType
from typing import Literal, Optional, Type
from typing import Literal, Self
from ahriman import version
from ahriman.core.configuration import Configuration
@ -111,7 +109,7 @@ class Lock(LazyLogging):
except FileExistsError:
raise DuplicateRunError()
def __enter__(self) -> Lock:
def __enter__(self) -> Self:
"""
default workflow is the following:
@ -120,6 +118,9 @@ class Lock(LazyLogging):
3. Check web status watcher status
4. Create lock file
5. Report to status page if enabled
Returns:
Self: always instance of self
"""
self.check_user()
self.check_version()
@ -127,14 +128,14 @@ class Lock(LazyLogging):
self.reporter.update_self(BuildStatusEnum.Building)
return self
def __exit__(self, exc_type: Optional[Type[Exception]], exc_val: Optional[Exception],
def __exit__(self, exc_type: type[Exception] | None, exc_val: Exception | None,
exc_tb: TracebackType) -> Literal[False]:
"""
remove lock file when done
Args:
exc_type(Optional[Type[Exception]]): exception type name if any
exc_val(Optional[Exception]): exception raised if any
exc_type(type[Exception] | None): exception type name if any
exc_val(Exception | None): exception raised if any
exc_tb(TracebackType): exception traceback if any
Returns:

View File

@ -17,8 +17,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from collections.abc import Iterator
from contextvars import ContextVar
from typing import Any, Dict, Iterator, TypeVar
from typing import Any, TypeVar
from ahriman.models.context_key import ContextKey
@ -35,7 +36,7 @@ class _Context:
"""
default constructor. Must not be used directly
"""
self._content: Dict[str, Any] = {}
self._content: dict[str, Any] = {}
def get(self, key: ContextKey[T]) -> T:
"""

View File

@ -19,13 +19,15 @@
#
import shutil
from collections.abc import Callable, Generator
from pathlib import Path
from pyalpm import DB, Handle, Package, SIG_PACKAGE, error as PyalpmError # type: ignore
from typing import Any, Callable, Generator, Set
from pyalpm import DB, Handle, Package, SIG_PACKAGE, error as PyalpmError # type: ignore[import]
from typing import Any
from ahriman.core.configuration import Configuration
from ahriman.core.log import LazyLogging
from ahriman.core.util import trim_package
from ahriman.models.pacman_synchronization import PacmanSynchronization
from ahriman.models.repository_paths import RepositoryPaths
@ -39,28 +41,28 @@ class Pacman(LazyLogging):
handle: Handle
def __init__(self, architecture: str, configuration: Configuration, *, refresh_database: int) -> None:
def __init__(self, architecture: str, configuration: Configuration, *,
refresh_database: PacmanSynchronization) -> None:
"""
default constructor
Args:
architecture(str): repository architecture
configuration(Configuration): configuration instance
refresh_database(int): synchronize local cache to remote. If set to ``0``, no synchronization will be
enabled, if set to ``1`` - normal synchronization, if set to ``2`` - force synchronization
refresh_database(PacmanSynchronization): synchronize local cache to remote
"""
self.__create_handle_fn: Callable[[], Handle] = lambda: self.__create_handle(
architecture, configuration, refresh_database=refresh_database)
def __create_handle(self, architecture: str, configuration: Configuration, *, refresh_database: int) -> Handle:
def __create_handle(self, architecture: str, configuration: Configuration, *,
refresh_database: PacmanSynchronization) -> Handle:
"""
create lazy handle function
Args:
architecture(str): repository architecture
configuration(Configuration): configuration instance
refresh_database(int): synchronize local cache to remote. If set to ``0``, no synchronization will be
enabled, if set to ``1`` - normal synchronization, if set to ``2`` - force synchronization
refresh_database(PacmanSynchronization): synchronize local cache to remote
Returns:
Handle: fully initialized pacman handle
@ -78,7 +80,7 @@ class Pacman(LazyLogging):
self.database_copy(handle, database, pacman_root, paths, use_ahriman_cache=use_ahriman_cache)
if use_ahriman_cache and refresh_database:
self.database_sync(handle, force=refresh_database > 1)
self.database_sync(handle, force=refresh_database == PacmanSynchronization.Force)
return handle
@ -166,14 +168,14 @@ class Pacman(LazyLogging):
continue
yield package
def packages(self) -> Set[str]:
def packages(self) -> set[str]:
"""
get list of packages known for alpm
Returns:
Set[str]: list of package names
set[str]: list of package names
"""
result: Set[str] = set()
result: set[str] = set()
for database in self.handle.get_syncdbs():
for package in database.pkgcache:
# package itself

View File

@ -19,7 +19,7 @@
#
import requests
from typing import Any, Dict, List, Type
from typing import Any
from ahriman.core.alpm.pacman import Pacman
from ahriman.core.alpm.remote import Remote
@ -44,28 +44,8 @@ class AUR(Remote):
DEFAULT_RPC_VERSION = "5"
DEFAULT_TIMEOUT = 30
@staticmethod
def parse_response(response: Dict[str, Any]) -> List[AURPackage]:
"""
parse RPC response to package list
Args:
response(Dict[str, Any]): RPC response json
Returns:
List[AURPackage]: list of parsed packages
Raises:
InvalidPackageInfo: for error API response
"""
response_type = response["type"]
if response_type == "error":
error_details = response.get("error", "Unknown API error")
raise PackageInfoError(error_details)
return [AURPackage.from_json(package) for package in response["results"]]
@classmethod
def remote_git_url(cls: Type[Remote], package_base: str, repository: str) -> str:
def remote_git_url(cls, package_base: str, repository: str) -> str:
"""
generate remote git url from the package base
@ -79,7 +59,7 @@ class AUR(Remote):
return f"{AUR.DEFAULT_AUR_URL}/{package_base}.git"
@classmethod
def remote_web_url(cls: Type[Remote], package_base: str) -> str:
def remote_web_url(cls, package_base: str) -> str:
"""
generate remote web url from the package base
@ -91,7 +71,27 @@ class AUR(Remote):
"""
return f"{AUR.DEFAULT_AUR_URL}/packages/{package_base}"
def make_request(self, request_type: str, *args: str, **kwargs: str) -> List[AURPackage]:
@staticmethod
def parse_response(response: dict[str, Any]) -> list[AURPackage]:
"""
parse RPC response to package list
Args:
response(dict[str, Any]): RPC response json
Returns:
list[AURPackage]: list of parsed packages
Raises:
InvalidPackageInfo: for error API response
"""
response_type = response["type"]
if response_type == "error":
error_details = response.get("error", "Unknown API error")
raise PackageInfoError(error_details)
return [AURPackage.from_json(package) for package in response["results"]]
def make_request(self, request_type: str, *args: str, **kwargs: str) -> list[AURPackage]:
"""
perform request to AUR RPC
@ -101,9 +101,9 @@ class AUR(Remote):
**kwargs(str): list of additional named parameters like by
Returns:
List[AURPackage]: response parsed to package list
list[AURPackage]: response parsed to package list
"""
query: Dict[str, Any] = {
query: dict[str, Any] = {
"type": request_type,
"v": self.DEFAULT_RPC_VERSION
}
@ -145,7 +145,7 @@ class AUR(Remote):
except StopIteration:
raise UnknownPackageError(package_name)
def package_search(self, *keywords: str, pacman: Pacman) -> List[AURPackage]:
def package_search(self, *keywords: str, pacman: Pacman) -> list[AURPackage]:
"""
search package in AUR web
@ -154,6 +154,6 @@ class AUR(Remote):
pacman(Pacman): alpm wrapper instance
Returns:
List[AURPackage]: list of packages which match the criteria
list[AURPackage]: list of packages which match the criteria
"""
return self.make_request("search", *keywords, by="name-desc")

View File

@ -19,7 +19,7 @@
#
import requests
from typing import Any, Dict, List, Type
from typing import Any
from ahriman.core.alpm.pacman import Pacman
from ahriman.core.alpm.remote import Remote
@ -34,7 +34,7 @@ class Official(Remote):
Attributes:
DEFAULT_ARCHLINUX_URL(str): (class attribute) default archlinux url
DEFAULT_SEARCH_REPOSITORIES(List[str]): (class attribute) default list of repositories to search
DEFAULT_SEARCH_REPOSITORIES(list[str]): (class attribute) default list of repositories to search
DEFAULT_RPC_URL(str): (class attribute) default archlinux repositories RPC url
DEFAULT_TIMEOUT(int): (class attribute) HTTP request timeout in seconds
"""
@ -44,26 +44,8 @@ class Official(Remote):
DEFAULT_RPC_URL = "https://archlinux.org/packages/search/json"
DEFAULT_TIMEOUT = 30
@staticmethod
def parse_response(response: Dict[str, Any]) -> List[AURPackage]:
"""
parse RPC response to package list
Args:
response(Dict[str, Any]): RPC response json
Returns:
List[AURPackage]: list of parsed packages
Raises:
InvalidPackageInfo: for error API response
"""
if not response["valid"]:
raise PackageInfoError("API validation error")
return [AURPackage.from_repo(package) for package in response["results"]]
@classmethod
def remote_git_url(cls: Type[Remote], package_base: str, repository: str) -> str:
def remote_git_url(cls, package_base: str, repository: str) -> str:
"""
generate remote git url from the package base
@ -79,7 +61,7 @@ class Official(Remote):
return "https://github.com/archlinux/svntogit-community.git"
@classmethod
def remote_web_url(cls: Type[Remote], package_base: str) -> str:
def remote_web_url(cls, package_base: str) -> str:
"""
generate remote web url from the package base
@ -91,7 +73,25 @@ class Official(Remote):
"""
return f"{Official.DEFAULT_ARCHLINUX_URL}/packages/{package_base}"
def make_request(self, *args: str, by: str) -> List[AURPackage]:
@staticmethod
def parse_response(response: dict[str, Any]) -> list[AURPackage]:
"""
parse RPC response to package list
Args:
response(dict[str, Any]): RPC response json
Returns:
list[AURPackage]: list of parsed packages
Raises:
InvalidPackageInfo: for error API response
"""
if not response["valid"]:
raise PackageInfoError("API validation error")
return [AURPackage.from_repo(package) for package in response["results"]]
def make_request(self, *args: str, by: str) -> list[AURPackage]:
"""
perform request to official repositories RPC
@ -100,7 +100,7 @@ class Official(Remote):
by(str): search by the field
Returns:
List[AURPackage]: response parsed to package list
list[AURPackage]: response parsed to package list
"""
try:
response = requests.get(
@ -133,7 +133,7 @@ class Official(Remote):
except StopIteration:
raise UnknownPackageError(package_name)
def package_search(self, *keywords: str, pacman: Pacman) -> List[AURPackage]:
def package_search(self, *keywords: str, pacman: Pacman) -> list[AURPackage]:
"""
search package in AUR web
@ -142,6 +142,6 @@ class Official(Remote):
pacman(Pacman): alpm wrapper instance
Returns:
List[AURPackage]: list of packages which match the criteria
list[AURPackage]: list of packages which match the criteria
"""
return self.make_request(*keywords, by="q")

View File

@ -17,10 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import annotations
from typing import Dict, List, Type
from ahriman.core.alpm.pacman import Pacman
from ahriman.core.log import LazyLogging
from ahriman.models.aur_package import AURPackage
@ -44,7 +40,7 @@ class Remote(LazyLogging):
"""
@classmethod
def info(cls: Type[Remote], package_name: str, *, pacman: Pacman) -> AURPackage:
def info(cls, package_name: str, *, pacman: Pacman) -> AURPackage:
"""
get package info by its name
@ -58,7 +54,7 @@ class Remote(LazyLogging):
return cls().package_info(package_name, pacman=pacman)
@classmethod
def multisearch(cls: Type[Remote], *keywords: str, pacman: Pacman) -> List[AURPackage]:
def multisearch(cls, *keywords: str, pacman: Pacman) -> list[AURPackage]:
"""
search in remote repository by using API with multiple words. This method is required in order to handle
https://bugs.archlinux.org/task/49133. In addition, short words will be dropped
@ -68,10 +64,10 @@ class Remote(LazyLogging):
pacman(Pacman): alpm wrapper instance
Returns:
List[AURPackage]: list of packages each of them matches all search terms
list[AURPackage]: list of packages each of them matches all search terms
"""
instance = cls()
packages: Dict[str, AURPackage] = {}
packages: dict[str, AURPackage] = {}
for term in filter(lambda word: len(word) >= 3, keywords):
portion = instance.search(term, pacman=pacman)
packages = {
@ -82,7 +78,7 @@ class Remote(LazyLogging):
return list(packages.values())
@classmethod
def remote_git_url(cls: Type[Remote], package_base: str, repository: str) -> str:
def remote_git_url(cls, package_base: str, repository: str) -> str:
"""
generate remote git url from the package base
@ -99,7 +95,7 @@ class Remote(LazyLogging):
raise NotImplementedError
@classmethod
def remote_web_url(cls: Type[Remote], package_base: str) -> str:
def remote_web_url(cls, package_base: str) -> str:
"""
generate remote web url from the package base
@ -115,7 +111,7 @@ class Remote(LazyLogging):
raise NotImplementedError
@classmethod
def search(cls: Type[Remote], *keywords: str, pacman: Pacman) -> List[AURPackage]:
def search(cls, *keywords: str, pacman: Pacman) -> list[AURPackage]:
"""
search package in AUR web
@ -124,7 +120,7 @@ class Remote(LazyLogging):
pacman(Pacman): alpm wrapper instance
Returns:
List[AURPackage]: list of packages which match the criteria
list[AURPackage]: list of packages which match the criteria
"""
return cls().package_search(*keywords, pacman=pacman)
@ -144,7 +140,7 @@ class Remote(LazyLogging):
"""
raise NotImplementedError
def package_search(self, *keywords: str, pacman: Pacman) -> List[AURPackage]:
def package_search(self, *keywords: str, pacman: Pacman) -> list[AURPackage]:
"""
search package in AUR web
@ -153,7 +149,7 @@ class Remote(LazyLogging):
pacman(Pacman): alpm wrapper instance
Returns:
List[AURPackage]: list of packages which match the criteria
list[AURPackage]: list of packages which match the criteria
Raises:
NotImplementedError: not implemented method

View File

@ -18,7 +18,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from pathlib import Path
from typing import List
from ahriman.core.exceptions import BuildError
from ahriman.core.log import LazyLogging
@ -33,20 +32,20 @@ class Repo(LazyLogging):
Attributes:
name(str): repository name
paths(RepositoryPaths): repository paths instance
sign_args(List[str]): additional args which have to be used to sign repository archive
sign_args(list[str]): additional args which have to be used to sign repository archive
uid(int): uid of the repository owner user
"""
_check_output = check_output
def __init__(self, name: str, paths: RepositoryPaths, sign_args: List[str]) -> None:
def __init__(self, name: str, paths: RepositoryPaths, sign_args: list[str]) -> None:
"""
default constructor
Args:
name(str): repository name
paths(RepositoryPaths): repository paths instance
sign_args(List[str]): additional args which have to be used to sign repository archive
sign_args(list[str]): additional args which have to be used to sign repository archive
"""
self.name = name
self.paths = paths

View File

@ -19,8 +19,6 @@
#
from __future__ import annotations
from typing import Optional, Type
from ahriman.core.configuration import Configuration
from ahriman.core.database import SQLite
from ahriman.core.log import LazyLogging
@ -64,8 +62,8 @@ class Auth(LazyLogging):
"""
return """<button type="button" class="btn btn-link" data-bs-toggle="modal" data-bs-target="#login-modal" style="text-decoration: none"><i class="bi bi-box-arrow-in-right"></i> login</button>"""
@classmethod
def load(cls: Type[Auth], configuration: Configuration, database: SQLite) -> Auth:
@staticmethod
def load(configuration: Configuration, database: SQLite) -> Auth:
"""
load authorization module from settings
@ -83,15 +81,15 @@ class Auth(LazyLogging):
if provider == AuthSettings.OAuth:
from ahriman.core.auth.oauth import OAuth
return OAuth(configuration, database)
return cls(configuration)
return Auth(configuration)
async def check_credentials(self, username: Optional[str], password: Optional[str]) -> bool:
async def check_credentials(self, username: str | None, password: str | None) -> bool:
"""
validate user password
Args:
username(Optional[str]): username
password(Optional[str]): entered password
username(str | None): username
password(str | None): entered password
Returns:
bool: True in case if password matches, False otherwise
@ -99,12 +97,12 @@ class Auth(LazyLogging):
del username, password
return True
async def known_username(self, username: Optional[str]) -> bool:
async def known_username(self, username: str | None) -> bool:
"""
check if user is known
Args:
username(Optional[str]): username
username(str | None): username
Returns:
bool: True in case if user is known and can be authorized and False otherwise
@ -112,14 +110,14 @@ class Auth(LazyLogging):
del username
return True
async def verify_access(self, username: str, required: UserAccess, context: Optional[str]) -> bool:
async def verify_access(self, username: str, required: UserAccess, context: str | None) -> bool:
"""
validate if user has access to requested resource
Args:
username(str): username
required(UserAccess): required access level
context(Optional[str]): URI request path
context(str | None): URI request path
Returns:
bool: True in case if user is allowed to do this request and False otherwise

View File

@ -20,7 +20,7 @@
from typing import Any
try:
import aiohttp_security # type: ignore
import aiohttp_security # type: ignore[import]
_has_aiohttp_security = True
except ImportError:
_has_aiohttp_security = False

View File

@ -17,8 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import Optional
from ahriman.core.auth import Auth
from ahriman.core.configuration import Configuration
from ahriman.core.database import SQLite
@ -50,13 +48,13 @@ class Mapping(Auth):
self.database = database
self.salt = configuration.get("auth", "salt")
async def check_credentials(self, username: Optional[str], password: Optional[str]) -> bool:
async def check_credentials(self, username: str | None, password: str | None) -> bool:
"""
validate user password
Args:
username(Optional[str]): username
password(Optional[str]): entered password
username(str | None): username
password(str | None): entered password
Returns:
bool: True in case if password matches, False otherwise
@ -66,7 +64,7 @@ class Mapping(Auth):
user = self.get_user(username)
return user is not None and user.check_credentials(password, self.salt)
def get_user(self, username: str) -> Optional[User]:
def get_user(self, username: str) -> User | None:
"""
retrieve user from in-memory mapping
@ -74,30 +72,30 @@ class Mapping(Auth):
username(str): username
Returns:
Optional[User]: user descriptor if username is known and None otherwise
User | None: user descriptor if username is known and None otherwise
"""
return self.database.user_get(username)
async def known_username(self, username: Optional[str]) -> bool:
async def known_username(self, username: str | None) -> bool:
"""
check if user is known
Args:
username(Optional[str]): username
username(str | None): username
Returns:
bool: True in case if user is known and can be authorized and False otherwise
"""
return username is not None and self.get_user(username) is not None
async def verify_access(self, username: str, required: UserAccess, context: Optional[str]) -> bool:
async def verify_access(self, username: str, required: UserAccess, context: str | None) -> bool:
"""
validate if user has access to requested resource
Args:
username(str): username
required(UserAccess): required access level
context(Optional[str]): URI request path
context(str | None): URI request path
Returns:
bool: True in case if user is allowed to do this request and False otherwise

View File

@ -19,8 +19,6 @@
#
import aioauth_client
from typing import Optional, Type
from ahriman.core.auth.mapping import Mapping
from ahriman.core.configuration import Configuration
from ahriman.core.database import SQLite
@ -72,7 +70,7 @@ class OAuth(Mapping):
return """<a class="nav-link" href="/api/v1/login" title="login via OAuth2"><i class="bi bi-google"></i> login</a>"""
@staticmethod
def get_provider(name: str) -> Type[aioauth_client.OAuth2Client]:
def get_provider(name: str) -> type[aioauth_client.OAuth2Client]:
"""
load OAuth2 provider by name
@ -80,12 +78,12 @@ class OAuth(Mapping):
name(str): name of the provider. Must be valid class defined in aioauth-client library
Returns:
Type[aioauth_client.OAuth2Client]: loaded provider type
type[aioauth_client.OAuth2Client]: loaded provider type
Raises:
InvalidOption: in case if invalid OAuth provider name supplied
"""
provider: Type[aioauth_client.OAuth2Client] = getattr(aioauth_client, name)
provider: type[aioauth_client.OAuth2Client] = getattr(aioauth_client, name)
try:
is_oauth2_client = issubclass(provider, aioauth_client.OAuth2Client)
except TypeError: # what if it is random string?
@ -114,7 +112,7 @@ class OAuth(Mapping):
uri: str = client.get_authorize_url(scope=self.scopes, redirect_uri=self.redirect_uri)
return uri
async def get_oauth_username(self, code: str) -> Optional[str]:
async def get_oauth_username(self, code: str) -> str | None:
"""
extract OAuth username from remote
@ -122,7 +120,7 @@ class OAuth(Mapping):
code(str): authorization code provided by external service
Returns:
Optional[str]: username as is in OAuth provider
str | None: username as is in OAuth provider
"""
try:
client = self.get_client()
@ -130,7 +128,7 @@ class OAuth(Mapping):
client.access_token = access_token
user, _ = await client.user_info()
username: str = user.email # type: ignore
username: str = user.email # type: ignore[attr-defined]
return username
except Exception:
self.logger.exception("got exception while performing request")

View File

@ -20,7 +20,6 @@
import shutil
from pathlib import Path
from typing import List, Optional
from ahriman.core.log import LazyLogging
from ahriman.core.util import check_output, utcnow, walk
@ -44,7 +43,7 @@ class Sources(LazyLogging):
_check_output = check_output
@staticmethod
def extend_architectures(sources_dir: Path, architecture: str) -> List[PkgbuildPatch]:
def extend_architectures(sources_dir: Path, architecture: str) -> list[PkgbuildPatch]:
"""
extend existing PKGBUILD with repository architecture
@ -53,7 +52,7 @@ class Sources(LazyLogging):
architecture(str): repository architecture
Returns:
List[PkgbuildPatch]: generated patch for PKGBUILD architectures if required
list[PkgbuildPatch]: generated patch for PKGBUILD architectures if required
"""
architectures = Package.supported_architectures(sources_dir)
if "any" in architectures: # makepkg does not like when there is any other arch except for any
@ -62,13 +61,13 @@ class Sources(LazyLogging):
return [PkgbuildPatch("arch", list(architectures))]
@staticmethod
def fetch(sources_dir: Path, remote: Optional[RemoteSource]) -> None:
def fetch(sources_dir: Path, remote: RemoteSource | None) -> None:
"""
either clone repository or update it to origin/``remote.branch``
Args:
sources_dir(Path): local path to fetch
remote(Optional[RemoteSource]): remote target (from where to fetch)
remote(RemoteSource | None): remote target (from where to fetch)
"""
instance = Sources()
# local directory exists and there is .git directory
@ -127,14 +126,14 @@ class Sources(LazyLogging):
cwd=sources_dir, logger=instance.logger)
@staticmethod
def load(sources_dir: Path, package: Package, patches: List[PkgbuildPatch], paths: RepositoryPaths) -> None:
def load(sources_dir: Path, package: Package, patches: list[PkgbuildPatch], paths: RepositoryPaths) -> None:
"""
fetch sources from remote and apply patches
Args:
sources_dir(Path): local path to fetch
package(Package): package definitions
patches(List[PkgbuildPatch]): optional patch to be applied
patches(list[PkgbuildPatch]): optional patch to be applied
paths(RepositoryPaths): repository paths instance
"""
instance = Sources()
@ -165,7 +164,7 @@ class Sources(LazyLogging):
return f"{diff}\n" # otherwise, patch will be broken
@staticmethod
def push(sources_dir: Path, remote: RemoteSource, *pattern: str, commit_author: Optional[str] = None) -> None:
def push(sources_dir: Path, remote: RemoteSource, *pattern: str, commit_author: str | None = None) -> None:
"""
commit selected changes and push files to the remote repository
@ -173,7 +172,7 @@ class Sources(LazyLogging):
sources_dir(Path): local path to git repository
remote(RemoteSource): remote target, branch and url
*pattern(str): glob patterns
commit_author(Optional[str], optional): commit author in form of git config (i.e. ``user <user@host>``)
commit_author(str | None, optional): commit author in form of git config (i.e. ``user <user@host>``)
(Default value = None)
"""
instance = Sources()
@ -192,7 +191,7 @@ class Sources(LazyLogging):
--intent-to-add git flag (Default value = False)
"""
# glob directory to find files which match the specified patterns
found_files: List[Path] = []
found_files: list[Path] = []
for glob in pattern:
found_files.extend(sources_dir.glob(glob))
if not found_files:
@ -203,15 +202,15 @@ class Sources(LazyLogging):
Sources._check_output("git", "add", *args, *[str(fn.relative_to(sources_dir)) for fn in found_files],
cwd=sources_dir, logger=self.logger)
def commit(self, sources_dir: Path, message: Optional[str] = None, author: Optional[str] = None) -> None:
def commit(self, sources_dir: Path, message: str | None = None, author: str | None = None) -> None:
"""
commit changes
Args:
sources_dir(Path): local path to git repository
message(Optional[str], optional): optional commit message if any. If none set, message will be generated
message(str | None, optional): optional commit message if any. If none set, message will be generated
according to the current timestamp (Default value = None)
author(Optional[str], optional): optional commit author if any (Default value = None)
author(str | None, optional): optional commit author if any (Default value = None)
"""
if message is None:
message = f"Autogenerated commit at {utcnow()}"

View File

@ -18,7 +18,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from pathlib import Path
from typing import List
from ahriman.core.build_tools.sources import Sources
from ahriman.core.configuration import Configuration
@ -35,6 +34,10 @@ class Task(LazyLogging):
base package build task
Attributes:
architecture(str): repository architecture
build_command(str): build command
build_flags(list[str]): list of additional flags to pkgctl
name(str): repository name
package(Package): package definitions
paths(RepositoryPaths): repository paths instance
uid(int): uid of the repository owner user
@ -53,14 +56,15 @@ class Task(LazyLogging):
"""
self.package = package
self.paths = paths
_, self.architecture = configuration.check_loaded()
self.uid, _ = paths.root_owner
self.name = configuration.get("repository", "name")
self.archbuild_flags = configuration.getlist("build", "archbuild_flags", fallback=[])
self.build_command = configuration.get("build", "build_command")
self.makepkg_flags = configuration.getlist("build", "makepkg_flags", fallback=[])
self.makechrootpkg_flags = configuration.getlist("build", "makechrootpkg_flags", fallback=[])
self.build_flags = configuration.getlist("build", "pkgctl_flags", fallback=[])
def build(self, sources_dir: Path) -> List[Path]:
def build(self, sources_dir: Path) -> list[Path]:
"""
run package build
@ -68,12 +72,11 @@ class Task(LazyLogging):
sources_dir(Path): path to where sources are
Returns:
List[Path]: paths of produced packages
list[Path]: paths of produced packages
"""
command = [self.build_command, "-r", str(self.paths.chroot)]
command.extend(self.archbuild_flags)
command.extend(["--"] + self.makechrootpkg_flags)
command.extend(["--"] + self.makepkg_flags)
command = [self.build_command, "build"] + self.build_flags
command.extend(["--arch", self.architecture])
command.extend(["--repo", self.name])
self.logger.info("using %s for %s", command, self.package.base)
Task._check_output(

View File

@ -17,14 +17,13 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import annotations
import configparser
import shlex
import sys
from collections.abc import Callable
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Type
from typing import Any, Self
from ahriman.core.exceptions import InitializeError
from ahriman.models.repository_paths import RepositoryPaths
@ -35,11 +34,11 @@ class Configuration(configparser.RawConfigParser):
extension for built-in configuration parser
Attributes:
ARCHITECTURE_SPECIFIC_SECTIONS(List[str]): (class attribute) known sections which can be architecture specific.
ARCHITECTURE_SPECIFIC_SECTIONS(list[str]): (class attribute) known sections which can be architecture specific.
Required by dump and merging functions
SYSTEM_CONFIGURATION_PATH(Path): (class attribute) default system configuration path distributed by package
architecture(Optional[str]): repository architecture
path(Optional[Path]): path to root configuration file
architecture(str | None): repository architecture
path(Path | None): path to root configuration file
Examples:
Configuration class provides additional method in order to handle application configuration. Since this class is
@ -63,7 +62,7 @@ class Configuration(configparser.RawConfigParser):
ARCHITECTURE_SPECIFIC_SECTIONS = ["alpm", "build", "sign", "web"]
SYSTEM_CONFIGURATION_PATH = Path(sys.prefix) / "share" / "ahriman" / "settings" / "ahriman.ini"
converters: Dict[str, Callable[[str], Any]] # typing guard
converters: dict[str, Callable[[str], Any]] # typing guard
def __init__(self, allow_no_value: bool = False) -> None:
"""
@ -77,8 +76,8 @@ class Configuration(configparser.RawConfigParser):
"list": shlex.split,
"path": self._convert_path,
})
self.architecture: Optional[str] = None
self.path: Optional[Path] = None
self.architecture: str | None = None
self.path: Path | None = None
@property
def include(self) -> Path:
@ -112,7 +111,7 @@ class Configuration(configparser.RawConfigParser):
return RepositoryPaths(self.getpath("repository", "root"), architecture)
@classmethod
def from_path(cls: Type[Configuration], path: Path, architecture: str) -> Configuration:
def from_path(cls, path: Path, architecture: str) -> Self:
"""
constructor with full object initialization
@ -121,7 +120,7 @@ class Configuration(configparser.RawConfigParser):
architecture(str): repository architecture
Returns:
Configuration: configuration instance
Self: configuration instance
"""
configuration = cls()
configuration.load(path)
@ -157,12 +156,12 @@ class Configuration(configparser.RawConfigParser):
return path
return self.path.parent / path
def check_loaded(self) -> Tuple[Path, str]:
def check_loaded(self) -> tuple[Path, str]:
"""
check if service was actually loaded
Returns:
Tuple[Path, str]: configuration root path and architecture if loaded
tuple[Path, str]: configuration root path and architecture if loaded
Raises:
InitializeError: in case if architecture and/or path are not set
@ -171,12 +170,12 @@ class Configuration(configparser.RawConfigParser):
raise InitializeError("Configuration path and/or architecture are not set")
return self.path, self.architecture
def dump(self) -> Dict[str, Dict[str, str]]:
def dump(self) -> dict[str, dict[str, str]]:
"""
dump configuration to dictionary
Returns:
Dict[str, Dict[str, str]]: configuration dump for specific architecture
dict[str, dict[str, str]]: configuration dump for specific architecture
"""
return {
section: dict(self[section])
@ -185,11 +184,11 @@ class Configuration(configparser.RawConfigParser):
# pylint and mypy are too stupid to find these methods
# pylint: disable=missing-function-docstring,multiple-statements,unused-argument
def getlist(self, *args: Any, **kwargs: Any) -> List[str]: ... # type: ignore
def getlist(self, *args: Any, **kwargs: Any) -> list[str]: ... # type: ignore[empty-body]
def getpath(self, *args: Any, **kwargs: Any) -> Path: ... # type: ignore
def getpath(self, *args: Any, **kwargs: Any) -> Path: ... # type: ignore[empty-body]
def gettype(self, section: str, architecture: str, *, fallback: Optional[str] = None) -> Tuple[str, str]:
def gettype(self, section: str, architecture: str, *, fallback: str | None = None) -> tuple[str, str]:
"""
get type variable with fallback to old logic. Despite the fact that it has same semantics as other get* methods,
but it has different argument list
@ -197,11 +196,11 @@ class Configuration(configparser.RawConfigParser):
Args:
section(str): section name
architecture(str): repository architecture
fallback(Optional[str], optional): optional fallback type if any. If set, second element of the tuple will
fallback(str | None, optional): optional fallback type if any. If set, second element of the tuple will
be always set to this value (Default value = None)
Returns:
Tuple[str, str]: section name and found type name
tuple[str, str]: section name and found type name
Raises:
configparser.NoSectionError: in case if no section found

View File

@ -17,13 +17,13 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import Any, Dict
from typing import Any
__all__ = ["CONFIGURATION_SCHEMA", "ConfigurationSchema"]
ConfigurationSchema = Dict[str, Dict[str, Any]]
ConfigurationSchema = dict[str, dict[str, Any]]
CONFIGURATION_SCHEMA: ConfigurationSchema = {
@ -134,11 +134,6 @@ CONFIGURATION_SCHEMA: ConfigurationSchema = {
"build": {
"type": "dict",
"schema": {
"archbuild_flags": {
"type": "list",
"coerce": "list",
"schema": {"type": "string"},
},
"build_command": {
"type": "string",
"required": True,
@ -148,12 +143,7 @@ CONFIGURATION_SCHEMA: ConfigurationSchema = {
"coerce": "list",
"schema": {"type": "string"},
},
"makepkg_flags": {
"type": "list",
"coerce": "list",
"schema": {"type": "string"},
},
"makechrootpkg_flags": {
"pkgctl_flags": {
"type": "list",
"coerce": "list",
"schema": {"type": "string"},

View File

@ -19,9 +19,9 @@
#
import ipaddress
from cerberus import TypeDefinition, Validator as RootValidator # type: ignore
from cerberus import TypeDefinition, Validator as RootValidator # type: ignore[import]
from pathlib import Path
from typing import Any, List
from typing import Any
from urllib.parse import urlparse
from ahriman.core.configuration import Configuration
@ -74,7 +74,7 @@ class Validator(RootValidator):
bool: value converted to boolean according to configuration rules
"""
# pylint: disable=protected-access
converted: bool = self.configuration._convert_to_boolean(value) # type: ignore
converted: bool = self.configuration._convert_to_boolean(value) # type: ignore[attr-defined]
return converted
def _normalize_coerce_integer(self, value: str) -> int:
@ -90,7 +90,7 @@ class Validator(RootValidator):
del self
return int(value)
def _normalize_coerce_list(self, value: str) -> List[str]:
def _normalize_coerce_list(self, value: str) -> list[str]:
"""
extract string list from string value
@ -98,17 +98,17 @@ class Validator(RootValidator):
value(str): converting value
Returns:
List[str]: value converted to string list instance according to configuration rules
list[str]: value converted to string list instance according to configuration rules
"""
converted: List[str] = self.configuration.converters["list"](value)
converted: list[str] = self.configuration.converters["list"](value)
return converted
def _validate_is_ip_address(self, constraint: List[str], field: str, value: str) -> None:
def _validate_is_ip_address(self, constraint: list[str], field: str, value: str) -> None:
"""
check if the specified value is valid ip address
Args:
constraint(List[str]): optional list of allowed special words (e.g. ``localhost``)
constraint(list[str]): optional list of allowed special words (e.g. ``localhost``)
field(str): field name to be checked
value(Path): value to be checked
@ -123,12 +123,12 @@ class Validator(RootValidator):
except ValueError:
self._error(field, f"Value {value} must be valid IP address")
def _validate_is_url(self, constraint: List[str], field: str, value: str) -> None:
def _validate_is_url(self, constraint: list[str], field: str, value: str) -> None:
"""
check if the specified value is a valid url
Args:
constraint(List[str]): optional list of supported schemas. If empty, no schema validation will be performed
constraint(list[str]): optional list of supported schemas. If empty, no schema validation will be performed
field(str): field name to be checked
value(str): value to be checked

View File

@ -17,11 +17,11 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from collections.abc import Callable
from importlib import import_module
from pathlib import Path
from pkgutil import iter_modules
from sqlite3 import Connection, Cursor
from typing import Callable, List
from ahriman.core.configuration import Configuration
from ahriman.core.log import LazyLogging
@ -83,22 +83,22 @@ class Migrations(LazyLogging):
"data migration %s at index %s has been performed",
migration.name, migration.index)
def migrations(self) -> List[Migration]:
def migrations(self) -> list[Migration]:
"""
extract all migrations from the current package
idea comes from https://julienharbulot.com/python-dynamical-import.html
Returns:
List[Migration]: list of found migrations
list[Migration]: list of found migrations
"""
migrations: List[Migration] = []
migrations: list[Migration] = []
package_dir = Path(__file__).resolve().parent
modules = [module_name for (_, module_name, _) in iter_modules([str(package_dir)])]
for index, module_name in enumerate(sorted(modules)):
module = import_module(f"{__name__}.{module_name}")
steps: List[str] = getattr(module, "steps", [])
steps: list[str] = getattr(module, "steps", [])
self.logger.debug("found migration %s at index %s with steps count %s", module_name, index, len(steps))
migrate_data: Callable[[Connection, Configuration], None] = \

View File

@ -23,6 +23,7 @@ from ahriman.core.alpm.pacman import Pacman
from ahriman.core.configuration import Configuration
from ahriman.core.util import package_like
from ahriman.models.package import Package
from ahriman.models.pacman_synchronization import PacmanSynchronization
__all__ = ["migrate_data", "steps"]
@ -61,7 +62,7 @@ def migrate_package_depends(connection: Connection, configuration: Configuration
return
_, architecture = configuration.check_loaded()
pacman = Pacman(architecture, configuration, refresh_database=False)
pacman = Pacman(architecture, configuration, refresh_database=PacmanSynchronization.Disabled)
package_list = []
for full_path in filter(package_like, configuration.repository_paths.repository.iterdir()):

View File

@ -18,7 +18,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from sqlite3 import Connection
from typing import List, Optional
from ahriman.core.database.operations import Operations
from ahriman.models.user import User
@ -30,7 +29,7 @@ class AuthOperations(Operations):
authorization operations
"""
def user_get(self, username: str) -> Optional[User]:
def user_get(self, username: str) -> User | None:
"""
get user by username
@ -38,25 +37,25 @@ class AuthOperations(Operations):
username(str): username
Returns:
Optional[User]: user if it was found
User | None: user if it was found
"""
return next(iter(self.user_list(username, None)), None)
def user_list(self, username: Optional[str], access: Optional[UserAccess]) -> List[User]:
def user_list(self, username: str | None, access: UserAccess | None) -> list[User]:
"""
get users by filter
Args:
username(Optional[str]): optional filter by username
access(Optional[UserAccess]): optional filter by role
username(str | None): optional filter by username
access(UserAccess | None): optional filter by role
Returns:
List[User]: list of users who match criteria
list[User]: list of users who match criteria
"""
username_filter = username.lower() if username is not None else username
access_filter = access.value if access is not None else access
def run(connection: Connection) -> List[User]:
def run(connection: Connection) -> list[User]:
return [
User(username=cursor["username"], password=cursor["password"], access=UserAccess(cursor["access"]))
for cursor in connection.execute(

View File

@ -18,7 +18,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from sqlite3 import Connection
from typing import List, Optional
from ahriman.core.database.operations import Operations
from ahriman.models.package import Package
@ -29,12 +28,12 @@ class BuildOperations(Operations):
operations for build queue functions
"""
def build_queue_clear(self, package_base: Optional[str]) -> None:
def build_queue_clear(self, package_base: str | None) -> None:
"""
remove packages from build queue
Args:
package_base(Optional[str]): optional filter by package base
package_base(str | None): optional filter by package base
"""
def run(connection: Connection) -> None:
connection.execute(
@ -46,14 +45,14 @@ class BuildOperations(Operations):
return self.with_connection(run, commit=True)
def build_queue_get(self) -> List[Package]:
def build_queue_get(self) -> list[Package]:
"""
retrieve packages from build queue
Return:
List[Package]: list of packages to be built
list[Package]: list of packages to be built
"""
def run(connection: Connection) -> List[Package]:
def run(connection: Connection) -> list[Package]:
return [
Package.from_json(row["properties"])
for row in connection.execute("""select * from build_queue""")

View File

@ -18,7 +18,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from sqlite3 import Connection
from typing import List, Optional
from ahriman.core.database.operations import Operations
from ahriman.core.util import pretty_datetime
@ -40,7 +39,7 @@ class LogsOperations(Operations):
Return:
str: full package log
"""
def run(connection: Connection) -> List[str]:
def run(connection: Connection) -> list[str]:
return [
f"""[{pretty_datetime(row["created"])}] {row["record"]}"""
for row in connection.execute(
@ -81,13 +80,13 @@ class LogsOperations(Operations):
return self.with_connection(run, commit=True)
def logs_remove(self, package_base: str, current_process_id: Optional[int]) -> None:
def logs_remove(self, package_base: str, current_process_id: int | None) -> None:
"""
remove log records for the specified package
Args:
package_base(str): package base to remove logs
current_process_id(Optional[int]): current process id. If set it will remove only logs belonging to another
current_process_id(int | None): current process id. If set it will remove only logs belonging to another
process
"""
def run(connection: Connection) -> None:

View File

@ -19,8 +19,9 @@
#
import sqlite3
from collections.abc import Callable
from pathlib import Path
from typing import Any, Callable, Dict, Tuple, TypeVar
from typing import Any, TypeVar
from ahriman.core.log import LazyLogging
@ -46,16 +47,16 @@ class Operations(LazyLogging):
self.path = path
@staticmethod
def factory(cursor: sqlite3.Cursor, row: Tuple[Any, ...]) -> Dict[str, Any]:
def factory(cursor: sqlite3.Cursor, row: tuple[Any, ...]) -> dict[str, Any]:
"""
dictionary factory based on official documentation
Args:
cursor(Cursor): cursor descriptor
row(Tuple[Any, ...]): fetched row
row(tuple[Any, ...]): fetched row
Returns:
Dict[str, Any]: row converted to dictionary
dict[str, Any]: row converted to dictionary
"""
result = {}
for index, column in enumerate(cursor.description):

View File

@ -17,8 +17,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from collections.abc import Generator, Iterable
from sqlite3 import Connection
from typing import Dict, Generator, Iterable, List, Tuple
from ahriman.core.database.operations import Operations
from ahriman.models.build_status import BuildStatus
@ -148,7 +148,7 @@ class PackageOperations(Operations):
{"package_base": package_base, "status": status.status.value, "last_updated": status.timestamp})
@staticmethod
def _packages_get_select_package_bases(connection: Connection) -> Dict[str, Package]:
def _packages_get_select_package_bases(connection: Connection) -> dict[str, Package]:
"""
select package bases from the table
@ -156,7 +156,7 @@ class PackageOperations(Operations):
connection(Connection): database connection
Returns:
Dict[str, Package]: map of the package base to its descriptor (without packages themselves)
dict[str, Package]: map of the package base to its descriptor (without packages themselves)
"""
return {
row["package_base"]: Package(
@ -168,16 +168,16 @@ class PackageOperations(Operations):
}
@staticmethod
def _packages_get_select_packages(connection: Connection, packages: Dict[str, Package]) -> Dict[str, Package]:
def _packages_get_select_packages(connection: Connection, packages: dict[str, Package]) -> dict[str, Package]:
"""
select packages from the table
Args:
connection(Connection): database connection
packages(Dict[str, Package]): packages descriptor map
packages(dict[str, Package]): packages descriptor map
Returns:
Dict[str, Package]: map of the package base to its descriptor including individual packages
dict[str, Package]: map of the package base to its descriptor including individual packages
"""
for row in connection.execute("""select * from packages"""):
if row["package_base"] not in packages:
@ -186,7 +186,7 @@ class PackageOperations(Operations):
return packages
@staticmethod
def _packages_get_select_statuses(connection: Connection) -> Dict[str, BuildStatus]:
def _packages_get_select_statuses(connection: Connection) -> dict[str, BuildStatus]:
"""
select package build statuses from the table
@ -194,7 +194,7 @@ class PackageOperations(Operations):
connection(Connection): database connection
Returns:
Dict[str, BuildStatus]: map of the package base to its status
dict[str, BuildStatus]: map of the package base to its status
"""
return {
row["package_base"]: BuildStatus.from_json({"status": row["status"], "timestamp": row["last_updated"]})
@ -230,14 +230,14 @@ class PackageOperations(Operations):
return self.with_connection(run, commit=True)
def packages_get(self) -> List[Tuple[Package, BuildStatus]]:
def packages_get(self) -> list[tuple[Package, BuildStatus]]:
"""
get package list and their build statuses from database
Return:
List[Tuple[Package, BuildStatus]]: list of package properties and their statuses
list[tuple[Package, BuildStatus]]: list of package properties and their statuses
"""
def run(connection: Connection) -> Generator[Tuple[Package, BuildStatus], None, None]:
def run(connection: Connection) -> Generator[tuple[Package, BuildStatus], None, None]:
packages = self._packages_get_select_package_bases(connection)
statuses = self._packages_get_select_statuses(connection)
for package_base, package in self._packages_get_select_packages(connection, packages).items():
@ -256,12 +256,12 @@ class PackageOperations(Operations):
lambda connection: self._package_update_insert_base(connection, package),
commit=True)
def remotes_get(self) -> Dict[str, RemoteSource]:
def remotes_get(self) -> dict[str, RemoteSource]:
"""
get packages remotes based on current settings
Returns:
Dict[str, RemoteSource]: map of package base to its remote sources
dict[str, RemoteSource]: map of package base to its remote sources
"""
packages = self.with_connection(self._packages_get_select_package_bases)
return {

View File

@ -20,7 +20,6 @@
from collections import defaultdict
from sqlite3 import Connection
from typing import Dict, List, Optional, Tuple
from ahriman.core.database.operations import Operations
from ahriman.models.pkgbuild_patch import PkgbuildPatch
@ -31,7 +30,7 @@ class PatchOperations(Operations):
operations for patches
"""
def patches_get(self, package_base: str) -> List[PkgbuildPatch]:
def patches_get(self, package_base: str) -> list[PkgbuildPatch]:
"""
retrieve patches for the package
@ -39,7 +38,7 @@ class PatchOperations(Operations):
package_base(str): package base to search for patches
Returns:
List[PkgbuildPatch]: plain text patch for the package
list[PkgbuildPatch]: plain text patch for the package
"""
return self.patches_list(package_base, []).get(package_base, [])
@ -65,18 +64,18 @@ class PatchOperations(Operations):
return self.with_connection(run, commit=True)
def patches_list(self, package_base: Optional[str], variables: List[str]) -> Dict[str, List[PkgbuildPatch]]:
def patches_list(self, package_base: str | None, variables: list[str]) -> dict[str, list[PkgbuildPatch]]:
"""
extract all patches
Args:
package_base(Optional[str]): optional filter by package base
variables(List[str]): extract patches only for specified PKGBUILD variables
package_base(str | None): optional filter by package base
variables(list[str]): extract patches only for specified PKGBUILD variables
Returns:
Dict[str, List[PkgbuildPatch]]: map of package base to patch content
dict[str, list[PkgbuildPatch]]: map of package base to patch content
"""
def run(connection: Connection) -> List[Tuple[str, PkgbuildPatch]]:
def run(connection: Connection) -> list[tuple[str, PkgbuildPatch]]:
return [
(cursor["package_base"], PkgbuildPatch(cursor["variable"], cursor["patch"]))
for cursor in connection.execute(
@ -85,20 +84,20 @@ class PatchOperations(Operations):
]
# we could use itertools & operator but why?
patches: Dict[str, List[PkgbuildPatch]] = defaultdict(list)
patches: dict[str, list[PkgbuildPatch]] = defaultdict(list)
for package, patch in self.with_connection(run):
if variables and patch.key not in variables:
continue
patches[package].append(patch)
return dict(patches)
def patches_remove(self, package_base: str, variables: List[str]) -> None:
def patches_remove(self, package_base: str, variables: list[str]) -> None:
"""
remove patch set
Args:
package_base(str): package base to clear patches
variables(List[str]): remove patches only for specified PKGBUILD variables
variables(list[str]): remove patches only for specified PKGBUILD variables
"""
def run_many(connection: Connection) -> None:
connection.executemany(

View File

@ -17,13 +17,11 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import annotations
import json
import sqlite3
from pathlib import Path
from typing import Type
from typing import Self
from ahriman.core.configuration import Configuration
from ahriman.core.database.migrations import Migrations
@ -47,7 +45,7 @@ class SQLite(AuthOperations, BuildOperations, LogsOperations, PackageOperations,
"""
@classmethod
def load(cls: Type[SQLite], configuration: Configuration) -> SQLite:
def load(cls, configuration: Configuration) -> Self:
"""
construct instance from configuration
@ -55,7 +53,7 @@ class SQLite(AuthOperations, BuildOperations, LogsOperations, PackageOperations,
configuration(Configuration): configuration instance
Returns:
SQLite: fully initialized instance of the database
Self: fully initialized instance of the database
"""
path = cls.database_path(configuration)
database = cls(path)

View File

@ -17,8 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import List
from ahriman.core.formatters import StringPrinter
from ahriman.core.util import pretty_datetime
from ahriman.models.aur_package import AURPackage
@ -43,12 +41,12 @@ class AurPrinter(StringPrinter):
StringPrinter.__init__(self, f"{package.name} {package.version} ({package.num_votes})")
self.package = package
def properties(self) -> List[Property]:
def properties(self) -> list[Property]:
"""
convert content into printable data
Returns:
List[Property]: list of content properties
list[Property]: list of content properties
"""
return [
Property("Package base", self.package.package_base),

View File

@ -17,8 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import Dict, List
from ahriman.core.formatters import StringPrinter
from ahriman.models.property import Property
@ -28,9 +26,9 @@ class ConfigurationPrinter(StringPrinter):
print content of the configuration section
Attributes:
HIDE_KEYS(List[str]): (class attribute) hide values for mentioned keys. This list must be used in order to hide
HIDE_KEYS(list[str]): (class attribute) hide values for mentioned keys. This list must be used in order to hide
passwords from outputs
values(Dict[str, str]): configuration values dictionary
values(dict[str, str]): configuration values dictionary
"""
HIDE_KEYS = [
@ -42,23 +40,23 @@ class ConfigurationPrinter(StringPrinter):
"secret_key", # aws secret key
]
def __init__(self, section: str, values: Dict[str, str]) -> None:
def __init__(self, section: str, values: dict[str, str]) -> None:
"""
default constructor
Args:
section(str): section name
values(Dict[str, str]): configuration values dictionary
values(dict[str, str]): configuration values dictionary
"""
StringPrinter.__init__(self, f"[{section}]")
self.values = values
def properties(self) -> List[Property]:
def properties(self) -> list[Property]:
"""
convert content into printable data
Returns:
List[Property]: list of content properties
list[Property]: list of content properties
"""
return [
Property(key, value, is_required=key not in self.HIDE_KEYS)

View File

@ -17,8 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import List
from ahriman.core.formatters import StringPrinter
from ahriman.models.build_status import BuildStatus
from ahriman.models.package import Package
@ -46,12 +44,12 @@ class PackagePrinter(StringPrinter):
self.package = package
self.status = status
def properties(self) -> List[Property]:
def properties(self) -> list[Property]:
"""
convert content into printable data
Returns:
List[Property]: list of content properties
list[Property]: list of content properties
"""
return [
Property("Version", self.package.version, is_required=True),

View File

@ -17,8 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import List
from ahriman.core.formatters import StringPrinter
from ahriman.models.pkgbuild_patch import PkgbuildPatch
from ahriman.models.property import Property
@ -29,26 +27,26 @@ class PatchPrinter(StringPrinter):
print content of the PKGBUILD patch
Attributes:
patches(List[PkgbuildPatch]): PKGBUILD patch object
patches(list[PkgbuildPatch]): PKGBUILD patch object
"""
def __init__(self, package_base: str, patches: List[PkgbuildPatch]) -> None:
def __init__(self, package_base: str, patches: list[PkgbuildPatch]) -> None:
"""
default constructor
Args:
package_base(str): package base
patches(List[PkgbuildPatch]): PKGBUILD patch object
patches(list[PkgbuildPatch]): PKGBUILD patch object
"""
StringPrinter.__init__(self, package_base)
self.patches = patches
def properties(self) -> List[Property]:
def properties(self) -> list[Property]:
"""
convert content into printable data
Returns:
List[Property]: list of content properties
list[Property]: list of content properties
"""
return [
Property(patch.key or "Full source diff", patch.value, is_required=True)

View File

@ -17,7 +17,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import Callable, List, Optional
from collections.abc import Callable
from ahriman.models.property import Property
@ -44,19 +44,19 @@ class Printer:
indent = "\t" * prop.indent
log_fn(f"{indent}{prop.name}{separator}{prop.value}")
def properties(self) -> List[Property]:
def properties(self) -> list[Property]:
"""
convert content into printable data
Returns:
List[Property]: list of content properties
list[Property]: list of content properties
"""
return []
def title(self) -> Optional[str]:
def title(self) -> str | None:
"""
generate entry title from content
Returns:
Optional[str]: content title if it can be generated and None otherwise
str | None: content title if it can be generated and None otherwise
"""

View File

@ -17,8 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import Optional
from ahriman.core.formatters import Printer
@ -39,11 +37,11 @@ class StringPrinter(Printer):
"""
self.content = content
def title(self) -> Optional[str]:
def title(self) -> str | None:
"""
generate entry title from content
Returns:
Optional[str]: content title if it can be generated and None otherwise
str | None: content title if it can be generated and None otherwise
"""
return self.content

View File

@ -17,8 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import Iterable, List
from ahriman.core.formatters import StringPrinter
from ahriman.models.package import Package
from ahriman.models.property import Property
@ -29,25 +27,25 @@ class TreePrinter(StringPrinter):
print content of the package tree level
Attributes:
packages(Iterable[Package]): packages which belong to this level
packages(list[Package]): packages which belong to this level
"""
def __init__(self, level: int, packages: Iterable[Package]) -> None:
def __init__(self, level: int, packages: list[Package]) -> None:
"""
default constructor
Args:
level(int): dependencies tree level
packages(Iterable[Package]): packages which belong to this level
packages(list[Package]): packages which belong to this level
"""
StringPrinter.__init__(self, f"level {level}")
self.packages = packages
def properties(self) -> List[Property]:
def properties(self) -> list[Property]:
"""
convert content into printable data
Returns:
List[Property]: list of content properties
list[Property]: list of content properties
"""
return [Property(package.base, package.version, is_required=True) for package in self.packages]

View File

@ -17,8 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import List, Optional
from ahriman.core.formatters import StringPrinter
from ahriman.models.package import Package
from ahriman.models.property import Property
@ -30,26 +28,26 @@ class UpdatePrinter(StringPrinter):
Attributes:
package(Package): remote (new) package object
local_version(Optional[str]): local version of the package if any
local_version(str | None): local version of the package if any
"""
def __init__(self, remote: Package, local_version: Optional[str]) -> None:
def __init__(self, remote: Package, local_version: str | None) -> None:
"""
default constructor
Args:
remote(Package): remote (new) package object
local_version(Optional[str]): local version of the package if any
local_version(str | None): local version of the package if any
"""
StringPrinter.__init__(self, remote.base)
self.package = remote
self.local_version = local_version or "N/A"
def properties(self) -> List[Property]:
def properties(self) -> list[Property]:
"""
convert content into printable data
Returns:
List[Property]: list of content properties
list[Property]: list of content properties
"""
return [Property(self.local_version, self.package.version, is_required=True)]

View File

@ -17,8 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import List
from ahriman.core.formatters import StringPrinter
from ahriman.models.property import Property
from ahriman.models.user import User
@ -42,11 +40,11 @@ class UserPrinter(StringPrinter):
StringPrinter.__init__(self, user.username)
self.user = user
def properties(self) -> List[Property]:
def properties(self) -> list[Property]:
"""
convert content into printable data
Returns:
List[Property]: list of content properties
list[Property]: list of content properties
"""
return [Property("role", self.user.access.value, is_required=True)]

View File

@ -17,7 +17,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import Any, Dict, Generator, List, Union
from collections.abc import Generator
from typing import Any
from ahriman.core.formatters import StringPrinter
from ahriman.models.property import Property
@ -29,30 +30,30 @@ class ValidationPrinter(StringPrinter):
Attributes:
node(str): root level name
errors(List[Union[str, Dict[str, Any]]]): validation errors
errors(list[str | dict[str, Any]]): validation errors
"""
def __init__(self, node: str, errors: List[Union[str, Dict[str, Any]]]) -> None:
def __init__(self, node: str, errors: list[str | dict[str, Any]]) -> None:
"""
default constructor
Args:
node(str): root level name
errors(List[Union[str, Dict[str, Any]]]): validation errors
errors(list[str | dict[str, Any]]): validation errors
"""
StringPrinter.__init__(self, node)
self.node = node
self.errors = errors
@staticmethod
def get_error_messages(node: str, errors: List[Union[str, Dict[str, Any]]],
def get_error_messages(node: str, errors: list[str | dict[str, Any]],
current_level: int = 1) -> Generator[Property, None, None]:
"""
extract default error message from cerberus class
Args:
node(str): current node level name
errors(List[Union[str, Dict[str, Any]]]): current node validation errors
errors(list[str | dict[str, Any]]): current node validation errors
current_level(int, optional): current level number (Default value = 1)
Yields:
@ -67,11 +68,11 @@ class ValidationPrinter(StringPrinter):
else: # current node errors
yield Property(node, error, is_required=True, indent=current_level)
def properties(self) -> List[Property]:
def properties(self) -> list[Property]:
"""
convert content into printable data
Returns:
List[Property]: list of content properties
list[Property]: list of content properties
"""
return list(self.get_error_messages(self.node, self.errors))

View File

@ -17,8 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import Dict, List
from ahriman.core.formatters import StringPrinter
from ahriman.models.property import Property
@ -28,26 +26,26 @@ class VersionPrinter(StringPrinter):
print content of the python package versions
Attributes:
packages(Dict[str, str]): map of package name to its version
packages(dict[str, str]): map of package name to its version
"""
def __init__(self, title: str, packages: Dict[str, str]) -> None:
def __init__(self, title: str, packages: dict[str, str]) -> None:
"""
default constructor
Args:
title(str): title of the message
packages(Dict[str, str]): map of package name to its version
packages(dict[str, str]): map of package name to its version
"""
StringPrinter.__init__(self, title)
self.packages = packages
def properties(self) -> List[Property]:
def properties(self) -> list[Property]:
"""
convert content into printable data
Returns:
List[Property]: list of content properties
list[Property]: list of content properties
"""
return [
Property(package, version, is_required=True)

View File

@ -61,7 +61,8 @@ class RemotePull(LazyLogging):
"""
clone repository from remote source
"""
with TemporaryDirectory(ignore_cleanup_errors=True) as dir_name, (clone_dir := Path(dir_name)):
with TemporaryDirectory(ignore_cleanup_errors=True) as dir_name:
clone_dir = Path(dir_name)
Sources.fetch(clone_dir, self.remote_source)
self.repo_copy(clone_dir)

View File

@ -17,8 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import List, Type
from ahriman.core.configuration import Configuration
from ahriman.core.gitremote.remote_pull import RemotePull
from ahriman.core.triggers import Trigger
@ -29,7 +27,7 @@ class RemotePullTrigger(Trigger):
trigger based on pulling PKGBUILDs before the actions
Attributes:
targets(List[str]): git remote target list
targets(list[str]): git remote target list
"""
CONFIGURATION_SCHEMA = {
@ -70,7 +68,7 @@ class RemotePullTrigger(Trigger):
self.targets = self.configuration_sections(configuration)
@classmethod
def configuration_sections(cls: Type[Trigger], configuration: Configuration) -> List[str]:
def configuration_sections(cls, configuration: Configuration) -> list[str]:
"""
extract configuration sections from configuration
@ -78,7 +76,7 @@ class RemotePullTrigger(Trigger):
configuration(Configuration): configuration instance
Returns:
List[str]: read configuration sections belong to this trigger
list[str]: read configuration sections belong to this trigger
"""
return configuration.getlist("remote-pull", "target", fallback=[])

View File

@ -19,9 +19,9 @@
#
import shutil
from collections.abc import Generator
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Generator
from ahriman.core.build_tools.sources import Sources
from ahriman.core.configuration import Configuration
@ -39,7 +39,7 @@ class RemotePush(LazyLogging):
sync PKGBUILDs to remote repository after actions
Attributes:
commit_author(Optional[str]): optional commit author in form of git config (i.e. ``user <user@host>``)
commit_author(str | None): optional commit author in form of git config (i.e. ``user <user@host>``)
database(SQLite): database instance
remote_source(RemoteSource): repository remote source (remote pull url and branch)
"""
@ -115,7 +115,8 @@ class RemotePush(LazyLogging):
result(Result): build result
"""
try:
with TemporaryDirectory(ignore_cleanup_errors=True) as dir_name, (clone_dir := Path(dir_name)):
with TemporaryDirectory(ignore_cleanup_errors=True) as dir_name:
clone_dir = Path(dir_name)
Sources.fetch(clone_dir, self.remote_source)
Sources.push(clone_dir, self.remote_source, *self.packages_update(result, clone_dir),
commit_author=self.commit_author)

View File

@ -17,8 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import Iterable, List, Type
from ahriman.core import context
from ahriman.core.configuration import Configuration
from ahriman.core.database import SQLite
@ -34,7 +32,7 @@ class RemotePushTrigger(Trigger):
trigger for syncing PKGBUILDs to remote repository
Attributes:
targets(List[str]): git remote target list
targets(list[str]): git remote target list
"""
CONFIGURATION_SCHEMA = {
@ -78,7 +76,7 @@ class RemotePushTrigger(Trigger):
self.targets = self.configuration_sections(configuration)
@classmethod
def configuration_sections(cls: Type[Trigger], configuration: Configuration) -> List[str]:
def configuration_sections(cls, configuration: Configuration) -> list[str]:
"""
extract configuration sections from configuration
@ -86,17 +84,17 @@ class RemotePushTrigger(Trigger):
configuration(Configuration): configuration instance
Returns:
List[str]: read configuration sections belong to this trigger
list[str]: read configuration sections belong to this trigger
"""
return configuration.getlist("remote-push", "target", fallback=[])
def on_result(self, result: Result, packages: Iterable[Package]) -> None:
def on_result(self, result: Result, packages: list[Package]) -> None:
"""
trigger action which will be called after build process with process result
Args:
result(Result): build result
packages(Iterable[Package]): list of all available packages
packages(list[Package]): list of all available packages
Raises:
GitRemoteError: if database is not set in context

View File

@ -17,10 +17,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import annotations
import logging
from typing import Self
from ahriman.core.configuration import Configuration
@ -52,7 +52,7 @@ class HttpLogHandler(logging.Handler):
self.suppress_errors = suppress_errors
@classmethod
def load(cls, configuration: Configuration, *, report: bool) -> HttpLogHandler:
def load(cls, configuration: Configuration, *, report: bool) -> Self:
"""
install logger. This function creates handler instance and adds it to the handler list in case if no other
http handler found
@ -60,6 +60,9 @@ class HttpLogHandler(logging.Handler):
Args:
configuration(Configuration): configuration instance
report(bool): force enable or disable reporting
Returns:
Self: logger instance with loaded settings
"""
root = logging.getLogger()
if (handler := next((handler for handler in root.handlers if isinstance(handler, cls)), None)) is not None:

View File

@ -20,7 +20,8 @@
import contextlib
import logging
from typing import Any, Generator
from collections.abc import Generator
from typing import Any
class LazyLogging:

View File

@ -17,8 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import Iterable
from ahriman.core.configuration import Configuration
from ahriman.core.formatters import BuildPrinter
from ahriman.core.report.report import Report
@ -46,12 +44,12 @@ class Console(Report):
Report.__init__(self, architecture, configuration)
self.use_utf = configuration.getboolean(section, "use_utf", fallback=True)
def generate(self, packages: Iterable[Package], result: Result) -> None:
def generate(self, packages: list[Package], result: Result) -> None:
"""
generate report for the specified packages
Args:
packages(Iterable[Package]): list of packages to generate report
packages(list[Package]): list of packages to generate report
result(Result): build result
"""
for package in result.success:

View File

@ -21,7 +21,6 @@ import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Dict, Iterable
from ahriman.core.configuration import Configuration
from ahriman.core.report.jinja_template import JinjaTemplate
@ -40,13 +39,13 @@ class Email(Report, JinjaTemplate):
full_template_path(Path): path to template for full package list
host(str): SMTP host to connect
no_empty_report(bool): skip empty report generation
password(Optional[str]): password to authenticate via SMTP
password(str | None): password to authenticate via SMTP
port(int): SMTP port to connect
receivers(List[str]): list of receivers emails
receivers(list[str]): list of receivers emails
sender(str): sender email address
ssl(SmtpSSLSettings): SSL mode for SMTP connection
template_path(Path): path to template for built packages
user(Optional[str]): username to authenticate via SMTP
user(str | None): username to authenticate via SMTP
"""
def __init__(self, architecture: str, configuration: Configuration, section: str) -> None:
@ -74,13 +73,13 @@ class Email(Report, JinjaTemplate):
self.ssl = SmtpSSLSettings.from_option(configuration.get(section, "ssl", fallback="disabled"))
self.user = configuration.get(section, "user", fallback=None)
def _send(self, text: str, attachment: Dict[str, str]) -> None:
def _send(self, text: str, attachment: dict[str, str]) -> None:
"""
send email callback
Args:
text(str): email body text
attachment(Dict[str, str]): map of attachment filename to attachment content
attachment(dict[str, str]): map of attachment filename to attachment content
"""
message = MIMEMultipart()
message["From"] = self.sender
@ -104,12 +103,12 @@ class Email(Report, JinjaTemplate):
session.sendmail(self.sender, self.receivers, message.as_string())
session.quit()
def generate(self, packages: Iterable[Package], result: Result) -> None:
def generate(self, packages: list[Package], result: Result) -> None:
"""
generate report for the specified packages
Args:
packages(Iterable[Package]): list of packages to generate report
packages(list[Package]): list of packages to generate report
result(Result): build result
"""
if self.no_empty_report and not result.success:

View File

@ -17,8 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import Iterable
from ahriman.core.configuration import Configuration
from ahriman.core.report.jinja_template import JinjaTemplate
from ahriman.core.report.report import Report
@ -50,12 +48,12 @@ class HTML(Report, JinjaTemplate):
self.report_path = configuration.getpath(section, "path")
self.template_path = configuration.getpath(section, "template_path")
def generate(self, packages: Iterable[Package], result: Result) -> None:
def generate(self, packages: list[Package], result: Result) -> None:
"""
generate report for the specified packages
Args:
packages(Iterable[Package]): list of packages to generate report
packages(list[Package]): list of packages to generate report
result(Result): build result
"""
html = self.make_html(Result(success=packages), self.template_path)

View File

@ -19,8 +19,8 @@
#
import jinja2
from collections.abc import Callable
from pathlib import Path
from typing import Callable, Dict
from ahriman.core.configuration import Configuration
from ahriman.core.sign.gpg import GPG
@ -56,11 +56,11 @@ class JinjaTemplate:
* repository - repository name, string, required
Attributes:
homepage(Optional[str]): homepage link if any (for footer)
homepage(str | None): homepage link if any (for footer)
link_path(str): prefix fo packages to download
name(str): repository name
default_pgp_key(Optional[str]): default PGP key
sign_targets(Set[SignSettings]): targets to sign enabled in configuration
default_pgp_key(str | None): default PGP key
sign_targets(set[SignSettings]): targets to sign enabled in configuration
"""
def __init__(self, section: str, configuration: Configuration) -> None:
@ -108,7 +108,7 @@ class JinjaTemplate:
"version": base.version
} for base in result.success for package, properties in base.packages.items()
]
comparator: Callable[[Dict[str, str]], str] = lambda item: item["filename"]
comparator: Callable[[dict[str, str]], str] = lambda item: item["filename"]
return template.render(
homepage=self.homepage,

View File

@ -19,8 +19,6 @@
#
from __future__ import annotations
from typing import Iterable, Type
from ahriman.core.configuration import Configuration
from ahriman.core.exceptions import ReportError
from ahriman.core.log import LazyLogging
@ -68,8 +66,8 @@ class Report(LazyLogging):
self.architecture = architecture
self.configuration = configuration
@classmethod
def load(cls: Type[Report], architecture: str, configuration: Configuration, target: str) -> Report:
@staticmethod
def load(architecture: str, configuration: Configuration, target: str) -> Report:
"""
load client from settings
@ -95,24 +93,24 @@ class Report(LazyLogging):
if provider == ReportSettings.Telegram:
from ahriman.core.report.telegram import Telegram
return Telegram(architecture, configuration, section)
return cls(architecture, configuration) # should never happen
return Report(architecture, configuration) # should never happen
def generate(self, packages: Iterable[Package], result: Result) -> None:
def generate(self, packages: list[Package], result: Result) -> None:
"""
generate report for the specified packages
Args:
packages(Iterable[Package]): list of packages to generate report
packages(list[Package]): list of packages to generate report
result(Result): build result
"""
def run(self, result: Result, packages: Iterable[Package]) -> None:
def run(self, result: Result, packages: list[Package]) -> None:
"""
run report generation
Args:
result(Result): build result
packages(Iterable[Package]): list of packages to generate report
packages(list[Package]): list of packages to generate report
Raises:
ReportFailed: in case of any report unmatched exception

View File

@ -17,8 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from typing import Iterable, List, Type
from ahriman.core.configuration import Configuration
from ahriman.core.triggers import Trigger
from ahriman.core.report.report import Report
@ -31,7 +29,7 @@ class ReportTrigger(Trigger):
report trigger
Attributes:
targets(List[str]): report target list
targets(list[str]): report target list
"""
CONFIGURATION_SCHEMA = {
@ -207,7 +205,7 @@ class ReportTrigger(Trigger):
self.targets = self.configuration_sections(configuration)
@classmethod
def configuration_sections(cls: Type[Trigger], configuration: Configuration) -> List[str]:
def configuration_sections(cls, configuration: Configuration) -> list[str]:
"""
extract configuration sections from configuration
@ -215,17 +213,17 @@ class ReportTrigger(Trigger):
configuration(Configuration): configuration instance
Returns:
List[str]: read configuration sections belong to this trigger
list[str]: read configuration sections belong to this trigger
"""
return configuration.getlist("report", "target", fallback=[])
def on_result(self, result: Result, packages: Iterable[Package]) -> None:
def on_result(self, result: Result, packages: list[Package]) -> None:
"""
run trigger
Args:
result(Result): build result
packages(Iterable[Package]): list of all available packages
packages(list[Package]): list of all available packages
"""
for target in self.targets:
runner = Report.load(self.architecture, self.configuration, target)

View File

@ -19,8 +19,6 @@
#
import requests # technically we could use python-telegram-bot, but it is just a single request, c'mon
from typing import Iterable
from ahriman.core.configuration import Configuration
from ahriman.core.report.jinja_template import JinjaTemplate
from ahriman.core.report.report import Report
@ -84,12 +82,12 @@ class Telegram(Report, JinjaTemplate):
self.logger.exception("could not perform request")
raise
def generate(self, packages: Iterable[Package], result: Result) -> None:
def generate(self, packages: list[Package], result: Result) -> None:
"""
generate report for the specified packages
Args:
packages(Iterable[Package]): list of packages to generate report
packages(list[Package]): list of packages to generate report
result(Result): build result
"""
if not result.success:

View File

@ -20,7 +20,6 @@
import shutil
from pathlib import Path
from typing import List
from ahriman.core.repository.repository_properties import RepositoryProperties
@ -69,12 +68,12 @@ class Cleaner(RepositoryProperties):
self.logger.info("clear build queue")
self.database.build_queue_clear(None)
def packages_built(self) -> List[Path]:
def packages_built(self) -> list[Path]:
"""
get list of files in built packages directory
Returns:
List[Path]: list of filenames from the directory
list[Path]: list of filenames from the directory
Raises:
NotImplementedError: not implemented method

Some files were not shown because too many files have changed in this diff Show More