Source code for alibabacloud_oss_v2.downloader

"""Downloader for handling objects for downloads."""
import abc
import copy
import os
import concurrent.futures
import threading
from typing import Iterator, Any, Optional, IO
from . import exceptions
from . import models
from . import validation
from . import utils
from . import io_utils
from . import defaults
from .serde import copy_request
from .checkpoint import DownloadCheckpoint
from .crc import Crc64

class DownloadAPIClient(abc.ABC):
    """Abstract base class for downloader client.

    Any client passed to :class:`Downloader` must implement these two
    operations; the downloader uses head_object to size the source and
    get_object (with a Range header) to fetch each part.
    """

    @abc.abstractmethod
    def head_object(self, request: models.HeadObjectRequest, **kwargs) -> models.HeadObjectResult:
        """Queries information about the object in a bucket."""

    @abc.abstractmethod
    def get_object(self, request: models.GetObjectRequest, **kwargs) -> models.GetObjectResult:
        """ Queries an object. To call this operation, you must have read permissions on the object. """
class DownloaderOptions:
    """Options for downloader """

    def __init__(
        self,
        part_size: Optional[int] = None,
        parallel_num: Optional[int] = None,
        block_size: Optional[int] = None,
        use_temp_file: Optional[bool] = None,
        enable_checkpoint: Optional[bool] = None,
        checkpoint_dir: Optional[str] = None,
        verify_data: Optional[bool] = None,
    ) -> None:
        """
        Args:
            part_size (int, optional): The part size. Default value: 6 MiB.
            parallel_num (int, optional): The number of the download tasks in
                parallel. Default value: 3.
            block_size (int, optional): The block size is the number of bytes it
                should read into memory. Default value: 16 KiB.
            use_temp_file (bool, optional): Specifies whether to use a temporary
                file when you download an object. A temporary file is used by
                default: the object is downloaded into it, then the temporary
                file is renamed to the target name.
            enable_checkpoint (bool, optional): Specifies whether to record the
                download progress in the checkpoint file. Off by default.
            checkpoint_dir (str, optional): The path in which the checkpoint
                file is stored, e.g. /local/dir/. Only meaningful when
                enable_checkpoint is true.
            verify_data (bool, optional): Specifies whether to verify the CRC-64
                of the downloaded object when the download is resumed. Off by
                default. Only meaningful when enable_checkpoint is true.
        """
        # Size/concurrency tuning knobs are kept as given (may be None;
        # Downloader._delegate fills in defaults later).
        self.part_size = part_size
        self.parallel_num = parallel_num
        self.block_size = block_size
        self.checkpoint_dir = checkpoint_dir
        self.verify_data = verify_data
        # Falsy inputs (None/False) normalize to False.
        self.use_temp_file = use_temp_file or False
        self.enable_checkpoint = enable_checkpoint or False
class DownloadResult:
    """The result about the download operation. """

    def __init__(self, written: Optional[int]) -> None:
        """
        Args:
            written (int, optional): The size of the downloaded data, in bytes.
        """
        self.written = written
class DownloadError(exceptions.BaseError):
    """ Download Error.

    Wraps the underlying failure together with the ``oss://bucket/key``
    path of the object that was being downloaded.
    """
    fmt = 'download failed, {path}, {error}.'

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.path = kwargs.get("path", None)
        self._error = kwargs.get("error", None)

    def unwrap(self) -> Exception:
        """returns the detail error"""
        return self._error
