mirror of https://github.com/arcan1s/ahriman.git
synced 2025-04-24 07:17:17 +00:00
Native s3 sync (#23)
* Native S3 sync implementation
* fix imports
* fix paths reading
* install s3 components during test stage
This commit is contained in:
parent f8ff2cbf5b
commit aad599be67
.github/workflows/run-tests.yml (vendored): 1 line changed
@ -21,5 +21,6 @@ jobs:
     /bin/bash -c "pacman --noconfirm -Syu base-devel python python-pip && \
         pip install -e .[web] && \
         pip install -e .[check] && \
+        pip install -e .[s3] && \
         pip install -e .[test] && \
         make check tests"
@ -89,10 +89,13 @@ Group name must refer to architecture, e.g. it should be `rsync:x86_64` for x86_
 
 ### `s3:*` groups
 
-Group name must refer to architecture, e.g. it should be `s3:x86_64` for x86_64 architecture. Requires `aws-cli` package to be installed. Do not forget to configure it for user `ahriman`.
+Group name must refer to architecture, e.g. it should be `s3:x86_64` for x86_64 architecture.
 
-* `command` - s3 command to run, space separated list of string, required.
-* `bucket` - bucket name (e.g. `s3://bucket/path`), string, required.
+* `access_key` - AWS access key ID, string, required.
+* `bucket` - bucket name (e.g. `bucket`), string, required.
+* `chunk_size` - chunk size for calculating entity tags, int, optional, default 8 * 1024 * 1024.
+* `region` - bucket region (e.g. `eu-central-1`), string, required.
+* `secret_key` - AWS secret access key, string, required.
 
 ## `web:*` groups
 
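Put together, a filled-in `[s3]` section for the new native uploader might look like the sketch below; the bucket name and region are illustrative, and the keys are AWS's documentation placeholders, not real credentials:

```ini
[s3]
access_key = AKIAIOSFODNN7EXAMPLE
bucket = my-repository-bucket
chunk_size = 8388608
region = eu-central-1
secret_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
```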
PKGBUILD
@ -9,13 +9,13 @@ url="https://github.com/arcan1s/ahriman"
 license=('GPL3')
 depends=('devtools' 'git' 'pyalpm' 'python-aur' 'python-srcinfo')
 makedepends=('python-pip')
-optdepends=('aws-cli: sync to s3'
-            'breezy: -bzr packages support'
+optdepends=('breezy: -bzr packages support'
             'darcs: -darcs packages support'
             'gnupg: package and repository sign'
             'mercurial: -hg packages support'
             'python-aiohttp: web server'
             'python-aiohttp-jinja2: web server'
+            'python-boto3: sync to s3'
             'python-jinja: html report generation'
             'rsync: sync by using rsync'
             'subversion: -svn packages support')
@ -40,7 +40,7 @@ target =
 command = rsync --archive --compress --partial --delete
 
 [s3]
-command = aws s3 sync --quiet --delete
+chunk_size = 8388608
 
 [web]
 host = 0.0.0.0
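The dropped `command` line is the point of the commit: the default configuration no longer shells out to `aws s3 sync`. The new value `8388608` is simply 8 * 1024 * 1024 written out, the same 8 MiB fallback that `S3.__init__` applies below, and presumably chosen because it matches the AWS CLI's default multipart chunk size, so locally computed ETags line up with objects uploaded by the previous implementation.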
setup.py: 3 lines changed
@ -77,6 +77,9 @@ setup(
             "mypy",
             "pylint",
         ],
+        "s3": [
+            "boto3",
+        ],
         "test": [
             "pytest",
             "pytest-aiohttp",
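With the extra defined, the boto3 dependency is installed via `pip install -e .[s3]`, which is exactly the line added to the CI workflow above.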
src/ahriman/core/upload/s3.py
@ -17,24 +17,24 @@
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
+import boto3  # type: ignore
+import hashlib
+
 from pathlib import Path
-from typing import Iterable
+from typing import Any, Dict, Generator, Iterable
 
 from ahriman.core.configuration import Configuration
 from ahriman.core.upload.upload import Upload
-from ahriman.core.util import check_output
 from ahriman.models.package import Package
 
 
 class S3(Upload):
     """
     aws-cli wrapper
-    :ivar bucket: full bucket name
-    :ivar command: command arguments for sync
+    :ivar bucket: boto3 S3 bucket object
+    :ivar chunk_size: chunk size for calculating checksums
     """
 
-    _check_output = check_output
-
     def __init__(self, architecture: str, configuration: Configuration) -> None:
         """
         default constructor
@ -42,8 +42,68 @@ class S3(Upload):
         :param configuration: configuration instance
         """
         Upload.__init__(self, architecture, configuration)
-        self.bucket = configuration.get("s3", "bucket")
-        self.command = configuration.getlist("s3", "command")
+        self.bucket = self.get_bucket(configuration)
+        self.chunk_size = configuration.getint("s3", "chunk_size", fallback=8 * 1024 * 1024)
+
+    @staticmethod
+    def calculate_etag(path: Path, chunk_size: int) -> str:
+        """
+        calculate amazon s3 etag
+        credits to https://teppen.io/2018/10/23/aws_s3_verify_etags/
+        :param path: path to local file
+        :param chunk_size: read chunk size, which depends on client settings
+        :return: calculated entity tag for local file
+        """
+        md5s = []
+        with path.open("rb") as local_file:
+            for chunk in iter(lambda: local_file.read(chunk_size), b""):
+                md5s.append(hashlib.md5(chunk))
+
+        # in case if there is only one chunk it must be just this checksum
+        # and checksum of joined digest otherwise (including empty list)
+        checksum = md5s[0] if len(md5s) == 1 else hashlib.md5(b"".join(md5.digest() for md5 in md5s))
+        # in case if there are more than one chunk it should be appended with amount of chunks
+        suffix = f"-{len(md5s)}" if len(md5s) > 1 else ""
+        return f"{checksum.hexdigest()}{suffix}"
+
+    @staticmethod
+    def get_bucket(configuration: Configuration) -> Any:
+        """
+        create resource client from configuration
+        :param configuration: configuration instance
+        :return: amazon client
+        """
+        client = boto3.resource(service_name="s3",
+                                region_name=configuration.get("s3", "region"),
+                                aws_access_key_id=configuration.get("s3", "access_key"),
+                                aws_secret_access_key=configuration.get("s3", "secret_key"))
+        return client.Bucket(configuration.get("s3", "bucket"))
+
+    def get_local_files(self, path: Path) -> Dict[Path, str]:
+        """
+        get all local files and their calculated checksums
+        :param path: local path to sync
+        :return: map of path object to its checksum
+        """
+        # credits to https://stackoverflow.com/a/64915960
+        def walk(directory_path: Path) -> Generator[Path, None, None]:
+            for element in directory_path.iterdir():
+                if element.is_dir():
+                    yield from walk(element)
+                    continue
+                yield element
+        return {
+            local_file.relative_to(path): self.calculate_etag(local_file, self.chunk_size)
+            for local_file in walk(path)
+        }
+
+    def get_remote_objects(self) -> Dict[Path, Any]:
+        """
+        get all remote objects and their checksums
+        :return: map of path object to the remote s3 object
+        """
+        objects = self.bucket.objects.filter(Prefix=self.architecture)
+        return {Path(item.key).relative_to(self.architecture): item for item in objects}
+
     def sync(self, path: Path, built_packages: Iterable[Package]) -> None:
         """
@ -51,5 +111,21 @@ class S3(Upload):
         :param path: local path to sync
         :param built_packages: list of packages which has just been built
         """
-        # TODO rewrite to boto, but it is bullshit
-        S3._check_output(*self.command, str(path), self.bucket, exception=None, logger=self.logger)
+        remote_objects = self.get_remote_objects()
+        local_files = self.get_local_files(path)
+
+        # sync to remotes first
+        for local_file, checksum in local_files.items():
+            remote_object = remote_objects.get(local_file)
+            # 0 and -1 elements are " (double quote)
+            remote_checksum = remote_object.e_tag[1:-1] if remote_object is not None else None
+            if remote_checksum == checksum:
+                continue
+            remote_path = Path(self.architecture) / local_file
+            self.bucket.upload_file(str(path / local_file), str(remote_path))
+
+        # remove files which were removed locally
+        for local_file, remote_object in remote_objects.items():
+            if local_file in local_files:
+                continue
+            remote_object.delete()
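As an aside for readers, the multipart-ETag rule that `calculate_etag` implements can be summarized in a short standalone sketch (this is annotation, not code from the commit): each `chunk_size` block of the file is hashed with MD5; a file that fits in a single chunk gets the plain hex digest as its ETag, while anything else gets the MD5 of the concatenated per-chunk binary digests, suffixed with `-<chunk count>` when there is more than one chunk.

```python
import hashlib
from pathlib import Path


def etag_of(path: Path, chunk_size: int = 8 * 1024 * 1024) -> str:
    """S3-style entity tag of a local file, mirroring S3.calculate_etag above."""
    digests = []
    with path.open("rb") as local_file:
        for chunk in iter(lambda: local_file.read(chunk_size), b""):
            digests.append(hashlib.md5(chunk))
    # single chunk: the ETag is the plain MD5 of the content
    if len(digests) == 1:
        return digests[0].hexdigest()
    # zero or many chunks: MD5 over the concatenated binary digests...
    checksum = hashlib.md5(b"".join(md5.digest() for md5 in digests))
    # ...with "-<count>" appended only in the multi-chunk case
    suffix = f"-{len(digests)}" if len(digests) > 1 else ""
    return f"{checksum.hexdigest()}{suffix}"
```

Note that the empty-file corner case falls out naturally: no chunks means the MD5 of zero bytes, `d41d8cd98f00b204e9800998ecf8427e`, with no suffix, which is exactly the value `test_calculate_etag_empty` asserts below.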
@ -14,4 +14,4 @@ def test_all_packages_with_provides(pacman: Pacman) -> None:
     """
     package list must contain provides packages
     """
-    assert 'sh' in pacman.all_packages()
+    assert "sh" in pacman.all_packages()
tests/ahriman/core/upload/conftest.py (new file): 22 lines
@ -0,0 +1,22 @@
+import pytest
+
+from collections import namedtuple
+from typing import List
+from unittest.mock import MagicMock
+
+from ahriman.core.configuration import Configuration
+from ahriman.core.upload.s3 import S3
+
+
+_s3_object = namedtuple("s3_object", ["key", "e_tag", "delete"])
+
+
+@pytest.fixture
+def s3(configuration: Configuration) -> S3:
+    return S3("x86_64", configuration)
+
+
+@pytest.fixture
+def s3_remote_objects() -> List[_s3_object]:
+    delete_mock = MagicMock()
+    return list(map(lambda item: _s3_object(f"x86_64/{item}", f"\"{item}\"", delete_mock), ["a", "b", "c"]))
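One detail worth noting (editorial annotation, not part of the commit): S3 reports ETags wrapped in literal double quotes, which is why the fixture stores `f"\"{item}\""` and why `S3.sync` strips the first and last characters with `e_tag[1:-1]` before comparing checksums:

```python
remote_e_tag = "\"04e75b4aa0fe6033e711e8ea98e059b2\""  # as reported by S3, quotes included
assert remote_e_tag[1:-1] == "04e75b4aa0fe6033e711e8ea98e059b2"
```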
tests/ahriman/core/upload/test_s3.py
@ -1,16 +1,98 @@
 from pathlib import Path
 from pytest_mock import MockerFixture
+from typing import Any, List
+from unittest import mock
+from unittest.mock import MagicMock
 
 from ahriman.core.configuration import Configuration
 from ahriman.core.upload.s3 import S3
 
 
-def test_sync(configuration: Configuration, mocker: MockerFixture) -> None:
+_chunk_size = 8 * 1024 * 1024
+
+
+def test_calculate_etag_big(resource_path_root: Path) -> None:
+    """
+    must calculate checksum for path which is more than one chunk
+    """
+    path = resource_path_root / "models" / "big_file_checksum"
+    assert S3.calculate_etag(path, _chunk_size) == "3b15154eaeed22ae19ae4667d4b98d28-2"
+
+
+def test_calculate_etag_empty(resource_path_root: Path) -> None:
+    """
+    must calculate checksum for empty file correctly
+    """
+    path = resource_path_root / "models" / "empty_file_checksum"
+    assert S3.calculate_etag(path, _chunk_size) == "d41d8cd98f00b204e9800998ecf8427e"
+
+
+def test_calculate_etag_small(resource_path_root: Path) -> None:
+    """
+    must calculate checksum for path which is single chunk
+    """
+    path = resource_path_root / "models" / "package_ahriman_srcinfo"
+    assert S3.calculate_etag(path, _chunk_size) == "04e75b4aa0fe6033e711e8ea98e059b2"
+
+
+def test_get_local_files(s3: S3, resource_path_root: Path) -> None:
+    """
+    must get all local files recursively
+    """
+    expected = sorted([
+        Path("core/ahriman.ini"),
+        Path("core/logging.ini"),
+        Path("models/big_file_checksum"),
+        Path("models/empty_file_checksum"),
+        Path("models/package_ahriman_srcinfo"),
+        Path("models/package_tpacpi-bat-git_srcinfo"),
+        Path("models/package_yay_srcinfo"),
+        Path("web/templates/search-line.jinja2"),
+        Path("web/templates/build-status.jinja2"),
+        Path("web/templates/repo-index.jinja2"),
+        Path("web/templates/sorttable.jinja2"),
+        Path("web/templates/style.jinja2"),
+        Path("web/templates/search.jinja2"),
+    ])
+
+    local_files = list(sorted(s3.get_local_files(resource_path_root).keys()))
+    assert local_files == expected
+
+
+def test_get_remote_objects(s3: S3, s3_remote_objects: List[Any]) -> None:
+    """
+    must generate list of remote objects by calling boto3 function
+    """
+    expected = {Path(item.key).relative_to(s3.architecture): item for item in s3_remote_objects}
+
+    s3.bucket = MagicMock()
+    s3.bucket.objects.filter.return_value = s3_remote_objects
+
+    assert s3.get_remote_objects() == expected
+
+
+def test_sync(s3: S3, s3_remote_objects: List[Any], mocker: MockerFixture) -> None:
     """
     must run sync command
     """
-    check_output_mock = mocker.patch("ahriman.core.upload.s3.S3._check_output")
+    root = Path("path")
+    local_files = {
+        Path(item.key.replace("a", "d")): item.e_tag.replace("b", "d").replace("\"", "")
+        for item in s3_remote_objects
+    }
+    remote_objects = {Path(item.key): item for item in s3_remote_objects}
+    print(local_files)
+    print(remote_objects)
 
-    upload = S3("x86_64", configuration)
-    upload.sync(Path("path"), [])
-    check_output_mock.assert_called_once()
+    local_files_mock = mocker.patch("ahriman.core.upload.s3.S3.get_local_files", return_value=local_files)
+    remote_objects_mock = mocker.patch("ahriman.core.upload.s3.S3.get_remote_objects", return_value=remote_objects)
+    upload_mock = s3.bucket = MagicMock()
+
+    s3.sync(root, [])
+
+    local_files_mock.assert_called_once()
+    remote_objects_mock.assert_called_once()
+    upload_mock.upload_file.assert_has_calls([
+        mock.call(str(root / s3.architecture / "b"), f"{s3.architecture}/{s3.architecture}/b"),
+        mock.call(str(root / s3.architecture / "d"), f"{s3.architecture}/{s3.architecture}/d"),
+    ], any_order=True)
+    remote_objects[Path("x86_64/a")].delete.assert_called_once()
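To spell out what the new `test_sync` exercises: the fixture provides remote objects `a`, `b`, `c` under the `x86_64/` prefix; the fabricated `local_files` map renames `a` to `d` and changes the checksum of `b`, so the sync must upload `b` (checksum mismatch) and `d` (absent remotely), skip `c` (checksums match), and delete the remote `a` that no longer exists locally. Because the mocked `get_local_files`/`get_remote_objects` return keys that still carry the `x86_64/` prefix (the real methods strip it), the expected upload destinations double the architecture segment, e.g. `x86_64/x86_64/b`.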
tests/testresources/core/ahriman.ini
@ -48,8 +48,10 @@ command = rsync --archive --verbose --compress --partial --delete
 remote =
 
 [s3]
-bucket =
-command = aws s3 sync --quiet --delete
+access_key =
+bucket = bucket
+region = eu-central-1
+secret_key =
 
 [web]
 host = 0.0.0.0
BIN tests/testresources/models/big_file_checksum (new file)
Binary file not shown.
0 tests/testresources/models/empty_file_checksum (new file, empty)