mirror of
https://github.com/arcan1s/ahriman.git
synced 2025-07-14 14:35:47 +00:00
fix: print current and updated version correctly
The issue appears in case if versions ar the same (e.g. rebuild); in this case printer doesn't increment version as builder does. Also util has been renamed to utils, keeping backward compatibiltiy
This commit is contained in:
@ -25,7 +25,7 @@ from typing import TypeVar
|
||||
|
||||
from ahriman import __version__
|
||||
from ahriman.application import handlers
|
||||
from ahriman.core.util import enum_values, extract_user
|
||||
from ahriman.core.utils import enum_values, extract_user
|
||||
from ahriman.models.action import Action
|
||||
from ahriman.models.build_status import BuildStatusEnum
|
||||
from ahriman.models.log_handler import LogHandler
|
||||
|
@ -27,7 +27,7 @@ from typing import Any
|
||||
from ahriman.application.application.application_properties import ApplicationProperties
|
||||
from ahriman.core.build_tools.sources import Sources
|
||||
from ahriman.core.exceptions import UnknownPackageError
|
||||
from ahriman.core.util import package_like
|
||||
from ahriman.core.utils import package_like
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.package_source import PackageSource
|
||||
from ahriman.models.result import Result
|
||||
|
@ -31,7 +31,7 @@ from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.exceptions import DuplicateRunError
|
||||
from ahriman.core.log import LazyLogging
|
||||
from ahriman.core.status import Client
|
||||
from ahriman.core.util import check_user
|
||||
from ahriman.core.utils import check_user
|
||||
from ahriman.models.build_status import BuildStatusEnum
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
from ahriman.models.waiter import Waiter
|
||||
|
@ -30,7 +30,7 @@ from string import Template
|
||||
from ahriman.core.alpm.pacman_database import PacmanDatabase
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.log import LazyLogging
|
||||
from ahriman.core.util import trim_package
|
||||
from ahriman.core.utils import trim_package
|
||||
from ahriman.models.pacman_synchronization import PacmanSynchronization
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
|
||||
|
@ -21,7 +21,7 @@ from pathlib import Path
|
||||
|
||||
from ahriman.core.exceptions import BuildError
|
||||
from ahriman.core.log import LazyLogging
|
||||
from ahriman.core.util import check_output
|
||||
from ahriman.core.utils import check_output
|
||||
from ahriman.models.repository_paths import RepositoryPaths
|
||||
|
||||
|
||||
|
@ -23,7 +23,7 @@ from pathlib import Path
|
||||
|
||||
from ahriman.core.exceptions import CalledProcessError
|
||||
from ahriman.core.log import LazyLogging
|
||||
from ahriman.core.util import check_output, utcnow, walk
|
||||
from ahriman.core.utils import check_output, utcnow, walk
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.pkgbuild_patch import PkgbuildPatch
|
||||
from ahriman.models.remote_source import RemoteSource
|
||||
|
@ -23,7 +23,7 @@ from ahriman.core.build_tools.sources import Sources
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.exceptions import BuildError
|
||||
from ahriman.core.log import LazyLogging
|
||||
from ahriman.core.util import check_output
|
||||
from ahriman.core.utils import check_output
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.pkgbuild_patch import PkgbuildPatch
|
||||
from ahriman.models.repository_paths import RepositoryPaths
|
||||
|
@ -21,7 +21,7 @@ from sqlite3 import Connection
|
||||
|
||||
from ahriman.core.alpm.pacman import Pacman
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.util import package_like
|
||||
from ahriman.core.utils import package_like
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.pacman_synchronization import PacmanSynchronization
|
||||
|
||||
|
@ -21,7 +21,7 @@ from sqlite3 import Connection
|
||||
|
||||
from ahriman.core.alpm.pacman import Pacman
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.util import package_like
|
||||
from ahriman.core.utils import package_like
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.pacman_synchronization import PacmanSynchronization
|
||||
|
||||
|
@ -21,7 +21,7 @@ from sqlite3 import Connection
|
||||
|
||||
from ahriman.core.alpm.pacman import Pacman
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.util import package_like
|
||||
from ahriman.core.utils import package_like
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.pacman_synchronization import PacmanSynchronization
|
||||
|
||||
|
@ -18,7 +18,7 @@
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
from ahriman.core.formatters.string_printer import StringPrinter
|
||||
from ahriman.core.util import pretty_datetime
|
||||
from ahriman.core.utils import pretty_datetime
|
||||
from ahriman.models.aur_package import AURPackage
|
||||
from ahriman.models.property import Property
|
||||
|
||||
|
@ -18,6 +18,7 @@
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
from ahriman.core.formatters.string_printer import StringPrinter
|
||||
from ahriman.core.utils import full_version, parse_version
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.property import Property
|
||||
|
||||
@ -41,7 +42,7 @@ class UpdatePrinter(StringPrinter):
|
||||
"""
|
||||
StringPrinter.__init__(self, remote.base)
|
||||
self.package = remote
|
||||
self.local_version = local_version or "N/A"
|
||||
self.local_version = local_version
|
||||
|
||||
def properties(self) -> list[Property]:
|
||||
"""
|
||||
@ -50,4 +51,9 @@ class UpdatePrinter(StringPrinter):
|
||||
Returns:
|
||||
list[Property]: list of content properties
|
||||
"""
|
||||
return [Property(self.local_version, self.package.version, is_required=True)]
|
||||
if (pkgrel := self.package.next_pkgrel(self.local_version)) is not None:
|
||||
epoch, pkgver, _ = parse_version(self.package.version)
|
||||
effective_new_version = full_version(epoch, pkgver, pkgrel)
|
||||
else:
|
||||
effective_new_version = self.package.version
|
||||
return [Property(self.local_version or "N/A", effective_new_version, is_required=True)]
|
||||
|
@ -26,7 +26,7 @@ from ahriman.core.build_tools.sources import Sources
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.exceptions import GitRemoteError
|
||||
from ahriman.core.log import LazyLogging
|
||||
from ahriman.core.util import walk
|
||||
from ahriman.core.utils import walk
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.package_source import PackageSource
|
||||
from ahriman.models.remote_source import RemoteSource
|
||||
|
@ -25,7 +25,7 @@ from email.mime.text import MIMEText
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.report.jinja_template import JinjaTemplate
|
||||
from ahriman.core.report.report import Report
|
||||
from ahriman.core.util import pretty_datetime, utcnow
|
||||
from ahriman.core.utils import pretty_datetime, utcnow
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
from ahriman.models.result import Result
|
||||
|
@ -24,7 +24,7 @@ from pathlib import Path
|
||||
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.sign.gpg import GPG
|
||||
from ahriman.core.util import pretty_datetime, pretty_size
|
||||
from ahriman.core.utils import pretty_datetime, pretty_size
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
from ahriman.models.result import Result
|
||||
from ahriman.models.sign_settings import SignSettings
|
||||
|
@ -26,7 +26,7 @@ from tempfile import TemporaryDirectory
|
||||
from ahriman.core.build_tools.task import Task
|
||||
from ahriman.core.repository.cleaner import Cleaner
|
||||
from ahriman.core.repository.package_info import PackageInfo
|
||||
from ahriman.core.util import safe_filename
|
||||
from ahriman.core.utils import safe_filename
|
||||
from ahriman.models.changes import Changes
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.package_archive import PackageArchive
|
||||
|
@ -23,7 +23,7 @@ from tempfile import TemporaryDirectory
|
||||
|
||||
from ahriman.core.build_tools.sources import Sources
|
||||
from ahriman.core.repository.repository_properties import RepositoryProperties
|
||||
from ahriman.core.util import package_like
|
||||
from ahriman.core.utils import package_like
|
||||
from ahriman.models.changes import Changes
|
||||
from ahriman.models.package import Package
|
||||
|
||||
|
@ -22,7 +22,7 @@ from pathlib import Path
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.exceptions import BuildError
|
||||
from ahriman.core.http import SyncHttpClient
|
||||
from ahriman.core.util import check_output
|
||||
from ahriman.core.utils import check_output
|
||||
from ahriman.models.sign_settings import SignSettings
|
||||
|
||||
|
||||
|
@ -23,7 +23,7 @@ import itertools
|
||||
from collections.abc import Callable, Generator
|
||||
from pathlib import Path
|
||||
|
||||
from ahriman.core.util import utcnow
|
||||
from ahriman.core.utils import utcnow
|
||||
from ahriman.models.pkgbuild_patch import PkgbuildPatch
|
||||
|
||||
|
||||
|
@ -23,7 +23,7 @@ from collections.abc import Iterable
|
||||
from functools import partial
|
||||
|
||||
from ahriman.core.exceptions import PartitionError
|
||||
from ahriman.core.util import minmax, partition
|
||||
from ahriman.core.utils import minmax, partition
|
||||
from ahriman.models.package import Package
|
||||
|
||||
|
||||
|
@ -26,7 +26,7 @@ from typing import Any
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.upload.http_upload import HttpUpload
|
||||
from ahriman.core.upload.upload import Upload
|
||||
from ahriman.core.util import walk
|
||||
from ahriman.core.utils import walk
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
|
||||
|
@ -21,7 +21,7 @@ from pathlib import Path
|
||||
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.upload.upload import Upload
|
||||
from ahriman.core.util import check_output
|
||||
from ahriman.core.utils import check_output
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
|
||||
|
@ -26,7 +26,7 @@ from typing import Any
|
||||
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.upload.upload import Upload
|
||||
from ahriman.core.util import walk
|
||||
from ahriman.core.utils import walk
|
||||
from ahriman.models.package import Package
|
||||
from ahriman.models.repository_id import RepositoryId
|
||||
|
||||
|
@ -17,488 +17,5 @@
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
# pylint: disable=too-many-lines
|
||||
import datetime
|
||||
import io
|
||||
import itertools
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import selectors
|
||||
import subprocess
|
||||
|
||||
from collections.abc import Callable, Generator, Iterable
|
||||
from dataclasses import asdict
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from pwd import getpwuid
|
||||
from typing import Any, IO, TypeVar
|
||||
|
||||
from ahriman.core.exceptions import CalledProcessError, OptionError, UnsafeRunError
|
||||
from ahriman.models.repository_paths import RepositoryPaths
|
||||
|
||||
|
||||
__all__ = [
|
||||
"check_output",
|
||||
"check_user",
|
||||
"dataclass_view",
|
||||
"enum_values",
|
||||
"extract_user",
|
||||
"filter_json",
|
||||
"full_version",
|
||||
"minmax",
|
||||
"package_like",
|
||||
"parse_version",
|
||||
"partition",
|
||||
"pretty_datetime",
|
||||
"pretty_size",
|
||||
"safe_filename",
|
||||
"srcinfo_property",
|
||||
"srcinfo_property_list",
|
||||
"trim_package",
|
||||
"utcnow",
|
||||
"walk",
|
||||
]
|
||||
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
# pylint: disable=too-many-locals
|
||||
def check_output(*args: str, exception: Exception | Callable[[int, list[str], str, str], Exception] | None = None,
|
||||
cwd: Path | None = None, input_data: str | None = None,
|
||||
logger: logging.Logger | None = None, user: int | None = None,
|
||||
environment: dict[str, str] | None = None) -> str:
|
||||
"""
|
||||
subprocess wrapper
|
||||
|
||||
Args:
|
||||
*args(str): command line arguments
|
||||
exception(Exception | Callable[[int, list[str], str, str]] | None, optional): exception which has to be raised
|
||||
instead of default subprocess exception. If callable us is supplied, the
|
||||
:exc:`subprocess.CalledProcessError` arguments will be passed (Default value = None)
|
||||
cwd(Path | None, optional): current working directory (Default value = None)
|
||||
input_data(str | None, optional): data which will be written to command stdin (Default value = None)
|
||||
logger(logging.Logger | None, optional): logger to log command result if required (Default value = None)
|
||||
user(int | None, optional): run process as specified user (Default value = None)
|
||||
environment(dict[str, str] | None, optional): optional environment variables if any (Default value = None)
|
||||
|
||||
Returns:
|
||||
str: command output
|
||||
|
||||
Raises:
|
||||
CalledProcessError: if subprocess ended with status code different from 0 and no exception supplied
|
||||
|
||||
Examples:
|
||||
Simply call the function::
|
||||
|
||||
>>> check_output("echo", "hello world")
|
||||
|
||||
The more complicated calls which include result logging and input data are also possible::
|
||||
|
||||
>>> import logging
|
||||
>>>
|
||||
>>> logger = logging.getLogger()
|
||||
>>> check_output("python", "-c", "greeting = input('say hello: '); print(); print(greeting)",
|
||||
>>> input_data="hello world", logger=logger)
|
||||
|
||||
An additional argument ``exception`` can be supplied in order to override the default exception::
|
||||
|
||||
>>> check_output("false", exception=RuntimeError("An exception occurred"))
|
||||
"""
|
||||
# hack for IO[str] handle
|
||||
def get_io(proc: subprocess.Popen[str], channel_name: str) -> IO[str]:
|
||||
channel: IO[str] | None = getattr(proc, channel_name, None)
|
||||
return channel if channel is not None else io.StringIO()
|
||||
|
||||
# wrapper around selectors polling
|
||||
def poll(sel: selectors.BaseSelector) -> Generator[tuple[str, str], None, None]:
|
||||
for key, _ in sel.select(): # we don't need to check mask here because we have only subscribed on reading
|
||||
line = key.fileobj.readline() # type: ignore[union-attr]
|
||||
if not line: # in case of empty line we remove selector as there is no data here anymore
|
||||
sel.unregister(key.fileobj)
|
||||
continue
|
||||
line = line.rstrip()
|
||||
|
||||
if logger is not None:
|
||||
logger.debug(line)
|
||||
|
||||
yield key.data, line
|
||||
|
||||
# build system environment based on args and current environment
|
||||
environment = environment or {}
|
||||
if user is not None:
|
||||
environment["HOME"] = getpwuid(user).pw_dir
|
||||
full_environment = {
|
||||
key: value
|
||||
for key, value in os.environ.items()
|
||||
if key in ("PATH",) # whitelisted variables only
|
||||
} | environment
|
||||
|
||||
with subprocess.Popen(args, cwd=cwd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
||||
user=user, env=full_environment, text=True, encoding="utf8", bufsize=1) as process:
|
||||
if input_data is not None:
|
||||
input_channel = get_io(process, "stdin")
|
||||
input_channel.write(input_data)
|
||||
input_channel.close()
|
||||
|
||||
selector = selectors.DefaultSelector()
|
||||
selector.register(get_io(process, "stdout"), selectors.EVENT_READ, data="stdout")
|
||||
selector.register(get_io(process, "stderr"), selectors.EVENT_READ, data="stderr")
|
||||
|
||||
result: dict[str, list[str]] = {
|
||||
"stdout": [],
|
||||
"stderr": [],
|
||||
}
|
||||
while selector.get_map(): # while there are unread selectors, keep reading
|
||||
for key_data, output in poll(selector):
|
||||
result[key_data].append(output)
|
||||
|
||||
stdout = "\n".join(result["stdout"]).rstrip("\n") # remove newline at the end of any
|
||||
stderr = "\n".join(result["stderr"]).rstrip("\n")
|
||||
|
||||
status_code = process.wait()
|
||||
if status_code != 0:
|
||||
if isinstance(exception, Exception):
|
||||
raise exception
|
||||
if callable(exception):
|
||||
raise exception(status_code, list(args), stdout, stderr)
|
||||
raise CalledProcessError(status_code, list(args), stderr)
|
||||
|
||||
return stdout
|
||||
|
||||
|
||||
def check_user(paths: RepositoryPaths, *, unsafe: bool) -> None:
|
||||
"""
|
||||
check if current user is the owner of the root
|
||||
|
||||
Args:
|
||||
paths(RepositoryPaths): repository paths object
|
||||
unsafe(bool): if set no user check will be performed before path creation
|
||||
|
||||
Raises:
|
||||
UnsafeRunError: if root uid differs from current uid and check is enabled
|
||||
|
||||
Examples:
|
||||
Simply run function with arguments::
|
||||
|
||||
>>> check_user(paths, unsafe=False)
|
||||
"""
|
||||
if not paths.root.exists():
|
||||
return # no directory found, skip check
|
||||
if unsafe:
|
||||
return # unsafe flag is enabled, no check performed
|
||||
current_uid = os.getuid()
|
||||
root_uid, _ = paths.root_owner
|
||||
if current_uid != root_uid:
|
||||
raise UnsafeRunError(current_uid, root_uid)
|
||||
|
||||
|
||||
def dataclass_view(instance: Any) -> dict[str, Any]:
|
||||
"""
|
||||
convert dataclass instance to json object
|
||||
|
||||
Args:
|
||||
instance(Any): dataclass instance
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: json representation of the dataclass with empty field removed
|
||||
"""
|
||||
return asdict(instance, dict_factory=lambda fields: {key: value for key, value in fields if value is not None})
|
||||
|
||||
|
||||
def enum_values(enum: type[Enum]) -> list[str]:
|
||||
"""
|
||||
generate list of enumeration values from the source
|
||||
|
||||
Args:
|
||||
enum(type[Enum]): source enumeration class
|
||||
|
||||
Returns:
|
||||
list[str]: available enumeration values as string
|
||||
"""
|
||||
return [str(key.value) for key in enum] # explicit str conversion for typing
|
||||
|
||||
|
||||
def extract_user() -> str | None:
|
||||
"""
|
||||
extract user from system environment
|
||||
|
||||
Returns:
|
||||
str | None: SUDO_USER in case if set and USER otherwise. It can return None in case if environment has been
|
||||
cleared before application start
|
||||
"""
|
||||
return os.getenv("SUDO_USER") or os.getenv("DOAS_USER") or os.getenv("USER")
|
||||
|
||||
|
||||
def filter_json(source: dict[str, Any], known_fields: Iterable[str]) -> dict[str, Any]:
|
||||
"""
|
||||
filter json object by fields used for json-to-object conversion
|
||||
|
||||
Args:
|
||||
source(dict[str, Any]): raw json object
|
||||
known_fields(Iterable[str]): list of fields which have to be known for the target object
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: json object without unknown and empty fields
|
||||
|
||||
Examples:
|
||||
This wrapper is mainly used for the dataclasses, thus the flow must be something like this::
|
||||
|
||||
>>> from dataclasses import fields
|
||||
>>> from ahriman.models.package import Package
|
||||
>>>
|
||||
>>> known_fields = [pair.name for pair in fields(Package)]
|
||||
>>> properties = filter_json(dump, known_fields)
|
||||
>>> package = Package(**properties)
|
||||
"""
|
||||
return {key: value for key, value in source.items() if key in known_fields and value is not None}
|
||||
|
||||
|
||||
def full_version(epoch: str | int | None, pkgver: str, pkgrel: str) -> str:
|
||||
"""
|
||||
generate full version from components
|
||||
|
||||
Args:
|
||||
epoch(str | int | None): package epoch if any
|
||||
pkgver(str): package version
|
||||
pkgrel(str): package release version (arch linux specific)
|
||||
|
||||
Returns:
|
||||
str: generated version
|
||||
"""
|
||||
prefix = f"{epoch}:" if epoch else ""
|
||||
return f"{prefix}{pkgver}-{pkgrel}"
|
||||
|
||||
|
||||
def minmax(source: Iterable[T], *, key: Callable[[T], Any] | None = None) -> tuple[T, T]:
|
||||
"""
|
||||
get min and max value from iterable
|
||||
|
||||
Args:
|
||||
source(Iterable[T]): source list to find min and max values
|
||||
key(Callable[[T], Any] | None, optional): key to sort (Default value = None)
|
||||
|
||||
Returns:
|
||||
tuple[T, T]: min and max values for sequence
|
||||
"""
|
||||
first_iter, second_iter = itertools.tee(source)
|
||||
# typing doesn't expose SupportLessThan, so we just ignore this in typecheck
|
||||
return min(first_iter, key=key), max(second_iter, key=key) # type: ignore
|
||||
|
||||
|
||||
def package_like(filename: Path) -> bool:
|
||||
"""
|
||||
check if file looks like package
|
||||
|
||||
Args:
|
||||
filename(Path): name of file to check
|
||||
|
||||
Returns:
|
||||
bool: True in case if name contains ``.pkg.`` and not signature, False otherwise
|
||||
"""
|
||||
name = filename.name
|
||||
return not name.startswith(".") and ".pkg." in name and not name.endswith(".sig")
|
||||
|
||||
|
||||
def parse_version(version: str) -> tuple[str | None, str, str]:
|
||||
"""
|
||||
parse version and returns its components
|
||||
|
||||
Args:
|
||||
version(str): full version string
|
||||
|
||||
Returns:
|
||||
tuple[str | None, str, str]: epoch if any, pkgver and pkgrel variables
|
||||
"""
|
||||
if ":" in version:
|
||||
epoch, version = version.split(":", maxsplit=1)
|
||||
else:
|
||||
epoch = None
|
||||
pkgver, pkgrel = version.rsplit("-", maxsplit=1)
|
||||
|
||||
return epoch, pkgver, pkgrel
|
||||
|
||||
|
||||
def partition(source: Iterable[T], predicate: Callable[[T], bool]) -> tuple[list[T], list[T]]:
|
||||
"""
|
||||
partition list into two based on predicate, based on https://docs.python.org/dev/library/itertools.html#itertools-recipes
|
||||
|
||||
Args:
|
||||
source(Iterable[T]): source list to be partitioned
|
||||
predicate(Callable[[T], bool]): filter function
|
||||
|
||||
Returns:
|
||||
tuple[list[T], list[T]]: two lists, first is which ``predicate`` is ``True``, second is ``False``
|
||||
"""
|
||||
first_iter, second_iter = itertools.tee(source)
|
||||
return list(filter(predicate, first_iter)), list(itertools.filterfalse(predicate, second_iter))
|
||||
|
||||
|
||||
def pretty_datetime(timestamp: datetime.datetime | float | int | None) -> str:
|
||||
"""
|
||||
convert datetime object to string
|
||||
|
||||
Args:
|
||||
timestamp(datetime.datetime | float | int | None): datetime to convert
|
||||
|
||||
Returns:
|
||||
str: pretty printable datetime as string
|
||||
"""
|
||||
if timestamp is None:
|
||||
return ""
|
||||
if isinstance(timestamp, (int, float)):
|
||||
timestamp = datetime.datetime.fromtimestamp(timestamp, datetime.UTC)
|
||||
return timestamp.strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
|
||||
def pretty_size(size: float | None, level: int = 0) -> str:
|
||||
"""
|
||||
convert size to string
|
||||
|
||||
Args:
|
||||
size(float | None): size to convert
|
||||
level(int, optional): represents current units, 0 is B, 1 is KiB, etc. (Default value = 0)
|
||||
|
||||
Returns:
|
||||
str: pretty printable size as string
|
||||
|
||||
Raises:
|
||||
OptionError: if size is more than 1TiB
|
||||
"""
|
||||
def str_level() -> str:
|
||||
match level:
|
||||
case 0:
|
||||
return "B"
|
||||
case 1:
|
||||
return "KiB"
|
||||
case 2:
|
||||
return "MiB"
|
||||
case 3:
|
||||
return "GiB"
|
||||
case _:
|
||||
raise OptionError(level) # must never happen actually
|
||||
|
||||
if size is None:
|
||||
return ""
|
||||
if size < 1024 or level >= 3:
|
||||
return f"{size:.1f} {str_level()}"
|
||||
return pretty_size(size / 1024, level + 1)
|
||||
|
||||
|
||||
def safe_filename(source: str) -> str:
|
||||
"""
|
||||
convert source string to its safe representation
|
||||
|
||||
Args:
|
||||
source(str): string to convert
|
||||
|
||||
Returns:
|
||||
str: result string in which all unsafe characters are replaced by dash
|
||||
"""
|
||||
# RFC-3986 https://datatracker.ietf.org/doc/html/rfc3986 states that unreserved characters are
|
||||
# https://datatracker.ietf.org/doc/html/rfc3986#section-2.3
|
||||
# unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
|
||||
# however we would like to allow some gen-delims characters in filename, because those characters are used
|
||||
# as delimiter in other URI parts. The ones we allow to are:
|
||||
# ":" - used as separator in schema and userinfo
|
||||
# "[" and "]" - used for host part
|
||||
# "@" - used as separator between host and userinfo
|
||||
return re.sub(r"[^A-Za-z\d\-._~:\[\]@]", "-", source)
|
||||
|
||||
|
||||
def srcinfo_property(key: str, srcinfo: dict[str, Any], package_srcinfo: dict[str, Any], *,
|
||||
default: Any = None) -> Any:
|
||||
"""
|
||||
extract property from SRCINFO. This method extracts property from package if this property is presented in
|
||||
``srcinfo``. Otherwise, it looks for the same property in root srcinfo. If none found, the default value will be
|
||||
returned
|
||||
|
||||
Args:
|
||||
key(str): key to extract
|
||||
srcinfo(dict[str, Any]): root structure of SRCINFO
|
||||
package_srcinfo(dict[str, Any]): package specific SRCINFO
|
||||
default(Any, optional): the default value for the specified key (Default value = None)
|
||||
|
||||
Returns:
|
||||
Any: extracted value from SRCINFO
|
||||
"""
|
||||
return package_srcinfo.get(key) or srcinfo.get(key) or default
|
||||
|
||||
|
||||
def srcinfo_property_list(key: str, srcinfo: dict[str, Any], package_srcinfo: dict[str, Any], *,
|
||||
architecture: str | None = None) -> list[Any]:
|
||||
"""
|
||||
extract list property from SRCINFO. Unlike :func:`srcinfo_property()` it supposes that default return value is
|
||||
always empty list. If ``architecture`` is supplied, then it will try to lookup for architecture specific values and
|
||||
will append it at the end of result
|
||||
|
||||
Args:
|
||||
key(str): key to extract
|
||||
srcinfo(dict[str, Any]): root structure of SRCINFO
|
||||
package_srcinfo(dict[str, Any]): package specific SRCINFO
|
||||
architecture(str | None, optional): package architecture if set (Default value = None)
|
||||
|
||||
Returns:
|
||||
list[Any]: list of extracted properties from SRCINFO
|
||||
"""
|
||||
values: list[Any] = srcinfo_property(key, srcinfo, package_srcinfo, default=[])
|
||||
if architecture is not None:
|
||||
values.extend(srcinfo_property(f"{key}_{architecture}", srcinfo, package_srcinfo, default=[]))
|
||||
return values
|
||||
|
||||
|
||||
def trim_package(package_name: str) -> str:
|
||||
"""
|
||||
remove version bound and description from package name. Pacman allows to specify version bound (=, <=, >= etc.) for
|
||||
packages in dependencies and also allows to specify description (via ``:``); this function removes trailing parts
|
||||
and return exact package name
|
||||
|
||||
Args:
|
||||
package_name(str): source package name
|
||||
|
||||
Returns:
|
||||
str: package name without description or version bound
|
||||
"""
|
||||
for symbol in ("<", "=", ">", ":"):
|
||||
package_name = package_name.partition(symbol)[0]
|
||||
return package_name
|
||||
|
||||
|
||||
def utcnow() -> datetime.datetime:
|
||||
"""
|
||||
get current time
|
||||
|
||||
Returns:
|
||||
datetime.datetime: current time in UTC
|
||||
"""
|
||||
return datetime.datetime.now(datetime.UTC)
|
||||
|
||||
|
||||
def walk(directory_path: Path) -> Generator[Path, None, None]:
|
||||
"""
|
||||
list all file paths in given directory
|
||||
Credits to https://stackoverflow.com/a/64915960
|
||||
|
||||
Args:
|
||||
directory_path(Path): root directory path
|
||||
|
||||
Yields:
|
||||
Path: all found files in given directory with full path
|
||||
|
||||
Examples:
|
||||
Since the :mod:`pathlib` module does not provide an alternative to :func:`os.walk()`, this wrapper
|
||||
can be used instead::
|
||||
|
||||
>>> from pathlib import Path
|
||||
>>>
|
||||
>>> for file_path in walk(Path.cwd()):
|
||||
>>> print(file_path)
|
||||
|
||||
Note, however, that unlike the original method, it does not yield directories.
|
||||
"""
|
||||
for element in directory_path.iterdir():
|
||||
if element.is_dir():
|
||||
yield from walk(element)
|
||||
continue
|
||||
yield element
|
||||
# backward compatibility wrapper
|
||||
from ahriman.core.utils import * # pylint: disable=wildcard-import,unused-wildcard-import
|
||||
|
504
src/ahriman/core/utils.py
Normal file
504
src/ahriman/core/utils.py
Normal file
@ -0,0 +1,504 @@
|
||||
#
|
||||
# Copyright (c) 2021-2024 ahriman team.
|
||||
#
|
||||
# This file is part of ahriman
|
||||
# (see https://github.com/arcan1s/ahriman).
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
# pylint: disable=too-many-lines
|
||||
import datetime
|
||||
import io
|
||||
import itertools
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import selectors
|
||||
import subprocess
|
||||
|
||||
from collections.abc import Callable, Generator, Iterable
|
||||
from dataclasses import asdict
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from pwd import getpwuid
|
||||
from typing import Any, IO, TypeVar
|
||||
|
||||
from ahriman.core.exceptions import CalledProcessError, OptionError, UnsafeRunError
|
||||
from ahriman.models.repository_paths import RepositoryPaths
|
||||
|
||||
|
||||
__all__ = [
|
||||
"check_output",
|
||||
"check_user",
|
||||
"dataclass_view",
|
||||
"enum_values",
|
||||
"extract_user",
|
||||
"filter_json",
|
||||
"full_version",
|
||||
"minmax",
|
||||
"package_like",
|
||||
"parse_version",
|
||||
"partition",
|
||||
"pretty_datetime",
|
||||
"pretty_size",
|
||||
"safe_filename",
|
||||
"srcinfo_property",
|
||||
"srcinfo_property_list",
|
||||
"trim_package",
|
||||
"utcnow",
|
||||
"walk",
|
||||
]
|
||||
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
# pylint: disable=too-many-locals
|
||||
def check_output(*args: str, exception: Exception | Callable[[int, list[str], str, str], Exception] | None = None,
|
||||
cwd: Path | None = None, input_data: str | None = None,
|
||||
logger: logging.Logger | None = None, user: int | None = None,
|
||||
environment: dict[str, str] | None = None) -> str:
|
||||
"""
|
||||
subprocess wrapper
|
||||
|
||||
Args:
|
||||
*args(str): command line arguments
|
||||
exception(Exception | Callable[[int, list[str], str, str]] | None, optional): exception which has to be raised
|
||||
instead of default subprocess exception. If callable us is supplied, the
|
||||
:exc:`subprocess.CalledProcessError` arguments will be passed (Default value = None)
|
||||
cwd(Path | None, optional): current working directory (Default value = None)
|
||||
input_data(str | None, optional): data which will be written to command stdin (Default value = None)
|
||||
logger(logging.Logger | None, optional): logger to log command result if required (Default value = None)
|
||||
user(int | None, optional): run process as specified user (Default value = None)
|
||||
environment(dict[str, str] | None, optional): optional environment variables if any (Default value = None)
|
||||
|
||||
Returns:
|
||||
str: command output
|
||||
|
||||
Raises:
|
||||
CalledProcessError: if subprocess ended with status code different from 0 and no exception supplied
|
||||
|
||||
Examples:
|
||||
Simply call the function::
|
||||
|
||||
>>> check_output("echo", "hello world")
|
||||
|
||||
The more complicated calls which include result logging and input data are also possible::
|
||||
|
||||
>>> import logging
|
||||
>>>
|
||||
>>> logger = logging.getLogger()
|
||||
>>> check_output("python", "-c", "greeting = input('say hello: '); print(); print(greeting)",
|
||||
>>> input_data="hello world", logger=logger)
|
||||
|
||||
An additional argument ``exception`` can be supplied in order to override the default exception::
|
||||
|
||||
>>> check_output("false", exception=RuntimeError("An exception occurred"))
|
||||
"""
|
||||
# hack for IO[str] handle
|
||||
def get_io(proc: subprocess.Popen[str], channel_name: str) -> IO[str]:
|
||||
channel: IO[str] | None = getattr(proc, channel_name, None)
|
||||
return channel if channel is not None else io.StringIO()
|
||||
|
||||
# wrapper around selectors polling
|
||||
def poll(sel: selectors.BaseSelector) -> Generator[tuple[str, str], None, None]:
|
||||
for key, _ in sel.select(): # we don't need to check mask here because we have only subscribed on reading
|
||||
line = key.fileobj.readline() # type: ignore[union-attr]
|
||||
if not line: # in case of empty line we remove selector as there is no data here anymore
|
||||
sel.unregister(key.fileobj)
|
||||
continue
|
||||
line = line.rstrip()
|
||||
|
||||
if logger is not None:
|
||||
logger.debug(line)
|
||||
|
||||
yield key.data, line
|
||||
|
||||
# build system environment based on args and current environment
|
||||
environment = environment or {}
|
||||
if user is not None:
|
||||
environment["HOME"] = getpwuid(user).pw_dir
|
||||
full_environment = {
|
||||
key: value
|
||||
for key, value in os.environ.items()
|
||||
if key in ("PATH",) # whitelisted variables only
|
||||
} | environment
|
||||
|
||||
with subprocess.Popen(args, cwd=cwd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
||||
user=user, env=full_environment, text=True, encoding="utf8", bufsize=1) as process:
|
||||
if input_data is not None:
|
||||
input_channel = get_io(process, "stdin")
|
||||
input_channel.write(input_data)
|
||||
input_channel.close()
|
||||
|
||||
selector = selectors.DefaultSelector()
|
||||
selector.register(get_io(process, "stdout"), selectors.EVENT_READ, data="stdout")
|
||||
selector.register(get_io(process, "stderr"), selectors.EVENT_READ, data="stderr")
|
||||
|
||||
result: dict[str, list[str]] = {
|
||||
"stdout": [],
|
||||
"stderr": [],
|
||||
}
|
||||
while selector.get_map(): # while there are unread selectors, keep reading
|
||||
for key_data, output in poll(selector):
|
||||
result[key_data].append(output)
|
||||
|
||||
stdout = "\n".join(result["stdout"]).rstrip("\n") # remove newline at the end of any
|
||||
stderr = "\n".join(result["stderr"]).rstrip("\n")
|
||||
|
||||
status_code = process.wait()
|
||||
if status_code != 0:
|
||||
if isinstance(exception, Exception):
|
||||
raise exception
|
||||
if callable(exception):
|
||||
raise exception(status_code, list(args), stdout, stderr)
|
||||
raise CalledProcessError(status_code, list(args), stderr)
|
||||
|
||||
return stdout
|
||||
|
||||
|
||||
def check_user(paths: RepositoryPaths, *, unsafe: bool) -> None:
|
||||
"""
|
||||
check if current user is the owner of the root
|
||||
|
||||
Args:
|
||||
paths(RepositoryPaths): repository paths object
|
||||
unsafe(bool): if set no user check will be performed before path creation
|
||||
|
||||
Raises:
|
||||
UnsafeRunError: if root uid differs from current uid and check is enabled
|
||||
|
||||
Examples:
|
||||
Simply run function with arguments::
|
||||
|
||||
>>> check_user(paths, unsafe=False)
|
||||
"""
|
||||
if not paths.root.exists():
|
||||
return # no directory found, skip check
|
||||
if unsafe:
|
||||
return # unsafe flag is enabled, no check performed
|
||||
current_uid = os.getuid()
|
||||
root_uid, _ = paths.root_owner
|
||||
if current_uid != root_uid:
|
||||
raise UnsafeRunError(current_uid, root_uid)
|
||||
|
||||
|
||||
def dataclass_view(instance: Any) -> dict[str, Any]:
|
||||
"""
|
||||
convert dataclass instance to json object
|
||||
|
||||
Args:
|
||||
instance(Any): dataclass instance
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: json representation of the dataclass with empty field removed
|
||||
"""
|
||||
return asdict(instance, dict_factory=lambda fields: {key: value for key, value in fields if value is not None})
|
||||
|
||||
|
||||
def enum_values(enum: type[Enum]) -> list[str]:
|
||||
"""
|
||||
generate list of enumeration values from the source
|
||||
|
||||
Args:
|
||||
enum(type[Enum]): source enumeration class
|
||||
|
||||
Returns:
|
||||
list[str]: available enumeration values as string
|
||||
"""
|
||||
return [str(key.value) for key in enum] # explicit str conversion for typing
|
||||
|
||||
|
||||
def extract_user() -> str | None:
|
||||
"""
|
||||
extract user from system environment
|
||||
|
||||
Returns:
|
||||
str | None: SUDO_USER in case if set and USER otherwise. It can return None in case if environment has been
|
||||
cleared before application start
|
||||
"""
|
||||
return os.getenv("SUDO_USER") or os.getenv("DOAS_USER") or os.getenv("USER")
|
||||
|
||||
|
||||
def filter_json(source: dict[str, Any], known_fields: Iterable[str]) -> dict[str, Any]:
|
||||
"""
|
||||
filter json object by fields used for json-to-object conversion
|
||||
|
||||
Args:
|
||||
source(dict[str, Any]): raw json object
|
||||
known_fields(Iterable[str]): list of fields which have to be known for the target object
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: json object without unknown and empty fields
|
||||
|
||||
Examples:
|
||||
This wrapper is mainly used for the dataclasses, thus the flow must be something like this::
|
||||
|
||||
>>> from dataclasses import fields
|
||||
>>> from ahriman.models.package import Package
|
||||
>>>
|
||||
>>> known_fields = [pair.name for pair in fields(Package)]
|
||||
>>> properties = filter_json(dump, known_fields)
|
||||
>>> package = Package(**properties)
|
||||
"""
|
||||
return {key: value for key, value in source.items() if key in known_fields and value is not None}
|
||||
|
||||
|
||||
def full_version(epoch: str | int | None, pkgver: str, pkgrel: str) -> str:
|
||||
"""
|
||||
generate full version from components
|
||||
|
||||
Args:
|
||||
epoch(str | int | None): package epoch if any
|
||||
pkgver(str): package version
|
||||
pkgrel(str): package release version (arch linux specific)
|
||||
|
||||
Returns:
|
||||
str: generated version
|
||||
"""
|
||||
prefix = f"{epoch}:" if epoch else ""
|
||||
return f"{prefix}{pkgver}-{pkgrel}"
|
||||
|
||||
|
||||
def minmax(source: Iterable[T], *, key: Callable[[T], Any] | None = None) -> tuple[T, T]:
|
||||
"""
|
||||
get min and max value from iterable
|
||||
|
||||
Args:
|
||||
source(Iterable[T]): source list to find min and max values
|
||||
key(Callable[[T], Any] | None, optional): key to sort (Default value = None)
|
||||
|
||||
Returns:
|
||||
tuple[T, T]: min and max values for sequence
|
||||
"""
|
||||
first_iter, second_iter = itertools.tee(source)
|
||||
# typing doesn't expose SupportLessThan, so we just ignore this in typecheck
|
||||
return min(first_iter, key=key), max(second_iter, key=key) # type: ignore
|
||||
|
||||
|
||||
def package_like(filename: Path) -> bool:
|
||||
"""
|
||||
check if file looks like package
|
||||
|
||||
Args:
|
||||
filename(Path): name of file to check
|
||||
|
||||
Returns:
|
||||
bool: True in case if name contains ``.pkg.`` and not signature, False otherwise
|
||||
"""
|
||||
name = filename.name
|
||||
return not name.startswith(".") and ".pkg." in name and not name.endswith(".sig")
|
||||
|
||||
|
||||
def parse_version(version: str) -> tuple[str | None, str, str]:
|
||||
"""
|
||||
parse version and returns its components
|
||||
|
||||
Args:
|
||||
version(str): full version string
|
||||
|
||||
Returns:
|
||||
tuple[str | None, str, str]: epoch if any, pkgver and pkgrel variables
|
||||
"""
|
||||
if ":" in version:
|
||||
epoch, version = version.split(":", maxsplit=1)
|
||||
else:
|
||||
epoch = None
|
||||
pkgver, pkgrel = version.rsplit("-", maxsplit=1)
|
||||
|
||||
return epoch, pkgver, pkgrel
|
||||
|
||||
|
||||
def partition(source: Iterable[T], predicate: Callable[[T], bool]) -> tuple[list[T], list[T]]:
|
||||
"""
|
||||
partition list into two based on predicate, based on https://docs.python.org/dev/library/itertools.html#itertools-recipes
|
||||
|
||||
Args:
|
||||
source(Iterable[T]): source list to be partitioned
|
||||
predicate(Callable[[T], bool]): filter function
|
||||
|
||||
Returns:
|
||||
tuple[list[T], list[T]]: two lists, first is which ``predicate`` is ``True``, second is ``False``
|
||||
"""
|
||||
first_iter, second_iter = itertools.tee(source)
|
||||
return list(filter(predicate, first_iter)), list(itertools.filterfalse(predicate, second_iter))
|
||||
|
||||
|
||||
def pretty_datetime(timestamp: datetime.datetime | float | int | None) -> str:
|
||||
"""
|
||||
convert datetime object to string
|
||||
|
||||
Args:
|
||||
timestamp(datetime.datetime | float | int | None): datetime to convert
|
||||
|
||||
Returns:
|
||||
str: pretty printable datetime as string
|
||||
"""
|
||||
if timestamp is None:
|
||||
return ""
|
||||
if isinstance(timestamp, (int, float)):
|
||||
timestamp = datetime.datetime.fromtimestamp(timestamp, datetime.UTC)
|
||||
return timestamp.strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
|
||||
def pretty_size(size: float | None, level: int = 0) -> str:
|
||||
"""
|
||||
convert size to string
|
||||
|
||||
Args:
|
||||
size(float | None): size to convert
|
||||
level(int, optional): represents current units, 0 is B, 1 is KiB, etc. (Default value = 0)
|
||||
|
||||
Returns:
|
||||
str: pretty printable size as string
|
||||
|
||||
Raises:
|
||||
OptionError: if size is more than 1TiB
|
||||
"""
|
||||
def str_level() -> str:
|
||||
match level:
|
||||
case 0:
|
||||
return "B"
|
||||
case 1:
|
||||
return "KiB"
|
||||
case 2:
|
||||
return "MiB"
|
||||
case 3:
|
||||
return "GiB"
|
||||
case _:
|
||||
raise OptionError(level) # must never happen actually
|
||||
|
||||
if size is None:
|
||||
return ""
|
||||
if size < 1024 or level >= 3:
|
||||
return f"{size:.1f} {str_level()}"
|
||||
return pretty_size(size / 1024, level + 1)
|
||||
|
||||
|
||||
def safe_filename(source: str) -> str:
|
||||
"""
|
||||
convert source string to its safe representation
|
||||
|
||||
Args:
|
||||
source(str): string to convert
|
||||
|
||||
Returns:
|
||||
str: result string in which all unsafe characters are replaced by dash
|
||||
"""
|
||||
# RFC-3986 https://datatracker.ietf.org/doc/html/rfc3986 states that unreserved characters are
|
||||
# https://datatracker.ietf.org/doc/html/rfc3986#section-2.3
|
||||
# unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
|
||||
# however we would like to allow some gen-delims characters in filename, because those characters are used
|
||||
# as delimiter in other URI parts. The ones we allow to are:
|
||||
# ":" - used as separator in schema and userinfo
|
||||
# "[" and "]" - used for host part
|
||||
# "@" - used as separator between host and userinfo
|
||||
return re.sub(r"[^A-Za-z\d\-._~:\[\]@]", "-", source)
|
||||
|
||||
|
||||
def srcinfo_property(key: str, srcinfo: dict[str, Any], package_srcinfo: dict[str, Any], *,
|
||||
default: Any = None) -> Any:
|
||||
"""
|
||||
extract property from SRCINFO. This method extracts property from package if this property is presented in
|
||||
``srcinfo``. Otherwise, it looks for the same property in root srcinfo. If none found, the default value will be
|
||||
returned
|
||||
|
||||
Args:
|
||||
key(str): key to extract
|
||||
srcinfo(dict[str, Any]): root structure of SRCINFO
|
||||
package_srcinfo(dict[str, Any]): package specific SRCINFO
|
||||
default(Any, optional): the default value for the specified key (Default value = None)
|
||||
|
||||
Returns:
|
||||
Any: extracted value from SRCINFO
|
||||
"""
|
||||
return package_srcinfo.get(key) or srcinfo.get(key) or default
|
||||
|
||||
|
||||
def srcinfo_property_list(key: str, srcinfo: dict[str, Any], package_srcinfo: dict[str, Any], *,
|
||||
architecture: str | None = None) -> list[Any]:
|
||||
"""
|
||||
extract list property from SRCINFO. Unlike :func:`srcinfo_property()` it supposes that default return value is
|
||||
always empty list. If ``architecture`` is supplied, then it will try to lookup for architecture specific values and
|
||||
will append it at the end of result
|
||||
|
||||
Args:
|
||||
key(str): key to extract
|
||||
srcinfo(dict[str, Any]): root structure of SRCINFO
|
||||
package_srcinfo(dict[str, Any]): package specific SRCINFO
|
||||
architecture(str | None, optional): package architecture if set (Default value = None)
|
||||
|
||||
Returns:
|
||||
list[Any]: list of extracted properties from SRCINFO
|
||||
"""
|
||||
values: list[Any] = srcinfo_property(key, srcinfo, package_srcinfo, default=[])
|
||||
if architecture is not None:
|
||||
values.extend(srcinfo_property(f"{key}_{architecture}", srcinfo, package_srcinfo, default=[]))
|
||||
return values
|
||||
|
||||
|
||||
def trim_package(package_name: str) -> str:
|
||||
"""
|
||||
remove version bound and description from package name. Pacman allows to specify version bound (=, <=, >= etc.) for
|
||||
packages in dependencies and also allows to specify description (via ``:``); this function removes trailing parts
|
||||
and return exact package name
|
||||
|
||||
Args:
|
||||
package_name(str): source package name
|
||||
|
||||
Returns:
|
||||
str: package name without description or version bound
|
||||
"""
|
||||
for symbol in ("<", "=", ">", ":"):
|
||||
package_name = package_name.partition(symbol)[0]
|
||||
return package_name
|
||||
|
||||
|
||||
def utcnow() -> datetime.datetime:
|
||||
"""
|
||||
get current time
|
||||
|
||||
Returns:
|
||||
datetime.datetime: current time in UTC
|
||||
"""
|
||||
return datetime.datetime.now(datetime.UTC)
|
||||
|
||||
|
||||
def walk(directory_path: Path) -> Generator[Path, None, None]:
|
||||
"""
|
||||
list all file paths in given directory
|
||||
Credits to https://stackoverflow.com/a/64915960
|
||||
|
||||
Args:
|
||||
directory_path(Path): root directory path
|
||||
|
||||
Yields:
|
||||
Path: all found files in given directory with full path
|
||||
|
||||
Examples:
|
||||
Since the :mod:`pathlib` module does not provide an alternative to :func:`os.walk()`, this wrapper
|
||||
can be used instead::
|
||||
|
||||
>>> from pathlib import Path
|
||||
>>>
|
||||
>>> for file_path in walk(Path.cwd()):
|
||||
>>> print(file_path)
|
||||
|
||||
Note, however, that unlike the original method, it does not yield directories.
|
||||
"""
|
||||
for element in directory_path.iterdir():
|
||||
if element.is_dir():
|
||||
yield from walk(element)
|
||||
continue
|
||||
yield element
|
@ -25,7 +25,7 @@ from dataclasses import dataclass, field, fields
|
||||
from pyalpm import Package # type: ignore[import-not-found]
|
||||
from typing import Any, Self
|
||||
|
||||
from ahriman.core.util import filter_json, full_version
|
||||
from ahriman.core.utils import filter_json, full_version
|
||||
|
||||
|
||||
@dataclass(frozen=True, kw_only=True)
|
||||
|
@ -21,7 +21,7 @@ from dataclasses import dataclass, field, fields
|
||||
from enum import StrEnum
|
||||
from typing import Any, Self
|
||||
|
||||
from ahriman.core.util import filter_json, pretty_datetime, utcnow
|
||||
from ahriman.core.utils import filter_json, pretty_datetime, utcnow
|
||||
|
||||
|
||||
class BuildStatusEnum(StrEnum):
|
||||
|
@ -20,7 +20,7 @@
|
||||
from dataclasses import dataclass, fields
|
||||
from typing import Any, Self
|
||||
|
||||
from ahriman.core.util import dataclass_view, filter_json
|
||||
from ahriman.core.utils import dataclass_view, filter_json
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
|
@ -20,7 +20,7 @@
|
||||
from dataclasses import dataclass, fields
|
||||
from typing import Any, Self
|
||||
|
||||
from ahriman.core.util import filter_json
|
||||
from ahriman.core.utils import filter_json
|
||||
from ahriman.models.build_status import BuildStatus
|
||||
from ahriman.models.package import Package
|
||||
|
||||
|
@ -20,7 +20,7 @@
|
||||
from dataclasses import dataclass, field, fields
|
||||
from typing import Any, Self
|
||||
|
||||
from ahriman.core.util import dataclass_view, filter_json
|
||||
from ahriman.core.utils import dataclass_view, filter_json
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
|
@ -23,7 +23,7 @@ from collections.abc import Iterable
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
|
||||
from ahriman.core.util import trim_package
|
||||
from ahriman.core.utils import trim_package
|
||||
|
||||
|
||||
@dataclass(frozen=True, kw_only=True)
|
||||
|
@ -20,7 +20,7 @@
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Self
|
||||
|
||||
from ahriman.core.util import dataclass_view
|
||||
from ahriman.core.utils import dataclass_view
|
||||
from ahriman.models.build_status import BuildStatus
|
||||
from ahriman.models.counters import Counters
|
||||
|
||||
|
@ -34,7 +34,7 @@ from ahriman.core.alpm.pacman import Pacman
|
||||
from ahriman.core.alpm.remote import AUR, Official, OfficialSyncdb
|
||||
from ahriman.core.exceptions import PackageInfoError
|
||||
from ahriman.core.log import LazyLogging
|
||||
from ahriman.core.util import check_output, dataclass_view, full_version, parse_version, srcinfo_property_list, utcnow
|
||||
from ahriman.core.utils import check_output, dataclass_view, full_version, parse_version, srcinfo_property_list, utcnow
|
||||
from ahriman.models.package_description import PackageDescription
|
||||
from ahriman.models.package_source import PackageSource
|
||||
from ahriman.models.remote_source import RemoteSource
|
||||
@ -539,20 +539,23 @@ class Package(LazyLogging):
|
||||
result: int = vercmp(self.version, remote_version)
|
||||
return result < 0
|
||||
|
||||
def next_pkgrel(self, local_version: str) -> str | None:
|
||||
def next_pkgrel(self, local_version: str | None) -> str | None:
|
||||
"""
|
||||
generate next pkgrel variable. The package release will be incremented if ``local_version`` is more or equal to
|
||||
the :attr:`version`; in this case the function will return new pkgrel value, otherwise ``None`` will be
|
||||
returned
|
||||
|
||||
Args:
|
||||
local_version(str): locally stored package version
|
||||
local_version(str | None): locally stored package version if available
|
||||
|
||||
Returns:
|
||||
str | None: new generated package release version if any. In case if the release contains dot (e.g. 1.2),
|
||||
the minor part will be incremented by 1. If the release does not contain major.minor notation, the minor
|
||||
version equals to 1 will be appended
|
||||
"""
|
||||
if local_version is None:
|
||||
return None # local version not found, keep upstream pkgrel
|
||||
|
||||
epoch, pkgver, _ = parse_version(self.version)
|
||||
local_epoch, local_pkgver, local_pkgrel = parse_version(local_version)
|
||||
|
||||
|
@ -26,7 +26,7 @@ from typing import IO
|
||||
from ahriman.core.alpm.pacman import Pacman
|
||||
from ahriman.core.alpm.remote import OfficialSyncdb
|
||||
from ahriman.core.exceptions import UnknownPackageError
|
||||
from ahriman.core.util import walk
|
||||
from ahriman.core.utils import walk
|
||||
from ahriman.models.dependencies import Dependencies
|
||||
from ahriman.models.filesystem_package import FilesystemPackage
|
||||
from ahriman.models.package import Package
|
||||
|
@ -22,7 +22,7 @@ from pathlib import Path
|
||||
from pyalpm import Package # type: ignore[import-not-found]
|
||||
from typing import Any, Self
|
||||
|
||||
from ahriman.core.util import dataclass_view, filter_json, trim_package
|
||||
from ahriman.core.utils import dataclass_view, filter_json, trim_package
|
||||
from ahriman.models.aur_package import AURPackage
|
||||
|
||||
|
||||
|
@ -23,7 +23,7 @@ from enum import StrEnum
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from ahriman.core.util import package_like
|
||||
from ahriman.core.utils import package_like
|
||||
from ahriman.models.repository_paths import RepositoryPaths
|
||||
|
||||
|
||||
|
@ -23,7 +23,7 @@ from dataclasses import dataclass, fields
|
||||
from pathlib import Path
|
||||
from typing import Any, Generator, Self
|
||||
|
||||
from ahriman.core.util import dataclass_view, filter_json
|
||||
from ahriman.core.utils import dataclass_view, filter_json
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
|
@ -22,7 +22,7 @@ from pathlib import Path
|
||||
from typing import Any, Self
|
||||
|
||||
from ahriman.core.exceptions import InitializeError
|
||||
from ahriman.core.util import dataclass_view, filter_json
|
||||
from ahriman.core.utils import dataclass_view, filter_json
|
||||
from ahriman.models.package_source import PackageSource
|
||||
|
||||
|
||||
|
@ -21,7 +21,7 @@ from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from ahriman.core.util import dataclass_view
|
||||
from ahriman.core.utils import dataclass_view
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
|
@ -20,7 +20,7 @@
|
||||
from aiohttp.web import Response, json_response
|
||||
from collections.abc import Callable
|
||||
|
||||
from ahriman.core.util import partition
|
||||
from ahriman.core.utils import partition
|
||||
from ahriman.models.user_access import UserAccess
|
||||
from ahriman.web.views.base import BaseView
|
||||
|
||||
|
@ -22,7 +22,7 @@ import aiohttp_apispec # type: ignore[import-untyped]
|
||||
from aiohttp.web import HTTPBadRequest, HTTPNoContent, HTTPNotFound, Response, json_response
|
||||
|
||||
from ahriman.core.exceptions import UnknownPackageError
|
||||
from ahriman.core.util import pretty_datetime
|
||||
from ahriman.core.utils import pretty_datetime
|
||||
from ahriman.models.log_record_id import LogRecordId
|
||||
from ahriman.models.user_access import UserAccess
|
||||
from ahriman.web.schemas import AuthSchema, ErrorSchema, LogsSchema, PackageNameSchema, PackageVersionSchema, \
|
||||
|
Reference in New Issue
Block a user