Source code for oss2.xml_utils

# -*- coding: utf-8 -*-

"""
oss2.xml_utils
~~~~~~~~~~~~~~

Utility class for XML processing.

It includes two kind of APIs:
    - APIs starting with parse_: This is for paring the xml from OSS server
    - APIs starting with to_: This is for generating the xml to sent to OSS servers

"""

import xml.etree.ElementTree as ElementTree

from .models import (SimplifiedObjectInfo,
                     SimplifiedBucketInfo,
                     PartInfo,
                     MultipartUploadInfo,
                     LifecycleRule,
                     LifecycleExpiration,
                     CorsRule,
                     LiveChannelInfoTarget,
                     LiveChannelInfo,
                     LiveRecord,
                     LiveChannelVideoStat,
                     LiveChannelAudioStat,
                     Owner,
                     AccessControlList,
                     AbortMultipartUpload,
                     StorageTransition)

from .compat import urlunquote, to_unicode, to_string
from .utils import iso8601_to_unixtime, date_to_iso8601, iso8601_to_date


def _find_tag(parent, path):
    child = parent.find(path)
    if child is None:
        raise RuntimeError("parse xml: " + path + " could not be found under " + parent.tag)

    if child.text is None:
        return ''

    return to_string(child.text)


def _find_bool(parent, path):
    text = _find_tag(parent, path)
    if text == 'true':
        return True
    elif text == 'false':
        return False
    else:
        raise RuntimeError("parse xml: value of " + path + " is not a boolean under " + parent.tag)


def _find_int(parent, path):
    return int(_find_tag(parent, path))


def _find_object(parent, path, url_encoded):
    name = _find_tag(parent, path)
    if url_encoded:
        return urlunquote(name)
    else:
        return name


def _find_all_tags(parent, tag):
    return [to_string(node.text) or '' for node in parent.findall(tag)]


def _is_url_encoding(root):
    node = root.find('EncodingType')
    if node is not None and to_string(node.text) == 'url':
        return True
    else:
        return False


def _node_to_string(root):
    return ElementTree.tostring(root, encoding='utf-8')


def _add_node_list(parent, tag, entries):
    for e in entries:
        _add_text_child(parent, tag, e)


def _add_text_child(parent, tag, text):
    ElementTree.SubElement(parent, tag).text = to_unicode(text)

def _add_node_child(parent, tag):
    return ElementTree.SubElement(parent, tag)

