try to replace os with pathlib

This commit is contained in:
Evgenii Alekseev 2021-03-22 08:13:37 +03:00
parent 1ab2921e25
commit c3b9933c64
20 changed files with 178 additions and 180 deletions

View File

@ -18,9 +18,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import logging
import os
import shutil
from pathlib import Path
from typing import Callable, Iterable, List, Optional, Set
from ahriman.core.build_tools.task import Task
@ -101,32 +101,32 @@ class Application:
"""
known_packages = self._known_packages()
def add_directory(path: str) -> None:
for package in filter(package_like, os.listdir(path)):
full_path = os.path.join(path, package)
add_manual(full_path)
def add_directory(path: Path) -> None:
for full_path in filter(lambda p: package_like(p.name), path.iterdir()):
add_archive(full_path)
def add_manual(name: str) -> str:
def add_manual(name: str) -> Path:
package = Package.load(name, self.repository.pacman, self.config.get("alpm", "aur_url"))
path = os.path.join(self.repository.paths.manual, package.base)
path = self.repository.paths.manual / package.base
Task.fetch(path, package.git_url)
return path
def add_archive(src: str) -> None:
dst = os.path.join(self.repository.paths.packages, os.path.basename(src))
def add_archive(src: Path) -> None:
dst = self.repository.paths.packages / src.name
shutil.move(src, dst)
def process_dependencies(path: str) -> None:
def process_dependencies(path: Path) -> None:
if without_dependencies:
return
dependencies = Package.dependencies(path)
self.add(dependencies.difference(known_packages), without_dependencies)
def process_single(name: str) -> None:
if os.path.isdir(name):
add_directory(name)
elif os.path.isfile(name):
add_archive(name)
maybe_path = Path(name)
if maybe_path.is_dir():
add_directory(maybe_path)
elif maybe_path.is_file():
add_archive(maybe_path)
else:
path = add_manual(name)
process_dependencies(path)
@ -183,7 +183,7 @@ class Application:
run package updates
:param updates: list of packages to update
"""
def process_update(paths: Iterable[str]) -> None:
def process_update(paths: Iterable[Path]) -> None:
self.repository.process_update(paths)
self._finalize()

View File

