Source code for alibabacloud_oss_v2.copier

"""Copier for handling objects for uploads."""
# pylint: disable=line-too-long, broad-exception-caught
import abc
import copy
import datetime
import concurrent.futures
import threading
from typing import Any, Optional, IO, MutableMapping, List
from . import exceptions, HeadObjectResult, GetObjectTaggingResult
from . import models
from . import validation
from . import utils
from . import defaults
from .serde import copy_request
from .paginator import ListPartsPaginator

metadata_copied = {
    "content-type": None,
    "content-language": None,
    "content-encoding": None,
    "content-disposition": None,
    "cache-control": None,
    "expires": None,
}

class CopyAPIClient(abc.ABC):
    """Abstract base class for copier client."""

    @abc.abstractmethod
    def copy_object(self, request: models.CopyObjectRequest, **kwargs) -> models.CopyObjectResult:
        """Copies objects."""

    @abc.abstractmethod
    def head_object(self, request: models.HeadObjectRequest, **kwargs) -> models.HeadObjectResult:
        """Queries information about the object in a bucket."""

    @abc.abstractmethod
    def initiate_multipart_upload(self, request: models.InitiateMultipartUploadRequest, **kwargs) -> models.InitiateMultipartUploadResult:
        """
        Initiates a multipart upload task before you can upload data in parts to Object Storage Service (OSS).
        """

    @abc.abstractmethod
    def upload_part_copy(self, request: models.UploadPartCopyRequest, **kwargs) -> models.UploadPartCopyResult:
        """
        Copies data from an existing object to upload a part by adding the x-oss-copy-source header to UploadPart.
        """

    @abc.abstractmethod
    def complete_multipart_upload(self, request: models.CompleteMultipartUploadRequest, **kwargs) -> models.CompleteMultipartUploadResult:
        """
        Completes the multipart upload task of an object after all parts of the object are uploaded.
        """

    @abc.abstractmethod
    def abort_multipart_upload(self, request: models.AbortMultipartUploadRequest, **kwargs) -> models.AbortMultipartUploadResult:
        """
        Cancels a multipart upload task and deletes the parts uploaded in the task.
        """

    @abc.abstractmethod
    def list_parts(self, request: models.ListPartsRequest, **kwargs) -> models.ListPartsResult:
        """
        Lists all parts that are uploaded by using a specified upload ID.
        """

    @abc.abstractmethod
    def get_object_tagging(self, request: models.GetObjectTaggingRequest, **kwargs) -> models.GetObjectTaggingResult:
        """
        Queries the tags of an object.
        """
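
# Illustrative example (hypothetical, not part of the original module): a
# minimal test double for CopyAPIClient. The class name _StubCopyClient is
# invented here; it only shows the full set of operations a client must
# provide before it can be handed to Copier, e.g. for unit tests that
# exercise argument validation without any network access. Each method
# raises NotImplementedError so unexpected calls fail loudly.
class _StubCopyClient(CopyAPIClient):
    def copy_object(self, request: models.CopyObjectRequest, **kwargs) -> models.CopyObjectResult:
        raise NotImplementedError

    def head_object(self, request: models.HeadObjectRequest, **kwargs) -> models.HeadObjectResult:
        raise NotImplementedError

    def initiate_multipart_upload(self, request: models.InitiateMultipartUploadRequest, **kwargs) -> models.InitiateMultipartUploadResult:
        raise NotImplementedError

    def upload_part_copy(self, request: models.UploadPartCopyRequest, **kwargs) -> models.UploadPartCopyResult:
        raise NotImplementedError

    def complete_multipart_upload(self, request: models.CompleteMultipartUploadRequest, **kwargs) -> models.CompleteMultipartUploadResult:
        raise NotImplementedError

    def abort_multipart_upload(self, request: models.AbortMultipartUploadRequest, **kwargs) -> models.AbortMultipartUploadResult:
        raise NotImplementedError

    def list_parts(self, request: models.ListPartsRequest, **kwargs) -> models.ListPartsResult:
        raise NotImplementedError

    def get_object_tagging(self, request: models.GetObjectTaggingRequest, **kwargs) -> models.GetObjectTaggingResult:
        raise NotImplementedError
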
class CopierOptions:
    """Options for copier."""

    def __init__(
        self,
        part_size: Optional[int] = None,
        parallel_num: Optional[int] = None,
        multipart_copy_threshold: Optional[int] = None,
        leave_parts_on_error: Optional[bool] = None,
        disable_shallow_copy: Optional[bool] = None
    ) -> None:
        self.part_size = part_size
        self.parallel_num = parallel_num
        self.multipart_copy_threshold = multipart_copy_threshold
        self.leave_parts_on_error = leave_parts_on_error or False
        self.disable_shallow_copy = disable_shallow_copy or False

class CopyResult:
    """The result of the copy operation."""

    def __init__(
        self,
        upload_id: Optional[str] = None,
        etag: Optional[str] = None,
        version_id: Optional[str] = None,
        hash_crc64: Optional[str] = None,
    ) -> None:
        self.upload_id = upload_id
        self.etag = etag
        self.version_id = version_id
        self.hash_crc64 = hash_crc64
        self.status = ''
        self.status_code = 0
        self.request_id = ''
        self.headers: MutableMapping[str, str] = {}

class CopyError(exceptions.BaseError):
    """
    Copy Error.
    """
    fmt = 'copy failed, {upload_id}, {path}, {error}.'

    def __init__(self, **kwargs):
        exceptions.BaseError.__init__(self, **kwargs)
        self._error = kwargs.get("error", None)
        self.upload_id = kwargs.get("upload_id", None)
        self.path = kwargs.get("path", None)

    def unwrap(self) -> Exception:
        """Returns the detailed error."""
        return self._error

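
# Illustrative example (hypothetical, not part of the original module): how a
# caller might report a failed copy. CopyError keeps the destination path, the
# multipart upload_id (empty when the failure happened before or without a
# multipart upload), and the original exception reachable through unwrap().
# The helper name _describe_copy_error is invented for this sketch.
def _describe_copy_error(err: CopyError) -> str:
    cause = err.unwrap()
    return f'copy to {err.path} (upload_id={err.upload_id!r}) failed: {cause!r}'
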
class Copier:
    """Copier for handling object copy operations."""

    def __init__(
        self,
        client: CopyAPIClient,
        **kwargs: Any
    ) -> None:
        """
        client (CopyAPIClient): An agent that implements the CopyObject and multipart copy APIs.
        kwargs: Extra keyword arguments.
        - part_size (int, optional): The part size. Default value: 64 MiB.
        - parallel_num (int, optional): The number of the upload tasks in parallel. Default value: 3.
        - multipart_copy_threshold (int, optional): The minimum object size for calling the multipart copy operation. Default value: 200 MiB.
        - leave_parts_on_error (bool, optional): Specifies whether to retain the copied parts when a copy task fails. By default, the copied parts are not retained.
        - disable_shallow_copy (bool, optional): Specifies that the shallow copy capability is not used. By default, the shallow copy capability is used.
        """
        part_size = kwargs.get('part_size', defaults.DEFAULT_COPY_PART_SIZE)
        parallel_num = kwargs.get('parallel_num', defaults.DEFAULT_COPY_PARALLEL)
        multipart_copy_threshold = kwargs.get('multipart_copy_threshold', defaults.DEFAULT_COPY_THRESHOLD)
        leave_parts_on_error = kwargs.get('leave_parts_on_error', False)
        disable_shallow_copy = kwargs.get('disable_shallow_copy', False)

        self._client = client
        self._options = CopierOptions(
            part_size=part_size,
            parallel_num=parallel_num,
            multipart_copy_threshold=multipart_copy_threshold,
            leave_parts_on_error=leave_parts_on_error,
            disable_shallow_copy=disable_shallow_copy,
        )

        feature_flags = 0
        cstr = str(client)
        if cstr == '<OssClient>':
            feature_flags = client._client._options.feature_flags
        self._feature_flags = feature_flags

    def copy(
        self,
        request: models.CopyObjectRequest,
        **kwargs: Any
    ) -> CopyResult:
        """Copies the source object to the destination object.

        Args:
            request (CopyObjectRequest): the request parameters for the copy operation.
            kwargs: Extra keyword arguments.
            - part_size (int, optional): The part size. Default value: 64 MiB.
            - parallel_num (int, optional): The number of the upload tasks in parallel. Default value: 3.
            - multipart_copy_threshold (int, optional): The minimum object size for calling the multipart copy operation. Default value: 200 MiB.
            - leave_parts_on_error (bool, optional): Specifies whether to retain the copied parts when a copy task fails. By default, the copied parts are not retained.
            - disable_shallow_copy (bool, optional): Specifies that the shallow copy capability is not used. By default, the shallow copy capability is used.

        Returns:
            CopyResult: The result for the copy operation.
        """
        delegate = self._delegate(request, **kwargs)
        delegate.check_source()
        delegate.apply_source()
        return delegate.copy()

    def _delegate(
        self,
        request: models.CopyObjectRequest,
        **kwargs: Any
    ) -> "_CopierDelegate":

        if not validation.is_valid_bucket_name(utils.safety_str(request.bucket)):
            raise exceptions.ParamInvalidError(field='request.bucket')

        if not validation.is_valid_object_name(utils.safety_str(request.key)):
            raise exceptions.ParamInvalidError(field='request.key')

        if not validation.is_valid_object_name(utils.safety_str(request.source_key)):
            raise exceptions.ParamInvalidError(field='request.source_key')

        options = copy.copy(self._options)
        options.part_size = kwargs.get('part_size', self._options.part_size)
        options.parallel_num = kwargs.get('parallel_num', self._options.parallel_num)
        options.multipart_copy_threshold = kwargs.get('multipart_copy_threshold', self._options.multipart_copy_threshold)
        options.leave_parts_on_error = kwargs.get('leave_parts_on_error', self._options.leave_parts_on_error)
        options.disable_shallow_copy = kwargs.get('disable_shallow_copy', self._options.disable_shallow_copy)

        if options.part_size <= 0:
            options.part_size = defaults.DEFAULT_COPY_PART_SIZE

        if options.parallel_num <= 0:
            options.parallel_num = defaults.DEFAULT_COPY_PARALLEL

        if options.multipart_copy_threshold <= 0:
            options.multipart_copy_threshold = defaults.DEFAULT_COPY_THRESHOLD

        delegate = _CopierDelegate(
            base=self,
            client=self._client,
            request=request,
            options=options,
            metadata_prop=kwargs.get('metadata_properties', None),
            tag_prop=kwargs.get('tag_properties', None)
        )

        return delegate

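
# Illustrative usage sketch (hypothetical, not part of the original module),
# assuming `client` is an already configured OSS client that provides the
# CopyAPIClient operations. Bucket and object names are placeholders, the
# helper name _example_copy is invented, and keyword construction of
# CopyObjectRequest follows the request fields referenced in this module.
# Objects at or below multipart_copy_threshold go through a single CopyObject
# call; larger objects are copied part by part.
def _example_copy(client: CopyAPIClient) -> CopyResult:
    copier = Copier(
        client,
        part_size=100 * 1024 * 1024,   # 100 MiB parts
        parallel_num=4,                # copy up to 4 parts concurrently
    )
    request = models.CopyObjectRequest(
        bucket='dst-bucket',
        key='dst-key',
        source_bucket='src-bucket',
        source_key='src-key',
    )
    # per-call options override the values given to the constructor
    return copier.copy(request, leave_parts_on_error=True)
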
class _CopierDelegate:
    def __init__(
        self,
        base: Copier,
        client: CopyAPIClient,
        request: models.CopyObjectRequest,
        options: CopierOptions,
        metadata_prop: Optional[HeadObjectResult] = None,
        tag_prop: Optional[GetObjectTaggingResult] = None,
    ) -> None:
        self._base = base
        self._client = client
        self._request = request
        self._options = options

        self._reader_pos = 0
        self._total_size = 0
        self._transferred = 0

        parallel = options.parallel_num > 1 and self._request.progress_fn is not None
        self._progress_lock = threading.Lock() if parallel else None

        # source's info
        self._metadata_prop = metadata_prop
        self._tag_prop = tag_prop

        # multipart copy state
        self._copy_part_lock = None
        self._copy_errors = []
        self._copy_parts = []

        # upload info
        self._upload_id = ''

    def check_source(self):
        """Queries the source object's metadata if it was not supplied."""
        if self._metadata_prop is not None:
            return

        request = models.HeadObjectRequest()
        copy_request(request, self._request)
        if self._request.source_bucket is not None:
            request.bucket = self._request.source_bucket
        request.key = self._request.source_key
        request.version_id = self._request.source_version_id

        result = self._client.head_object(request)
        self._metadata_prop = result

    def apply_source(self):
        """Derives the total size to copy from the source metadata."""
        total_size = self._metadata_prop.content_length
        if total_size is None:
            total_size = -1
        self._total_size = total_size

    def can_use_shallow_copy(self):
        if self._options.disable_shallow_copy:
            return False
        if self._request.storage_class is not None:
            return False
        if self._request.source_bucket is not None and self._request.source_bucket != self._request.bucket:
            return False
        if self._metadata_prop.server_side_encryption is not None:
            return False
        return True

    def copy(self) -> CopyResult:
        """Copies the object."""
        try:
            if self._total_size <= self._options.multipart_copy_threshold:
                return self._single_copy()
            elif self.can_use_shallow_copy():
                return self._shallow_copy()
            return self._multipart_copy()
        except Exception as err:
            raise self._wrap_error(self._upload_id, err)

    def _single_copy(self) -> CopyResult:
        result = self._client.copy_object(self._request)
        self._update_progress(self._total_size)

        ret = CopyResult(
            etag=result.etag,
            version_id=result.version_id,
            hash_crc64=result.hash_crc64,
        )
        ret.status = result.status
        ret.status_code = result.status_code
        ret.request_id = result.request_id
        ret.headers = result.headers
        return ret

    def _shallow_copy(self) -> CopyResult:
        # use single copy first; if it times out, fall back to multipart copy
        starttime = datetime.datetime.now()
        try:
            result = self._client.copy_object(self._request, readwrite_timeout=10, operation_timeout=30)
        except Exception:
            if datetime.datetime.now() > starttime + datetime.timedelta(seconds=30):
                return self._multipart_copy()
            raise

        self._update_progress(self._total_size)

        ret = CopyResult(
            etag=result.etag,
            version_id=result.version_id,
            hash_crc64=result.hash_crc64,
        )
        ret.status = result.status
        ret.status_code = result.status_code
        ret.request_id = result.request_id
        ret.headers = result.headers
        return ret

    def _multipart_copy(self) -> CopyResult:
        # get tag properties if necessary
        self._get_tag_props()

        # init the multipart upload
        self._init_upload()

        # grow the part size so the part count stays within MAX_UPLOAD_PARTS
        part_size = self._options.part_size
        while self._total_size / part_size >= defaults.MAX_UPLOAD_PARTS:
            part_size += self._options.part_size
        self._options.part_size = part_size

        # copy parts, in parallel when configured
        parallel = self._options.parallel_num > 1
        if parallel:
            self._copy_part_lock = threading.Lock()
            with concurrent.futures.ThreadPoolExecutor(self._options.parallel_num) as executor:
                for result in executor.map(self._copy_part, self._iter_part()):
                    self._update_upload_result_lock(result)
        else:
            for part in self._iter_part():
                self._update_upload_result_lock(self._copy_part(part))
                if len(self._copy_errors) > 0:
                    break

        # complete the multipart upload
        cmresult: models.CompleteMultipartUploadResult = None
        if len(self._copy_errors) == 0:
            request = models.CompleteMultipartUploadRequest()
            copy_request(request, self._request)
            parts = sorted(self._copy_parts, key=lambda p: p.part_number)
            request.upload_id = self._upload_id
            request.complete_multipart_upload = models.CompleteMultipartUpload(parts=parts)
            try:
                cmresult = self._client.complete_multipart_upload(request)
            except Exception as err:
                self._copy_errors.append(err)

        # check the last error
        if len(self._copy_errors) > 0:
            if not self._options.leave_parts_on_error:
                try:
                    abort_request = models.AbortMultipartUploadRequest()
                    copy_request(abort_request, self._request)
                    abort_request.upload_id = self._upload_id
                    self._client.abort_multipart_upload(abort_request)
                except Exception:
                    pass
            raise self._copy_errors[-1]

        self._assert_crc_same(cmresult.headers)

        ret = CopyResult(
            upload_id=self._upload_id,
            etag=cmresult.etag,
            version_id=cmresult.version_id,
            hash_crc64=cmresult.hash_crc64,
        )
        ret.status = cmresult.status
        ret.status_code = cmresult.status_code
        ret.request_id = cmresult.request_id
        ret.headers = cmresult.headers
        return ret

    def _get_tag_props(self):
        if self._tag_prop is not None:
            return
        if utils.safety_int(self._metadata_prop.tagging_count) <= 0:
            return

        # if the directive is COPY, fetch the source object's tags
        directive = utils.safety_str(self._request.tagging_directive)
        if directive == "" or directive.lower() == "copy":
            request = models.GetObjectTaggingRequest()
            copy_request(request, self._request)
            if self._request.source_bucket is not None:
                request.bucket = self._request.source_bucket
            request.key = self._request.source_key
            request.version_id = self._request.source_version_id

            result = self._client.get_object_tagging(request)
            self._tag_prop = result

    def _init_upload(self):
        request = models.InitiateMultipartUploadRequest()
        copy_request(request, self._request)
        self.overwrite_metadata_prop(request)
        self.overwrite_tag_prop(request)
        request.disable_auto_detect_mime_type = True
        result = self._client.initiate_multipart_upload(request)
        self._upload_id = result.upload_id

    def _iter_part(self):
        start_part_num = 0
        upload_id = self._upload_id

        # timeout for the UploadPartCopy API
        # 10s per 200M, max timeout is 50s
        PART_SIZE = 200 * 1024 * 1024
        STEP = 20
        timeout = defaults.DEFAULT_READWRITE_TIMEOUT
        part_size = self._options.part_size
        while part_size > 200 * 1024 * 1024:
            timeout += STEP
            part_size -= PART_SIZE
            if timeout > 50:
                break

        while len(self._copy_errors) == 0:
            n = self._options.part_size
            bytes_left = self._total_size - self._reader_pos
            if bytes_left <= 0:
                break
            if bytes_left < n:
                n = bytes_left
            range_end = self._reader_pos + n - 1
            range = f'bytes={self._reader_pos}-{range_end}'
            self._reader_pos += n
            start_part_num += 1
            yield upload_id, start_part_num, range, timeout, n

    def _copy_part(self, part):
        # when an error has already occurred, skip the remaining part copies
        if len(self._copy_errors) > 0:
            return None

        upload_id = part[0]
        part_number = part[1]
        range = part[2]
        timeout = part[3]
        part_size = part[4]
        error: Exception = None
        etag = None
        try:
            request = models.UploadPartCopyRequest()
            copy_request(request, self._request)
            request.part_number = part_number
            request.upload_id = upload_id
            request.source_range = range
            result = self._client.upload_part_copy(request, readwrite_timeout=timeout)
            etag = result.etag
            self._update_progress(part_size)
        except Exception as err:
            error = err
        return part_number, etag, error

    def _update_progress(self, increment: int):
        if self._request.progress_fn is None:
            return

        if self._progress_lock:
            with self._progress_lock:
                self._transferred += increment
                self._request.progress_fn(increment, self._transferred, self._total_size)
        else:
            self._transferred += increment
            self._request.progress_fn(increment, self._transferred, self._total_size)

    def _update_upload_result_lock(self, result) -> None:
        if self._copy_part_lock:
            with self._copy_part_lock:
                self._update_upload_result(result)
        else:
            self._update_upload_result(result)

    def _update_upload_result(self, result):
        if result is None:
            return

        if result[2] is not None:
            self._copy_errors.append(result[2])
            return

        part_number = result[0]
        etag = result[1]
        self._copy_parts.append(models.UploadPart(part_number=part_number, etag=etag))

    def overwrite_metadata_prop(self, im_request: models.InitiateMultipartUploadRequest):
        directive = utils.safety_str(self._request.metadata_directive).lower()
        if directive in ["", "copy"]:
            if self._metadata_prop is None:
                return Exception("request.metadata_directive is COPY, but metadata_prop for the source is None")
            im_request.cache_control = None
            im_request.content_type = None
            im_request.content_disposition = None
            im_request.content_encoding = None
            im_request.expires = None
            im_request.metadata = None
            im_request.headers = {}
            # copy metadata from the source
            for k, v in self._metadata_prop.headers.items():
                low_k = k.lower()
                if low_k.startswith("x-oss-meta"):
                    im_request.headers[k] = v
                elif low_k in metadata_copied:
                    im_request.headers[k] = v
        elif directive == "replace":
            # the metadata has already been copied via the copy_request function
            pass
        else:
            return Exception(f"Unsupported metadata_directive, {self._request.metadata_directive}")

    def overwrite_tag_prop(self, im_request: models.InitiateMultipartUploadRequest):
        directive = utils.safety_str(self._request.tagging_directive).lower()
        if directive in ["", "copy"]:
            if self._tag_prop is not None and self._tag_prop.tag_set is not None:
                tags = []
                for t in self._tag_prop.tag_set.tags:
                    tags.append(f"{str(t.key)}={str(t.value)}")
                if tags:
                    im_request.tagging = '&'.join(tags)
        elif directive == "replace":
            # the tagging has already been copied via the copy_request function
            pass
        else:
            return Exception(f"Unsupported tagging_directive, {self._request.tagging_directive}")

    def _assert_crc_same(self, headers: MutableMapping):
        scrc = headers.get('x-oss-hash-crc64ecma', None)
        if scrc is None:
            return

        ccrc = self._metadata_prop.hash_crc64
        if ccrc is None:
            return

        if scrc != ccrc:
            raise exceptions.InconsistentError(client_crc=ccrc, server_crc=scrc)

    def _wrap_error(self, upload_id: str, error: Exception) -> Exception:
        return CopyError(
            upload_id=upload_id,
            path=f'oss://{self._request.bucket}/{self._request.key}',
            error=error
        )
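
# Illustrative sketch of progress reporting (hypothetical, not part of the
# original module). _CopierDelegate._update_progress invokes
# request.progress_fn(increment, transferred, total), so a callback with that
# signature can be attached to the CopyObjectRequest. The helper names
# _print_progress and _example_copy_with_progress and the bucket/key values
# are invented for this example.
def _print_progress(increment: int, transferred: int, total: int) -> None:
    # total is -1 when the source content length is unknown
    if total > 0:
        print(f'copied {transferred}/{total} bytes ({transferred * 100 // total}%)')
    else:
        print(f'copied {transferred} bytes')


def _example_copy_with_progress(client: CopyAPIClient) -> CopyResult:
    request = models.CopyObjectRequest(
        bucket='dst-bucket',
        key='dst-key',
        source_bucket='src-bucket',
        source_key='src-key',
    )
    request.progress_fn = _print_progress
    return Copier(client).copy(request)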