"""Checkpoint support for resumable download and upload (alibabacloud_oss_v2.checkpoint)."""

import os
import json
from typing import MutableMapping, Dict
from urllib.parse import quote
from tempfile import gettempdir
import hashlib
from .models import GetObjectRequest, PutObjectRequest
from .utils import safety_str, parse_http_range
from .io_utils import LimitReader
from .crc import Crc64
from .defaults import (
    CHECKPOINT_FILE_SUFFIX_DOWNLOADER,
    CHECKPOINT_FILE_SUFFIX_UPLOADER,
    CHECKPOINT_MAGIC
)

# ----- download checkpoint  -----
# {
#     "CpDirPath": string,         // checkpoint dir full path
#     "CpFilePath": string,        // checkpoint file full path
#     "VerifyData": bool,          // verify downloaded data in FilePath
#     "Loaded": bool,              // If Info.Data.DownloadInfo is loaded from checkpoint
#     "Info": {                    // checkpoint data
#         "Magic": string,         // Magic
#         "MD5": string,           // The Data's MD5
#         "Data": {
#             "ObjectInfo": {         // source
#                "Name": string,
#                "VersionId": string,
#                "Range": string,
#             },
#             "ObjectMeta": {
#                 "Size": int,
#                 "LastModified": string,
#                 "ETag": string,
#             },
#             "FilePath": string,    // destination, Local file
#             "PartSize": int,
#
#             "DownloadInfo": {      // download info
#                 "Offset": int,
#                 "CRC64": int
#             },
#         }
#     },
# }

