Source code for alibabacloud_oss_v2.credentials.provider_impl

import os
from typing import Optional, Callable
from ..types import Credentials, CredentialsProvider
from ..exceptions import CredentialsEmptyError


class AnonymousCredentialsProvider(CredentialsProvider):
    """Credentials provider for anonymous access to OSS.

    The provider holds a single ``Credentials`` object whose access key id
    and access key secret are both empty strings.
    """

    def __init__(self) -> None:
        # Built once; every get_credentials() call hands back this object.
        anonymous = Credentials("", "")
        self._credentials = anonymous

    def get_credentials(self) -> Credentials:
        """Return the empty credentials created at construction time.

        Returns:
            Credentials: the stored credentials object.
        """
        return self._credentials
class StaticCredentialsProvider(CredentialsProvider):
    """Credentials provider backed by an explicitly supplied AccessKey pair.

    The values passed to the constructor are wrapped in one ``Credentials``
    object, and that same object is returned on every request.
    """

    def __init__(
        self,
        access_key_id: str,
        access_key_secret: str,
        security_token: Optional[str] = None,
    ) -> None:
        """Store the given key material.

        Args:
            access_key_id (str): access key id used to access OSS.
            access_key_secret (str): access key secret used to access OSS.
            security_token (Optional[str], optional): the STS session token.
                Defaults to None.
        """
        fixed = Credentials(access_key_id, access_key_secret, security_token)
        self._credentials = fixed

    def get_credentials(self) -> Credentials:
        """Return the credentials built from the constructor arguments.

        Returns:
            Credentials: the stored credentials object.
        """
        return self._credentials
class EnvironmentVariableCredentialsProvider(CredentialsProvider):
    """Credentials provider that reads its values from environment variables.

    Variables consulted:
        OSS_ACCESS_KEY_ID
        OSS_ACCESS_KEY_SECRET
        OSS_SESSION_TOKEN (optional)

    The environment is read once, in the constructor; later changes to
    these variables do not affect an existing provider instance.
    """

    def __init__(self) -> None:
        """Read the environment and build the credentials.

        Raises:
            CredentialsEmptyError: if OSS_ACCESS_KEY_ID or
                OSS_ACCESS_KEY_SECRET is unset or set to an empty string.
        """
        key_id = os.getenv("OSS_ACCESS_KEY_ID", "")
        key_secret = os.getenv("OSS_ACCESS_KEY_SECRET", "")

        # Both halves of the key pair are mandatory.
        if not (key_id and key_secret):
            raise CredentialsEmptyError()

        # The session token stays None when the variable is not set.
        session_token = os.getenv("OSS_SESSION_TOKEN")
        self._credentials = Credentials(key_id, key_secret, session_token)

    def get_credentials(self) -> Credentials:
        """Return the credentials captured at construction time.

        Returns:
            Credentials: the stored credentials object.
        """
        return self._credentials
class CredentialsProviderFunc(CredentialsProvider):
    """Adapter that lets a plain callable act as a CredentialsProvider."""

    def __init__(
        self,
        func: Callable,
    ) -> None:
        """Remember the callable to delegate to.

        Args:
            func (Callable): called with no arguments each time credentials
                are requested; its return value is handed back unchanged.
        """
        self._func = func

    def get_credentials(self) -> Credentials:
        """Invoke the wrapped callable and return whatever it produces.

        The callable runs on every call; nothing is cached here.
        """
        supplier = self._func
        return supplier()