mirror of
https://github.com/arcan1s/ahriman.git
synced 2025-04-24 15:27:17 +00:00
verbose subprocess exception handle annd quite git
This commit is contained in:
parent
6530afbfc7
commit
598af7d9db
@ -71,7 +71,7 @@ class Repo(LazyLogging):
|
||||
"""
|
||||
Repo._check_output(
|
||||
"repo-add", *self.sign_args, "-R", str(self.repo_path), str(path),
|
||||
exception=BuildError(path.name),
|
||||
exception=BuildError.from_process(path.name),
|
||||
cwd=self.paths.repository,
|
||||
logger=self.logger,
|
||||
user=self.uid)
|
||||
@ -98,7 +98,7 @@ class Repo(LazyLogging):
|
||||
# remove package from registry
|
||||
Repo._check_output(
|
||||
"repo-remove", *self.sign_args, str(self.repo_path), package,
|
||||
exception=BuildError(package),
|
||||
exception=BuildError.from_process(package),
|
||||
cwd=self.paths.repository,
|
||||
logger=self.logger,
|
||||
user=self.uid)
|
||||
|
@ -82,10 +82,11 @@ class Sources(LazyLogging):
|
||||
branch = remote.branch or instance.DEFAULT_BRANCH
|
||||
if is_initialized_git:
|
||||
instance.logger.info("update HEAD to remote at %s using branch %s", sources_dir, branch)
|
||||
Sources._check_output("git", "fetch", "origin", branch, cwd=sources_dir, logger=instance.logger)
|
||||
Sources._check_output("git", "fetch", "--quiet", "origin", branch,
|
||||
cwd=sources_dir, logger=instance.logger)
|
||||
elif remote.git_url is not None:
|
||||
instance.logger.info("clone remote %s to %s using branch %s", remote.git_url, sources_dir, branch)
|
||||
Sources._check_output("git", "clone", "--branch", branch, "--single-branch",
|
||||
Sources._check_output("git", "clone", "--quiet", "--branch", branch, "--single-branch",
|
||||
remote.git_url, str(sources_dir), cwd=sources_dir.parent, logger=instance.logger)
|
||||
else:
|
||||
# it will cause an exception later
|
||||
@ -93,7 +94,8 @@ class Sources(LazyLogging):
|
||||
|
||||
# and now force reset to our branch
|
||||
Sources._check_output("git", "checkout", "--force", branch, cwd=sources_dir, logger=instance.logger)
|
||||
Sources._check_output("git", "reset", "--hard", f"origin/{branch}", cwd=sources_dir, logger=instance.logger)
|
||||
Sources._check_output("git", "reset", "--quiet", "--hard", f"origin/{branch}",
|
||||
cwd=sources_dir, logger=instance.logger)
|
||||
|
||||
# move content if required
|
||||
# we are using full path to source directory in order to make append possible
|
||||
@ -126,7 +128,7 @@ class Sources(LazyLogging):
|
||||
instance = Sources()
|
||||
if not (sources_dir / ".git").is_dir():
|
||||
# skip initializing in case if it was already
|
||||
Sources._check_output("git", "init", "--initial-branch", instance.DEFAULT_BRANCH,
|
||||
Sources._check_output("git", "init", "--quiet", "--initial-branch", instance.DEFAULT_BRANCH,
|
||||
cwd=sources_dir, logger=instance.logger)
|
||||
|
||||
# extract local files...
|
||||
@ -191,7 +193,7 @@ class Sources(LazyLogging):
|
||||
return # no changes to push, just skip action
|
||||
|
||||
git_url, branch = remote.git_source()
|
||||
Sources._check_output("git", "push", git_url, branch, cwd=sources_dir, logger=instance.logger)
|
||||
Sources._check_output("git", "push", "--quiet", git_url, branch, cwd=sources_dir, logger=instance.logger)
|
||||
|
||||
def add(self, sources_dir: Path, *pattern: str, intent_to_add: bool = False) -> None:
|
||||
"""
|
||||
@ -243,7 +245,8 @@ class Sources(LazyLogging):
|
||||
environment["GIT_AUTHOR_NAME"] = environment["GIT_COMMITTER_NAME"] = user
|
||||
environment["GIT_AUTHOR_EMAIL"] = environment["GIT_COMMITTER_EMAIL"] = email
|
||||
|
||||
Sources._check_output("git", "commit", *args, cwd=sources_dir, logger=self.logger, environment=environment)
|
||||
Sources._check_output("git", "commit", "--quiet", *args,
|
||||
cwd=sources_dir, logger=self.logger, environment=environment)
|
||||
|
||||
return True
|
||||
|
||||
|
@ -92,7 +92,7 @@ class Task(LazyLogging):
|
||||
|
||||
Task._check_output(
|
||||
*command,
|
||||
exception=BuildError(self.package.base),
|
||||
exception=BuildError.from_process(self.package.base),
|
||||
cwd=sources_dir,
|
||||
logger=self.logger,
|
||||
user=self.uid,
|
||||
@ -101,7 +101,7 @@ class Task(LazyLogging):
|
||||
# well it is not actually correct, but we can deal with it
|
||||
packages = Task._check_output(
|
||||
"makepkg", "--packagelist",
|
||||
exception=BuildError(self.package.base),
|
||||
exception=BuildError.from_process(self.package.base),
|
||||
cwd=sources_dir,
|
||||
logger=self.logger
|
||||
).splitlines()
|
||||
|
@ -17,8 +17,11 @@
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
import subprocess
|
||||
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from typing import Any, Self
|
||||
|
||||
|
||||
class BuildError(RuntimeError):
|
||||
@ -26,14 +29,61 @@ class BuildError(RuntimeError):
|
||||
base exception for failed builds
|
||||
"""
|
||||
|
||||
def __init__(self, package_base: str) -> None:
|
||||
def __init__(self, package_base: str, stderr: str | None = None) -> None:
|
||||
"""
|
||||
default constructor
|
||||
|
||||
Args:
|
||||
package_base(str): package base raised exception
|
||||
stderr(str | None, optional): stderr of the process if available (Default value = None)
|
||||
"""
|
||||
RuntimeError.__init__(self, f"Package {package_base} build failed, check logs for details")
|
||||
message = f"Package {package_base} build failed,\n"
|
||||
if stderr is not None:
|
||||
message += f"process stderr:\n{stderr}\n"
|
||||
message += "check logs for details"
|
||||
|
||||
RuntimeError.__init__(self, message)
|
||||
|
||||
@classmethod
|
||||
def from_process(cls, package_base: str) -> Callable[[int, list[str], str, str], Self]:
|
||||
"""
|
||||
generate exception callable from process error
|
||||
|
||||
Args:
|
||||
package_base(str): package base raised exception
|
||||
|
||||
Returns:
|
||||
Callable[[int, list[str], str, str], Self]: exception generator to be passed to subprocess utils
|
||||
"""
|
||||
return lambda code, process, stdout, stderr: cls(package_base, stderr)
|
||||
|
||||
|
||||
class CalledProcessError(subprocess.CalledProcessError):
|
||||
"""
|
||||
like ``subprocess.CalledProcessError``, but better
|
||||
"""
|
||||
|
||||
def __init__(self, status_code: int, process: list[str], stderr: str) -> None:
|
||||
"""
|
||||
default constructor
|
||||
|
||||
Args:
|
||||
status_code(int): process return code
|
||||
process(list[str]): process argument list
|
||||
stderr(str): stderr of the process
|
||||
"""
|
||||
subprocess.CalledProcessError.__init__(self, status_code, process, stderr=stderr)
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""
|
||||
string representation of the exception
|
||||
|
||||
Returns:
|
||||
str: string view of the exception
|
||||
"""
|
||||
return f"""{subprocess.CalledProcessError.__str__(self)}
|
||||
Process stderr:
|
||||
{self.stderr}"""
|
||||
|
||||
|
||||
class DuplicateRunError(RuntimeError):
|
||||
|
@ -190,7 +190,7 @@ class GPG(LazyLogging):
|
||||
"""
|
||||
GPG._check_output(
|
||||
*GPG.sign_command(path, key),
|
||||
exception=BuildError(path.name),
|
||||
exception=BuildError.from_process(path.name),
|
||||
logger=self.logger)
|
||||
return [path, self.signature(path)]
|
||||
|
||||
|
@ -35,7 +35,7 @@ from pathlib import Path
|
||||
from pwd import getpwuid
|
||||
from typing import Any, IO, TypeVar
|
||||
|
||||
from ahriman.core.exceptions import OptionError, UnsafeRunError
|
||||
from ahriman.core.exceptions import CalledProcessError, OptionError, UnsafeRunError
|
||||
from ahriman.models.repository_paths import RepositoryPaths
|
||||
|
||||
|
||||
@ -65,7 +65,9 @@ __all__ = [
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
def check_output(*args: str, exception: Exception | None = None, cwd: Path | None = None, input_data: str | None = None,
|
||||
# pylint: disable=too-many-locals
|
||||
def check_output(*args: str, exception: Exception | Callable[[int, list[str], str, str], Exception] | None = None,
|
||||
cwd: Path | None = None, input_data: str | None = None,
|
||||
logger: logging.Logger | None = None, user: int | None = None,
|
||||
environment: dict[str, str] | None = None) -> str:
|
||||
"""
|
||||
@ -73,8 +75,9 @@ def check_output(*args: str, exception: Exception | None = None, cwd: Path | Non
|
||||
|
||||
Args:
|
||||
*args(str): command line arguments
|
||||
exception(Exception | None, optional): exception which has to be reraised instead of default subprocess
|
||||
exception (Default value = None)
|
||||
exception(Exception | Callable[[int, list[str], str, str]] | None, optional): exception which has to be raised
|
||||
instead of default subprocess exception. If callable us is supplied, the ``subprocess.CalledProcessError``
|
||||
arguments will be passed (Default value = None)
|
||||
cwd(Path | None, optional): current working directory (Default value = None)
|
||||
input_data(str | None, optional): data which will be written to command stdin (Default value = None)
|
||||
logger(logging.Logger | None, optional): logger to log command result if required (Default value = None)
|
||||
@ -85,7 +88,7 @@ def check_output(*args: str, exception: Exception | None = None, cwd: Path | Non
|
||||
str: command output
|
||||
|
||||
Raises:
|
||||
subprocess.CalledProcessError: if subprocess ended with status code different from 0 and no exception supplied
|
||||
CalledProcessError: if subprocess ended with status code different from 0 and no exception supplied
|
||||
|
||||
Examples:
|
||||
Simply call the function::
|
||||
@ -110,7 +113,7 @@ def check_output(*args: str, exception: Exception | None = None, cwd: Path | Non
|
||||
return channel if channel is not None else io.StringIO()
|
||||
|
||||
# wrapper around selectors polling
|
||||
def poll(sel: selectors.BaseSelector) -> Generator[str, None, None]:
|
||||
def poll(sel: selectors.BaseSelector) -> Generator[tuple[str, str], None, None]:
|
||||
for key, _ in sel.select(): # we don't need to check mask here because we have only subscribed on reading
|
||||
line = key.fileobj.readline() # type: ignore[union-attr]
|
||||
if not line: # in case of empty line we remove selector as there is no data here anymore
|
||||
@ -121,8 +124,7 @@ def check_output(*args: str, exception: Exception | None = None, cwd: Path | Non
|
||||
if logger is not None:
|
||||
logger.debug(line)
|
||||
|
||||
if key.data == "stdout":
|
||||
yield line # yield only stdout data
|
||||
yield key.data, line
|
||||
|
||||
environment = environment or {}
|
||||
if user is not None:
|
||||
@ -138,17 +140,26 @@ def check_output(*args: str, exception: Exception | None = None, cwd: Path | Non
|
||||
selector.register(get_io(process, "stdout"), selectors.EVENT_READ, data="stdout")
|
||||
selector.register(get_io(process, "stderr"), selectors.EVENT_READ, data="stderr")
|
||||
|
||||
result: list[str] = []
|
||||
result: dict[str, list[str]] = {
|
||||
"stdout": [],
|
||||
"stderr": [],
|
||||
}
|
||||
while selector.get_map(): # while there are unread selectors, keep reading
|
||||
result.extend(poll(selector))
|
||||
for key_data, output in poll(selector):
|
||||
result[key_data].append(output)
|
||||
|
||||
stdout = "\n".join(result["stdout"]).rstrip("\n") # remove newline at the end of any
|
||||
stderr = "\n".join(result["stderr"]).rstrip("\n")
|
||||
|
||||
status_code = process.wait()
|
||||
if status_code != 0:
|
||||
if exception is not None:
|
||||
if isinstance(exception, Exception):
|
||||
raise exception
|
||||
raise subprocess.CalledProcessError(status_code, process.args)
|
||||
if callable(exception):
|
||||
raise exception(status_code, list(args), stdout, stderr)
|
||||
raise CalledProcessError(status_code, list(args), stderr)
|
||||
|
||||
return "\n".join(result).rstrip("\n") # remove newline at the end of any
|
||||
return stdout
|
||||
|
||||
|
||||
def check_user(paths: RepositoryPaths, *, unsafe: bool) -> None:
|
||||
|
@ -56,9 +56,9 @@ def test_fetch_existing(remote_source: RemoteSource, mocker: MockerFixture) -> N
|
||||
local = Path("local")
|
||||
Sources.fetch(local, remote_source)
|
||||
check_output_mock.assert_has_calls([
|
||||
MockCall("git", "fetch", "origin", remote_source.branch, cwd=local, logger=pytest.helpers.anyvar(int)),
|
||||
MockCall("git", "fetch", "--quiet", "origin", remote_source.branch, cwd=local, logger=pytest.helpers.anyvar(int)),
|
||||
MockCall("git", "checkout", "--force", remote_source.branch, cwd=local, logger=pytest.helpers.anyvar(int)),
|
||||
MockCall("git", "reset", "--hard", f"origin/{remote_source.branch}",
|
||||
MockCall("git", "reset", "--quiet", "--hard", f"origin/{remote_source.branch}",
|
||||
cwd=local, logger=pytest.helpers.anyvar(int)),
|
||||
])
|
||||
move_mock.assert_called_once_with(local.resolve(), local)
|
||||
@ -75,10 +75,10 @@ def test_fetch_new(remote_source: RemoteSource, mocker: MockerFixture) -> None:
|
||||
local = Path("local")
|
||||
Sources.fetch(local, remote_source)
|
||||
check_output_mock.assert_has_calls([
|
||||
MockCall("git", "clone", "--branch", remote_source.branch, "--single-branch",
|
||||
MockCall("git", "clone", "--quiet", "--branch", remote_source.branch, "--single-branch",
|
||||
remote_source.git_url, str(local), cwd=local.parent, logger=pytest.helpers.anyvar(int)),
|
||||
MockCall("git", "checkout", "--force", remote_source.branch, cwd=local, logger=pytest.helpers.anyvar(int)),
|
||||
MockCall("git", "reset", "--hard", f"origin/{remote_source.branch}",
|
||||
MockCall("git", "reset", "--quiet", "--hard", f"origin/{remote_source.branch}",
|
||||
cwd=local, logger=pytest.helpers.anyvar(int))
|
||||
])
|
||||
move_mock.assert_called_once_with(local.resolve(), local)
|
||||
@ -96,7 +96,7 @@ def test_fetch_new_without_remote(mocker: MockerFixture) -> None:
|
||||
Sources.fetch(local, RemoteSource(source=PackageSource.Archive))
|
||||
check_output_mock.assert_has_calls([
|
||||
MockCall("git", "checkout", "--force", Sources.DEFAULT_BRANCH, cwd=local, logger=pytest.helpers.anyvar(int)),
|
||||
MockCall("git", "reset", "--hard", f"origin/{Sources.DEFAULT_BRANCH}",
|
||||
MockCall("git", "reset", "--quiet", "--hard", f"origin/{Sources.DEFAULT_BRANCH}",
|
||||
cwd=local, logger=pytest.helpers.anyvar(int))
|
||||
])
|
||||
move_mock.assert_called_once_with(local.resolve(), local)
|
||||
@ -144,7 +144,7 @@ def test_init(mocker: MockerFixture) -> None:
|
||||
|
||||
local = Path("local")
|
||||
Sources.init(local)
|
||||
check_output_mock.assert_called_once_with("git", "init", "--initial-branch", Sources.DEFAULT_BRANCH,
|
||||
check_output_mock.assert_called_once_with("git", "init", "--quiet", "--initial-branch", Sources.DEFAULT_BRANCH,
|
||||
cwd=local, logger=pytest.helpers.anyvar(int))
|
||||
add_mock.assert_called_once_with(local, "PKGBUILD", ".SRCINFO", "local")
|
||||
commit_mock.assert_called_once_with(local)
|
||||
@ -241,7 +241,7 @@ def test_push(package_ahriman: Package, mocker: MockerFixture) -> None:
|
||||
add_mock.assert_called_once_with(local, "glob")
|
||||
commit_mock.assert_called_once_with(local, commit_author=commit_author)
|
||||
check_output_mock.assert_called_once_with(
|
||||
"git", "push", package_ahriman.remote.git_url, package_ahriman.remote.branch,
|
||||
"git", "push", "--quiet", package_ahriman.remote.git_url, package_ahriman.remote.branch,
|
||||
cwd=local, logger=pytest.helpers.anyvar(int))
|
||||
|
||||
|
||||
@ -310,7 +310,7 @@ def test_commit(sources: Sources, mocker: MockerFixture) -> None:
|
||||
user, email = sources.DEFAULT_COMMIT_AUTHOR
|
||||
assert sources.commit(local, message=message)
|
||||
check_output_mock.assert_called_once_with(
|
||||
"git", "commit", "--message", message,
|
||||
"git", "commit", "--quiet", "--message", message,
|
||||
cwd=local, logger=pytest.helpers.anyvar(int), environment={
|
||||
"GIT_AUTHOR_NAME": user,
|
||||
"GIT_AUTHOR_EMAIL": email,
|
||||
@ -343,7 +343,7 @@ def test_commit_author(sources: Sources, mocker: MockerFixture) -> None:
|
||||
user, email = author = ("commit author", "user@host")
|
||||
assert sources.commit(Path("local"), message=message, commit_author=author)
|
||||
check_output_mock.assert_called_once_with(
|
||||
"git", "commit", "--message", message,
|
||||
"git", "commit", "--quiet", "--message", message,
|
||||
cwd=local, logger=pytest.helpers.anyvar(int), environment={
|
||||
"GIT_AUTHOR_NAME": user,
|
||||
"GIT_AUTHOR_EMAIL": email,
|
||||
@ -364,7 +364,7 @@ def test_commit_autogenerated_message(sources: Sources, mocker: MockerFixture) -
|
||||
assert sources.commit(Path("local"))
|
||||
user, email = sources.DEFAULT_COMMIT_AUTHOR
|
||||
check_output_mock.assert_called_once_with(
|
||||
"git", "commit", "--message", pytest.helpers.anyvar(str, strict=True),
|
||||
"git", "commit", "--quiet", "--message", pytest.helpers.anyvar(str, strict=True),
|
||||
cwd=local, logger=pytest.helpers.anyvar(int), environment={
|
||||
"GIT_AUTHOR_NAME": user,
|
||||
"GIT_AUTHOR_EMAIL": email,
|
||||
|
@ -0,0 +1,19 @@
|
||||
from ahriman.core.exceptions import BuildError, CalledProcessError
|
||||
|
||||
|
||||
def test_from_process() -> None:
|
||||
"""
|
||||
must correctly generate exception instance from subprocess
|
||||
"""
|
||||
instance = BuildError.from_process("ahriman")(0, [], "out", "err")
|
||||
assert isinstance(instance, BuildError)
|
||||
assert instance.args == ("Package ahriman build failed,\nprocess stderr:\nerr\ncheck logs for details",)
|
||||
|
||||
|
||||
def test_str() -> None:
|
||||
"""
|
||||
must correctly transform CalledProcessError to string
|
||||
"""
|
||||
instance = CalledProcessError(1, ["cmd"], "error")
|
||||
message = "Command '['cmd']' returned non-zero exit status 1.\nProcess stderr:\nerror"
|
||||
assert str(instance) == message
|
@ -3,14 +3,13 @@ import logging
|
||||
import os
|
||||
import pytest
|
||||
import requests
|
||||
import subprocess
|
||||
|
||||
from pathlib import Path
|
||||
from pytest_mock import MockerFixture
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock, call as MockCall
|
||||
|
||||
from ahriman.core.exceptions import BuildError, OptionError, UnsafeRunError
|
||||
from ahriman.core.exceptions import BuildError, CalledProcessError, OptionError, UnsafeRunError
|
||||
from ahriman.core.util import check_output, check_user, dataclass_view, enum_values, exception_response_text, \
|
||||
extract_user, filter_json, full_version, package_like, parse_version, partition, pretty_datetime, pretty_size, \
|
||||
safe_filename, srcinfo_property, srcinfo_property_list, trim_package, utcnow, walk
|
||||
@ -107,10 +106,10 @@ def test_check_output_failure(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
mocker.patch("subprocess.Popen.wait", return_value=1)
|
||||
|
||||
with pytest.raises(subprocess.CalledProcessError):
|
||||
with pytest.raises(CalledProcessError):
|
||||
check_output("echo", "hello")
|
||||
|
||||
with pytest.raises(subprocess.CalledProcessError):
|
||||
with pytest.raises(CalledProcessError):
|
||||
check_output("echo", "hello", logger=logging.getLogger(""))
|
||||
|
||||
|
||||
@ -128,6 +127,20 @@ def test_check_output_failure_exception(mocker: MockerFixture) -> None:
|
||||
check_output("echo", "hello", exception=exception, logger=logging.getLogger(""))
|
||||
|
||||
|
||||
def test_check_output_failure_exception_callable(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must raise exception from callable provided instead of default
|
||||
"""
|
||||
mocker.patch("subprocess.Popen.wait", return_value=1)
|
||||
exception = BuildError.from_process("")
|
||||
|
||||
with pytest.raises(BuildError):
|
||||
check_output("echo", "hello", exception=exception)
|
||||
|
||||
with pytest.raises(BuildError):
|
||||
check_output("echo", "hello", exception=exception, logger=logging.getLogger(""))
|
||||
|
||||
|
||||
def test_check_output_empty_line(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must correctly process empty lines in command output
|
||||
|
Loading…
Reference in New Issue
Block a user