@ -22,6 +22,7 @@ from __future__ import annotations
import argparse
import os
from pathlib import Path
from types import TracebackType
from typing import Literal, Optional, Type
@ -48,11 +49,11 @@ class Lock:
:param architecture: repository architecture
:param config: configuration instance
"""
self.path = f"{args.lock}_{architecture}" if args.lock is not None else None
self.path = Path(f"{args.lock}_{architecture}") if args.lock is not None else None
self.force = args.force
self.unsafe = args.unsafe
self.root = config.get("repository", "root")
self.root = Path(config.get("repository", "root"))
self.reporter = Client() if args.no_report else Client.load(architecture, config)
def __enter__(self) -> Lock:
@ -68,7 +69,6 @@ class Lock:
self.check_user()
if self.force:
self.remove()
self.check()
self.create()
self.reporter.update_self(BuildStatusEnum.Building)
return self
@ -87,15 +87,6 @@ class Lock:
self.reporter.update_self(status)
return False
def check(self) -> None:
"""
check if lock file exists, raise exception if it does
"""
if self.path is None:
return
if os.path.exists(self.path):
raise DuplicateRun()
def check_user(self) -> None:
"""
check if current user is actually owner of ahriman root
@ -103,7 +94,7 @@ class Lock:
if self.unsafe:
return
current_uid = os.getuid()
root_uid = os.stat(self.root).st_uid
root_uid = self.root.stat().st_uid
if current_uid != root_uid:
raise UnsafeRun(current_uid, root_uid)
@ -113,7 +104,10 @@ class Lock:
"""
if self.path is None:
return
open(self.path, "w").close()
try:
self.path.touch()
except FileExistsError:
raise DuplicateRun()
def remove(self) -> None:
"""
@ -121,5 +115,4 @@ class Lock:
"""
if self.path is None:
return
if os.path.exists(self.path):
os.remove(self.path)
self.path.unlink(missing_ok=True)

View File

@ -18,8 +18,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import logging
import os
from pathlib import Path
from typing import List
from ahriman.core.exceptions import BuildFailed
@ -51,37 +51,36 @@ class Repo:
self.sign_args = sign_args
@property
def repo_path(self) -> str:
def repo_path(self) -> Path:
"""
:return: path to repository database
"""
return os.path.join(self.paths.repository, f"{self.name}.db.tar.gz")
return self.paths.repository / f"{self.name}.db.tar.gz"
def add(self, path: str) -> None:
def add(self, path: Path) -> None:
"""
add new package to repository
:param path: path to archive to add
"""
Repo._check_output(
"repo-add", *self.sign_args, "-R", self.repo_path, path,
exception=BuildFailed(path),
"repo-add", *self.sign_args, "-R", str(self.repo_path), str(path),
exception=BuildFailed(path.name),
cwd=self.paths.repository,
logger=self.logger)
def remove(self, package: str, filename: str) -> None:
def remove(self, package: str, filename: Path) -> None:
"""
remove package from repository
:param package: package name to remove
:param filename: package filename to remove
"""
# remove package and signature (if any) from filesystem
for fn in filter(lambda f: f.startswith(filename), os.listdir(self.paths.repository)):
full_path = os.path.join(self.paths.repository, fn)
os.remove(full_path)
for full_path in self.paths.repository.glob(f"{filename}*"):
full_path.unlink()
# remove package from registry
Repo._check_output(
"repo-remove", *self.sign_args, self.repo_path, package,
"repo-remove", *self.sign_args, str(self.repo_path), package,
exception=BuildFailed(package),
cwd=self.paths.repository,
logger=self.logger)

View File

@ -17,10 +17,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import logging
import shutil
from pathlib import Path
from typing import List, Optional
from ahriman.core.configuration import Configuration
@ -61,21 +61,21 @@ class Task:
self.makechrootpkg_flags = config.getlist(section, "makechrootpkg_flags")
@property
def cache_path(self) -> str:
def cache_path(self) -> Path:
"""
:return: path to cached packages
"""
return os.path.join(self.paths.cache, self.package.base)
return self.paths.cache / self.package.base
@property
def git_path(self) -> str:
def git_path(self) -> Path:
"""
:return: path to clone package from git
"""
return os.path.join(self.paths.sources, self.package.base)
return self.paths.sources / self.package.base
@staticmethod
def fetch(local: str, remote: str, branch: str = "master") -> None:
def fetch(local: Path, remote: str, branch: str = "master") -> None:
"""
either clone repository or update it to origin/`branch`
:param local: local path to fetch
@ -84,19 +84,19 @@ class Task:
"""
logger = logging.getLogger("build_details")
# local directory exists and there is .git directory
if os.path.isdir(os.path.join(local, ".git")):
if (local / ".git").is_dir():
Task._check_output("git", "fetch", "origin", branch, exception=None, cwd=local, logger=logger)
else:
Task._check_output("git", "clone", remote, local, exception=None, logger=logger)
Task._check_output("git", "clone", remote, str(local), exception=None, logger=logger)
# and now force reset to our branch
Task._check_output("git", "reset", "--hard", f"origin/{branch}", exception=None, cwd=local, logger=logger)
def build(self) -> List[str]:
def build(self) -> List[Path]:
"""
run package build
:return: paths of produced packages
"""
cmd = [self.build_command, "-r", self.paths.chroot]
cmd = [self.build_command, "-r", str(self.paths.chroot)]
cmd.extend(self.archbuild_flags)
cmd.extend(["--"] + self.makechrootpkg_flags)
cmd.extend(["--"] + self.makepkg_flags)
@ -109,18 +109,19 @@ class Task:
logger=self.build_logger)
# well it is not actually correct, but we can deal with it
return Task._check_output("makepkg", "--packagelist",
exception=BuildFailed(self.package.base),
cwd=self.git_path,
logger=self.build_logger).splitlines()
packages = Task._check_output("makepkg", "--packagelist",
exception=BuildFailed(self.package.base),
cwd=self.git_path,
logger=self.build_logger).splitlines()
return [Path(package) for package in packages]
def init(self, path: Optional[str] = None) -> None:
def init(self, path: Optional[Path] = None) -> None:
"""
fetch package from git
:param path: optional local path to fetch. If not set default path will be used
"""
git_path = path or self.git_path
if os.path.isdir(self.cache_path):
if self.cache_path.is_dir():
# no need to clone whole repository, just copy from cache first
shutil.copytree(self.cache_path, git_path)
return Task.fetch(git_path, self.package.git_url)

View File

@ -21,9 +21,9 @@ from __future__ import annotations
import configparser
import logging
import os
from logging.config import fileConfig
from pathlib import Path
from typing import Dict, List, Optional, Type
@ -48,17 +48,17 @@ class Configuration(configparser.RawConfigParser):
default constructor. In the most cases must not be called directly
"""
configparser.RawConfigParser.__init__(self, allow_no_value=True)
self.path: Optional[str] = None
self.path: Optional[Path] = None
@property
def include(self) -> str:
def include(self) -> Path:
"""
:return: path to directory with configuration includes
"""
return self.get("settings", "include")
return Path(self.get("settings", "include"))
@classmethod
def from_path(cls: Type[Configuration], path: str, logfile: bool) -> Configuration:
def from_path(cls: Type[Configuration], path: Path, logfile: bool) -> Configuration:
"""
constructor with full object initialization
:param path: path to root configuration file
@ -111,7 +111,7 @@ class Configuration(configparser.RawConfigParser):
probe = f"{prefix}_{suffix}"
return probe if self.has_section(probe) else prefix
def load(self, path: str) -> None:
def load(self, path: Path) -> None:
"""
fully load configuration
:param path: path to root configuration file
@ -125,8 +125,8 @@ class Configuration(configparser.RawConfigParser):
load configuration includes
"""
try:
for conf in filter(lambda p: p.endswith(".ini"), sorted(os.listdir(self.include))):
self.read(os.path.join(self.include, conf))
for path in sorted(self.include.glob(".ini")):
self.read(path)
except (FileNotFoundError, configparser.NoOptionError):
pass

View File

@ -18,8 +18,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import jinja2
import os
from pathlib import Path
from typing import Callable, Dict, Iterable
from ahriman.core.configuration import Configuration
@ -60,9 +60,9 @@ class HTML(Report):
"""
Report.__init__(self, architecture, config)
section = config.get_section_name("html", architecture)
self.report_path = config.get(section, "path")
self.report_path = Path(config.get(section, "path"))
self.link_path = config.get(section, "link_path")
self.template_path = config.get(section, "template_path")
self.template_path = Path(config.get(section, "template_path"))
# base template vars
self.homepage = config.get(section, "homepage", fallback=None)
@ -78,10 +78,9 @@ class HTML(Report):
:param packages: list of packages to generate report
"""
# idea comes from https://stackoverflow.com/a/38642558
templates_dir, template_name = os.path.split(self.template_path)
loader = jinja2.FileSystemLoader(searchpath=templates_dir)
loader = jinja2.FileSystemLoader(searchpath=self.template_path.parent)
environment = jinja2.Environment(loader=loader)
template = environment.get_template(template_name)
template = environment.get_template(self.template_path.name)
content = [
{
@ -104,5 +103,4 @@ class HTML(Report):
pgp_key=self.pgp_key,
repository=self.name)
with open(self.report_path, "w") as out:
out.write(html)
self.report_path.write_text(html)

View File

@ -17,9 +17,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import shutil
from pathlib import Path
from typing import List
from ahriman.core.repository.properties import Properties
@ -30,7 +30,7 @@ class Cleaner(Properties):
trait to clean common repository objects
"""
def packages_built(self) -> List[str]:
def packages_built(self) -> List[Path]:
"""
get list of files in built packages directory
:return: list of filenames from the directory
@ -42,32 +42,32 @@ class Cleaner(Properties):
clear sources directory
"""
self.logger.info("clear package sources directory")
for package in os.listdir(self.paths.sources):
shutil.rmtree(os.path.join(self.paths.sources, package))
for package in self.paths.sources.iterdir():
shutil.rmtree(package)
def clear_cache(self) -> None:
"""
clear cache directory
"""
self.logger.info("clear packages sources cache directory")
for package in os.listdir(self.paths.cache):
shutil.rmtree(os.path.join(self.paths.cache, package))
for package in self.paths.cache.iterdir():
shutil.rmtree(package)
def clear_chroot(self) -> None:
"""
clear cache directory. Warning: this method is architecture independent and will clear every chroot
"""
self.logger.info("clear build chroot directory")
for chroot in os.listdir(self.paths.chroot):
shutil.rmtree(os.path.join(self.paths.chroot, chroot))
for chroot in self.paths.chroot.iterdir():
shutil.rmtree(chroot)
def clear_manual(self) -> None:
"""
clear directory with manual package updates
"""
self.logger.info("clear manual packages")
for package in os.listdir(self.paths.manual):
shutil.rmtree(os.path.join(self.paths.manual, package))
for package in self.paths.manual.iterdir():
shutil.rmtree(package)
def clear_packages(self) -> None:
"""
@ -75,4 +75,4 @@ class Cleaner(Properties):
"""
self.logger.info("clear built packages directory")
for package in self.packages_built():
os.remove(package)
package.unlink()

