mirror of
https://github.com/arcan1s/ahriman.git
synced 2026-03-07 02:33:38 +00:00
fix: catch some StopIteration exceptions
This commit is contained in:
@@ -138,8 +138,14 @@ class PacmanDatabase(SyncHttpClient):
|
|||||||
|
|
||||||
Args:
|
Args:
|
||||||
force(bool): force database synchronization (same as ``pacman -Syy``)
|
force(bool): force database synchronization (same as ``pacman -Syy``)
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
PacmanError: on operation error (invalid scheme or incomplete configuration)
|
||||||
"""
|
"""
|
||||||
server = next(iter(self.database.servers))
|
try:
|
||||||
|
server = next(iter(self.database.servers))
|
||||||
|
except StopIteration:
|
||||||
|
raise PacmanError("No configured servers available for database") from None
|
||||||
filename = f"{self.database.name}.files.tar.gz"
|
filename = f"{self.database.name}.files.tar.gz"
|
||||||
url = f"{server}/{filename}"
|
url = f"{server}/{filename}"
|
||||||
|
|
||||||
|
|||||||
@@ -116,6 +116,19 @@ class GitRemoteError(RuntimeError):
|
|||||||
RuntimeError.__init__(self, "Git remote failed")
|
RuntimeError.__init__(self, "Git remote failed")
|
||||||
|
|
||||||
|
|
||||||
|
class GPGError(RuntimeError):
|
||||||
|
"""
|
||||||
|
PGP/GPG related exception
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, details: str) -> None:
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
details(str): details of the exception
|
||||||
|
"""
|
||||||
|
RuntimeError.__init__(self, f"GPG operation failed: {details}")
|
||||||
|
|
||||||
|
|
||||||
class InitializeError(RuntimeError):
|
class InitializeError(RuntimeError):
|
||||||
"""
|
"""
|
||||||
base service initialization exception
|
base service initialization exception
|
||||||
|
|||||||
@@ -20,7 +20,7 @@
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from ahriman.core.configuration import Configuration
|
from ahriman.core.configuration import Configuration
|
||||||
from ahriman.core.exceptions import BuildError
|
from ahriman.core.exceptions import BuildError, GPGError
|
||||||
from ahriman.core.http import SyncHttpClient
|
from ahriman.core.http import SyncHttpClient
|
||||||
from ahriman.core.utils import check_output
|
from ahriman.core.utils import check_output
|
||||||
from ahriman.models.sign_settings import SignSettings
|
from ahriman.models.sign_settings import SignSettings
|
||||||
@@ -147,12 +147,19 @@ class GPG(SyncHttpClient):
|
|||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
str: full PGP key fingerprint
|
str: full PGP key fingerprint
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
GPGError: if key is in wrong format
|
||||||
"""
|
"""
|
||||||
metadata = check_output("gpg", "--with-colons", "--fingerprint", key, logger=self.logger)
|
|
||||||
# fingerprint line will be like
|
# fingerprint line will be like
|
||||||
# fpr:::::::::43A663569A07EE1E4ECC55CC7E3A4240CE3C45C2:
|
# fpr:::::::::43A663569A07EE1E4ECC55CC7E3A4240CE3C45C2:
|
||||||
fingerprint = next(filter(lambda line: line[:3] == "fpr", metadata.splitlines()))
|
metadata = check_output("gpg", "--with-colons", "--fingerprint", key, logger=self.logger)
|
||||||
return fingerprint.split(":")[-2]
|
|
||||||
|
try:
|
||||||
|
fingerprint = next(filter(lambda line: line[:3] == "fpr", metadata.splitlines()))
|
||||||
|
return fingerprint.split(":")[-2]
|
||||||
|
except (IndexError, StopIteration):
|
||||||
|
raise GPGError(f"key {key} has invalid metadata") from None
|
||||||
|
|
||||||
def key_import(self, server: str, key: str) -> None:
|
def key_import(self, server: str, key: str) -> None:
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -88,8 +88,12 @@ class User:
|
|||||||
"""
|
"""
|
||||||
if not self.password:
|
if not self.password:
|
||||||
return None
|
return None
|
||||||
algo = next(segment for segment in self.password.split("$") if segment)
|
|
||||||
return f"${algo}$"
|
try:
|
||||||
|
algo = next(segment for segment in self.password.split("$") if segment)
|
||||||
|
return f"${algo}$"
|
||||||
|
except StopIteration:
|
||||||
|
return None
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def generate_password(length: int) -> str:
|
def generate_password(length: int) -> str:
|
||||||
|
|||||||
@@ -183,6 +183,15 @@ def test_sync_files_local(pacman_database: PacmanDatabase, mocker: MockerFixture
|
|||||||
copy_mock.assert_called_once_with(Path("/var/core.files.tar.gz"), pytest.helpers.anyvar(int))
|
copy_mock.assert_called_once_with(Path("/var/core.files.tar.gz"), pytest.helpers.anyvar(int))
|
||||||
|
|
||||||
|
|
||||||
|
def test_sync_files_no_servers(pacman_database: PacmanDatabase) -> None:
|
||||||
|
"""
|
||||||
|
must raise PacmanError if no servers are configured
|
||||||
|
"""
|
||||||
|
pacman_database.database.servers = []
|
||||||
|
with pytest.raises(PacmanError):
|
||||||
|
pacman_database.sync_files(force=False)
|
||||||
|
|
||||||
|
|
||||||
def test_sync_files_unknown_source(pacman_database: PacmanDatabase) -> None:
|
def test_sync_files_unknown_source(pacman_database: PacmanDatabase) -> None:
|
||||||
"""
|
"""
|
||||||
must raise an exception in case if server scheme is unsupported
|
must raise an exception in case if server scheme is unsupported
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ from pathlib import Path
|
|||||||
from pytest_mock import MockerFixture
|
from pytest_mock import MockerFixture
|
||||||
|
|
||||||
from ahriman.core.configuration import Configuration
|
from ahriman.core.configuration import Configuration
|
||||||
|
from ahriman.core.exceptions import GPGError
|
||||||
from ahriman.core.sign.gpg import GPG
|
from ahriman.core.sign.gpg import GPG
|
||||||
from ahriman.models.sign_settings import SignSettings
|
from ahriman.models.sign_settings import SignSettings
|
||||||
|
|
||||||
@@ -113,6 +114,15 @@ fpr:::::::::43A663569A07EE1E4ECC55CC7E3A4240CE3C45C2:""")
|
|||||||
check_output_mock.assert_called_once_with("gpg", "--with-colons", "--fingerprint", key, logger=gpg.logger)
|
check_output_mock.assert_called_once_with("gpg", "--with-colons", "--fingerprint", key, logger=gpg.logger)
|
||||||
|
|
||||||
|
|
||||||
|
def test_key_fingerprint_invalid(gpg: GPG, mocker: MockerFixture) -> None:
|
||||||
|
"""
|
||||||
|
must raise GPGError if no fingerprint found in output
|
||||||
|
"""
|
||||||
|
mocker.patch("ahriman.core.sign.gpg.check_output", return_value="no fingerprint here")
|
||||||
|
with pytest.raises(GPGError):
|
||||||
|
gpg.key_fingerprint("0xCE3C45C2")
|
||||||
|
|
||||||
|
|
||||||
def test_key_import(gpg: GPG, mocker: MockerFixture) -> None:
|
def test_key_import(gpg: GPG, mocker: MockerFixture) -> None:
|
||||||
"""
|
"""
|
||||||
must import PGP key from the server
|
must import PGP key from the server
|
||||||
|
|||||||
@@ -12,6 +12,8 @@ def test_algo() -> None:
|
|||||||
"""
|
"""
|
||||||
assert User(username="user", password=None, access=UserAccess.Read).algo is None
|
assert User(username="user", password=None, access=UserAccess.Read).algo is None
|
||||||
assert User(username="user", password="", access=UserAccess.Read).algo is None
|
assert User(username="user", password="", access=UserAccess.Read).algo is None
|
||||||
|
assert User(username="user", password="$$$", access=UserAccess.Read).algo is None
|
||||||
|
|
||||||
assert User(
|
assert User(
|
||||||
username="user",
|
username="user",
|
||||||
password="$6$rounds=656000$mWBiecMPrHAL1VgX$oU4Y5HH8HzlvMaxwkNEJjK13ozElyU1wAHBoO/WW5dAaE4YEfnB0X3FxbynKMl4FBdC3Ovap0jINz4LPkNADg0",
|
password="$6$rounds=656000$mWBiecMPrHAL1VgX$oU4Y5HH8HzlvMaxwkNEJjK13ozElyU1wAHBoO/WW5dAaE4YEfnB0X3FxbynKMl4FBdC3Ovap0jINz4LPkNADg0",
|
||||||
|
|||||||
Reference in New Issue
Block a user