Source code for alibabacloud_oss_v2.encryption_client

# pylint: disable=line-too-long
"""Encryption Client"""
import copy
import base64
from typing import MutableMapping, List, Optional, cast
from .types import StreamBody, CaseInsensitiveDict
from .client import Client
from . import models
from . import exceptions
from . import utils
from . import io_utils
from .crypto import MasterCipher, Envelope, ContentCipherBuilder, ContentCipher, CipherData
from .crypto.aes_ctr_cipher import AESCtrCipherBuilder
from .crypto.aes_ctr import _BLOCK_SIZE_LEN

[docs] class EncryptionMultiPartContext: """EncryptionMultiPartContext save encryption or decryption information """ def __init__( self, content_cipher: ContentCipher, data_size: int, part_size: int, ) -> None: self.content_cipher = content_cipher self.data_size = data_size self.part_size = part_size
[docs] def is_valid(self) -> bool: """is valid """ if (self.content_cipher is None or self.data_size == 0 or self.part_size == 0): return False return True
[docs] class EncryptionClient: """Encryption Client """ def __init__( self, client: Client, master_cipher: MasterCipher, decrypt_master_ciphers: Optional[List[MasterCipher]] = None, ) -> None: self._client = client self._master_cipher = master_cipher self._defualt_ccbuilder = AESCtrCipherBuilder(master_cipher) self._decrypt_master_ciphers = decrypt_master_ciphers or [] self._ccbuilders = {} for mc in self._decrypt_master_ciphers: mat_desc = mc.get_mat_desc() or '' if len(mat_desc) > 0: self._ccbuilders[mat_desc] = AESCtrCipherBuilder(mc)
[docs] def unwrap(self) -> Client: """unwrap Returns: Client: _description_ """ return self._client
def __repr__(self) -> str: return "<OssEncryptionClient>" # object
[docs] def put_object(self, request: models.PutObjectRequest, **kwargs ) -> models.PutObjectResult: """ Uploads objects. Args: request (PutObjectRequest): Request parameters for PutObject operation. Returns: PutObjectResult: Reponse result for PutObject operation. """ return self._put_object_securely(request, **kwargs)
[docs] def get_object(self, request: models.GetObjectRequest, **kwargs ) -> models.GetObjectResult: """ Queries an object. To call this operation, you must have read permissions on the object. Args: request (GetObjectRequest): Request parameters for GetObject operation. Returns: GetObjectResult: Reponse result for GetObject operation. """ return self._get_object_securely(request, **kwargs)
[docs] def head_object(self, request: models.HeadObjectRequest, **kwargs ) -> models.HeadObjectResult: """ Queries information about the object in a bucket. Args: request (HeadObjectRequest): Request parameters for HeadObject operation. Returns: HeadObjectResult: Reponse result for HeadObject operation. """ return self._client.head_object(request, **kwargs)
[docs] def initiate_multipart_upload(self, request: models.InitiateMultipartUploadRequest, **kwargs ) -> models.InitiateMultipartUploadResult: """ Initiates a multipart upload task before you can upload data in parts to Object Storage Service (OSS). Args: request (InitiateMultipartUploadRequest): Request parameters for InitiateMultipartUpload operation. Returns: InitiateMultipartUploadResult: Reponse result for InitiateMultipartUpload operation. """ return self._initiate_multipart_upload_securely(request, **kwargs)
[docs] def upload_part(self, request: models.UploadPartRequest, **kwargs ) -> models.UploadPartResult: """ Call the UploadPart interface to upload data in blocks (parts) based on the specified Object name and uploadId. Args: request (UploadPartRequest): Request parameters for UploadPart operation. Returns: UploadPartResult: Reponse result for UploadPart operation. """ return self._upload_part_securely(request, **kwargs)
[docs] def complete_multipart_upload(self, request: models.CompleteMultipartUploadRequest, **kwargs ) -> models.CompleteMultipartUploadResult: """ Completes the multipart upload task of an object after all parts of the object are uploaded. Args: request (CompleteMultipartUploadRequest): Request parameters for CompleteMultipartUpload operation. Returns: CompleteMultipartUploadResult: Reponse result for CompleteMultipartUpload operation. """ return self._client.complete_multipart_upload(request, **kwargs)
[docs] def abort_multipart_upload(self, request: models.AbortMultipartUploadRequest, **kwargs ) -> models.AbortMultipartUploadResult: """ Cancels a multipart upload task and deletes the parts uploaded in the task. Args: request (AbortMultipartUploadRequest): Request parameters for AbortMultipartUpload operation. Returns: AbortMultipartUploadResult: Reponse result for AbortMultipartUpload operation. """ return self._client.abort_multipart_upload(request, **kwargs)
[docs] def list_parts(self, request: models.ListPartsRequest, **kwargs ) -> models.ListPartsResult: """ Lists all parts that are uploaded by using a specified upload ID. Args: request (ListPartsRequest): Request parameters for ListParts operation. Returns: ListPartsResult: Reponse result for ListParts operation. """ return self._client.list_parts(request, **kwargs)
def _get_ccbuilder(self, envelope: Envelope ) -> ContentCipherBuilder: return self._ccbuilders.get(envelope.mat_desc or '', self._defualt_ccbuilder) def _put_object_securely(self, request: models.PutObjectRequest, **kwargs ) -> models.PutObjectResult: cc = self._defualt_ccbuilder.content_cipher() body = cc.encrypt_content(request.body) erequest = copy.copy(request) erequest.body = body _add_crypto_header_putobject(erequest, cc.get_cipher_data()) return self._client.put_object(erequest, **kwargs) def _get_object_securely(self, request: models.GetObjectRequest, **kwargs ) -> models.GetObjectResult: adjust_range_start = 0 discard_count = 0 erequest = request if request.range_header is not None: http_range = utils.parse_http_range(request.range_header) adjust_range_start = _adjust_range_start(http_range[0]) discard_count = http_range[0] - adjust_range_start if discard_count > 0: erequest = copy.copy(request) range_start = str(adjust_range_start) if adjust_range_start >= 0 else '' range_end = str(http_range[1]) if http_range[1] >= 0 else '' erequest.range_header = f'bytes={range_start}-{range_end}' erequest.range_behavior = 'standard' result = self._client.get_object(erequest, **kwargs) try: if _has_encrypted_header(result.headers): envelope = _get_envelope_from_header(result.headers) if not _is_valid_content_alg(envelope.cek_algorithm or ''): raise exceptions.ParamInvalidError(field='envelope.cek_algorithm') if not envelope.is_valid(): raise exceptions.ParamInvalidError(field='envelope') cc = self._get_ccbuilder(envelope).content_cipher_from_env(envelope, offset=adjust_range_start) result.body = cast(StreamBody, cc.decrypt_content(result.body)) except Exception as err: if result.body is not None: result.body.close() raise err if discard_count > 0: #rewrite ContentRange & ContentLength if result.content_range is not None: crange = utils.parse_content_range(result.content_range) value = f'bytes {crange[0] + discard_count}-{crange[1]}/{crange[2]}' result.content_range = value result.headers.update({"Content-Range": value}) else: result.headers.update({"Content-Range": f'bytes {discard_count}-/*'}) if result.content_length is not None: result.content_length -= discard_count result.headers.update({"Content-Length": str(result.content_length)}) result.body = io_utils.StreamBodyDiscarder(result.body, discard_count) return result def _initiate_multipart_upload_securely(self, request: models.InitiateMultipartUploadRequest, **kwargs ) -> models.InitiateMultipartUploadResult: _valid_encryption_context(request) erequest = copy.copy(request) cc = self._defualt_ccbuilder.content_cipher() _add_crypto_header_initpart(erequest, cc.get_cipher_data()) result = self._client.initiate_multipart_upload(erequest, **kwargs) result.cse_multipart_context = EncryptionMultiPartContext( content_cipher=cc, part_size= utils.safety_int(request.cse_part_size), data_size= utils.safety_int(request.cse_data_size), ) return result def _upload_part_securely(self, request: models.UploadPartRequest, **kwargs ) -> models.UploadPartResult: cse_context = request.cse_multipart_context if cse_context is None: raise exceptions.ParamNullError( field='request.cse_multipart_context') if (not isinstance(cse_context, EncryptionMultiPartContext) or not request.cse_multipart_context.is_valid()): raise exceptions.ParamInvalidError( field='request.cse_multipart_context') if cse_context.part_size % _BLOCK_SIZE_LEN != 0: raise ValueError(f'EncryptionMultiPartContext.part_size must be aligned to {_BLOCK_SIZE_LEN}') offset = 0 if request.part_number > 1: offset = (request.part_number - 1) * cse_context.part_size cc = cse_context.content_cipher.clone(offset=offset) erequest = copy.copy(request) erequest.body = cc.encrypt_content(request.body) _add_crypto_header_uploadpart(erequest, cse_context, cc.get_cipher_data()) return self._client.upload_part(erequest, **kwargs)
def _add_crypto_common_header(headers: MutableMapping, cipher_data: CipherData) -> None: # mat desc if len(cipher_data.mat_desc) > 0: headers['x-oss-meta-client-side-encryption-matdesc'] = cipher_data.mat_desc # encrypted key value = base64.b64encode(cipher_data.encrypted_key) headers['x-oss-meta-client-side-encryption-key'] = value.decode() # encrypted iv value = base64.b64encode(cipher_data.encrypted_iv) headers['x-oss-meta-client-side-encryption-start'] = value.decode() # wrap alg headers['x-oss-meta-client-side-encryption-wrap-alg'] = cipher_data.wrap_algorithm # cek alg headers['x-oss-meta-client-side-encryption-cek-alg'] = cipher_data.cek_algorithm def _add_crypto_header_putobject(request: models.PutObjectRequest, cipher_data: CipherData) -> None: headers = getattr(request, 'headers', None) if headers is None: headers = CaseInsensitiveDict() # convert content-md5 if request.content_md5 is not None: headers['x-oss-meta-client-side-encryption-unencrypted-content-md5'] = request.content_md5 request.content_md5 = None # convert content-length if request.content_length is not None: headers['x-oss-meta-client-side-encryption-unencrypted-content-length'] = str(request.content_length) request.content_length = None _add_crypto_common_header(headers, cipher_data) setattr(request, 'headers', headers) def _add_crypto_header_initpart(request: models.InitiateMultipartUploadRequest, cipher_data: CipherData) -> None: headers = getattr(request, 'headers', None) if headers is None: headers = CaseInsensitiveDict() # data size if utils.safety_int(int(request.cse_data_size)) > 0: headers['x-oss-meta-client-side-encryption-data-size'] = str(request.cse_data_size) # part size headers['x-oss-meta-client-side-encryption-part-size'] = str(request.cse_part_size) _add_crypto_common_header(headers, cipher_data) setattr(request, 'headers', headers) def _add_crypto_header_uploadpart(request: models.UploadPartRequest, cse_context: EncryptionMultiPartContext, cipher_data: CipherData) -> None: headers = getattr(request, 'headers', None) if headers is None: headers = CaseInsensitiveDict() # data size if utils.safety_int(int(cse_context.data_size)) > 0: headers['x-oss-meta-client-side-encryption-data-size'] = str(cse_context.data_size) # part size headers['x-oss-meta-client-side-encryption-part-size'] = str(cse_context.part_size) _add_crypto_common_header(headers, cipher_data) setattr(request, 'headers', headers) def _has_encrypted_header(headers: MutableMapping[str, str]) -> bool: return len(headers.get("x-oss-meta-client-side-encryption-key", '')) > 0 def _get_envelope_from_header(headers: MutableMapping[str, str]) -> Envelope: env = Envelope() env.iv = base64.b64decode(headers.get("x-oss-meta-client-side-encryption-start", '')) env.cipher_key = base64.b64decode(headers.get("x-oss-meta-client-side-encryption-key", '')) env.mat_desc = headers.get("x-oss-meta-client-side-encryption-matdesc", '') env.cek_algorithm = headers.get("x-oss-meta-client-side-encryption-cek-alg", '') env.wrap_algorithm = headers.get("x-oss-meta-client-side-encryption-wrap-alg", '') env.unencrypted_md5 = headers.get("x-oss-meta-client-side-encryption-unencrypted-content-md5", '') env.unencrypted_content_length = headers.get("x-oss-meta-client-side-encryption-unencrypted-content-length", '') return env def _get_envelope_from_list_parts(result: models.ListPartsResult) -> Envelope: env = Envelope() env.iv = base64.b64decode(utils.safety_str(result.client_encryption_start)) env.cipher_key = base64.b64decode(utils.safety_str(result.client_encryption_key)) env.cek_algorithm = utils.safety_str(result.client_encryption_cek_alg) env.wrap_algorithm = utils.safety_str(result.client_encryption_wrap_alg) env.mat_desc = '' return env def _is_valid_content_alg(alg_name:str) -> bool: #now content encyrption only support aec/ctr algorithm return alg_name == 'AES/CTR/NoPadding' def _adjust_range_start(start): return (start // _BLOCK_SIZE_LEN) * _BLOCK_SIZE_LEN def _valid_encryption_context(request: models.InitiateMultipartUploadRequest) -> None: part_size = request.cse_part_size if part_size is None: raise exceptions.ParamNullError(field='request.cse_part_size') if not isinstance(part_size, int): raise TypeError(f'request.cse_part_size need int, but got {type(part_size)}') if part_size <= 0: raise exceptions.ParamInvalidError(field='request.cse_part_size') if not 0 == part_size % _BLOCK_SIZE_LEN: raise ValueError(f'request.CSEPartSize must aligned to the {_BLOCK_SIZE_LEN}') data_size = request.cse_data_size if data_size is not None and not isinstance(data_size, int): raise TypeError(f'request.cse_data_size need int, but got {type(data_size)}')