mirror of
				https://github.com/arcan1s/ahriman.git
				synced 2025-10-31 13:53:41 +00:00 
			
		
		
		
	import pgp key implementation (#17)
* import pgp key implementation * do not ask confirmation for local sign. Also add argparser test * superseed requests by python-aur package * ...and drop --skippgpcheck makgepkg flag by default
This commit is contained in:
		| @ -17,7 +17,6 @@ optdepends=('aws-cli: sync to s3' | |||||||
|             'python-aiohttp: web server' |             'python-aiohttp: web server' | ||||||
|             'python-aiohttp-jinja2: web server' |             'python-aiohttp-jinja2: web server' | ||||||
|             'python-jinja: html report generation' |             'python-jinja: html report generation' | ||||||
|             'python-requests: web server' |  | ||||||
|             'rsync: sync by using rsync' |             'rsync: sync by using rsync' | ||||||
|             'subversion: -svn packages support') |             'subversion: -svn packages support') | ||||||
| source=("https://github.com/arcan1s/ahriman/releases/download/$pkgver/$pkgname-$pkgver-src.tar.xz" | source=("https://github.com/arcan1s/ahriman/releases/download/$pkgver/$pkgname-$pkgver-src.tar.xz" | ||||||
|  | |||||||
| @ -13,7 +13,7 @@ archbuild_flags = | |||||||
| build_command = extra-x86_64-build | build_command = extra-x86_64-build | ||||||
| ignore_packages = | ignore_packages = | ||||||
| makechrootpkg_flags = | makechrootpkg_flags = | ||||||
| makepkg_flags = --skippgpcheck | makepkg_flags = | ||||||
|  |  | ||||||
| [repository] | [repository] | ||||||
| name = aur-clone | name = aur-clone | ||||||
| @ -37,7 +37,7 @@ template_path = /usr/share/ahriman/repo-index.jinja2 | |||||||
| target = | target = | ||||||
|  |  | ||||||
| [rsync] | [rsync] | ||||||
| command = rsync --archive --verbose --compress --partial --delete | command = rsync --archive --compress --partial --delete | ||||||
|  |  | ||||||
| [s3] | [s3] | ||||||
| command = aws s3 sync --quiet --delete | command = aws s3 sync --quiet --delete | ||||||
|  | |||||||
							
								
								
									
										2
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								setup.py
									
									
									
									
									
								
							| @ -29,6 +29,7 @@ setup( | |||||||
|     install_requires=[ |     install_requires=[ | ||||||
|         "aur", |         "aur", | ||||||
|         "pyalpm", |         "pyalpm", | ||||||
|  |         "requests", | ||||||
|         "srcinfo", |         "srcinfo", | ||||||
|     ], |     ], | ||||||
|     setup_requires=[ |     setup_requires=[ | ||||||
| @ -89,7 +90,6 @@ setup( | |||||||
|             "Jinja2", |             "Jinja2", | ||||||
|             "aiohttp", |             "aiohttp", | ||||||
|             "aiohttp_jinja2", |             "aiohttp_jinja2", | ||||||
|             "requests", |  | ||||||
|         ], |         ], | ||||||
|     }, |     }, | ||||||
| ) | ) | ||||||
|  | |||||||
| @ -56,6 +56,7 @@ def _parser() -> argparse.ArgumentParser: | |||||||
|     _set_check_parser(subparsers) |     _set_check_parser(subparsers) | ||||||
|     _set_clean_parser(subparsers) |     _set_clean_parser(subparsers) | ||||||
|     _set_config_parser(subparsers) |     _set_config_parser(subparsers) | ||||||
|  |     _set_key_import_parser(subparsers) | ||||||
|     _set_rebuild_parser(subparsers) |     _set_rebuild_parser(subparsers) | ||||||
|     _set_remove_parser(subparsers) |     _set_remove_parser(subparsers) | ||||||
|     _set_report_parser(subparsers) |     _set_report_parser(subparsers) | ||||||
| @ -131,6 +132,21 @@ def _set_config_parser(root: SubParserAction) -> argparse.ArgumentParser: | |||||||
|     return parser |     return parser | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def _set_key_import_parser(root: SubParserAction) -> argparse.ArgumentParser: | ||||||
|  |     """ | ||||||
|  |     add parser for key import subcommand | ||||||
|  |     :param root: subparsers for the commands | ||||||
|  |     :return: created argument parser | ||||||
|  |     """ | ||||||
|  |     parser = root.add_parser("key-import", help="import PGP key", | ||||||
|  |                              description="import PGP key from public sources to repository user", | ||||||
|  |                              formatter_class=argparse.ArgumentDefaultsHelpFormatter) | ||||||
|  |     parser.add_argument("--key-server", help="key server for key import", default="keys.gnupg.net") | ||||||
|  |     parser.add_argument("key", help="PGP key to import from public server") | ||||||
|  |     parser.set_defaults(handler=handlers.KeyImport, lock=None, no_report=True) | ||||||
|  |     return parser | ||||||
|  |  | ||||||
|  |  | ||||||
| def _set_rebuild_parser(root: SubParserAction) -> argparse.ArgumentParser: | def _set_rebuild_parser(root: SubParserAction) -> argparse.ArgumentParser: | ||||||
|     """ |     """ | ||||||
|     add parser for rebuild subcommand |     add parser for rebuild subcommand | ||||||
|  | |||||||
| @ -22,6 +22,7 @@ from ahriman.application.handlers.handler import Handler | |||||||
| from ahriman.application.handlers.add import Add | from ahriman.application.handlers.add import Add | ||||||
| from ahriman.application.handlers.clean import Clean | from ahriman.application.handlers.clean import Clean | ||||||
| from ahriman.application.handlers.dump import Dump | from ahriman.application.handlers.dump import Dump | ||||||
|  | from ahriman.application.handlers.key_import import KeyImport | ||||||
| from ahriman.application.handlers.rebuild import Rebuild | from ahriman.application.handlers.rebuild import Rebuild | ||||||
| from ahriman.application.handlers.remove import Remove | from ahriman.application.handlers.remove import Remove | ||||||
| from ahriman.application.handlers.report import Report | from ahriman.application.handlers.report import Report | ||||||
|  | |||||||
							
								
								
									
										42
									
								
								src/ahriman/application/handlers/key_import.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										42
									
								
								src/ahriman/application/handlers/key_import.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,42 @@ | |||||||
