Native S3 sync implementation

Evgenii Alekseev 2021-08-10 04:06:55 +03:00
parent 952b55f707
commit 43d839c255
11 changed files with 210 additions and 26 deletions

View File

@@ -89,10 +89,13 @@ Group name must refer to architecture, e.g. it should be `rsync:x86_64` for x86_
 ### `s3:*` groups
-Group name must refer to architecture, e.g. it should be `s3:x86_64` for x86_64 architecture. Requires `aws-cli` package to be installed. Do not forget to configure it for user `ahriman`.
-* `command` - s3 command to run, space separated list of string, required.
-* `bucket` - bucket name (e.g. `s3://bucket/path`), string, required.
+Group name must refer to architecture, e.g. it should be `s3:x86_64` for x86_64 architecture.
+* `access_key` - AWS access key ID, string, required.
+* `bucket` - bucket name (e.g. `bucket`), string, required.
+* `chunk_size` - chunk size for calculating entity tags, int, optional, default 8 * 1024 * 1024.
+* `region` - bucket region (e.g. `eu-central-1`), string, required.
+* `secret_key` - AWS secret access key, string, required.
 
 ## `web:*` groups
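
Taken together, these options replace the former aws-cli configuration with native credentials. A filled-in `[s3]` section might look like the sketch below; bucket name and region are illustrative, and the credentials are the AWS documentation placeholder keys, not real ones:

```ini
[s3]
access_key = AKIAIOSFODNN7EXAMPLE
bucket = repo-bucket
chunk_size = 8388608
region = eu-central-1
secret_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
```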

View File

@@ -9,13 +9,13 @@ url="https://github.com/arcan1s/ahriman"
 license=('GPL3')
 depends=('devtools' 'git' 'pyalpm' 'python-aur' 'python-srcinfo')
 makedepends=('python-pip')
-optdepends=('aws-cli: sync to s3'
-            'breezy: -bzr packages support'
+optdepends=('breezy: -bzr packages support'
             'darcs: -darcs packages support'
             'gnupg: package and repository sign'
             'mercurial: -hg packages support'
             'python-aiohttp: web server'
             'python-aiohttp-jinja2: web server'
+            'python-boto3: sync to s3'
             'python-jinja: html report generation'
             'rsync: sync by using rsync'
             'subversion: -svn packages support')

View File

@@ -40,7 +40,7 @@ target =
 command = rsync --archive --compress --partial --delete
 
 [s3]
-command = aws s3 sync --quiet --delete
+chunk_size = 8388608
 
 [web]
 host = 0.0.0.0
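
The default value `8388608` is 8 * 1024 * 1024 bytes (8 MiB), the same multipart chunk size the AWS CLI uses by default, so etags computed with this setting should line up with objects previously uploaded by `aws s3 sync` under default settings.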

View File

@@ -77,6 +77,9 @@ setup(
             "mypy",
             "pylint",
         ],
+        "s3": [
+            "boto3",
+        ],
         "test": [
             "pytest",
             "pytest-aiohttp",

View File

@@ -17,24 +17,24 @@
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
+import boto3  # type: ignore
+import hashlib
+
 from pathlib import Path
-from typing import Iterable
+from typing import Any, Dict, Generator, Iterable
 
 from ahriman.core.configuration import Configuration
 from ahriman.core.upload.upload import Upload
-from ahriman.core.util import check_output
 from ahriman.models.package import Package
 
 
 class S3(Upload):
     """
-    aws-cli wrapper
-    :ivar bucket: full bucket name
-    :ivar command: command arguments for sync
+    boto3 based sync
+    :ivar bucket: boto3 S3 bucket object
+    :ivar chunk_size: chunk size for calculating checksums
     """
 
-    _check_output = check_output
-
     def __init__(self, architecture: str, configuration: Configuration) -> None:
         """
         default constructor
@@ -42,8 +42,68 @@ class S3(Upload):
         :param configuration: configuration instance
         """
         Upload.__init__(self, architecture, configuration)
-        self.bucket = configuration.get("s3", "bucket")
-        self.command = configuration.getlist("s3", "command")
+        self.bucket = self.get_bucket(configuration)
+        self.chunk_size = configuration.getint("s3", "chunk_size", fallback=8 * 1024 * 1024)
+
+    @staticmethod
+    def calculate_etag(path: Path, chunk_size: int) -> str:
+        """
+        calculate amazon s3 etag
+        credits to https://teppen.io/2018/10/23/aws_s3_verify_etags/
+        :param path: path to local file
+        :param chunk_size: read chunk size, which depends on client settings
+        :return: calculated entity tag for local file
+        """
+        md5s = []
+        with path.open("rb") as local_file:
+            for chunk in iter(lambda: local_file.read(chunk_size), b""):
+                md5s.append(hashlib.md5(chunk))
+        # a single chunk means the etag is just that chunk's checksum;
+        # otherwise it is the checksum of the joined digests (this also covers the empty file case)
+        checksum = md5s[0] if len(md5s) == 1 else hashlib.md5(b"".join(md5.digest() for md5 in md5s))
+        # multipart etags are additionally suffixed with the number of chunks
+        suffix = f"-{len(md5s)}" if len(md5s) > 1 else ""
+        return f"{checksum.hexdigest()}{suffix}"
+
+    @staticmethod
+    def get_bucket(configuration: Configuration) -> Any:
+        """
+        create resource client from configuration
+        :param configuration: configuration instance
+        :return: amazon client
+        """
+        client = boto3.resource(service_name="s3",
+                                region_name=configuration.get("s3", "region"),
+                                aws_access_key_id=configuration.get("s3", "access_key"),
+                                aws_secret_access_key=configuration.get("s3", "secret_key"))
+        return client.Bucket(configuration.get("s3", "bucket"))
+
+    def get_local_files(self, path: Path) -> Dict[Path, str]:
+        """
+        get all local files and their calculated checksums
+        :param path: local path to sync
+        :return: map of path object to its checksum
+        """
+        # credits to https://stackoverflow.com/a/64915960
+        def walk(directory_path: Path) -> Generator[Path, None, None]:
+            for element in directory_path.iterdir():
+                if element.is_dir():
+                    yield from walk(element)
+                    continue
+                yield element.resolve()
+        return {
+            local_file.relative_to(path): self.calculate_etag(local_file, self.chunk_size)
+            for local_file in walk(path)
+        }
+
+    def get_remote_objects(self) -> Dict[Path, Any]:
+        """
+        get all remote objects and their checksums
+        :return: map of path object to the remote s3 object
+        """
+        objects = self.bucket.objects.all()
+        return {Path(item.key): item for item in objects}
 
     def sync(self, path: Path, built_packages: Iterable[Package]) -> None:
         """
@@ -51,5 +111,20 @@ class S3(Upload):
         :param path: local path to sync
         :param built_packages: list of packages which have just been built
         """
-        # TODO rewrite to boto, but it is bullshit
-        S3._check_output(*self.command, str(path), self.bucket, exception=None, logger=self.logger)
+        remote_objects = self.get_remote_objects()
+        local_files = self.get_local_files(path)
+
+        # sync to remote first
+        for local_file, checksum in local_files.items():
+            remote_object = remote_objects.get(local_file)
+            # e_tag is wrapped in double quotes, so the first and last characters are stripped
+            remote_checksum = remote_object.e_tag[1:-1] if remote_object is not None else None
+            if remote_checksum == checksum:
+                continue
+            self.bucket.upload_file(str(path / local_file), str(local_file))
+
+        # remove files which were removed locally
+        for local_file, remote_object in remote_objects.items():
+            if local_file in local_files:
+                continue
+            remote_object.delete()
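
The etag comparison is the subtle part of this change: S3 reports multipart uploads with a composite etag rather than a plain MD5, so the uploader recomputes the same value locally. Below is a minimal standalone sketch of the algorithm; the function name and the commented boto3 usage are illustrative, not part of this commit, and the comparison only holds for objects uploaded with the same chunk size:

```python
import hashlib
from pathlib import Path


def multipart_etag(path: Path, chunk_size: int = 8 * 1024 * 1024) -> str:
    """S3-style entity tag: the plain MD5 when the file fits in one chunk,
    otherwise the MD5 of the concatenated chunk digests plus a part-count suffix"""
    md5s = []
    with path.open("rb") as local_file:
        for chunk in iter(lambda: local_file.read(chunk_size), b""):
            md5s.append(hashlib.md5(chunk))
    if len(md5s) == 1:
        # single-part upload: the etag is just the file's MD5
        return md5s[0].hexdigest()
    # an empty file also lands here: md5 of b"" yields the well-known empty-file etag
    combined = hashlib.md5(b"".join(md5.digest() for md5 in md5s))
    suffix = f"-{len(md5s)}" if md5s else ""
    return f"{combined.hexdigest()}{suffix}"


# hypothetical check against a real object; bucket and key names are made up
# bucket = boto3.resource("s3").Bucket("repo-bucket")
# assert bucket.Object("x86_64/demo.pkg.tar.zst").e_tag.strip('"') == \
#     multipart_etag(Path("demo.pkg.tar.zst"))
```

Comparing etags rather than timestamps keeps the sync idempotent: unchanged packages are never re-uploaded, regardless of which host runs the sync.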

View File

@@ -14,4 +14,4 @@ def test_all_packages_with_provides(pacman: Pacman) -> None:
     """
     package list must contain provides packages
     """
-    assert 'sh' in pacman.all_packages()
+    assert "sh" in pacman.all_packages()

View File

@@ -0,0 +1,23 @@
+import pytest
+
+from collections import namedtuple
+from pytest_mock import MockerFixture
+from typing import List
+from unittest.mock import MagicMock
+
+from ahriman.core.configuration import Configuration
+from ahriman.core.upload.s3 import S3
+
+
+_s3_object = namedtuple("s3_object", ["key", "e_tag", "delete"])
+
+
+@pytest.fixture
+def s3(configuration: Configuration) -> S3:
+    return S3("x86_64", configuration)
+
+
+@pytest.fixture
+def s3_remote_objects() -> List[_s3_object]:
+    delete_mock = MagicMock()
+    return list(map(lambda item: _s3_object(item, f"\"{item}\"", delete_mock), ["a", "b", "c"]))

View File

@@ -1,16 +1,94 @@
 from pathlib import Path
+from unittest import mock
 from pytest_mock import MockerFixture
+from typing import Any, List
+from unittest.mock import MagicMock
 
-from ahriman.core.configuration import Configuration
 from ahriman.core.upload.s3 import S3
 
+_chunk_size = 8 * 1024 * 1024
+
+
+def test_calculate_etag_big(resource_path_root: Path) -> None:
+    """
+    must calculate checksum for path which is more than one chunk
+    """
+    path = resource_path_root / "models" / "big_file_checksum"
+    assert S3.calculate_etag(path, _chunk_size) == "3b15154eaeed22ae19ae4667d4b98d28-2"
+
+
+def test_calculate_etag_empty(resource_path_root: Path) -> None:
+    """
+    must calculate checksum for empty file correctly
+    """
+    path = resource_path_root / "models" / "empty_file_checksum"
+    assert S3.calculate_etag(path, _chunk_size) == "d41d8cd98f00b204e9800998ecf8427e"
+
+
+def test_calculate_etag_small(resource_path_root: Path) -> None:
+    """
+    must calculate checksum for path which is single chunk
+    """
+    path = resource_path_root / "models" / "package_ahriman_srcinfo"
+    assert S3.calculate_etag(path, _chunk_size) == "04e75b4aa0fe6033e711e8ea98e059b2"
+
+
+def test_get_local_files(s3: S3, resource_path_root: Path) -> None:
+    """
+    must get all local files recursively
+    """
+    expected = sorted([
+        Path("core/ahriman.ini"),
+        Path("core/logging.ini"),
+        Path("models/big_file_checksum"),
+        Path("models/empty_file_checksum"),
+        Path("models/package_ahriman_srcinfo"),
+        Path("models/package_tpacpi-bat-git_srcinfo"),
+        Path("models/package_yay_srcinfo"),
+        Path("web/templates/search-line.jinja2"),
+        Path("web/templates/build-status.jinja2"),
+        Path("web/templates/repo-index.jinja2"),
+        Path("web/templates/sorttable.jinja2"),
+        Path("web/templates/style.jinja2"),
+        Path("web/templates/search.jinja2"),
+    ])
+    local_files = list(sorted(s3.get_local_files(resource_path_root).keys()))
+    assert local_files == expected
+
+
+def test_get_remote_objects(s3: S3, s3_remote_objects: List[Any]) -> None:
+    """
+    must generate list of remote objects by calling boto3 function
+    """
+    expected = {Path(item.key): item for item in s3_remote_objects}
+
+    s3.bucket = MagicMock()
+    s3.bucket.objects.all.return_value = s3_remote_objects
+
+    assert s3.get_remote_objects() == expected
+
+
+def test_sync(s3: S3, s3_remote_objects: List[Any], mocker: MockerFixture) -> None:
     """
     must run sync command
     """
-    check_output_mock = mocker.patch("ahriman.core.upload.s3.S3._check_output")
-
-    upload = S3("x86_64", configuration)
-    upload.sync(Path("path"), [])
-    check_output_mock.assert_called_once()
+    root = Path("path")
+    local_files = {Path(item.key.replace("a", "d")): item.key.replace("b", "d") for item in s3_remote_objects}
+    remote_objects = {Path(item.key): item for item in s3_remote_objects}
+
+    local_files_mock = mocker.patch("ahriman.core.upload.s3.S3.get_local_files", return_value=local_files)
+    remote_objects_mock = mocker.patch("ahriman.core.upload.s3.S3.get_remote_objects", return_value=remote_objects)
+    upload_mock = s3.bucket = MagicMock()
+
+    s3.sync(root, [])
+    local_files_mock.assert_called_once()
+    remote_objects_mock.assert_called_once()
+    upload_mock.upload_file.assert_has_calls([
+        mock.call(str(root / Path("b")), str(Path("b"))),
+        mock.call(str(root / Path("d")), str(Path("d"))),
+    ], any_order=True)
+    remote_objects[Path("a")].delete.assert_called_once()

View File

@@ -48,8 +48,10 @@ command = rsync --archive --verbose --compress --partial --delete
 remote =
 
 [s3]
-bucket =
-command = aws s3 sync --quiet --delete
+access_key =
+bucket = bucket
+region = eu-central-1
+secret_key =
 
 [web]
 host = 0.0.0.0

Binary file not shown.