class DownloadCheckpoint:
    """Download Checkpoint.

    Persists resumable-download state (offset + CRC64) as a JSON file so an
    interrupted download of an OSS object can be resumed. The checkpoint file
    name is derived from MD5 hashes of the canonical source descriptor and the
    absolute destination path, so each (source, destination) pair maps to a
    stable file.
    """

    def __init__(
        self,
        request: GetObjectRequest,
        filepath: str,
        basedir: str,
        headers: MutableMapping,
        part_size: int
    ) -> None:
        """
        Args:
            request (GetObjectRequest): the request identifying the source object
                (bucket, key, version_id, range_header are read).
            filepath (str): local destination file path.
            basedir (str): path whose parent directory holds the checkpoint file;
                the system temp directory is used when empty.
            headers (MutableMapping): response headers of the source object;
                Content-Length, Last-Modified and ETag are read.
            part_size (int): download part size in bytes.
        """
        # Canonical source descriptor: url-quoted bucket/key plus version and range.
        name = f'{request.bucket}/{request.key}'
        canon_src = (
            f'oss://{quote(name)}\n'
            f'{safety_str(request.version_id)}\n'
            f'{safety_str(request.range_header)}'
        )
        src_hash = hashlib.md5(canon_src.encode()).hexdigest()

        # Destination descriptor: absolute local path.
        dst_hash = hashlib.md5(os.path.abspath(filepath).encode()).hexdigest()

        # Checkpoint file lives next to basedir's parent, or in the temp dir.
        dirbase = gettempdir() if not basedir else os.path.dirname(basedir)
        cp_filepath = os.path.join(
            dirbase,
            f'{src_hash}-{dst_hash}{CHECKPOINT_FILE_SUFFIX_DOWNLOADER}'
        )

        # NOTE(review): assumes Content-Length is always present in headers;
        # int(None) raises TypeError otherwise — confirm callers guarantee it.
        object_size = int(headers.get("Content-Length"))

        self.cp_filepath = cp_filepath          # checkpoint file full path
        self.cp_dirpath = dirbase               # checkpoint dir full path
        self.verify_data = False                # re-verify downloaded bytes on load
        self.loaded = False                     # True once DownloadInfo was restored
        self.cp_info = {
            "Magic": CHECKPOINT_MAGIC,
            # "MD5" is filled in by dump() from the "Data" section.
            "Data": {
                "ObjectInfo": {
                    "Name": f'oss://{name}',
                    "VersionId": safety_str(request.version_id),
                    "Range": safety_str(request.range_header),
                },
                "ObjectMeta": {
                    "Size": object_size,
                    "LastModified": headers.get("Last-Modified", ''),
                    "ETag": headers.get("ETag", ''),
                },
                "FilePath": filepath,
                "PartSize": part_size,
            },
        }
        self.doffset = 0        # resume offset (bytes already downloaded)
        self.dcrc64 = 0         # CRC64 of the bytes up to doffset

    def load(self):
        """Load checkpoint state from the local file.

        Silently returns when no checkpoint file exists; removes and ignores
        a checkpoint that fails validation.

        Raises:
            ValueError: if the checkpoint directory does not exist.
        """
        if len(self.cp_dirpath) > 0 and not os.path.isdir(self.cp_dirpath):
            raise ValueError(f'Invalid checkpoint dir {self.cp_dirpath}')

        if not os.path.isfile(self.cp_filepath):
            return

        if not self._is_valid():
            self.remove()
            return

        self.loaded = True

    def _is_valid(self) -> bool:
        """Return True when the on-disk checkpoint matches this download.

        On success, also restores doffset/dcrc64 from the checkpoint.
        Any parse/shape error is treated as an invalid checkpoint.
        """
        try:
            with open(self.cp_filepath, 'rb') as f:
                dcp_info = json.loads(f.read())
            if not isinstance(dcp_info, dict):
                return False

            # Integrity: magic must match and the stored MD5 must equal the
            # MD5 of the serialized "Data" section (as written by dump()).
            js = json.dumps(dcp_info.get("Data", {})).encode()
            md5sum = hashlib.md5(js).hexdigest()
            if (CHECKPOINT_MAGIC != dcp_info.get("Magic") or
                    md5sum != dcp_info.get("MD5")):
                return False

            cpid = self.cp_info["Data"]
            dcpid = dcp_info["Data"]

            # Source, metadata, destination and part size must all agree.
            if (cpid["ObjectInfo"] != dcpid["ObjectInfo"] or
                    cpid["ObjectMeta"] != dcpid["ObjectMeta"] or
                    cpid["FilePath"] != dcpid["FilePath"] or
                    cpid["PartSize"] != dcpid["PartSize"]):
                return False

            # Download progress sanity checks.
            offset = dcpid["DownloadInfo"].get('Offset', 0)
            crc64 = dcpid["DownloadInfo"].get('CRC64', 0)
            if (not isinstance(offset, int) or
                    not isinstance(crc64, int)):
                return False
            if offset == 0 and crc64 != 0:
                return False

            # Offset must lie on a part boundary relative to the range start.
            roffset = 0
            if len(cpid["ObjectInfo"]["Range"]) > 0:
                range_header = parse_http_range(cpid["ObjectInfo"]["Range"])
                if offset < range_header[0]:
                    return False
                roffset = range_header[0]
            if (offset - roffset) % dcpid["PartSize"] != 0:
                return False

            # Optionally re-hash the already-downloaded bytes to detect
            # local file corruption.
            if self.verify_data and crc64 != 0:
                try:
                    with open(dcpid["FilePath"], 'rb') as f:
                        chash = Crc64(0)
                        limitn = offset - roffset
                        r = LimitReader(f, limitn)
                        chunk = 32 * 1024
                        for _ in range(0, limitn, chunk):
                            chash.write(r.read(chunk))
                        if chash.sum64() != crc64:
                            return False
                except Exception:
                    return False

            self.doffset = offset
            self.dcrc64 = crc64
            return True
        except Exception:
            # Best-effort validation: any unexpected error invalidates
            # the checkpoint rather than failing the download.
            pass
        return False

    def dump(self) -> bool:
        """Persist the checkpoint (including current progress) to disk.

        Returns:
            bool: True on success, False when serialization or the write fails.
        """
        self.cp_info["Data"]["DownloadInfo"] = {
            "Offset": self.doffset,
            "CRC64": self.dcrc64
        }

        # Seal the "Data" section with its MD5 so _is_valid() can verify it.
        js = json.dumps(self.cp_info["Data"]).encode()
        self.cp_info["MD5"] = hashlib.md5(js).hexdigest()

        try:
            with open(self.cp_filepath, 'wb') as f:
                f.write(json.dumps(self.cp_info).encode())
        except (OSError, ValueError):
            return False
        return True

    def remove(self) -> None:
        """Best-effort removal of the checkpoint file; errors are ignored."""
        try:
            os.remove(self.cp_filepath)
        except (OSError, ValueError):
            pass
