"""utils for stream wrapper"""
import os
import sys
import errno
import threading
from typing import Optional, IO, List, Iterable, Any, AnyStr, Iterator
from . import utils
from .types import StreamBody, HttpResponse
# pylint: disable=no-member
# pylint: disable=protected-access
[docs]
class TeeIterator:
    """An iterator that writes to its writers every chunk it reads from a source.

    This base class holds the shared tee logic. Each subclass is expected to
    set ``self._writers`` and ``self._seekable`` and to implement
    ``iter_bytes()`` (begin an iteration pass and return the iterator) and
    ``next()`` (return the next chunk or raise ``StopIteration``).
    """

    def __iter__(self):
        return self.iter_bytes()

    def __next__(self):
        chunk = self.next()
        # Mirror the chunk to every writer before handing it to the consumer.
        if self._writers is not None:
            for writer in self._writers:
                writer.write(chunk)
        return chunk

    def seekable(self):
        """Return the ``_seekable`` flag set by the concrete subclass."""
        return self._seekable

    def reset(self) -> None:
        """Call ``reset()`` on every writer that provides such a method.

        Writers without a ``reset`` attribute are skipped silently.
        """
        if self._writers is None:
            return
        for writer in self._writers:
            # The capability check must be made on the individual writer;
            # checking the list itself never finds a ``reset`` attribute.
            if hasattr(writer, 'reset'):
                writer.reset()

    @staticmethod
    def from_source(source: Any, writers: List[Any], **kwargs: Any) -> "TeeIterator":
        """Wrap ``source`` in the matching TeeIterator implementation.

        Args:
            source (Any): What to read from: a ``str``, ``bytes``, an object
                with both ``seek`` and ``read`` attributes, or any iterable.
            writers (List[Any]): Objects whose ``write`` receives every chunk.
            **kwargs: ``block_size`` (int) is the chunk size passed to the
                str, bytes and file-like implementations; defaults to 32 KiB.

        Raises:
            TypeError: If the type of source is not supported.

        Returns:
            TeeIterator: An iterator that writes to the writers what it reads
                from source.
        """
        block_size = kwargs.get("block_size", 32 * 1024)

        if isinstance(source, str):
            return _TeeIteratorStr(source, writers, block_size)

        if isinstance(source, bytes):
            return _TeeIteratorBytes(source, writers, block_size)

        # file-like object
        if hasattr(source, 'seek') and hasattr(source, 'read'):
            data_len = utils.guess_content_length(source)
            if data_len is not None:
                return _TeeIteratorIOLen(source, data_len, writers, block_size)
            return _TeeIteratorIO(source, writers, block_size)

        if isinstance(source, Iterable):
            return _TeeIteratorIter(source, writers)

        raise TypeError(
            f'Invalid type for body. Expected str, bytes, file-like object, got {type(source)}')
class _TeeIteratorStr(TeeIterator):
    """Tee iterator over a ``str`` source, yielded as encoded byte chunks."""

    def __init__(
        self,
        data: str,
        writers: List[Any],
        block_size: Optional[int] = None
    ) -> None:
        self._data = data
        self._writers = writers
        self._block_size = block_size
        self._offset = 0
        self._total = 0
        self._seekable = True
        self._content = None

    def _encoded(self) -> bytes:
        """Return the source text encoded to bytes, encoding it only once."""
        if self._content is None:
            self._content = self._data.encode()
        return self._content

    def __len__(self):
        # Report the number of bytes that iteration will yield. The character
        # count of the text differs from it as soon as the text is non-ASCII.
        return len(self._encoded())

    def iter_bytes(self):
        """Start (or restart) iteration from the first byte and return self."""
        self._total = len(self._encoded())
        self._offset = 0
        return self

    def next(self):
        """Return the next chunk of at most ``block_size`` bytes.

        A ``block_size`` of ``None`` or a non-positive value means "no
        chunking": everything that remains is returned in one chunk.

        Raises:
            StopIteration: When all bytes have been returned.
        """
        if self._offset >= self._total:
            raise StopIteration
        size = self._total - self._offset
        if self._block_size is not None and self._block_size > 0:
            size = min(self._block_size, size)
        chunk = self._content[self._offset: self._offset + size]
        self._offset += size
        return chunk
