# -*- coding: utf-8 -*-
"""
oss2.encryption
~~~~~~~~~~~~~~~
The module contains functions and classes related to client-side encryption and decryption.
"""
import json
from functools import partial
from oss2.utils import b64decode_from_string, b64encode_as_string
from . import utils
from .compat import to_string, to_bytes, to_unicode
from .exceptions import OssError, ClientError, OpenApiFormatError, OpenApiServerError
from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA
from requests.structures import CaseInsensitiveDict
from aliyunsdkcore import client
from aliyunsdkcore.acs_exception.exceptions import ServerException, ClientException
from aliyunsdkcore.http import protocol_type, format_type, method_type
from aliyunsdkkms.request.v20160120 import ListKeysRequest, GenerateDataKeyRequest, DecryptRequest, EncryptRequest
import os
class BaseCryptoProvider(object):
    """Base class of the crypto providers.

    It stores the symmetric cipher class and builds stream adapters that
    encrypt or decrypt data with it.

    :param class cipher: Symmetric cipher class. It is called as
        ``cipher(key, start)`` to create a cipher object and has to expose
        ``encrypt`` and ``decrypt`` methods.
    """

    def __init__(self, cipher):
        # Populated by subclasses (e.g. in their get_key()/get_start()).
        self.plain_key = None
        self.plain_start = None
        self.cipher = cipher

    def make_encrypt_adapter(self, stream, key, start):
        """Wrap ``stream`` in an adapter that encrypts with a fresh cipher object.

        :param stream: Data source handed to ``utils.make_cipher_adapter``.
        :param key: Key passed to the cipher constructor.
        :param start: Start value passed to the cipher constructor.
        :return: The result of ``utils.make_cipher_adapter``.
        """
        # A new cipher object is created per adapter and bound as the first
        # argument of the cipher class's ``encrypt`` function.
        encrypt_func = partial(self.cipher.encrypt, self.cipher(key, start))
        return utils.make_cipher_adapter(stream, encrypt_func)

    def make_decrypt_adapter(self, stream, key, start):
        """Wrap ``stream`` in an adapter that decrypts with a fresh cipher object.

        :param stream: Data source handed to ``utils.make_cipher_adapter``.
        :param key: Key passed to the cipher constructor.
        :param start: Start value passed to the cipher constructor.
        :return: The result of ``utils.make_cipher_adapter``.
        """
        decrypt_func = partial(self.cipher.decrypt, self.cipher(key, start))
        return utils.make_cipher_adapter(stream, decrypt_func)
# Name of the directory, created under the user's home directory, in which
# LocalRsaProvider keeps its RSA key pair when no explicit ``dir`` is given.
_LOCAL_RSA_TMP_DIR = '.oss-local-rsa'
class LocalRsaProvider(BaseCryptoProvider):
    """Use a local RSA key pair to encrypt the data key.

    On construction the key pair is loaded from ``dir`` if both key files
    exist; otherwise a new 2048-bit pair is generated and written there.

    :param str dir: The storage path of the local RSA public and private key.
        Defaults to ``~/.oss-local-rsa``.
    :param str key: The prefix of the local RSA public and private key file names.
    :param str passphrase: The password of the local RSA public and private key.
    :param class cipher: Data encryption class, ``utils.AESCipher`` by default.
        Users can supply their own symmetric encryption algorithm, but it needs
        to comply with the AESCipher annotation rules.
    :raises ClientError: If loading or generating the keys raises
        ``ValueError``, ``TypeError`` or ``IndexError``.
    """

    PUB_KEY_FILE = '.public_key.pem'
    PRIV_KEY_FILE = '.private_key.pem'

    def __init__(self, dir=None, key='', passphrase=None, cipher=utils.AESCipher):
        super(LocalRsaProvider, self).__init__(cipher=cipher)

        self.dir = dir or os.path.join(os.path.expanduser('~'), _LOCAL_RSA_TMP_DIR)
        utils.makedir_p(self.dir)

        priv_key_full_path = os.path.join(self.dir, key + self.PRIV_KEY_FILE)
        pub_key_full_path = os.path.join(self.dir, key + self.PUB_KEY_FILE)

        try:
            if os.path.exists(priv_key_full_path) and os.path.exists(pub_key_full_path):
                # Both files are present: reuse the stored key pair.
                with open(priv_key_full_path, 'rb') as f:
                    self.__decrypt_obj = PKCS1_OAEP.new(RSA.importKey(f.read(), passphrase=passphrase))

                with open(pub_key_full_path, 'rb') as f:
                    self.__encrypt_obj = PKCS1_OAEP.new(RSA.importKey(f.read(), passphrase=passphrase))
            else:
                # NOTE(review): this branch is also taken when only ONE of the
                # two files exists, in which case the surviving file is
                # overwritten by the newly generated pair -- confirm intended.
                private_key = RSA.generate(2048)
                public_key = private_key.publickey()

                self.__encrypt_obj = PKCS1_OAEP.new(public_key)
                self.__decrypt_obj = PKCS1_OAEP.new(private_key)

                with open(priv_key_full_path, 'wb') as f:
                    f.write(private_key.exportKey(passphrase=passphrase))

                with open(pub_key_full_path, 'wb') as f:
                    f.write(public_key.exportKey(passphrase=passphrase))
        except (ValueError, TypeError, IndexError) as e:
            # Surface key parsing/generation problems as the SDK's own error type.
            raise ClientError(str(e))

    def get_key(self):
        """Obtain a data key from the cipher class, store it in ``plain_key`` and return it."""
        self.plain_key = self.cipher.get_key()
        return self.plain_key

    def get_start(self):
        """Obtain a start value from the cipher class, store it in ``plain_start`` and return it."""
        self.plain_start = self.cipher.get_start()
        return self.plain_start