|  | # | ||||||
|  | # Copyright (c) 2021 ahriman team. | ||||||
|  | # | ||||||
|  | # This file is part of ahriman | ||||||
|  | # (see https://github.com/arcan1s/ahriman). | ||||||
|  | # | ||||||
|  | # This program is free software: you can redistribute it and/or modify | ||||||
|  | # it under the terms of the GNU General Public License as published by | ||||||
|  | # the Free Software Foundation, either version 3 of the License, or | ||||||
|  | # (at your option) any later version. | ||||||
|  | # | ||||||
|  | # This program is distributed in the hope that it will be useful, | ||||||
|  | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||
|  | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||||
|  | # GNU General Public License for more details. | ||||||
|  | # | ||||||
|  | # You should have received a copy of the GNU General Public License | ||||||
|  | # along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||||
|  | # | ||||||
|  | import argparse | ||||||
|  |  | ||||||
|  | from typing import Type | ||||||
|  |  | ||||||
|  | from ahriman.application.application import Application | ||||||
|  | from ahriman.application.handlers.handler import Handler | ||||||
|  | from ahriman.core.configuration import Configuration | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class KeyImport(Handler): | ||||||
|  |     """ | ||||||
|  |     key import packages handler | ||||||
|  |     """ | ||||||
|  |  | ||||||
|  |     @classmethod | ||||||
|  |     def run(cls: Type[Handler], args: argparse.Namespace, architecture: str, configuration: Configuration) -> None: | ||||||
|  |         """ | ||||||
|  |         callback for command line | ||||||
|  |         :param args: command line args | ||||||
|  |         :param architecture: repository architecture | ||||||
|  |         :param configuration: configuration instance | ||||||
|  |         """ | ||||||
|  |         Application(architecture, configuration).repository.sign.import_key(args.key_server, args.key) | ||||||
| @ -18,13 +18,14 @@ | |||||||
| # along with this program. If not, see <http://www.gnu.org/licenses/>. | # along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||||
| # | # | ||||||
| import logging | import logging | ||||||
|  | import requests | ||||||
|  |  | ||||||
| from pathlib import Path | from pathlib import Path | ||||||
| from typing import List, Optional, Set, Tuple | from typing import List, Optional, Set, Tuple | ||||||
|  |  | ||||||
| from ahriman.core.configuration import Configuration | from ahriman.core.configuration import Configuration | ||||||
| from ahriman.core.exceptions import BuildFailed | from ahriman.core.exceptions import BuildFailed | ||||||
| from ahriman.core.util import check_output | from ahriman.core.util import check_output, exception_response_text | ||||||
| from ahriman.models.sign_settings import SignSettings | from ahriman.models.sign_settings import SignSettings | ||||||
|  |  | ||||||
|  |  | ||||||
| @ -87,6 +88,36 @@ class GPG: | |||||||
|         default_key = configuration.get("sign", "key") if targets else None |         default_key = configuration.get("sign", "key") if targets else None | ||||||
|         return targets, default_key |         return targets, default_key | ||||||
|  |  | ||||||
|  |     def download_key(self, server: str, key: str) -> str: | ||||||
|  |         """ | ||||||
|  |         download key from public PGP server | ||||||
|  |         :param server: public PGP server which will be used to download the key | ||||||
|  |         :param key: key ID to download | ||||||
|  |         :return: key as plain text | ||||||
|  |         """ | ||||||
|  |         key = key if key.startswith("0x") else f"0x{key}" | ||||||
|  |         try: | ||||||
|  |             response = requests.get(f"http://{server}/pks/lookup", params={ | ||||||
|  |                 "op": "get", | ||||||
|  |                 "options": "mr", | ||||||
|  |                 "search": key | ||||||
|  |             }) | ||||||
|  |             response.raise_for_status() | ||||||
|  |         except requests.exceptions.HTTPError as e: | ||||||
|  |             self.logger.exception(f"could not download key {key} from {server}: {exception_response_text(e)}") | ||||||
|  |             raise | ||||||
|  |         return response.text | ||||||
|  |  | ||||||
|  |     def import_key(self, server: str, key: str) -> None: | ||||||
|  |         """ | ||||||
|  |         import key to current user and sign it locally | ||||||
|  |         :param server: public PGP server which will be used to download the key | ||||||
|  |         :param key: key ID to import | ||||||
|  |         """ | ||||||
|  |         key_body = self.download_key(server, key) | ||||||
|  |         GPG._check_output("gpg", "--import", input_data=key_body, exception=None, logger=self.logger) | ||||||
|  |         GPG._check_output("gpg", "--quick-lsign-key", key, exception=None, logger=self.logger) | ||||||
|  |  | ||||||
|     def process(self, path: Path, key: str) -> List[Path]: |     def process(self, path: Path, key: str) -> List[Path]: | ||||||
|         """ |         """ | ||||||
|         gpg command wrapper |         gpg command wrapper | ||||||
|  | |||||||
| @ -23,6 +23,7 @@ import requests | |||||||
| from typing import List, Optional, Tuple | from typing import List, Optional, Tuple | ||||||
|  |  | ||||||
| from ahriman.core.status.client import Client | from ahriman.core.status.client import Client | ||||||
|  | from ahriman.core.util import exception_response_text | ||||||
| from ahriman.models.build_status import BuildStatusEnum, BuildStatus | from ahriman.models.build_status import BuildStatusEnum, BuildStatus | ||||||
| from ahriman.models.internal_status import InternalStatus | from ahriman.models.internal_status import InternalStatus | ||||||
| from ahriman.models.package import Package | from ahriman.models.package import Package | ||||||
| @ -46,16 +47,6 @@ class WebClient(Client): | |||||||
|         self.host = host |         self.host = host | ||||||
|         self.port = port |         self.port = port | ||||||
|  |  | ||||||
|     @staticmethod |  | ||||||
|     def _exception_response_text(exception: requests.exceptions.HTTPError) -> str: |  | ||||||
|         """ |  | ||||||
|         safe response exception text generation |  | ||||||
|         :param exception: exception raised |  | ||||||
|         :return: text of the response if it is not None and empty string otherwise |  | ||||||
|         """ |  | ||||||
|         result: str = exception.response.text if exception.response is not None else "" |  | ||||||
|         return result |  | ||||||
|  |  | ||||||
|     def _ahriman_url(self) -> str: |     def _ahriman_url(self) -> str: | ||||||
|         """ |         """ | ||||||
|         url generator |         url generator | ||||||
| @ -93,7 +84,7 @@ class WebClient(Client): | |||||||
|             response = requests.post(self._package_url(package.base), json=payload) |             response = requests.post(self._package_url(package.base), json=payload) | ||||||
|             response.raise_for_status() |             response.raise_for_status() | ||||||
|         except requests.exceptions.HTTPError as e: |         except requests.exceptions.HTTPError as e: | ||||||
|             self.logger.exception(f"could not add {package.base}: {WebClient._exception_response_text(e)}") |             self.logger.exception(f"could not add {package.base}: {exception_response_text(e)}") | ||||||
|         except Exception: |         except Exception: | ||||||
|             self.logger.exception(f"could not add {package.base}") |             self.logger.exception(f"could not add {package.base}") | ||||||
|  |  | ||||||
| @ -113,7 +104,7 @@ class WebClient(Client): | |||||||
|                 for package in status_json |                 for package in status_json | ||||||
|             ] |             ] | ||||||
|         except requests.exceptions.HTTPError as e: |         except requests.exceptions.HTTPError as e: | ||||||
|             self.logger.exception(f"could not get {base}: {WebClient._exception_response_text(e)}") |             self.logger.exception(f"could not get {base}: {exception_response_text(e)}") | ||||||
|         except Exception: |         except Exception: | ||||||
|             self.logger.exception(f"could not get {base}") |             self.logger.exception(f"could not get {base}") | ||||||
|         return [] |         return [] | ||||||
| @ -130,7 +121,7 @@ class WebClient(Client): | |||||||
|             status_json = response.json() |             status_json = response.json() | ||||||
|             return InternalStatus.from_json(status_json) |             return InternalStatus.from_json(status_json) | ||||||
|         except requests.exceptions.HTTPError as e: |         except requests.exceptions.HTTPError as e: | ||||||
|             self.logger.exception(f"could not get web service status: {WebClient._exception_response_text(e)}") |             self.logger.exception(f"could not get web service status: {exception_response_text(e)}") | ||||||
|         except Exception: |         except Exception: | ||||||
|             self.logger.exception("could not get web service status") |             self.logger.exception("could not get web service status") | ||||||
|         return InternalStatus() |         return InternalStatus() | ||||||
| @ -147,7 +138,7 @@ class WebClient(Client): | |||||||
|             status_json = response.json() |             status_json = response.json() | ||||||
|             return BuildStatus.from_json(status_json) |             return BuildStatus.from_json(status_json) | ||||||
|         except requests.exceptions.HTTPError as e: |         except requests.exceptions.HTTPError as e: | ||||||
|             self.logger.exception(f"could not get service status: {WebClient._exception_response_text(e)}") |             self.logger.exception(f"could not get service status: {exception_response_text(e)}") | ||||||
|         except Exception: |         except Exception: | ||||||
|             self.logger.exception("could not get service status") |             self.logger.exception("could not get service status") | ||||||
|         return BuildStatus() |         return BuildStatus() | ||||||
| @ -161,7 +152,7 @@ class WebClient(Client): | |||||||
|             response = requests.delete(self._package_url(base)) |             response = requests.delete(self._package_url(base)) | ||||||
|             response.raise_for_status() |             response.raise_for_status() | ||||||
|         except requests.exceptions.HTTPError as e: |         except requests.exceptions.HTTPError as e: | ||||||
|             self.logger.exception(f"could not delete {base}: {WebClient._exception_response_text(e)}") |             self.logger.exception(f"could not delete {base}: {exception_response_text(e)}") | ||||||
|         except Exception: |         except Exception: | ||||||
|             self.logger.exception(f"could not delete {base}") |             self.logger.exception(f"could not delete {base}") | ||||||
|  |  | ||||||
| @ -177,7 +168,7 @@ class WebClient(Client): | |||||||
|             response = requests.post(self._package_url(base), json=payload) |             response = requests.post(self._package_url(base), json=payload) | ||||||
|             response.raise_for_status() |             response.raise_for_status() | ||||||
|         except requests.exceptions.HTTPError as e: |         except requests.exceptions.HTTPError as e: | ||||||
|             self.logger.exception(f"could not update {base}: {WebClient._exception_response_text(e)}") |             self.logger.exception(f"could not update {base}: {exception_response_text(e)}") | ||||||
|         except Exception: |         except Exception: | ||||||
|             self.logger.exception(f"could not update {base}") |             self.logger.exception(f"could not update {base}") | ||||||
|  |  | ||||||
| @ -192,6 +183,6 @@ class WebClient(Client): | |||||||
|             response = requests.post(self._ahriman_url(), json=payload) |             response = requests.post(self._ahriman_url(), json=payload) | ||||||
|             response.raise_for_status() |             response.raise_for_status() | ||||||
|         except requests.exceptions.HTTPError as e: |         except requests.exceptions.HTTPError as e: | ||||||
|             self.logger.exception(f"could not update service status: {WebClient._exception_response_text(e)}") |             self.logger.exception(f"could not update service status: {exception_response_text(e)}") | ||||||
|         except Exception: |         except Exception: | ||||||
|             self.logger.exception("could not update service status") |             self.logger.exception("could not update service status") | ||||||
|  | |||||||
| @ -19,6 +19,7 @@ | |||||||
| # | # | ||||||
| import datetime | import datetime | ||||||
| import subprocess | import subprocess | ||||||
|  | import requests | ||||||
|  |  | ||||||
| from logging import Logger | from logging import Logger | ||||||
| from pathlib import Path | from pathlib import Path | ||||||
| @ -27,29 +28,42 @@ from typing import Optional, Union | |||||||
| from ahriman.core.exceptions import InvalidOption | from ahriman.core.exceptions import InvalidOption | ||||||
|  |  | ||||||
|  |  | ||||||
| def check_output(*args: str, exception: Optional[Exception], | def check_output(*args: str, exception: Optional[Exception], cwd: Optional[Path] = None, | ||||||
|                  cwd: Optional[Path] = None, logger: Optional[Logger] = None) -> str: |                  input_data: Optional[str] = None, logger: Optional[Logger] = None) -> str: | ||||||
|     """ |     """ | ||||||
|     subprocess wrapper |     subprocess wrapper | ||||||
|     :param args: command line arguments |     :param args: command line arguments | ||||||
|     :param exception: exception which has to be reraised instead of default subprocess exception |     :param exception: exception which has to be reraised instead of default subprocess exception | ||||||
|     :param cwd: current working directory |     :param cwd: current working directory | ||||||
|  |     :param input_data: data which will be written to command stdin | ||||||
|     :param logger: logger to log command result if required |     :param logger: logger to log command result if required | ||||||
|     :return: command output |     :return: command output | ||||||
|     """ |     """ | ||||||
|     try: |     try: | ||||||
|         result = subprocess.check_output(args, cwd=cwd, stderr=subprocess.STDOUT).decode("utf8").strip() |         # universal_newlines is required to read input from string | ||||||
|  |         result: str = subprocess.check_output(args, cwd=cwd, input=input_data, stderr=subprocess.STDOUT,  # type: ignore | ||||||
|  |                                               universal_newlines=True).strip() | ||||||
|         if logger is not None: |         if logger is not None: | ||||||
|             for line in result.splitlines(): |             for line in result.splitlines(): | ||||||
|                 logger.debug(line) |                 logger.debug(line) | ||||||
|     except subprocess.CalledProcessError as e: |     except subprocess.CalledProcessError as e: | ||||||
|         if e.output is not None and logger is not None: |         if e.output is not None and logger is not None: | ||||||
|             for line in e.output.decode("utf8").splitlines(): |             for line in e.output.splitlines(): | ||||||
|                 logger.debug(line) |                 logger.debug(line) | ||||||
|         raise exception or e |         raise exception or e | ||||||
|     return result |     return result | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def exception_response_text(exception: requests.exceptions.HTTPError) -> str: | ||||||
|  |     """ | ||||||
|  |     safe response exception text generation | ||||||
|  |     :param exception: exception raised | ||||||
|  |     :return: text of the response if it is not None and empty string otherwise | ||||||
|  |     """ | ||||||
|  |     result: str = exception.response.text if exception.response is not None else "" | ||||||
|  |     return result | ||||||
|  |  | ||||||
|  |  | ||||||
| def package_like(filename: Path) -> bool: | def package_like(filename: Path) -> bool: | ||||||
|     """ |     """ | ||||||
|     check if file looks like package |     check if file looks like package | ||||||
|  | |||||||
| @ -0,0 +1,24 @@ | |||||||
|  | import argparse | ||||||
|  |  | ||||||
|  | from pytest_mock import MockerFixture | ||||||
|  |  | ||||||
|  | from ahriman.application.handlers import KeyImport | ||||||
|  | from ahriman.core.configuration import Configuration | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def _default_args(args: argparse.Namespace) -> argparse.Namespace: | ||||||
|  |     args.key = "0xE989490C" | ||||||
|  |     args.key_server = "keys.gnupg.net" | ||||||
|  |     return args | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def test_run(args: argparse.Namespace, configuration: Configuration, mocker: MockerFixture) -> None: | ||||||
|  |     """ | ||||||
|  |     must run command | ||||||
|  |     """ | ||||||
|  |     args = _default_args(args) | ||||||
|  |     mocker.patch("pathlib.Path.mkdir") | ||||||
|  |     application_mock = mocker.patch("ahriman.core.sign.gpg.GPG.import_key") | ||||||
|  |  | ||||||
|  |     KeyImport.run(args, "x86_64", configuration) | ||||||
|  |     application_mock.assert_called_once() | ||||||
| @ -71,6 +71,15 @@ def test_subparsers_config(parser: argparse.ArgumentParser) -> None: | |||||||
|     assert args.unsafe |     assert args.unsafe | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def test_subparsers_key_import(parser: argparse.ArgumentParser) -> None: | ||||||
|  |     """ | ||||||
|  |     key-import command must imply lock and no_report | ||||||
|  |     """ | ||||||
|  |     args = parser.parse_args(["-a", "x86_64", "key-import", "key"]) | ||||||
|  |     assert args.lock is None | ||||||
|  |     assert args.no_report | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_subparsers_search(parser: argparse.ArgumentParser) -> None: | def test_subparsers_search(parser: argparse.ArgumentParser) -> None: | ||||||
|     """ |     """ | ||||||
|     search command must imply lock, no_report and unsafe |     search command must imply lock, no_report and unsafe | ||||||
|  | |||||||
| @ -1,5 +1,9 @@ | |||||||
|  | import pytest | ||||||
|  | import requests | ||||||
|  |  | ||||||
| from pathlib import Path | from pathlib import Path | ||||||
| from pytest_mock import MockerFixture | from pytest_mock import MockerFixture | ||||||
|  | from unittest import mock | ||||||
|  |  | ||||||
| from ahriman.core.sign.gpg import GPG | from ahriman.core.sign.gpg import GPG | ||||||
| from ahriman.models.sign_settings import SignSettings | from ahriman.models.sign_settings import SignSettings | ||||||
| @ -60,6 +64,38 @@ def test_sign_command(gpg_with_key: GPG) -> None: | |||||||
|     assert gpg_with_key.sign_command(Path("a"), gpg_with_key.default_key) |     assert gpg_with_key.sign_command(Path("a"), gpg_with_key.default_key) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def test_download_key(gpg: GPG, mocker: MockerFixture) -> None: | ||||||
|  |     """ | ||||||
|  |     must download the key from public server | ||||||
|  |     """ | ||||||
|  |     requests_mock = mocker.patch("requests.get") | ||||||
|  |     gpg.download_key("keys.gnupg.net", "0xE989490C") | ||||||
|  |     requests_mock.assert_called_once() | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def test_download_key_failure(gpg: GPG, mocker: MockerFixture) -> None: | ||||||
|  |     """ | ||||||
|  |     must download the key from public server and log error if any (and raise it again) | ||||||
|  |     """ | ||||||
|  |     mocker.patch("requests.get", side_effect=requests.exceptions.HTTPError()) | ||||||
|  |     with pytest.raises(requests.exceptions.HTTPError): | ||||||
|  |         gpg.download_key("keys.gnupg.net", "0xE989490C") | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def test_import_key(gpg: GPG, mocker: MockerFixture) -> None: | ||||||
|  |     """ | ||||||
|  |     must import PGP key from the server | ||||||
|  |     """ | ||||||
|  |     mocker.patch("ahriman.core.sign.gpg.GPG.download_key", return_value="key") | ||||||
|  |     check_output_mock = mocker.patch("ahriman.core.sign.gpg.GPG._check_output") | ||||||
|  |  | ||||||
|  |     gpg.import_key("keys.gnupg.net", "0xE989490C") | ||||||
|  |     check_output_mock.assert_has_calls([ | ||||||
|  |         mock.call("gpg", "--import", input_data="key", exception=None, logger=pytest.helpers.anyvar(int)), | ||||||
|  |         mock.call("gpg", "--quick-lsign-key", "0xE989490C", exception=None, logger=pytest.helpers.anyvar(int)) | ||||||
|  |     ]) | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_process(gpg_with_key: GPG, mocker: MockerFixture) -> None: | def test_process(gpg_with_key: GPG, mocker: MockerFixture) -> None: | ||||||
|     """ |     """ | ||||||
|     must call process method correctly |     must call process method correctly | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user