add models tests (#1)

also replace single quote to double one to confort PEP docstring
+ move _check_output to class properties to make it available for
mocking
This commit is contained in:
Evgenii Alekseev 2021-03-22 02:30:26 +03:00
parent 69499b2d0a
commit 449b8a2033
64 changed files with 1217 additions and 842 deletions

View File

@ -1,4 +1,4 @@
.PHONY: archive archive_directory archlinux check clean directory push version
.PHONY: archive archive_directory archlinux check clean directory push tests version
.DEFAULT_GOAL := archlinux
PROJECT := ahriman
@ -16,21 +16,21 @@ archive: archive_directory
archive_directory: $(TARGET_FILES)
rm -fr $(addprefix $(PROJECT)/, $(IGNORE_FILES))
find $(PROJECT) -type f -name '*.pyc' -delete
find $(PROJECT) -depth -type d -name '__pycache__' -execdir rm -rf {} +
find $(PROJECT) -depth -type d -name '*.egg-info' -execdir rm -rf {} +
find "$(PROJECT)" -type f -name "*.pyc" -delete
find "$(PROJECT)" -depth -type d -name "__pycache__" -execdir rm -rf {} +
find "$(PROJECT)" -depth -type d -name "*.egg-info" -execdir rm -rf {} +
archlinux: archive
sed -i "/sha512sums=('[0-9A-Fa-f]*/s/[^'][^)]*/sha512sums=('$$(sha512sum $(PROJECT)-$(VERSION)-src.tar.xz | awk '{print $$1}')'/" package/archlinux/PKGBUILD
sed -i "s/pkgver=[0-9.]*/pkgver=$(VERSION)/" package/archlinux/PKGBUILD
check:
cd src && mypy --implicit-reexport --strict -p $(PROJECT)
cd src && find $(PROJECT) -name '*.py' -execdir autopep8 --max-line-length 120 -aa -i {} +
cd src && pylint --rcfile=../.pylintrc $(PROJECT)
cd src && mypy --implicit-reexport --strict -p "$(PROJECT)"
find "src/$(PROJECT)" tests -name "*.py" -execdir autopep8 --max-line-length 120 -aa -i {} +
cd src && pylint --rcfile=../.pylintrc "$(PROJECT)"
clean:
find . -type f -name '$(PROJECT)-*-src.tar.xz' -delete
find . -type f -name "$(PROJECT)-*-src.tar.xz" -delete
rm -rf "$(PROJECT)"
directory: clean
@ -43,8 +43,11 @@ push: archlinux
git tag "$(VERSION)"
git push --tags
tests:
python setup.py test
version:
ifndef VERSION
$(error VERSION is required, but not set)
endif
sed -i "/__version__ = '[0-9.]*/s/[^'][^)]*/__version__ = '$(VERSION)'/" src/ahriman/version.py
sed -i '/__version__ = "[0-9.]*/s/[^"][^)]*/__version__ = "$(VERSION)"/' src/ahriman/version.py

View File

@ -23,7 +23,7 @@ optdepends=('aws-cli: sync to s3'
source=("https://github.com/arcan1s/ahriman/releases/download/$pkgver/$pkgname-$pkgver-src.tar.xz"
'ahriman.sysusers'
'ahriman.tmpfiles')
sha512sums=('a1db44390ce1785da3d535e3cfd2242d8d56070228eb9b3c1d5629163b65941d60753c481c0fdc69e475e534a828ceea39568dc6711abeee092616dac08e31a9'
sha512sums=('ed1ef5ee9a2fb25ee1220acb4e7ac30eec0783375766f7ca8c812e1aa84e28d8426e382c1ec3d5357f1141ff683f9dd346970fe1f8bb0c7e29373ee55e478ef4'
'13718afec2c6786a18f0b223ef8e58dccf0688bca4cdbe203f14071f5031ed20120eb0ce38b52c76cfd6e8b6581a9c9eaa2743eb11abbaca637451a84c33f075'
'55b20f6da3d66e7bbf2add5d95a3b60632df121717d25a993e56e737d14f51fe063eb6f1b38bd81cc32e05db01c0c1d80aaa720c45cde87f238d8b46cdb8cbc4')
backup=('etc/ahriman.ini'

5
setup.cfg Normal file
View File

@ -0,0 +1,5 @@
[aliases]
test=pytest
[tool:pytest]
addopts = --pspec

View File

@ -4,69 +4,72 @@ from os import path
here = path.abspath(path.dirname(__file__))
metadata = dict()
with open(convert_path('src/ahriman/version.py')) as metadata_file:
with open(convert_path("src/ahriman/version.py")) as metadata_file:
exec(metadata_file.read(), metadata)
setup(
name='ahriman',
name="ahriman",
version=metadata['__version__'],
version=metadata["__version__"],
zip_safe=False,
description='ArcHlinux ReposItory MANager',
description="ArcHlinux ReposItory MANager",
author='arcanis',
author_email='',
url='https://github.com/arcan1s/ahriman',
author="arcanis",
author_email="",
url="https://github.com/arcan1s/ahriman",
license='GPL3',
license="GPL3",
packages=find_packages('src'),
package_dir={'': 'src'},
packages=find_packages("src"),
package_dir={"": "src"},
dependency_links=[
],
install_requires=[
'aur',
'pyalpm',
'srcinfo',
"aur",
"pyalpm",
"srcinfo",
],
setup_requires=[
'pytest-runner',
"pytest-runner",
],
tests_require=[
'pytest',
"pytest",
"pytest-mock",
"pytest-pspec",
"pytest-resource-path",
],
include_package_data=True,
scripts=[
'package/bin/ahriman',
"package/bin/ahriman",
],
data_files=[
('/etc', [
'package/etc/ahriman.ini',
("/etc", [
"package/etc/ahriman.ini",
]),
('/etc/ahriman.ini.d', [
'package/etc/ahriman.ini.d/logging.ini',
("/etc/ahriman.ini.d", [
"package/etc/ahriman.ini.d/logging.ini",
]),
('lib/systemd/system', [
'package/lib/systemd/system/ahriman@.service',
'package/lib/systemd/system/ahriman@.timer',
'package/lib/systemd/system/ahriman-web@.service',
("lib/systemd/system", [
"package/lib/systemd/system/ahriman@.service",
"package/lib/systemd/system/ahriman@.timer",
"package/lib/systemd/system/ahriman-web@.service",
]),
('share/ahriman', [
'package/share/ahriman/build-status.jinja2',
'package/share/ahriman/repo-index.jinja2',
'package/share/ahriman/search.jinja2',
'package/share/ahriman/search-line.jinja2',
'package/share/ahriman/sorttable.jinja2',
'package/share/ahriman/style.jinja2',
("share/ahriman", [
"package/share/ahriman/build-status.jinja2",
"package/share/ahriman/repo-index.jinja2",
"package/share/ahriman/search.jinja2",
"package/share/ahriman/search-line.jinja2",
"package/share/ahriman/sorttable.jinja2",
"package/share/ahriman/style.jinja2",
]),
],
extras_require={
'html-templates': ['Jinja2'],
'test': ['coverage', 'pytest'],
'web': ['Jinja2', 'aiohttp', 'aiohttp_jinja2', 'requests'],
"html-templates": ["Jinja2"],
"test": ["coverage", "pytest", "pytest-mock", "pytest-pspec", "pytest-resource-path"],
"web": ["Jinja2", "aiohttp", "aiohttp_jinja2", "requests"],
},
)

View File

@ -24,80 +24,80 @@ import ahriman.application.handlers as handlers
import ahriman.version as version
if __name__ == '__main__':
parser = argparse.ArgumentParser(prog='ahriman', description='ArcHlinux ReposItory MANager')
if __name__ == "__main__":
parser = argparse.ArgumentParser(prog="ahriman", description="ArcHlinux ReposItory MANager")
parser.add_argument(
'-a',
'--architecture',
help='target architectures (can be used multiple times)',
action='append')
parser.add_argument('-c', '--config', help='configuration path', default='/etc/ahriman.ini')
parser.add_argument('--force', help='force run, remove file lock', action='store_true')
parser.add_argument('--lock', help='lock file', default='/tmp/ahriman.lock')
parser.add_argument('--no-log', help='redirect all log messages to stderr', action='store_true')
parser.add_argument('--no-report', help='force disable reporting to web service', action='store_true')
parser.add_argument('--unsafe', help='allow to run ahriman as non-ahriman user', action='store_true')
parser.add_argument('-v', '--version', action='version', version=version.__version__)
subparsers = parser.add_subparsers(title='command')
"-a",
"--architecture",
help="target architectures (can be used multiple times)",
action="append")
parser.add_argument("-c", "--config", help="configuration path", default="/etc/ahriman.ini")
parser.add_argument("--force", help="force run, remove file lock", action="store_true")
parser.add_argument("--lock", help="lock file", default="/tmp/ahriman.lock")
parser.add_argument("--no-log", help="redirect all log messages to stderr", action="store_true")
parser.add_argument("--no-report", help="force disable reporting to web service", action="store_true")
parser.add_argument("--unsafe", help="allow to run ahriman as non-ahriman user", action="store_true")
parser.add_argument("-v", "--version", action="version", version=version.__version__)
subparsers = parser.add_subparsers(title="command")
add_parser = subparsers.add_parser('add', description='add package')
add_parser.add_argument('package', help='package base/name or archive path', nargs='+')
add_parser.add_argument('--without-dependencies', help='do not add dependencies', action='store_true')
add_parser = subparsers.add_parser("add", description="add package")
add_parser.add_argument("package", help="package base/name or archive path", nargs="+")
add_parser.add_argument("--without-dependencies", help="do not add dependencies", action="store_true")
add_parser.set_defaults(handler=handlers.Add)
check_parser = subparsers.add_parser('check', description='check for updates. Same as update --dry-run --no-manual')
check_parser.add_argument('package', help='filter check by package base', nargs='*')
check_parser.add_argument('--no-vcs', help='do not check VCS packages', action='store_true')
check_parser = subparsers.add_parser("check", description="check for updates. Same as update --dry-run --no-manual")
check_parser.add_argument("package", help="filter check by package base", nargs="*")
check_parser.add_argument("--no-vcs", help="do not check VCS packages", action="store_true")
check_parser.set_defaults(handler=handlers.Update, no_aur=False, no_manual=True, dry_run=True)
clean_parser = subparsers.add_parser('clean', description='clear all local caches')
clean_parser.add_argument('--no-build', help='do not clear directory with package sources', action='store_true')
clean_parser.add_argument('--no-cache', help='do not clear directory with package caches', action='store_true')
clean_parser.add_argument('--no-chroot', help='do not clear build chroot', action='store_true')
clean_parser = subparsers.add_parser("clean", description="clear all local caches")
clean_parser.add_argument("--no-build", help="do not clear directory with package sources", action="store_true")
clean_parser.add_argument("--no-cache", help="do not clear directory with package caches", action="store_true")
clean_parser.add_argument("--no-chroot", help="do not clear build chroot", action="store_true")
clean_parser.add_argument(
'--no-manual',
help='do not clear directory with manually added packages',
action='store_true')
clean_parser.add_argument('--no-packages', help='do not clear directory with built packages', action='store_true')
"--no-manual",
help="do not clear directory with manually added packages",
action="store_true")
clean_parser.add_argument("--no-packages", help="do not clear directory with built packages", action="store_true")
clean_parser.set_defaults(handler=handlers.Clean)
config_parser = subparsers.add_parser('config', description='dump configuration for specified architecture')
config_parser = subparsers.add_parser("config", description="dump configuration for specified architecture")
config_parser.set_defaults(handler=handlers.Dump, lock=None, no_report=True, unsafe=True)
rebuild_parser = subparsers.add_parser('rebuild', description='rebuild whole repository')
rebuild_parser = subparsers.add_parser("rebuild", description="rebuild whole repository")
rebuild_parser.set_defaults(handler=handlers.Rebuild)
remove_parser = subparsers.add_parser('remove', description='remove package')
remove_parser.add_argument('package', help='package name or base', nargs='+')
remove_parser = subparsers.add_parser("remove", description="remove package")
remove_parser.add_argument("package", help="package name or base", nargs="+")
remove_parser.set_defaults(handler=handlers.Remove)
report_parser = subparsers.add_parser('report', description='generate report')
report_parser.add_argument('target', help='target to generate report', nargs='*')
report_parser = subparsers.add_parser("report", description="generate report")
report_parser.add_argument("target", help="target to generate report", nargs="*")
report_parser.set_defaults(handler=handlers.Report)
status_parser = subparsers.add_parser('status', description='request status of the package')
status_parser.add_argument('--ahriman', help='get service status itself', action='store_true')
status_parser.add_argument('package', help='filter status by package base', nargs='*')
status_parser = subparsers.add_parser("status", description="request status of the package")
status_parser.add_argument("--ahriman", help="get service status itself", action="store_true")
status_parser.add_argument("package", help="filter status by package base", nargs="*")
status_parser.set_defaults(handler=handlers.Status, lock=None, no_report=True, unsafe=True)
sync_parser = subparsers.add_parser('sync', description='sync packages to remote server')
sync_parser.add_argument('target', help='target to sync', nargs='*')
sync_parser = subparsers.add_parser("sync", description="sync packages to remote server")
sync_parser.add_argument("target", help="target to sync", nargs="*")
sync_parser.set_defaults(handler=handlers.Sync)
update_parser = subparsers.add_parser('update', description='run updates')
update_parser.add_argument('package', help='filter check by package base', nargs='*')
update_parser = subparsers.add_parser("update", description="run updates")
update_parser.add_argument("package", help="filter check by package base", nargs="*")
update_parser.add_argument(
'--dry-run', help='just perform check for updates, same as check command', action='store_true')
update_parser.add_argument('--no-aur', help='do not check for AUR updates. Implies --no-vcs', action='store_true')
update_parser.add_argument('--no-manual', help='do not include manual updates', action='store_true')
update_parser.add_argument('--no-vcs', help='do not check VCS packages', action='store_true')
"--dry-run", help="just perform check for updates, same as check command", action="store_true")
update_parser.add_argument("--no-aur", help="do not check for AUR updates. Implies --no-vcs", action="store_true")
update_parser.add_argument("--no-manual", help="do not include manual updates", action="store_true")
update_parser.add_argument("--no-vcs", help="do not check VCS packages", action="store_true")
update_parser.set_defaults(handler=handlers.Update)
web_parser = subparsers.add_parser('web', description='start web server')
web_parser = subparsers.add_parser("web", description="start web server")
web_parser.set_defaults(handler=handlers.Web, lock=None, no_report=True)
args = parser.parse_args()
if 'handler' not in args:
if "handler" not in args:
parser.print_help()
sys.exit(1)

View File

@ -32,30 +32,30 @@ from ahriman.models.package import Package
class Application:
'''
"""
base application class
:ivar architecture: repository architecture
:ivar config: configuration instance
:ivar logger: application logger
:ivar repository: repository instance
'''
"""
def __init__(self, architecture: str, config: Configuration) -> None:
'''
"""
default constructor
:param architecture: repository architecture
:param config: configuration instance
'''
self.logger = logging.getLogger('root')
"""
self.logger = logging.getLogger("root")
self.config = config
self.architecture = architecture
self.repository = Repository(architecture, config)
def _known_packages(self) -> Set[str]:
'''
"""
load packages from repository and pacman repositories
:return: list of known packages
'''
"""
known_packages: Set[str] = set()
# local set
for package in self.repository.packages():
@ -64,15 +64,15 @@ class Application:
return known_packages
def _finalize(self) -> None:
'''
"""
generate report and sync to remote server
'''
"""
self.report()
self.sync()
def get_updates(self, filter_packages: List[str], no_aur: bool, no_manual: bool, no_vcs: bool,
log_fn: Callable[[str], None]) -> List[Package]:
'''
"""
get list of packages to run update process
:param filter_packages: do not check every package just specified in the list
:param no_aur: do not check for aur updates
@ -80,7 +80,7 @@ class Application:
:param no_vcs: do not check VCS packages
:param log_fn: logger function to log updates
:return: list of out-of-dated packages
'''
"""
updates = []
if not no_aur:
@ -89,16 +89,16 @@ class Application:
updates.extend(self.repository.updates_manual())
for package in updates:
log_fn(f'{package.base} = {package.version}')
log_fn(f"{package.base} = {package.version}")
return updates
def add(self, names: Iterable[str], without_dependencies: bool) -> None:
'''
"""
add packages for the next build
:param names: list of package bases to add
:param without_dependencies: if set, dependency check will be disabled
'''
"""
known_packages = self._known_packages()
def add_directory(path: str) -> None:
@ -107,7 +107,7 @@ class Application:
add_manual(full_path)
def add_manual(name: str) -> str:
package = Package.load(name, self.repository.pacman, self.config.get('alpm', 'aur_url'))
package = Package.load(name, self.repository.pacman, self.config.get("alpm", "aur_url"))
path = os.path.join(self.repository.paths.manual, package.base)
Task.fetch(path, package.git_url)
return path
@ -135,14 +135,14 @@ class Application:
process_single(name)
def clean(self, no_build: bool, no_cache: bool, no_chroot: bool, no_manual: bool, no_packages: bool) -> None:
'''
"""
run all clean methods. Warning: some functions might not be available under non-root
:param no_build: do not clear directory with package sources
:param no_cache: do not clear directory with package caches
:param no_chroot: do not clear build chroot
:param no_manual: do not clear directory with manually added packages
:param no_packages: do not clear directory with built packages
'''
"""
if not no_build:
self.repository.clear_build()
if not no_cache:
@ -155,34 +155,34 @@ class Application:
self.repository.clear_packages()
def remove(self, names: Iterable[str]) -> None:
'''
"""
remove packages from repository
:param names: list of packages (either base or name) to remove
'''
"""
self.repository.process_remove(names)
self._finalize()
def report(self, target: Optional[Iterable[str]] = None) -> None:
'''
"""
generate report
:param target: list of targets to run (e.g. html)
'''
"""
targets = target or None
self.repository.process_report(targets)
def sync(self, target: Optional[Iterable[str]] = None) -> None:
'''
"""
sync to remote server
:param target: list of targets to run (e.g. s3)
'''
"""
targets = target or None
self.repository.process_sync(targets)
def update(self, updates: Iterable[Package]) -> None:
'''
"""
run package updates
:param updates: list of packages to update
'''
"""
def process_update(paths: Iterable[str]) -> None:
self.repository.process_update(paths)
self._finalize()
@ -195,6 +195,6 @@ class Application:
tree = Tree()
tree.load(updates)
for num, level in enumerate(tree.levels()):
self.logger.info(f'processing level #{num} {[package.base for package in level]}')
self.logger.info(f"processing level #{num} {[package.base for package in level]}")
packages = self.repository.process_build(level)
process_update(packages)

View File

@ -27,16 +27,16 @@ from ahriman.core.configuration import Configuration
class Add(Handler):
'''
"""
add packages handler
'''
"""
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, config: Configuration) -> None:
'''
"""
callback for command line
:param args: command line args
:param architecture: repository architecture
:param config: configuration instance
'''
"""
Application(architecture, config).add(args.package, args.without_dependencies)

View File

@ -27,17 +27,17 @@ from ahriman.core.configuration import Configuration
class Clean(Handler):
'''
"""
clean caches handler
'''
"""
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, config: Configuration) -> None:
'''
"""
callback for command line
:param args: command line args
:param architecture: repository architecture
:param config: configuration instance
'''
"""
Application(architecture, config).clean(args.no_build, args.no_cache, args.no_chroot,
args.no_manual, args.no_packages)

View File

@ -26,21 +26,21 @@ from ahriman.core.configuration import Configuration
class Dump(Handler):
'''
"""
dump config handler
'''
"""
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, config: Configuration) -> None:
'''
"""
callback for command line
:param args: command line args
:param architecture: repository architecture
:param config: configuration instance
'''
"""
config_dump = config.dump(architecture)
for section, values in sorted(config_dump.items()):
print(f'[{section}]')
print(f"[{section}]")
for key, value in sorted(values.items()):
print(f'{key} = {value}')
print(f"{key} = {value}")
print()

View File

@ -30,34 +30,34 @@ from ahriman.core.configuration import Configuration
class Handler:
'''
"""
base handler class for command callbacks
'''
"""
@classmethod
def _call(cls: Type[Handler], args: argparse.Namespace, architecture: str, config: Configuration) -> bool:
'''
"""
additional function to wrap all calls for multiprocessing library
:param args: command line args
:param architecture: repository architecture
:param config: configuration instance
:return: True on success, False otherwise
'''
"""
try:
with Lock(args, architecture, config):
cls.run(args, architecture, config)
return True
except Exception:
logging.getLogger('root').exception('process exception', exc_info=True)
logging.getLogger("root").exception("process exception", exc_info=True)
return False
@classmethod
def execute(cls: Type[Handler], args: argparse.Namespace) -> int:
'''
"""
execute function for all aru
:param args: command line args
:return: 0 on success, 1 otherwise
'''
"""
configuration = Configuration.from_path(args.config, not args.no_log)
with Pool(len(args.architecture)) as pool:
result = pool.starmap(
@ -66,10 +66,10 @@ class Handler:
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, config: Configuration) -> None:
'''
"""
callback for command line
:param args: command line args
:param architecture: repository architecture
:param config: configuration instance
'''
"""
raise NotImplementedError

View File

@ -27,18 +27,18 @@ from ahriman.core.configuration import Configuration
class Rebuild(Handler):
'''
"""
make world handler
'''
"""
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, config: Configuration) -> None:
'''
"""
callback for command line
:param args: command line args
:param architecture: repository architecture
:param config: configuration instance
'''
"""
application = Application(architecture, config)
packages = application.repository.packages()
application.update(packages)

View File

@ -27,16 +27,16 @@ from ahriman.core.configuration import Configuration
class Remove(Handler):
'''
"""
remove packages handler
'''
"""
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, config: Configuration) -> None:
'''
"""
callback for command line
:param args: command line args
:param architecture: repository architecture
:param config: configuration instance
'''
"""
Application(architecture, config).remove(args.package)

View File

@ -27,16 +27,16 @@ from ahriman.core.configuration import Configuration
class Report(Handler):
'''
"""
generate report handler
'''
"""
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, config: Configuration) -> None:
'''
"""
callback for command line
:param args: command line args
:param architecture: repository architecture
:param config: configuration instance
'''
"""
Application(architecture, config).report(args.target)

View File

@ -29,18 +29,18 @@ from ahriman.models.package import Package
class Status(Handler):
'''
"""
package status handler
'''
"""
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, config: Configuration) -> None:
'''
"""
callback for command line
:param args: command line args
:param architecture: repository architecture
:param config: configuration instance
'''
"""
application = Application(architecture, config)
if args.ahriman:
ahriman = application.repository.reporter.get_self()
@ -54,5 +54,5 @@ class Status(Handler):
packages = application.repository.reporter.get(None)
for package, package_status in sorted(packages, key=lambda item: item[0].base):
print(package.pretty_print())
print(f'\t{package.version}')
print(f'\t{package_status.pretty_print()}')
print(f"\t{package.version}")
print(f"\t{package_status.pretty_print()}")

View File

@ -27,16 +27,16 @@ from ahriman.core.configuration import Configuration
class Sync(Handler):
'''
"""
remove sync handler
'''
"""
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, config: Configuration) -> None:
'''
"""
callback for command line
:param args: command line args
:param architecture: repository architecture
:param config: configuration instance
'''
"""
Application(architecture, config).sync(args.target)

View File

@ -27,18 +27,18 @@ from ahriman.core.configuration import Configuration
class Update(Handler):
'''
"""
package update handler
'''
"""
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, config: Configuration) -> None:
'''
"""
callback for command line
:param args: command line args
:param architecture: repository architecture
:param config: configuration instance
'''
"""
# typing workaround
def log_fn(line: str) -> None:
return print(line) if args.dry_run else application.logger.info(line)

View File

@ -26,18 +26,18 @@ from ahriman.core.configuration import Configuration
class Web(Handler):
'''
"""
web server handler
'''
"""
@classmethod
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, config: Configuration) -> None:
'''
"""
callback for command line
:param args: command line args
:param architecture: repository architecture
:param config: configuration instance
'''
"""
from ahriman.web.web import run_server, setup_service
application = setup_service(architecture, config)
run_server(application, architecture)

View File

@ -32,31 +32,31 @@ from ahriman.models.build_status import BuildStatusEnum
class Lock:
'''
"""
wrapper for application lock file
:ivar force: remove lock file on start if any
:ivar path: path to lock file if any
:ivar reporter: build status reporter instance
:ivar root: repository root (i.e. ahriman home)
:ivar unsafe: skip user check
'''
"""
def __init__(self, args: argparse.Namespace, architecture: str, config: Configuration) -> None:
'''
"""
default constructor
:param args: command line args
:param architecture: repository architecture
:param config: configuration instance
'''
self.path = f'{args.lock}_{architecture}' if args.lock is not None else None
"""
self.path = f"{args.lock}_{architecture}" if args.lock is not None else None
self.force = args.force
self.unsafe = args.unsafe
self.root = config.get('repository', 'root')
self.root = config.get("repository", "root")
self.reporter = Client() if args.no_report else Client.load(architecture, config)
def __enter__(self) -> Lock:
'''
"""
default workflow is the following:
check user UID
@ -64,7 +64,7 @@ class Lock:
check if there is lock file
create lock file
report to web if enabled
'''
"""
self.check_user()
if self.force:
self.remove()
@ -75,31 +75,31 @@ class Lock:
def __exit__(self, exc_type: Optional[Type[Exception]], exc_val: Optional[Exception],
exc_tb: TracebackType) -> Literal[False]:
'''
"""
remove lock file when done
:param exc_type: exception type name if any
:param exc_val: exception raised if any
:param exc_tb: exception traceback if any
:return: always False (do not suppress any exception)
'''
"""
self.remove()
status = BuildStatusEnum.Success if exc_val is None else BuildStatusEnum.Failed
self.reporter.update_self(status)
return False
def check(self) -> None:
'''
"""
check if lock file exists, raise exception if it does
'''
"""
if self.path is None:
return
if os.path.exists(self.path):
raise DuplicateRun()
def check_user(self) -> None:
'''
"""
check if current user is actually owner of ahriman root
'''
"""
if self.unsafe:
return
current_uid = os.getuid()
@ -108,17 +108,17 @@ class Lock:
raise UnsafeRun(current_uid, root_uid)
def create(self) -> None:
'''
"""
create lock file
'''
"""
if self.path is None:
return
open(self.path, 'w').close()
open(self.path, "w").close()
def remove(self) -> None:
'''
"""
remove lock file
'''
"""
if self.path is None:
return
if os.path.exists(self.path):

View File

@ -24,27 +24,27 @@ from ahriman.core.configuration import Configuration
class Pacman:
'''
"""
alpm wrapper
:ivar handle: pyalpm root `Handle`
'''
"""
def __init__(self, config: Configuration) -> None:
'''
"""
default constructor
:param config: configuration instance
'''
root = config.get('alpm', 'root')
pacman_root = config.get('alpm', 'database')
"""
root = config.get("alpm", "root")
pacman_root = config.get("alpm", "database")
self.handle = Handle(root, pacman_root)
for repository in config.getlist('alpm', 'repositories'):
for repository in config.getlist("alpm", "repositories"):
self.handle.register_syncdb(repository, 0) # 0 is pgp_level
def all_packages(self) -> List[str]:
'''
"""
get list of packages known for alpm
:return: list of package names
'''
"""
result: Set[str] = set()
for database in self.handle.get_syncdbs():
result.update({package.name for package in database.pkgcache})

View File

@ -28,56 +28,58 @@ from ahriman.models.repository_paths import RepositoryPaths
class Repo:
'''
"""
repo-add and repo-remove wrapper
:ivar logger: class logger
:ivar name: repository name
:ivar paths: repository paths instance
:ivar sign_args: additional args which have to be used to sign repository archive
'''
"""
_check_output = check_output
def __init__(self, name: str, paths: RepositoryPaths, sign_args: List[str]) -> None:
'''
"""
default constructor
:param name: repository name
:param paths: repository paths instance
:param sign_args: additional args which have to be used to sign repository archive
'''
self.logger = logging.getLogger('build_details')
"""
self.logger = logging.getLogger("build_details")
self.name = name
self.paths = paths
self.sign_args = sign_args
@property
def repo_path(self) -> str:
'''
"""
:return: path to repository database
'''
return os.path.join(self.paths.repository, f'{self.name}.db.tar.gz')
"""
return os.path.join(self.paths.repository, f"{self.name}.db.tar.gz")
def add(self, path: str) -> None:
'''
"""
add new package to repository
:param path: path to archive to add
'''
check_output(
'repo-add', *self.sign_args, '-R', self.repo_path, path,
"""
Repo._check_output(
"repo-add", *self.sign_args, "-R", self.repo_path, path,
exception=BuildFailed(path),
cwd=self.paths.repository,
logger=self.logger)
def remove(self, package: str) -> None:
'''
"""
remove package from repository
:param package: package name to remove
'''
"""
# remove package and signature (if any) from filesystem
for fn in filter(lambda f: f.startswith(package), os.listdir(self.paths.repository)):
full_path = os.path.join(self.paths.repository, fn)
os.remove(full_path)
# remove package from registry
check_output(
'repo-remove', *self.sign_args, self.repo_path, package,
Repo._check_output(
"repo-remove", *self.sign_args, self.repo_path, package,
exception=BuildFailed(package),
cwd=self.paths.repository,
logger=self.logger)

View File

@ -31,92 +31,94 @@ from ahriman.models.repository_paths import RepositoryPaths
class Task:
'''
"""
base package build task
:ivar build_logger: logger for build process
:ivar logger: class logger
:ivar package: package definitions
:ivar paths: repository paths instance
'''
"""
_check_output = check_output
def __init__(self, package: Package, architecture: str, config: Configuration, paths: RepositoryPaths) -> None:
'''
"""
default constructor
:param package: package definitions
:param architecture: repository architecture
:param config: configuration instance
:param paths: repository paths instance
'''
self.logger = logging.getLogger('builder')
self.build_logger = logging.getLogger('build_details')
"""
self.logger = logging.getLogger("builder")
self.build_logger = logging.getLogger("build_details")
self.package = package
self.paths = paths
section = config.get_section_name('build', architecture)
self.archbuild_flags = config.getlist(section, 'archbuild_flags')
self.build_command = config.get(section, 'build_command')
self.makepkg_flags = config.getlist(section, 'makepkg_flags')
self.makechrootpkg_flags = config.getlist(section, 'makechrootpkg_flags')
section = config.get_section_name("build", architecture)
self.archbuild_flags = config.getlist(section, "archbuild_flags")
self.build_command = config.get(section, "build_command")
self.makepkg_flags = config.getlist(section, "makepkg_flags")
self.makechrootpkg_flags = config.getlist(section, "makechrootpkg_flags")
@property
def cache_path(self) -> str:
'''
"""
:return: path to cached packages
'''
"""
return os.path.join(self.paths.cache, self.package.base)
@property
def git_path(self) -> str:
'''
"""
:return: path to clone package from git
'''
"""
return os.path.join(self.paths.sources, self.package.base)
@staticmethod
def fetch(local: str, remote: str, branch: str = 'master') -> None:
'''
def fetch(local: str, remote: str, branch: str = "master") -> None:
"""
either clone repository or update it to origin/`branch`
:param local: local path to fetch
:param remote: remote target (from where to fetch)
:param branch: branch name to checkout, master by default
'''
logger = logging.getLogger('build_details')
"""
logger = logging.getLogger("build_details")
# local directory exists and there is .git directory
if os.path.isdir(os.path.join(local, '.git')):
check_output('git', 'fetch', 'origin', branch, exception=None, cwd=local, logger=logger)
if os.path.isdir(os.path.join(local, ".git")):
Task._check_output("git", "fetch", "origin", branch, exception=None, cwd=local, logger=logger)
else:
check_output('git', 'clone', remote, local, exception=None, logger=logger)
Task._check_output("git", "clone", remote, local, exception=None, logger=logger)
# and now force reset to our branch
check_output('git', 'reset', '--hard', f'origin/{branch}', exception=None, cwd=local, logger=logger)
Task._check_output("git", "reset", "--hard", f"origin/{branch}", exception=None, cwd=local, logger=logger)
def build(self) -> List[str]:
'''
"""
run package build
:return: paths of produced packages
'''
cmd = [self.build_command, '-r', self.paths.chroot]
"""
cmd = [self.build_command, "-r", self.paths.chroot]
cmd.extend(self.archbuild_flags)
cmd.extend(['--'] + self.makechrootpkg_flags)
cmd.extend(['--'] + self.makepkg_flags)
self.logger.info(f'using {cmd} for {self.package.base}')
cmd.extend(["--"] + self.makechrootpkg_flags)
cmd.extend(["--"] + self.makepkg_flags)
self.logger.info(f"using {cmd} for {self.package.base}")
check_output(
Task._check_output(
*cmd,
exception=BuildFailed(self.package.base),
cwd=self.git_path,
logger=self.build_logger)
# well it is not actually correct, but we can deal with it
return check_output('makepkg', '--packagelist',
return Task._check_output("makepkg", "--packagelist",
exception=BuildFailed(self.package.base),
cwd=self.git_path,
logger=self.build_logger).splitlines()
def init(self, path: Optional[str] = None) -> None:
'''
"""
fetch package from git
:param path: optional local path to fetch. If not set default path will be used
'''
"""
git_path = path or self.git_path
if os.path.isdir(self.cache_path):
# no need to clone whole repository, just copy from cache first

View File

@ -28,54 +28,54 @@ from typing import Dict, List, Optional, Type
class Configuration(configparser.RawConfigParser):
'''
"""
extension for built-in configuration parser
:ivar path: path to root configuration file
:cvar ARCHITECTURE_SPECIFIC_SECTIONS: known sections which can be architecture specific (required by dump)
:cvar DEFAULT_LOG_FORMAT: default log format (in case of fallback)
:cvar DEFAULT_LOG_LEVEL: default log level (in case of fallback)
:cvar STATIC_SECTIONS: known sections which are not architecture specific (required by dump)
'''
"""
DEFAULT_LOG_FORMAT = '[%(levelname)s %(asctime)s] [%(filename)s:%(lineno)d] [%(funcName)s]: %(message)s'
DEFAULT_LOG_FORMAT = "[%(levelname)s %(asctime)s] [%(filename)s:%(lineno)d] [%(funcName)s]: %(message)s"
DEFAULT_LOG_LEVEL = logging.DEBUG
STATIC_SECTIONS = ['alpm', 'report', 'repository', 'settings', 'upload']
ARCHITECTURE_SPECIFIC_SECTIONS = ['build', 'html', 'rsync', 's3', 'sign', 'web']
STATIC_SECTIONS = ["alpm", "report", "repository", "settings", "upload"]
ARCHITECTURE_SPECIFIC_SECTIONS = ["build", "html", "rsync", "s3", "sign", "web"]
def __init__(self) -> None:
'''
"""
default constructor. In the most cases must not be called directly
'''
"""
configparser.RawConfigParser.__init__(self, allow_no_value=True)
self.path: Optional[str] = None
@property
def include(self) -> str:
'''
"""
:return: path to directory with configuration includes
'''
return self.get('settings', 'include')
"""
return self.get("settings", "include")
@classmethod
def from_path(cls: Type[Configuration], path: str, logfile: bool) -> Configuration:
'''
"""
constructor with full object initialization
:param path: path to root configuration file
:param logfile: use log file to output messages
:return: configuration instance
'''
"""
config = cls()
config.load(path)
config.load_logging(logfile)
return config
def dump(self, architecture: str) -> Dict[str, Dict[str, str]]:
'''
"""
dump configuration to dictionary
:param architecture: repository architecture
:return: configuration dump for specific architecture
'''
"""
result: Dict[str, Dict[str, str]] = {}
for section in Configuration.STATIC_SECTIONS:
if not self.has_section(section):
@ -90,57 +90,57 @@ class Configuration(configparser.RawConfigParser):
return result
def getlist(self, section: str, key: str) -> List[str]:
'''
"""
get space separated string list option
:param section: section name
:param key: key name
:return: list of string if option is set, empty list otherwise
'''
"""
raw = self.get(section, key, fallback=None)
if not raw: # empty string or none
return []
return raw.split()
def get_section_name(self, prefix: str, suffix: str) -> str:
'''
"""
check if there is `prefix`_`suffix` section and return it on success. Return `prefix` otherwise
:param prefix: section name prefix
:param suffix: section name suffix (e.g. architecture name)
:return: found section name
'''
probe = f'{prefix}_{suffix}'
"""
probe = f"{prefix}_{suffix}"
return probe if self.has_section(probe) else prefix
def load(self, path: str) -> None:
'''
"""
fully load configuration
:param path: path to root configuration file
'''
"""
self.path = path
self.read(self.path)
self.load_includes()
def load_includes(self) -> None:
'''
"""
load configuration includes
'''
"""
try:
for conf in filter(lambda p: p.endswith('.ini'), sorted(os.listdir(self.include))):
for conf in filter(lambda p: p.endswith(".ini"), sorted(os.listdir(self.include))):
self.read(os.path.join(self.include, conf))
except (FileNotFoundError, configparser.NoOptionError):
pass
def load_logging(self, logfile: bool) -> None:
'''
"""
setup logging settings from configuration
:param logfile: use log file to output messages
'''
"""
def file_logger() -> None:
try:
fileConfig(self.get('settings', 'logging'))
fileConfig(self.get("settings", "logging"))
except PermissionError:
console_logger()
logging.error('could not create logfile, fallback to stderr', exc_info=True)
logging.error("could not create logfile, fallback to stderr", exc_info=True)
def console_logger() -> None:
logging.basicConfig(filename=None, format=Configuration.DEFAULT_LOG_FORMAT,

View File

@ -21,103 +21,103 @@ from typing import Any
class BuildFailed(Exception):
'''
"""
base exception for failed builds
'''
"""
def __init__(self, package: str) -> None:
'''
"""
default constructor
:param package: package base raised exception
'''
Exception.__init__(self, f'Package {package} build failed, check logs for details')
"""
Exception.__init__(self, f"Package {package} build failed, check logs for details")
class DuplicateRun(Exception):
'''
"""
exception which will be raised if there is another application instance
'''
"""
def __init__(self) -> None:
'''
"""
default constructor
'''
Exception.__init__(self, 'Another application instance is run')
"""
Exception.__init__(self, "Another application instance is run")
class InitializeException(Exception):
'''
"""
base service initialization exception
'''
"""
def __init__(self) -> None:
'''
"""
default constructor
'''
Exception.__init__(self, 'Could not load service')
"""
Exception.__init__(self, "Could not load service")
class InvalidOption(Exception):
'''
"""
exception which will be raised on configuration errors
'''
"""
def __init__(self, value: Any) -> None:
'''
"""
default constructor
:param value: option value
'''
Exception.__init__(self, f'Invalid or unknown option value `{value}`')
"""
Exception.__init__(self, f"Invalid or unknown option value `{value}`")
class InvalidPackageInfo(Exception):
'''
"""
exception which will be raised on package load errors
'''
"""
def __init__(self, details: Any) -> None:
'''
"""
default constructor
:param details: error details
'''
Exception.__init__(self, f'There are errors during reading package information: `{details}`')
"""
Exception.__init__(self, f"There are errors during reading package information: `{details}`")
class ReportFailed(Exception):
'''
"""
report generation exception
'''
"""
def __init__(self) -> None:
'''
"""
default constructor
'''
Exception.__init__(self, 'Report failed')
"""
Exception.__init__(self, "Report failed")
class SyncFailed(Exception):
'''
"""
remote synchronization exception
'''
"""
def __init__(self) -> None:
'''
"""
default constructor
'''
Exception.__init__(self, 'Sync failed')
"""
Exception.__init__(self, "Sync failed")
class UnsafeRun(Exception):
'''
"""
exception which will be raised in case if user is not owner of repository
'''
"""
def __init__(self, current_uid: int, root_uid: int) -> None:
'''
"""
default constructor
'''
"""
Exception.__init__(
self,
f'''Current UID {current_uid} differs from root owner {root_uid}.
f"""Current UID {current_uid} differs from root owner {root_uid}.
Note that for the most actions it is unsafe to run application as different user.
If you are 100% sure that it must be there try --unsafe option''')
If you are 100% sure that it must be there try --unsafe option""")

View File

@ -30,7 +30,7 @@ from ahriman.models.sign_settings import SignSettings
class HTML(Report):
'''
"""
html report generator
It uses jinja2 templates for report generation, the following variables are allowed:
@ -50,33 +50,33 @@ class HTML(Report):
:ivar report_path: output path to html report
:ivar sign_targets: targets to sign enabled in configuration
:ivar tempate_path: path to directory with jinja templates
'''
"""
def __init__(self, architecture: str, config: Configuration) -> None:
'''
"""
default constructor
:param architecture: repository architecture
:param config: configuration instance
'''
"""
Report.__init__(self, architecture, config)
section = config.get_section_name('html', architecture)
self.report_path = config.get(section, 'path')
self.link_path = config.get(section, 'link_path')
self.template_path = config.get(section, 'template_path')
section = config.get_section_name("html", architecture)
self.report_path = config.get(section, "path")
self.link_path = config.get(section, "link_path")
self.template_path = config.get(section, "template_path")
# base template vars
self.homepage = config.get(section, 'homepage', fallback=None)
self.name = config.get('repository', 'name')
self.homepage = config.get(section, "homepage", fallback=None)
self.name = config.get("repository", "name")
sign_section = config.get_section_name('sign', architecture)
self.sign_targets = [SignSettings.from_option(opt) for opt in config.getlist(sign_section, 'target')]
self.pgp_key = config.get(sign_section, 'key') if self.sign_targets else None
sign_section = config.get_section_name("sign", architecture)
self.sign_targets = [SignSettings.from_option(opt) for opt in config.getlist(sign_section, "target")]
self.pgp_key = config.get(sign_section, "key") if self.sign_targets else None
def generate(self, packages: Iterable[Package]) -> None:
'''
"""
generate report for the specified packages
:param packages: list of packages to generate report
'''
"""
# idea comes from https://stackoverflow.com/a/38642558
templates_dir, template_name = os.path.split(self.template_path)
loader = jinja2.FileSystemLoader(searchpath=templates_dir)
@ -85,15 +85,15 @@ class HTML(Report):
content = [
{
'archive_size': pretty_size(properties.archive_size),
'build_date': pretty_datetime(properties.build_date),
'filename': properties.filename,
'installed_size': pretty_size(properties.installed_size),
'name': package,
'version': base.version
"archive_size": pretty_size(properties.archive_size),
"build_date": pretty_datetime(properties.build_date),
"filename": properties.filename,
"installed_size": pretty_size(properties.installed_size),
"name": package,
"version": base.version
} for base in packages for package, properties in base.packages.items()
]
comparator: Callable[[Dict[str, str]], str] = lambda item: item['filename']
comparator: Callable[[Dict[str, str]], str] = lambda item: item["filename"]
html = template.render(
homepage=self.homepage,
@ -104,5 +104,5 @@ class HTML(Report):
pgp_key=self.pgp_key,
repository=self.name)
with open(self.report_path, 'w') as out:
with open(self.report_path, "w") as out:
out.write(html)

View File

@ -28,32 +28,32 @@ from ahriman.models.report_settings import ReportSettings
class Report:
'''
"""
base report generator
:ivar architecture: repository architecture
:ivar config: configuration instance
:ivar logger: class logger
'''
"""
def __init__(self, architecture: str, config: Configuration) -> None:
'''
"""
default constructor
:param architecture: repository architecture
:param config: configuration instance
'''
self.logger = logging.getLogger('builder')
"""
self.logger = logging.getLogger("builder")
self.architecture = architecture
self.config = config
@staticmethod
def run(architecture: str, config: Configuration, target: str, packages: Iterable[Package]) -> None:
'''
"""
run report generation
:param architecture: repository architecture
:param config: configuration instance
:param target: target to generate report (e.g. html)
:param packages: list of packages to generate report
'''
"""
provider = ReportSettings.from_option(target)
if provider == ReportSettings.HTML:
from ahriman.core.report.html import HTML
@ -64,11 +64,11 @@ class Report:
try:
report.generate(packages)
except Exception:
report.logger.exception('report generation failed', exc_info=True)
report.logger.exception("report generation failed", exc_info=True)
raise ReportFailed()
def generate(self, packages: Iterable[Package]) -> None:
'''
"""
generate report for the specified packages
:param packages: list of packages to generate report
'''
"""

View File

@ -26,53 +26,53 @@ from ahriman.core.repository.properties import Properties
class Cleaner(Properties):
'''
"""
trait to clean common repository objects
'''
"""
def packages_built(self) -> List[str]:
'''
"""
get list of files in built packages directory
:return: list of filenames from the directory
'''
"""
raise NotImplementedError
def clear_build(self) -> None:
'''
"""
clear sources directory
'''
self.logger.info('clear package sources directory')
"""
self.logger.info("clear package sources directory")
for package in os.listdir(self.paths.sources):
shutil.rmtree(os.path.join(self.paths.sources, package))
def clear_cache(self) -> None:
'''
"""
clear cache directory
'''
self.logger.info('clear packages sources cache directory')
"""
self.logger.info("clear packages sources cache directory")
for package in os.listdir(self.paths.cache):
shutil.rmtree(os.path.join(self.paths.cache, package))
def clear_chroot(self) -> None:
'''
"""
clear cache directory. Warning: this method is architecture independent and will clear every chroot
'''
self.logger.info('clear build chroot directory')
"""
self.logger.info("clear build chroot directory")
for chroot in os.listdir(self.paths.chroot):
shutil.rmtree(os.path.join(self.paths.chroot, chroot))
def clear_manual(self) -> None:
'''
"""
clear directory with manual package updates
'''
self.logger.info('clear manual packages')
"""
self.logger.info("clear manual packages")
for package in os.listdir(self.paths.manual):
shutil.rmtree(os.path.join(self.paths.manual, package))
def clear_packages(self) -> None:
'''
"""
clear directory with built packages (NOT repository itself)
'''
self.logger.info('clear built packages directory')
"""
self.logger.info("clear built packages directory")
for package in self.packages_built():
os.remove(package)

View File

@ -30,23 +30,23 @@ from ahriman.models.package import Package
class Executor(Cleaner):
'''
"""
trait for common repository update processes
'''
"""
def packages(self) -> List[Package]:
'''
"""
generate list of repository packages
:return: list of packages properties
'''
"""
raise NotImplementedError
def process_build(self, updates: Iterable[Package]) -> List[str]:
'''
"""
build packages
:param updates: list of packages properties to build
:return: `packages_built`
'''
"""
def build_single(package: Package) -> None:
self.reporter.set_building(package.base)
task = Task(package, self.architecture, self.config, self.paths)
@ -61,23 +61,23 @@ class Executor(Cleaner):
build_single(package)
except Exception:
self.reporter.set_failed(package.base)
self.logger.exception(f'{package.base} ({self.architecture}) build exception', exc_info=True)
self.logger.exception(f"{package.base} ({self.architecture}) build exception", exc_info=True)
continue
self.clear_build()
return self.packages_built()
def process_remove(self, packages: Iterable[str]) -> str:
'''
"""
remove packages from list
:param packages: list of package names or bases to rmeove
:return: path to repository database
'''
"""
def remove_single(package: str) -> None:
try:
self.repo.remove(package)
except Exception:
self.logger.exception(f'could not remove {package}', exc_info=True)
self.logger.exception(f"could not remove {package}", exc_info=True)
requested = set(packages)
for local in self.packages():
@ -94,34 +94,34 @@ class Executor(Cleaner):
return self.repo.repo_path
def process_report(self, targets: Optional[Iterable[str]]) -> None:
'''
"""
generate reports
:param targets: list of targets to generate reports. Configuration option will be used if it is not set
'''
"""
if targets is None:
targets = self.config.getlist('report', 'target')
targets = self.config.getlist("report", "target")
for target in targets:
Report.run(self.architecture, self.config, target, self.packages())
def process_sync(self, targets: Optional[Iterable[str]]) -> None:
'''
"""
process synchronization to remote servers
:param targets: list of targets to sync. Configuration option will be used if it is not set
'''
"""
if targets is None:
targets = self.config.getlist('upload', 'target')
targets = self.config.getlist("upload", "target")
for target in targets:
Uploader.run(self.architecture, self.config, target, self.paths.repository)
def process_update(self, packages: Iterable[str]) -> str:
'''
"""
sign packages, add them to repository and update repository database
:param packages: list of filenames to run
:return: path to repository database
'''
"""
def update_single(fn: Optional[str], base: str) -> None:
if fn is None:
self.logger.warning(f'received empty package name for base {base}')
self.logger.warning(f"received empty package name for base {base}")
return # suppress type checking, it never can be none actually
# in theory it might be NOT packages directory, but we suppose it is
full_path = os.path.join(self.paths.packages, fn)
@ -145,7 +145,7 @@ class Executor(Cleaner):
self.reporter.set_success(local)
except Exception:
self.reporter.set_failed(local.base)
self.logger.exception(f'could not process {local.base}', exc_info=True)
self.logger.exception(f"could not process {local.base}", exc_info=True)
self.clear_packages()
return self.repo.repo_path

View File

@ -28,7 +28,7 @@ from ahriman.models.repository_paths import RepositoryPaths
class Properties:
'''
"""
repository internal objects holder
:ivar architecture: repository architecture
:ivar aur_url: base AUR url
@ -40,17 +40,17 @@ class Properties:
:ivar repo: repo commands wrapper instance
:ivar reporter: build status reporter instance
:ivar sign: GPG wrapper instance
'''
"""
def __init__(self, architecture: str, config: Configuration) -> None:
self.logger = logging.getLogger('builder')
self.logger = logging.getLogger("builder")
self.architecture = architecture
self.config = config
self.aur_url = config.get('alpm', 'aur_url')
self.name = config.get('repository', 'name')
self.aur_url = config.get("alpm", "aur_url")
self.name = config.get("repository", "name")
self.paths = RepositoryPaths(config.get('repository', 'root'), architecture)
self.paths = RepositoryPaths(config.get("repository", "root"), architecture)
self.paths.create_tree()
self.pacman = Pacman(config)

View File

@ -28,15 +28,15 @@ from ahriman.models.package import Package
class Repository(Executor, UpdateHandler):
'''
"""
base repository control class
'''
"""
def packages(self) -> List[Package]:
'''
"""
generate list of repository packages
:return: list of packages properties
'''
"""
result: Dict[str, Package] = {}
for fn in os.listdir(self.paths.repository):
if not package_like(fn):
@ -46,15 +46,15 @@ class Repository(Executor, UpdateHandler):
local = Package.load(full_path, self.pacman, self.aur_url)
result.setdefault(local.base, local).packages.update(local.packages)
except Exception:
self.logger.exception(f'could not load package from {fn}', exc_info=True)
self.logger.exception(f"could not load package from {fn}", exc_info=True)
continue
return list(result.values())
def packages_built(self) -> List[str]:
'''
"""
get list of files in built packages directory
:return: list of filenames from the directory
'''
"""
return [
os.path.join(self.paths.packages, fn)
for fn in os.listdir(self.paths.packages)

View File

@ -26,28 +26,28 @@ from ahriman.models.package import Package
class UpdateHandler(Cleaner):
'''
"""
trait to get package update list
'''
"""
def packages(self) -> List[Package]:
'''
"""
generate list of repository packages
:return: list of packages properties
'''
"""
raise NotImplementedError
def updates_aur(self, filter_packages: Iterable[str], no_vcs: bool) -> List[Package]:
'''
"""
check AUR for updates
:param filter_packages: do not check every package just specified in the list
:param no_vcs: do not check VCS packages
:return: list of packages which are out-of-dated
'''
"""
result: List[Package] = []
build_section = self.config.get_section_name('build', self.architecture)
ignore_list = self.config.getlist(build_section, 'ignore_packages')
build_section = self.config.get_section_name("build", self.architecture)
ignore_list = self.config.getlist(build_section, "ignore_packages")
for local in self.packages():
if local.base in ignore_list:
@ -64,16 +64,16 @@ class UpdateHandler(Cleaner):
result.append(remote)
except Exception:
self.reporter.set_failed(local.base)
self.logger.exception(f'could not load remote package {local.base}', exc_info=True)
self.logger.exception(f"could not load remote package {local.base}", exc_info=True)
continue
return result
def updates_manual(self) -> List[Package]:
'''
"""
check for packages for which manual update has been requested
:return: list of packages which are out-of-dated
'''
"""
result: List[Package] = []
known_bases = {package.base for package in self.packages()}
@ -86,7 +86,7 @@ class UpdateHandler(Cleaner):
else:
self.reporter.set_pending(local.base)
except Exception:
self.logger.exception(f'could not add package from {fn}', exc_info=True)
self.logger.exception(f"could not add package from {fn}", exc_info=True)
self.clear_manual()
return result

View File

@ -29,79 +29,81 @@ from ahriman.models.sign_settings import SignSettings
class GPG:
'''
"""
gnupg wrapper
:ivar architecture: repository architecture
:ivar config: configuration instance
:ivar default_key: default PGP key ID to use
:ivar logger: class logger
:ivar target: list of targets to sign (repository, package etc)
'''
"""
_check_output = check_output
def __init__(self, architecture: str, config: Configuration) -> None:
'''
"""
default constructor
:param architecture: repository architecture
:param config: configuration instance
'''
self.logger = logging.getLogger('build_details')
"""
self.logger = logging.getLogger("build_details")
self.config = config
self.section = config.get_section_name('sign', architecture)
self.target = [SignSettings.from_option(opt) for opt in config.getlist(self.section, 'target')]
self.default_key = config.get(self.section, 'key') if self.target else ''
self.section = config.get_section_name("sign", architecture)
self.target = [SignSettings.from_option(opt) for opt in config.getlist(self.section, "target")]
self.default_key = config.get(self.section, "key") if self.target else ""
@property
def repository_sign_args(self) -> List[str]:
'''
"""
:return: command line arguments for repo-add command to sign database
'''
"""
if SignSettings.SignRepository not in self.target:
return []
return ['--sign', '--key', self.default_key]
return ["--sign", "--key", self.default_key]
@staticmethod
def sign_cmd(path: str, key: str) -> List[str]:
'''
"""
gpg command to run
:param path: path to file to sign
:param key: PGP key ID
:return: gpg command with all required arguments
'''
return ['gpg', '-u', key, '-b', path]
"""
return ["gpg", "-u", key, "-b", path]
def process(self, path: str, key: str) -> List[str]:
'''
"""
gpg command wrapper
:param path: path to file to sign
:param key: PGP key ID
:return: list of generated files including original file
'''
check_output(
"""
GPG._check_output(
*GPG.sign_cmd(path, key),
exception=BuildFailed(path),
cwd=os.path.dirname(path),
logger=self.logger)
return [path, f'{path}.sig']
return [path, f"{path}.sig"]
def sign_package(self, path: str, base: str) -> List[str]:
'''
"""
sign package if required by configuration
:param path: path to file to sign
:param base: package base required to check for key overrides
:return: list of generated files including original file
'''
"""
if SignSettings.SignPackages not in self.target:
return [path]
key = self.config.get(self.section, f'key_{base}', fallback=self.default_key)
key = self.config.get(self.section, f"key_{base}", fallback=self.default_key)
return self.process(path, key)
def sign_repository(self, path: str) -> List[str]:
'''
"""
sign repository if required by configuration
:note: more likely you just want to pass `repository_sign_args` to repo wrapper
:param path: path to repository database
:return: list of generated files including original file
'''
"""
if SignSettings.SignRepository not in self.target:
return [path]
return self.process(path, self.default_key)

View File

@ -29,42 +29,42 @@ from ahriman.models.package import Package
class Leaf:
'''
"""
tree leaf implementation
:ivar dependencies: list of package dependencies
:ivar package: leaf package properties
'''
"""
def __init__(self, package: Package) -> None:
'''
"""
default constructor
:param package: package properties
'''
"""
self.package = package
self.dependencies: Set[str] = set()
@property
def items(self) -> Iterable[str]:
'''
"""
:return: packages containing in this leaf
'''
"""
return self.package.packages.keys()
def is_root(self, packages: Iterable[Leaf]) -> bool:
'''
"""
check if package depends on any other package from list of not
:param packages: list of known leaves
:return: True if any of packages is dependency of the leaf, False otherwise
'''
"""
for leaf in packages:
if self.dependencies.intersection(leaf.items):
return False
return True
def load_dependencies(self) -> None:
'''
"""
load dependencies for the leaf
'''
"""
clone_dir = tempfile.mkdtemp()
try:
Task.fetch(clone_dir, self.package.git_url)
@ -74,22 +74,22 @@ class Leaf:
class Tree:
'''
"""
dependency tree implementation
:ivar leaves: list of tree leaves
'''
"""
def __init__(self) -> None:
'''
"""
default constructor
'''
"""
self.leaves: List[Leaf] = []
def levels(self) -> List[List[Package]]:
'''
"""
get build levels starting from the packages which do not require any other package to build
:return: list of packages lists
'''
"""
result: List[List[Package]] = []
unprocessed = self.leaves[:]
@ -100,10 +100,10 @@ class Tree:
return result
def load(self, packages: Iterable[Package]) -> None:
'''
"""
load tree from packages
:param packages: packages list
'''
"""
for package in packages:
leaf = Leaf(package)
leaf.load_dependencies()

View File

@ -23,26 +23,28 @@ from ahriman.core.util import check_output
class Rsync(Uploader):
'''
"""
rsync wrapper
:ivar remote: remote address to sync
'''
"""
_check_output = check_output
def __init__(self, architecture: str, config: Configuration) -> None:
'''
"""
default constructor
:param architecture: repository architecture
:param config: configuration instance
'''
"""
Uploader.__init__(self, architecture, config)
section = config.get_section_name('rsync', architecture)
self.remote = config.get(section, 'remote')
section = config.get_section_name("rsync", architecture)
self.remote = config.get(section, "remote")
def sync(self, path: str) -> None:
'''
"""
sync data to remote server
:param path: local path to sync
'''
check_output('rsync', '--archive', '--verbose', '--compress', '--partial', '--delete', path, self.remote,
"""
Rsync._check_output("rsync", "--archive", "--verbose", "--compress", "--partial", "--delete", path, self.remote,
exception=None,
logger=self.logger)

View File

@ -23,27 +23,29 @@ from ahriman.core.util import check_output
class S3(Uploader):
'''
"""
aws-cli wrapper
:ivar bucket: full bucket name
'''
"""
_check_output = check_output
def __init__(self, architecture: str, config: Configuration) -> None:
'''
"""
default constructor
:param architecture: repository architecture
:param config: configuration instance
'''
"""
Uploader.__init__(self, architecture, config)
section = config.get_section_name('s3', architecture)
self.bucket = config.get(section, 'bucket')
section = config.get_section_name("s3", architecture)
self.bucket = config.get(section, "bucket")
def sync(self, path: str) -> None:
'''
"""
sync data to remote server
:param path: local path to sync
'''
"""
# TODO rewrite to boto, but it is bullshit
check_output('aws', 's3', 'sync', '--quiet', '--delete', path, self.bucket,
S3._check_output("aws", "s3", "sync", "--quiet", "--delete", path, self.bucket,
exception=None,
logger=self.logger)

View File

@ -25,32 +25,32 @@ from ahriman.models.upload_settings import UploadSettings
class Uploader:
'''
"""
base remote sync class
:ivar architecture: repository architecture
:ivar config: configuration instance
:ivar logger: application logger
'''
"""
def __init__(self, architecture: str, config: Configuration) -> None:
'''
"""
default constructor
:param architecture: repository architecture
:param config: configuration instance
'''
self.logger = logging.getLogger('builder')
"""
self.logger = logging.getLogger("builder")
self.architecture = architecture
self.config = config
@staticmethod
def run(architecture: str, config: Configuration, target: str, path: str) -> None:
'''
"""
run remote sync
:param architecture: repository architecture
:param config: configuration instance
:param target: target to run sync (e.g. s3)
:param path: local path to sync
'''
"""
provider = UploadSettings.from_option(target)
if provider == UploadSettings.Rsync:
from ahriman.core.upload.rsync import Rsync
@ -64,11 +64,11 @@ class Uploader:
try:
uploader.sync(path)
except Exception:
uploader.logger.exception('remote sync failed', exc_info=True)
uploader.logger.exception("remote sync failed", exc_info=True)
raise SyncFailed()
def sync(self, path: str) -> None:
'''
"""
sync data to remote server
:param path: local path to sync
'''
"""

View File

@ -29,7 +29,7 @@ from ahriman.core.exceptions import InvalidOption
def check_output(*args: str, exception: Optional[Exception],
cwd: Optional[str] = None, stderr: int = subprocess.STDOUT,
logger: Optional[Logger] = None) -> str:
'''
"""
subprocess wrapper
:param args: command line arguments
:param exception: exception which has to be reraised instead of default subprocess exception
@ -37,58 +37,58 @@ def check_output(*args: str, exception: Optional[Exception],
:param stderr: standard error output mode
:param logger: logger to log command result if required
:return: command output
'''
"""
try:
result = subprocess.check_output(args, cwd=cwd, stderr=stderr).decode('utf8').strip()
result = subprocess.check_output(args, cwd=cwd, stderr=stderr).decode("utf8").strip()
if logger is not None:
for line in result.splitlines():
logger.debug(line)
except subprocess.CalledProcessError as e:
if e.output is not None and logger is not None:
for line in e.output.decode('utf8').splitlines():
for line in e.output.decode("utf8").splitlines():
logger.debug(line)
raise exception or e
return result
def package_like(filename: str) -> bool:
'''
"""
check if file looks like package
:param filename: name of file to check
:return: True in case if name contains `.pkg.` and not signature, False otherwise
'''
return '.pkg.' in filename and not filename.endswith('.sig')
"""
return ".pkg." in filename and not filename.endswith(".sig")
def pretty_datetime(timestamp: Optional[int]) -> str:
'''
"""
convert datetime object to string
:param timestamp: datetime to convert
:return: pretty printable datetime as string
'''
return '' if timestamp is None else datetime.datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
"""
return "" if timestamp is None else datetime.datetime.utcfromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
def pretty_size(size: Optional[float], level: int = 0) -> str:
'''
"""
convert size to string
:param size: size to convert
:param level: represents current units, 0 is B, 1 is KiB etc
:return: pretty printable size as string
'''
"""
def str_level() -> str:
if level == 0:
return 'B'
return "B"
if level == 1:
return 'KiB'
return "KiB"
if level == 2:
return 'MiB'
return "MiB"
if level == 3:
return 'GiB'
return "GiB"
raise InvalidOption(level) # I hope it will not be more than 1024 GiB
if size is None:
return ''
return ""
if size < 1024:
return f'{round(size, 2)} {str_level()}'
return f"{round(size, 2)} {str_level()}"
return pretty_size(size / 1024, level + 1)

View File

@ -27,100 +27,100 @@ from ahriman.models.package import Package
class Client:
'''
"""
base build status reporter client
'''
"""
def add(self, package: Package, status: BuildStatusEnum) -> None:
'''
"""
add new package with status
:param package: package properties
:param status: current package build status
'''
"""
# pylint: disable=R0201
def get(self, base: Optional[str]) -> List[Tuple[Package, BuildStatus]]:
'''
"""
get package status
:param base: package base to get
:return: list of current package description and status if it has been found
'''
"""
del base
return []
# pylint: disable=R0201
def get_self(self) -> BuildStatus:
'''
"""
get ahriman status itself
:return: current ahriman status
'''
"""
return BuildStatus()
def remove(self, base: str) -> None:
'''
"""
remove packages from watcher
:param base: package base to remove
'''
"""
def update(self, base: str, status: BuildStatusEnum) -> None:
'''
"""
update package build status. Unlike `add` it does not update package properties
:param base: package base to update
:param status: current package build status
'''
"""
def update_self(self, status: BuildStatusEnum) -> None:
'''
"""
update ahriman status itself
:param status: current ahriman status
'''
"""
def set_building(self, base: str) -> None:
'''
"""
set package status to building
:param base: package base to update
'''
"""
return self.update(base, BuildStatusEnum.Building)
def set_failed(self, base: str) -> None:
'''
"""
set package status to failed
:param base: package base to update
'''
"""
return self.update(base, BuildStatusEnum.Failed)
def set_pending(self, base: str) -> None:
'''
"""
set package status to pending
:param base: package base to update
'''
"""
return self.update(base, BuildStatusEnum.Pending)
def set_success(self, package: Package) -> None:
'''
"""
set package status to success
:param package: current package properties
'''
"""
return self.add(package, BuildStatusEnum.Success)
def set_unknown(self, package: Package) -> None:
'''
"""
set package status to unknown
:param package: current package properties
'''
"""
return self.add(package, BuildStatusEnum.Unknown)
@staticmethod
def load(architecture: str, config: Configuration) -> Client:
'''
"""
load client from settings
:param architecture: repository architecture
:param config: configuration instance
:return: client according to current settings
'''
section = config.get_section_name('web', architecture)
host = config.get(section, 'host', fallback=None)
port = config.getint(section, 'port', fallback=None)
"""
section = config.get_section_name("web", architecture)
host = config.get(section, "host", fallback=None)
port = config.getint(section, "port", fallback=None)
if host is None or port is None:
return Client()

View File

@ -30,22 +30,22 @@ from ahriman.models.package import Package
class Watcher:
'''
"""
package status watcher
:ivar architecture: repository architecture
:ivar known: list of known packages. For the most cases `packages` should be used instead
:ivar logger: class logger
:ivar repository: repository object
:ivar status: daemon status
'''
"""
def __init__(self, architecture: str, config: Configuration) -> None:
'''
"""
default constructor
:param architecture: repository architecture
:param config: configuration instance
'''
self.logger = logging.getLogger('http')
"""
self.logger = logging.getLogger("http")
self.architecture = architecture
self.repository = Repository(architecture, config)
@ -55,25 +55,25 @@ class Watcher:
@property
def cache_path(self) -> str:
'''
"""
:return: path to dump with json cache
'''
return os.path.join(self.repository.paths.root, 'status_cache.json')
"""
return os.path.join(self.repository.paths.root, "status_cache.json")
@property
def packages(self) -> List[Tuple[Package, BuildStatus]]:
'''
"""
:return: list of packages together with their statuses
'''
"""
return list(self.known.values())
def _cache_load(self) -> None:
'''
"""
update current state from cache
'''
"""
def parse_single(properties: Dict[str, Any]) -> None:
package = Package.from_json(properties['package'])
status = BuildStatus.from_json(properties['status'])
package = Package.from_json(properties["package"])
status = BuildStatus.from_json(properties["status"])
if package.base in self.known:
self.known[package.base] = (package, status)
@ -81,41 +81,41 @@ class Watcher:
return
with open(self.cache_path) as cache:
dump = json.load(cache)
for item in dump['packages']:
for item in dump["packages"]:
try:
parse_single(item)
except Exception:
self.logger.exception(f'cannot parse item f{item} to package', exc_info=True)
self.logger.exception(f"cannot parse item f{item} to package", exc_info=True)
def _cache_save(self) -> None:
'''
"""
dump current cache to filesystem
'''
"""
dump = {
'packages': [
"packages": [
{
'package': package.view(),
'status': status.view()
"package": package.view(),
"status": status.view()
} for package, status in self.packages
]
}
try:
with open(self.cache_path, 'w') as cache:
with open(self.cache_path, "w") as cache:
json.dump(dump, cache)
except Exception:
self.logger.exception('cannot dump cache', exc_info=True)
self.logger.exception("cannot dump cache", exc_info=True)
def get(self, base: str) -> Tuple[Package, BuildStatus]:
'''
"""
get current package base build status
:return: package and its status
'''
"""
return self.known[base]
def load(self) -> None:
'''
"""
load packages from local repository. In case if last status is known, it will use it
'''
"""
for package in self.repository.packages():
# get status of build or assign unknown
current = self.known.get(package.base)
@ -127,20 +127,20 @@ class Watcher:
self._cache_load()
def remove(self, base: str) -> None:
'''
"""
remove package base from known list if any
:param base: package base
'''
"""
self.known.pop(base, None)
self._cache_save()
def update(self, base: str, status: BuildStatusEnum, package: Optional[Package]) -> None:
'''
"""
update package status and description
:param base: package base to update
:param status: new build status
:param package: optional new package description. In case if not set current properties will be used
'''
"""
if package is None:
package, _ = self.known[base]
full_status = BuildStatus(status)
@ -148,8 +148,8 @@ class Watcher:
self._cache_save()
def update_self(self, status: BuildStatusEnum) -> None:
'''
"""
update service status
:param status: new service status
'''
"""
self.status = BuildStatus(status)

View File

@ -28,83 +28,83 @@ from ahriman.models.package import Package
class WebClient(Client):
'''
"""
build status reporter web client
:ivar host: host of web service
:ivar logger: class logger
:ivar port: port of web service
'''
"""
def __init__(self, host: str, port: int) -> None:
'''
"""
default constructor
:param host: host of web service
:param port: port of web service
'''
self.logger = logging.getLogger('http')
"""
self.logger = logging.getLogger("http")
self.host = host
self.port = port
def _ahriman_url(self) -> str:
'''
"""
url generator
:return: full url for web service for ahriman service itself
'''
return f'http://{self.host}:{self.port}/api/v1/ahriman'
"""
return f"http://{self.host}:{self.port}/api/v1/ahriman"
def _package_url(self, base: str = '') -> str:
'''
def _package_url(self, base: str = "") -> str:
"""
url generator
:param base: package base to generate url
:return: full url of web service for specific package base
'''
return f'http://{self.host}:{self.port}/api/v1/packages/{base}'
"""
return f"http://{self.host}:{self.port}/api/v1/packages/{base}"
def add(self, package: Package, status: BuildStatusEnum) -> None:
'''
"""
add new package with status
:param package: package properties
:param status: current package build status
'''
"""
payload = {
'status': status.value,
'package': package.view()
"status": status.value,
"package": package.view()
}
try:
response = requests.post(self._package_url(package.base), json=payload)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
self.logger.exception(f'could not add {package.base}: {e.response.text}', exc_info=True)
self.logger.exception(f"could not add {package.base}: {e.response.text}", exc_info=True)
except Exception:
self.logger.exception(f'could not add {package.base}', exc_info=True)
self.logger.exception(f"could not add {package.base}", exc_info=True)
def get(self, base: Optional[str]) -> List[Tuple[Package, BuildStatus]]:
'''
"""
get package status
:param base: package base to get
:return: list of current package description and status if it has been found
'''
"""
try:
response = requests.get(self._package_url(base or ''))
response = requests.get(self._package_url(base or ""))
response.raise_for_status()
status_json = response.json()
return [
(Package.from_json(package['package']), BuildStatus.from_json(package['status']))
(Package.from_json(package["package"]), BuildStatus.from_json(package["status"]))
for package in status_json
]
except requests.exceptions.HTTPError as e:
self.logger.exception(f'could not get {base}: {e.response.text}', exc_info=True)
self.logger.exception(f"could not get {base}: {e.response.text}", exc_info=True)
except Exception:
self.logger.exception(f'could not get {base}', exc_info=True)
self.logger.exception(f"could not get {base}", exc_info=True)
return []
def get_self(self) -> BuildStatus:
'''
"""
get ahriman status itself
:return: current ahriman status
'''
"""
try:
response = requests.get(self._ahriman_url())
response.raise_for_status()
@ -112,51 +112,51 @@ class WebClient(Client):
status_json = response.json()
return BuildStatus.from_json(status_json)
except requests.exceptions.HTTPError as e:
self.logger.exception(f'could not get service status: {e.response.text}', exc_info=True)
self.logger.exception(f"could not get service status: {e.response.text}", exc_info=True)
except Exception:
self.logger.exception('could not get service status', exc_info=True)
self.logger.exception("could not get service status", exc_info=True)
return BuildStatus()
def remove(self, base: str) -> None:
'''
"""
remove packages from watcher
:param base: basename to remove
'''
"""
try:
response = requests.delete(self._package_url(base))
response.raise_for_status()
except requests.exceptions.HTTPError as e:
self.logger.exception(f'could not delete {base}: {e.response.text}', exc_info=True)
self.logger.exception(f"could not delete {base}: {e.response.text}", exc_info=True)
except Exception:
self.logger.exception(f'could not delete {base}', exc_info=True)
self.logger.exception(f"could not delete {base}", exc_info=True)
def update(self, base: str, status: BuildStatusEnum) -> None:
'''
"""
update package build status. Unlike `add` it does not update package properties
:param base: package base to update
:param status: current package build status
'''
payload = {'status': status.value}
"""
payload = {"status": status.value}
try:
response = requests.post(self._package_url(base), json=payload)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
self.logger.exception(f'could not update {base}: {e.response.text}', exc_info=True)
self.logger.exception(f"could not update {base}: {e.response.text}", exc_info=True)
except Exception:
self.logger.exception(f'could not update {base}', exc_info=True)
self.logger.exception(f"could not update {base}", exc_info=True)
def update_self(self, status: BuildStatusEnum) -> None:
'''
"""
update ahriman status itself
:param status: current ahriman status
'''
payload = {'status': status.value}
"""
payload = {"status": status.value}
try:
response = requests.post(self._ahriman_url(), json=payload)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
self.logger.exception(f'could not update service status: {e.response.text}', exc_info=True)
self.logger.exception(f"could not update service status: {e.response.text}", exc_info=True)
except Exception:
self.logger.exception('could not update service status', exc_info=True)
self.logger.exception("could not update service status", exc_info=True)

View File

@ -28,83 +28,93 @@ from ahriman.core.util import pretty_datetime
class BuildStatusEnum(Enum):
'''
"""
build status enumeration
:cvar Unknown: build status is unknown
:cvar Pending: package is out-of-dated and will be built soon
:cvar Building: package is building right now
:cvar Failed: package build failed
:cvar Success: package has been built without errors
'''
"""
Unknown = 'unknown'
Pending = 'pending'
Building = 'building'
Failed = 'failed'
Success = 'success'
Unknown = "unknown"
Pending = "pending"
Building = "building"
Failed = "failed"
Success = "success"
def badges_color(self) -> str:
'''
"""
convert itself to shield.io badges color
:return: shields.io color
'''
"""
if self == BuildStatusEnum.Pending:
return 'yellow'
return "yellow"
if self == BuildStatusEnum.Building:
return 'yellow'
return "yellow"
if self == BuildStatusEnum.Failed:
return 'critical'
return "critical"
if self == BuildStatusEnum.Success:
return 'success'
return 'inactive'
return "success"
return "inactive"
class BuildStatus:
'''
"""
build status holder
:ivar status: build status
:ivar _timestamp: build status update time
'''
"""
def __init__(self, status: Union[BuildStatusEnum, str, None] = None,
timestamp: Optional[int] = None) -> None:
'''
"""
default constructor
:param status: current build status if known. `BuildStatusEnum.Unknown` will be used if not set
:param timestamp: build status timestamp. Current timestamp will be used if not set
'''
"""
self.status = BuildStatusEnum(status) if status else BuildStatusEnum.Unknown
self.timestamp = timestamp or int(datetime.datetime.utcnow().timestamp())
@classmethod
def from_json(cls: Type[BuildStatus], dump: Dict[str, Any]) -> BuildStatus:
'''
"""
construct status properties from json dump
:param dump: json dump body
:return: status properties
'''
return cls(dump.get('status'), dump.get('timestamp'))
"""
return cls(dump.get("status"), dump.get("timestamp"))
def pretty_print(self) -> str:
'''
"""
generate pretty string representation
:return: print-friendly string
'''
return f'{self.status.value} ({pretty_datetime(self.timestamp)})'
"""
return f"{self.status.value} ({pretty_datetime(self.timestamp)})"
def view(self) -> Dict[str, Any]:
'''
"""
generate json status view
:return: json-friendly dictionary
'''
"""
return {
'status': self.status.value,
'timestamp': self.timestamp
"status": self.status.value,
"timestamp": self.timestamp
}
def __eq__(self, other: Any) -> bool:
"""
compare object to other
:param other: other object to compare
:return: True in case if objects are equal
"""
if not isinstance(other, BuildStatus):
return False
return self.status == other.status and self.timestamp == other.timestamp
def __repr__(self) -> str:
'''
"""
generate string representation of object
:return: unique string representation
'''
return f'BuildStatus(status={self.status.value}, timestamp={self.timestamp})'
"""
return f"BuildStatus(status={self.status.value}, timestamp={self.timestamp})"

View File

@ -37,151 +37,153 @@ from ahriman.models.repository_paths import RepositoryPaths
@dataclass
class Package:
'''
"""
package properties representation
:ivar aurl_url: AUR root url
:ivar aur_url: AUR root url
:ivar base: package base name
:ivar packages: map of package names to their properties. Filled only on load from archive
:ivar version: package full version
'''
"""
base: str
version: str
aur_url: str
packages: Dict[str, PackageDescription]
_check_output = check_output
@property
def git_url(self) -> str:
'''
"""
:return: package git url to clone
'''
return f'{self.aur_url}/{self.base}.git'
"""
return f"{self.aur_url}/{self.base}.git"
@property
def is_single_package(self) -> bool:
'''
"""
:return: true in case if this base has only one package with the same name
'''
"""
return self.base in self.packages and len(self.packages) == 1
@property
def is_vcs(self) -> bool:
'''
"""
:return: True in case if package base looks like VCS package and false otherwise
'''
return self.base.endswith('-bzr') \
or self.base.endswith('-csv')\
or self.base.endswith('-darcs')\
or self.base.endswith('-git')\
or self.base.endswith('-hg')\
or self.base.endswith('-svn')
"""
return self.base.endswith("-bzr") \
or self.base.endswith("-csv")\
or self.base.endswith("-darcs")\
or self.base.endswith("-git")\
or self.base.endswith("-hg")\
or self.base.endswith("-svn")
@property
def web_url(self) -> str:
'''
"""
:return: package AUR url
'''
return f'{self.aur_url}/packages/{self.base}'
"""
return f"{self.aur_url}/packages/{self.base}"
@classmethod
def from_archive(cls: Type[Package], path: str, pacman: Pacman, aur_url: str) -> Package:
'''
"""
construct package properties from package archive
:param path: path to package archive
:param pacman: alpm wrapper instance
:param aur_url: AUR root url
:return: package properties
'''
"""
package = pacman.handle.load_pkg(path)
properties = PackageDescription(package.size, package.builddate, os.path.basename(path), package.isize)
return cls(package.base, package.version, aur_url, {package.name: properties})
@classmethod
def from_aur(cls: Type[Package], name: str, aur_url: str) -> Package:
'''
"""
construct package properties from AUR page
:param name: package name (either base or normal name)
:param aur_url: AUR root url
:return: package properties
'''
"""
package = aur.info(name)
return cls(package.package_base, package.version, aur_url, {package.name: PackageDescription()})
@classmethod
def from_build(cls: Type[Package], path: str, aur_url: str) -> Package:
'''
"""
construct package properties from sources directory
:param path: path to package sources directory
:param aur_url: AUR root url
:return: package properties
'''
with open(os.path.join(path, '.SRCINFO')) as srcinfo_file:
"""
with open(os.path.join(path, ".SRCINFO")) as srcinfo_file:
srcinfo, errors = parse_srcinfo(srcinfo_file.read())
if errors:
raise InvalidPackageInfo(errors)
packages = {key: PackageDescription() for key in srcinfo['packages']}
version = cls.full_version(srcinfo.get('epoch'), srcinfo['pkgver'], srcinfo['pkgrel'])
packages = {key: PackageDescription() for key in srcinfo["packages"]}
version = cls.full_version(srcinfo.get("epoch"), srcinfo["pkgver"], srcinfo["pkgrel"])
return cls(srcinfo['pkgbase'], version, aur_url, packages)
return cls(srcinfo["pkgbase"], version, aur_url, packages)
@classmethod
def from_json(cls: Type[Package], dump: Dict[str, Any]) -> Package:
'''
"""
construct package properties from json dump
:param dump: json dump body
:return: package properties
'''
"""
packages = {
key: PackageDescription(**value)
for key, value in dump.get('packages', {}).items()
for key, value in dump.get("packages", {}).items()
}
return Package(
base=dump['base'],
version=dump['version'],
aur_url=dump['aur_url'],
base=dump["base"],
version=dump["version"],
aur_url=dump["aur_url"],
packages=packages)
@staticmethod
def dependencies(path: str) -> Set[str]:
'''
"""
load dependencies from package sources
:param path: path to package sources directory
:return: list of package dependencies including makedepends array, but excluding packages from this base
'''
with open(os.path.join(path, '.SRCINFO')) as srcinfo_file:
"""
with open(os.path.join(path, ".SRCINFO")) as srcinfo_file:
srcinfo, errors = parse_srcinfo(srcinfo_file.read())
if errors:
raise InvalidPackageInfo(errors)
makedepends = srcinfo.get('makedepends', [])
makedepends = srcinfo.get("makedepends", [])
# sum over each package
depends: List[str] = srcinfo.get('depends', [])
for package in srcinfo['packages'].values():
depends.extend(package.get('depends', []))
depends: List[str] = srcinfo.get("depends", [])
for package in srcinfo["packages"].values():
depends.extend(package.get("depends", []))
# we are not interested in dependencies inside pkgbase
packages = set(srcinfo['packages'].keys())
packages = set(srcinfo["packages"].keys())
return set(depends + makedepends) - packages
@staticmethod
def full_version(epoch: Optional[str], pkgver: str, pkgrel: str) -> str:
'''
"""
generate full version from components
:param epoch: package epoch if any
:param pkgver: package version
:param pkgrel: package release version (archlinux specific)
:return: generated version
'''
prefix = f'{epoch}:' if epoch else ''
return f'{prefix}{pkgver}-{pkgrel}'
"""
prefix = f"{epoch}:" if epoch else ""
return f"{prefix}{pkgver}-{pkgrel}"
@staticmethod
def load(path: str, pacman: Pacman, aur_url: str) -> Package:
'''
"""
package constructor from available sources
:param path: one of path to sources directory, path to archive or package name/base
:param pacman: alpm wrapper instance (required to load from archive)
:param aur_url: AUR root url
:return: package properties
'''
"""
try:
if os.path.isdir(path):
package: Package = Package.from_build(path, aur_url)
@ -196,52 +198,57 @@ class Package:
raise InvalidPackageInfo(str(e))
def actual_version(self, paths: RepositoryPaths) -> str:
'''
"""
additional method to handle VCS package versions
:param paths: repository paths instance
:return: package version if package is not VCS and current version according to VCS otherwise
'''
"""
if not self.is_vcs:
return self.version
from ahriman.core.build_tools.task import Task
clone_dir = os.path.join(paths.cache, self.base)
logger = logging.getLogger('build_details')
logger = logging.getLogger("build_details")
Task.fetch(clone_dir, self.git_url)
# update pkgver first
check_output('makepkg', '--nodeps', '--nobuild', exception=None, cwd=clone_dir, logger=logger)
Package._check_output("makepkg", "--nodeps", "--nobuild", exception=None, cwd=clone_dir, logger=logger)
# generate new .SRCINFO and put it to parser
srcinfo_source = check_output('makepkg', '--printsrcinfo', exception=None, cwd=clone_dir, logger=logger)
srcinfo_source = Package._check_output(
"makepkg",
"--printsrcinfo",
exception=None,
cwd=clone_dir,
logger=logger)
srcinfo, errors = parse_srcinfo(srcinfo_source)
if errors:
raise InvalidPackageInfo(errors)
return self.full_version(srcinfo.get('epoch'), srcinfo['pkgver'], srcinfo['pkgrel'])
return self.full_version(srcinfo.get("epoch"), srcinfo["pkgver"], srcinfo["pkgrel"])
def is_outdated(self, remote: Package, paths: RepositoryPaths) -> bool:
'''
"""
check if package is out-of-dated
:param remote: package properties from remote source
:param paths: repository paths instance. Required for VCS packages cache
:return: True if the package is out-of-dated and False otherwise
'''
"""
remote_version = remote.actual_version(paths) # either normal version or updated VCS
result: int = vercmp(self.version, remote_version)
return result < 0
def pretty_print(self) -> str:
'''
"""
generate pretty string representation
:return: print-friendly string
'''
details = '' if self.is_single_package else f''' ({' '.join(sorted(self.packages.keys()))})'''
return f'{self.base}{details}'
"""
details = "" if self.is_single_package else f""" ({" ".join(sorted(self.packages.keys()))})"""
return f"{self.base}{details}"
def view(self) -> Dict[str, Any]:
'''
"""
generate json package view
:return: json-friendly dictionary
'''
"""
return asdict(self)

View File

@ -23,13 +23,13 @@ from typing import Optional
@dataclass
class PackageDescription:
'''
"""
package specific properties
:ivar archive_size: package archive size
:ivar build_date: package build date
:ivar filename: package archive name
:ivar installed_size: package installed size
'''
"""
archive_size: Optional[int] = None
build_date: Optional[int] = None

View File

@ -25,20 +25,20 @@ from ahriman.core.exceptions import InvalidOption
class ReportSettings(Enum):
'''
"""
report targets enumeration
:cvar HTML: html report generation
'''
"""
HTML = auto()
@staticmethod
def from_option(value: str) -> ReportSettings:
'''
"""
construct value from configuration
:param value: configuration value
:return: parsed value
'''
if value.lower() in ('html',):
"""
if value.lower() in ("html",):
return ReportSettings.HTML
raise InvalidOption(value)

View File

@ -24,62 +24,62 @@ from dataclasses import dataclass
@dataclass
class RepositoryPaths:
'''
"""
repository paths holder. For the most operations with paths you want to use this object
:ivar root: repository root (i.e. ahriman home)
:ivar architecture: repository architecture
'''
"""
root: str
architecture: str
@property
def cache(self) -> str:
'''
"""
:return: directory for packages cache (mainly used for VCS packages)
'''
return os.path.join(self.root, 'cache')
"""
return os.path.join(self.root, "cache")
@property
def chroot(self) -> str:
'''
"""
:return: directory for devtools chroot
'''
# for the chroot directory devtools will create own tree and we don't have to specify architecture here
return os.path.join(self.root, 'chroot')
"""
# for the chroot directory devtools will create own tree and we don"t have to specify architecture here
return os.path.join(self.root, "chroot")
@property
def manual(self) -> str:
'''
"""
:return: directory for manual updates (i.e. from add command)
'''
return os.path.join(self.root, 'manual', self.architecture)
"""
return os.path.join(self.root, "manual", self.architecture)
@property
def packages(self) -> str:
'''
"""
:return: directory for built packages
'''
return os.path.join(self.root, 'packages', self.architecture)
"""
return os.path.join(self.root, "packages", self.architecture)
@property
def repository(self) -> str:
'''
"""
:return: repository directory
'''
return os.path.join(self.root, 'repository', self.architecture)
"""
return os.path.join(self.root, "repository", self.architecture)
@property
def sources(self) -> str:
'''
"""
:return: directory for downloaded PKGBUILDs for current build
'''
return os.path.join(self.root, 'sources', self.architecture)
"""
return os.path.join(self.root, "sources", self.architecture)
def create_tree(self) -> None:
'''
"""
create ahriman working tree
'''
"""
os.makedirs(self.cache, mode=0o755, exist_ok=True)
os.makedirs(self.chroot, mode=0o755, exist_ok=True)
os.makedirs(self.manual, mode=0o755, exist_ok=True)

View File

@ -25,24 +25,24 @@ from ahriman.core.exceptions import InvalidOption
class SignSettings(Enum):
'''
"""
sign targets enumeration
:cvar SignPackages: sign each package
:cvar SignRepository: sign repository database file
'''
"""
SignPackages = auto()
SignRepository = auto()
@staticmethod
def from_option(value: str) -> SignSettings:
'''
"""
construct value from configuration
:param value: configuration value
:return: parsed value
'''
if value.lower() in ('package', 'packages', 'sign-package'):
"""
if value.lower() in ("package", "packages", "sign-package"):
return SignSettings.SignPackages
if value.lower() in ('repository', 'sign-repository'):
if value.lower() in ("repository", "sign-repository"):
return SignSettings.SignRepository
raise InvalidOption(value)

View File

@ -25,24 +25,24 @@ from ahriman.core.exceptions import InvalidOption
class UploadSettings(Enum):
'''
"""
remote synchronization targets enumeration
:cvar Rsync: sync via rsync
:cvar S3: sync to Amazon S3
'''
"""
Rsync = auto()
S3 = auto()
@staticmethod
def from_option(value: str) -> UploadSettings:
'''
"""
construct value from configuration
:param value: configuration value
:return: parsed value
'''
if value.lower() in ('rsync',):
"""
if value.lower() in ("rsync",):
return UploadSettings.Rsync
if value.lower() in ('s3',):
if value.lower() in ("s3",):
return UploadSettings.S3
raise InvalidOption(value)

View File

@ -17,4 +17,4 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
__version__ = '0.15.0'
__version__ = "0.15.0"

View File

@ -28,11 +28,11 @@ HandlerType = Callable[[Request], Awaitable[StreamResponse]]
def exception_handler(logger: Logger) -> Callable[[Request, HandlerType], Awaitable[StreamResponse]]:
'''
"""
exception handler middleware. Just log any exception (except for client ones)
:param logger: class logger
:return: built middleware
'''
"""
@middleware
async def handle(request: Request, handler: HandlerType) -> StreamResponse:
try:
@ -40,7 +40,7 @@ def exception_handler(logger: Logger) -> Callable[[Request, HandlerType], Awaita
except HTTPClientError:
raise
except Exception:
logger.exception(f'exception during performing request to {request.path}', exc_info=True)
logger.exception(f"exception during performing request to {request.path}", exc_info=True)
raise
return handle

View File

@ -26,7 +26,7 @@ from ahriman.web.views.packages import PackagesView
def setup_routes(application: Application) -> None:
'''
"""
setup all defined routes
Available routes are:
@ -45,16 +45,16 @@ def setup_routes(application: Application) -> None:
POST /api/v1/package/:base update package base status
:param application: web application instance
'''
application.router.add_get('/', IndexView)
application.router.add_get('/index.html', IndexView)
"""
application.router.add_get("/", IndexView)
application.router.add_get("/index.html", IndexView)
application.router.add_get('/api/v1/ahriman', AhrimanView)
application.router.add_post('/api/v1/ahriman', AhrimanView)
application.router.add_get("/api/v1/ahriman", AhrimanView)
application.router.add_post("/api/v1/ahriman", AhrimanView)
application.router.add_get('/api/v1/packages', PackagesView)
application.router.add_post('/api/v1/packages', PackagesView)
application.router.add_get("/api/v1/packages", PackagesView)
application.router.add_post("/api/v1/packages", PackagesView)
application.router.add_delete('/api/v1/packages/{package}', PackageView)
application.router.add_get('/api/v1/packages/{package}', PackageView)
application.router.add_post('/api/v1/packages/{package}', PackageView)
application.router.add_delete("/api/v1/packages/{package}", PackageView)
application.router.add_get("/api/v1/packages/{package}", PackageView)
application.router.add_post("/api/v1/packages/{package}", PackageView)

View File

@ -24,19 +24,19 @@ from ahriman.web.views.base import BaseView
class AhrimanView(BaseView):
'''
"""
service status web view
'''
"""
async def get(self) -> Response:
'''
"""
get current service status
:return: 200 with service status object
'''
"""
return json_response(self.service.status.view())
async def post(self) -> Response:
'''
"""
update service status
JSON body must be supplied, the following model is used:
@ -45,11 +45,11 @@ class AhrimanView(BaseView):
}
:return: 204 on success
'''
"""
data = await self.request.json()
try:
status = BuildStatusEnum(data['status'])
status = BuildStatusEnum(data["status"])
except Exception as e:
raise HTTPBadRequest(text=str(e))

View File

@ -23,14 +23,14 @@ from ahriman.core.watcher.watcher import Watcher
class BaseView(View):
'''
"""
base web view to make things typed
'''
"""
@property
def service(self) -> Watcher:
'''
"""
:return: build status watcher instance
'''
watcher: Watcher = self.request.app['watcher']
"""
watcher: Watcher = self.request.app["watcher"]
return watcher

View File

@ -28,7 +28,7 @@ from ahriman.web.views.base import BaseView
class IndexView(BaseView):
'''
"""
root view
It uses jinja2 templates for report generation, the following variables are allowed:
@ -39,35 +39,35 @@ class IndexView(BaseView):
repository - repository name, string, required
service - service status properties: status, status_color, timestamp. Required
version - ahriman version, string, required
'''
"""
@aiohttp_jinja2.template('build-status.jinja2')
@aiohttp_jinja2.template("build-status.jinja2")
async def get(self) -> Dict[str, Any]:
'''
"""
process get request. No parameters supported here
:return: parameters for jinja template
'''
"""
# some magic to make it jinja-friendly
packages = [
{
'base': package.base,
'packages': list(sorted(package.packages)),
'status': status.status.value,
'timestamp': pretty_datetime(status.timestamp),
'version': package.version,
'web_url': package.web_url
"base": package.base,
"packages": list(sorted(package.packages)),
"status": status.status.value,
"timestamp": pretty_datetime(status.timestamp),
"version": package.version,
"web_url": package.web_url
} for package, status in sorted(self.service.packages, key=lambda item: item[0].base)
]
service = {
'status': self.service.status.status.value,
'status_color': self.service.status.status.badges_color(),
'timestamp': pretty_datetime(self.service.status.timestamp)
"status": self.service.status.status.value,
"status_color": self.service.status.status.badges_color(),
"timestamp": pretty_datetime(self.service.status.timestamp)
}
return {
'architecture': self.service.architecture,
'packages': packages,
'repository': self.service.repository.name,
'service': service,
'version': version.__version__,
"architecture": self.service.architecture,
"packages": packages,
"repository": self.service.repository.name,
"service": service,
"version": version.__version__,
}

View File

@ -25,16 +25,16 @@ from ahriman.web.views.base import BaseView
class PackageView(BaseView):
'''
"""
package base specific web view
'''
"""
async def get(self) -> Response:
'''
"""
get current package base status
:return: 200 with package description on success
'''
base = self.request.match_info['package']
"""
base = self.request.match_info["package"]
try:
package, status = self.service.get(base)
@ -43,24 +43,24 @@ class PackageView(BaseView):
response = [
{
'package': package.view(),
'status': status.view()
"package": package.view(),
"status": status.view()
}
]
return json_response(response)
async def delete(self) -> Response:
'''
"""
delete package base from status page
:return: 204 on success
'''
base = self.request.match_info['package']
"""
base = self.request.match_info["package"]
self.service.remove(base)
return HTTPNoContent()
async def post(self) -> Response:
'''
"""
update package build status
JSON body must be supplied, the following model is used:
@ -71,19 +71,19 @@ class PackageView(BaseView):
}
:return: 204 on success
'''
base = self.request.match_info['package']
"""
base = self.request.match_info["package"]
data = await self.request.json()
try:
package = Package.from_json(data['package']) if 'package' in data else None
status = BuildStatusEnum(data['status'])
package = Package.from_json(data["package"]) if "package" in data else None
status = BuildStatusEnum(data["status"])
except Exception as e:
raise HTTPBadRequest(text=str(e))
try:
self.service.update(base, status, package)
except KeyError:
raise HTTPBadRequest(text=f'Package {base} is unknown, but no package body set')
raise HTTPBadRequest(text=f"Package {base} is unknown, but no package body set")
return HTTPNoContent()

View File

@ -23,28 +23,28 @@ from ahriman.web.views.base import BaseView
class PackagesView(BaseView):
'''
"""
global watcher view
'''
"""
async def get(self) -> Response:
'''
"""
get current packages status
:return: 200 with package description on success
'''
"""
response = [
{
'package': package.view(),
'status': status.view()
"package": package.view(),
"status": status.view()
} for package, status in self.service.packages
]
return json_response(response)
async def post(self) -> Response:
'''
"""
reload all packages from repository. No parameters supported here
:return: 204 on success
'''
"""
self.service.load()
return HTTPNoContent()

View File

@ -31,65 +31,65 @@ from ahriman.web.routes import setup_routes
async def on_shutdown(application: web.Application) -> None:
'''
"""
web application shutdown handler
:param application: web application instance
'''
application.logger.warning('server terminated')
"""
application.logger.warning("server terminated")
async def on_startup(application: web.Application) -> None:
'''
"""
web application start handler
:param application: web application instance
'''
application.logger.info('server started')
"""
application.logger.info("server started")
try:
application['watcher'].load()
application["watcher"].load()
except Exception:
application.logger.exception('could not load packages', exc_info=True)
application.logger.exception("could not load packages", exc_info=True)
raise InitializeException()
def run_server(application: web.Application, architecture: str) -> None:
'''
"""
run web application
:param application: web application instance
:param architecture: repository architecture
'''
application.logger.info('start server')
"""
application.logger.info("start server")
section = application['config'].get_section_name('web', architecture)
host = application['config'].get(section, 'host')
port = application['config'].getint(section, 'port')
section = application["config"].get_section_name("web", architecture)
host = application["config"].get(section, "host")
port = application["config"].getint(section, "port")
web.run_app(application, host=host, port=port, handle_signals=False,
access_log=logging.getLogger('http'))
access_log=logging.getLogger("http"))
def setup_service(architecture: str, config: Configuration) -> web.Application:
'''
"""
create web application
:param architecture: repository architecture
:param config: configuration instance
:return: web application instance
'''
application = web.Application(logger=logging.getLogger('http'))
"""
application = web.Application(logger=logging.getLogger("http"))
application.on_shutdown.append(on_shutdown)
application.on_startup.append(on_startup)
application.middlewares.append(web.normalize_path_middleware(append_slash=False, remove_slash=True))
application.middlewares.append(exception_handler(application.logger))
application.logger.info('setup routes')
application.logger.info("setup routes")
setup_routes(application)
application.logger.info('setup templates')
aiohttp_jinja2.setup(application, loader=jinja2.FileSystemLoader(config.get('web', 'templates')))
application.logger.info("setup templates")
aiohttp_jinja2.setup(application, loader=jinja2.FileSystemLoader(config.get("web", "templates")))
application.logger.info('setup configuration')
application['config'] = config
application.logger.info("setup configuration")
application["config"] = config
application.logger.info('setup watcher')
application['watcher'] = Watcher(architecture, config)
application.logger.info("setup watcher")
application["watcher"] = Watcher(architecture, config)
return application

View File

@ -0,0 +1,79 @@
import pytest
from ahriman.models.build_status import BuildStatus, BuildStatusEnum
from ahriman.models.package import Package
from ahriman.models.package_desciption import PackageDescription
from ahriman.models.repository_paths import RepositoryPaths
@pytest.fixture
def build_status_failed() -> BuildStatus:
return BuildStatus(BuildStatusEnum.Failed, 42)
@pytest.fixture
def package_ahriman(package_description_ahriman: PackageDescription) -> Package:
packages = {"ahriman": package_description_ahriman}
return Package(
base="ahriman",
version="0.12.1-1",
aur_url="https://aur.archlinux.org",
packages=packages)
@pytest.fixture
def package_python_schedule(
package_description_python_schedule: PackageDescription,
package_description_python2_schedule: PackageDescription) -> Package:
packages = {
"python-schedule": package_description_python_schedule,
"python2-schedule": package_description_python2_schedule
}
return Package(
base="python-schedule",
version="1.0.0-2",
aur_url="https://aur.archlinux.org",
packages=packages)
@pytest.fixture
def package_tpacpi_bat_git() -> Package:
return Package(
base="tpacpi-bat-git",
version="3.1.r12.g4959b52-1",
aur_url="https://aur.archlinux.org",
packages={"tpacpi-bat-git": PackageDescription()})
@pytest.fixture
def package_description_ahriman() -> PackageDescription:
return PackageDescription(
archive_size=4200,
build_date=42,
filename="ahriman-0.12.1-1-any.pkg.tar.zst",
installed_size=4200000)
@pytest.fixture
def package_description_python_schedule() -> PackageDescription:
return PackageDescription(
archive_size=4201,
build_date=421,
filename="python-schedule-1.0.0-2-any.pkg.tar.zst",
installed_size=4200001)
@pytest.fixture
def package_description_python2_schedule() -> PackageDescription:
return PackageDescription(
archive_size=4202,
build_date=422,
filename="python2-schedule-1.0.0-2-any.pkg.tar.zst",
installed_size=4200002)
@pytest.fixture
def repository_paths() -> RepositoryPaths:
return RepositoryPaths(
architecture="x86_64",
root="/var/lib/ahriman")

View File

@ -0,0 +1,38 @@
from ahriman.models.build_status import BuildStatus, BuildStatusEnum
def test_build_status_enum_badges_color() -> None:
"""
status color must be one of shields.io supported
"""
SUPPORTED_COLORS = [
"brightgreen", "green", "yellowgreen", "yellow", "orange", "red", "blue", "lightgrey",
"success", "important", "critical", "informational", "inactive", "blueviolet"
]
for status in BuildStatusEnum:
assert status.badges_color() in SUPPORTED_COLORS
def test_build_status_init_1() -> None:
"""
must construct status object from None
"""
status = BuildStatus()
assert status.status == BuildStatusEnum.Unknown
assert status.timestamp > 0
def test_build_status_init_2(build_status_failed: BuildStatus) -> None:
"""
must construct status object from objects
"""
status = BuildStatus(BuildStatusEnum.Failed, 42)
assert status == build_status_failed
def test_build_status_from_json_view(build_status_failed: BuildStatus) -> None:
"""
must construct same object from json
"""
assert BuildStatus.from_json(build_status_failed.view()) == build_status_failed

View File

@ -0,0 +1,109 @@
from pathlib import Path
from pytest_mock import MockerFixture
from ahriman.models.package import Package
from ahriman.models.repository_paths import RepositoryPaths
def test_git_url(package_ahriman: Package) -> None:
"""
must generate valid git url
"""
assert package_ahriman.git_url.endswith(".git")
assert package_ahriman.git_url.startswith(package_ahriman.aur_url)
assert package_ahriman.base in package_ahriman.git_url
def test_is_single_package_false(package_python_schedule: Package) -> None:
"""
python-schedule must not be single package
"""
assert not package_python_schedule.is_single_package
def test_is_single_package_true(package_ahriman: Package) -> None:
"""
ahriman must be single package
"""
assert package_ahriman.is_single_package
def test_is_vcs_false(package_ahriman: Package) -> None:
"""
ahriman must not be VCS package
"""
assert not package_ahriman.is_vcs
def test_is_vcs_true(package_tpacpi_bat_git: Package) -> None:
"""
tpacpi-bat-git must be VCS package
"""
assert package_tpacpi_bat_git.is_vcs
def test_web_url(package_ahriman: Package) -> None:
"""
must generate valid web url
"""
assert package_ahriman.web_url.startswith(package_ahriman.aur_url)
assert package_ahriman.base in package_ahriman.web_url
def test_from_json_view_1(package_ahriman: Package) -> None:
"""
must construct same object from json
"""
assert Package.from_json(package_ahriman.view()) == package_ahriman
def test_from_json_view_2(package_python_schedule: Package) -> None:
"""
must construct same object from json
"""
assert Package.from_json(package_python_schedule.view()) == package_python_schedule
def test_from_json_view_3(package_tpacpi_bat_git: Package) -> None:
"""
must construct same object from json
"""
assert Package.from_json(package_tpacpi_bat_git.view()) == package_tpacpi_bat_git
def test_actual_version(package_ahriman: Package, repository_paths: RepositoryPaths) -> None:
"""
must return same actual_version as version is
"""
assert package_ahriman.actual_version(repository_paths) == package_ahriman.version
def test_actual_version_vcs(package_tpacpi_bat_git: Package, repository_paths: RepositoryPaths,
mocker: MockerFixture, resource_path_root: Path) -> None:
"""
must return valid actual_version for VCS package
"""
srcinfo = (resource_path_root / "models" / "package_tpacpi-bat-git_srcinfo").read_text()
mocker.patch("ahriman.models.package.Package._check_output", return_value=srcinfo)
mocker.patch("ahriman.core.build_tools.task.Task.fetch", return_value=None)
assert package_tpacpi_bat_git.actual_version(repository_paths) == "3.1.r13.g4959b52-1"
def test_is_outdated_false(package_ahriman: Package, repository_paths: RepositoryPaths) -> None:
"""
must be not outdated for the same package
"""
assert not package_ahriman.is_outdated(package_ahriman, repository_paths)
def test_is_outdated_true(package_ahriman: Package, repository_paths: RepositoryPaths) -> None:
"""
must be outdated for the new version
"""
other = Package.from_json(package_ahriman.view())
other.version = other.version.replace("-1", "-2")
assert package_ahriman.is_outdated(other, repository_paths)

View File

@ -0,0 +1,20 @@
import pytest
from ahriman.core.exceptions import InvalidOption
from ahriman.models.report_settings import ReportSettings
def test_from_option_invalid() -> None:
"""
must raise exception on invalid option
"""
with pytest.raises(InvalidOption, match=".* `invalid`$"):
ReportSettings.from_option("invalid")
def test_from_option_valid() -> None:
"""
must return value from valid options
"""
assert ReportSettings.from_option("html") == ReportSettings.HTML
assert ReportSettings.from_option("HTML") == ReportSettings.HTML

View File

@ -0,0 +1,25 @@
import os
from pytest_mock import MockerFixture
from unittest import mock
from ahriman.models.repository_paths import RepositoryPaths
def test_create_tree(repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
"""
must create whole tree
"""
paths = {
property
for property in dir(repository_paths)
if not property.startswith("_") and property not in ("architecture", "create_tree", "root")
}
mocker.patch("os.makedirs")
repository_paths.create_tree()
os.makedirs.assert_has_calls(
[
mock.call(getattr(repository_paths, path), mode=0o755, exist_ok=True)
for path in paths
], any_order=True)

View File

@ -0,0 +1,26 @@
import pytest
from ahriman.core.exceptions import InvalidOption
from ahriman.models.sign_settings import SignSettings
def test_from_option_invalid() -> None:
"""
must raise exception on invalid option
"""
with pytest.raises(InvalidOption, match=".* `invalid`$"):
SignSettings.from_option("invalid")
def test_from_option_valid() -> None:
"""
must return value from valid options
"""
assert SignSettings.from_option("package") == SignSettings.SignPackages
assert SignSettings.from_option("PACKAGE") == SignSettings.SignPackages
assert SignSettings.from_option("packages") == SignSettings.SignPackages
assert SignSettings.from_option("sign-package") == SignSettings.SignPackages
assert SignSettings.from_option("repository") == SignSettings.SignRepository
assert SignSettings.from_option("REPOSITORY") == SignSettings.SignRepository
assert SignSettings.from_option("sign-repository") == SignSettings.SignRepository

View File

@ -0,0 +1,23 @@
import pytest
from ahriman.core.exceptions import InvalidOption
from ahriman.models.upload_settings import UploadSettings
def test_from_option_invalid() -> None:
"""
must raise exception on invalid option
"""
with pytest.raises(InvalidOption, match=".* `invalid`$"):
UploadSettings.from_option("invalid")
def test_from_option_valid() -> None:
"""
must return value from valid options
"""
assert UploadSettings.from_option("rsync") == UploadSettings.Rsync
assert UploadSettings.from_option("RSYNC") == UploadSettings.Rsync
assert UploadSettings.from_option("s3") == UploadSettings.S3
assert UploadSettings.from_option("S3") == UploadSettings.S3

0
tests/conftest.py Normal file
View File

View File

@ -0,0 +1,17 @@
pkgbase = tpacpi-bat-git
pkgdesc = A Perl script with ACPI calls for recent ThinkPads which are not supported by tp_smapi
pkgver = 3.1.r13.g4959b52
pkgrel = 1
url = https://github.com/teleshoes/tpacpi-bat
arch = any
license = GPL3
makedepends = git
depends = perl
depends = acpi_call
provides = tpacpi-bat
conflicts = tpacpi-bat
backup = etc/conf.d/tpacpi
source = git+https://github.com/teleshoes/tpacpi-bat.git
b2sums = SKIP
pkgname = tpacpi-bat-git