"""utils for sdk"""
import io
import platform
from typing import Optional, Any, MutableMapping, Tuple, Dict
import mimetypes
import os.path
from ._version import VERSION
_EXTRA_TYPES_MAP = {
".js": "application/javascript",
".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
".xltx": "application/vnd.openxmlformats-officedocument.spreadsheetml.template",
".potx": "application/vnd.openxmlformats-officedocument.presentationml.template",
".ppsx": "application/vnd.openxmlformats-officedocument.presentationml.slideshow",
".pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
".sldx": "application/vnd.openxmlformats-officedocument.presentationml.slide",
".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
".dotx": "application/vnd.openxmlformats-officedocument.wordprocessingml.template",
".xlam": "application/vnd.ms-excel.addin.macroEnabled.12",
".xlsb": "application/vnd.ms-excel.sheet.binary.macroEnabled.12",
".apk": "application/vnd.android.package-archive"
}
[docs]
def safety_str(value: Optional[str]) -> str:
"""
Returns str value if the value is not None.
Returns a emtpy str if the value is None.
"""
return value if value is not None else ''
[docs]
def safety_bool(value: Optional[bool]) -> bool:
"""
Returns bool value if the value is not None.
Returns False if the value is None.
"""
return value if value is not None else False
[docs]
def safety_int(value: Optional[int]) -> int:
"""
Returns int value if the value is not None.
Returns 0 if the value is None.
"""
return value if value is not None else 0
[docs]
def ensure_boolean(val):
"""
Ensures a boolean value if a string or boolean is provided
For strings, the value for True/False is case insensitive
"""
if isinstance(val, bool):
return val
if isinstance(val, str):
return val.lower() == 'true'
return False
[docs]
def merge_dicts(dict1, dict2, append_lists=False):
"""Given two dict, merge the second dict into the first.
The dicts can have arbitrary nesting.
Args:
dict1 (Dict): the first dict.
dict2 (Dict): the second dict.
append_lists (bool, optional): If true, instead of clobbering a list with the new
value, append all of the new values onto the original list.
"""
for key in dict2:
if isinstance(dict2[key], dict):
if key in dict1 and key in dict2:
merge_dicts(dict1[key], dict2[key])
else:
dict1[key] = dict2[key]
# If the value is a list and the ``append_lists`` flag is set,
# append the new values onto the original list
elif isinstance(dict2[key], list) and append_lists:
# The value in dict1 must be a list in order to append new
# values onto it.
if key in dict1 and isinstance(dict1[key], list):
dict1[key].extend(dict2[key])
else:
dict1[key] = dict2[key]
else:
# At scalar types, we iterate and merge the
# current dict that we're on.
dict1[key] = dict2[key]
[docs]
def lowercase_dict(original):
"""Copies the given dictionary ensuring all keys are lowercase strings."""
copy = {}
for key in original:
copy[key.lower()] = original[key]
return copy
[docs]
def guess_content_type(name:str, default:str) -> str:
"""Guess the type of a file based on its name"""
ext = os.path.splitext(name)[1].lower()
if ext in _EXTRA_TYPES_MAP:
return _EXTRA_TYPES_MAP[ext]
return mimetypes.guess_type(name)[0] or default
[docs]
def guess_content_length(body: Any) -> Optional[int]:
"""Guess the content length of body"""
if not body:
return 0
try:
return len(body)
except (AttributeError, TypeError):
pass
if hasattr(body, 'seek') and hasattr(body, 'tell'):
try:
orig_pos = body.tell()
body.seek(0, os.SEEK_END)
end_file_pos = body.tell()
body.seek(orig_pos)
return end_file_pos - orig_pos
except io.UnsupportedOperation:
pass
return None
[docs]
def escape_xml_value(s: str) -> str:
"""escapeXml EscapeString writes to p the properly escaped XML equivalent
of the plain text data s
"""
ss = ''
for _, d in enumerate(s):
if d == "&":
ss += "&"
elif d == "<":
ss += "<"
elif d == ">":
ss += ">"
elif d == "\"":
ss += """
elif d == "\r":
ss += " "
elif d == "\n":
ss += " "
elif d == "\t":
ss += "	"
else:
n = ord(d)
if 0 <= n < 0x20:
ss += f'&#{n:02d};'
else:
ss += d
return ss
[docs]
def parse_content_range(content_range: str) -> Tuple[int, int, int]:
"""
Parses the content range header
accepts bytes 22-33/42 and bytes 22-33/* format
"""
if not content_range:
raise ValueError("Invalid content-range header, it is none or empty.")
if not content_range.startswith('bytes '):
raise ValueError("Invalid content-range header, it dose not start with bytes.")
vals = content_range.split(" ")[1].split("/")
if len(vals) != 2:
raise ValueError(f'Invalid content-range header: {content_range}')
rvals = vals[0].split("-")
if len(rvals) != 2:
raise ValueError(f'Invalid content-range header: {content_range}')
start = int(rvals[0])
if start < 0:
raise ValueError(f'Invalid content-range header: {start}')
end = int(rvals[1])
if end < 0:
raise ValueError(f'Invalid content-range header: {end}')
if vals[1] == "*":
size = -1
else:
size = int(vals[1])
if size <= 0:
raise ValueError(f'Invalid content-range header: {size}')
return start, end, size
[docs]
def parse_content_length(headers: MutableMapping[str, str]) -> int:
"""Parses the length from the content length header"""
if not headers.get("Content-Length", None):
raise ValueError("Missing content-length header.")
size = int(headers["Content-Length"])
if size <= 0:
raise ValueError(f"Invalid content-length header: {size}")
return size
[docs]
def parse_http_range(range_header: str) -> Tuple[int, int]:
"""
Parses the range header
It only accepts single ranges.
"""
if not range_header:
raise ValueError("Invalid range header, it is none or empty.")
if not range_header.startswith("bytes="):
raise ValueError("Invalid range header, doesn't start with bytes=.")
if range_header.count(',') > 0:
raise ValueError("Invalid range header, contains multiple ranges which isn't supported.")
dash = range_header.find('-')
if dash < 0:
raise ValueError("Invalid range header, contains no '-'")
start_str = range_header[6:dash].strip()
end_str = range_header[dash+1:].strip()
start = -1
if len(start_str) > 0:
start = int(start_str)
if start < 0:
raise ValueError(f'Invalid range header: {start_str} in {range_header}')
end = -1
if len(end_str) > 0:
end = int(end_str)
if end < 0:
raise ValueError(f'Invalid range header: {end_str} in {range_header}')
return start, end
[docs]
def is_seekable(obj: Any) -> bool:
"""Tests if this object supports Seek method
Returns True is suppurts seek, else False
"""
if hasattr(obj, 'seekable'):
return obj.seekable()
if hasattr(obj, 'seek') and hasattr(obj, 'tell'):
try:
obj.seek(0, os.SEEK_CUR)
return True
except OSError:
return False
return False
[docs]
def is_fileobj(obj: Any) -> bool:
"""Tests if this object has seek and tell method
Returns True if it is a file object, else False
"""
if hasattr(obj, 'seek') and hasattr(obj, 'tell'):
return True
return False
[docs]
def get_default_user_agent() -> str:
"""Returns the default user agent string
"""
sysinfo = f'{platform.system()}/{platform.release()}/{platform.machine()};{platform.python_version()}'
return f'alibabacloud-python-sdk-v2/{VERSION} ({sysinfo})'