class Downloader:
    """Downloader for handling objects for downloads."""

    def __init__(
        self,
        client: DownloadAPIClient,
        **kwargs: Any
    ) -> None:
        """
        Args:
            client (DownloadAPIClient): An agent that implements the HeadObject
                and GetObject api.
            kwargs: Extra keyword arguments used to initialize the downloader.
                - part_size (int): The part size. Default value: 6 MiB.
                - parallel_num (int): The number of the download tasks in parallel. Default value: 3.
                - block_size (int): The block size is the number of bytes it should read into memory. Default value: 16 KiB.
                - use_temp_file (bool): Whether to use a temporary file when you download an object. A temporary file is used by default.
                - enable_checkpoint (bool): Whether to enable checkpoint. Defaults to False.
                - checkpoint_dir (str): The directory to store checkpoint.
                - verify_data (bool): Whether to verify data when the download is resumed. Defaults to False.
        """
        self._client = client
        self._options = DownloaderOptions(
            part_size=kwargs.get('part_size', defaults.DEFAULT_DOWNLOAD_PART_SIZE),
            parallel_num=kwargs.get('parallel_num', defaults.DEFAULT_DOWNLOAD_PARALLEL),
            block_size=kwargs.get('block_size', None),
            use_temp_file=kwargs.get('use_temp_file', None),
            enable_checkpoint=kwargs.get('enable_checkpoint', None),
            checkpoint_dir=kwargs.get('checkpoint_dir', None),
            verify_data=kwargs.get('verify_data', None),
        )

        # Probe the concrete client's feature flags (e.g. CRC-64 checking).
        # The client type is sniffed via str(client) rather than isinstance
        # to avoid a circular import with the client module.
        feature_flags = 0
        cstr = str(client)
        if cstr == '<OssClient>':
            feature_flags = client._client._options.feature_flags
        elif cstr == '<OssEncryptionClient>':
            feature_flags = client.unwrap()._client._options.feature_flags
        self._feature_flags = feature_flags

    def download_file(
        self,
        request: models.GetObjectRequest,
        filepath: str,
        **kwargs: Any
    ) -> DownloadResult:
        """Downloads an object into a local file.

        Args:
            request (models.GetObjectRequest): the request parameters for the download operation.
            filepath (str): The path of a local file.
            kwargs: Extra keyword arguments.
                - part_size (int): The part size.
                - parallel_num (int): The number of the download tasks in parallel.
                - block_size (int): The block size is the number of bytes it should read into memory.
                - use_temp_file (bool): Whether to use a temporary file when you download an object.
                - enable_checkpoint (bool): Whether to enable checkpoint.
                - checkpoint_dir (str): The directory to store checkpoint.
                - verify_data (bool): Whether to verify data when the download is resumed.

        Returns:
            DownloadResult: The result for the download operation.
        """
        delegate = self._delegate(request, **kwargs)
        delegate.check_source()
        delegate.check_destination(filepath)
        delegate.adjust_range()
        delegate.check_checkpoint()

        # Ensure the (temp) file exists before opening it for random-access
        # writes; 'rb+' alone fails if the file is missing.
        with open(delegate.writer_filepath, 'ab'):
            pass

        with open(delegate.writer_filepath, 'rb+') as writer:
            delegate.adjust_writer(writer)
            delegate.update_crc_flag()
            result = delegate.download()
            # close_writer renames the temp file into place and removes the
            # checkpoint; only reached when download() did not raise, so a
            # failed transfer leaves the temp file for resumption.
            delegate.close_writer(writer)

        return result

    def download_to(
        self,
        request: models.GetObjectRequest,
        writer: IO[bytes],
        **kwargs: Any
    ) -> DownloadResult:
        """Downloads an object into a stream.

        Args:
            request (models.GetObjectRequest): the request parameters for the download operation.
            writer (IO[bytes]): writes the data into writer.
            kwargs: Extra keyword arguments.
                - part_size (int): The part size.
                - parallel_num (int): The number of the download tasks in parallel.
                - block_size (int): The block size is the number of bytes it should read into memory.

        Returns:
            DownloadResult: The result for the download operation.
        """
        delegate = self._delegate(request, **kwargs)
        delegate.check_source()
        delegate.adjust_range()
        delegate.adjust_writer(writer)
        return delegate.download()

    def _delegate(
        self,
        request: models.GetObjectRequest,
        **kwargs: Any
    ) -> "_DownloaderDelegate":
        """Validates the request, merges per-call options over the instance
        defaults and builds the delegate that performs the transfer.

        Raises:
            exceptions.ParamNullError: if request is None.
            exceptions.ParamInvalidError: if bucket/key/range_header is invalid.
        """
        if request is None:
            raise exceptions.ParamNullError(field='request')

        if not validation.is_valid_bucket_name(utils.safety_str(request.bucket)):
            raise exceptions.ParamInvalidError(field='request.bucket')

        if not validation.is_valid_object_name(utils.safety_str(request.key)):
            raise exceptions.ParamInvalidError(field='request.key')

        # BUGFIX: a present-but-malformed range header is an invalid
        # parameter, not a null one; was ParamNullError before.
        if request.range_header and not validation.is_valid_range(request.range_header):
            raise exceptions.ParamInvalidError(field='request.range_header')

        # Shallow copy so per-call kwargs never mutate the shared defaults.
        options = copy.copy(self._options)
        options.part_size = kwargs.get('part_size', self._options.part_size)
        options.parallel_num = kwargs.get('parallel_num', self._options.parallel_num)
        options.block_size = kwargs.get('block_size', self._options.block_size)
        options.use_temp_file = kwargs.get('use_temp_file', self._options.use_temp_file)
        options.enable_checkpoint = kwargs.get('enable_checkpoint', self._options.enable_checkpoint)
        options.checkpoint_dir = kwargs.get('checkpoint_dir', self._options.checkpoint_dir)
        options.verify_data = kwargs.get('verify_data', self._options.verify_data)

        # Guard against non-positive tuning values.
        if options.part_size <= 0:
            options.part_size = defaults.DEFAULT_DOWNLOAD_PART_SIZE
        if options.parallel_num <= 0:
            options.parallel_num = defaults.DEFAULT_DOWNLOAD_PARALLEL

        return _DownloaderDelegate(
            base=self,
            client=self._client,
            request=request,
            options=options
        )
