Source code for oss2.models

# -*- coding: utf-8 -*-

"""
oss2.models
~~~~~~~~~~~

This module contains the class definitions for the parameters and return values used by the Python SDK API.
"""

from .utils import http_to_unixtime, make_progress_adapter, make_crc_adapter
from .exceptions import ClientError, InconsistentError
from .compat import urlunquote

class PartInfo(object):
    """Describe a single part of a multipart upload.

    Instances are produced by :func:`list_parts <oss2.Bucket.list_parts>` and are
    accepted as input by
    :func:`complete_multipart_upload <oss2.Bucket.complete_multipart_upload>`.

    :param int part_number: Part number, counted from 1.
    :param str etag: ETag of the part.
    :param int size: Part size in bytes. Only used in `list_parts` results.
    :param int last_modified: Time the part was last modified, as Unix time (int).
        See :ref:`unix_time` for more information.
    """

    def __init__(self, part_number, etag, size=None, last_modified=None):
        self.part_number, self.etag = part_number, etag
        self.size, self.last_modified = size, last_modified
def _hget(headers, key, converter=lambda x: x): if key in headers: return converter(headers[key]) else: return None def _get_etag(headers): return _hget(headers, 'etag', lambda x: x.strip('"'))
class RequestResult(object):
    """Base class for results; keeps the HTTP response and its basic metadata.

    :param resp: HTTP response object exposing `status` and `headers`.
    """

    def __init__(self, resp):
        #: HTTP response
        self.resp = resp

        #: HTTP status code (such as 200 or 404)
        self.status = resp.status

        #: HTTP headers
        self.headers = resp.headers

        #: Request ID used for tracking OSS requests; useful when submitting a
        #: customer ticket. Empty string when the header is absent.
        self.request_id = self.headers.get('x-oss-request-id', '')
class HeadObjectResult(RequestResult):
    """Result exposing object metadata read from the response headers.

    Each attribute is None when the corresponding header is missing.
    """

    def __init__(self, resp):
        super(HeadObjectResult, self).__init__(resp)
        hdrs = self.headers

        #: File type; three types are supported: 'Normal', 'Multipart', 'Appendable'.
        self.object_type = _hget(hdrs, 'x-oss-object-type')

        #: File's last modified time in Unix time (int).
        self.last_modified = _hget(hdrs, 'last-modified', http_to_unixtime)

        #: File's MIME type.
        self.content_type = _hget(hdrs, 'content-type')

        #: Content-Length as int; it could be None.
        self.content_length = _hget(hdrs, 'content-length', int)

        #: HTTP ETag
        self.etag = _get_etag(hdrs)
class GetObjectMetaResult(RequestResult):
    """Result carrying the basic metadata of an object, read from the response headers."""

    def __init__(self, resp):
        super(GetObjectMetaResult, self).__init__(resp)
        hdrs = self.headers

        #: Last modified time of the file, in Unix time. See :ref:`unix_time`.
        self.last_modified = _hget(hdrs, 'last-modified', http_to_unixtime)

        #: Content-Length: file size in bytes (int).
        self.content_length = _hget(hdrs, 'content-length', int)

        #: HTTP ETag
        self.etag = _get_etag(hdrs)
class GetSymlinkResult(RequestResult):
    """Result of a get-symlink request.

    :param resp: HTTP response object.
    """

    def __init__(self, resp):
        super(GetSymlinkResult, self).__init__(resp)

        #: The target file of the symlink file, URL-unquoted. None when the
        #: 'x-oss-symlink-target' header is absent.
        # urlunquote is passed as the converter so it is never called on None
        # when the header is missing.
        self.target_key = _hget(self.headers, 'x-oss-symlink-target', urlunquote)