View File

@ -17,9 +17,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import shutil
from pathlib import Path
from typing import Dict, Iterable, List, Optional
from ahriman.core.build_tools.task import Task
@ -41,7 +41,7 @@ class Executor(Cleaner):
"""
raise NotImplementedError
def process_build(self, updates: Iterable[Package]) -> List[str]:
def process_build(self, updates: Iterable[Package]) -> List[Path]:
"""
build packages
:param updates: list of packages properties to build
@ -53,7 +53,7 @@ class Executor(Cleaner):
task.init()
built = task.build()
for src in built:
dst = os.path.join(self.paths.packages, os.path.basename(src))
dst = self.paths.packages / src.name
shutil.move(src, dst)
for package in updates:
@ -67,16 +67,13 @@ class Executor(Cleaner):
return self.packages_built()
def process_remove(self, packages: Iterable[str]) -> str:
def process_remove(self, packages: Iterable[str]) -> Path:
"""
remove packages from list
:param packages: list of package names or bases to rmeove
:param packages: list of package names or bases to remove
:return: path to repository database
"""
def remove_single(package: str, filename: Optional[str]) -> None:
if filename is None:
self.logger.warning(f"could not remove {package} because no filename set")
return
def remove_single(package: str, filename: Path) -> None:
try:
self.repo.remove(package, filename)
except Exception:
@ -86,15 +83,16 @@ class Executor(Cleaner):
for local in self.packages():
if local.base in packages or all(package in requested for package in local.packages):
to_remove = {
package: properties.filename
package: Path(properties.filename)
for package, properties in local.packages.items()
if properties.filename is not None
}
self.reporter.remove(local.base) # we only update status page in case of base removal
elif requested.intersection(local.packages.keys()):
to_remove = {
package: properties.filename
package: Path(properties.filename)
for package, properties in local.packages.items()
if package in requested
if package in requested and properties.filename is not None
}
else:
to_remove = dict()
@ -123,7 +121,7 @@ class Executor(Cleaner):
for target in targets:
Uploader.run(self.architecture, self.config, target, self.paths.repository)
def process_update(self, packages: Iterable[str]) -> str:
def process_update(self, packages: Iterable[Path]) -> Path:
"""
sign packages, add them to repository and update repository database
:param packages: list of filenames to run
@ -134,12 +132,12 @@ class Executor(Cleaner):
self.logger.warning(f"received empty package name for base {base}")
return # suppress type checking, it never can be none actually
# in theory it might be NOT packages directory, but we suppose it is
full_path = os.path.join(self.paths.packages, fn)
full_path = self.paths.packages / fn
files = self.sign.sign_package(full_path, base)
for src in files:
dst = os.path.join(self.paths.repository, os.path.basename(src))
dst = self.paths.repository / src.name
shutil.move(src, dst)
package_path = os.path.join(self.paths.repository, fn)
package_path = self.paths.repository / fn
self.repo.add(package_path)
# we are iterating over bases, not single packages

View File

@ -19,6 +19,8 @@
#
import logging
from pathlib import Path
from ahriman.core.alpm.pacman import Pacman
from ahriman.core.alpm.repo import Repo
from ahriman.core.configuration import Configuration
@ -50,7 +52,7 @@ class Properties:
self.aur_url = config.get("alpm", "aur_url")
self.name = config.get("repository", "name")
self.paths = RepositoryPaths(config.get("repository", "root"), architecture)
self.paths = RepositoryPaths(Path(config.get("repository", "root")), architecture)
self.paths.create_tree()
self.pacman = Pacman(config)

View File

@ -17,8 +17,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
from pathlib import Path
from typing import Dict, List
from ahriman.core.repository.executor import Executor
@ -38,24 +37,20 @@ class Repository(Executor, UpdateHandler):
:return: list of packages properties
"""
result: Dict[str, Package] = {}
for fn in os.listdir(self.paths.repository):
if not package_like(fn):
for full_path in self.paths.repository.iterdir():
if not package_like(full_path.name):
continue
full_path = os.path.join(self.paths.repository, fn)
try:
local = Package.load(full_path, self.pacman, self.aur_url)
result.setdefault(local.base, local).packages.update(local.packages)
except Exception:
self.logger.exception(f"could not load package from {fn}", exc_info=True)
self.logger.exception(f"could not load package from {full_path}", exc_info=True)
continue
return list(result.values())
def packages_built(self) -> List[str]:
def packages_built(self) -> List[Path]:
"""
get list of files in built packages directory
:return: list of filenames from the directory
"""
return [
os.path.join(self.paths.packages, fn)
for fn in os.listdir(self.paths.packages)
]
return list(self.paths.packages.iterdir())