# ----- upload checkpoint -----
# {
#     "CpDirPath": string,         // checkpoint dir full path
#     "CpFilePath": string,        // checkpoint file full path
#     "Loaded": bool,              // If Info.Data.UploadInfo is loaded from checkpoint
#     "Info": {                    // checkpoint data
#         "Magic": string,         // Magic
#         "MD5": string,           // The Data's MD5
#         "Data": {
#             "FilePath": string,  // source, Local file
#             "FileMeta": {        // source
#                 "Size": int,
#                 "LastModified": string,
#             },
#             "ObjectInfo": {
#                 "Name": string,
#             },
#
#             "PartSize": int,
#             "UploadInfo": {      // upload info
#                 "UploadId": string,
#             },
#         }
#     },
# }
class UploadCheckpoint:
    """Upload Checkpoint.

    Persists resumable-upload state (the multipart upload id) as a JSON file
    so an interrupted upload of a local file can be resumed. The checkpoint
    file name is derived from MD5 hashes of the absolute source path and the
    canonical destination descriptor.
    """

    def __init__(
        self,
        request: PutObjectRequest,
        filepath: str,
        basedir: str,
        fileinfo: os.stat_result,
        part_size: int
    ) -> None:
        """
        Args:
            request (PutObjectRequest): the request identifying the destination
                object (bucket, key are read).
            filepath (str): local source file path.
            basedir (str): path whose parent directory holds the checkpoint file;
                the system temp directory is used when empty.
            fileinfo (os.stat_result): stat of the source file; st_size and
                st_mtime are recorded to detect changes between runs.
            part_size (int): upload part size in bytes.
        """
        # Canonical destination descriptor: url-quoted bucket/key.
        name = f'{request.bucket}/{request.key}'
        canon_dst = f'oss://{quote(name)}'
        dst_hash = hashlib.md5(canon_dst.encode()).hexdigest()

        # Source descriptor: absolute local path.
        src_hash = hashlib.md5(os.path.abspath(filepath).encode()).hexdigest()

        # Checkpoint file lives next to basedir's parent, or in the temp dir.
        dirbase = gettempdir() if not basedir else os.path.dirname(basedir)
        cp_filepath = os.path.join(
            dirbase,
            f'{src_hash}-{dst_hash}{CHECKPOINT_FILE_SUFFIX_UPLOADER}'
        )

        self.cp_filepath = cp_filepath          # checkpoint file full path
        self.cp_dirpath = dirbase               # checkpoint dir full path
        self.loaded = False                     # True once UploadInfo was restored
        self.cp_info = {
            "Magic": CHECKPOINT_MAGIC,
            # "MD5" is filled in by dump() from the "Data" section.
            "Data": {
                "FilePath": filepath,
                "FileMeta": {
                    "Size": fileinfo.st_size,
                    "LastModified": fileinfo.st_mtime,
                },
                "ObjectInfo": {
                    "Name": f'oss://{name}',
                },
                "PartSize": part_size,
            },
        }
        self.upload_id = ''     # multipart upload id to resume with

    def load(self):
        """Load checkpoint state from the local file.

        Silently returns when no checkpoint file exists; removes and ignores
        a checkpoint that fails validation.

        Raises:
            ValueError: if the checkpoint directory does not exist.
        """
        if len(self.cp_dirpath) > 0 and not os.path.isdir(self.cp_dirpath):
            raise ValueError(f'Invalid checkpoint dir {self.cp_dirpath}')

        if not os.path.isfile(self.cp_filepath):
            return

        if not self._is_valid():
            self.remove()
            return

        self.loaded = True

    def _is_valid(self) -> bool:
        """Return True when the on-disk checkpoint matches this upload.

        On success, also restores upload_id from the checkpoint.
        Any parse/shape error is treated as an invalid checkpoint.
        """
        try:
            with open(self.cp_filepath, 'rb') as f:
                ucp_info = json.loads(f.read())
            if not isinstance(ucp_info, dict):
                return False

            # Integrity: magic must match and the stored MD5 must equal the
            # MD5 of the serialized "Data" section (as written by dump()).
            js = json.dumps(ucp_info.get("Data", {})).encode()
            md5sum = hashlib.md5(js).hexdigest()
            if (CHECKPOINT_MAGIC != ucp_info.get("Magic") or
                    md5sum != ucp_info.get("MD5")):
                return False

            cpid = self.cp_info["Data"]
            ucpid = ucp_info["Data"]

            # Destination, source metadata, path and part size must all agree.
            if (cpid["ObjectInfo"] != ucpid["ObjectInfo"] or
                    cpid["FileMeta"] != ucpid["FileMeta"] or
                    cpid["FilePath"] != ucpid["FilePath"] or
                    cpid["PartSize"] != ucpid["PartSize"]):
                return False

            # Upload info: a non-empty multipart upload id is required.
            uploadid = ucpid["UploadInfo"]["UploadId"]
            if not isinstance(uploadid, str) or uploadid == '':
                return False

            self.upload_id = uploadid
            return True
        except Exception:
            # Best-effort validation: any unexpected error invalidates
            # the checkpoint rather than failing the upload.
            pass
        return False

    def dump(self) -> bool:
        """Persist the checkpoint (including the upload id) to disk.

        Returns:
            bool: True on success, False when serialization or the write fails.
        """
        self.cp_info["Data"]["UploadInfo"] = {
            "UploadId": self.upload_id,
        }

        # Seal the "Data" section with its MD5 so _is_valid() can verify it.
        js = json.dumps(self.cp_info["Data"]).encode()
        self.cp_info["MD5"] = hashlib.md5(js).hexdigest()

        try:
            with open(self.cp_filepath, 'wb') as f:
                f.write(json.dumps(self.cp_info).encode())
        except (OSError, ValueError):
            return False
        return True

    def remove(self) -> None:
        """Best-effort removal of the checkpoint file; errors are ignored."""
        try:
            os.remove(self.cp_filepath)
        except (OSError, ValueError):
            pass