mirror of
				https://github.com/arcan1s/ahriman.git
				synced 2025-10-30 21:33:43 +00:00 
			
		
		
		
	feat: notify users about outdated password hashes used
This commit is contained in:
		| @ -59,10 +59,10 @@ class Mapping(Auth): | ||||
|         """ | ||||
|         if password is None: | ||||
|             return False  # invalid data supplied | ||||
|         user = self.get_user(username) | ||||
|         user = await self.get_user(username) | ||||
|         return user is not None and user.check_credentials(password, self.salt) | ||||
|  | ||||
|     def get_user(self, username: str) -> User | None: | ||||
|     async def get_user(self, username: str) -> User | None: | ||||
|         """ | ||||
|         retrieve user from in-memory mapping | ||||
|  | ||||
| @ -84,7 +84,7 @@ class Mapping(Auth): | ||||
|         Returns: | ||||
|             bool: ``True`` in case if user is known and can be authorized and ``False`` otherwise | ||||
|         """ | ||||
|         return username is not None and self.get_user(username) is not None | ||||
|         return username is not None and await self.get_user(username) is not None | ||||
|  | ||||
|     async def verify_access(self, username: str, required: UserAccess, context: str | None) -> bool: | ||||
|         """ | ||||
| @ -98,5 +98,5 @@ class Mapping(Auth): | ||||
|         Returns: | ||||
|             bool: ``True`` in case if user is allowed to do this request and ``False`` otherwise | ||||
|         """ | ||||
|         user = self.get_user(username) | ||||
|         user = await self.get_user(username) | ||||
|         return user is not None and user.verify_access(required) | ||||
|  | ||||
| @ -120,7 +120,7 @@ class PAM(Mapping): | ||||
|             bool: ``True`` in case if user is allowed to do this request and ``False`` otherwise | ||||
|         """ | ||||
|         # this method is basically inverted, first we check overrides in database and then fallback to the PAM logic | ||||
|         if (user := self.get_user(username)) is not None: | ||||
|         if (user := await self.get_user(username)) is not None: | ||||
|             return user.verify_access(required) | ||||
|         # if username is in admin group, then we treat it as full access | ||||
|         if username in self.group_members(self.full_access_group): | ||||
|  | ||||
| @ -32,6 +32,7 @@ class User: | ||||
|     authorized web user model | ||||
|  | ||||
|     Attributes: | ||||
|         SUPPORTED_ALGOS(set[str]): (class attribute) list of the supported hashing algorithms | ||||
|         username(str): username | ||||
|         password(str): hashed user password with salt | ||||
|         access(UserAccess): user role | ||||
| @ -68,6 +69,8 @@ class User: | ||||
|     packager_id: str | None = None | ||||
|     key: str | None = None | ||||
|  | ||||
|     SUPPORTED_ALGOS = {"$2$", "$2a$", "$2x$", "$2y$", "$2b$"} | ||||
|  | ||||
|     def __post_init__(self) -> None: | ||||
|         """ | ||||
|         remove empty fields | ||||
| @ -75,6 +78,19 @@ class User: | ||||
|         object.__setattr__(self, "packager_id", self.packager_id or None) | ||||
|         object.__setattr__(self, "key", self.key or None) | ||||
|  | ||||
|     @property | ||||
|     def algo(self) -> str | None: | ||||
|         """ | ||||
|         extract algorithm used for the hashing password | ||||
|  | ||||
|         Returns: | ||||
|             str | None: first part of password hash (e.g. ``$2$``) if available or ``None`` otherwise | ||||
|         """ | ||||
|         if not self.password: | ||||
|             return None | ||||
|         algo = next(segment for segment in self.password.split("$") if segment) | ||||
|         return f"${algo}$" | ||||
|  | ||||
|     @staticmethod | ||||
|     def generate_password(length: int) -> str: | ||||
|         """ | ||||
| @ -98,7 +114,12 @@ class User: | ||||
|  | ||||
|         Returns: | ||||
|             bool: ``True`` in case if password matches, ``False`` otherwise | ||||
|  | ||||
|         Raises: | ||||
|             ValueError: if user password is set to unsupported algorithm | ||||
|         """ | ||||
|         if (algo := self.algo) is not None and algo not in self.SUPPORTED_ALGOS: | ||||
|             raise ValueError(f"Crypt {algo} is not supported, consider setting new password") | ||||
|         try: | ||||
|             return bcrypt.checkpw((password + salt).encode("utf8"), self.password.encode("utf8")) | ||||
|         except ValueError: | ||||
|  | ||||
| @ -31,27 +31,27 @@ async def test_check_credentials_unknown(mapping: Mapping, user: User) -> None: | ||||
|     assert not await mapping.check_credentials(user.username, user.password) | ||||
|  | ||||
|  | ||||
| def test_get_user(mapping: Mapping, user: User, mocker: MockerFixture) -> None: | ||||
| async def test_get_user(mapping: Mapping, user: User, mocker: MockerFixture) -> None: | ||||
|     """ | ||||
|     must return user from storage by username | ||||
|     """ | ||||
|     mocker.patch("ahriman.core.database.SQLite.user_get", return_value=user) | ||||
|     assert mapping.get_user(user.username) == user | ||||
|     assert await mapping.get_user(user.username) == user | ||||
|  | ||||
|  | ||||
| def test_get_user_normalized(mapping: Mapping, user: User, mocker: MockerFixture) -> None: | ||||
| async def test_get_user_normalized(mapping: Mapping, user: User, mocker: MockerFixture) -> None: | ||||
|     """ | ||||
|     must return user from storage by username case-insensitive | ||||
|     """ | ||||
|     mocker.patch("ahriman.core.database.SQLite.user_get", return_value=user) | ||||
|     assert mapping.get_user(user.username.upper()) == user | ||||
|     assert await mapping.get_user(user.username.upper()) == user | ||||
|  | ||||
|  | ||||
| def test_get_user_unknown(mapping: Mapping, user: User) -> None: | ||||
| async def test_get_user_unknown(mapping: Mapping, user: User) -> None: | ||||
|     """ | ||||
|     must return None in case if no user found | ||||
|     """ | ||||
|     assert mapping.get_user(user.username) is None | ||||
|     assert await mapping.get_user(user.username) is None | ||||
|  | ||||
|  | ||||
| async def test_known_username(mapping: Mapping, user: User, mocker: MockerFixture) -> None: | ||||
|  | ||||
| @ -1,9 +1,29 @@ | ||||
| import pytest | ||||
|  | ||||
| from dataclasses import replace | ||||
|  | ||||
| from ahriman.models.user import User | ||||
| from ahriman.models.user_access import UserAccess | ||||
|  | ||||
|  | ||||
| def test_algo() -> None: | ||||
|     """ | ||||
|     must correctly define algorithm used | ||||
|     """ | ||||
|     assert User(username="user", password=None, access=UserAccess.Read).algo is None | ||||
|     assert User(username="user", password="", access=UserAccess.Read).algo is None | ||||
|     assert User( | ||||
|         username="user", | ||||
|         password="$6$rounds=656000$mWBiecMPrHAL1VgX$oU4Y5HH8HzlvMaxwkNEJjK13ozElyU1wAHBoO/WW5dAaE4YEfnB0X3FxbynKMl4FBdC3Ovap0jINz4LPkNADg0", | ||||
|         access=UserAccess.Read, | ||||
|     ).algo == "$6$" | ||||
|     assert User( | ||||
|         username="user", | ||||
|         password="$2b$12$VCWKazvYxH7B0eAalDGAbu/3y1dSWs79sv/2ujjX1TMaFdVUy80hy", | ||||
|         access=UserAccess.Read, | ||||
|     ).algo == "$2b$" | ||||
|  | ||||
|  | ||||
| def test_check_credentials_hash_password(user: User) -> None: | ||||
|     """ | ||||
|     must generate and validate user password | ||||
| @ -20,11 +40,23 @@ def test_check_credentials_empty_hash(user: User) -> None: | ||||
|     must reject any authorization if the hash is invalid | ||||
|     """ | ||||
|     current_password = user.password | ||||
|     assert not user.check_credentials(current_password, "salt") | ||||
|     user = replace(user, password="") | ||||
|     assert not user.check_credentials(current_password, "salt") | ||||
|  | ||||
|  | ||||
| def test_check_credentials_sha512() -> None: | ||||
|     """ | ||||
|     must raise DeprecationWarning for sha512 hashed passwords | ||||
|     """ | ||||
|     user = User( | ||||
|         username="user", | ||||
|         password="$6$rounds=656000$mWBiecMPrHAL1VgX$oU4Y5HH8HzlvMaxwkNEJjK13ozElyU1wAHBoO/WW5dAaE4YEfnB0X3FxbynKMl4FBdC3Ovap0jINz4LPkNADg0", | ||||
|         access=UserAccess.Read, | ||||
|     ) | ||||
|     with pytest.raises(ValueError): | ||||
|         assert user.check_credentials("password", "salt") | ||||
|  | ||||
|  | ||||
| def test_hash_password_empty_hash(user: User) -> None: | ||||
|     """ | ||||
|     must return empty string after hash in case if password not set | ||||
|  | ||||
		Reference in New Issue
	
	Block a user