diff --git a/src/ahriman/core/sign/gpg.py b/src/ahriman/core/sign/gpg.py index f3e68d4f..d67f1d93 100644 --- a/src/ahriman/core/sign/gpg.py +++ b/src/ahriman/core/sign/gpg.py @@ -116,7 +116,6 @@ class GPG: """ key_body = self.key_download(server, key) GPG._check_output("gpg", "--import", input_data=key_body, exception=None, logger=self.logger) - GPG._check_output("gpg", "--quick-lsign-key", key, exception=None, logger=self.logger) def process(self, path: Path, key: str) -> List[Path]: """ diff --git a/src/ahriman/core/util.py b/src/ahriman/core/util.py index 11b0aa26..7021a251 100644 --- a/src/ahriman/core/util.py +++ b/src/ahriman/core/util.py @@ -27,7 +27,7 @@ import tempfile from contextlib import contextmanager from logging import Logger from pathlib import Path -from typing import Any, Dict, Generator, Iterable, Optional, Union +from typing import Any, Dict, Generator, Iterable, List, Optional, Union from ahriman.core.exceptions import InvalidOption, UnsafeRun from ahriman.models.repository_paths import RepositoryPaths @@ -45,21 +45,39 @@ def check_output(*args: str, exception: Optional[Exception], cwd: Optional[Path] :param user: run process as specified user :return: command output """ - try: - # universal_newlines is required to read input from string - # FIXME additional workaround for linter and type check which do not know that user arg is supported - # pylint: disable=unexpected-keyword-arg - result: str = subprocess.check_output(args, cwd=cwd, input=input_data, stderr=subprocess.STDOUT, - universal_newlines=True, user=user).strip() # type: ignore + def log(single: str) -> None: if logger is not None: - for line in result.splitlines(): - logger.debug(line) - return result - except subprocess.CalledProcessError as e: - if e.output is not None and logger is not None: - for line in e.output.splitlines(): - logger.debug(line) - raise exception or e + logger.debug(single) + + # FIXME additional workaround for linter and type check which do not know that user arg is supported + # pylint: disable=unexpected-keyword-arg + with subprocess.Popen(args, cwd=cwd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + user=user, text=True, encoding="utf8", bufsize=1) as process: # type: ignore + if input_data is not None: + process.stdin.write(input_data) + process.stdin.close() + + # read stdout and append to output result + result: List[str] = [] + for line in iter(process.stdout.readline, ""): + line = line.strip() + if not line: # skip empty lines + continue + result.append(line) + log(line) + + # read stderr and write info to logs + for line in iter(process.stderr.readline, ""): + log(line.strip()) + + process.terminate() # make sure that process is terminated + status_code = process.wait() + if status_code != 0: + if exception is not None: + raise exception + raise subprocess.CalledProcessError(status_code, process.args) + + return "\n".join(result) def check_user(paths: RepositoryPaths, unsafe: bool) -> None: diff --git a/tests/ahriman/core/sign/test_gpg.py b/tests/ahriman/core/sign/test_gpg.py index b9840769..41a83440 100644 --- a/tests/ahriman/core/sign/test_gpg.py +++ b/tests/ahriman/core/sign/test_gpg.py @@ -3,7 +3,6 @@ import requests from pathlib import Path from pytest_mock import MockerFixture -from unittest import mock from ahriman.core.sign.gpg import GPG from ahriman.models.sign_settings import SignSettings @@ -91,10 +90,8 @@ def test_key_import(gpg: GPG, mocker: MockerFixture) -> None: check_output_mock = mocker.patch("ahriman.core.sign.gpg.GPG._check_output") gpg.key_import("pgp.mit.edu", "0xE989490C") - check_output_mock.assert_has_calls([ - mock.call("gpg", "--import", input_data="key", exception=None, logger=pytest.helpers.anyvar(int)), - mock.call("gpg", "--quick-lsign-key", "0xE989490C", exception=None, logger=pytest.helpers.anyvar(int)) - ]) + check_output_mock.assert_called_once_with( + "gpg", "--import", input_data="key", exception=None, logger=pytest.helpers.anyvar(int)) def test_process(gpg_with_key: GPG, mocker: MockerFixture) -> None: diff --git a/tests/ahriman/core/test_util.py b/tests/ahriman/core/test_util.py index b38e44d2..a748b4c9 100644 --- a/tests/ahriman/core/test_util.py +++ b/tests/ahriman/core/test_util.py @@ -1,13 +1,16 @@ import datetime import logging import pytest +import requests import subprocess from pathlib import Path from pytest_mock import MockerFixture +from unittest.mock import MagicMock -from ahriman.core.exceptions import InvalidOption, UnsafeRun -from ahriman.core.util import check_output, check_user, filter_json, package_like, pretty_datetime, pretty_size, tmpdir, walk +from ahriman.core.exceptions import BuildFailed, InvalidOption, UnsafeRun +from ahriman.core.util import check_output, check_user, exception_response_text, filter_json, package_like, \ + pretty_datetime, pretty_size, tmpdir, walk from ahriman.models.package import Package from ahriman.models.repository_paths import RepositoryPaths @@ -25,32 +28,77 @@ def test_check_output(mocker: MockerFixture) -> None: logger_mock.assert_called_once_with("hello") +def test_check_output_stderr(mocker: MockerFixture) -> None: + """ + must run command and log stderr output + """ + logger_mock = mocker.patch("logging.Logger.debug") + + assert check_output("python", "-c", """import sys; print("hello", file=sys.stderr)""", exception=None) == "" + logger_mock.assert_not_called() + + assert check_output("python", "-c", """import sys; print("hello", file=sys.stderr)""", + exception=None, logger=logging.getLogger("")) == "" + logger_mock.assert_called_once_with("hello") + + +def test_check_output_with_stdin() -> None: + """ + must run command and put string to stdin + """ + assert check_output("python", "-c", "import sys; value = sys.stdin.read(); print(value)", + exception=None, input_data="single line") == "single line" + + +def test_check_output_with_stdin_newline() -> None: + """ + must run command and put string to stdin ending with new line + """ + assert check_output("python", "-c", "import sys; value = sys.stdin.read(); print(value)", + exception=None, input_data="single line\n") == "single line" + + +def test_check_output_multiple_with_stdin() -> None: + """ + must run command and put multiple lines to stdin + """ + assert check_output("python", "-c", "import sys; value = sys.stdin.read(); print(value)", + exception=None, input_data="multiple\nlines") == "multiple\nlines" + + +def test_check_output_multiple_with_stdin_newline() -> None: + """ + must run command and put multiple lines to stdin with new line at the end + """ + assert check_output("python", "-c", "import sys; value = sys.stdin.read(); print(value)", + exception=None, input_data="multiple\nlines\n") == "multiple\nlines" + + def test_check_output_failure(mocker: MockerFixture) -> None: """ must process exception correctly """ - logger_mock = mocker.patch("logging.Logger.debug") - mocker.patch("subprocess.check_output", side_effect=subprocess.CalledProcessError(1, "echo")) + mocker.patch("subprocess.Popen.wait", return_value=1) with pytest.raises(subprocess.CalledProcessError): check_output("echo", "hello", exception=None) - logger_mock.assert_not_called() with pytest.raises(subprocess.CalledProcessError): check_output("echo", "hello", exception=None, logger=logging.getLogger("")) - logger_mock.assert_not_called() -def test_check_output_failure_log(mocker: MockerFixture) -> None: +def test_check_output_failure_exception(mocker: MockerFixture) -> None: """ - must process exception correctly and log it + must raise exception provided instead of default """ - logger_mock = mocker.patch("logging.Logger.debug") - mocker.patch("subprocess.check_output", side_effect=subprocess.CalledProcessError(1, "echo", output=b"result")) + mocker.patch("subprocess.Popen.wait", return_value=1) + exception = BuildFailed("") - with pytest.raises(subprocess.CalledProcessError): - check_output("echo", "hello", exception=None, logger=logging.getLogger("")) - logger_mock.assert_called_once_with() + with pytest.raises(BuildFailed): + check_output("echo", "hello", exception=exception) + + with pytest.raises(BuildFailed): + check_output("echo", "hello", exception=exception, logger=logging.getLogger("")) def test_check_user(mocker: MockerFixture) -> None: @@ -81,6 +129,25 @@ def test_check_user_exception(mocker: MockerFixture) -> None: check_user(paths, False) +def test_exception_response_text() -> None: + """ + must parse HTTP response to string + """ + response_mock = MagicMock() + response_mock.text = "hello" + exception = requests.exceptions.HTTPError(response=response_mock) + + assert exception_response_text(exception) == "hello" + + +def test_exception_response_text_empty() -> None: + """ + must parse HTTP exception with empty response to empty string + """ + exception = requests.exceptions.HTTPError(response=None) + assert exception_response_text(exception) == "" + + def test_check_unsafe(mocker: MockerFixture) -> None: """ must skip check if unsafe flag is set diff --git a/tox.ini b/tox.ini index 1bae71d2..2676827a 100644 --- a/tox.ini +++ b/tox.ini @@ -32,4 +32,4 @@ deps = {[tox]dependencies} -e .[tests] commands = - pytest + pytest {posargs}