class GetObjectResult(HeadObjectResult):
    """Result of a get-object request, readable as a stream.

    The response is wrapped, in order, by an optional progress adapter, an
    optional CRC adapter and an optional decrypt adapter; the outermost
    wrapper is exposed as `stream`.

    :param resp: HTTP response object; it is also the innermost stream.
    :param progress_callback: If truthy, passed to `make_progress_adapter`
        together with the content length.
    :param bool crc_enabled: If true, the stream is wrapped by `make_crc_adapter`.
    :param crypto_provider: If truthy, used to decrypt the crypto metadata
        headers and to wrap the stream with a decrypt adapter.
    :raises ClientError: if the response has both the 'x-oss-meta-oss-crypto-key'
        and 'Content-Range' headers.
    :raises InconsistentError: if a crypto provider is given but any of the
        required crypto metadata values is missing or empty.
    """

    def __init__(self, resp, progress_callback=None, crc_enabled=False, crypto_provider=None):
        super(GetObjectResult, self).__init__(resp)
        self.__crc_enabled = crc_enabled
        self.__crypto_provider = crypto_provider

        if _hget(resp.headers, 'x-oss-meta-oss-crypto-key') and _hget(resp.headers, 'Content-Range'):
            raise ClientError('Could not get an encrypted object using byte-range parameter')

        if progress_callback:
            self.stream = make_progress_adapter(self.resp, progress_callback, self.content_length)
        else:
            self.stream = self.resp

        # CRC value reported in the response headers, if any.
        self.__crc = _hget(self.headers, 'x-oss-hash-crc64ecma', int)
        if self.__crc_enabled:
            self.stream = make_crc_adapter(self.stream)

        if self.__crypto_provider:
            key = self.__crypto_provider.decrypt_oss_meta_data(resp.headers, 'x-oss-meta-oss-crypto-key')
            start = self.__crypto_provider.decrypt_oss_meta_data(resp.headers, 'x-oss-meta-oss-crypto-start')
            cek_alg = _hget(resp.headers, 'x-oss-meta-oss-cek-alg')
            if key and start and cek_alg:
                self.stream = self.__crypto_provider.make_decrypt_adapter(self.stream, key, start)
            else:
                # Adjacent literals are concatenated, so the message contains no
                # stray backslash or run of indentation whitespace.
                raise InconsistentError(
                    'all metadata keys are required for decryption (x-oss-meta-oss-crypto-key, '
                    'x-oss-meta-oss-crypto-start, x-oss-meta-oss-cek-alg)',
                    self.request_id)

    def read(self, amt=None):
        """Delegate to ``self.stream.read(amt)`` and return its result."""
        return self.stream.read(amt)

    def __iter__(self):
        """Return an iterator over the underlying stream."""
        return iter(self.stream)

    @property
    def client_crc(self):
        """The `crc` attribute of the stream when CRC is enabled, otherwise None."""
        if self.__crc_enabled:
            return self.stream.crc
        else:
            return None

    @property
    def server_crc(self):
        """The 'x-oss-hash-crc64ecma' response header as int, or None if absent."""
        return self.__crc
class PutObjectResult(RequestResult):
    """Result of a put-object request."""

    def __init__(self, resp):
        super(PutObjectResult, self).__init__(resp)

        #: HTTP ETag
        self.etag = _get_etag(self.headers)

        #: CRC value of the uploaded file (int), or None if the header is absent.
        self.crc = _hget(resp.headers, 'x-oss-hash-crc64ecma', int)
class AppendObjectResult(RequestResult):
    """Result of an append-object request."""

    def __init__(self, resp):
        super(AppendObjectResult, self).__init__(resp)
        raw_headers = resp.headers

        #: HTTP ETag
        self.etag = _get_etag(self.headers)

        #: The updated CRC64 value after the append operation.
        self.crc = _hget(raw_headers, 'x-oss-hash-crc64ecma', int)

        #: The next position for the append operation.
        self.next_position = _hget(raw_headers, 'x-oss-next-append-position', int)
class BatchDeleteObjectsResult(RequestResult):
    """Result of a batch-delete request; `deleted_keys` starts out empty."""

    def __init__(self, resp):
        super(BatchDeleteObjectsResult, self).__init__(resp)

        #: List of the deleted file names.
        self.deleted_keys = list()
class InitMultipartUploadResult(RequestResult):
    """Result of initiating a multipart upload; `upload_id` starts out as None."""

    def __init__(self, resp):
        super(InitMultipartUploadResult, self).__init__(resp)

        #: Upload ID of the initiated multipart upload.
        self.upload_id = None
class ListObjectsResult(RequestResult):
    """Result of an object listing; all fields start out with empty defaults."""

    def __init__(self, resp):
        super(ListObjectsResult, self).__init__(resp)

        #: True means there are more files to list; False means all files are listed.
        self.is_truncated = False

        #: Paging marker for the next call. Pass it as the `marker` parameter of the
        #: next :func:`list_objects <oss2.Bucket.list_objects>` call.
        self.next_marker = ''

        #: The object list. Elements are :class:`SimplifiedObjectInfo`.
        self.object_list = list()

        #: The prefix list; a list of str.
        self.prefix_list = list()
