split github upload into generic http method and github specific

We might use some features from the http upload for another parser
This commit is contained in:
2021-10-15 23:36:09 +03:00
parent 5f7f58041d
commit fd38dfd176
7 changed files with 175 additions and 124 deletions

View File

@ -17,7 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import hashlib
import mimetypes
import requests
@ -25,15 +24,14 @@ from pathlib import Path
from typing import Any, Dict, Iterable, Optional
from ahriman.core.configuration import Configuration
from ahriman.core.upload.upload import Upload
from ahriman.core.util import exception_response_text, walk
from ahriman.core.upload.http_upload import HttpUpload
from ahriman.core.util import walk
from ahriman.models.package import Package
class Github(Upload):
class Github(HttpUpload):
"""
upload files to github releases
:ivar auth: requests authentication tuple
:ivar gh_owner: github repository owner
:ivar gh_repository: github repository name
"""
@ -44,64 +42,10 @@ class Github(Upload):
:param architecture: repository architecture
:param configuration: configuration instance
"""
Upload.__init__(self, architecture, configuration)
HttpUpload.__init__(self, architecture, configuration, "github")
self.gh_owner = configuration.get("github", "owner")
self.gh_repository = configuration.get("github", "repository")
gh_api_key = configuration.get("github", "api_key")
gh_username = configuration.get("github", "username", fallback=self.gh_owner)
self.auth = (gh_username, gh_api_key)
@staticmethod
def calculate_hash(path: Path) -> str:
"""
calculate file checksum. Github API does not provide hashes itself, so we have to handle it manually
:param path: path to local file
:return: calculated checksum of the file
"""
with path.open("rb") as local_file:
md5 = hashlib.md5(local_file.read()) # nosec
return md5.hexdigest()
@staticmethod
def get_body(local_files: Dict[Path, str]) -> str:
"""
generate release body from the checksums as returned from Github.get_hashes method
:param local_files: map of the paths to its checksum
:return: body to be inserted into release
"""
return "\n".join(f"{file.name} {md5}" for file, md5 in sorted(local_files.items()))
@staticmethod
def get_hashes(release: Dict[str, Any]) -> Dict[str, str]:
"""
get checksums of the content from the repository
:param release: release object
:return: map of the filename to its checksum as it is written in body
"""
body: str = release["body"] or ""
files = {}
for line in body.splitlines():
file, md5 = line.split()
files[file] = md5
return files
def _request(self, method: str, url: str, **kwargs: Any) -> requests.Response:
"""
github request wrapper
:param method: request method
:param url: request url
:param kwargs: request parameters to be passed as is
:return: request response object
"""
try:
response = requests.request(method, url, auth=self.auth, **kwargs)
response.raise_for_status()
except requests.HTTPError as e:
self.logger.exception("could not perform %s request to %s: %s", method, url, exception_response_text(e))
raise
return response
def asset_remove(self, release: Dict[str, Any], name: str) -> None:
"""
remove asset from the release by name
@ -210,7 +154,8 @@ class Github(Upload):
if release is None:
release = self.release_create()
remote_files = self.get_hashes(release)
body: str = release.get("body") or ""
remote_files = self.get_hashes(body)
local_files = self.get_local_files(path)
self.files_upload(release, local_files, remote_files)

View File

@ -0,0 +1,96 @@
#
# Copyright (c) 2021 ahriman team.
#
# This file is part of ahriman
# (see https://github.com/arcan1s/ahriman).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import hashlib
import requests
from pathlib import Path
from typing import Any, Dict
from ahriman.core.configuration import Configuration
from ahriman.core.upload.upload import Upload
from ahriman.core.util import exception_response_text
class HttpUpload(Upload):
"""
helper for the http based uploads
:ivar auth: HTTP auth object
"""
def __init__(self, architecture: str, configuration: Configuration, section: str) -> None:
"""
default constructor
:param architecture: repository architecture
:param configuration: configuration instance
:param section: configuration section name
"""
Upload.__init__(self, architecture, configuration)
password = configuration.get(section, "password")
username = configuration.get(section, "username")
self.auth = (password, username)
@staticmethod
def calculate_hash(path: Path) -> str:
"""
calculate file checksum
:param path: path to local file
:return: calculated checksum of the file
"""
with path.open("rb") as local_file:
md5 = hashlib.md5(local_file.read()) # nosec
return md5.hexdigest()
@staticmethod
def get_body(local_files: Dict[Path, str]) -> str:
"""
generate release body from the checksums as returned from HttpUpload.get_hashes method
:param local_files: map of the paths to its checksum
:return: body to be inserted into release
"""
return "\n".join(f"{file.name} {md5}" for file, md5 in sorted(local_files.items()))
@staticmethod
def get_hashes(body: str) -> Dict[str, str]:
"""
get checksums of the content from the repository
:param body: release string body object
:return: map of the filename to its checksum as it is written in body
"""
files = {}
for line in body.splitlines():
file, md5 = line.split()
files[file] = md5
return files
def _request(self, method: str, url: str, **kwargs: Any) -> requests.Response:
"""
request wrapper
:param method: request method
:param url: request url
:param kwargs: request parameters to be passed as is
:return: request response object
"""
try:
response = requests.request(method, url, auth=self.auth, **kwargs)
response.raise_for_status()
except requests.HTTPError as e:
self.logger.exception("could not perform %s request to %s: %s", method, url, exception_response_text(e))
raise
return response