Source code for oss2.crypto

# -*- coding: utf-8 -*-

"""
oss2.encryption
~~~~~~~~~~~~~~

The module contains functions and classes related to client-side encryption and decryption. 
"""
import json
from functools import partial

from oss2.utils import b64decode_from_string, b64encode_as_string
from . import utils
from .compat import to_string, to_bytes, to_unicode
from .exceptions import OssError, ClientError, OpenApiFormatError, OpenApiServerError

from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA
from requests.structures import CaseInsensitiveDict

from aliyunsdkcore import client
from aliyunsdkcore.acs_exception.exceptions import ServerException, ClientException
from aliyunsdkcore.http import protocol_type, format_type, method_type
from aliyunsdkkms.request.v20160120 import ListKeysRequest, GenerateDataKeyRequest, DecryptRequest, EncryptRequest

import os


[docs]class BaseCryptoProvider(object): """CryptoProvider base class provides an encryption and decryption adapter for basic data. """ def __init__(self, cipher): self.plain_key = None self.plain_start = None self.cipher = cipher
[docs] def make_encrypt_adapter(self, stream, key, start): return utils.make_cipher_adapter(stream, partial(self.cipher.encrypt, self.cipher(key, start)))
[docs] def make_decrypt_adapter(self, stream, key, start): return utils.make_cipher_adapter(stream, partial(self.cipher.decrypt, self.cipher(key, start)))
_LOCAL_RSA_TMP_DIR = '.oss-local-rsa'
[docs]class LocalRsaProvider(BaseCryptoProvider): """Use the local RSA to encrypt the data key. :param str dir: The storage path of local RSA public key and private key. :param str key: The prefix of local RSA public and private key name. :param str passphrase: The password of local RSA public key and private key. :param class cipher: Data encryption. It is aes256 by default. Users can achieve their own symmetric encryption algorithm, but need to comply with the AESCipher annotation rules. """ PUB_KEY_FILE = '.public_key.pem' PRIV_KEY_FILE = '.private_key.pem' def __init__(self, dir=None, key='', passphrase=None, cipher=utils.AESCipher): super(LocalRsaProvider, self).__init__(cipher=cipher) self.dir = dir or os.path.join(os.path.expanduser('~'), _LOCAL_RSA_TMP_DIR) utils.makedir_p(self.dir) priv_key_full_path = os.path.join(self.dir, key + self.PRIV_KEY_FILE) pub_key_full_path = os.path.join(self.dir, key + self.PUB_KEY_FILE) try: if os.path.exists(priv_key_full_path) and os.path.exists(pub_key_full_path): with open(priv_key_full_path, 'rb') as f: self.__decrypt_obj = PKCS1_OAEP.new(RSA.importKey(f.read(), passphrase=passphrase)) with open(pub_key_full_path, 'rb') as f: self.__encrypt_obj = PKCS1_OAEP.new(RSA.importKey(f.read(), passphrase=passphrase)) else: private_key = RSA.generate(2048) public_key = private_key.publickey() self.__encrypt_obj = PKCS1_OAEP.new(public_key) self.__decrypt_obj = PKCS1_OAEP.new(private_key) with open(priv_key_full_path, 'wb') as f: f.write(private_key.exportKey(passphrase=passphrase)) with open(pub_key_full_path, 'wb') as f: f.write(public_key.exportKey(passphrase=passphrase)) except (ValueError, TypeError, IndexError) as e: raise ClientError(str(e))
[docs] def build_header(self, headers=None): if not isinstance(headers, CaseInsensitiveDict): headers = CaseInsensitiveDict(headers) if 'content-md5' in headers: headers['x-oss-meta-unencrypted-content-md5'] = headers['content-md5'] del headers['content-md5'] if 'content-length' in headers: headers['x-oss-meta-unencrypted-content-length'] = headers['content-length'] del headers['content-length'] headers['x-oss-meta-oss-crypto-key'] = b64encode_as_string(self.__encrypt_obj.encrypt(self.plain_key)) headers['x-oss-meta-oss-crypto-start'] = b64encode_as_string(self.__encrypt_obj.encrypt(to_bytes(str(self.plain_start)))) headers['x-oss-meta-oss-cek-alg'] = self.cipher.ALGORITHM headers['x-oss-meta-oss-wrap-alg'] = 'rsa' self.plain_key = None self.plain_start = None return headers
[docs] def get_key(self): self.plain_key = self.cipher.get_key() return self.plain_key
[docs] def get_start(self): self.plain_start = self.cipher.get_start() return self.plain_start
[docs] def decrypt_oss_meta_data(self, headers, key, conv=lambda x:x): try: return conv(self.__decrypt_obj.decrypt(utils.b64decode_from_string(headers[key]))) except: return None
[docs]class AliKMSProvider(BaseCryptoProvider): """Use the aliyun kms service to encrypt the data key. The detailed description of KMS please refer to https://help.aliyun.com/product/28933.html?spm=a2c4g.11186623.3.1.jlYT4v This interface is not available if the Python version is less than py3.3. Refer to https://github.com/aliyun/aliyun-openapi-python-sdk/issues/61 for more information. :param str access_key_id: The access_key_id to visit the KMS key service. :param str access_key_secret: The access_key_secret to visit the KMS key service. :param str region: The kms key service region. :param str cmkey: User master key. :param str sts_token: The security token which you need to provide if you use temporary AK. :param str passphrase: Kms key service password. :param class cipher: Data encryption, default is aes256, currently supports only the default implementation. """ def __init__(self, access_key_id, access_key_secret, region, cmkey, sts_token = None, passphrase=None, cipher=utils.AESCipher): if not issubclass(cipher, utils.AESCipher): raise ClientError('AliKMSProvider only support AES256 cipher') super(AliKMSProvider, self).__init__(cipher=cipher) self.cmkey = cmkey self.sts_token = sts_token self.context = '{"x-passphrase":"' + passphrase + '"}' if passphrase else '' self.clt = client.AcsClient(access_key_id, access_key_secret, region) self.encrypted_key = None
[docs] def build_header(self, headers=None): if not isinstance(headers, CaseInsensitiveDict): headers = CaseInsensitiveDict(headers) if 'content-md5' in headers: headers['x-oss-meta-unencrypted-content-md5'] = headers['content-md5'] del headers['content-md5'] if 'content-length' in headers: headers['x-oss-meta-unencrypted-content-length'] = headers['content-length'] del headers['content-length'] headers['x-oss-meta-oss-crypto-key'] = self.encrypted_key headers['x-oss-meta-oss-crypto-start'] = self.__encrypt_data(to_bytes(str(self.plain_start))) headers['x-oss-meta-oss-cek-alg'] = self.cipher.ALGORITHM headers['x-oss-meta-oss-wrap-alg'] = 'kms' self.encrypted_key = None self.plain_start = None return headers
[docs] def get_key(self): plain_key, self.encrypted_key = self.__generate_data_key() return plain_key
[docs] def get_start(self): self.plain_start = utils.random_counter() return self.plain_start
def __generate_data_key(self): req = GenerateDataKeyRequest.GenerateDataKeyRequest() req.set_accept_format(format_type.JSON) req.set_method(method_type.POST) req.set_KeyId(self.cmkey) req.set_KeySpec('AES_256') req.set_NumberOfBytes(32) req.set_EncryptionContext(self.context) if self.sts_token: req.set_STSToken(self.sts_token) resp = self.__do(req) return b64decode_from_string(resp['Plaintext']), resp['CiphertextBlob'] def __encrypt_data(self, data): req = EncryptRequest.EncryptRequest() req.set_accept_format(format_type.JSON) req.set_method(method_type.POST) req.set_KeyId(self.cmkey) req.set_Plaintext(data) req.set_EncryptionContext(self.context) if self.sts_token: req.set_STSToken(self.sts_token) resp = self.__do(req) return resp['CiphertextBlob'] def __decrypt_data(self, data): req = DecryptRequest.DecryptRequest() req.set_accept_format(format_type.JSON) req.set_method(method_type.POST) req.set_CiphertextBlob(data) req.set_EncryptionContext(self.context) if self.sts_token: req.set_STSToken(self.sts_token) resp = self.__do(req) return resp['Plaintext'] def __do(self, req): try: body = self.clt.do_action_with_exception(req) return json.loads(to_unicode(body)) except ServerException as e: raise OpenApiServerError(e.http_status, e.request_id, e.message, e.error_code) except ClientException as e: raise ClientError(e.message) except (ValueError, TypeError) as e: raise OpenApiFormatError('Json Error: ' + str(e))
[docs] def decrypt_oss_meta_data(self, headers, key, conv=lambda x: x): try: if key.lower() == 'x-oss-meta-oss-crypto-key'.lower(): return conv(b64decode_from_string(self.__decrypt_data(headers[key]))) else: return conv(self.__decrypt_data(headers[key])) except OssError as e: raise e except: return None