View File

@ -17,8 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
from typing import Iterable, List
from ahriman.core.repository.cleaner import Cleaner
@ -77,9 +75,9 @@ class UpdateHandler(Cleaner):
result: List[Package] = []
known_bases = {package.base for package in self.packages()}
for fn in os.listdir(self.paths.manual):
for fn in self.paths.manual.iterdir():
try:
local = Package.load(os.path.join(self.paths.manual, fn), self.pacman, self.aur_url)
local = Package.load(fn, self.pacman, self.aur_url)
result.append(local)
if local.base not in known_bases:
self.reporter.set_unknown(local)

View File

@ -18,8 +18,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import logging
import os
from pathlib import Path
from typing import List
from ahriman.core.configuration import Configuration
@ -62,16 +62,16 @@ class GPG:
return ["--sign", "--key", self.default_key]
@staticmethod
def sign_cmd(path: str, key: str) -> List[str]:
def sign_cmd(path: Path, key: str) -> List[str]:
"""
gpg command to run
:param path: path to file to sign
:param key: PGP key ID
:return: gpg command with all required arguments
"""
return ["gpg", "-u", key, "-b", path]
return ["gpg", "-u", key, "-b", str(path)]
def process(self, path: str, key: str) -> List[str]:
def process(self, path: Path, key: str) -> List[Path]:
"""
gpg command wrapper
:param path: path to file to sign
@ -80,12 +80,11 @@ class GPG:
"""
GPG._check_output(
*GPG.sign_cmd(path, key),
exception=BuildFailed(path),
cwd=os.path.dirname(path),
exception=BuildFailed(path.name),
logger=self.logger)
return [path, f"{path}.sig"]
return [path, path.parent / f"{path.name}.sig"]
def sign_package(self, path: str, base: str) -> List[str]:
def sign_package(self, path: Path, base: str) -> List[Path]:
"""
sign package if required by configuration
:param path: path to file to sign
@ -97,7 +96,7 @@ class GPG:
key = self.config.get(self.section, f"key_{base}", fallback=self.default_key)
return self.process(path, key)
def sign_repository(self, path: str) -> List[str]:
def sign_repository(self, path: Path) -> List[Path]:
"""
sign repository if required by configuration
:note: more likely you just want to pass `repository_sign_args` to repo wrapper