class SimplifiedObjectInfo(object):
    """One entry of an object listing: either a file or a common prefix (folder)."""

    def __init__(self, key, last_modified, etag, type, size, storage_class):
        #: The file name or common prefix name (folder name).
        self.key = key

        #: Last modified time.
        self.last_modified = last_modified

        #: HTTP ETag
        self.etag = etag

        #: File type
        self.type = type

        #: File size
        self.size = size

        #: Storage class (Standard, IA and Archive)
        self.storage_class = storage_class

    def is_prefix(self):
        """Return True for a common prefix (folder), i.e. when `last_modified` is None."""
        has_timestamp = self.last_modified is not None
        return not has_timestamp
# Object ACL values.
OBJECT_ACL_DEFAULT = 'default'
OBJECT_ACL_PRIVATE = 'private'
OBJECT_ACL_PUBLIC_READ = 'public-read'
OBJECT_ACL_PUBLIC_READ_WRITE = 'public-read-write'
class GetObjectAclResult(RequestResult):
    """Result of a get-object-ACL request; `acl` starts out as an empty string."""

    def __init__(self, resp):
        super(GetObjectAclResult, self).__init__(resp)

        #: File ACL. The value could be `OBJECT_ACL_DEFAULT`, `OBJECT_ACL_PRIVATE`,
        #: `OBJECT_ACL_PUBLIC_READ` or `OBJECT_ACL_PUBLIC_READ_WRITE`.
        self.acl = ''
class SimplifiedBucketInfo(object):
    """Element type of the result of :func:`list_buckets <oss2.Service.list_buckets>`."""

    def __init__(self, name, location, creation_date):
        #: Bucket name
        self.name = name

        #: Bucket location
        self.location = location

        #: Bucket creation time in Unix time. See :ref:`unix_time` for more information.
        self.creation_date = creation_date
class ListBucketsResult(RequestResult):
    """Result of a bucket listing; all fields start out with empty defaults."""

    def __init__(self, resp):
        super(ListBucketsResult, self).__init__(resp)

        #: True means there are more buckets to list; False means all buckets have been listed.
        self.is_truncated = False

        #: The next paging marker. It can be used as the `marker` parameter of
        #: :func:`list_buckets <oss2.Service.list_buckets>`.
        self.next_marker = ''

        #: The bucket list. Elements are :class:`SimplifiedBucketInfo`.
        self.buckets = list()
class MultipartUploadInfo(object):
    """One entry of a multipart-upload listing: an upload or a common prefix."""

    def __init__(self, key, upload_id, initiation_date):
        #: File name
        self.key = key

        #: Upload ID
        self.upload_id = upload_id

        #: Initiation time of the multipart upload in Unix time (int).
        #: See :ref:`unix_time` for more information.
        self.initiation_date = initiation_date

    def is_prefix(self):
        """Return True for a common prefix, i.e. when `upload_id` is None."""
        has_upload_id = self.upload_id is not None
        return not has_upload_id
class ListMultipartUploadsResult(RequestResult):
    """Result of listing multipart uploads; all fields start out with empty defaults."""

    def __init__(self, resp):
        super(ListMultipartUploadsResult, self).__init__(resp)

        #: True means there are more ongoing multipart uploads to list;
        #: False means there are no more to list.
        self.is_truncated = False

        #: The paging key marker.
        self.next_key_marker = ''

        #: The paging upload ID marker.
        self.next_upload_id_marker = ''

        #: The list of multipart uploads. Elements are `MultipartUploadInfo`.
        self.upload_list = list()

        #: The common prefixes; a list of str.
        self.prefix_list = list()
class ListPartsResult(RequestResult):
    """Result of listing the parts of a multipart upload; fields start out empty."""

    def __init__(self, resp):
        super(ListPartsResult, self).__init__(resp)

        #: True means there are more parts to list; False means no more parts to list.
        self.is_truncated = False

        #: Next paging marker.
        self.next_marker = ''

        #: Parts list. Elements are `PartInfo`.
        self.parts = list()
