Source code for arangoasync.http

__all__ = [
    "HTTPClient",
    "AioHTTPClient",
    "DefaultHTTPClient",
]

from abc import ABC, abstractmethod
from ssl import SSLContext, create_default_context
from typing import Any, Optional

from aiohttp import (
    BaseConnector,
    BasicAuth,
    ClientSession,
    ClientTimeout,
    TCPConnector,
    client_exceptions,
)

from arangoasync.exceptions import ClientConnectionError
from arangoasync.request import Request
from arangoasync.response import Response


[docs] class HTTPClient(ABC): # pragma: no cover """Abstract base class for HTTP clients. Custom HTTP clients should inherit from this class. Example: .. code-block:: python class MyCustomHTTPClient(HTTPClient): def create_session(self, host): pass async def close_session(self, session): pass async def send_request(self, session, request): pass """
[docs] @abstractmethod def create_session(self, host: str) -> Any: """Return a new session given the base host URL. Note: This method must be overridden by the user. Args: host (str): ArangoDB host URL. Returns: Requests session object. """ raise NotImplementedError
[docs] @abstractmethod async def close_session(self, session: Any) -> None: """Close the session. Note: This method must be overridden by the user. Args: session (Any): Client session object. """ raise NotImplementedError
[docs] @abstractmethod async def send_request( self, session: Any, request: Request, ) -> Response: """Send an HTTP request. Note: This method must be overridden by the user. Args: session (Any): Client session object. request (Request): HTTP request. Returns: Response: HTTP response. """ raise NotImplementedError
[docs] class AioHTTPClient(HTTPClient): """HTTP client implemented on top of aiohttp_. Args: connector (aiohttp.BaseConnector | None): Supports connection pooling. By default, 100 simultaneous connections are supported, with a 60-second timeout for connection reusing after release. timeout (aiohttp.ClientTimeout | None): Client timeout settings. 300s total timeout by default for a complete request/response operation. read_bufsize (int): Size of read buffer (64KB default). ssl_context (ssl.SSLContext | bool): SSL validation mode. `True` for default SSL checks (see :func:`ssl.create_default_context`). `False` disables SSL checks. Additionally, you can pass a custom :class:`ssl.SSLContext`. .. _aiohttp: https://docs.aiohttp.org/en/stable/ """ def __init__( self, connector: Optional[BaseConnector] = None, timeout: Optional[ClientTimeout] = None, read_bufsize: int = 2**16, ssl_context: bool | SSLContext = True, ) -> None: self._connector = connector or TCPConnector( keepalive_timeout=60, # timeout for connection reusing after releasing limit=100, # total number simultaneous connections ) self._timeout = timeout or ClientTimeout( total=300, # total number of seconds for the whole request connect=60, # max number of seconds for acquiring a pool connection ) self._read_bufsize = read_bufsize self._ssl_context = ( ssl_context if ssl_context is not True else create_default_context() )
[docs] def create_session(self, host: str) -> ClientSession: """Return a new session given the base host URL. Args: host (str): ArangoDB host URL. Must not include any paths. Typically, this is the address and port of a coordinator (e.g. "http://127.0.0.1:8529"). Returns: aiohttp.ClientSession: Session object, used to send future requests. """ return ClientSession( base_url=host, connector=self._connector, timeout=self._timeout, read_bufsize=self._read_bufsize, )
[docs] async def close_session(self, session: ClientSession) -> None: """Close the session. Args: session (Any): Client session object. """ await session.close()
[docs] async def send_request( self, session: ClientSession, request: Request, ) -> Response: """Send an HTTP request. Args: session (aiohttp.ClientSession): Session object used to make the request. request (Request): HTTP request. Returns: Response: HTTP response. Raises: ClientConnectionError: If the request fails. """ if request.auth is not None: auth = BasicAuth( login=request.auth.username, password=request.auth.password, encoding=request.auth.encoding, ) else: auth = None try: async with session.request( request.method.name, request.endpoint, headers=request.normalized_headers(), params=request.normalized_params(), data=request.data, auth=auth, ssl=self._ssl_context, ) as response: raw_body = await response.read() return Response( method=request.method, url=str(response.real_url), headers=response.headers, status_code=response.status, status_text=str(response.reason), raw_body=raw_body, ) except client_exceptions.ClientConnectionError as e: raise ClientConnectionError(str(e)) from e
DefaultHTTPClient = AioHTTPClient