class _TeeIteratorBytes(TeeIterator):
    """Tee iterator over a ``bytes`` source, yielded in fixed-size chunks."""

    def __init__(
        self,
        data: bytes,
        writers: List[Any],
        block_size: Optional[int] = None
    ) -> None:
        self._data = data
        self._writers = writers
        self._block_size = block_size
        self._offset = 0
        self._total = 0
        self._seekable = True
        self._content = None

    def __len__(self):
        return len(self._data)

    def iter_bytes(self):
        """Start (or restart) iteration from the first byte and return self."""
        self._content = self._data
        self._total = len(self._content)
        self._offset = 0
        return self

    def next(self):
        """Return the next chunk of at most ``block_size`` bytes.

        A ``block_size`` of ``None`` or a non-positive value means "no
        chunking": everything that remains is returned in one chunk.

        Raises:
            StopIteration: When all bytes have been returned.
        """
        if self._offset >= self._total:
            raise StopIteration
        size = self._total - self._offset
        if self._block_size is not None and self._block_size > 0:
            size = min(self._block_size, size)
        chunk = self._content[self._offset: self._offset + size]
        self._offset += size
        return chunk
class _TeeIteratorIOLen(TeeIterator):
    """Tee iterator over a file-like source whose length is supplied by the caller."""

    def __init__(
        self,
        data: IO,
        total: int,
        writers: List[Any],
        block_size: Optional[int] = None
    ) -> None:
        self._data = data
        self._total = total
        self._writers = writers
        self._block_size = block_size

        can_seek = is_seekable_io(data)
        if can_seek:
            # Remember where the stream stood when wrapped so that every
            # iteration pass can start from the same position.
            start = data.seek(0, os.SEEK_CUR)
        else:
            start = 0
        self._start_offset = start
        self._seekable = can_seek

    def __len__(self):
        return self._total

    def iter_bytes(self):
        """Rewind a seekable source to its recorded start and return self."""
        if self._seekable:
            self._data.seek(self._start_offset, os.SEEK_SET)
        return self

    def next(self):
        """Read the next block from the source.

        Raises:
            StopIteration: When the read returns no data.
        """
        chunk = self._data.read(self._block_size)
        if not chunk:
            raise StopIteration
        return chunk
class _TeeIteratorIO(TeeIterator):
    """Tee iterator over a file-like source whose length may be unknown.

    When ``utils.guess_content_length`` can determine a length, the instance
    is switched to the private ``_TeeIteratorIOSized`` subclass so that
    ``len()`` works on it; otherwise the instance has no ``__len__`` at all.
    """

    def __init__(
        self,
        data: IO,
        writers: List[Any],
        block_size: Optional[int] = None
    ) -> None:
        self._data = data
        self._writers = writers
        self._block_size = block_size
        seekable = is_seekable_io(data)
        # Remember where the stream stood when wrapped so that every
        # iteration pass can start from the same position.
        self._start_offset = data.seek(0, os.SEEK_CUR) if seekable else 0
        self._total = utils.guess_content_length(data)
        self._seekable = seekable
        # Python looks special methods up on the type, never on the instance,
        # so an instance-level ``__len__`` attribute is invisible to len().
        # Re-class the instance to expose a real ``__len__`` only when the
        # length is actually known.
        if self._total is not None and type(self) is _TeeIteratorIO:
            self.__class__ = _TeeIteratorIOSized

    def iter_bytes(self):
        """Rewind a seekable source to its recorded start and return self."""
        if self._seekable:
            self._data.seek(self._start_offset, os.SEEK_SET)
        return self

    def next(self):
        """Read the next block from the source.

        Raises:
            StopIteration: When the read returns no data.
        """
        d = self._data.read(self._block_size)
        if d:
            return d
        raise StopIteration