# Bucket ACL values.
BUCKET_ACL_PRIVATE = 'private'
BUCKET_ACL_PUBLIC_READ = 'public-read'
BUCKET_ACL_PUBLIC_READ_WRITE = 'public-read-write'

# Bucket storage class values.
BUCKET_STORAGE_CLASS_STANDARD = 'Standard'
BUCKET_STORAGE_CLASS_IA = 'IA'
BUCKET_STORAGE_CLASS_ARCHIVE = 'Archive'
class GetBucketAclResult(RequestResult):
    """Result of a get-bucket-ACL request; `acl` starts out as an empty string."""

    def __init__(self, resp):
        super(GetBucketAclResult, self).__init__(resp)

        #: Bucket ACL. The value could be `BUCKET_ACL_PRIVATE`, `BUCKET_ACL_PUBLIC_READ`
        #: or `BUCKET_ACL_PUBLIC_READ_WRITE`.
        self.acl = ''
class GetBucketLocationResult(RequestResult):
    """Result of a get-bucket-location request; `location` starts out as an empty string."""

    def __init__(self, resp):
        super(GetBucketLocationResult, self).__init__(resp)

        #: The bucket's datacenter location.
        self.location = ''
class BucketLogging(object):
    """Bucket logging configuration.

    :param str target_bucket: The bucket that stores the logging files.
    :param str target_prefix: The prefix of the logging files.
    """

    def __init__(self, target_bucket, target_prefix):
        self.target_bucket, self.target_prefix = target_bucket, target_prefix
class GetBucketLoggingResult(RequestResult, BucketLogging):
    """Request result that also carries a bucket logging configuration.

    The logging fields start out as empty strings.
    """

    def __init__(self, resp):
        # Each base is initialized explicitly because their signatures differ.
        RequestResult.__init__(self, resp)
        BucketLogging.__init__(self, target_bucket='', target_prefix='')
class BucketCreateConfig(object):
    """Configuration holding the storage class of a bucket.

    :param storage_class: Storage class value, stored as given.
    """

    def __init__(self, storage_class):
        #: Storage class
        self.storage_class = storage_class
class BucketStat(object):
    """Bucket statistics.

    :param storage_size_in_bytes: Storage size in bytes.
    :param object_count: Number of objects.
    :param multi_part_upload_count: Number of multipart uploads.
    """

    def __init__(self, storage_size_in_bytes, object_count, multi_part_upload_count):
        self.storage_size_in_bytes = storage_size_in_bytes
        self.object_count, self.multi_part_upload_count = object_count, multi_part_upload_count
class AccessControlList(object):
    """Access control list holding a single grant value.

    :param grant: The grant, stored as given.
    """

    def __init__(self, grant):
        #: The grant
        self.grant = grant
class Owner(object):
    """Owner information.

    :param display_name: Display name of the owner.
    :param owner_id: Owner ID; stored on the instance as `id`.
    """

    def __init__(self, display_name, owner_id):
        self.display_name, self.id = display_name, owner_id
class BucketInfo(object):
    """Bucket information; every field is optional and defaults to None.

    :param name: Bucket name.
    :param owner: Bucket owner.
    :param location: Bucket location.
    :param storage_class: Storage class.
    :param intranet_endpoint: Intranet endpoint.
    :param extranet_endpoint: Extranet endpoint.
    :param creation_date: Creation date.
    :param acl: Bucket ACL.
    """

    def __init__(self,
                 name=None,
                 owner=None,
                 location=None,
                 storage_class=None,
                 intranet_endpoint=None,
                 extranet_endpoint=None,
                 creation_date=None,
                 acl=None):
        self.name, self.owner = name, owner
        self.location, self.storage_class = location, storage_class
        self.intranet_endpoint, self.extranet_endpoint = intranet_endpoint, extranet_endpoint
        self.creation_date, self.acl = creation_date, acl
class GetBucketStatResult(RequestResult, BucketStat):
    """Request result that also carries bucket statistics; all counters start at 0."""

    def __init__(self, resp):
        # Each base is initialized explicitly because their signatures differ.
        RequestResult.__init__(self, resp)
        BucketStat.__init__(self, storage_size_in_bytes=0, object_count=0, multi_part_upload_count=0)
class GetBucketInfoResult(RequestResult, BucketInfo):
    """Request result that also carries bucket information, initialized to its defaults."""

    def __init__(self, resp):
        # Each base is initialized explicitly because their signatures differ.
        RequestResult.__init__(self, resp)
        BucketInfo.__init__(self)
