mirror of
https://github.com/arcan1s/ahriman.git
synced 2025-09-09 18:29:55 +00:00
Compare commits
1 Commits
61a09ce705
...
fa56cee92a
Author | SHA1 | Date | |
---|---|---|---|
fa56cee92a |
@ -61,15 +61,17 @@ libalpm and AUR related configuration. Group name can refer to architecture, e.g
|
||||
|
||||
Base authorization settings. ``OAuth`` provider requires ``aioauth-client`` library to be installed.
|
||||
|
||||
* ``target`` - specifies authorization provider, string, optional, default ``disabled``. Allowed values are ``disabled``, ``configuration``, ``oauth``.
|
||||
* ``target`` - specifies authorization provider, string, optional, default ``disabled``. Allowed values are ``disabled``, ``configuration``, ``oauth``, ``pam``.
|
||||
* ``allow_read_only`` - allow requesting status APIs without authorization, boolean, required.
|
||||
* ``client_id`` - OAuth2 application client ID, string, required in case if ``oauth`` is used.
|
||||
* ``client_secret`` - OAuth2 application client secret key, string, required in case if ``oauth`` is used.
|
||||
* ``cookie_secret_key`` - secret key which will be used for cookies encryption, string, optional. It must be 32 bytes URL-safe base64-encoded and can be generated as following ``base64.urlsafe_b64encode(os.urandom(32)).decode("utf8")``. If not set, it will be generated automatically; note, however, that in this case, all sessions will be automatically invalidated during the service restart.
|
||||
* ``full_access_group`` - name of the secondary group (e.g. ``wheel``) to be used as admin group in the service, string, required in case if ``pam`` is used.
|
||||
* ``max_age`` - parameter which controls both cookie expiration and token expiration inside the service in seconds, integer, optional, default is 7 days.
|
||||
* ``oauth_icon`` - OAuth2 login button icon, string, optional, default is ``google``. Must be valid `Bootstrap icon <https://icons.getbootstrap.com/>`__ name.
|
||||
* ``oauth_provider`` - OAuth2 provider class name as is in ``aioauth-client`` (e.g. ``GoogleClient``, ``GithubClient`` etc), string, required in case if ``oauth`` is used.
|
||||
* ``oauth_scopes`` - scopes list for OAuth2 provider, which will allow retrieving user email (which is used for checking user permissions), e.g. ``https://www.googleapis.com/auth/userinfo.email`` for ``GoogleClient`` or ``user:email`` for ``GithubClient``, space separated list of strings, required in case if ``oauth`` is used.
|
||||
* ``permit_root_login`` - allow login as root user, boolean, optional, default ``no``.
|
||||
* ``salt`` - additional password hash salt, string, optional.
|
||||
|
||||
Authorized users are stored inside internal database, if any of external providers (e.g. ``oauth``) are used, the password field for non-service users must be empty.
|
||||
|
@ -34,6 +34,8 @@ allow_read_only = yes
|
||||
; Cookie secret key to be used for cookies encryption. Must be valid 32 bytes URL-safe base64-encoded string.
|
||||
; If not set, it will be generated automatically.
|
||||
;cookie_secret_key =
|
||||
; Name of the secondary group to be used as admin group in the service.
|
||||
;full_access_group = wheel
|
||||
; Authentication cookie expiration in seconds.
|
||||
;max_age = 604800
|
||||
; OAuth2 provider icon for the web interface.
|
||||
@ -42,6 +44,8 @@ allow_read_only = yes
|
||||
;oauth_provider = GoogleClient
|
||||
; Scopes list for OAuth2 provider. Required if oauth is used.
|
||||
;oauth_scopes = https://www.googleapis.com/auth/userinfo.email
|
||||
; Allow login as root user (only if PAM is used).
|
||||
;permit_root_login = no
|
||||
; Optional password salt.
|
||||
;salt =
|
||||
|
||||
|
@ -12,6 +12,7 @@ Collection of the examples of docker compose configuration files, which covers s
|
||||
* [Index](index): repository with index page generator enabled.
|
||||
* [Multi repo](multirepo): run web service with two separated repositories.
|
||||
* [OAuth](oauth): web service with OAuth (GitHub provider) authentication enabled.
|
||||
* [PAM](pam): web service with PAM authentication enabled.
|
||||
* [Pull](pull): normal service, but in addition with pulling packages from another source (e.g. GitHub repository).
|
||||
* [Sign](sign): create repository with database signing.
|
||||
* [Web](web): simple web service with authentication enabled.
|
||||
|
6
recipes/pam/README.md
Normal file
6
recipes/pam/README.md
Normal file
@ -0,0 +1,6 @@
|
||||
# PAM
|
||||
|
||||
1. Create system user `demo` with password from `AHRIMAN_PASSWORD` environment variable and group `wheel`.
|
||||
2. Setup repository named `ahriman-demo` with architecture `x86_64`.
|
||||
3. Start web server at port `8080`.
|
||||
4. Repository is available at `http://localhost:8080/repo`.
|
63
recipes/pam/compose.yml
Normal file
63
recipes/pam/compose.yml
Normal file
@ -0,0 +1,63 @@
|
||||
services:
|
||||
backend:
|
||||
image: ahriman
|
||||
privileged: true
|
||||
|
||||
environment:
|
||||
AHRIMAN_DEBUG: yes
|
||||
AHRIMAN_OUTPUT: console
|
||||
AHRIMAN_PASSWORD: ${AHRIMAN_PASSWORD}
|
||||
AHRIMAN_PORT: 8080
|
||||
AHRIMAN_PRESETUP_COMMAND: useradd -d / -G wheel -M demo; (cat /run/secrets/password; echo; cat /run/secrets/password) | passwd demo
|
||||
AHRIMAN_REPOSITORY: ahriman-demo
|
||||
AHRIMAN_UNIX_SOCKET: /var/lib/ahriman/ahriman/ahriman.sock
|
||||
|
||||
configs:
|
||||
- source: service
|
||||
target: /etc/ahriman.ini.d/99-settings.ini
|
||||
secrets:
|
||||
- password
|
||||
|
||||
volumes:
|
||||
- type: volume
|
||||
source: repository
|
||||
target: /var/lib/ahriman
|
||||
volume:
|
||||
nocopy: true
|
||||
|
||||
healthcheck:
|
||||
test: curl --fail --silent --output /dev/null http://backend:8080/api/v1/info
|
||||
interval: 10s
|
||||
start_period: 30s
|
||||
|
||||
command: web
|
||||
|
||||
frontend:
|
||||
image: nginx
|
||||
ports:
|
||||
- 8080:80
|
||||
|
||||
configs:
|
||||
- source: nginx
|
||||
target: /etc/nginx/conf.d/default.conf
|
||||
|
||||
volumes:
|
||||
- type: volume
|
||||
source: repository
|
||||
target: /srv
|
||||
read_only: true
|
||||
volume:
|
||||
nocopy: true
|
||||
|
||||
configs:
|
||||
nginx:
|
||||
file: nginx.conf
|
||||
service:
|
||||
file: service.ini
|
||||
|
||||
secrets:
|
||||
password:
|
||||
environment: AHRIMAN_PASSWORD
|
||||
|
||||
volumes:
|
||||
repository:
|
18
recipes/pam/nginx.conf
Normal file
18
recipes/pam/nginx.conf
Normal file
@ -0,0 +1,18 @@
|
||||
server {
|
||||
listen 80;
|
||||
|
||||
location /repo {
|
||||
rewrite ^/repo/(.*) /$1 break;
|
||||
autoindex on;
|
||||
root /srv/ahriman/repository;
|
||||
}
|
||||
|
||||
location / {
|
||||
proxy_set_header Host $host;
|
||||
proxy_set_header X-Real-IP $remote_addr;
|
||||
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
|
||||
proxy_set_header X-Forwarder-Proto $scheme;
|
||||
|
||||
proxy_pass http://backend:8080;
|
||||
}
|
||||
}
|
3
recipes/pam/service.ini
Normal file
3
recipes/pam/service.ini
Normal file
@ -0,0 +1,3 @@
|
||||
[auth]
|
||||
target = pam
|
||||
full_access_group = wheel
|
@ -81,15 +81,18 @@ class Auth(LazyLogging):
|
||||
case AuthSettings.OAuth:
|
||||
from ahriman.core.auth.oauth import OAuth
|
||||
return OAuth(configuration, database)
|
||||
case AuthSettings.PAM:
|
||||
from ahriman.core.auth.pam import PAM
|
||||
return PAM(configuration, database)
|
||||
case _:
|
||||
return Auth(configuration)
|
||||
|
||||
async def check_credentials(self, username: str | None, password: str | None) -> bool:
|
||||
async def check_credentials(self, username: str, password: str | None) -> bool:
|
||||
"""
|
||||
validate user password
|
||||
|
||||
Args:
|
||||
username(str | None): username
|
||||
username(str): username
|
||||
password(str | None): entered password
|
||||
|
||||
Returns:
|
||||
@ -98,12 +101,12 @@ class Auth(LazyLogging):
|
||||
del username, password
|
||||
return True
|
||||
|
||||
async def known_username(self, username: str | None) -> bool:
|
||||
async def known_username(self, username: str) -> bool:
|
||||
"""
|
||||
check if user is known
|
||||
|
||||
Args:
|
||||
username(str | None): username
|
||||
username(str): username
|
||||
|
||||
Returns:
|
||||
bool: True in case if user is known and can be authorized and False otherwise
|
||||
|
@ -48,18 +48,18 @@ class Mapping(Auth):
|
||||
self.database = database
|
||||
self.salt = configuration.get("auth", "salt", fallback="")
|
||||
|
||||
async def check_credentials(self, username: str | None, password: str | None) -> bool:
|
||||
async def check_credentials(self, username: str, password: str | None) -> bool:
|
||||
"""
|
||||
validate user password
|
||||
|
||||
Args:
|
||||
username(str | None): username
|
||||
username(str): username
|
||||
password(str | None): entered password
|
||||
|
||||
Returns:
|
||||
bool: True in case if password matches, False otherwise
|
||||
"""
|
||||
if username is None or password is None:
|
||||
if password is None:
|
||||
return False # invalid data supplied
|
||||
user = self.get_user(username)
|
||||
return user is not None and user.check_credentials(password, self.salt)
|
||||
@ -76,12 +76,12 @@ class Mapping(Auth):
|
||||
"""
|
||||
return self.database.user_get(username)
|
||||
|
||||
async def known_username(self, username: str | None) -> bool:
|
||||
async def known_username(self, username: str) -> bool:
|
||||
"""
|
||||
check if user is known
|
||||
|
||||
Args:
|
||||
username(str | None): username
|
||||
username(str): username
|
||||
|
||||
Returns:
|
||||
bool: True in case if user is known and can be authorized and False otherwise
|
||||
|
131
src/ahriman/core/auth/pam.py
Normal file
131
src/ahriman/core/auth/pam.py
Normal file
@ -0,0 +1,131 @@
|
||||
#
|
||||
# Copyright (c) 2021-2024 ahriman team.
|
||||
#
|
||||
# This file is part of ahriman
|
||||
# (see https://github.com/arcan1s/ahriman).
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
from grp import getgrnam
|
||||
from pwd import getpwnam
|
||||
|
||||
from ahriman.core.auth.mapping import Mapping
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.database import SQLite
|
||||
from ahriman.core.exceptions import CalledProcessError
|
||||
from ahriman.core.utils import check_output
|
||||
from ahriman.models.auth_settings import AuthSettings
|
||||
from ahriman.models.user_access import UserAccess
|
||||
|
||||
|
||||
class PAM(Mapping):
|
||||
"""
|
||||
User authorization implementation by using default PAM
|
||||
|
||||
Attributes:
|
||||
full_access_group(str): group name users of which have full access
|
||||
permit_root_login(bool): permit login as root
|
||||
"""
|
||||
|
||||
def __init__(self, configuration: Configuration, database: SQLite,
|
||||
provider: AuthSettings = AuthSettings.PAM) -> None:
|
||||
"""
|
||||
default constructor
|
||||
|
||||
Args:
|
||||
configuration(Configuration): configuration instance
|
||||
database(SQLite): database instance
|
||||
provider(AuthSettings, optional): authorization type definition (Default value = AuthSettings.PAM)
|
||||
"""
|
||||
Mapping.__init__(self, configuration, database, provider)
|
||||
self.full_access_group = configuration.get("auth", "full_access_group")
|
||||
self.permit_root_login = configuration.getboolean("auth", "permit_root_login", fallback=False)
|
||||
|
||||
@staticmethod
|
||||
def group_members(group_name: str) -> list[str]:
|
||||
"""
|
||||
extract current group members
|
||||
|
||||
Args:
|
||||
group_name(str): group name
|
||||
|
||||
Returns:
|
||||
list[str]: list of users which belong to the specified group. In case if group wasn't found, the empty list
|
||||
will be returned
|
||||
"""
|
||||
try:
|
||||
group = getgrnam(group_name)
|
||||
except KeyError:
|
||||
return []
|
||||
return group.gr_mem
|
||||
|
||||
async def check_credentials(self, username: str, password: str | None) -> bool:
|
||||
"""
|
||||
validate user password
|
||||
|
||||
Args:
|
||||
username(str): username
|
||||
password(str | None): entered password
|
||||
|
||||
Returns:
|
||||
bool: True in case if password matches, False otherwise
|
||||
"""
|
||||
if password is None:
|
||||
return False # invalid data supplied
|
||||
if not self.permit_root_login and username == "root":
|
||||
return False # login as root is not allowed
|
||||
# the reason why do we call su here is that python-pam actually read shadow file
|
||||
# and hence requires root privileges
|
||||
try:
|
||||
check_output("su", "--command", "true", "-", username, input_data=password)
|
||||
return True
|
||||
except CalledProcessError:
|
||||
return await Mapping.check_credentials(self, username, password)
|
||||
|
||||
async def known_username(self, username: str) -> bool:
|
||||
"""
|
||||
check if user is known
|
||||
|
||||
Args:
|
||||
username(str): username
|
||||
|
||||
Returns:
|
||||
bool: True in case if user is known and can be authorized and False otherwise
|
||||
"""
|
||||
try:
|
||||
_ = getpwnam(username)
|
||||
return True
|
||||
except KeyError:
|
||||
return await Mapping.known_username(self, username)
|
||||
|
||||
async def verify_access(self, username: str, required: UserAccess, context: str | None) -> bool:
|
||||
"""
|
||||
validate if user has access to requested resource
|
||||
|
||||
Args:
|
||||
username(str): username
|
||||
required(UserAccess): required access level
|
||||
context(str | None): URI request path
|
||||
|
||||
Returns:
|
||||
bool: True in case if user is allowed to do this request and False otherwise
|
||||
"""
|
||||
# this method is basically inverted, first we check overrides in database and then fallback to the PAM logic
|
||||
if (user := self.get_user(username)) is not None:
|
||||
return user.verify_access(required)
|
||||
# if username is in admin group, then we treat it as full access
|
||||
if username in self.group_members(self.full_access_group):
|
||||
return UserAccess.Full.permits(required)
|
||||
# fallback to read-only accounts
|
||||
return UserAccess.Read.permits(required)
|
@ -115,6 +115,7 @@ CONFIGURATION_SCHEMA: ConfigurationSchema = {
|
||||
"oauth_provider",
|
||||
"oauth_scopes",
|
||||
]},
|
||||
{"allowed": ["pam"], "dependencies": ["full_access_group"]},
|
||||
],
|
||||
},
|
||||
"allow_read_only": {
|
||||
@ -135,6 +136,10 @@ CONFIGURATION_SCHEMA: ConfigurationSchema = {
|
||||
"minlength": 32,
|
||||
"maxlength": 64, # we cannot verify maxlength, because base64 representation might be longer than bytes
|
||||
},
|
||||
"full_access_group": {
|
||||
"type": "string",
|
||||
"empty": False,
|
||||
},
|
||||
"max_age": {
|
||||
"type": "integer",
|
||||
"coerce": "integer",
|
||||
@ -152,6 +157,10 @@ CONFIGURATION_SCHEMA: ConfigurationSchema = {
|
||||
"type": "string",
|
||||
"empty": False,
|
||||
},
|
||||
"permit_root_login": {
|
||||
"type": "boolean",
|
||||
"coerce": "boolean",
|
||||
},
|
||||
"salt": {
|
||||
"type": "string",
|
||||
},
|
||||
|
@ -30,11 +30,13 @@ class AuthSettings(StrEnum):
|
||||
Disabled(AuthSettings): (class attribute) authorization is disabled
|
||||
Configuration(AuthSettings): (class attribute) configuration based authorization
|
||||
OAuth(AuthSettings): (class attribute) OAuth based provider
|
||||
PAM(AuthSettings): (class attribute) PAM based provider
|
||||
"""
|
||||
|
||||
Disabled = "disabled"
|
||||
Configuration = "configuration"
|
||||
OAuth = "oauth2"
|
||||
PAM = "pam"
|
||||
|
||||
@property
|
||||
def is_enabled(self) -> bool:
|
||||
@ -62,5 +64,7 @@ class AuthSettings(StrEnum):
|
||||
return AuthSettings.Configuration
|
||||
case "oauth" | "oauth2":
|
||||
return AuthSettings.OAuth
|
||||
case "pam":
|
||||
return AuthSettings.PAM
|
||||
case _:
|
||||
return AuthSettings.Disabled
|
||||
|
@ -2,6 +2,7 @@ import pytest
|
||||
|
||||
from ahriman.core.auth.mapping import Mapping
|
||||
from ahriman.core.auth.oauth import OAuth
|
||||
from ahriman.core.auth.pam import PAM
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.database import SQLite
|
||||
|
||||
@ -35,3 +36,19 @@ def oauth(configuration: Configuration, database: SQLite) -> OAuth:
|
||||
"""
|
||||
configuration.set("web", "address", "https://example.com")
|
||||
return OAuth(configuration, database)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pam(configuration: Configuration, database: SQLite) -> PAM:
|
||||
"""
|
||||
PAM provider fixture
|
||||
|
||||
Args:
|
||||
configuration(Configuration): configuration fixture
|
||||
database(SQLite): database fixture
|
||||
|
||||
Returns:
|
||||
PAM: PAM service instance
|
||||
"""
|
||||
configuration.set_option("auth", "full_access_group", "wheel")
|
||||
return PAM(configuration, database)
|
||||
|
@ -1,6 +1,7 @@
|
||||
from ahriman.core.auth import Auth
|
||||
from ahriman.core.auth.mapping import Mapping
|
||||
from ahriman.core.auth.oauth import OAuth
|
||||
from ahriman.core.auth.pam import PAM
|
||||
from ahriman.core.configuration import Configuration
|
||||
from ahriman.core.database import SQLite
|
||||
from ahriman.models.user import User
|
||||
@ -51,14 +52,22 @@ def test_load_oauth(configuration: Configuration, database: SQLite) -> None:
|
||||
assert isinstance(auth, OAuth)
|
||||
|
||||
|
||||
def test_load_pam(configuration: Configuration, database: SQLite) -> None:
|
||||
"""
|
||||
must load pam validator if option set
|
||||
"""
|
||||
configuration.set_option("auth", "target", "pam")
|
||||
configuration.set_option("auth", "full_access_group", "wheel")
|
||||
auth = Auth.load(configuration, database)
|
||||
assert isinstance(auth, PAM)
|
||||
|
||||
|
||||
async def test_check_credentials(auth: Auth, user: User) -> None:
|
||||
"""
|
||||
must pass any credentials
|
||||
"""
|
||||
assert await auth.check_credentials(user.username, user.password)
|
||||
assert await auth.check_credentials(None, "")
|
||||
assert await auth.check_credentials("", None)
|
||||
assert await auth.check_credentials(None, None)
|
||||
|
||||
|
||||
async def test_known_username(auth: Auth, user: User) -> None:
|
||||
|
@ -21,9 +21,7 @@ async def test_check_credentials_empty(mapping: Mapping) -> None:
|
||||
"""
|
||||
must reject on empty credentials
|
||||
"""
|
||||
assert not await mapping.check_credentials(None, "")
|
||||
assert not await mapping.check_credentials("", None)
|
||||
assert not await mapping.check_credentials(None, None)
|
||||
|
||||
|
||||
async def test_check_credentials_unknown(mapping: Mapping, user: User) -> None:
|
||||
@ -66,9 +64,8 @@ async def test_known_username(mapping: Mapping, user: User, mocker: MockerFixtur
|
||||
|
||||
async def test_known_username_unknown(mapping: Mapping, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must not allow only known users
|
||||
must not allow unknown users
|
||||
"""
|
||||
assert not await mapping.known_username(None)
|
||||
mocker.patch("ahriman.core.database.SQLite.user_get", return_value=None)
|
||||
assert not await mapping.known_username(user.password)
|
||||
|
||||
|
118
tests/ahriman/core/auth/test_pam.py
Normal file
118
tests/ahriman/core/auth/test_pam.py
Normal file
@ -0,0 +1,118 @@
|
||||
from pytest_mock import MockerFixture
|
||||
|
||||
from ahriman.core.auth.pam import PAM
|
||||
from ahriman.core.exceptions import CalledProcessError
|
||||
from ahriman.models.user import User
|
||||
from ahriman.models.user_access import UserAccess
|
||||
|
||||
|
||||
def test_group_members() -> None:
|
||||
"""
|
||||
must return current group members
|
||||
"""
|
||||
assert "root" in PAM.group_members("root")
|
||||
|
||||
|
||||
def test_group_members_unknown() -> None:
|
||||
"""
|
||||
must return empty list for unknown group
|
||||
"""
|
||||
assert not PAM.group_members("somerandomgroupname")
|
||||
|
||||
|
||||
async def test_check_credentials(pam: PAM, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must correctly check user credentials via PAM
|
||||
"""
|
||||
authenticate_mock = mocker.patch("ahriman.core.auth.pam.check_output")
|
||||
mapping_mock = mocker.patch("ahriman.core.auth.mapping.Mapping.check_credentials")
|
||||
|
||||
assert await pam.check_credentials(user.username, user.password)
|
||||
authenticate_mock.assert_called_once_with("su", "--command", "true", "-", user.username,
|
||||
input_data=user.password)
|
||||
mapping_mock.assert_not_called()
|
||||
|
||||
|
||||
async def test_check_credentials_empty(pam: PAM) -> None:
|
||||
"""
|
||||
must reject on empty credentials
|
||||
"""
|
||||
assert not await pam.check_credentials("", None)
|
||||
|
||||
|
||||
async def test_check_credentials_root(pam: PAM, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must reject on root logon attempt
|
||||
"""
|
||||
mocker.patch("ahriman.core.auth.pam.check_output")
|
||||
assert not await pam.check_credentials("root", user.password)
|
||||
|
||||
pam.permit_root_login = True
|
||||
assert await pam.check_credentials("root", user.password)
|
||||
|
||||
|
||||
async def test_check_credentials_mapping(pam: PAM, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must correctly check user credentials via database if PAM rejected
|
||||
"""
|
||||
mocker.patch("ahriman.core.auth.pam.check_output",
|
||||
side_effect=CalledProcessError(1, ["command"], "error"))
|
||||
mapping_mock = mocker.patch("ahriman.core.auth.mapping.Mapping.check_credentials")
|
||||
|
||||
await pam.check_credentials(user.username, user.password)
|
||||
mapping_mock.assert_called_once_with(pam, user.username, user.password)
|
||||
|
||||
|
||||
async def test_known_username(pam: PAM, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must check if user exists in system
|
||||
"""
|
||||
getpwnam_mock = mocker.patch("ahriman.core.auth.pam.getpwnam")
|
||||
mapping_mock = mocker.patch("ahriman.core.auth.mapping.Mapping.known_username")
|
||||
|
||||
assert await pam.known_username(user.username)
|
||||
getpwnam_mock.assert_called_once_with(user.username)
|
||||
mapping_mock.assert_not_called()
|
||||
|
||||
|
||||
async def test_known_username_mapping(pam: PAM, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must fallback to username checking to database if no user found in system
|
||||
"""
|
||||
mocker.patch("ahriman.core.auth.pam.getpwnam", side_effect=KeyError)
|
||||
mapping_mock = mocker.patch("ahriman.core.auth.mapping.Mapping.known_username")
|
||||
|
||||
await pam.known_username(user.username)
|
||||
mapping_mock.assert_called_once_with(pam, user.username)
|
||||
|
||||
|
||||
async def test_verify_access(pam: PAM, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must verify user access via PAM groups
|
||||
"""
|
||||
mocker.patch("ahriman.core.auth.pam.PAM.get_user", return_value=None)
|
||||
mocker.patch("ahriman.core.auth.pam.PAM.group_members", return_value=[user.username])
|
||||
assert await pam.verify_access(user.username, UserAccess.Full, None)
|
||||
|
||||
|
||||
async def test_verify_access_readonly(pam: PAM, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must set user access to read only if it doesn't belong to the admin group
|
||||
"""
|
||||
mocker.patch("ahriman.core.auth.pam.PAM.get_user", return_value=None)
|
||||
mocker.patch("ahriman.core.auth.pam.PAM.group_members", return_value=[])
|
||||
|
||||
assert not await pam.verify_access(user.username, UserAccess.Full, None)
|
||||
assert not await pam.verify_access(user.username, UserAccess.Reporter, None)
|
||||
assert await pam.verify_access(user.username, UserAccess.Read, None)
|
||||
|
||||
|
||||
async def test_verify_access_override(pam: PAM, user: User, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must verify user access via database if there is override
|
||||
"""
|
||||
mocker.patch("ahriman.core.auth.pam.PAM.get_user", return_value=user)
|
||||
group_mock = mocker.patch("ahriman.core.auth.pam.PAM.group_members")
|
||||
|
||||
assert await pam.verify_access(user.username, user.access, None)
|
||||
group_mock.assert_not_called()
|
@ -26,6 +26,9 @@ def test_from_option_valid() -> None:
|
||||
assert AuthSettings.from_option("mapping") == AuthSettings.Configuration
|
||||
assert AuthSettings.from_option("MAPPing") == AuthSettings.Configuration
|
||||
|
||||
assert AuthSettings.from_option("pam") == AuthSettings.PAM
|
||||
assert AuthSettings.from_option("PAM") == AuthSettings.PAM
|
||||
|
||||
|
||||
def test_is_enabled() -> None:
|
||||
"""
|
||||
|
Reference in New Issue
Block a user