class _TeeIteratorIOSized(_TeeIteratorIO):
    """``_TeeIteratorIO`` variant used when the content length is known."""

    def __len__(self):
        return self._total
class _TeeIteratorIter(TeeIterator):
    """Tee iterator over an arbitrary iterable of chunks."""

    def __init__(
        self,
        data: Iterable[bytes],
        writers: List[Any],
    ) -> None:
        self._data = data
        self._writers = writers
        self._iter = None
        # A one-shot iterator cannot be replayed; any other iterable can be
        # iterated again from its start.
        self._seekable = not isinstance(self._data, Iterator)
        self._check_type_done = False
        self._cast_func = None

    def iter_bytes(self):
        """Bind the underlying iterator for a new pass and return self."""
        source = self._data
        self._iter = source if isinstance(source, Iterator) else iter(source)
        return self

    def next(self):
        """Return the next item of the underlying iterator, converted to bytes."""
        item = next(self._iter)
        return self._to_bytes(item)

    def _to_bytes(self, d) -> bytes:
        # ``None`` items are passed through untouched.
        if d is None:
            return d

        # The element type is inspected once, on the first non-None item; if
        # that item is a str, every later item is encoded the same way.
        if not self._check_type_done:
            self._check_type_done = True
            if isinstance(d, str):
                def _encode(item):
                    return item.encode()
                self._cast_func = _encode

        return self._cast_func(d) if self._cast_func else d
[docs]
def is_seekable_io(fileobj):
    """Report whether ``fileobj`` supports seeking.

    An object exposing ``seekable`` is asked directly. Otherwise an object
    that has both ``seek`` and ``tell`` is probed with a zero-offset relative
    seek; an ``OSError`` from that probe means it is not seekable. Anything
    else is treated as not seekable.
    """
    if hasattr(fileobj, 'seekable'):
        return fileobj.seekable()

    if not (hasattr(fileobj, 'seek') and hasattr(fileobj, 'tell')):
        return False

    try:
        fileobj.seek(0, os.SEEK_CUR)
    except OSError:
        return False
    return True
if sys.platform.startswith('win'):
    def rename_file(current_filename, new_filename):
        """Rename ``current_filename`` to ``new_filename``, overwriting it.

        ``os.rename`` on Windows refuses to overwrite an existing destination.
        ``os.replace`` overwrites it in a single step, so a failed rename
        (for example because the source is missing) leaves the existing
        destination in place instead of deleting it first.
        """
        os.replace(current_filename, new_filename)
else:
    rename_file = os.rename
[docs]
class ReadAtReader:
    """Wrapper around a binary reader that adds a positional ``read_at``.

    Every other method simply forwards to the wrapped reader.
    """

    def __init__(
        self,
        reader: IO[bytes],
    ) -> None:
        self._reader = reader
        # Keeps the seek + read pair inside read_at together.
        self._readat_lock = threading.Lock()

    @property
    def mode(self) -> str:
        """Mode of the wrapped reader."""
        return self._reader.mode

    @property
    def name(self) -> str:
        """Name of the wrapped reader, suffixed to mark the wrapper."""
        return f'{self._reader.name} with read_at'

    def close(self) -> None:
        """Close the wrapped reader."""
        self._reader.close()

    @property
    def closed(self) -> bool:
        """Whether the wrapped reader is closed."""
        return self._reader.closed

    def fileno(self) -> int:
        """File descriptor number of the wrapped reader."""
        return self._reader.fileno()

    def flush(self) -> None:
        """Flush the wrapped reader."""
        self._reader.flush()

    def isatty(self) -> bool:
        """Whether the wrapped reader reports being a TTY."""
        return self._reader.isatty()

    def read(self, n: int = -1) -> AnyStr:
        """Read ``n`` bytes from the wrapped reader's current position."""
        return self._reader.read(n)

    def read_at(self, off: int, n: int = -1) -> AnyStr:
        """Seek the wrapped reader to ``off`` and read ``n`` bytes from there.

        The seek and the read happen while holding an internal lock. The
        wrapped reader's position is moved by this call.
        """
        with self._readat_lock:
            self._reader.seek(off)
            data = self._reader.read(n)
        return data

    def readable(self) -> bool:
        """Whether the wrapped reader is readable."""
        return self._reader.readable()

    def readline(self, limit: int = -1) -> AnyStr:
        """Read one line from the wrapped reader."""
        return self._reader.readline(limit)

    def readlines(self, hint: int = -1) -> List[AnyStr]:
        """Read a list of lines from the wrapped reader."""
        return self._reader.readlines(hint)

    def seek(self, offset: int, whence: int = 0) -> int:
        """Move the wrapped reader's position and return what it returns."""
        return self._reader.seek(offset, whence)

    def seekable(self) -> bool:
        """Whether the wrapped reader is seekable."""
        return self._reader.seekable()

    def tell(self) -> int:
        """Current position of the wrapped reader."""
        return self._reader.tell()

    def __enter__(self) -> 'IO[AnyStr]':
        # Enter the wrapped reader's context but hand back the wrapper.
        self._reader.__enter__()
        return self

    def __exit__(self, type_, value, traceback) -> None:
        self._reader.__exit__(type_, value, traceback)