class BucketReferer(object):
    """Bucket referer settings.

    :param bool allow_empty_referer: Whether an empty Referer is allowed.
    :param referers: Referer list; elements are str.
    """

    def __init__(self, allow_empty_referer, referers):
        self.allow_empty_referer, self.referers = allow_empty_referer, referers
class GetBucketRefererResult(RequestResult, BucketReferer):
    """Request result that also carries referer settings.

    Starts with empty referers disallowed and an empty referer list.
    """

    def __init__(self, resp):
        # Each base is initialized explicitly because their signatures differ.
        RequestResult.__init__(self, resp)
        BucketReferer.__init__(self, allow_empty_referer=False, referers=[])
class BucketWebsite(object):
    """Static website configuration.

    :param str index_file: The home page file.
    :param str error_file: The 404 (not found) page file.
    """

    def __init__(self, index_file, error_file):
        self.index_file, self.error_file = index_file, error_file
class GetBucketWebsiteResult(RequestResult, BucketWebsite):
    """Request result that also carries a static website configuration.

    Both file names start out as empty strings.
    """

    def __init__(self, resp):
        # Each base is initialized explicitly because their signatures differ.
        RequestResult.__init__(self, resp)
        BucketWebsite.__init__(self, index_file='', error_file='')
class LifecycleExpiration(object):
    """Life cycle expiration.

    At most one of the three parameters may be given.

    :param days: Number of days after last modification at which the expiration
        rule is triggered (such as deleting files).
    :param date: Date threshold for the expiration rule; after this date the rule
        is always in effect (not recommended).
    :type date: `datetime.date`
    :param created_before_date: Delete files whose last modified time is earlier
        than created_before_date.
    :raises ClientError: if more than one parameter is not None.
    """

    def __init__(self, days=None, date=None, created_before_date=None):
        specified = [value for value in (days, date, created_before_date) if value is not None]
        if len(specified) > 1:
            raise ClientError('More than one field(days, date and created_before_date) has been specified')

        self.days = days
        self.date = date
        self.created_before_date = created_before_date
class AbortMultipartUpload(object):
    """Abort multipart uploads.

    :param days: Delete multipart upload parts whose last modified time is
        'days' days in the past.
    :param created_before_date: Delete multipart upload parts whose last modified
        time is earlier than created_before_date.
    :raises ClientError: if both parameters are not None.
    """

    def __init__(self, days=None, created_before_date=None):
        if not (days is None or created_before_date is None):
            raise ClientError('days and created_before_date should not be both specified')

        self.days, self.created_before_date = days, created_before_date
class StorageTransition(object):
    """Storage class transition of objects.

    :param days: Transition objects whose last modified time is 'days' days in the past.
    :param created_before_date: Transition objects whose last modified time is
        earlier than created_before_date.
    :param storage_class: The target storage class the objects are transitioned to.
    :raises ClientError: if both `days` and `created_before_date` are not None.
    """

    def __init__(self, days=None, created_before_date=None, storage_class=None):
        both_given = days is not None and created_before_date is not None
        if both_given:
            raise ClientError('days and created_before_date should not be both specified')

        self.days, self.created_before_date = days, created_before_date
        self.storage_class = storage_class
class LifecycleRule(object):
    """Life cycle rule.

    :param id: Rule name.
    :param prefix: File prefix the rule applies to.
    :param status: Enables or disables the rule. Either `LifecycleRule.ENABLED`
        or `LifecycleRule.DISABLED`.
    :param expiration: Expiration time.
    :type expiration: :class:`LifecycleExpiration`
    :param abort_multipart_upload: Abort-multipart-upload setting, stored as given.
    :param storage_transitions: Storage transitions, stored as given.
    """

    ENABLED = 'Enabled'
    DISABLED = 'Disabled'

    def __init__(self, id, prefix,
                 status=ENABLED,
                 expiration=None,
                 abort_multipart_upload=None,
                 storage_transitions=None):
        self.id, self.prefix, self.status = id, prefix, status
        self.expiration = expiration
        self.abort_multipart_upload = abort_multipart_upload
        self.storage_transitions = storage_transitions
class BucketLifecycle(object):
    """The bucket's lifecycle configuration.

    :param rules: Lifecycle rule list; a falsy value is replaced by a new empty list.
    :type rules: list of :class:`LifecycleRule`
    """

    def __init__(self, rules=None):
        self.rules = rules if rules else []
