mirror of
https://github.com/arcan1s/ahriman.git
synced 2025-07-23 02:39:57 +00:00
multipart upload with signatures as well as safe file save
This commit is contained in:
@ -76,6 +76,13 @@ def test_sign_options(configuration: Configuration) -> None:
|
||||
assert default_key == "default-key"
|
||||
|
||||
|
||||
def test_signature() -> None:
|
||||
"""
|
||||
must correctly generate the signature path
|
||||
"""
|
||||
assert GPG.signature(Path("path") / "to" / "package.tar.xz") == Path("path") / "to" / "package.tar.xz.sig"
|
||||
|
||||
|
||||
def test_key_download(gpg: GPG, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must download the key from public server
|
||||
@ -222,6 +229,18 @@ def test_process_sign_package_skip_4(gpg: GPG, mocker: MockerFixture) -> None:
|
||||
process_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_process_sign_package_skip_already_signed(gpg_with_key: GPG, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must not sign package if it was already signed
|
||||
"""
|
||||
result = [Path("a"), Path("a.sig")]
|
||||
mocker.patch("pathlib.Path.is_file", return_value=True)
|
||||
process_mock = mocker.patch("ahriman.core.sign.gpg.GPG.process")
|
||||
|
||||
assert gpg_with_key.process_sign_package(Path("a"), gpg_with_key.default_key) == result
|
||||
process_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_process_sign_repository_1(gpg_with_key: GPG, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must sign repository
|
||||
|
@ -273,6 +273,15 @@ def test_package_like(package_ahriman: Package) -> None:
|
||||
assert package_like(package_ahriman.packages[package_ahriman.base].filepath)
|
||||
|
||||
|
||||
def test_package_like_hidden(package_ahriman: Package) -> None:
|
||||
"""
|
||||
package_like must return false for hidden files
|
||||
"""
|
||||
package_file = package_ahriman.packages[package_ahriman.base].filepath
|
||||
hidden_file = package_file.parent / f".{package_file.name}"
|
||||
assert not package_like(hidden_file)
|
||||
|
||||
|
||||
def test_package_like_sig(package_ahriman: Package) -> None:
|
||||
"""
|
||||
package_like must return false for signature files
|
||||
|
@ -2,6 +2,7 @@ import pytest
|
||||
|
||||
from pathlib import Path
|
||||
from pytest_mock import MockerFixture
|
||||
from unittest.mock import MagicMock, call as MockCall
|
||||
|
||||
from ahriman.core.upload.remote_service import RemoteService
|
||||
from ahriman.models.package import Package
|
||||
@ -20,21 +21,44 @@ def test_package_upload(remote_service: RemoteService, package_ahriman: Package,
|
||||
"""
|
||||
must upload package to remote host
|
||||
"""
|
||||
open_mock = mocker.patch("pathlib.Path.open")
|
||||
mocker.patch("pathlib.Path.is_file", return_value=False)
|
||||
file_mock = MagicMock()
|
||||
open_mock = mocker.patch("pathlib.Path.open", return_value=file_mock)
|
||||
upload_mock = mocker.patch("ahriman.core.upload.http_upload.HttpUpload._request")
|
||||
filename = package_ahriman.packages[package_ahriman.base].filename
|
||||
|
||||
remote_service.sync(Path("local"), [package_ahriman])
|
||||
open_mock.assert_called_once_with("rb")
|
||||
file_mock.close.assert_called_once()
|
||||
upload_mock.assert_called_once_with("POST", f"{remote_service.client.address}/api/v1/service/upload", files={
|
||||
"archive": (filename, pytest.helpers.anyvar(int), "application/octet-stream", {})
|
||||
"package": (filename, pytest.helpers.anyvar(int), "application/octet-stream", {})
|
||||
})
|
||||
|
||||
|
||||
def test_package_upload_no_filename(
|
||||
remote_service: RemoteService,
|
||||
package_ahriman: Package,
|
||||
mocker: MockerFixture) -> None:
|
||||
def test_package_upload_with_signature(remote_service: RemoteService, package_ahriman: Package,
|
||||
mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must upload package to remote host with signatures
|
||||
"""
|
||||
mocker.patch("pathlib.Path.is_file", return_value=True)
|
||||
file_mock = MagicMock()
|
||||
open_mock = mocker.patch("pathlib.Path.open", return_value=file_mock)
|
||||
upload_mock = mocker.patch("ahriman.core.upload.http_upload.HttpUpload._request")
|
||||
filename = package_ahriman.packages[package_ahriman.base].filename
|
||||
|
||||
remote_service.sync(Path("local"), [package_ahriman])
|
||||
open_mock.assert_has_calls([MockCall("rb"), MockCall("rb")])
|
||||
file_mock.close.assert_has_calls([MockCall(), MockCall()])
|
||||
upload_mock.assert_called_once_with(
|
||||
"POST", f"{remote_service.client.address}/api/v1/service/upload", files={
|
||||
"package": (filename, pytest.helpers.anyvar(int), "application/octet-stream", {}),
|
||||
"signature": (f"{filename}.sig", pytest.helpers.anyvar(int), "application/octet-stream", {})
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def test_package_upload_no_filename(remote_service: RemoteService, package_ahriman: Package,
|
||||
mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must skip upload if no filename set
|
||||
"""
|
||||
|
@ -2,9 +2,13 @@ import pytest
|
||||
|
||||
from aiohttp import FormData
|
||||
from aiohttp.test_utils import TestClient
|
||||
from aiohttp.web import HTTPBadRequest
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
from pytest_mock import MockerFixture
|
||||
from unittest.mock import AsyncMock, MagicMock, call as MockCall
|
||||
|
||||
from ahriman.models.repository_paths import RepositoryPaths
|
||||
from ahriman.models.user_access import UserAccess
|
||||
from ahriman.web.views.service.upload import UploadView
|
||||
|
||||
@ -18,21 +22,109 @@ async def test_get_permission() -> None:
|
||||
assert await UploadView.get_permission(request) == UserAccess.Full
|
||||
|
||||
|
||||
async def test_post(client: TestClient, mocker: MockerFixture) -> None:
|
||||
async def test_save_file(mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must correctly save file
|
||||
"""
|
||||
part_mock = MagicMock()
|
||||
part_mock.filename = "filename"
|
||||
part_mock.read_chunk = AsyncMock(side_effect=[b"content", None])
|
||||
|
||||
tempfile_mock = mocker.patch("tempfile.NamedTemporaryFile")
|
||||
file_mock = MagicMock()
|
||||
tempfile_mock.return_value.__enter__.return_value = file_mock
|
||||
|
||||
open_mock = mocker.patch("pathlib.Path.open")
|
||||
copy_mock = mocker.patch("shutil.copyfileobj")
|
||||
local = Path("local")
|
||||
|
||||
assert await UploadView.save_file(part_mock, local, max_body_size=None) == \
|
||||
(part_mock.filename, local / f".{part_mock.filename}")
|
||||
file_mock.write.assert_called_once_with(b"content")
|
||||
open_mock.assert_called_once_with("wb")
|
||||
copy_mock.assert_called_once_with(pytest.helpers.anyvar(int), pytest.helpers.anyvar(int))
|
||||
|
||||
|
||||
async def test_save_file_no_filename() -> None:
|
||||
"""
|
||||
must raise exception on missing filename
|
||||
"""
|
||||
part_mock = MagicMock()
|
||||
part_mock.filename = None
|
||||
|
||||
with pytest.raises(HTTPBadRequest):
|
||||
await UploadView.save_file(part_mock, Path("local"), max_body_size=None)
|
||||
|
||||
|
||||
async def test_save_file_invalid_filename() -> None:
|
||||
"""
|
||||
must raise exception on invalid filename
|
||||
"""
|
||||
part_mock = MagicMock()
|
||||
part_mock.filename = ".."
|
||||
|
||||
with pytest.raises(HTTPBadRequest):
|
||||
await UploadView.save_file(part_mock, Path("local"), max_body_size=None)
|
||||
|
||||
|
||||
async def test_save_file_too_big() -> None:
|
||||
"""
|
||||
must raise exception on too big file
|
||||
"""
|
||||
part_mock = MagicMock()
|
||||
part_mock.filename = "filename"
|
||||
part_mock.read_chunk = AsyncMock(side_effect=[b"content", None])
|
||||
|
||||
with pytest.raises(HTTPBadRequest):
|
||||
await UploadView.save_file(part_mock, Path("local"), max_body_size=0)
|
||||
|
||||
|
||||
async def test_post(client: TestClient, repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must process file upload via http
|
||||
"""
|
||||
open_mock = mocker.patch("pathlib.Path.open")
|
||||
copy_mock = mocker.patch("shutil.copyfileobj")
|
||||
local = Path("local")
|
||||
save_mock = mocker.patch("ahriman.web.views.service.upload.UploadView.save_file",
|
||||
side_effect=AsyncMock(return_value=("filename", local / ".filename")))
|
||||
rename_mock = mocker.patch("pathlib.Path.rename")
|
||||
# no content validation here because it has invalid schema
|
||||
|
||||
data = FormData()
|
||||
data.add_field("archive", BytesIO(b"content"), filename="filename", content_type="application/octet-stream")
|
||||
data.add_field("package", BytesIO(b"content"), filename="filename", content_type="application/octet-stream")
|
||||
|
||||
response = await client.post("/api/v1/service/upload", data=data)
|
||||
assert response.ok
|
||||
open_mock.assert_called_once_with("wb")
|
||||
copy_mock.assert_called_once_with(pytest.helpers.anyvar(int), pytest.helpers.anyvar(int))
|
||||
save_mock.assert_called_once_with(pytest.helpers.anyvar(int), repository_paths.packages, max_body_size=None)
|
||||
rename_mock.assert_called_once_with(local / "filename")
|
||||
|
||||
|
||||
async def test_post_with_sig(client: TestClient, repository_paths: RepositoryPaths, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must process file upload with signature via http
|
||||
"""
|
||||
local = Path("local")
|
||||
save_mock = mocker.patch("ahriman.web.views.service.upload.UploadView.save_file",
|
||||
side_effect=AsyncMock(side_effect=[
|
||||
("filename", local / ".filename"),
|
||||
("filename.sig", local / ".filename.sig"),
|
||||
]))
|
||||
rename_mock = mocker.patch("pathlib.Path.rename")
|
||||
# no content validation here because it has invalid schema
|
||||
|
||||
data = FormData()
|
||||
data.add_field("package", BytesIO(b"content"), filename="filename")
|
||||
data.add_field("signature", BytesIO(b"sig"), filename="filename.sig")
|
||||
|
||||
response = await client.post("/api/v1/service/upload", data=data)
|
||||
assert response.ok
|
||||
save_mock.assert_has_calls([
|
||||
MockCall(pytest.helpers.anyvar(int), repository_paths.packages, max_body_size=None),
|
||||
MockCall(pytest.helpers.anyvar(int), repository_paths.packages, max_body_size=None),
|
||||
])
|
||||
rename_mock.assert_has_calls([
|
||||
MockCall(local / "filename"),
|
||||
MockCall(local / "filename.sig"),
|
||||
])
|
||||
|
||||
|
||||
async def test_post_not_found(client: TestClient, mocker: MockerFixture) -> None:
|
||||
@ -41,7 +133,7 @@ async def test_post_not_found(client: TestClient, mocker: MockerFixture) -> None
|
||||
"""
|
||||
mocker.patch("ahriman.core.configuration.Configuration.getboolean", return_value=False)
|
||||
data = FormData()
|
||||
data.add_field("archive", BytesIO(b"content"), filename="filename", content_type="application/octet-stream")
|
||||
data.add_field("package", BytesIO(b"content"), filename="filename", content_type="application/octet-stream")
|
||||
response_schema = pytest.helpers.schema_response(UploadView.post, code=404)
|
||||
|
||||
response = await client.post("/api/v1/service/upload", data=data)
|
||||
@ -67,14 +159,14 @@ async def test_post_not_body_part(client: TestClient, mocker: MockerFixture) ->
|
||||
response_schema = pytest.helpers.schema_response(UploadView.post, code=400)
|
||||
mocker.patch("aiohttp.MultipartReader.next", return_value=42) # surprise, motherfucker
|
||||
data = FormData()
|
||||
data.add_field("archive", BytesIO(b"content"), filename="filename", content_type="application/octet-stream")
|
||||
data.add_field("package", BytesIO(b"content"), filename="filename", content_type="application/octet-stream")
|
||||
|
||||
response = await client.post("/api/v1/service/upload", data=data)
|
||||
assert response.status == 400
|
||||
assert not response_schema.validate(await response.json())
|
||||
|
||||
|
||||
async def test_post_not_archive(client: TestClient) -> None:
|
||||
async def test_post_not_package(client: TestClient) -> None:
|
||||
"""
|
||||
must return 400 on invalid multipart key
|
||||
"""
|
||||
@ -85,31 +177,3 @@ async def test_post_not_archive(client: TestClient) -> None:
|
||||
response = await client.post("/api/v1/service/upload", data=data)
|
||||
assert response.status == 400
|
||||
assert not response_schema.validate(await response.json())
|
||||
|
||||
|
||||
async def test_post_filename_invalid(client: TestClient) -> None:
|
||||
"""
|
||||
must return 400 if filename is invalid
|
||||
"""
|
||||
response_schema = pytest.helpers.schema_response(UploadView.post, code=400)
|
||||
|
||||
data = FormData()
|
||||
data.add_field("archive", BytesIO(b"content"), filename="..", content_type="application/octet-stream")
|
||||
response = await client.post("/api/v1/service/upload", data=data)
|
||||
assert response.status == 400
|
||||
assert not response_schema.validate(await response.json())
|
||||
|
||||
|
||||
async def test_post_file_too_big(client: TestClient, mocker: MockerFixture) -> None:
|
||||
"""
|
||||
must return 400 if file is too big
|
||||
"""
|
||||
mocker.patch("pathlib.Path.open")
|
||||
mocker.patch("ahriman.core.configuration.Configuration.getint", return_value=0)
|
||||
data = FormData()
|
||||
data.add_field("archive", BytesIO(b"content"), filename="filename", content_type="application/octet-stream")
|
||||
response_schema = pytest.helpers.schema_response(UploadView.post, code=400)
|
||||
|
||||
response = await client.post("/api/v1/service/upload", data=data)
|
||||
assert response.status == 400
|
||||
assert not response_schema.validate(await response.json())
|
||||
|
Reference in New Issue
Block a user