class _DownloaderDelegate:
    """Internal worker that performs one download: ranges, checkpointing,
    (optionally parallel) part fetches, in-order progress commit and CRC-64
    verification. Not part of the public API.
    """

    def __init__(
        self,
        base: Downloader,
        client: DownloadAPIClient,
        request: models.GetObjectRequest,
        options: DownloaderOptions,
    ) -> None:
        """ """
        self._base = base
        self._client = client
        # NOTE: 'reqeust' is a historic typo in the attribute name; kept as-is.
        self._reqeust = request
        self._options = options

        # Byte bookkeeping: _rstart is the first byte of the requested range,
        # _pos the resume position, _epos one past the last byte to fetch.
        self._rstart = 0
        self._pos = 0
        self._epos = 0
        self._written = 0

        # Locks are only needed when parts are fetched concurrently.
        parallel = options.parallel_num > 1
        self._writer = None
        self._writer_lock = threading.Lock() if parallel else None
        self._progress_lock = threading.Lock() if parallel else None

        # Source's info (filled by check_source from HeadObject).
        self._size_in_bytes = None
        self._modtime = None
        self._etag = None
        self._headers = None

        # Destination's info (filled by check_destination).
        self._filepath = None
        self._temp_filepath = None

        # CRC state: _ccrc is the combined CRC of all committed parts,
        # _next_offset the next offset expected to commit in order.
        self._calc_crc = False
        self._check_crc = False
        self._ccrc = 0
        self._next_offset = 0

        # Checkpoint for resumable downloads (None when disabled).
        self._checkpoint: DownloadCheckpoint = None

        # Errors collected from multipart download workers.
        self._download_errors = []

    @property
    def writer_filepath(self) -> str:
        """writer filepath (the temp file while use_temp_file is on)."""
        return self._temp_filepath

    def check_source(self):
        """check source: HeadObject the source to capture size, mtime, etag
        and response headers (headers feed the checkpoint and CRC check)."""
        request = models.HeadObjectRequest(self._reqeust.bucket, self._reqeust.key)
        copy_request(request, self._reqeust)
        result = self._client.head_object(request)
        self._size_in_bytes = result.content_length
        self._modtime = result.last_modified
        self._etag = result.etag
        self._headers = result.headers

    def check_destination(self, filepath: str):
        """check destination: resolve the target path and derive the temp
        file path (same path + suffix when use_temp_file is enabled).

        Raises:
            exceptions.ParamInvalidError: if filepath is empty.
        """
        if len(utils.safety_str(filepath)) == 0:
            raise exceptions.ParamInvalidError(field='filepath')
        absfilepath = os.path.abspath(filepath)
        tempfilepath = absfilepath
        if self._options.use_temp_file:
            tempfilepath += defaults.DEFAULT_TEMP_FILE_SUFFIX
        self._filepath = absfilepath
        self._temp_filepath = tempfilepath

    def adjust_range(self):
        """adjust range: translate the request's Range header (if any) into
        [_pos, _epos) byte positions against the object size."""
        self._pos = 0
        self._rstart = 0
        self._epos = self._size_in_bytes
        if self._reqeust.range_header is not None:
            range_header = utils.parse_http_range(self._reqeust.range_header)
            if range_header[0] >= self._size_in_bytes:
                raise ValueError(f'invalid range, size :{self._size_in_bytes}, range: {self._reqeust.range_header}')
            if range_header[0] > 0:
                self._pos = range_header[0]
                self._rstart = self._pos
            if range_header[1] > 0:
                # HTTP ranges are inclusive; _epos is exclusive.
                self._epos = min(range_header[1] + 1, self._size_in_bytes)

    def check_checkpoint(self):
        """check checkpoint: load a previous checkpoint (if enabled) and
        fast-forward _pos/_written to the recorded resume offset."""
        if not self._options.enable_checkpoint:
            return

        checkpoint = DownloadCheckpoint(
            request=self._reqeust,
            filepath=self._temp_filepath,
            basedir=self._options.checkpoint_dir,
            headers=self._headers,
            part_size=self._options.part_size)
        checkpoint.verify_data = self._options.verify_data
        checkpoint.load()
        if checkpoint.loaded:
            self._pos = checkpoint.doffset
            self._written = self._pos - self._rstart
        else:
            checkpoint.doffset = self._pos
        self._checkpoint = checkpoint

        # crc: resume the combined CRC and next expected offset.
        self._ccrc = checkpoint.dcrc64
        self._next_offset = checkpoint.doffset

    def adjust_writer(self, writer: IO[bytes]):
        """adjust writer: truncate the sink to the already-downloaded length
        so a resumed download appends cleanly.

        Args:
            writer (IO[bytes]): destination stream; truncate may legitimately
                fail on non-file streams (OSError is ignored).
        """
        try:
            writer.truncate(self._pos - self._rstart)
        except OSError:
            pass
        self._writer = writer

    def close_writer(self, writer: IO[bytes]):
        """close writer: close the sink, promote the temp file to its final
        name and drop the checkpoint. Only called after a successful download.

        Args:
            writer (IO[bytes]): the stream previously passed to adjust_writer.
        """
        if writer:
            writer.close()
            if self._temp_filepath != self._filepath:
                io_utils.rename_file(self._temp_filepath, self._filepath)

        if self._checkpoint:
            self._checkpoint.remove()

        self._writer = None
        self._checkpoint = None

    def update_crc_flag(self):
        """update crc flag from the client's feature flags."""
        # FF_ENABLE_CRC64_CHECK_DOWNLOAD
        if (self._base._feature_flags & 0x00000010) > 0:
            # A ranged download covers only part of the object, so the
            # whole-object CRC can only be checked for full downloads.
            self._check_crc = self._reqeust.range_header is None
        self._calc_crc = (self._checkpoint is not None and self._checkpoint.verify_data) or self._check_crc

    def download(self) -> DownloadResult:
        """Breakpoint download: fetch all parts (in parallel when possible),
        commit results in order, then verify CRC.

        Raises:
            DownloadError: wrapping the first/last recorded part error.
        """
        parallel = self._options.parallel_num > 1
        seekable = utils.is_seekable(self._writer)
        # Parallelism needs random access to the sink and more than one part.
        if not seekable:
            parallel = False
        if self._epos - self._pos <= self._options.part_size:
            parallel = False

        if parallel:
            with concurrent.futures.ThreadPoolExecutor(self._options.parallel_num) as executor:
                # executor.map preserves input order, so results are committed
                # strictly in offset order even though fetches run concurrently.
                for result in executor.map(self._process_part, self._iter_part_start()):
                    self._update_process_result(result)
        else:
            if seekable:
                self._writer.seek(self._pos - self._rstart, os.SEEK_SET)
            for start in self._iter_part_start():
                self._update_process_result(self._process_part(start))
                if len(self._download_errors) > 0:
                    break

        if len(self._download_errors) > 0:
            raise self._wrap_error(self._download_errors[-1])

        self._assert_crc_same()

        return DownloadResult(written=self._written)

    def _iter_part_start(self) -> Iterator[int]:
        """Yield the starting offset of each part in [_pos, _epos)."""
        start = self._pos
        while start < self._epos:
            yield start
            start += self._options.part_size
            # When an error occurs, stop download
            if len(self._download_errors) > 0:
                break

    def _calc_part_size(self, start: int):
        """Size of the part beginning at start (last part may be short)."""
        if start + self._options.part_size > self._epos:
            size = self._epos - start
        else:
            size = self._options.part_size
        return size

    def _process_part(self, start: int):
        """Fetch one part; returns (start, bytes_got, error, part_crc)."""
        # When an error occurs, ignore other download requests
        if len(self._download_errors) > 0:
            return None

        size = self._calc_part_size(start)
        request = copy.copy(self._reqeust)
        got = 0
        error: Exception = None
        chash: Crc64 = None
        if self._calc_crc:
            chash = Crc64(0)

        # Retry loop: on a read failure or short body, re-issue GetObject
        # for the remaining byte range (start+got onward).
        while True:
            request.range_header = f'bytes={start + got}-{start + size - 1}'
            request.range_behavior = 'standard'
            try:
                result = self._client.get_object(request)
            except Exception as err:
                error = err
                break

            kwargs = {}
            if self._options.block_size:
                kwargs['block_size'] = self._options.block_size

            try:
                gotlen = 0
                for d in result.body.iter_bytes(**kwargs):
                    l = len(d)
                    if l > 0:
                        self._write_to_stream(d, start + got)
                        self._update_progress(l)
                        got += l
                        gotlen += l
                        if chash:
                            chash.update(d)
                # Short read: close the body and retry the remaining range.
                if result.content_length is not None and gotlen < result.content_length:
                    if not result.body.is_closed:
                        result.body.close()
                    continue
                break
            except Exception:
                # Deliberate best-effort: a mid-stream read error falls
                # through to another iteration, which retries from start+got.
                pass

        return start, got, error, (chash.sum64() if chash else 0)

    def _write_to_stream(self, data, start):
        """Write data at absolute object offset start (seek under lock when
        parts run in parallel; plain sequential write otherwise)."""
        if self._writer_lock:
            with self._writer_lock:
                self._writer.seek(start - self._rstart)
                self._writer.write(data)
        else:
            self._writer.write(data)

    def _update_progress(self, increment: int):
        """Accumulate _written and invoke the user progress callback."""
        if self._progress_lock:
            with self._progress_lock:
                self._written += increment
                if self._reqeust.progress_fn is not None:
                    self._reqeust.progress_fn(increment, self._written, self._size_in_bytes)
        else:
            self._written += increment
            if self._reqeust.progress_fn is not None:
                self._reqeust.progress_fn(increment, self._written, self._size_in_bytes)

    def _update_process_result(self, result):
        """Commit one part result in order: record errors, enforce offset
        ordering, fold the part CRC into the running CRC, dump checkpoint."""
        if result is None:
            return

        if result[2] is not None:
            self._download_errors.append(result[2])
            return

        start = result[0]
        size = result[1]
        crc = result[3]

        # Parts must commit strictly in offset order for CRC combine and
        # checkpointing to be valid.
        if self._next_offset != start:
            if len(self._download_errors) == 0:
                self._download_errors.append(
                    ValueError(f'out of order, expect offset {self._next_offset}, but got {start}'))

        if len(self._download_errors) > 0:
            return

        self._next_offset = start + size

        if self._check_crc:
            self._ccrc = Crc64.combine(self._ccrc, crc, size)

        if self._checkpoint:
            self._checkpoint.dcrc64 = self._ccrc
            self._checkpoint.doffset = self._next_offset
            self._checkpoint.dump()

    def _assert_crc_same(self):
        """Compare the computed CRC with the server's x-oss-hash-crc64ecma.

        Raises:
            DownloadError: wrapping InconsistentError on mismatch.
        """
        if not self._check_crc:
            return

        scrc = self._headers.get('x-oss-hash-crc64ecma', None)
        if scrc is None:
            return

        ccrc = str(self._ccrc)
        if scrc != ccrc:
            raise self._wrap_error(exceptions.InconsistentError(client_crc=ccrc, server_crc=scrc))

    def _wrap_error(self, error: Exception) -> Exception:
        """Wrap any error in DownloadError tagged with the object path."""
        return DownloadError(
            path=f'oss://{self._reqeust.bucket}/{self._reqeust.key}',
            error=error
        )