class GetBucketLifecycleResult(RequestResult, BucketLifecycle):
    """Request result that also carries a lifecycle configuration, initially without rules."""

    def __init__(self, resp):
        # Each base is initialized explicitly because their signatures differ.
        RequestResult.__init__(self, resp)
        BucketLifecycle.__init__(self)
class CorsRule(object):
    """CORS (cross-origin resource sharing) rule.

    Falsy list arguments are replaced by new empty lists.

    :param allowed_origins: Origins allowed to access the bucket.
    :type allowed_origins: list of str
    :param allowed_methods: HTTP methods allowed for CORS.
    :type allowed_methods: list of str
    :param allowed_headers: HTTP headers allowed for CORS.
    :type allowed_headers: list of str
    :param expose_headers: Expose headers, stored as given (or an empty list).
    :param max_age_seconds: Max age in seconds, stored as given.
    """

    def __init__(self,
                 allowed_origins=None,
                 allowed_methods=None,
                 allowed_headers=None,
                 expose_headers=None,
                 max_age_seconds=None):
        self.allowed_origins = allowed_origins if allowed_origins else []
        self.allowed_methods = allowed_methods if allowed_methods else []
        self.allowed_headers = allowed_headers if allowed_headers else []
        self.expose_headers = expose_headers if expose_headers else []
        self.max_age_seconds = max_age_seconds
class BucketCors(object):
    """The bucket's CORS configuration.

    :param rules: Rule list; a falsy value is replaced by a new empty list.
    """

    def __init__(self, rules=None):
        self.rules = rules if rules else []
class GetBucketCorsResult(RequestResult, BucketCors):
    """Request result that also carries a CORS configuration, initially without rules."""

    def __init__(self, resp):
        # Each base is initialized explicitly because their signatures differ.
        RequestResult.__init__(self, resp)
        BucketCors.__init__(self)
class LiveChannelInfoTarget(object):
    """Target information of a live channel, including the target protocol's parameters.

    :param type: Protocol; only HLS is supported for now.
    :type type: str
    :param frag_duration: Expected length in seconds of the HLS protocol's TS files.
    :type frag_duration: int
    :param frag_count: Number of TS files in the HLS protocol's m3u8 file.
    :type frag_count: int
    :param playlist_name: Playlist name; defaults to an empty string.
    """

    def __init__(self, type='HLS', frag_duration=5, frag_count=3, playlist_name=''):
        self.type = type
        self.frag_duration, self.frag_count = frag_duration, frag_count
        self.playlist_name = playlist_name
class LiveChannelInfo(object):
    """Live channel configuration.

    :param status: Either "enabled" or "disabled".
    :type status: str
    :param description: The live channel's description. The maximum length is 128 bytes.
    :type description: str
    :param target: The target information of the pushed stream, including the
        parameters of the target protocol. When None, a new
        :class:`LiveChannelInfoTarget` with default settings is created.
    :type target: :class:`LiveChannelInfoTarget <oss2.models.LiveChannelInfoTarget>`
    :param last_modified: The last modified time of the live channel. It is only
        used in `ListLiveChannel`.
    :type last_modified: Unix time (int), see :ref:`unix_time`.
    :param name: The live channel name.
    :type name: str
    :param play_url: The play URL.
    :type play_url: str
    :param publish_url: The publish URL.
    :type publish_url: str
    """

    def __init__(self,
                 status='enabled',
                 description='',
                 target=None,
                 last_modified=None,
                 name=None,
                 play_url=None,
                 publish_url=None):
        self.status = status
        self.description = description
        # The default is built per call so instances never share one mutable target.
        self.target = LiveChannelInfoTarget() if target is None else target
        self.last_modified = last_modified
        self.name = name
        self.play_url = play_url
        self.publish_url = publish_url
class LiveChannelList(object):
    """The result of a live channel list operation.

    :param prefix: Prefix of the live channels to list.
    :type prefix: str
    :param marker: The paging marker of the live channel list operation.
    :type marker: str
    :param max_keys: Maximum number of entries to return.
    :type max_keys: int
    :param is_truncated: Whether there are more live channels to list.
    :type is_truncated: bool
    :param next_marker: The next paging marker.
    :type next_marker: str

    The `channels` attribute (list of :class:`LiveChannelInfo`) starts out empty.
    """

    def __init__(self, prefix='', marker='', max_keys=100, is_truncated=False, next_marker=''):
        self.prefix, self.marker = prefix, marker
        self.max_keys = max_keys
        self.is_truncated, self.next_marker = is_truncated, next_marker
        self.channels = list()