[docs]def parse_list_objects(result, body): root = ElementTree.fromstring(body) url_encoded = _is_url_encoding(root) result.is_truncated = _find_bool(root, 'IsTruncated') if result.is_truncated: result.next_marker = _find_object(root, 'NextMarker', url_encoded) for contents_node in root.findall('Contents'): result.object_list.append(SimplifiedObjectInfo( _find_object(contents_node, 'Key', url_encoded), iso8601_to_unixtime(_find_tag(contents_node, 'LastModified')), _find_tag(contents_node, 'ETag').strip('"'), _find_tag(contents_node, 'Type'), int(_find_tag(contents_node, 'Size')), _find_tag(contents_node, 'StorageClass') )) for prefix_node in root.findall('CommonPrefixes'): result.prefix_list.append(_find_object(prefix_node, 'Prefix', url_encoded)) return result
[docs]def parse_list_buckets(result, body): root = ElementTree.fromstring(body) if root.find('IsTruncated') is None: result.is_truncated = False else: result.is_truncated = _find_bool(root, 'IsTruncated') if result.is_truncated: result.next_marker = _find_tag(root, 'NextMarker') for bucket_node in root.findall('Buckets/Bucket'): result.buckets.append(SimplifiedBucketInfo( _find_tag(bucket_node, 'Name'), _find_tag(bucket_node, 'Location'), iso8601_to_unixtime(_find_tag(bucket_node, 'CreationDate')) ))
[docs]def parse_init_multipart_upload(result, body): root = ElementTree.fromstring(body) result.upload_id = _find_tag(root, 'UploadId') return result
[docs]def parse_list_multipart_uploads(result, body): root = ElementTree.fromstring(body) url_encoded = _is_url_encoding(root) result.is_truncated = _find_bool(root, 'IsTruncated') result.next_key_marker = _find_object(root, 'NextKeyMarker', url_encoded) result.next_upload_id_marker = _find_tag(root, 'NextUploadIdMarker') for upload_node in root.findall('Upload'): result.upload_list.append(MultipartUploadInfo( _find_object(upload_node, 'Key', url_encoded), _find_tag(upload_node, 'UploadId'), iso8601_to_unixtime(_find_tag(upload_node, 'Initiated')) )) for prefix_node in root.findall('CommonPrefixes'): result.prefix_list.append(_find_object(prefix_node, 'Prefix', url_encoded)) return result
[docs]def parse_list_parts(result, body): root = ElementTree.fromstring(body) result.is_truncated = _find_bool(root, 'IsTruncated') result.next_marker = _find_tag(root, 'NextPartNumberMarker') for part_node in root.findall('Part'): result.parts.append(PartInfo( _find_int(part_node, 'PartNumber'), _find_tag(part_node, 'ETag').strip('"'), size=_find_int(part_node, 'Size'), last_modified=iso8601_to_unixtime(_find_tag(part_node, 'LastModified')) )) return result
[docs]def parse_batch_delete_objects(result, body): if not body: return result root = ElementTree.fromstring(body) url_encoded = _is_url_encoding(root) for deleted_node in root.findall('Deleted'): result.deleted_keys.append(_find_object(deleted_node, 'Key', url_encoded)) return result
[docs]def parse_get_bucket_acl(result, body): root = ElementTree.fromstring(body) result.acl = _find_tag(root, 'AccessControlList/Grant') return result
parse_get_object_acl = parse_get_bucket_acl
[docs]def parse_get_bucket_location(result, body): result.location = to_string(ElementTree.fromstring(body).text) return result
[docs]def parse_get_bucket_logging(result, body): root = ElementTree.fromstring(body) if root.find('LoggingEnabled/TargetBucket') is not None: result.target_bucket = _find_tag(root, 'LoggingEnabled/TargetBucket') if root.find('LoggingEnabled/TargetPrefix') is not None: result.target_prefix = _find_tag(root, 'LoggingEnabled/TargetPrefix') return result
[docs]def parse_get_bucket_stat(result, body): root = ElementTree.fromstring(body) result.storage_size_in_bytes = _find_int(root, 'Storage') result.object_count = _find_int(root, 'ObjectCount') result.multi_part_upload_count = _find_int(root, 'MultipartUploadCount') return result
[docs]def parse_get_bucket_info(result, body): root = ElementTree.fromstring(body) result.name = _find_tag(root, 'Bucket/Name') result.creation_date = _find_tag(root, 'Bucket/CreationDate') result.storage_class = _find_tag(root, 'Bucket/StorageClass') result.extranet_endpoint = _find_tag(root, 'Bucket/ExtranetEndpoint') result.intranet_endpoint = _find_tag(root, 'Bucket/IntranetEndpoint') result.location = _find_tag(root, 'Bucket/Location') result.owner = Owner(_find_tag(root, 'Bucket/Owner/DisplayName'), _find_tag(root, 'Bucket/Owner/ID')) result.acl = AccessControlList(_find_tag(root, 'Bucket/AccessControlList/Grant')) return result
[docs]def parse_get_bucket_referer(result, body): root = ElementTree.fromstring(body) result.allow_empty_referer = _find_bool(root, 'AllowEmptyReferer') result.referers = _find_all_tags(root, 'RefererList/Referer') return result
[docs]def parse_get_bucket_websiste(result, body): root = ElementTree.fromstring(body) result.index_file = _find_tag(root, 'IndexDocument/Suffix') result.error_file = _find_tag(root, 'ErrorDocument/Key') return result
[docs]def parse_create_live_channel(result, body): root = ElementTree.fromstring(body) result.play_url = _find_tag(root, 'PlayUrls/Url') result.publish_url = _find_tag(root, 'PublishUrls/Url') return result
[docs]def parse_get_live_channel(result, body): root = ElementTree.fromstring(body) result.status = _find_tag(root, 'Status') result.description = _find_tag(root, 'Description') target = LiveChannelInfoTarget() target.type = _find_tag(root, 'Target/Type') target.frag_duration = _find_tag(root, 'Target/FragDuration') target.frag_count = _find_tag(root, 'Target/FragCount') target.playlist_name = _find_tag(root, 'Target/PlaylistName') result.target = target return result
[docs]def parse_list_live_channel(result, body): root = ElementTree.fromstring(body) result.prefix = _find_tag(root, 'Prefix') result.marker = _find_tag(root, 'Marker') result.max_keys = _find_int(root, 'MaxKeys') result.is_truncated = _find_bool(root, 'IsTruncated') if result.is_truncated: result.next_marker = _find_tag(root, 'NextMarker') channels = root.findall('LiveChannel') for channel in channels: tmp = LiveChannelInfo() tmp.name = _find_tag(channel, 'Name') tmp.description = _find_tag(channel, 'Description') tmp.status = _find_tag(channel, 'Status') tmp.last_modified = iso8601_to_unixtime(_find_tag(channel, 'LastModified')) tmp.play_url = _find_tag(channel, 'PlayUrls/Url') tmp.publish_url = _find_tag(channel, 'PublishUrls/Url') result.channels.append(tmp) return result
[docs]def parse_stat_video(video_node, video): video.width = _find_int(video_node, 'Width') video.height = _find_int(video_node, 'Height') video.frame_rate = _find_int(video_node, 'FrameRate') video.bandwidth = _find_int(video_node, 'Bandwidth') video.codec = _find_tag(video_node, 'Codec')
[docs]def parse_stat_audio(audio_node, audio): audio.bandwidth = _find_int(audio_node, 'Bandwidth') audio.sample_rate = _find_int(audio_node, 'SampleRate') audio.codec = _find_tag(audio_node, 'Codec')
[docs]def parse_live_channel_stat(result, body): root = ElementTree.fromstring(body) result.status = _find_tag(root, 'Status') if root.find('RemoteAddr') is not None: result.remote_addr = _find_tag(root, 'RemoteAddr') if root.find('ConnectedTime') is not None: result.connected_time = iso8601_to_unixtime(_find_tag(root, 'ConnectedTime')) video_node = root.find('Video') audio_node = root.find('Audio') if video_node is not None: result.video = LiveChannelVideoStat() parse_stat_video(video_node, result.video) if audio_node is not None: result.audio = LiveChannelAudioStat() parse_stat_audio(audio_node, result.audio) return result
[docs]def parse_live_channel_history(result, body): root = ElementTree.fromstring(body) records = root.findall('LiveRecord') for record in records: tmp = LiveRecord() tmp.start_time = iso8601_to_unixtime(_find_tag(record, 'StartTime')) tmp.end_time = iso8601_to_unixtime(_find_tag(record, 'EndTime')) tmp.remote_addr = _find_tag(record, 'RemoteAddr') result.records.append(tmp) return result
[docs]def parse_lifecycle_expiration(expiration_node): if expiration_node is None: return None expiration = LifecycleExpiration() if expiration_node.find('Days') is not None: expiration.days = _find_int(expiration_node, 'Days') elif expiration_node.find('Date') is not None: expiration.date = iso8601_to_date(_find_tag(expiration_node, 'Date')) return expiration
[docs]def parse_lifecycle_abort_multipart_upload(abort_multipart_upload_node): if abort_multipart_upload_node is None: return None abort_multipart_upload = AbortMultipartUpload() if abort_multipart_upload_node.find('Days') is not None: abort_multipart_upload.days = _find_int(abort_multipart_upload_node, 'Days') elif abort_multipart_upload_node.find('CreatedBeforeDate') is not None: abort_multipart_upload.created_before_date = iso8601_to_date(_find_tag(abort_multipart_upload_node, 'CreatedBeforeDate')) return abort_multipart_upload
[docs]def parse_lifecycle_storage_transitions(storage_transition_nodes): storage_transitions = [] for storage_transition_node in storage_transition_nodes: storage_class = _find_tag(storage_transition_node, 'StorageClass') storage_transition = StorageTransition(storage_class=storage_class) if storage_transition_node.find('Days') is not None: storage_transition.days = _find_int(storage_transition_node, 'Days') elif storage_transition_node.find('CreatedBeforeDate') is not None: storage_transition.created_before_date = iso8601_to_date(_find_tag(storage_transition_node, 'CreatedBeforeDate')) storage_transitions.append(storage_transition) return storage_transitions
[docs]def parse_get_bucket_lifecycle(result, body): root = ElementTree.fromstring(body) for rule_node in root.findall('Rule'): expiration = parse_lifecycle_expiration(rule_node.find('Expiration')) abort_multipart_upload = parse_lifecycle_abort_multipart_upload(rule_node.find('AbortMultipartUpload')) storage_transitions = parse_lifecycle_storage_transitions(rule_node.findall('Transition')) rule = LifecycleRule( _find_tag(rule_node, 'ID'), _find_tag(rule_node, 'Prefix'), status=_find_tag(rule_node, 'Status'), expiration=expiration, abort_multipart_upload=abort_multipart_upload, storage_transitions=storage_transitions ) result.rules.append(rule) return result
[docs]def parse_get_bucket_cors(result, body): root = ElementTree.fromstring(body) for rule_node in root.findall('CORSRule'): rule = CorsRule() rule.allowed_origins = _find_all_tags(rule_node, 'AllowedOrigin') rule.allowed_methods = _find_all_tags(rule_node, 'AllowedMethod') rule.allowed_headers = _find_all_tags(rule_node, 'AllowedHeader') rule.expose_headers = _find_all_tags(rule_node, 'ExposeHeader') max_age_node = rule_node.find('MaxAgeSeconds') if max_age_node is not None: rule.max_age_seconds = int(max_age_node.text) result.rules.append(rule) return result
[docs]def to_complete_upload_request(parts): root = ElementTree.Element('CompleteMultipartUpload') for p in parts: part_node = ElementTree.SubElement(root, "Part") _add_text_child(part_node, 'PartNumber', str(p.part_number)) _add_text_child(part_node, 'ETag', '"{0}"'.format(p.etag)) return _node_to_string(root)
[docs]def to_batch_delete_objects_request(keys, quiet): root_node = ElementTree.Element('Delete') _add_text_child(root_node, 'Quiet', str(quiet).lower()) for key in keys: object_node = ElementTree.SubElement(root_node, 'Object') _add_text_child(object_node, 'Key', key) return _node_to_string(root_node)
[docs]def to_put_bucket_config(bucket_config): root = ElementTree.Element('CreateBucketConfiguration') _add_text_child(root, 'StorageClass', str(bucket_config.storage_class)) return _node_to_string(root)
[docs]def to_put_bucket_logging(bucket_logging): root = ElementTree.Element('BucketLoggingStatus') if bucket_logging.target_bucket: logging_node = ElementTree.SubElement(root, 'LoggingEnabled') _add_text_child(logging_node, 'TargetBucket', bucket_logging.target_bucket) _add_text_child(logging_node, 'TargetPrefix', bucket_logging.target_prefix) return _node_to_string(root)
[docs]def to_put_bucket_referer(bucket_referer): root = ElementTree.Element('RefererConfiguration') _add_text_child(root, 'AllowEmptyReferer', str(bucket_referer.allow_empty_referer).lower()) list_node = ElementTree.SubElement(root, 'RefererList') for r in bucket_referer.referers: _add_text_child(list_node, 'Referer', r) return _node_to_string(root)
[docs]def to_put_bucket_website(bucket_websiste): root = ElementTree.Element('WebsiteConfiguration') index_node = ElementTree.SubElement(root, 'IndexDocument') _add_text_child(index_node, 'Suffix', bucket_websiste.index_file) error_node = ElementTree.SubElement(root, 'ErrorDocument') _add_text_child(error_node, 'Key', bucket_websiste.error_file) return _node_to_string(root)
[docs]def to_put_bucket_lifecycle(bucket_lifecycle): root = ElementTree.Element('LifecycleConfiguration') for rule in bucket_lifecycle.rules: rule_node = ElementTree.SubElement(root, 'Rule') _add_text_child(rule_node, 'ID', rule.id) _add_text_child(rule_node, 'Prefix', rule.prefix) _add_text_child(rule_node, 'Status', rule.status) expiration = rule.expiration if expiration: expiration_node = ElementTree.SubElement(rule_node, 'Expiration') if expiration.days is not None: _add_text_child(expiration_node, 'Days', str(expiration.days)) elif expiration.date is not None: _add_text_child(expiration_node, 'Date', date_to_iso8601(expiration.date)) elif expiration.created_before_date is not None: _add_text_child(expiration_node, 'CreatedBeforeDate', date_to_iso8601(expiration.created_before_date)) abort_multipart_upload = rule.abort_multipart_upload if abort_multipart_upload: abort_multipart_upload_node = ElementTree.SubElement(rule_node, 'AbortMultipartUpload') if abort_multipart_upload.days is not None: _add_text_child(abort_multipart_upload_node, 'Days', str(abort_multipart_upload.days)) elif abort_multipart_upload.created_before_date is not None: _add_text_child(abort_multipart_upload_node, 'CreatedBeforeDate', date_to_iso8601(abort_multipart_upload.created_before_date)) storage_transitions = rule.storage_transitions if storage_transitions: for storage_transition in storage_transitions: storage_transition_node = ElementTree.SubElement(rule_node, 'Transition') _add_text_child(storage_transition_node, 'StorageClass', str(storage_transition.storage_class)) if storage_transition.days is not None: _add_text_child(storage_transition_node, 'Days', str(storage_transition.days)) elif storage_transition.created_before_date is not None: _add_text_child(storage_transition_node, 'CreatedBeforeDate', date_to_iso8601(storage_transition.created_before_date)) return _node_to_string(root)
[docs]def to_put_bucket_cors(bucket_cors): root = ElementTree.Element('CORSConfiguration') for rule in bucket_cors.rules: rule_node = ElementTree.SubElement(root, 'CORSRule') _add_node_list(rule_node, 'AllowedOrigin', rule.allowed_origins) _add_node_list(rule_node, 'AllowedMethod', rule.allowed_methods) _add_node_list(rule_node, 'AllowedHeader', rule.allowed_headers) _add_node_list(rule_node, 'ExposeHeader', rule.expose_headers) if rule.max_age_seconds is not None: _add_text_child(rule_node, 'MaxAgeSeconds', str(rule.max_age_seconds)) return _node_to_string(root)
[docs]def to_create_live_channel(live_channel): root = ElementTree.Element('LiveChannelConfiguration') _add_text_child(root, 'Description', live_channel.description) _add_text_child(root, 'Status', live_channel.status) target_node = _add_node_child(root, 'Target') _add_text_child(target_node, 'Type', live_channel.target.type) _add_text_child(target_node, 'FragDuration', str(live_channel.target.frag_duration)) _add_text_child(target_node, 'FragCount', str(live_channel.target.frag_count)) _add_text_child(target_node, 'PlaylistName', str(live_channel.target.playlist_name)) return _node_to_string(root)