Source code for arangoasync.client

__all__ = ["ArangoClient"]

import asyncio
from typing import Any, Optional, Sequence

from arangoasync.auth import Auth, JwtToken
from arangoasync.compression import CompressionManager
from arangoasync.connection import (
    BasicConnection,
    Connection,
    JwtConnection,
    JwtSuperuserConnection,
)
from arangoasync.database import StandardDatabase
from arangoasync.http import DefaultHTTPClient, HTTPClient
from arangoasync.resolver import HostResolver, get_resolver
from arangoasync.serialization import (
    DefaultDeserializer,
    DefaultSerializer,
    Deserializer,
    Serializer,
)
from arangoasync.typings import Json, Jsons
from arangoasync.version import __version__


[docs] class ArangoClient: """ArangoDB client. Args: hosts (str | Sequence[str]): Host URL or list of URL's. In case of a cluster, this would be the list of coordinators. Which coordinator to use is determined by the `host_resolver`. host_resolver (str | HostResolver): Host resolver strategy. This determines how the client will choose which server to use. Passing a string would configure a resolver with the default settings. See :class:`DefaultHostResolver <arangoasync.resolver.DefaultHostResolver>` and :func:`get_resolver <arangoasync.resolver.get_resolver>` for more information. If you need more customization, pass a subclass of :class:`HostResolver <arangoasync.resolver.HostResolver>`. http_client (HTTPClient | None): HTTP client implementation. This is the core component that sends requests to the ArangoDB server. Defaults to :class:`DefaultHttpClient <arangoasync.http.DefaultHTTPClient>`, but you can fully customize its parameters or even use a different implementation by subclassing :class:`HTTPClient <arangoasync.http.HTTPClient>`. compression (CompressionManager | None): Disabled by default. Used to compress requests to the server or instruct the server to compress responses. Enable it by passing an instance of :class:`DefaultCompressionManager <arangoasync.compression.DefaultCompressionManager>` or a custom subclass of :class:`CompressionManager <arangoasync.compression.CompressionManager>`. serializer (Serializer | None): Custom JSON serializer implementation. Leave as `None` to use the default serializer. See :class:`DefaultSerializer <arangoasync.serialization.DefaultSerializer>`. For custom serialization of collection documents, see :class:`Collection <arangoasync.collection.Collection>`. deserializer (Deserializer | None): Custom JSON deserializer implementation. Leave as `None` to use the default deserializer. See :class:`DefaultDeserializer <arangoasync.serialization.DefaultDeserializer>`. For custom deserialization of collection documents, see :class:`Collection <arangoasync.collection.Collection>`. Raises: ValueError: If the `host_resolver` is not supported. """ def __init__( self, hosts: str | Sequence[str] = "http://127.0.0.1:8529", host_resolver: str | HostResolver = "default", http_client: Optional[HTTPClient] = None, compression: Optional[CompressionManager] = None, serializer: Optional[Serializer[Json]] = None, deserializer: Optional[Deserializer[Json, Jsons]] = None, ) -> None: self._hosts = [hosts] if isinstance(hosts, str) else hosts self._host_resolver = ( get_resolver(host_resolver, len(self._hosts)) if isinstance(host_resolver, str) else host_resolver ) self._http_client = http_client or DefaultHTTPClient() self._sessions = [ self._http_client.create_session(host) for host in self._hosts ] self._compression = compression self._serializer: Serializer[Json] = serializer or DefaultSerializer() self._deserializer: Deserializer[Json, Jsons] = ( deserializer or DefaultDeserializer() ) def __repr__(self) -> str: return f"<ArangoClient {','.join(self._hosts)}>" async def __aenter__(self) -> "ArangoClient": return self async def __aexit__(self, *exc: Any) -> None: await self.close() @property def hosts(self) -> Sequence[str]: """Return the list of hosts.""" return self._hosts @property def host_resolver(self) -> HostResolver: """Return the host resolver.""" return self._host_resolver @property def compression(self) -> Optional[CompressionManager]: """Return the compression manager.""" return self._compression @property def sessions(self) -> Sequence[Any]: """Return the list of sessions. You may use this to customize sessions on the fly (for example, adjust the timeout). Not recommended unless you know what you are doing. Warning: Modifying only a subset of sessions may lead to unexpected behavior. In order to keep the client in a consistent state, you should make sure all sessions are configured in the same way. """ return self._sessions @property def version(self) -> str: """Return the version of the client.""" return __version__
[docs] async def close(self) -> None: """Close HTTP sessions.""" await asyncio.gather( *(self._http_client.close_session(session) for session in self._sessions) )
[docs] async def db( self, name: str, auth_method: str = "basic", auth: Optional[Auth | str] = None, token: Optional[JwtToken] = None, verify: bool = False, compression: Optional[CompressionManager] = None, serializer: Optional[Serializer[Json]] = None, deserializer: Optional[Deserializer[Json, Jsons]] = None, ) -> StandardDatabase: """Connects to a database and returns and API wrapper. Args: name (str): Database name. auth_method (str): The following methods are supported: - "basic": HTTP authentication. Requires the `auth` parameter. The `token` parameter is ignored. - "jwt": User JWT authentication. At least one of the `auth` or `token` parameters are required. If `auth` is provided, but the `token` is not, the token will be refreshed automatically. This assumes that the clocks of the server and client are synchronized. - "superuser": Superuser JWT authentication. The `token` parameter is required. The `auth` parameter is ignored. auth (Auth | None): Login information (username and password) or access token. token (JwtToken | None): JWT token. verify (bool): Verify the connection by sending a test request. compression (CompressionManager | None): If set, supersedes the client-level compression settings. serializer (Serializer | None): If set, supersedes the client-level serializer. deserializer (Deserializer | None): If set, supersedes the client-level deserializer. Returns: StandardDatabase: Database API wrapper. Raises: ValueError: If the authentication is invalid. ServerConnectionError: If `verify` is `True` and the connection fails. """ connection: Connection if isinstance(auth, str): auth = Auth(password=auth) if auth_method == "basic": if auth is None: raise ValueError("Basic authentication requires the `auth` parameter") connection = BasicConnection( sessions=self._sessions, host_resolver=self._host_resolver, http_client=self._http_client, db_name=name, compression=compression or self._compression, serializer=serializer or self._serializer, deserializer=deserializer or self._deserializer, auth=auth, ) elif auth_method == "jwt": if auth is None and token is None: raise ValueError( "JWT authentication requires the `auth` or `token` parameter" ) connection = JwtConnection( sessions=self._sessions, host_resolver=self._host_resolver, http_client=self._http_client, db_name=name, compression=compression or self._compression, serializer=serializer or self._serializer, deserializer=deserializer or self._deserializer, auth=auth, token=token, ) elif auth_method == "superuser": if token is None: raise ValueError( "Superuser JWT authentication requires the `token` parameter" ) connection = JwtSuperuserConnection( sessions=self._sessions, host_resolver=self._host_resolver, http_client=self._http_client, db_name=name, compression=compression or self._compression, serializer=serializer or self._serializer, deserializer=deserializer or self._deserializer, token=token, ) else: raise ValueError(f"Invalid authentication method: {auth_method}") if verify: await connection.ping() return StandardDatabase(connection)