From 8d53a59a6abe4f45de9a063bce67ab1d8276cfd8 Mon Sep 17 00:00:00 2001 From: Evgenii Alekseev Date: Mon, 11 Nov 2024 18:21:00 +0200 Subject: [PATCH] feat: notify users about outdated password hashes used --- src/ahriman/core/auth/mapping.py | 8 +++--- src/ahriman/core/auth/pam.py | 2 +- src/ahriman/models/user.py | 21 +++++++++++++++ tests/ahriman/core/auth/test_mapping.py | 12 ++++----- tests/ahriman/models/test_user.py | 34 ++++++++++++++++++++++++- 5 files changed, 65 insertions(+), 12 deletions(-) diff --git a/src/ahriman/core/auth/mapping.py b/src/ahriman/core/auth/mapping.py index 0c2f472d..f9aac132 100644 --- a/src/ahriman/core/auth/mapping.py +++ b/src/ahriman/core/auth/mapping.py @@ -59,10 +59,10 @@ class Mapping(Auth): """ if password is None: return False # invalid data supplied - user = self.get_user(username) + user = await self.get_user(username) return user is not None and user.check_credentials(password, self.salt) - def get_user(self, username: str) -> User | None: + async def get_user(self, username: str) -> User | None: """ retrieve user from in-memory mapping @@ -84,7 +84,7 @@ class Mapping(Auth): Returns: bool: ``True`` in case if user is known and can be authorized and ``False`` otherwise """ - return username is not None and self.get_user(username) is not None + return username is not None and await self.get_user(username) is not None async def verify_access(self, username: str, required: UserAccess, context: str | None) -> bool: """ @@ -98,5 +98,5 @@ class Mapping(Auth): Returns: bool: ``True`` in case if user is allowed to do this request and ``False`` otherwise """ - user = self.get_user(username) + user = await self.get_user(username) return user is not None and user.verify_access(required) diff --git a/src/ahriman/core/auth/pam.py b/src/ahriman/core/auth/pam.py index ac4303a4..71ffeafe 100644 --- a/src/ahriman/core/auth/pam.py +++ b/src/ahriman/core/auth/pam.py @@ -120,7 +120,7 @@ class PAM(Mapping): bool: ``True`` in case if user is allowed to do this request and ``False`` otherwise """ # this method is basically inverted, first we check overrides in database and then fallback to the PAM logic - if (user := self.get_user(username)) is not None: + if (user := await self.get_user(username)) is not None: return user.verify_access(required) # if username is in admin group, then we treat it as full access if username in self.group_members(self.full_access_group): diff --git a/src/ahriman/models/user.py b/src/ahriman/models/user.py index 4e3daa5f..722922bd 100644 --- a/src/ahriman/models/user.py +++ b/src/ahriman/models/user.py @@ -32,6 +32,7 @@ class User: authorized web user model Attributes: + SUPPORTED_ALGOS(set[str]): (class attribute) list of the supported hashing algorithms username(str): username password(str): hashed user password with salt access(UserAccess): user role @@ -68,6 +69,8 @@ class User: packager_id: str | None = None key: str | None = None + SUPPORTED_ALGOS = {"$2$", "$2a$", "$2x$", "$2y$", "$2b$"} + def __post_init__(self) -> None: """ remove empty fields @@ -75,6 +78,19 @@ class User: object.__setattr__(self, "packager_id", self.packager_id or None) object.__setattr__(self, "key", self.key or None) + @property + def algo(self) -> str | None: + """ + extract algorithm used for the hashing password + + Returns: + str | None: first part of password hash (e.g. ``$2$``) if available or ``None`` otherwise + """ + if not self.password: + return None + algo = next(segment for segment in self.password.split("$") if segment) + return f"${algo}$" + @staticmethod def generate_password(length: int) -> str: """ @@ -98,7 +114,12 @@ class User: Returns: bool: ``True`` in case if password matches, ``False`` otherwise + + Raises: + ValueError: if user password is set to unsupported algorithm """ + if (algo := self.algo) is not None and algo not in self.SUPPORTED_ALGOS: + raise ValueError(f"Crypt {algo} is not supported, consider setting new password") try: return bcrypt.checkpw((password + salt).encode("utf8"), self.password.encode("utf8")) except ValueError: diff --git a/tests/ahriman/core/auth/test_mapping.py b/tests/ahriman/core/auth/test_mapping.py index 194f4fbd..3b8348f1 100644 --- a/tests/ahriman/core/auth/test_mapping.py +++ b/tests/ahriman/core/auth/test_mapping.py @@ -31,27 +31,27 @@ async def test_check_credentials_unknown(mapping: Mapping, user: User) -> None: assert not await mapping.check_credentials(user.username, user.password) -def test_get_user(mapping: Mapping, user: User, mocker: MockerFixture) -> None: +async def test_get_user(mapping: Mapping, user: User, mocker: MockerFixture) -> None: """ must return user from storage by username """ mocker.patch("ahriman.core.database.SQLite.user_get", return_value=user) - assert mapping.get_user(user.username) == user + assert await mapping.get_user(user.username) == user -def test_get_user_normalized(mapping: Mapping, user: User, mocker: MockerFixture) -> None: +async def test_get_user_normalized(mapping: Mapping, user: User, mocker: MockerFixture) -> None: """ must return user from storage by username case-insensitive """ mocker.patch("ahriman.core.database.SQLite.user_get", return_value=user) - assert mapping.get_user(user.username.upper()) == user + assert await mapping.get_user(user.username.upper()) == user -def test_get_user_unknown(mapping: Mapping, user: User) -> None: +async def test_get_user_unknown(mapping: Mapping, user: User) -> None: """ must return None in case if no user found """ - assert mapping.get_user(user.username) is None + assert await mapping.get_user(user.username) is None async def test_known_username(mapping: Mapping, user: User, mocker: MockerFixture) -> None: diff --git a/tests/ahriman/models/test_user.py b/tests/ahriman/models/test_user.py index 7f4de565..a04fdeef 100644 --- a/tests/ahriman/models/test_user.py +++ b/tests/ahriman/models/test_user.py @@ -1,9 +1,29 @@ +import pytest + from dataclasses import replace from ahriman.models.user import User from ahriman.models.user_access import UserAccess +def test_algo() -> None: + """ + must correctly define algorithm used + """ + assert User(username="user", password=None, access=UserAccess.Read).algo is None + assert User(username="user", password="", access=UserAccess.Read).algo is None + assert User( + username="user", + password="$6$rounds=656000$mWBiecMPrHAL1VgX$oU4Y5HH8HzlvMaxwkNEJjK13ozElyU1wAHBoO/WW5dAaE4YEfnB0X3FxbynKMl4FBdC3Ovap0jINz4LPkNADg0", + access=UserAccess.Read, + ).algo == "$6$" + assert User( + username="user", + password="$2b$12$VCWKazvYxH7B0eAalDGAbu/3y1dSWs79sv/2ujjX1TMaFdVUy80hy", + access=UserAccess.Read, + ).algo == "$2b$" + + def test_check_credentials_hash_password(user: User) -> None: """ must generate and validate user password @@ -20,11 +40,23 @@ def test_check_credentials_empty_hash(user: User) -> None: must reject any authorization if the hash is invalid """ current_password = user.password - assert not user.check_credentials(current_password, "salt") user = replace(user, password="") assert not user.check_credentials(current_password, "salt") +def test_check_credentials_sha512() -> None: + """ + must raise DeprecationWarning for sha512 hashed passwords + """ + user = User( + username="user", + password="$6$rounds=656000$mWBiecMPrHAL1VgX$oU4Y5HH8HzlvMaxwkNEJjK13ozElyU1wAHBoO/WW5dAaE4YEfnB0X3FxbynKMl4FBdC3Ovap0jINz4LPkNADg0", + access=UserAccess.Read, + ) + with pytest.raises(ValueError): + assert user.check_credentials("password", "salt") + + def test_hash_password_empty_hash(user: User) -> None: """ must return empty string after hash in case if password not set