[docs]
class SectionReader:
    """
    A SectionReader that reads from r starting at offset off and stops with EOF after n bytes.

    Positions accepted and returned by ``seek``, ``tell`` and ``read_at`` are
    relative to the start of the section; internally the reader tracks
    absolute offsets into the wrapped reader.
    """

    def __init__(
        self,
        reader: "ReadAtReader",
        off: int,
        n: int,
    ) -> None:
        self._reader = reader
        # Absolute end of the section, clamped so that off + n never exceeds
        # sys.maxsize.
        if off <= sys.maxsize - n:
            remaining = n + off
        else:
            remaining = sys.maxsize
        self._base = off       # absolute start of the section
        self._off = off        # absolute position of the sequential cursor
        self._limit = remaining

    def read(self, n: int = -1) -> AnyStr:
        """Read up to ``n`` bytes from the cursor and advance it.

        ``None`` or a negative ``n`` reads to the end of the section.
        Returns ``b''`` once the cursor is at or past the section end.
        """
        if self._off >= self._limit:
            return b''
        max_size = self._limit - self._off
        if n is None or n < 0 or n > max_size:
            n = max_size
        d = self._reader.read_at(self._off, n)
        self._off += len(d)
        return d

    def read_at(self, off: int, n: int = -1) -> AnyStr:
        """Read up to ``n`` bytes at section-relative offset ``off``.

        The sequential cursor used by ``read`` is not consulted or moved.
        ``None`` or a negative ``n`` reads to the end of the section.
        Returns ``b''`` when ``off`` lies outside the section.
        """
        if off < 0 or off >= self._limit - self._base:
            return b''
        off += self._base
        max_size = self._limit - off
        if n is None or n < 0 or n > max_size:
            n = max_size
        # Read at the requested absolute offset, not at the cursor position.
        return self._reader.read_at(off, n)

    def readable(self) -> bool:
        """Whether the wrapped reader is readable."""
        return self._reader.readable()

    def seek(self, offset: int, whence: int = 0) -> int:
        """Move the cursor and return its new section-relative position.

        ``os.SEEK_SET`` is relative to the section start, ``os.SEEK_CUR`` to
        the cursor and ``os.SEEK_END`` to the section end.

        Raises:
            ValueError: If ``whence`` is not one of the three values above.
            OSError: If the resulting position is before the section start.
        """
        if whence == os.SEEK_SET:
            offset += self._base
        elif whence == os.SEEK_CUR:
            offset += self._off
        elif whence == os.SEEK_END:
            offset += self._limit
        else:
            raise ValueError(f'invalid whence {whence}')

        if offset < self._base:
            raise OSError("seek() returned an invalid position")

        self._off = offset
        return offset - self._base

    def seekable(self) -> bool:
        """Whether the wrapped reader is seekable."""
        return self._reader.seekable()

    def tell(self) -> int:
        """Return the cursor position relative to the section start."""
        return self._off - self._base

    def __len__(self):
        return self._limit - self._base
