"""Uploader for handling objects for uploads."""
# pylint: disable=line-too-long, broad-exception-caught
import abc
import copy
import os
import concurrent.futures
import threading
from typing import Any, Optional, IO, MutableMapping, List
from . import exceptions
from . import models
from . import validation
from . import utils
from . import io_utils
from . import defaults
from .serde import copy_request
from .checkpoint import UploadCheckpoint
from .crc import Crc64
from .paginator import ListPartsPaginator
[docs]
class UploadAPIClient(abc.ABC):
"""Abstract base class for uploader client."""
[docs]
@abc.abstractmethod
def put_object(self, request: models.PutObjectRequest, **kwargs) -> models.PutObjectResult:
"""Uploads objects."""
[docs]
@abc.abstractmethod
def head_object(self, request: models.HeadObjectRequest, **kwargs) -> models.HeadObjectResult:
"""Queries information about the object in a bucket."""
[docs]
@abc.abstractmethod
def initiate_multipart_upload(self, request: models.InitiateMultipartUploadRequest, **kwargs
) -> models.InitiateMultipartUploadResult:
"""
Initiates a multipart upload task before you can upload data
in parts to Object Storage Service (OSS).
"""
[docs]
@abc.abstractmethod
def upload_part(self, request: models.UploadPartRequest, **kwargs) -> models.UploadPartResult:
"""
Call the UploadPart interface to upload data in blocks (parts)
based on the specified Object name and uploadId.
"""
[docs]
@abc.abstractmethod
def complete_multipart_upload(self, request: models.CompleteMultipartUploadRequest, **kwargs
) -> models.CompleteMultipartUploadResult:
"""
Completes the multipart upload task of an object after all parts
of the object are uploaded.
"""
[docs]
@abc.abstractmethod
def abort_multipart_upload(self, request: models.AbortMultipartUploadRequest, **kwargs
) -> models.AbortMultipartUploadResult:
"""
Cancels a multipart upload task and deletes the parts uploaded in the task.
"""
[docs]
@abc.abstractmethod
def list_parts(self, request: models.ListPartsRequest, **kwargs
) -> models.ListPartsResult:
"""
Lists all parts that are uploaded by using a specified upload ID.
"""
[docs]
class UploaderOptions:
"""
uploader options
"""
def __init__(
self,
part_size: Optional[int] = None,
parallel_num: Optional[int] = None,
leave_parts_on_error: Optional[bool] = None,
enable_checkpoint: Optional[bool] = None,
checkpoint_dir: Optional[str] = None,
) -> None:
self.part_size = part_size
self.parallel_num = parallel_num
self.leave_parts_on_error = leave_parts_on_error or False
self.enable_checkpoint = enable_checkpoint or False
self.checkpoint_dir = checkpoint_dir
[docs]
class UploadResult:
"""
upload result
"""
def __init__(
self,
upload_id: Optional[str] = None,
etag: Optional[str] = None,
version_id: Optional[str] = None,
hash_crc64: Optional[str] = None,
) -> None:
self.upload_id = upload_id
self.etag = etag
self.version_id = version_id
self.hash_crc64 = hash_crc64
self.status = ''
self.status_code = 0
self.request_id = ''
self.headers: MutableMapping[str, str] = {}
[docs]
class UploadError(exceptions.BaseError):
"""
Upload Error.
"""
fmt = 'upload failed, {upload_id}, {path}, {error}.'
def __init__(self, **kwargs):
exceptions.BaseError.__init__(self, **kwargs)
self._error = kwargs.get("error", None)
self.upload_id = kwargs.get("upload_id", None)
self.path = kwargs.get("path", None)
[docs]
def unwrap(self) -> Exception:
"""returns the detail error"""
return self._error
[docs]
class Uploader:
"""Uploader for handling objects for uploads."""
def __init__(
self,
client: UploadAPIClient,
**kwargs: Any
) -> None:
"""
client (UploadAPIClient): A agent that implements the PutObject and Multipart Upload api.
"""
part_size = kwargs.get('part_size', defaults.DEFAULT_UPLOAD_PART_SIZE)
parallel_num = kwargs.get('parallel_num', defaults.DEFAULT_UPLOAD_PARALLEL)
leave_parts_on_error = kwargs.get('leave_parts_on_error', False)
self._client = client
self._options = UploaderOptions(
part_size=part_size,
parallel_num=parallel_num,
leave_parts_on_error=leave_parts_on_error,
enable_checkpoint=kwargs.get('enable_checkpoint', None),
checkpoint_dir=kwargs.get('checkpoint_dir', None),
)
feature_flags = 0
is_eclient = False
cstr = str(client)
if cstr == '<OssClient>':
feature_flags = client._client._options.feature_flags
elif cstr == '<OssEncryptionClient>':
feature_flags = client.unwrap()._client._options.feature_flags
is_eclient = True
self._feature_flags = feature_flags
self._is_eclient = is_eclient
[docs]
def upload_file(
self,
request: models.PutObjectRequest,
filepath: str,
**kwargs: Any
) -> UploadResult:
"""upload file
Args:
request (models.PutObjectRequest): _description_
filepath (str): _description_
Returns:
UploadResult: _description_
"""
delegate = self._delegate(request, **kwargs)
delegate.check_source(filepath)
with open(delegate.reader_filepath, 'rb') as reader:
delegate.apply_source(reader)
delegate.check_checkpoint()
delegate.update_crc_flag()
delegate.adjust_source()
result = delegate.upload()
delegate.close_reader()
return result
[docs]
def upload_from(
self,
request: models.PutObjectRequest,
reader: IO[bytes],
**kwargs: Any
) -> UploadResult:
"""upload from
Args:
request (models.PutObjectRequest): _description_
reader (IO[bytes]): _description_
Returns:
UploadResult: _description_
"""
delegate = self._delegate(request, **kwargs)
delegate.apply_source(reader)
return delegate.upload()
def _delegate(
self,
request: models.GetObjectRequest,
**kwargs: Any
) -> "_UploaderDelegate":
if request is None:
raise exceptions.ParamNullError(field='request')
if not validation.is_valid_bucket_name(utils.safety_str(request.bucket)):
raise exceptions.ParamInvalidError(field='request.bucket')
if not validation.is_valid_object_name(utils.safety_str(request.key)):
raise exceptions.ParamInvalidError(field='request.key')
options = copy.copy(self._options)
options.part_size = kwargs.get('part_size', self._options.part_size)
options.parallel_num = kwargs.get('parallel_num', self._options.parallel_num)
options.leave_parts_on_error = kwargs.get('leave_parts_on_error', self._options.leave_parts_on_error)
options.enable_checkpoint = kwargs.get('enable_checkpoint', self._options.enable_checkpoint)
options.checkpoint_dir = kwargs.get('checkpoint_dir', self._options.checkpoint_dir)
if options.part_size <= 0:
options.part_size = defaults.DEFAULT_UPLOAD_PART_SIZE
if options.parallel_num <= 0:
options.parallel_num = defaults.DEFAULT_UPLOAD_PARALLEL
delegate = _UploaderDelegate(
base=self,
client=self._client,
request=request,
options=options
)
return delegate
class _UploadContext:
def __init__(
self,
upload_id: str = None,
start_num: int = None,
) -> None:
self.upload_id = upload_id
self.start_num = start_num
class _UploaderDelegate:
def __init__(
self,
base: Uploader,
client: UploadAPIClient,
request: models.PutObjectRequest,
options: UploaderOptions,
) -> None:
"""
"""
self._base = base
self._client = client
self._reqeust = request
self._options = options
parallel = options.parallel_num > 1
self._reader: IO[bytes] = None
self._progress_lock = threading.Lock() if parallel else None
self._reader_pos = 0
self._total_size = 0
self._transferred = 0
self._reader_seekable = False
#Source's Info
self._filepath: str = None
self._file_stat: os.stat_result = None
#checkpoint
self._checkpoint = None
#CRC
self._check_crc = False
self._ccrc = 0
#use mulitpart upload
self._upload_part_lock = None
self._upload_errors = []
self._uploaded_parts = []
# resumable upload
self._upload_id = None
self._part_number = None
@property
def reader_filepath(self) -> str:
"""reader filepath
"""
return self._filepath
def check_source(self, filepath:str):
"""check source
"""
if len(filepath) == 0:
raise exceptions.ParamInvalidError(field='filepath')
absfilepath = os.path.abspath(filepath)
if not os.path.isfile(absfilepath):
raise exceptions.FileNotExist(filepath=filepath)
if not os.access(absfilepath, os.R_OK):
raise exceptions.FileNotReadable(filepath=filepath)
self._filepath = absfilepath
self._file_stat = os.stat(absfilepath)
def apply_source(self, reader):
"""apply source
"""
if reader is None:
raise exceptions.ParamInvalidError(field = 'reader')
total_size = utils.guess_content_length(reader)
if total_size is None:
total_size = -1
part_size = self._options.part_size
if total_size > 0:
while self._total_size/part_size >= defaults.MAX_UPLOAD_PARTS:
part_size += self._options.part_size
self._reader = reader
self._options.part_size = part_size
self._total_size = total_size
self._reader_seekable = utils.is_seekable(reader)
def check_checkpoint(self):
"""check checkpoint
"""
if not self._options.enable_checkpoint:
return
if not self._reader_seekable:
return
checkpoint = UploadCheckpoint(
request=self._reqeust,
filepath=self._filepath,
basedir=self._options.checkpoint_dir,
fileinfo=self._file_stat,
part_size=self._options.part_size)
checkpoint.load()
if checkpoint.loaded:
self._upload_id = checkpoint.upload_id
self._options.leave_parts_on_error = True
self._checkpoint = checkpoint
def update_crc_flag(self):
"""update crc flag
"""
#FF_ENABLE_CRC64_CHECK_UPLOAD = 0x00000008
if (self._base._feature_flags & 0x00000008) > 0:
self._check_crc = True
def adjust_source(self):
""" resume from upload id
"""
if not self._upload_id:
return
uploaded_parts:List[models.Part] = []
ccrc = 0
for part in self._iter_uploaded_part():
uploaded_parts.append(models.UploadPart(part_number=part.part_number, etag=part.etag))
if self._check_crc and part.hash_crc64 is not None:
ccrc = Crc64.combine(ccrc, int(part.hash_crc64), part.size)
if len(uploaded_parts) == 0:
return
# update from upload's result
part_number = uploaded_parts[-1].part_number
next_offset = part_number * self._options.part_size
#print(f'last part number={part_number}, next offset={next_offset}')
self._uploaded_parts = uploaded_parts
self._reader_pos = next_offset
self._part_number = part_number + 1
self._ccrc = ccrc
def set_reader(self, reader) ->IO[bytes]:
"""set reader
"""
self._reader = reader
def close_reader(self):
"""close reader
"""
if self._checkpoint:
self._checkpoint.remove()
self._reader = None
self._checkpoint = None
def upload(self) -> UploadResult:
"""Breakpoint upload
"""
if self._total_size >= 0 and self._total_size < self._options.part_size:
return self._single_part()
return self._multipart_part()
def _single_part(self) -> UploadResult:
request = models.PutObjectRequest()
copy_request(request, self._reqeust)
request.body = self._reader
if request.content_type is None:
request.content_type = self._get_content_type()
try:
result = self._client.put_object(request)
except Exception as err:
raise self._wrap_error('', err)
ret = UploadResult(
etag=result.etag,
version_id=result.version_id,
hash_crc64=result.hash_crc64,
)
ret.status = result.status
ret.status_code = result.status_code
ret.request_id = result.request_id
ret.headers = result.headers
return ret
def _multipart_part(self) -> UploadResult:
# init the multipart
try:
upload_ctx = self._get_upload_context()
except Exception as err:
raise self._wrap_error('', err)
# update checkpoint
if self._checkpoint:
self._checkpoint.upload_id = upload_ctx.upload_id
self._checkpoint.dump()
# upload part
parallel = self._options.parallel_num > 1
if parallel:
self._upload_part_lock = threading.Lock()
with concurrent.futures.ThreadPoolExecutor(self._options.parallel_num) as executor:
for result in executor.map(self._upload_part, self._iter_part(upload_ctx)):
self._update_upload_result(result)
else:
for part in self._iter_part(upload_ctx):
self._update_upload_result(self._upload_part(part))
if len(self._upload_errors) > 0:
break
# complete upload
cmresult: models.CompleteMultipartUploadResult = None
if len(self._upload_errors) == 0:
request = models.CompleteMultipartUploadRequest()
copy_request(request, self._reqeust)
parts = sorted(self._uploaded_parts, key=lambda p: p.part_number)
request.upload_id = upload_ctx.upload_id
request.complete_multipart_upload = models.CompleteMultipartUpload(parts=parts)
try:
cmresult = self._client.complete_multipart_upload(request)
except Exception as err:
self._upload_errors.append(err)
# check last error
if len(self._upload_errors) > 0:
if not self._options.leave_parts_on_error:
try:
abort_request = models.AbortMultipartUploadRequest()
abort_request.upload_id = upload_ctx.upload_id
copy_request(request, self._reqeust)
self._client.abort_multipart_upload(abort_request)
except Exception as _:
pass
raise self._wrap_error(upload_ctx.upload_id, self._upload_errors[-1])
self._assert_crc_same(cmresult.headers)
ret = UploadResult(
upload_id=upload_ctx.upload_id,
etag=cmresult.etag,
version_id=cmresult.version_id,
hash_crc64=cmresult.hash_crc64,
)
ret.status = cmresult.status
ret.status_code = cmresult.status_code
ret.request_id = cmresult.request_id
ret.headers = cmresult.headers
return ret
def _get_upload_context(self) -> _UploadContext:
if self._upload_id:
return _UploadContext(
upload_id=self._upload_id,
start_num=self._part_number - 1,
)
#if not exist or fail, create a new upload id
request = models.InitiateMultipartUploadRequest()
copy_request(request, self._reqeust)
if request.content_type is None:
request.content_type = self._get_content_type()
result = self._client.initiate_multipart_upload(request)
return _UploadContext(
upload_id=result.upload_id,
start_num=0,
)
def _iter_part(self, upload_ctx: _UploadContext):
start_part_num = upload_ctx.start_num
reader = self._reader
if self._reader_seekable:
reader = io_utils.ReadAtReader(reader)
def next_body():
n = self._options.part_size
if self._reader_seekable:
bytes_left = self._total_size - self._reader_pos
if bytes_left < n:
n = bytes_left
body = io_utils.SectionReader(reader, self._reader_pos, n)
else:
body = reader.read(n)
self._reader_pos += len(body)
return body
while len(self._upload_errors) == 0:
try:
body = next_body()
if len(body) == 0:
break
except Exception as err:
self._save_error(err)
break
start_part_num += 1
yield upload_ctx.upload_id, start_part_num, body
def _upload_part(self, part):
# When an error occurs, ignore other upload requests
if len(self._upload_errors) > 0:
return None
upload_id = part[0]
part_number = part[1]
body = part[2]
error: Exception = None
etag = None
size = len(body)
hash_crc64 = None
try:
result = self._client.upload_part(models.UploadPartRequest(
bucket=self._reqeust.bucket,
key=self._reqeust.key,
upload_id=upload_id,
part_number=part_number,
body=body,
request_payer=self._reqeust.request_payer
))
etag = result.etag
hash_crc64 = result.hash_crc64
except Exception as err:
error = err
return part_number, etag, error, hash_crc64, size
def _save_error(self, error) -> None:
if self._upload_part_lock:
with self._upload_part_lock:
self._upload_errors.append(error)
else:
self._upload_errors.append(error)
def _get_content_type(self) -> str:
if self._filepath is not None:
return utils.guess_content_type(self._filepath, 'application/octet-stream')
return None
def _iter_uploaded_part(self):
if self._upload_id is None:
return
try:
paginator = ListPartsPaginator(self._client)
iterator = paginator.iter_page(models.ListPartsRequest(
bucket=self._reqeust.bucket,
key=self._reqeust.key,
request_payer=self._reqeust.request_payer,
upload_id=self._upload_id,
))
check_part_number = 1
for page in iterator:
for part in page.parts:
if (part.part_number != check_part_number or
part.size != self._options.part_size):
return
yield part
check_part_number += 1
except Exception:
self._upload_id = None
def _update_upload_result(self, result):
#print(f'_update_upload_result: {result}')
if result is None:
return
if result[2] is not None:
self._save_error(result[2])
return
part_number = result[0]
etag = result[1]
hash_crc64 = result[3]
size = result[4]
self._uploaded_parts.append(models.UploadPart(part_number=part_number, etag=etag))
if self._check_crc and hash_crc64 is not None:
self._ccrc = Crc64.combine(self._ccrc, int(hash_crc64), size)
def _assert_crc_same(self, headers: MutableMapping):
if not self._check_crc:
return
scrc = headers.get('x-oss-hash-crc64ecma', None)
if scrc is None:
return
ccrc = str(self._ccrc)
if scrc != ccrc:
raise self._wrap_error(self._upload_id, exceptions.InconsistentError(client_crc=ccrc, server_crc=scrc))
def _wrap_error(self, upload_id: str, error: Exception) -> Exception:
return UploadError(
upload_id=upload_id,
path=f'oss://{self._reqeust.bucket}/{self._reqeust.key}',
error=error
)