mirror of
https://github.com/arcan1s/ahriman.git
synced 2025-04-24 07:17:17 +00:00
import pgp key implementation (#17)
* import pgp key implementation * do not ask confirmation for local sign. Also add argparser test * superseed requests by python-aur package * ...and drop --skippgpcheck makgepkg flag by default
This commit is contained in:
parent
a0e6009876
commit
856a94ff00
@ -17,7 +17,6 @@ optdepends=('aws-cli: sync to s3'
|
|||||||
'python-aiohttp: web server'
|
'python-aiohttp: web server'
|
||||||
'python-aiohttp-jinja2: web server'
|
'python-aiohttp-jinja2: web server'
|
||||||
'python-jinja: html report generation'
|
'python-jinja: html report generation'
|
||||||
'python-requests: web server'
|
|
||||||
'rsync: sync by using rsync'
|
'rsync: sync by using rsync'
|
||||||
'subversion: -svn packages support')
|
'subversion: -svn packages support')
|
||||||
source=("https://github.com/arcan1s/ahriman/releases/download/$pkgver/$pkgname-$pkgver-src.tar.xz"
|
source=("https://github.com/arcan1s/ahriman/releases/download/$pkgver/$pkgname-$pkgver-src.tar.xz"
|
||||||
|
@ -13,7 +13,7 @@ archbuild_flags =
|
|||||||
build_command = extra-x86_64-build
|
build_command = extra-x86_64-build
|
||||||
ignore_packages =
|
ignore_packages =
|
||||||
makechrootpkg_flags =
|
makechrootpkg_flags =
|
||||||
makepkg_flags = --skippgpcheck
|
makepkg_flags =
|
||||||
|
|
||||||
[repository]
|
[repository]
|
||||||
name = aur-clone
|
name = aur-clone
|
||||||
@ -37,7 +37,7 @@ template_path = /usr/share/ahriman/repo-index.jinja2
|
|||||||
target =
|
target =
|
||||||
|
|
||||||
[rsync]
|
[rsync]
|
||||||
command = rsync --archive --verbose --compress --partial --delete
|
command = rsync --archive --compress --partial --delete
|
||||||
|
|
||||||
[s3]
|
[s3]
|
||||||
command = aws s3 sync --quiet --delete
|
command = aws s3 sync --quiet --delete
|
||||||
|
2
setup.py
2
setup.py
@ -29,6 +29,7 @@ setup(
|
|||||||
install_requires=[
|
install_requires=[
|
||||||
"aur",
|
"aur",
|
||||||
"pyalpm",
|
"pyalpm",
|
||||||
|
"requests",
|
||||||
"srcinfo",
|
"srcinfo",
|
||||||
],
|
],
|
||||||
setup_requires=[
|
setup_requires=[
|
||||||
@ -89,7 +90,6 @@ setup(
|
|||||||
"Jinja2",
|
"Jinja2",
|
||||||
"aiohttp",
|
"aiohttp",
|
||||||
"aiohttp_jinja2",
|
"aiohttp_jinja2",
|
||||||
"requests",
|
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
@ -56,6 +56,7 @@ def _parser() -> argparse.ArgumentParser:
|
|||||||
_set_check_parser(subparsers)
|
_set_check_parser(subparsers)
|
||||||
_set_clean_parser(subparsers)
|
_set_clean_parser(subparsers)
|
||||||
_set_config_parser(subparsers)
|
_set_config_parser(subparsers)
|
||||||
|
_set_key_import_parser(subparsers)
|
||||||
_set_rebuild_parser(subparsers)
|
_set_rebuild_parser(subparsers)
|
||||||
_set_remove_parser(subparsers)
|
_set_remove_parser(subparsers)
|
||||||
_set_report_parser(subparsers)
|
_set_report_parser(subparsers)
|
||||||
@ -131,6 +132,21 @@ def _set_config_parser(root: SubParserAction) -> argparse.ArgumentParser:
|
|||||||
return parser
|
return parser
|
||||||
|
|
||||||
|
|
||||||
|
def _set_key_import_parser(root: SubParserAction) -> argparse.ArgumentParser:
|
||||||
|
"""
|
||||||
|
add parser for key import subcommand
|
||||||
|
:param root: subparsers for the commands
|
||||||
|
:return: created argument parser
|
||||||
|
"""
|
||||||
|
parser = root.add_parser("key-import", help="import PGP key",
|
||||||
|
description="import PGP key from public sources to repository user",
|
||||||
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||||
|
parser.add_argument("--key-server", help="key server for key import", default="keys.gnupg.net")
|
||||||
|
parser.add_argument("key", help="PGP key to import from public server")
|
||||||
|
parser.set_defaults(handler=handlers.KeyImport, lock=None, no_report=True)
|
||||||
|
return parser
|
||||||
|
|
||||||
|
|
||||||
def _set_rebuild_parser(root: SubParserAction) -> argparse.ArgumentParser:
|
def _set_rebuild_parser(root: SubParserAction) -> argparse.ArgumentParser:
|
||||||
"""
|
"""
|
||||||
add parser for rebuild subcommand
|
add parser for rebuild subcommand
|
||||||
|
@ -22,6 +22,7 @@ from ahriman.application.handlers.handler import Handler
|
|||||||
from ahriman.application.handlers.add import Add
|
from ahriman.application.handlers.add import Add
|
||||||
from ahriman.application.handlers.clean import Clean
|
from ahriman.application.handlers.clean import Clean
|
||||||
from ahriman.application.handlers.dump import Dump
|
from ahriman.application.handlers.dump import Dump
|
||||||
|
from ahriman.application.handlers.key_import import KeyImport
|
||||||
from ahriman.application.handlers.rebuild import Rebuild
|
from ahriman.application.handlers.rebuild import Rebuild
|
||||||
from ahriman.application.handlers.remove import Remove
|
from ahriman.application.handlers.remove import Remove
|
||||||
from ahriman.application.handlers.report import Report
|
from ahriman.application.handlers.report import Report
|
||||||
|
42
src/ahriman/application/handlers/key_import.py
Normal file
42
src/ahriman/application/handlers/key_import.py
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
#
|
||||||
|
# Copyright (c) 2021 ahriman team.
|
||||||
|
#
|
||||||
|
# This file is part of ahriman
|
||||||
|
# (see https://github.com/arcan1s/ahriman).
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
from typing import Type
|
||||||
|
|
||||||
|
from ahriman.application.application import Application
|
||||||
|
from ahriman.application.handlers.handler import Handler
|
||||||
|
from ahriman.core.configuration import Configuration
|
||||||
|
|
||||||
|
|
||||||
|
class KeyImport(Handler):
|
||||||
|
"""
|
||||||
|
key import packages handler
|
||||||
|
"""
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration) -> None:
|
||||||
|
"""
|
||||||
|
callback for command line
|
||||||
|
:param args: command line args
|
||||||
|
:param architecture: repository architecture
|
||||||
|
:param configuration: configuration instance
|
||||||
|
"""
|
||||||
|
Application(architecture, configuration).repository.sign.import_key(args.key_server, args.key)
|
@ -18,13 +18,14 @@
|
|||||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
#
|
#
|
||||||
import logging
|
import logging
|
||||||
|
import requests
|
||||||
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import List, Optional, Set, Tuple
|
from typing import List, Optional, Set, Tuple
|
||||||
|
|
||||||
from ahriman.core.configuration import Configuration
|
from ahriman.core.configuration import Configuration
|
||||||
from ahriman.core.exceptions import BuildFailed
|
from ahriman.core.exceptions import BuildFailed
|
||||||
from ahriman.core.util import check_output
|
from ahriman.core.util import check_output, exception_response_text
|
||||||
from ahriman.models.sign_settings import SignSettings
|
from ahriman.models.sign_settings import SignSettings
|
||||||
|
|
||||||
|
|
||||||
@ -87,6 +88,36 @@ class GPG:
|
|||||||
default_key = configuration.get("sign", "key") if targets else None
|
default_key = configuration.get("sign", "key") if targets else None
|
||||||
return targets, default_key
|
return targets, default_key
|
||||||
|
|
||||||
|
def download_key(self, server: str, key: str) -> str:
|
||||||
|
"""
|
||||||
|
download key from public PGP server
|
||||||
|
:param server: public PGP server which will be used to download the key
|
||||||
|
:param key: key ID to download
|
||||||
|
:return: key as plain text
|
||||||
|
"""
|
||||||
|
key = key if key.startswith("0x") else f"0x{key}"
|
||||||
|
try:
|
||||||
|
response = requests.get(f"http://{server}/pks/lookup", params={
|
||||||
|
"op": "get",
|
||||||
|
"options": "mr",
|
||||||
|
"search": key
|
||||||
|
})
|
||||||
|
response.raise_for_status()
|
||||||
|
except requests.exceptions.HTTPError as e:
|
||||||
|
self.logger.exception(f"could not download key {key} from {server}: {exception_response_text(e)}")
|
||||||
|
raise
|
||||||
|
return response.text
|
||||||
|
|
||||||
|
def import_key(self, server: str, key: str) -> None:
|
||||||
|
"""
|
||||||
|
import key to current user and sign it locally
|
||||||
|
:param server: public PGP server which will be used to download the key
|
||||||
|
:param key: key ID to import
|
||||||
|
"""
|
||||||
|
key_body = self.download_key(server, key)
|
||||||
|
GPG._check_output("gpg", "--import", input_data=key_body, exception=None, logger=self.logger)
|
||||||
|
GPG._check_output("gpg", "--quick-lsign-key", key, exception=None, logger=self.logger)
|
||||||
|
|
||||||
def process(self, path: Path, key: str) -> List[Path]:
|
def process(self, path: Path, key: str) -> List[Path]:
|
||||||
"""
|
"""
|
||||||
gpg command wrapper
|
gpg command wrapper
|
||||||
|
@ -23,6 +23,7 @@ import requests
|
|||||||
from typing import List, Optional, Tuple
|
from typing import List, Optional, Tuple
|
||||||
|
|
||||||
from ahriman.core.status.client import Client
|
from ahriman.core.status.client import Client
|
||||||
|
from ahriman.core.util import exception_response_text
|
||||||
from ahriman.models.build_status import BuildStatusEnum, BuildStatus
|
from ahriman.models.build_status import BuildStatusEnum, BuildStatus
|
||||||
from ahriman.models.internal_status import InternalStatus
|
from ahriman.models.internal_status import InternalStatus
|
||||||
from ahriman.models.package import Package
|
from ahriman.models.package import Package
|
||||||
@ -46,16 +47,6 @@ class WebClient(Client):
|
|||||||
self.host = host
|
self.host = host
|
||||||
self.port = port
|
self.port = port
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _exception_response_text(exception: requests.exceptions.HTTPError) -> str:
|
|
||||||
"""
|
|
||||||
safe response exception text generation
|
|
||||||
:param exception: exception raised
|
|
||||||
:return: text of the response if it is not None and empty string otherwise
|
|
||||||
"""
|
|
||||||
result: str = exception.response.text if exception.response is not None else ""
|
|
||||||
return result
|
|
||||||
|
|
||||||
def _ahriman_url(self) -> str:
|
def _ahriman_url(self) -> str:
|
||||||
"""
|
"""
|
||||||
url generator
|
url generator
|
||||||
@ -93,7 +84,7 @@ class WebClient(Client):
|
|||||||
response = requests.post(self._package_url(package.base), json=payload)
|
response = requests.post(self._package_url(package.base), json=payload)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
except requests.exceptions.HTTPError as e:
|
except requests.exceptions.HTTPError as e:
|
||||||
self.logger.exception(f"could not add {package.base}: {WebClient._exception_response_text(e)}")
|
self.logger.exception(f"could not add {package.base}: {exception_response_text(e)}")
|
||||||
except Exception:
|
except Exception:
|
||||||
self.logger.exception(f"could not add {package.base}")
|
self.logger.exception(f"could not add {package.base}")
|
||||||
|
|
||||||
@ -113,7 +104,7 @@ class WebClient(Client):
|
|||||||
for package in status_json
|
for package in status_json
|
||||||
]
|
]
|
||||||
except requests.exceptions.HTTPError as e:
|
except requests.exceptions.HTTPError as e:
|
||||||
self.logger.exception(f"could not get {base}: {WebClient._exception_response_text(e)}")
|
self.logger.exception(f"could not get {base}: {exception_response_text(e)}")
|
||||||
except Exception:
|
except Exception:
|
||||||
self.logger.exception(f"could not get {base}")
|
self.logger.exception(f"could not get {base}")
|
||||||
return []
|
return []
|
||||||
@ -130,7 +121,7 @@ class WebClient(Client):
|
|||||||
status_json = response.json()
|
status_json = response.json()
|
||||||
return InternalStatus.from_json(status_json)
|
return InternalStatus.from_json(status_json)
|
||||||
except requests.exceptions.HTTPError as e:
|
except requests.exceptions.HTTPError as e:
|
||||||
self.logger.exception(f"could not get web service status: {WebClient._exception_response_text(e)}")
|
self.logger.exception(f"could not get web service status: {exception_response_text(e)}")
|
||||||
except Exception:
|
except Exception:
|
||||||
self.logger.exception("could not get web service status")
|
self.logger.exception("could not get web service status")
|
||||||
return InternalStatus()
|
return InternalStatus()
|
||||||
@ -147,7 +138,7 @@ class WebClient(Client):
|
|||||||
status_json = response.json()
|
status_json = response.json()
|
||||||
return BuildStatus.from_json(status_json)
|
return BuildStatus.from_json(status_json)
|
||||||
except requests.exceptions.HTTPError as e:
|
except requests.exceptions.HTTPError as e:
|
||||||
self.logger.exception(f"could not get service status: {WebClient._exception_response_text(e)}")
|
self.logger.exception(f"could not get service status: {exception_response_text(e)}")
|
||||||
except Exception:
|
except Exception:
|
||||||
self.logger.exception("could not get service status")
|
self.logger.exception("could not get service status")
|
||||||
return BuildStatus()
|
return BuildStatus()
|
||||||
@ -161,7 +152,7 @@ class WebClient(Client):
|
|||||||
response = requests.delete(self._package_url(base))
|
response = requests.delete(self._package_url(base))
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
except requests.exceptions.HTTPError as e:
|
except requests.exceptions.HTTPError as e:
|
||||||
self.logger.exception(f"could not delete {base}: {WebClient._exception_response_text(e)}")
|
self.logger.exception(f"could not delete {base}: {exception_response_text(e)}")
|
||||||
except Exception:
|
except Exception:
|
||||||
self.logger.exception(f"could not delete {base}")
|
self.logger.exception(f"could not delete {base}")
|
||||||
|
|
||||||
@ -177,7 +168,7 @@ class WebClient(Client):
|
|||||||
response = requests.post(self._package_url(base), json=payload)
|
response = requests.post(self._package_url(base), json=payload)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
except requests.exceptions.HTTPError as e:
|
except requests.exceptions.HTTPError as e:
|
||||||
self.logger.exception(f"could not update {base}: {WebClient._exception_response_text(e)}")
|
self.logger.exception(f"could not update {base}: {exception_response_text(e)}")
|
||||||
except Exception:
|
except Exception:
|
||||||
self.logger.exception(f"could not update {base}")
|
self.logger.exception(f"could not update {base}")
|
||||||
|
|
||||||
@ -192,6 +183,6 @@ class WebClient(Client):
|
|||||||
response = requests.post(self._ahriman_url(), json=payload)
|
response = requests.post(self._ahriman_url(), json=payload)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
except requests.exceptions.HTTPError as e:
|
except requests.exceptions.HTTPError as e:
|
||||||
self.logger.exception(f"could not update service status: {WebClient._exception_response_text(e)}")
|
self.logger.exception(f"could not update service status: {exception_response_text(e)}")
|
||||||
except Exception:
|
except Exception:
|
||||||
self.logger.exception("could not update service status")
|
self.logger.exception("could not update service status")
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
#
|
#
|
||||||
import datetime
|
import datetime
|
||||||
import subprocess
|
import subprocess
|
||||||
|
import requests
|
||||||
|
|
||||||
from logging import Logger
|
from logging import Logger
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@ -27,29 +28,42 @@ from typing import Optional, Union
|
|||||||
from ahriman.core.exceptions import InvalidOption
|
from ahriman.core.exceptions import InvalidOption
|
||||||
|
|
||||||
|
|
||||||
def check_output(*args: str, exception: Optional[Exception],
|
def check_output(*args: str, exception: Optional[Exception], cwd: Optional[Path] = None,
|
||||||
cwd: Optional[Path] = None, logger: Optional[Logger] = None) -> str:
|
input_data: Optional[str] = None, logger: Optional[Logger] = None) -> str:
|
||||||
"""
|
"""
|
||||||
subprocess wrapper
|
subprocess wrapper
|
||||||
:param args: command line arguments
|
:param args: command line arguments
|
||||||
:param exception: exception which has to be reraised instead of default subprocess exception
|
:param exception: exception which has to be reraised instead of default subprocess exception
|
||||||
:param cwd: current working directory
|
:param cwd: current working directory
|
||||||
|
:param input_data: data which will be written to command stdin
|
||||||
:param logger: logger to log command result if required
|
:param logger: logger to log command result if required
|
||||||
:return: command output
|
:return: command output
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
result = subprocess.check_output(args, cwd=cwd, stderr=subprocess.STDOUT).decode("utf8").strip()
|
# universal_newlines is required to read input from string
|
||||||
|
result: str = subprocess.check_output(args, cwd=cwd, input=input_data, stderr=subprocess.STDOUT, # type: ignore
|
||||||
|
universal_newlines=True).strip()
|
||||||
if logger is not None:
|
if logger is not None:
|
||||||
for line in result.splitlines():
|
for line in result.splitlines():
|
||||||
logger.debug(line)
|
logger.debug(line)
|
||||||
except subprocess.CalledProcessError as e:
|
except subprocess.CalledProcessError as e:
|
||||||
if e.output is not None and logger is not None:
|
if e.output is not None and logger is not None:
|
||||||
for line in e.output.decode("utf8").splitlines():
|
for line in e.output.splitlines():
|
||||||
logger.debug(line)
|
logger.debug(line)
|
||||||
raise exception or e
|
raise exception or e
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def exception_response_text(exception: requests.exceptions.HTTPError) -> str:
|
||||||
|
"""
|
||||||
|
safe response exception text generation
|
||||||
|
:param exception: exception raised
|
||||||
|
:return: text of the response if it is not None and empty string otherwise
|
||||||
|
"""
|
||||||
|
result: str = exception.response.text if exception.response is not None else ""
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
def package_like(filename: Path) -> bool:
|
def package_like(filename: Path) -> bool:
|
||||||
"""
|
"""
|
||||||
check if file looks like package
|
check if file looks like package
|
||||||
|
@ -0,0 +1,24 @@
|
|||||||
|
import argparse
|
||||||
|
|
||||||
|
from pytest_mock import MockerFixture
|
||||||
|
|
||||||
|
from ahriman.application.handlers import KeyImport
|
||||||
|
from ahriman.core.configuration import Configuration
|
||||||
|
|
||||||
|
|
||||||
|
def _default_args(args: argparse.Namespace) -> argparse.Namespace:
|
||||||
|
args.key = "0xE989490C"
|
||||||
|
args.key_server = "keys.gnupg.net"
|
||||||
|
return args
|
||||||
|
|
||||||
|
|
||||||
|
def test_run(args: argparse.Namespace, configuration: Configuration, mocker: MockerFixture) -> None:
|
||||||
|
"""
|
||||||
|
must run command
|
||||||
|
"""
|
||||||
|
args = _default_args(args)
|
||||||
|
mocker.patch("pathlib.Path.mkdir")
|
||||||
|
application_mock = mocker.patch("ahriman.core.sign.gpg.GPG.import_key")
|
||||||
|
|
||||||
|
KeyImport.run(args, "x86_64", configuration)
|
||||||
|
application_mock.assert_called_once()
|
@ -71,6 +71,15 @@ def test_subparsers_config(parser: argparse.ArgumentParser) -> None:
|
|||||||
assert args.unsafe
|
assert args.unsafe
|
||||||
|
|
||||||
|
|
||||||
|
def test_subparsers_key_import(parser: argparse.ArgumentParser) -> None:
|
||||||
|
"""
|
||||||
|
key-import command must imply lock and no_report
|
||||||
|
"""
|
||||||
|
args = parser.parse_args(["-a", "x86_64", "key-import", "key"])
|
||||||
|
assert args.lock is None
|
||||||
|
assert args.no_report
|
||||||
|
|
||||||
|
|
||||||
def test_subparsers_search(parser: argparse.ArgumentParser) -> None:
|
def test_subparsers_search(parser: argparse.ArgumentParser) -> None:
|
||||||
"""
|
"""
|
||||||
search command must imply lock, no_report and unsafe
|
search command must imply lock, no_report and unsafe
|
||||||
|
@ -1,5 +1,9 @@
|
|||||||
|
import pytest
|
||||||
|
import requests
|
||||||
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from pytest_mock import MockerFixture
|
from pytest_mock import MockerFixture
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
from ahriman.core.sign.gpg import GPG
|
from ahriman.core.sign.gpg import GPG
|
||||||
from ahriman.models.sign_settings import SignSettings
|
from ahriman.models.sign_settings import SignSettings
|
||||||
@ -60,6 +64,38 @@ def test_sign_command(gpg_with_key: GPG) -> None:
|
|||||||
assert gpg_with_key.sign_command(Path("a"), gpg_with_key.default_key)
|
assert gpg_with_key.sign_command(Path("a"), gpg_with_key.default_key)
|
||||||
|
|
||||||
|
|
||||||
|
def test_download_key(gpg: GPG, mocker: MockerFixture) -> None:
|
||||||
|
"""
|
||||||
|
must download the key from public server
|
||||||
|
"""
|
||||||
|
requests_mock = mocker.patch("requests.get")
|
||||||
|
gpg.download_key("keys.gnupg.net", "0xE989490C")
|
||||||
|
requests_mock.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
|
def test_download_key_failure(gpg: GPG, mocker: MockerFixture) -> None:
|
||||||
|
"""
|
||||||
|
must download the key from public server and log error if any (and raise it again)
|
||||||
|
"""
|
||||||
|
mocker.patch("requests.get", side_effect=requests.exceptions.HTTPError())
|
||||||
|
with pytest.raises(requests.exceptions.HTTPError):
|
||||||
|
gpg.download_key("keys.gnupg.net", "0xE989490C")
|
||||||
|
|
||||||
|
|
||||||
|
def test_import_key(gpg: GPG, mocker: MockerFixture) -> None:
|
||||||
|
"""
|
||||||
|
must import PGP key from the server
|
||||||
|
"""
|
||||||
|
mocker.patch("ahriman.core.sign.gpg.GPG.download_key", return_value="key")
|
||||||
|
check_output_mock = mocker.patch("ahriman.core.sign.gpg.GPG._check_output")
|
||||||
|
|
||||||
|
gpg.import_key("keys.gnupg.net", "0xE989490C")
|
||||||
|
check_output_mock.assert_has_calls([
|
||||||
|
mock.call("gpg", "--import", input_data="key", exception=None, logger=pytest.helpers.anyvar(int)),
|
||||||
|
mock.call("gpg", "--quick-lsign-key", "0xE989490C", exception=None, logger=pytest.helpers.anyvar(int))
|
||||||
|
])
|
||||||
|
|
||||||
|
|
||||||
def test_process(gpg_with_key: GPG, mocker: MockerFixture) -> None:
|
def test_process(gpg_with_key: GPG, mocker: MockerFixture) -> None:
|
||||||
"""
|
"""
|
||||||
must call process method correctly
|
must call process method correctly
|
||||||
|
Loading…
Reference in New Issue
Block a user