[docs]
class StreamBodyReader(StreamBody):
    """
    Adapter that exposes an HttpResponse through the StreamBody interface.

    Every member forwards to the wrapped response.
    """

    def __init__(
        self,
        response: HttpResponse,
    ) -> None:
        self._response = response

    def __enter__(self) -> "StreamBodyReader":
        # Enter the response's context but hand back the adapter itself.
        self._response.__enter__()
        return self

    def __exit__(self, *args: Any) -> None:
        self._response.__exit__(*args)

    @property
    def is_closed(self) -> bool:
        return self._response.is_closed

    @property
    def is_stream_consumed(self) -> bool:
        return self._response.is_stream_consumed

    @property
    def content(self) -> bytes:
        response = self._response
        # Read the response first if its stream has not been consumed yet.
        if not response.is_stream_consumed:
            response.read()
        return response.content

    def read(self) -> bytes:
        return self._response.read()

    def close(self) -> None:
        self._response.close()

    def iter_bytes(self, **kwargs: Any) -> Iterator[bytes]:
        return self._response.iter_bytes(**kwargs)
[docs]
class StreamBodyDiscarder(StreamBody):
    """StreamBody wrapper that drops the first ``discard`` bytes of a stream.

    ``content`` and ``read`` slice the leading bytes off the wrapped stream's
    data; ``iter_bytes`` skips them while streaming.
    """

    def __init__(
        self,
        stream: StreamBody,
        discard: int
    ) -> None:
        self._stream = stream
        self._discard = discard

    def __enter__(self) -> "StreamBodyDiscarder":
        self._stream.__enter__()
        return self

    def __exit__(self, *args: Any) -> None:
        self._stream.__exit__(*args)

    @property
    def is_closed(self) -> bool:
        return self._stream.is_closed

    @property
    def is_stream_consumed(self) -> bool:
        return self._stream.is_stream_consumed

    @property
    def content(self) -> bytes:
        # Read the stream first if it has not been consumed yet.
        if not self._stream.is_stream_consumed:
            self._stream.read()
        return self._stream.content[self._discard:]

    def read(self) -> bytes:
        data = self._stream.read()
        return data[self._discard:]

    def close(self) -> None:
        self._stream.close()

    def iter_bytes(self, **kwargs: Any) -> Iterator[bytes]:
        """Yield the wrapped stream's chunks with the leading bytes removed.

        Chunks that fall entirely inside the discarded prefix are swallowed,
        including a chunk that ends exactly on the boundary, so no empty
        chunk is produced by the discarding itself. Chunks after the prefix
        are passed through unchanged.
        """
        remaining = self._discard
        for chunk in self._stream.iter_bytes(**kwargs):
            if remaining <= 0:
                yield chunk
                continue
            if remaining >= len(chunk):
                # The whole chunk belongs to the discarded prefix.
                remaining -= len(chunk)
                continue
            yield chunk[remaining:]
            remaining = 0
[docs]
class LimitReader:
    """Reader that returns at most ``n`` bytes in total from a wrapped reader."""

    def __init__(
        self,
        reader: IO[bytes],
        n: int,
    ) -> None:
        self._reader = reader
        self._n = n  # bytes still allowed to be read

    def read(self, n: int = -1) -> AnyStr:
        """Read up to ``n`` bytes without exceeding the remaining allowance.

        ``None`` or a negative ``n`` requests everything still allowed.
        Returns ``b''`` once the allowance is used up.
        """
        if self._n <= 0:
            return b''
        # Accept None like standard file objects do, and cap the request at
        # the remaining allowance.
        if n is None or n < 0 or n > self._n:
            n = self._n
        d = self._reader.read(n)
        self._n -= len(d)
        return d