class LiveChannelVideoStat(object):
    """The video node in LiveStat.

    :param width: Video width.
    :type width: int
    :param height: Video height.
    :type height: int
    :param frame_rate: Frame rate.
    :type frame_rate: int
    :param codec: Video codec.
    :type codec: str
    :param bandwidth: Bandwidth of the video.
    :type bandwidth: int
    """

    def __init__(self, width=0, height=0, frame_rate=0, codec='', bandwidth=0):
        self.width, self.height = width, height
        self.frame_rate = frame_rate
        self.codec, self.bandwidth = codec, bandwidth
class LiveChannelAudioStat(object):
    """The audio node in LiveStat.

    :param codec: Audio codec.
    :type codec: str
    :param sample_rate: Sample rate.
    :type sample_rate: int
    :param bandwidth: Bandwidth.
    :type bandwidth: int
    """

    def __init__(self, codec='', sample_rate=0, bandwidth=0):
        self.codec = codec
        self.sample_rate, self.bandwidth = sample_rate, bandwidth
class LiveChannelStat(object):
    """LiveStat result.

    :param status: The live channel status.
    :type status: str
    :param remote_addr: Remote address.
    :type remote_addr: str
    :param connected_time: The time at which the pushed stream connected.
    :type connected_time: int, Unix time
    :param video: Video description information.
    :type video: :class:`LiveChannelVideoStat <oss2.models.LiveChannelVideoStat>`
    :param audio: Audio description information.
    :type audio: :class:`LiveChannelAudioStat <oss2.models.LiveChannelAudioStat>`
    """

    def __init__(self, status='', remote_addr='', connected_time='', video=None, audio=None):
        self.status, self.remote_addr = status, remote_addr
        self.connected_time = connected_time
        self.video, self.audio = video, audio
class LiveRecord(object):
    """A record of one stream push.

    :param start_time: The start time of the stream push.
    :type start_time: int, see :ref:`unix_time`.
    :param end_time: The end time of the stream push.
    :type end_time: int, see :ref:`unix_time` for more information.
    :param remote_addr: The remote address of the stream push.
    :type remote_addr: str
    """

    def __init__(self, start_time='', end_time='', remote_addr=''):
        self.start_time, self.end_time = start_time, end_time
        self.remote_addr = remote_addr
class LiveChannelHistory(object):
    """Stream push records of a live channel; `records` starts out empty."""

    def __init__(self):
        #: The list of records.
        self.records = list()
class CreateLiveChannelResult(RequestResult, LiveChannelInfo):
    """Request result that also carries live channel information, initialized to its defaults."""

    def __init__(self, resp):
        # Each base is initialized explicitly because their signatures differ.
        RequestResult.__init__(self, resp)
        LiveChannelInfo.__init__(self)
class GetLiveChannelResult(RequestResult, LiveChannelInfo):
    """Request result that also carries live channel information, initialized to its defaults."""

    def __init__(self, resp):
        # Each base is initialized explicitly because their signatures differ.
        RequestResult.__init__(self, resp)
        LiveChannelInfo.__init__(self)
class ListLiveChannelResult(RequestResult, LiveChannelList):
    """Request result that also carries a live channel list, initialized to its defaults."""

    def __init__(self, resp):
        # Each base is initialized explicitly because their signatures differ.
        RequestResult.__init__(self, resp)
        LiveChannelList.__init__(self)
class GetLiveChannelStatResult(RequestResult, LiveChannelStat):
    """Request result that also carries live channel statistics, initialized to their defaults."""

    def __init__(self, resp):
        # Each base is initialized explicitly because their signatures differ.
        RequestResult.__init__(self, resp)
        LiveChannelStat.__init__(self)
class GetLiveChannelHistoryResult(RequestResult, LiveChannelHistory):
    """Request result that also carries a live channel's push history, initially empty."""

    def __init__(self, resp):
        # Each base is initialized explicitly because their signatures differ.
        RequestResult.__init__(self, resp)
        LiveChannelHistory.__init__(self)