Source code for alibabacloud_oss_v2.transport.requests_client

"""HttpClient implement based on requests
"""
from typing import Optional, MutableMapping, Iterator, cast
from urllib3.util.retry import Retry
from urllib3.exceptions import NewConnectionError, ConnectTimeoutError
import requests
import requests.adapters
from ..types import HttpRequest, HttpResponse, HttpClient
from .. import exceptions
from .. import defaults


class _RequestsHttpResponseImpl(HttpResponse):
    """Implementation class for HttpRespone from requests's response
    """

    def __init__(self, **kwargs) -> None:
        super().__init__()
        self._request = kwargs.pop("request")
        self._block_size = kwargs.pop("block_size")
        self._internal_response = cast(requests.Response, kwargs.pop("internal_response"))
        self._is_closed = False
        self._is_stream_consumed = False
        self._content = None

    @property
    def request(self) -> HttpRequest:
        return self._request

    @property
    def is_closed(self) -> bool:
        return self._is_closed

    @property
    def is_stream_consumed(self) -> bool:
        return self._is_stream_consumed

    @property
    def status_code(self) -> int:
        return self._internal_response.status_code

    @property
    def headers(self) -> MutableMapping[str, str]:
        return self._internal_response.headers

    @property
    def reason(self) -> str:
        return self._internal_response.reason

    @property
    def content(self) -> bytes:
        if self._content is None:
            raise exceptions.ResponseNotReadError()
        return self._content

    def __repr__(self) -> str:
        content_type = self.headers.get("Content-Type", "")
        return f"<HttpResponse: {self.status_code} {self.reason}, Content-Type: {content_type}>"

    def __enter__(self) -> "_RequestsHttpResponseImpl":
        return self

    def __exit__(self, *args) -> None:
        self.close()

    def close(self) -> None:
        if not self.is_closed:
            self._is_closed = True
            self._internal_response.close()

    def read(self) -> bytes:
        if self._content is None:
            self._content = self._internal_response.content
        self._set_read_checks()
        return self._content

    def iter_bytes(self, **kwargs) -> Iterator[bytes]:
        """Iterates over the response's bytes.

        Args:
            block_size (int, optional): The number of bytes it should read into memory.

        Returns:
            Iterator[bytes]: An iterator of bytes from the response
        """
        block_size = kwargs.pop("block_size", self._block_size)

        return self._internal_response.iter_content(block_size)

    def _set_read_checks(self):
        self._is_stream_consumed = True
        self.close()

[docs] class RequestsHttpClient(HttpClient): """Implements a basic requests HTTP sender. In this implementation: - You provide the configured session if you want to, or a basic session is created. - All kwargs received by "do" are sent to session.request directly """ _protocols = ["http://", "https://"] def __init__(self, **kwargs) -> None: """ Args: session (requests.Session, optional): Request session to use instead of the default one. adapters (requests.adapters, optional): Request adapters to use instead of the default one. """ self.session_owner = False self.session = kwargs.get("session", None) self.adapter = kwargs.get("adapter", None) # client's configuration self._connect_timeout = kwargs.get("connect_timeout", defaults.DEFAULT_CONNECT_TIMEOUT) self._read_timeout = kwargs.get("readwrite_timeout", defaults.DEFAULT_READWRITE_TIMEOUT) self._max_connections = kwargs.get("max_connections", defaults.DEFAULT_MAX_CONNECTIONS) self._verify = True if kwargs.get("insecure_skip_verify") is True: self._verify = False self._allow_redirects = kwargs.get("enabled_redirect", False) self._proxies = kwargs.get("proxy_host", None) self._block_size = kwargs.get("block_size", defaults.DEFAULT_BLOCK_SIZE) def __enter__(self): self.open() return self def __exit__(self, *args): self.close() def _init_session(self, session: requests.Session) -> None: """Init session level configuration of requests. This is initialization I want to do once only on a session. """ if self.adapter is None: disable_retries = Retry(total=False, redirect=False, raise_on_status=False) self.adapter = requests.adapters.HTTPAdapter(max_retries=disable_retries, pool_maxsize=self._max_connections, pool_connections=self._max_connections) self.adapter = cast(requests.adapters.HTTPAdapter, self.adapter) for p in self._protocols: session.mount(p, self.adapter)
[docs] def open(self): if not self.session: self.session = requests.Session() self.session_owner = True self._init_session(self.session) self.session = cast(requests.Session, self.session)
[docs] def close(self): if self.session_owner and self.session is not None: self.session.close() self.session_owner = False self.session = None
[docs] def send(self, request: HttpRequest, **kwargs) -> HttpResponse: self.open() error: Optional[Exception] = None resp: _RequestsHttpResponseImpl = None try: # api's configuration connect_timeout = kwargs.pop("connect_timeout", self._connect_timeout) read_timeout = kwargs.pop("readwrite_timeout", self._read_timeout) stream = kwargs.pop("stream", False) # tell requests not to add 'Accept-Encoding: gzip, deflate' by default if 'accept-encoding' not in request.headers: request.headers.update({'Accept-Encoding': None}) response = self.session.request( request.method, request.url, headers=request.headers, data=request.body, verify=self._verify, timeout=(connect_timeout, read_timeout), allow_redirects=self._allow_redirects, proxies=self._proxies, stream=stream, **kwargs ) resp = _RequestsHttpResponseImpl( request=request, internal_response=response, block_size=self._block_size ) if not stream: _ = resp.read() resp.close() except (NewConnectionError, ConnectTimeoutError) as err: error = exceptions.RequestError(error=err) except requests.exceptions.ConnectionError as err: error = exceptions.RequestError(error=err) except requests.exceptions.ReadTimeout as err: error = exceptions.ResponseError(error=err) except requests.RequestException as err: error = exceptions.RequestError(error=err) if error: raise error return resp