View File

@ -22,6 +22,7 @@ from __future__ import annotations
import shutil
import tempfile
from pathlib import Path
from typing import Iterable, List, Set
from ahriman.core.build_tools.task import Task
@ -65,7 +66,7 @@ class Leaf:
"""
load dependencies for the leaf
"""
clone_dir = tempfile.mkdtemp()
clone_dir = Path(tempfile.mkdtemp())
try:
Task.fetch(clone_dir, self.package.git_url)
self.dependencies = Package.dependencies(clone_dir)

View File

@ -17,6 +17,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from pathlib import Path
from ahriman.core.configuration import Configuration
from ahriman.core.upload.uploader import Uploader
from ahriman.core.util import check_output
@ -40,11 +42,19 @@ class Rsync(Uploader):
section = config.get_section_name("rsync", architecture)
self.remote = config.get(section, "remote")
def sync(self, path: str) -> None:
def sync(self, path: Path) -> None:
"""
sync data to remote server
:param path: local path to sync
"""
Rsync._check_output("rsync", "--archive", "--verbose", "--compress", "--partial", "--delete", path, self.remote,
exception=None,
logger=self.logger)
Rsync._check_output(
"rsync",
"--archive",
"--verbose",
"--compress",
"--partial",
"--delete",
str(path),
self.remote,
exception=None,
logger=self.logger)

