Source code for alibabacloud_oss_v2.serde_utils

"""utils for api only"""

import hashlib
import base64
import json
from urllib.parse import unquote, quote
from typing import List, cast, Union, Dict
from .types import OperationInput, HttpResponse, OperationOutput
from . import serde
from . import utils
from . import exceptions
from . import progress
from . import crc
from .models import (
    ListObjectsResult,
    ListObjectsV2Result,
    ListObjectVersionsResult,
    CopyObjectRequest,
    UploadPartCopyRequest,
    DeleteMultipleObjectsRequest,
    DeleteMultipleObjectsResult,
    InitiateMultipartUploadResult,
    CompleteMultipartUploadResult,
    ListMultipartUploadsResult,
    ListPartsResult,
    ProcessObjectRequest,
    AsyncProcessObjectRequest    
)


[docs] def add_content_type(_: serde.Model, op_input: OperationInput) -> OperationInput: """ Add content-type based on the file suffix when it does not exist. """ if op_input.headers.get('Content-Type', None) is not None: return op_input op_input.headers.update({ 'Content-Type': utils.guess_content_type(op_input.key, "application/octet-stream") }) return op_input
[docs] def add_content_md5(_: serde.Model, op_input: OperationInput) -> OperationInput: """ Add content-md5 when it does not exist. """ if op_input.headers.get('Content-MD5', None) is not None: return op_input if op_input.body is None: md5 = '1B2M2Y8AsgTpgAmY7PhCfg==' elif isinstance(op_input.body, (str, bytes)): h = hashlib.md5() h.update(op_input.body) md5 = base64.b64encode(h.digest()).decode() else: raise exceptions.SerializationError( error=f'add_content_md5 fail, not support instance <{op_input.body.__class__}>') op_input.headers.update({'Content-MD5': md5}) return op_input
[docs] def add_progress(request: serde.Model, op_input: OperationInput) -> OperationInput: """ Add progress writer when progress_fn is set. """ fn = getattr(request, 'progress_fn', None) if fn is None: return op_input trackers = cast(List, op_input.op_metadata.get( 'opm-request-body-tracker', [])) p = progress.Progress( progress_fn=fn, total=utils.guess_content_length(op_input.body) ) trackers.append(p) op_input.op_metadata['opm-request-body-tracker'] = trackers return op_input
[docs] def add_crc_checker(_: serde.Model, op_input: OperationInput) -> OperationInput: """ Add crc writer and crc checker. """ trackers = cast(List, op_input.op_metadata.get( 'opm-request-body-tracker', [])) p = crc.Crc64(init_crc=0) trackers.append(p) op_input.op_metadata['opm-request-body-tracker'] = trackers handlers = cast(List, op_input.op_metadata.get( 'opm-response-handler', [])) def crc_checker_handler(response: HttpResponse): scrc = response.headers.get('x-oss-hash-crc64ecma', None) if scrc is None: return ccrc = str(p.sum64()) if scrc != ccrc: raise exceptions.InconsistentError( client_crc=ccrc, server_crc=scrc ) handlers.append(crc_checker_handler) op_input.op_metadata['opm-response-handler'] = handlers return op_input
[docs] def deserialize_encode_type(result: serde.Model, _: OperationOutput) -> serde.Model: """ do url decode """ if not hasattr(result, 'encoding_type'): raise exceptions.DeserializationError( error=f'{result.__class__} has not encoding_type attribute') if result.encoding_type is None or result.encoding_type != 'url': return result if isinstance(result, ListObjectsResult): # fields fields = ['prefix', 'marker', 'delimiter', 'next_marker'] # Contents.Key if isinstance(result.contents, List): for i, _ in enumerate(result.contents): result.contents[i].key = unquote(result.contents[i].key) # CommonPrefixes.Prefix if isinstance(result.common_prefixes, List): for i, _ in enumerate(result.common_prefixes): result.common_prefixes[i].prefix = unquote( result.common_prefixes[i].prefix) elif isinstance(result, ListObjectsV2Result): # fields fields = ['prefix', 'start_after', 'continuation_token', 'delimiter', 'next_continuation_token'] # Contents.Key if isinstance(result.contents, List): for i, _ in enumerate(result.contents): result.contents[i].key = unquote(result.contents[i].key) # CommonPrefixes.Prefix if isinstance(result.common_prefixes, List): for i, _ in enumerate(result.common_prefixes): result.common_prefixes[i].prefix = unquote( result.common_prefixes[i].prefix) elif isinstance(result, ListObjectVersionsResult): # fields fields = ['prefix', 'key_marker', 'delimiter', 'next_key_marker'] # Version.Key if isinstance(result.version, List): for i, _ in enumerate(result.version): result.version[i].key = unquote(result.version[i].key) # DeleteMarker.Key if isinstance(result.delete_marker, List): for i, _ in enumerate(result.delete_marker): result.delete_marker[i].key = unquote(result.delete_marker[i].key) # CommonPrefixes.Prefix if isinstance(result.common_prefixes, List): for i, _ in enumerate(result.common_prefixes): result.common_prefixes[i].prefix = unquote( result.common_prefixes[i].prefix) elif isinstance(result, DeleteMultipleObjectsResult): # fields fields = [] # deleted_objects.Key if isinstance(result.deleted_objects, List): for i, _ in enumerate(result.deleted_objects): result.deleted_objects[i].key = unquote(result.deleted_objects[i].key) elif isinstance(result, InitiateMultipartUploadResult): # fields fields = ['key'] elif isinstance(result, CompleteMultipartUploadResult): # fields fields = ['key'] elif isinstance(result, ListMultipartUploadsResult): # fields fields = ['key_marker', 'next_key_marker', 'prefix', 'delimiter'] # Upload.Key if isinstance(result.uploads, List): for i, _ in enumerate(result.uploads): result.uploads[i].key = unquote(result.uploads[i].key) elif isinstance(result, ListPartsResult): # fields fields = ['key'] else: fields = [] for field in fields: val = getattr(result, field) if val is not None: setattr(result, field, unquote(val)) return result
[docs] def encode_copy_source(request: Union[CopyObjectRequest, UploadPartCopyRequest]) -> str: """ encode copy source parameter """ source = f'/{request.source_bucket or request.bucket}/{quote(request.source_key)}' if request.source_version_id is not None: source += f'?versionId={request.source_version_id}' return source
[docs] def serialize_delete_objects(request: serde.Model, op_input: OperationInput) -> OperationInput: """ serialize to Delete XML string """ if not isinstance(request, DeleteMultipleObjectsRequest): raise exceptions.SerializationError(error=f'Unsupport type {type(request)}') xml = '<Delete>' if request.quiet is not None: xml += f'<Quiet>{"true" if request.quiet else "false"}</Quiet>' if isinstance(request.objects, List): for _, o in enumerate(request.objects): xml += '<Object>' key = utils.safety_str(o.key) if len(key) > 0: xml += f'<Key>{utils.escape_xml_value(key)}</Key>' vid = utils.safety_str(o.version_id) if len(vid) > 0: xml += f'<VersionId>{vid}</VersionId>' xml += '</Object>' xml += '</Delete>' op_input.body = xml.encode() return op_input
[docs] def add_process_action(request: serde.Model, op_input: OperationInput) -> OperationInput: """ Add process parameter to body. """ if not isinstance(request, ProcessObjectRequest) and not isinstance(request, AsyncProcessObjectRequest): raise exceptions.SerializationError(error=f'Unsupport type {type(request)}') attr_map = cast(Dict, getattr(request, '_attribute_map')) attr_desc = cast(Dict, attr_map.get('process')) key = attr_desc.get('rename', None) if key is None: raise exceptions.SerializationError(error='process filed is invalid') op_input.body = f'{key}={request.process}'.encode() return op_input
[docs] def deserialize_process_body(result: serde.Model, op_output: OperationOutput) -> serde.Model: """deserialize process body Args: result (Model): _description_ op_output (OperationOutput): _description_ Returns: Any: _description_ """ xml_data = op_output.http_response.content if xml_data is None or len(xml_data) == 0: return result jo = json.loads(xml_data) if not isinstance(jo, Dict): return result # parse json body attributes = cast(Dict, getattr(result, '_attribute_map')) for attr, attr_desc in attributes.items(): if attr_desc.get('tag', '') != 'json': continue attr_key = attr_desc.get('rename', attr) value = jo.get(attr_key, None) if value is not None: setattr(result, attr, value) return result