class AliKMSProvider(BaseCryptoProvider):
    """Use the aliyun KMS service to encrypt the data key.

    For a detailed description of KMS please refer to
    https://help.aliyun.com/product/28933.html?spm=a2c4g.11186623.3.1.jlYT4v

    This interface is not available if the Python version is less than py3.3.
    Refer to https://github.com/aliyun/aliyun-openapi-python-sdk/issues/61 for
    more information.

    :param str access_key_id: The access_key_id to visit the KMS key service.
    :param str access_key_secret: The access_key_secret to visit the KMS key service.
    :param str region: The KMS key service region.
    :param str cmkey: User master key.
    :param str sts_token: The security token which you need to provide if you use a temporary AK.
    :param str passphrase: KMS key service password. It is sent to KMS as the
        ``x-passphrase`` entry of the encryption context.
    :param class cipher: Data encryption class, aes256 by default. Currently
        only the default implementation (or a subclass of it) is supported.
    :raises ClientError: If ``cipher`` is not ``utils.AESCipher`` or a subclass of it.
    """

    def __init__(self, access_key_id, access_key_secret, region, cmkey, sts_token=None, passphrase=None,
                 cipher=utils.AESCipher):
        if not issubclass(cipher, utils.AESCipher):
            raise ClientError('AliKMSProvider only support AES256 cipher')

        super(AliKMSProvider, self).__init__(cipher=cipher)
        self.cmkey = cmkey
        self.sts_token = sts_token

        if passphrase:
            # Build the context with the JSON encoder so that quotes,
            # backslashes and control characters in the passphrase are escaped
            # instead of producing broken or altered JSON. Compact separators
            # and ensure_ascii=False keep the text identical to the previously
            # hand-concatenated form for passphrases without such characters.
            self.context = json.dumps({'x-passphrase': passphrase}, separators=(',', ':'), ensure_ascii=False)
        else:
            self.context = ''

        self.clt = client.AcsClient(access_key_id, access_key_secret, region)

        # Encrypted form of the most recently generated data key (see get_key()).
        self.encrypted_key = None

    def get_key(self):
        """Request a new data key from KMS.

        The encrypted form of the key is stored in ``encrypted_key``.

        :return: The plaintext data key, base64-decoded from the KMS response.
        """
        plain_key, self.encrypted_key = self.__generate_data_key()
        return plain_key

    def get_start(self):
        """Store the result of ``utils.random_counter()`` in ``plain_start`` and return it."""
        self.plain_start = utils.random_counter()
        return self.plain_start

    def __generate_data_key(self):
        """Call KMS GenerateDataKey for the master key.

        :return: Tuple ``(plaintext key decoded from base64, 'CiphertextBlob' value)``.
        """
        req = GenerateDataKeyRequest.GenerateDataKeyRequest()
        req.set_KeyId(self.cmkey)
        req.set_KeySpec('AES_256')
        req.set_NumberOfBytes(32)

        resp = self.__send(req)
        return b64decode_from_string(resp['Plaintext']), resp['CiphertextBlob']

    def __encrypt_data(self, data):
        """Call KMS Encrypt on ``data`` with the master key and return the 'CiphertextBlob' value."""
        req = EncryptRequest.EncryptRequest()
        req.set_KeyId(self.cmkey)
        req.set_Plaintext(data)

        resp = self.__send(req)
        return resp['CiphertextBlob']

    def __decrypt_data(self, data):
        """Call KMS Decrypt on the ciphertext blob ``data`` and return the 'Plaintext' value."""
        req = DecryptRequest.DecryptRequest()
        req.set_CiphertextBlob(data)

        resp = self.__send(req)
        return resp['Plaintext']

    def __send(self, req):
        """Apply the settings shared by every KMS request, then execute it.

        Sets JSON as the accepted format, POST as the method, the encryption
        context and, when one was given, the STS token.

        :param req: A KMS request object with its request-specific fields already set.
        :return: The decoded JSON response (see ``__do``).
        """
        req.set_accept_format(format_type.JSON)
        req.set_method(method_type.POST)
        req.set_EncryptionContext(self.context)
        if self.sts_token:
            req.set_STSToken(self.sts_token)

        return self.__do(req)

    def __do(self, req):
        """Execute ``req`` and decode the JSON response body.

        :raises OpenApiServerError: If the SDK raises ``ServerException``.
        :raises ClientError: If the SDK raises ``ClientException``.
        :raises OpenApiFormatError: If the response body cannot be parsed as JSON.
        """
        try:
            body = self.clt.do_action_with_exception(req)
            return json.loads(to_unicode(body))
        except ServerException as e:
            raise OpenApiServerError(e.http_status, e.request_id, e.message, e.error_code)
        except ClientException as e:
            raise ClientError(e.message)
        except (ValueError, TypeError) as e:
            raise OpenApiFormatError('Json Error: ' + str(e))