mirror of
https://github.com/arcan1s/ahriman.git
synced 2025-04-24 23:37:18 +00:00
verbose subprocess exception handle annd quite git
This commit is contained in:
parent
d3f6ca24c8
commit
5dc6df11c5
@ -71,7 +71,7 @@ class Repo(LazyLogging):
|
|||||||
"""
|
"""
|
||||||
Repo._check_output(
|
Repo._check_output(
|
||||||
"repo-add", *self.sign_args, "-R", str(self.repo_path), str(path),
|
"repo-add", *self.sign_args, "-R", str(self.repo_path), str(path),
|
||||||
exception=BuildError(path.name),
|
exception=BuildError.from_process(path.name),
|
||||||
cwd=self.paths.repository,
|
cwd=self.paths.repository,
|
||||||
logger=self.logger,
|
logger=self.logger,
|
||||||
user=self.uid)
|
user=self.uid)
|
||||||
@ -98,7 +98,7 @@ class Repo(LazyLogging):
|
|||||||
# remove package from registry
|
# remove package from registry
|
||||||
Repo._check_output(
|
Repo._check_output(
|
||||||
"repo-remove", *self.sign_args, str(self.repo_path), package,
|
"repo-remove", *self.sign_args, str(self.repo_path), package,
|
||||||
exception=BuildError(package),
|
exception=BuildError.from_process(package),
|
||||||
cwd=self.paths.repository,
|
cwd=self.paths.repository,
|
||||||
logger=self.logger,
|
logger=self.logger,
|
||||||
user=self.uid)
|
user=self.uid)
|
||||||
|
@ -82,10 +82,11 @@ class Sources(LazyLogging):
|
|||||||
branch = remote.branch or instance.DEFAULT_BRANCH
|
branch = remote.branch or instance.DEFAULT_BRANCH
|
||||||
if is_initialized_git:
|
if is_initialized_git:
|
||||||
instance.logger.info("update HEAD to remote at %s using branch %s", sources_dir, branch)
|
instance.logger.info("update HEAD to remote at %s using branch %s", sources_dir, branch)
|
||||||
Sources._check_output("git", "fetch", "origin", branch, cwd=sources_dir, logger=instance.logger)
|
Sources._check_output("git", "fetch", "--quiet", "origin", branch,
|
||||||
|
cwd=sources_dir, logger=instance.logger)
|
||||||
elif remote.git_url is not None:
|
elif remote.git_url is not None:
|
||||||
instance.logger.info("clone remote %s to %s using branch %s", remote.git_url, sources_dir, branch)
|
instance.logger.info("clone remote %s to %s using branch %s", remote.git_url, sources_dir, branch)
|
||||||
Sources._check_output("git", "clone", "--branch", branch, "--single-branch",
|
Sources._check_output("git", "clone", "--quiet", "--branch", branch, "--single-branch",
|
||||||
remote.git_url, str(sources_dir), cwd=sources_dir.parent, logger=instance.logger)
|
remote.git_url, str(sources_dir), cwd=sources_dir.parent, logger=instance.logger)
|
||||||
else:
|
else:
|
||||||
# it will cause an exception later
|
# it will cause an exception later
|
||||||
@ -93,7 +94,8 @@ class Sources(LazyLogging):
|
|||||||
|
|
||||||
# and now force reset to our branch
|
# and now force reset to our branch
|
||||||
Sources._check_output("git", "checkout", "--force", branch, cwd=sources_dir, logger=instance.logger)
|
Sources._check_output("git", "checkout", "--force", branch, cwd=sources_dir, logger=instance.logger)
|
||||||
Sources._check_output("git", "reset", "--hard", f"origin/{branch}", cwd=sources_dir, logger=instance.logger)
|
Sources._check_output("git", "reset", "--quiet", "--hard", f"origin/{branch}",
|
||||||
|
cwd=sources_dir, logger=instance.logger)
|
||||||
|
|
||||||
# move content if required
|
# move content if required
|
||||||
# we are using full path to source directory in order to make append possible
|
# we are using full path to source directory in order to make append possible
|
||||||
@ -126,7 +128,7 @@ class Sources(LazyLogging):
|
|||||||
instance = Sources()
|
instance = Sources()
|
||||||
if not (sources_dir / ".git").is_dir():
|
if not (sources_dir / ".git").is_dir():
|
||||||
# skip initializing in case if it was already
|
# skip initializing in case if it was already
|
||||||
Sources._check_output("git", "init", "--initial-branch", instance.DEFAULT_BRANCH,
|
Sources._check_output("git", "init", "--quiet", "--initial-branch", instance.DEFAULT_BRANCH,
|
||||||
cwd=sources_dir, logger=instance.logger)
|
cwd=sources_dir, logger=instance.logger)
|
||||||
|
|
||||||
# extract local files...
|
# extract local files...
|
||||||
@ -191,7 +193,7 @@ class Sources(LazyLogging):
|
|||||||
return # no changes to push, just skip action
|
return # no changes to push, just skip action
|
||||||
|
|
||||||
git_url, branch = remote.git_source()
|
git_url, branch = remote.git_source()
|
||||||
Sources._check_output("git", "push", git_url, branch, cwd=sources_dir, logger=instance.logger)
|
Sources._check_output("git", "push", "--quiet", git_url, branch, cwd=sources_dir, logger=instance.logger)
|
||||||
|
|
||||||
def add(self, sources_dir: Path, *pattern: str, intent_to_add: bool = False) -> None:
|
def add(self, sources_dir: Path, *pattern: str, intent_to_add: bool = False) -> None:
|
||||||
"""
|
"""
|
||||||
@ -243,7 +245,8 @@ class Sources(LazyLogging):
|
|||||||
environment["GIT_AUTHOR_NAME"] = environment["GIT_COMMITTER_NAME"] = user
|
environment["GIT_AUTHOR_NAME"] = environment["GIT_COMMITTER_NAME"] = user
|
||||||
environment["GIT_AUTHOR_EMAIL"] = environment["GIT_COMMITTER_EMAIL"] = email
|
environment["GIT_AUTHOR_EMAIL"] = environment["GIT_COMMITTER_EMAIL"] = email
|
||||||
|
|
||||||
Sources._check_output("git", "commit", *args, cwd=sources_dir, logger=self.logger, environment=environment)
|
Sources._check_output("git", "commit", "--quiet", *args,
|
||||||
|
cwd=sources_dir, logger=self.logger, environment=environment)
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -92,7 +92,7 @@ class Task(LazyLogging):
|
|||||||
|
|
||||||
Task._check_output(
|
Task._check_output(
|
||||||
*command,
|
*command,
|
||||||
exception=BuildError(self.package.base),
|
exception=BuildError.from_process(self.package.base),
|
||||||
cwd=sources_dir,
|
cwd=sources_dir,
|
||||||
logger=self.logger,
|
logger=self.logger,
|
||||||
user=self.uid,
|
user=self.uid,
|
||||||
@ -101,7 +101,7 @@ class Task(LazyLogging):
|
|||||||
# well it is not actually correct, but we can deal with it
|
# well it is not actually correct, but we can deal with it
|
||||||
packages = Task._check_output(
|
packages = Task._check_output(
|
||||||
"makepkg", "--packagelist",
|
"makepkg", "--packagelist",
|
||||||
exception=BuildError(self.package.base),
|
exception=BuildError.from_process(self.package.base),
|
||||||
cwd=sources_dir,
|
cwd=sources_dir,
|
||||||
logger=self.logger
|
logger=self.logger
|
||||||
).splitlines()
|
).splitlines()
|
||||||
|
@ -17,8 +17,11 @@
|
|||||||
# You should have received a copy of the GNU General Public License
|
# You should have received a copy of the GNU General Public License
|
||||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
#
|
#
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
from collections.abc import Callable
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any, Self
|
||||||
|
|
||||||
|
|
||||||
class BuildError(RuntimeError):
|
class BuildError(RuntimeError):
|
||||||
@ -26,14 +29,61 @@ class BuildError(RuntimeError):
|
|||||||
base exception for failed builds
|
base exception for failed builds
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, package_base: str) -> None:
|
def __init__(self, package_base: str, stderr: str | None = None) -> None:
|
||||||
"""
|
"""
|
||||||
default constructor
|
default constructor
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
package_base(str): package base raised exception
|
package_base(str): package base raised exception
|
||||||
|
stderr(str | None, optional): stderr of the process if available (Default value = None)
|
||||||
"""
|
"""
|
||||||
RuntimeError.__init__(self, f"Package {package_base} build failed, check logs for details")
|
message = f"Package {package_base} build failed,\n"
|
||||||
|
if stderr is not None:
|
||||||
|
message += f"process stderr:\n{stderr}\n"
|
||||||
|
message += "check logs for details"
|
||||||
|
|
||||||
|
RuntimeError.__init__(self, message)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_process(cls, package_base: str) -> Callable[[int, list[str], str, str], Self]:
|
||||||
|
"""
|
||||||
|
generate exception callable from process error
|
||||||
|
|
||||||
|
Args:
|
||||||
|
package_base(str): package base raised exception
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Callable[[int, list[str], str, str], Self]: exception generator to be passed to subprocess utils
|
||||||
|
"""
|
||||||
|
return lambda code, process, stdout, stderr: cls(package_base, stderr)
|
||||||
|
|
||||||
|
|
||||||
|
class CalledProcessError(subprocess.CalledProcessError):
|
||||||
|
"""
|
||||||
|
like ``subprocess.CalledProcessError``, but better
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, status_code: int, process: list[str], stderr: str) -> None:
|
||||||
|
"""
|
||||||
|
default constructor
|
||||||
|
|
||||||
|
Args:
|
||||||
|
status_code(int): process return code
|
||||||
|
process(list[str]): process argument list
|
||||||
|
stderr(str): stderr of the process
|
||||||
|
"""
|
||||||
|
subprocess.CalledProcessError.__init__(self, status_code, process, stderr=stderr)
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
"""
|
||||||
|
string representation of the exception
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: string view of the exception
|
||||||
|
"""
|
||||||
|
return f"""{subprocess.CalledProcessError.__str__(self)}
|
||||||
|
Process stderr:
|
||||||
|
{self.stderr}"""
|
||||||
|
|
||||||
|
|
||||||
class DuplicateRunError(RuntimeError):
|
class DuplicateRunError(RuntimeError):
|
||||||
|
@ -190,7 +190,7 @@ class GPG(LazyLogging):
|
|||||||
"""
|
"""
|
||||||
GPG._check_output(
|
GPG._check_output(
|
||||||
*GPG.sign_command(path, key),
|
*GPG.sign_command(path, key),
|
||||||
exception=BuildError(path.name),
|
exception=BuildError.from_process(path.name),
|
||||||
logger=self.logger)
|
logger=self.logger)
|
||||||
return [path, self.signature(path)]
|
return [path, self.signature(path)]
|
||||||
|
|
||||||
|
@ -35,7 +35,7 @@ from pathlib import Path
|
|||||||
from pwd import getpwuid
|
from pwd import getpwuid
|
||||||
from typing import Any, IO, TypeVar
|
from typing import Any, IO, TypeVar
|
||||||
|
|
||||||
from ahriman.core.exceptions import OptionError, UnsafeRunError
|
from ahriman.core.exceptions import CalledProcessError, OptionError, UnsafeRunError
|
||||||
from ahriman.models.repository_paths import RepositoryPaths
|
from ahriman.models.repository_paths import RepositoryPaths
|
||||||
|
|
||||||
|
|
||||||
@ -65,7 +65,9 @@ __all__ = [
|
|||||||
T = TypeVar("T")
|
T = TypeVar("T")
|
||||||
|
|
||||||
|
|
||||||
def check_output(*args: str, exception: Exception | None = None, cwd: Path | None = None, input_data: str | None = None,
|
# pylint: disable=too-many-locals
|
||||||
|
def check_output(*args: str, exception: Exception | Callable[[int, list[str], str, str], Exception] | None = None,
|
||||||
|
cwd: Path | None = None, input_data: str | None = None,
|
||||||
logger: logging.Logger | None = None, user: int | None = None,
|
logger: logging.Logger | None = None, user: int | None = None,
|
||||||
environment: dict[str, str] | None = None) -> str:
|
environment: dict[str, str] | None = None) -> str:
|
||||||
"""
|
"""
|
||||||
@ -73,8 +75,9 @@ def check_output(*args: str, exception: Exception | None = None, cwd: Path | Non
|
|||||||
|
|
||||||
Args:
|
Args:
|
||||||
*args(str): command line arguments
|
*args(str): command line arguments
|
||||||
exception(Exception | None, optional): exception which has to be reraised instead of default subprocess
|
exception(Exception | Callable[[int, list[str], str, str]] | None, optional): exception which has to be raised
|
||||||
exception (Default value = None)
|
instead of default subprocess exception. If callable us is supplied, the ``subprocess.CalledProcessError``
|
||||||
|
arguments will be passed (Default value = None)
|
||||||
cwd(Path | None, optional): current working directory (Default value = None)
|
cwd(Path | None, optional): current working directory (Default value = None)
|
||||||
input_data(str | None, optional): data which will be written to command stdin (Default value = None)
|
input_data(str | None, optional): data which will be written to command stdin (Default value = None)
|
||||||
logger(logging.Logger | None, optional): logger to log command result if required (Default value = None)
|
logger(logging.Logger | None, optional): logger to log command result if required (Default value = None)
|
||||||
@ -85,7 +88,7 @@ def check_output(*args: str, exception: Exception | None = None, cwd: Path | Non
|
|||||||
str: command output
|
str: command output
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
subprocess.CalledProcessError: if subprocess ended with status code different from 0 and no exception supplied
|
CalledProcessError: if subprocess ended with status code different from 0 and no exception supplied
|
||||||
|
|
||||||
Examples:
|
Examples:
|
||||||
Simply call the function::
|
Simply call the function::
|
||||||
@ -110,7 +113,7 @@ def check_output(*args: str, exception: Exception | None = None, cwd: Path | Non
|
|||||||
return channel if channel is not None else io.StringIO()
|
return channel if channel is not None else io.StringIO()
|
||||||
|
|
||||||
# wrapper around selectors polling
|
# wrapper around selectors polling
|
||||||
def poll(sel: selectors.BaseSelector) -> Generator[str, None, None]:
|
def poll(sel: selectors.BaseSelector) -> Generator[tuple[str, str], None, None]:
|
||||||
for key, _ in sel.select(): # we don't need to check mask here because we have only subscribed on reading
|
for key, _ in sel.select(): # we don't need to check mask here because we have only subscribed on reading
|
||||||
line = key.fileobj.readline() # type: ignore[union-attr]
|
line = key.fileobj.readline() # type: ignore[union-attr]
|
||||||
if not line: # in case of empty line we remove selector as there is no data here anymore
|
if not line: # in case of empty line we remove selector as there is no data here anymore
|
||||||
@ -121,8 +124,7 @@ def check_output(*args: str, exception: Exception | None = None, cwd: Path | Non
|
|||||||
if logger is not None:
|
if logger is not None:
|
||||||
logger.debug(line)
|
logger.debug(line)
|
||||||
|
|
||||||
if key.data == "stdout":
|
yield key.data, line
|
||||||
yield line # yield only stdout data
|
|
||||||
|
|
||||||
environment = environment or {}
|
environment = environment or {}
|
||||||
if user is not None:
|
if user is not None:
|
||||||
@ -138,17 +140,26 @@ def check_output(*args: str, exception: Exception | None = None, cwd: Path | Non
|
|||||||
selector.register(get_io(process, "stdout"), selectors.EVENT_READ, data="stdout")
|
selector.register(get_io(process, "stdout"), selectors.EVENT_READ, data="stdout")
|
||||||
selector.register(get_io(process, "stderr"), selectors.EVENT_READ, data="stderr")
|
selector.register(get_io(process, "stderr"), selectors.EVENT_READ, data="stderr")
|
||||||
|
|
||||||
result: list[str] = []
|
result: dict[str, list[str]] = {
|
||||||
|
"stdout": [],
|
||||||
|
"stderr": [],
|
||||||
|
}
|
||||||
while selector.get_map(): # while there are unread selectors, keep reading
|
while selector.get_map(): # while there are unread selectors, keep reading
|
||||||
result.extend(poll(selector))
|
for key_data, output in poll(selector):
|
||||||
|
result[key_data].append(output)
|
||||||
|
|
||||||
|
stdout = "\n".join(result["stdout"]).rstrip("\n") # remove newline at the end of any
|
||||||
|
stderr = "\n".join(result["stderr"]).rstrip("\n")
|
||||||
|
|
||||||
status_code = process.wait()
|
status_code = process.wait()
|
||||||
if status_code != 0:
|
if status_code != 0:
|
||||||
if exception is not None:
|
if isinstance(exception, Exception):
|
||||||
raise exception
|
raise exception
|
||||||
raise subprocess.CalledProcessError(status_code, process.args)
|
if callable(exception):
|
||||||
|
raise exception(status_code, list(args), stdout, stderr)
|
||||||
|
raise CalledProcessError(status_code, list(args), stderr)
|
||||||
|
|
||||||
return "\n".join(result).rstrip("\n") # remove newline at the end of any
|
return stdout
|
||||||
|
|
||||||
|
|
||||||
def check_user(paths: RepositoryPaths, *, unsafe: bool) -> None:
|
def check_user(paths: RepositoryPaths, *, unsafe: bool) -> None:
|
||||||
|
@ -56,9 +56,9 @@ def test_fetch_existing(remote_source: RemoteSource, mocker: MockerFixture) -> N
|
|||||||
local = Path("local")
|
local = Path("local")
|
||||||
Sources.fetch(local, remote_source)
|
Sources.fetch(local, remote_source)
|
||||||
check_output_mock.assert_has_calls([
|
check_output_mock.assert_has_calls([
|
||||||
MockCall("git", "fetch", "origin", remote_source.branch, cwd=local, logger=pytest.helpers.anyvar(int)),
|
MockCall("git", "fetch", "--quiet", "origin", remote_source.branch, cwd=local, logger=pytest.helpers.anyvar(int)),
|
||||||
MockCall("git", "checkout", "--force", remote_source.branch, cwd=local, logger=pytest.helpers.anyvar(int)),
|
MockCall("git", "checkout", "--force", remote_source.branch, cwd=local, logger=pytest.helpers.anyvar(int)),
|
||||||
MockCall("git", "reset", "--hard", f"origin/{remote_source.branch}",
|
MockCall("git", "reset", "--quiet", "--hard", f"origin/{remote_source.branch}",
|
||||||
cwd=local, logger=pytest.helpers.anyvar(int)),
|
cwd=local, logger=pytest.helpers.anyvar(int)),
|
||||||
])
|
])
|
||||||
move_mock.assert_called_once_with(local.resolve(), local)
|
move_mock.assert_called_once_with(local.resolve(), local)
|
||||||
@ -75,10 +75,10 @@ def test_fetch_new(remote_source: RemoteSource, mocker: MockerFixture) -> None:
|
|||||||
local = Path("local")
|
local = Path("local")
|
||||||
Sources.fetch(local, remote_source)
|
Sources.fetch(local, remote_source)
|
||||||
check_output_mock.assert_has_calls([
|
check_output_mock.assert_has_calls([
|
||||||
MockCall("git", "clone", "--branch", remote_source.branch, "--single-branch",
|
MockCall("git", "clone", "--quiet", "--branch", remote_source.branch, "--single-branch",
|
||||||
remote_source.git_url, str(local), cwd=local.parent, logger=pytest.helpers.anyvar(int)),
|
remote_source.git_url, str(local), cwd=local.parent, logger=pytest.helpers.anyvar(int)),
|
||||||
MockCall("git", "checkout", "--force", remote_source.branch, cwd=local, logger=pytest.helpers.anyvar(int)),
|
MockCall("git", "checkout", "--force", remote_source.branch, cwd=local, logger=pytest.helpers.anyvar(int)),
|
||||||
MockCall("git", "reset", "--hard", f"origin/{remote_source.branch}",
|
MockCall("git", "reset", "--quiet", "--hard", f"origin/{remote_source.branch}",
|
||||||
cwd=local, logger=pytest.helpers.anyvar(int))
|
cwd=local, logger=pytest.helpers.anyvar(int))
|
||||||
])
|
])
|
||||||
move_mock.assert_called_once_with(local.resolve(), local)
|
move_mock.assert_called_once_with(local.resolve(), local)
|
||||||
@ -96,7 +96,7 @@ def test_fetch_new_without_remote(mocker: MockerFixture) -> None:
|
|||||||
Sources.fetch(local, RemoteSource(source=PackageSource.Archive))
|
Sources.fetch(local, RemoteSource(source=PackageSource.Archive))
|
||||||
check_output_mock.assert_has_calls([
|
check_output_mock.assert_has_calls([
|
||||||
MockCall("git", "checkout", "--force", Sources.DEFAULT_BRANCH, cwd=local, logger=pytest.helpers.anyvar(int)),
|
MockCall("git", "checkout", "--force", Sources.DEFAULT_BRANCH, cwd=local, logger=pytest.helpers.anyvar(int)),
|
||||||
MockCall("git", "reset", "--hard", f"origin/{Sources.DEFAULT_BRANCH}",
|
MockCall("git", "reset", "--quiet", "--hard", f"origin/{Sources.DEFAULT_BRANCH}",
|
||||||
cwd=local, logger=pytest.helpers.anyvar(int))
|
cwd=local, logger=pytest.helpers.anyvar(int))
|
||||||
])
|
])
|
||||||
move_mock.assert_called_once_with(local.resolve(), local)
|
move_mock.assert_called_once_with(local.resolve(), local)
|
||||||
@ -144,7 +144,7 @@ def test_init(mocker: MockerFixture) -> None:
|
|||||||
|
|
||||||
local = Path("local")
|
local = Path("local")
|
||||||
Sources.init(local)
|
Sources.init(local)
|
||||||
check_output_mock.assert_called_once_with("git", "init", "--initial-branch", Sources.DEFAULT_BRANCH,
|
check_output_mock.assert_called_once_with("git", "init", "--quiet", "--initial-branch", Sources.DEFAULT_BRANCH,
|
||||||
cwd=local, logger=pytest.helpers.anyvar(int))
|
cwd=local, logger=pytest.helpers.anyvar(int))
|
||||||
add_mock.assert_called_once_with(local, "PKGBUILD", ".SRCINFO", "local")
|
add_mock.assert_called_once_with(local, "PKGBUILD", ".SRCINFO", "local")
|
||||||
commit_mock.assert_called_once_with(local)
|
commit_mock.assert_called_once_with(local)
|
||||||
@ -241,7 +241,7 @@ def test_push(package_ahriman: Package, mocker: MockerFixture) -> None:
|
|||||||
add_mock.assert_called_once_with(local, "glob")
|
add_mock.assert_called_once_with(local, "glob")
|
||||||
commit_mock.assert_called_once_with(local, commit_author=commit_author)
|
commit_mock.assert_called_once_with(local, commit_author=commit_author)
|
||||||
check_output_mock.assert_called_once_with(
|
check_output_mock.assert_called_once_with(
|
||||||
"git", "push", package_ahriman.remote.git_url, package_ahriman.remote.branch,
|
"git", "push", "--quiet", package_ahriman.remote.git_url, package_ahriman.remote.branch,
|
||||||
cwd=local, logger=pytest.helpers.anyvar(int))
|
cwd=local, logger=pytest.helpers.anyvar(int))
|
||||||
|
|
||||||
|
|
||||||
@ -310,7 +310,7 @@ def test_commit(sources: Sources, mocker: MockerFixture) -> None:
|
|||||||
user, email = sources.DEFAULT_COMMIT_AUTHOR
|
user, email = sources.DEFAULT_COMMIT_AUTHOR
|
||||||
assert sources.commit(local, message=message)
|
assert sources.commit(local, message=message)
|
||||||
check_output_mock.assert_called_once_with(
|
check_output_mock.assert_called_once_with(
|
||||||
"git", "commit", "--message", message,
|
"git", "commit", "--quiet", "--message", message,
|
||||||
cwd=local, logger=pytest.helpers.anyvar(int), environment={
|
cwd=local, logger=pytest.helpers.anyvar(int), environment={
|
||||||
"GIT_AUTHOR_NAME": user,
|
"GIT_AUTHOR_NAME": user,
|
||||||
"GIT_AUTHOR_EMAIL": email,
|
"GIT_AUTHOR_EMAIL": email,
|
||||||
@ -343,7 +343,7 @@ def test_commit_author(sources: Sources, mocker: MockerFixture) -> None:
|
|||||||
user, email = author = ("commit author", "user@host")
|
user, email = author = ("commit author", "user@host")
|
||||||
assert sources.commit(Path("local"), message=message, commit_author=author)
|
assert sources.commit(Path("local"), message=message, commit_author=author)
|
||||||
check_output_mock.assert_called_once_with(
|
check_output_mock.assert_called_once_with(
|
||||||
"git", "commit", "--message", message,
|
"git", "commit", "--quiet", "--message", message,
|
||||||
cwd=local, logger=pytest.helpers.anyvar(int), environment={
|
cwd=local, logger=pytest.helpers.anyvar(int), environment={
|
||||||
"GIT_AUTHOR_NAME": user,
|
"GIT_AUTHOR_NAME": user,
|
||||||
"GIT_AUTHOR_EMAIL": email,
|
"GIT_AUTHOR_EMAIL": email,
|
||||||
@ -364,7 +364,7 @@ def test_commit_autogenerated_message(sources: Sources, mocker: MockerFixture) -
|
|||||||
assert sources.commit(Path("local"))
|
assert sources.commit(Path("local"))
|
||||||
user, email = sources.DEFAULT_COMMIT_AUTHOR
|
user, email = sources.DEFAULT_COMMIT_AUTHOR
|
||||||
check_output_mock.assert_called_once_with(
|
check_output_mock.assert_called_once_with(
|
||||||
"git", "commit", "--message", pytest.helpers.anyvar(str, strict=True),
|
"git", "commit", "--quiet", "--message", pytest.helpers.anyvar(str, strict=True),
|
||||||
cwd=local, logger=pytest.helpers.anyvar(int), environment={
|
cwd=local, logger=pytest.helpers.anyvar(int), environment={
|
||||||
"GIT_AUTHOR_NAME": user,
|
"GIT_AUTHOR_NAME": user,
|
||||||
"GIT_AUTHOR_EMAIL": email,
|
"GIT_AUTHOR_EMAIL": email,
|
||||||
|
@ -0,0 +1,19 @@
|
|||||||
|
from ahriman.core.exceptions import BuildError, CalledProcessError
|
||||||
|
|
||||||
|
|
||||||
|
def test_from_process() -> None:
|
||||||
|
"""
|
||||||
|
must correctly generate exception instance from subprocess
|
||||||
|
"""
|
||||||
|
instance = BuildError.from_process("ahriman")(0, [], "out", "err")
|
||||||
|
assert isinstance(instance, BuildError)
|
||||||
|
assert instance.args == ("Package ahriman build failed,\nprocess stderr:\nerr\ncheck logs for details",)
|
||||||
|
|
||||||
|
|
||||||
|
def test_str() -> None:
|
||||||
|
"""
|
||||||
|
must correctly transform CalledProcessError to string
|
||||||
|
"""
|
||||||
|
instance = CalledProcessError(1, ["cmd"], "error")
|
||||||
|
message = "Command '['cmd']' returned non-zero exit status 1.\nProcess stderr:\nerror"
|
||||||
|
assert str(instance) == message
|
@ -3,14 +3,13 @@ import logging
|
|||||||
import os
|
import os
|
||||||
import pytest
|
import pytest
|
||||||
import requests
|
import requests
|
||||||
import subprocess
|
|
||||||
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from pytest_mock import MockerFixture
|
from pytest_mock import MockerFixture
|
||||||
from typing import Any
|
from typing import Any
|
||||||
from unittest.mock import MagicMock, call as MockCall
|
from unittest.mock import MagicMock, call as MockCall
|
||||||
|
|
||||||
from ahriman.core.exceptions import BuildError, OptionError, UnsafeRunError
|
from ahriman.core.exceptions import BuildError, CalledProcessError, OptionError, UnsafeRunError
|
||||||
from ahriman.core.util import check_output, check_user, dataclass_view, enum_values, exception_response_text, \
|
from ahriman.core.util import check_output, check_user, dataclass_view, enum_values, exception_response_text, \
|
||||||
extract_user, filter_json, full_version, package_like, parse_version, partition, pretty_datetime, pretty_size, \
|
extract_user, filter_json, full_version, package_like, parse_version, partition, pretty_datetime, pretty_size, \
|
||||||
safe_filename, srcinfo_property, srcinfo_property_list, trim_package, utcnow, walk
|
safe_filename, srcinfo_property, srcinfo_property_list, trim_package, utcnow, walk
|
||||||
@ -107,10 +106,10 @@ def test_check_output_failure(mocker: MockerFixture) -> None:
|
|||||||
"""
|
"""
|
||||||
mocker.patch("subprocess.Popen.wait", return_value=1)
|
mocker.patch("subprocess.Popen.wait", return_value=1)
|
||||||
|
|
||||||
with pytest.raises(subprocess.CalledProcessError):
|
with pytest.raises(CalledProcessError):
|
||||||
check_output("echo", "hello")
|
check_output("echo", "hello")
|
||||||
|
|
||||||
with pytest.raises(subprocess.CalledProcessError):
|
with pytest.raises(CalledProcessError):
|
||||||
check_output("echo", "hello", logger=logging.getLogger(""))
|
check_output("echo", "hello", logger=logging.getLogger(""))
|
||||||
|
|
||||||
|
|
||||||
@ -128,6 +127,20 @@ def test_check_output_failure_exception(mocker: MockerFixture) -> None:
|
|||||||
check_output("echo", "hello", exception=exception, logger=logging.getLogger(""))
|
check_output("echo", "hello", exception=exception, logger=logging.getLogger(""))
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_output_failure_exception_callable(mocker: MockerFixture) -> None:
|
||||||
|
"""
|
||||||
|
must raise exception from callable provided instead of default
|
||||||
|
"""
|
||||||
|
mocker.patch("subprocess.Popen.wait", return_value=1)
|
||||||
|
exception = BuildError.from_process("")
|
||||||
|
|
||||||
|
with pytest.raises(BuildError):
|
||||||
|
check_output("echo", "hello", exception=exception)
|
||||||
|
|
||||||
|
with pytest.raises(BuildError):
|
||||||
|
check_output("echo", "hello", exception=exception, logger=logging.getLogger(""))
|
||||||
|
|
||||||
|
|
||||||
def test_check_output_empty_line(mocker: MockerFixture) -> None:
|
def test_check_output_empty_line(mocker: MockerFixture) -> None:
|
||||||
"""
|
"""
|
||||||
must correctly process empty lines in command output
|
must correctly process empty lines in command output
|
||||||
|
Loading…
Reference in New Issue
Block a user