View File

@ -17,6 +17,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from pathlib import Path
from ahriman.core.configuration import Configuration
from ahriman.core.upload.uploader import Uploader
from ahriman.core.util import check_output
@ -40,12 +42,12 @@ class S3(Uploader):
section = config.get_section_name("s3", architecture)
self.bucket = config.get(section, "bucket")
def sync(self, path: str) -> None:
def sync(self, path: Path) -> None:
"""
sync data to remote server
:param path: local path to sync
"""
# TODO rewrite to boto, but it is bullshit
S3._check_output("aws", "s3", "sync", "--quiet", "--delete", path, self.bucket,
S3._check_output("aws", "s3", "sync", "--quiet", "--delete", str(path), self.bucket,
exception=None,
logger=self.logger)

View File

@ -19,6 +19,8 @@
#
import logging
from pathlib import Path
from ahriman.core.configuration import Configuration
from ahriman.core.exceptions import SyncFailed
from ahriman.models.upload_settings import UploadSettings
@ -43,7 +45,7 @@ class Uploader:
self.config = config
@staticmethod
def run(architecture: str, config: Configuration, target: str, path: str) -> None:
def run(architecture: str, config: Configuration, target: str, path: Path) -> None:
"""
run remote sync
:param architecture: repository architecture
@ -67,7 +69,7 @@ class Uploader:
uploader.logger.exception("remote sync failed", exc_info=True)
raise SyncFailed()
def sync(self, path: str) -> None:
def sync(self, path: Path) -> None:
"""
sync data to remote server
:param path: local path to sync

View File

@ -21,13 +21,14 @@ import datetime
import subprocess
from logging import Logger
from pathlib import Path
from typing import Optional
from ahriman.core.exceptions import InvalidOption
def check_output(*args: str, exception: Optional[Exception],
cwd: Optional[str] = None, stderr: int = subprocess.STDOUT,
cwd: Optional[Path] = None, stderr: int = subprocess.STDOUT,
logger: Optional[Logger] = None) -> str:
"""
subprocess wrapper

View File

@ -19,8 +19,8 @@
#
import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from ahriman.core.configuration import Configuration
@ -54,11 +54,11 @@ class Watcher:
self.status = BuildStatus()
@property
def cache_path(self) -> str:
def cache_path(self) -> Path:
"""
:return: path to dump with json cache
"""
return os.path.join(self.repository.paths.root, "status_cache.json")
return self.repository.paths.root / "status_cache.json"
@property
def packages(self) -> List[Tuple[Package, BuildStatus]]:
@ -77,9 +77,9 @@ class Watcher:
if package.base in self.known:
self.known[package.base] = (package, status)
if not os.path.isfile(self.cache_path):
if not self.cache_path.is_file():
return
with open(self.cache_path) as cache:
with self.cache_path.open() as cache:
dump = json.load(cache)
for item in dump["packages"]:
try:
@ -100,7 +100,7 @@ class Watcher:
]
}
try:
with open(self.cache_path, "w") as cache:
with self.cache_path.open("w") as cache:
json.dump(dump, cache)
except Exception:
self.logger.exception("cannot dump cache", exc_info=True)

View File

@ -21,12 +21,12 @@ from __future__ import annotations
import aur # type: ignore
import logging
import os
from dataclasses import asdict, dataclass
from pathlib import Path
from pyalpm import vercmp # type: ignore
from srcinfo.parse import parse_srcinfo # type: ignore
from typing import Any, Dict, List, Optional, Set, Type
from typing import Any, Dict, List, Optional, Set, Type, Union
from ahriman.core.alpm.pacman import Pacman
from ahriman.core.exceptions import InvalidPackageInfo
@ -86,7 +86,7 @@ class Package:
return f"{self.aur_url}/packages/{self.base}"
@classmethod
def from_archive(cls: Type[Package], path: str, pacman: Pacman, aur_url: str) -> Package:
def from_archive(cls: Type[Package], path: Path, pacman: Pacman, aur_url: str) -> Package:
"""
construct package properties from package archive
:param path: path to package archive
@ -94,8 +94,8 @@ class Package:
:param aur_url: AUR root url
:return: package properties
"""
package = pacman.handle.load_pkg(path)
properties = PackageDescription(package.size, package.builddate, os.path.basename(path), package.isize)
package = pacman.handle.load_pkg(str(path))
properties = PackageDescription(package.size, package.builddate, path.name, package.isize)
return cls(package.base, package.version, aur_url, {package.name: properties})
@classmethod
@ -110,15 +110,14 @@ class Package:
return cls(package.package_base, package.version, aur_url, {package.name: PackageDescription()})
@classmethod
def from_build(cls: Type[Package], path: str, aur_url: str) -> Package:
def from_build(cls: Type[Package], path: Path, aur_url: str) -> Package:
"""
construct package properties from sources directory
:param path: path to package sources directory
:param aur_url: AUR root url
:return: package properties
"""
with open(os.path.join(path, ".SRCINFO")) as srcinfo_file:
srcinfo, errors = parse_srcinfo(srcinfo_file.read())
srcinfo, errors = parse_srcinfo((path / ".SRCINFO").read_text())
if errors:
raise InvalidPackageInfo(errors)
packages = {key: PackageDescription() for key in srcinfo["packages"]}
@ -144,14 +143,13 @@ class Package:
packages=packages)
@staticmethod
def dependencies(path: str) -> Set[str]:
def dependencies(path: Path) -> Set[str]:
"""
load dependencies from package sources
:param path: path to package sources directory
:return: list of package dependencies including makedepends array, but excluding packages from this base
"""
with open(os.path.join(path, ".SRCINFO")) as srcinfo_file:
srcinfo, errors = parse_srcinfo(srcinfo_file.read())
srcinfo, errors = parse_srcinfo((path / ".SRCINFO").read_text())
if errors:
raise InvalidPackageInfo(errors)
makedepends = srcinfo.get("makedepends", [])
@ -176,7 +174,7 @@ class Package:
return f"{prefix}{pkgver}-{pkgrel}"
@staticmethod
def load(path: str, pacman: Pacman, aur_url: str) -> Package:
def load(path: Union[Path, str], pacman: Pacman, aur_url: str) -> Package:
"""
package constructor from available sources
:param path: one of path to sources directory, path to archive or package name/base
@ -185,12 +183,13 @@ class Package:
:return: package properties
"""
try:
if os.path.isdir(path):
package: Package = Package.from_build(path, aur_url)
elif os.path.exists(path):
package = Package.from_archive(path, pacman, aur_url)
maybe_path = Path(path)
if maybe_path.is_dir():
package: Package = Package.from_build(maybe_path, aur_url)
elif maybe_path.is_file():
package = Package.from_archive(maybe_path, pacman, aur_url)
else:
package = Package.from_aur(path, aur_url)
package = Package.from_aur(str(path), aur_url)
return package
except InvalidPackageInfo:
raise
@ -208,7 +207,7 @@ class Package:
from ahriman.core.build_tools.task import Task
clone_dir = os.path.join(paths.cache, self.base)
clone_dir = paths.cache / self.base
logger = logging.getLogger("build_details")
Task.fetch(clone_dir, self.git_url)

View File

@ -17,7 +17,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
from pathlib import Path
from dataclasses import dataclass
@ -30,59 +30,59 @@ class RepositoryPaths:
:ivar architecture: repository architecture
"""
root: str
root: Path
architecture: str
@property
def cache(self) -> str:
def cache(self) -> Path:
"""
:return: directory for packages cache (mainly used for VCS packages)
"""
return os.path.join(self.root, "cache")
return self.root / "cache"
@property
def chroot(self) -> str:
def chroot(self) -> Path:
"""
:return: directory for devtools chroot
"""
# for the chroot directory devtools will create own tree and we don"t have to specify architecture here
return os.path.join(self.root, "chroot")
return self.root / "chroot"
@property
def manual(self) -> str:
def manual(self) -> Path:
"""
:return: directory for manual updates (i.e. from add command)
"""
return os.path.join(self.root, "manual", self.architecture)
return self.root / "manual" / self.architecture
@property
def packages(self) -> str:
def packages(self) -> Path:
"""
:return: directory for built packages
"""
return os.path.join(self.root, "packages", self.architecture)
return self.root / "packages" / self.architecture
@property
def repository(self) -> str:
def repository(self) -> Path:
"""
:return: repository directory
"""
return os.path.join(self.root, "repository", self.architecture)
return self.root / "repository" / self.architecture
@property
def sources(self) -> str:
def sources(self) -> Path:
"""
:return: directory for downloaded PKGBUILDs for current build
"""
return os.path.join(self.root, "sources", self.architecture)
return self.root / "sources" / self.architecture
def create_tree(self) -> None:
"""
create ahriman working tree
"""
os.makedirs(self.cache, mode=0o755, exist_ok=True)
os.makedirs(self.chroot, mode=0o755, exist_ok=True)
os.makedirs(self.manual, mode=0o755, exist_ok=True)
os.makedirs(self.packages, mode=0o755, exist_ok=True)
os.makedirs(self.repository, mode=0o755, exist_ok=True)
os.makedirs(self.sources, mode=0o755, exist_ok=True)
self.cache.mkdir(mode=0o755, parents=True, exist_ok=True)
self.chroot.mkdir(mode=0o755, parents=True, exist_ok=True)
self.manual.mkdir(mode=0o755, parents=True, exist_ok=True)
self.packages.mkdir(mode=0o755, parents=True, exist_ok=True)
self.repository.mkdir(mode=0o755, parents=True, exist_ok=True)
self.sources.mkdir(mode=0o755, parents=True, exist_ok=True)