mirror of
https://github.com/arcan1s/ahriman.git
synced 2025-04-24 07:17:17 +00:00
better processing for subcommands
Old versions cached full output to memory and only after that printed it into log. This behaviour causes issues in case if operation stucks and you would need to find the step at which it does. New check_output method uses Popen directly and iterates over stdout lines Also changed behaviour from merging stderr into stdout to separate stderr logging. Any other behaviour of the function must be the same. Also changed GPG.key_import method to disable local signing since it seems it is useless (and may break process in case if there is no private key)
This commit is contained in:
parent
1a0322b32e
commit
432ca0cc48
@ -116,7 +116,6 @@ class GPG:
|
|||||||
"""
|
"""
|
||||||
key_body = self.key_download(server, key)
|
key_body = self.key_download(server, key)
|
||||||
GPG._check_output("gpg", "--import", input_data=key_body, exception=None, logger=self.logger)
|
GPG._check_output("gpg", "--import", input_data=key_body, exception=None, logger=self.logger)
|
||||||
GPG._check_output("gpg", "--quick-lsign-key", key, exception=None, logger=self.logger)
|
|
||||||
|
|
||||||
def process(self, path: Path, key: str) -> List[Path]:
|
def process(self, path: Path, key: str) -> List[Path]:
|
||||||
"""
|
"""
|
||||||
|
@ -27,7 +27,7 @@ import tempfile
|
|||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from logging import Logger
|
from logging import Logger
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, Dict, Generator, Iterable, Optional, Union
|
from typing import Any, Dict, Generator, Iterable, List, Optional, Union
|
||||||
|
|
||||||
from ahriman.core.exceptions import InvalidOption, UnsafeRun
|
from ahriman.core.exceptions import InvalidOption, UnsafeRun
|
||||||
from ahriman.models.repository_paths import RepositoryPaths
|
from ahriman.models.repository_paths import RepositoryPaths
|
||||||
@ -45,21 +45,39 @@ def check_output(*args: str, exception: Optional[Exception], cwd: Optional[Path]
|
|||||||
:param user: run process as specified user
|
:param user: run process as specified user
|
||||||
:return: command output
|
:return: command output
|
||||||
"""
|
"""
|
||||||
try:
|
def log(single: str) -> None:
|
||||||
# universal_newlines is required to read input from string
|
|
||||||
# FIXME additional workaround for linter and type check which do not know that user arg is supported
|
|
||||||
# pylint: disable=unexpected-keyword-arg
|
|
||||||
result: str = subprocess.check_output(args, cwd=cwd, input=input_data, stderr=subprocess.STDOUT,
|
|
||||||
universal_newlines=True, user=user).strip() # type: ignore
|
|
||||||
if logger is not None:
|
if logger is not None:
|
||||||
for line in result.splitlines():
|
logger.debug(single)
|
||||||
logger.debug(line)
|
|
||||||
return result
|
# FIXME additional workaround for linter and type check which do not know that user arg is supported
|
||||||
except subprocess.CalledProcessError as e:
|
# pylint: disable=unexpected-keyword-arg
|
||||||
if e.output is not None and logger is not None:
|
with subprocess.Popen(args, cwd=cwd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
||||||
for line in e.output.splitlines():
|
user=user, text=True, encoding="utf8", bufsize=1) as process: # type: ignore
|
||||||
logger.debug(line)
|
if input_data is not None:
|
||||||
raise exception or e
|
process.stdin.write(input_data)
|
||||||
|
process.stdin.close()
|
||||||
|
|
||||||
|
# read stdout and append to output result
|
||||||
|
result: List[str] = []
|
||||||
|
for line in iter(process.stdout.readline, ""):
|
||||||
|
line = line.strip()
|
||||||
|
if not line: # skip empty lines
|
||||||
|
continue
|
||||||
|
result.append(line)
|
||||||
|
log(line)
|
||||||
|
|
||||||
|
# read stderr and write info to logs
|
||||||
|
for line in iter(process.stderr.readline, ""):
|
||||||
|
log(line.strip())
|
||||||
|
|
||||||
|
process.terminate() # make sure that process is terminated
|
||||||
|
status_code = process.wait()
|
||||||
|
if status_code != 0:
|
||||||
|
if exception is not None:
|
||||||
|
raise exception
|
||||||
|
raise subprocess.CalledProcessError(status_code, process.args)
|
||||||
|
|
||||||
|
return "\n".join(result)
|
||||||
|
|
||||||
|
|
||||||
def check_user(paths: RepositoryPaths, unsafe: bool) -> None:
|
def check_user(paths: RepositoryPaths, unsafe: bool) -> None:
|
||||||
|
@ -3,7 +3,6 @@ import requests
|
|||||||
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from pytest_mock import MockerFixture
|
from pytest_mock import MockerFixture
|
||||||
from unittest import mock
|
|
||||||
|
|
||||||
from ahriman.core.sign.gpg import GPG
|
from ahriman.core.sign.gpg import GPG
|
||||||
from ahriman.models.sign_settings import SignSettings
|
from ahriman.models.sign_settings import SignSettings
|
||||||
@ -91,10 +90,8 @@ def test_key_import(gpg: GPG, mocker: MockerFixture) -> None:
|
|||||||
check_output_mock = mocker.patch("ahriman.core.sign.gpg.GPG._check_output")
|
check_output_mock = mocker.patch("ahriman.core.sign.gpg.GPG._check_output")
|
||||||
|
|
||||||
gpg.key_import("pgp.mit.edu", "0xE989490C")
|
gpg.key_import("pgp.mit.edu", "0xE989490C")
|
||||||
check_output_mock.assert_has_calls([
|
check_output_mock.assert_called_once_with(
|
||||||
mock.call("gpg", "--import", input_data="key", exception=None, logger=pytest.helpers.anyvar(int)),
|
"gpg", "--import", input_data="key", exception=None, logger=pytest.helpers.anyvar(int))
|
||||||
mock.call("gpg", "--quick-lsign-key", "0xE989490C", exception=None, logger=pytest.helpers.anyvar(int))
|
|
||||||
])
|
|
||||||
|
|
||||||
|
|
||||||
def test_process(gpg_with_key: GPG, mocker: MockerFixture) -> None:
|
def test_process(gpg_with_key: GPG, mocker: MockerFixture) -> None:
|
||||||
|
@ -1,13 +1,16 @@
|
|||||||
import datetime
|
import datetime
|
||||||
import logging
|
import logging
|
||||||
import pytest
|
import pytest
|
||||||
|
import requests
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from pytest_mock import MockerFixture
|
from pytest_mock import MockerFixture
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
|
||||||
from ahriman.core.exceptions import InvalidOption, UnsafeRun
|
from ahriman.core.exceptions import BuildFailed, InvalidOption, UnsafeRun
|
||||||
from ahriman.core.util import check_output, check_user, filter_json, package_like, pretty_datetime, pretty_size, tmpdir, walk
|
from ahriman.core.util import check_output, check_user, exception_response_text, filter_json, package_like, \
|
||||||
|
pretty_datetime, pretty_size, tmpdir, walk
|
||||||
from ahriman.models.package import Package
|
from ahriman.models.package import Package
|
||||||
from ahriman.models.repository_paths import RepositoryPaths
|
from ahriman.models.repository_paths import RepositoryPaths
|
||||||
|
|
||||||
@ -25,32 +28,77 @@ def test_check_output(mocker: MockerFixture) -> None:
|
|||||||
logger_mock.assert_called_once_with("hello")
|
logger_mock.assert_called_once_with("hello")
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_output_stderr(mocker: MockerFixture) -> None:
|
||||||
|
"""
|
||||||
|
must run command and log stderr output
|
||||||
|
"""
|
||||||
|
logger_mock = mocker.patch("logging.Logger.debug")
|
||||||
|
|
||||||
|
assert check_output("python", "-c", """import sys; print("hello", file=sys.stderr)""", exception=None) == ""
|
||||||
|
logger_mock.assert_not_called()
|
||||||
|
|
||||||
|
assert check_output("python", "-c", """import sys; print("hello", file=sys.stderr)""",
|
||||||
|
exception=None, logger=logging.getLogger("")) == ""
|
||||||
|
logger_mock.assert_called_once_with("hello")
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_output_with_stdin() -> None:
|
||||||
|
"""
|
||||||
|
must run command and put string to stdin
|
||||||
|
"""
|
||||||
|
assert check_output("python", "-c", "import sys; value = sys.stdin.read(); print(value)",
|
||||||
|
exception=None, input_data="single line") == "single line"
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_output_with_stdin_newline() -> None:
|
||||||
|
"""
|
||||||
|
must run command and put string to stdin ending with new line
|
||||||
|
"""
|
||||||
|
assert check_output("python", "-c", "import sys; value = sys.stdin.read(); print(value)",
|
||||||
|
exception=None, input_data="single line\n") == "single line"
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_output_multiple_with_stdin() -> None:
|
||||||
|
"""
|
||||||
|
must run command and put multiple lines to stdin
|
||||||
|
"""
|
||||||
|
assert check_output("python", "-c", "import sys; value = sys.stdin.read(); print(value)",
|
||||||
|
exception=None, input_data="multiple\nlines") == "multiple\nlines"
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_output_multiple_with_stdin_newline() -> None:
|
||||||
|
"""
|
||||||
|
must run command and put multiple lines to stdin with new line at the end
|
||||||
|
"""
|
||||||
|
assert check_output("python", "-c", "import sys; value = sys.stdin.read(); print(value)",
|
||||||
|
exception=None, input_data="multiple\nlines\n") == "multiple\nlines"
|
||||||
|
|
||||||
|
|
||||||
def test_check_output_failure(mocker: MockerFixture) -> None:
|
def test_check_output_failure(mocker: MockerFixture) -> None:
|
||||||
"""
|
"""
|
||||||
must process exception correctly
|
must process exception correctly
|
||||||
"""
|
"""
|
||||||
logger_mock = mocker.patch("logging.Logger.debug")
|
mocker.patch("subprocess.Popen.wait", return_value=1)
|
||||||
mocker.patch("subprocess.check_output", side_effect=subprocess.CalledProcessError(1, "echo"))
|
|
||||||
|
|
||||||
with pytest.raises(subprocess.CalledProcessError):
|
with pytest.raises(subprocess.CalledProcessError):
|
||||||
check_output("echo", "hello", exception=None)
|
check_output("echo", "hello", exception=None)
|
||||||
logger_mock.assert_not_called()
|
|
||||||
|
|
||||||
with pytest.raises(subprocess.CalledProcessError):
|
with pytest.raises(subprocess.CalledProcessError):
|
||||||
check_output("echo", "hello", exception=None, logger=logging.getLogger(""))
|
check_output("echo", "hello", exception=None, logger=logging.getLogger(""))
|
||||||
logger_mock.assert_not_called()
|
|
||||||
|
|
||||||
|
|
||||||
def test_check_output_failure_log(mocker: MockerFixture) -> None:
|
def test_check_output_failure_exception(mocker: MockerFixture) -> None:
|
||||||
"""
|
"""
|
||||||
must process exception correctly and log it
|
must raise exception provided instead of default
|
||||||
"""
|
"""
|
||||||
logger_mock = mocker.patch("logging.Logger.debug")
|
mocker.patch("subprocess.Popen.wait", return_value=1)
|
||||||
mocker.patch("subprocess.check_output", side_effect=subprocess.CalledProcessError(1, "echo", output=b"result"))
|
exception = BuildFailed("")
|
||||||
|
|
||||||
with pytest.raises(subprocess.CalledProcessError):
|
with pytest.raises(BuildFailed):
|
||||||
check_output("echo", "hello", exception=None, logger=logging.getLogger(""))
|
check_output("echo", "hello", exception=exception)
|
||||||
logger_mock.assert_called_once_with()
|
|
||||||
|
with pytest.raises(BuildFailed):
|
||||||
|
check_output("echo", "hello", exception=exception, logger=logging.getLogger(""))
|
||||||
|
|
||||||
|
|
||||||
def test_check_user(mocker: MockerFixture) -> None:
|
def test_check_user(mocker: MockerFixture) -> None:
|
||||||
@ -81,6 +129,25 @@ def test_check_user_exception(mocker: MockerFixture) -> None:
|
|||||||
check_user(paths, False)
|
check_user(paths, False)
|
||||||
|
|
||||||
|
|
||||||
|
def test_exception_response_text() -> None:
|
||||||
|
"""
|
||||||
|
must parse HTTP response to string
|
||||||
|
"""
|
||||||
|
response_mock = MagicMock()
|
||||||
|
response_mock.text = "hello"
|
||||||
|
exception = requests.exceptions.HTTPError(response=response_mock)
|
||||||
|
|
||||||
|
assert exception_response_text(exception) == "hello"
|
||||||
|
|
||||||
|
|
||||||
|
def test_exception_response_text_empty() -> None:
|
||||||
|
"""
|
||||||
|
must parse HTTP exception with empty response to empty string
|
||||||
|
"""
|
||||||
|
exception = requests.exceptions.HTTPError(response=None)
|
||||||
|
assert exception_response_text(exception) == ""
|
||||||
|
|
||||||
|
|
||||||
def test_check_unsafe(mocker: MockerFixture) -> None:
|
def test_check_unsafe(mocker: MockerFixture) -> None:
|
||||||
"""
|
"""
|
||||||
must skip check if unsafe flag is set
|
must skip check if unsafe flag is set
|
||||||
|
Loading…
Reference in New Issue
Block a user