__all__ = ["AQL", "AQLQueryCache"]
from typing import Optional, cast
from arangoasync.cursor import Cursor
from arangoasync.errno import HTTP_NOT_FOUND
from arangoasync.exceptions import (
AQLCacheClearError,
AQLCacheConfigureError,
AQLCacheEntriesError,
AQLCachePropertiesError,
AQLFunctionCreateError,
AQLFunctionDeleteError,
AQLFunctionListError,
AQLQueryClearError,
AQLQueryExecuteError,
AQLQueryExplainError,
AQLQueryHistoryError,
AQLQueryKillError,
AQLQueryListError,
AQLQueryRulesGetError,
AQLQueryTrackingGetError,
AQLQueryTrackingSetError,
AQLQueryValidateError,
)
from arangoasync.executor import ApiExecutor, DefaultApiExecutor, NonAsyncExecutor
from arangoasync.request import Method, Request
from arangoasync.response import Response
from arangoasync.result import Result
from arangoasync.serialization import Deserializer, Serializer
from arangoasync.typings import (
Json,
Jsons,
QueryCacheProperties,
QueryExplainOptions,
QueryProperties,
QueryTrackingConfiguration,
)
[docs]
class AQLQueryCache:
"""AQL Query Cache API wrapper.
Args:
executor: API executor. Required to execute the API requests.
"""
def __init__(self, executor: ApiExecutor) -> None:
self._executor = executor
@property
def name(self) -> str:
"""Return the name of the current database."""
return self._executor.db_name
@property
def serializer(self) -> Serializer[Json]:
"""Return the serializer."""
return self._executor.serializer
@property
def deserializer(self) -> Deserializer[Json, Jsons]:
"""Return the deserializer."""
return self._executor.deserializer
def __repr__(self) -> str:
return f"<AQLQueryCache in {self.name}>"
[docs]
async def entries(self) -> Result[Jsons]:
"""Return a list of all AQL query results cache entries.
Returns:
list: List of AQL query results cache entries.
Raises:
AQLCacheEntriesError: If retrieval fails.
References:
- `list-the-entries-of-the-aql-query-results-cache <https://docs.arango.ai/arangodb/stable/develop/http-api/queries/aql-query-results-cache/#list-the-entries-of-the-aql-query-results-cache>`__
""" # noqa: E501
request = Request(method=Method.GET, endpoint="/_api/query-cache/entries")
def response_handler(resp: Response) -> Jsons:
if not resp.is_success:
raise AQLCacheEntriesError(resp, request)
return self.deserializer.loads_many(resp.raw_body)
return await self._executor.execute(request, response_handler)
[docs]
async def plan_entries(self) -> Result[Jsons]:
"""Return a list of all AQL query plan cache entries.
Returns:
list: List of AQL query plan cache entries.
Raises:
AQLCacheEntriesError: If retrieval fails.
References:
- `list-the-entries-of-the-aql-query-plan-cache <https://docs.arango.ai/arangodb/stable/develop/http-api/queries/aql-query-plan-cache/#list-the-entries-of-the-aql-query-plan-cache>`__
""" # noqa: E501
request = Request(method=Method.GET, endpoint="/_api/query-plan-cache")
def response_handler(resp: Response) -> Jsons:
if not resp.is_success:
raise AQLCacheEntriesError(resp, request)
return self.deserializer.loads_many(resp.raw_body)
return await self._executor.execute(request, response_handler)
[docs]
async def clear(self) -> Result[None]:
"""Clear the AQL query results cache.
Raises:
AQLCacheClearError: If clearing the cache fails.
References:
- `clear-the-aql-query-results-cache <https://docs.arango.ai/arangodb/stable/develop/http-api/queries/aql-query-results-cache/#clear-the-aql-query-results-cache>`__
""" # noqa: E501
request = Request(method=Method.DELETE, endpoint="/_api/query-cache")
def response_handler(resp: Response) -> None:
if not resp.is_success:
raise AQLCacheClearError(resp, request)
return await self._executor.execute(request, response_handler)
[docs]
async def clear_plan(self) -> Result[None]:
"""Clear the AQL query plan cache.
Raises:
AQLCacheClearError: If clearing the cache fails.
References:
- `clear-the-aql-query-plan-cache <https://docs.arango.ai/arangodb/stable/develop/http-api/queries/aql-query-plan-cache/#clear-the-aql-query-plan-cache>`__
""" # noqa: E501
request = Request(method=Method.DELETE, endpoint="/_api/query-plan-cache")
def response_handler(resp: Response) -> None:
if not resp.is_success:
raise AQLCacheClearError(resp, request)
return await self._executor.execute(request, response_handler)
[docs]
async def properties(self) -> Result[QueryCacheProperties]:
"""Return the current AQL query results cache configuration.
Returns:
QueryCacheProperties: Current AQL query cache properties.
Raises:
AQLCachePropertiesError: If retrieval fails.
References:
- `get-the-aql-query-results-cache-configuration <https://docs.arango.ai/arangodb/stable/develop/http-api/queries/aql-query-results-cache/#get-the-aql-query-results-cache-configuration>`__
""" # noqa: E501
request = Request(method=Method.GET, endpoint="/_api/query-cache/properties")
def response_handler(resp: Response) -> QueryCacheProperties:
if not resp.is_success:
raise AQLCachePropertiesError(resp, request)
return QueryCacheProperties(self.deserializer.loads(resp.raw_body))
return await self._executor.execute(request, response_handler)
[docs]
class AQL:
"""AQL (ArangoDB Query Language) API wrapper.
Allows you to execute, track, kill, explain, and validate queries written
in ArangoDB’s query language.
Args:
executor: API executor. Required to execute the API requests.
"""
def __init__(self, executor: ApiExecutor) -> None:
self._executor = executor
@property
def name(self) -> str:
"""Return the name of the current database."""
return self._executor.db_name
@property
def context(self) -> str:
"""Return the current API execution context."""
return self._executor.context
@property
def serializer(self) -> Serializer[Json]:
"""Return the serializer."""
return self._executor.serializer
@property
def deserializer(self) -> Deserializer[Json, Jsons]:
"""Return the deserializer."""
return self._executor.deserializer
@property
def cache(self) -> AQLQueryCache:
"""Return the AQL Query Cache API wrapper."""
return AQLQueryCache(self._executor)
def __repr__(self) -> str:
return f"<AQL in {self.name}>"
[docs]
async def execute(
self,
query: str,
count: Optional[bool] = None,
batch_size: Optional[int] = None,
bind_vars: Optional[Json] = None,
cache: Optional[bool] = None,
memory_limit: Optional[int] = None,
ttl: Optional[int] = None,
allow_dirty_read: Optional[bool] = None,
options: Optional[QueryProperties | Json] = None,
) -> Result[Cursor]:
"""Execute the query and return the result cursor.
Args:
query (str): Query string to be executed.
count (bool | None): If set to `True`, the total document count is
calculated and included in the result cursor.
batch_size (int | None): Maximum number of result documents to be
transferred from the server to the client in one roundtrip.
bind_vars (dict | None): An object with key/value pairs representing
the bind parameters.
cache (bool | None): Flag to determine whether the AQL query results
cache shall be used.
memory_limit (int | None): Maximum memory (in bytes) that the query is
allowed to use.
ttl (int | None): The time-to-live for the cursor (in seconds). The cursor
will be removed on the server automatically after the specified amount
of time.
allow_dirty_read (bool | None): Allow reads from followers in a cluster.
options (QueryProperties | dict | None): Extra options for the query.
Returns:
Cursor: Result cursor.
References:
- `create-a-cursor <https://docs.arango.ai/arangodb/stable/develop/http-api/queries/aql-queries/#create-a-cursor>`__
""" # noqa: E501
data: Json = dict(query=query)
if count is not None:
data["count"] = count
if batch_size is not None:
data["batchSize"] = batch_size
if bind_vars is not None:
data["bindVars"] = bind_vars
if cache is not None:
data["cache"] = cache
if memory_limit is not None:
data["memoryLimit"] = memory_limit
if ttl is not None:
data["ttl"] = ttl
if options is not None:
if isinstance(options, QueryProperties):
options = options.to_dict()
data["options"] = options
headers = dict()
if allow_dirty_read is not None:
headers["x-arango-allow-dirty-read"] = str(allow_dirty_read).lower()
request = Request(
method=Method.POST,
endpoint="/_api/cursor",
data=self.serializer.dumps(data),
headers=headers,
)
def response_handler(resp: Response) -> Cursor:
if not resp.is_success:
raise AQLQueryExecuteError(resp, request)
if self._executor.context == "async":
# We cannot have a cursor giving back async jobs
executor: NonAsyncExecutor = DefaultApiExecutor(
self._executor.connection
)
else:
executor = cast(NonAsyncExecutor, self._executor)
return Cursor(executor, self.deserializer.loads(resp.raw_body))
return await self._executor.execute(request, response_handler)
[docs]
async def tracking(self) -> Result[QueryTrackingConfiguration]:
"""Returns the current query tracking configuration.
Returns:
QueryTrackingConfiguration: Returns the current query tracking configuration.
Raises:
AQLQueryTrackingGetError: If retrieval fails.
References:
- `get-the-aql-query-tracking-configuration <https://docs.arango.ai/arangodb/stable/develop/http-api/queries/aql-queries/#get-the-aql-query-tracking-configuration>`__
""" # noqa: E501
request = Request(method=Method.GET, endpoint="/_api/query/properties")
def response_handler(resp: Response) -> QueryTrackingConfiguration:
if not resp.is_success:
raise AQLQueryTrackingGetError(resp, request)
return QueryTrackingConfiguration(self.deserializer.loads(resp.raw_body))
return await self._executor.execute(request, response_handler)
[docs]
async def set_tracking(
self,
enabled: Optional[bool] = None,
max_slow_queries: Optional[int] = None,
slow_query_threshold: Optional[int] = None,
slow_streaming_query_threshold: Optional[int] = None,
max_query_string_length: Optional[int] = None,
track_bind_vars: Optional[bool] = None,
track_slow_queries: Optional[int] = None,
) -> Result[QueryTrackingConfiguration]:
"""Configure AQL query tracking properties.
Args:
enabled (bool | None): If set to `True`, then queries will be tracked.
If set to `False`, neither queries nor slow queries will be tracked.
max_slow_queries (int | None): Maximum number of slow queries to track. Oldest
entries are discarded first.
slow_query_threshold (int | None): Runtime threshold (in seconds) for treating a
query as slow.
slow_streaming_query_threshold (int | None): Runtime threshold (in seconds) for
treating a streaming query as slow.
max_query_string_length (int | None): The maximum query string length (in bytes)
to keep in the list of queries.
track_bind_vars (bool | None): If set to `True`, track bind variables used in
queries.
track_slow_queries (int | None): If set to `True`, then slow queries will be
tracked in the list of slow queries if their runtime exceeds the
value set in `slowQueryThreshold`.
Returns:
QueryTrackingConfiguration: Returns the updated query tracking configuration.
Raises:
AQLQueryTrackingSetError: If setting the configuration fails.
References:
- `update-the-aql-query-tracking-configuration <https://docs.arango.ai/arangodb/stable/develop/http-api/queries/aql-queries/#update-the-aql-query-tracking-configuration>`__
""" # noqa: E501
data: Json = dict()
if enabled is not None:
data["enabled"] = enabled
if max_slow_queries is not None:
data["maxSlowQueries"] = max_slow_queries
if max_query_string_length is not None:
data["maxQueryStringLength"] = max_query_string_length
if slow_query_threshold is not None:
data["slowQueryThreshold"] = slow_query_threshold
if slow_streaming_query_threshold is not None:
data["slowStreamingQueryThreshold"] = slow_streaming_query_threshold
if track_bind_vars is not None:
data["trackBindVars"] = track_bind_vars
if track_slow_queries is not None:
data["trackSlowQueries"] = track_slow_queries
request = Request(
method=Method.PUT,
endpoint="/_api/query/properties",
data=self.serializer.dumps(data),
)
def response_handler(resp: Response) -> QueryTrackingConfiguration:
if not resp.is_success:
raise AQLQueryTrackingSetError(resp, request)
return QueryTrackingConfiguration(self.deserializer.loads(resp.raw_body))
return await self._executor.execute(request, response_handler)
[docs]
async def history(self) -> Result[Json]:
"""Return recently executed AQL queries (admin only).
Returns:
dict: AQL query history.
Raises:
AQLQueryHistoryError: If retrieval fails.
"""
request = Request(method=Method.GET, endpoint="/_admin/server/aql-queries")
def response_handler(resp: Response) -> Json:
if not resp.is_success:
raise AQLQueryHistoryError(resp, request)
result: Json = self.deserializer.loads(resp.raw_body)
return cast(Json, result["result"])
return await self._executor.execute(request, response_handler)
[docs]
async def queries(self, all_queries: bool = False) -> Result[Jsons]:
"""Return a list of currently running queries.
Args:
all_queries (bool): If set to `True`, will return the currently
running queries in all databases, not just the selected one.
Using the parameter is only allowed in the `_system` database
and with superuser privileges.
Returns:
list: List of currently running queries and their properties.
Raises:
AQLQueryListError: If retrieval fails.
References:
- `list-the-running-queries <https://docs.arango.ai/arangodb/stable/develop/http-api/queries/aql-queries/#list-the-running-queries>`__
""" # noqa: E501
request = Request(
method=Method.GET,
endpoint="/_api/query/current",
params={"all": all_queries},
)
def response_handler(resp: Response) -> Jsons:
if not resp.is_success:
raise AQLQueryListError(resp, request)
return self.deserializer.loads_many(resp.raw_body)
return await self._executor.execute(request, response_handler)
[docs]
async def slow_queries(self, all_queries: bool = False) -> Result[Jsons]:
"""Returns a list containing the last AQL queries that are finished and
have exceeded the slow query threshold in the selected database.
Args:
all_queries (bool): If set to `True`, will return the slow queries
in all databases, not just the selected one. Using the parameter
is only allowed in the `_system` database and with superuser privileges.
Returns:
list: List of slow queries.
Raises:
AQLQueryListError: If retrieval fails.
References:
- `list-the-slow-aql-queries <https://docs.arango.ai/arangodb/stable/develop/http-api/queries/aql-queries/#list-the-slow-aql-queries>`__
""" # noqa: E501
request = Request(
method=Method.GET,
endpoint="/_api/query/slow",
params={"all": all_queries},
)
def response_handler(resp: Response) -> Jsons:
if not resp.is_success:
raise AQLQueryListError(resp, request)
return self.deserializer.loads_many(resp.raw_body)
return await self._executor.execute(request, response_handler)
[docs]
async def clear_slow_queries(self, all_queries: bool = False) -> Result[None]:
"""Clears the list of slow queries.
Args:
all_queries (bool): If set to `True`, will clear the slow queries
in all databases, not just the selected one. Using the parameter
is only allowed in the `_system` database and with superuser privileges.
Returns:
dict: Empty dictionary.
Raises:
AQLQueryClearError: If retrieval fails.
References:
- `clear-the-list-of-slow-aql-queries <https://docs.arango.ai/arangodb/stable/develop/http-api/queries/aql-queries/#clear-the-list-of-slow-aql-queries>`__
""" # noqa: E501
request = Request(
method=Method.DELETE,
endpoint="/_api/query/slow",
params={"all": all_queries},
)
def response_handler(resp: Response) -> None:
if not resp.is_success:
raise AQLQueryClearError(resp, request)
return await self._executor.execute(request, response_handler)
[docs]
async def kill(
self,
query_id: str,
ignore_missing: bool = False,
all_queries: bool = False,
) -> Result[bool]:
"""Kill a running query.
Args:
query_id (str): Thea ID of the query to kill.
ignore_missing (bool): If set to `True`, will not raise an exception
if the query is not found.
all_queries (bool): If set to `True`, will kill the query in all databases,
not just the selected one. Using the parameter is only allowed in the
`_system` database and with superuser privileges.
Returns:
bool: `True` if the query was killed successfully.
Raises:
AQLQueryKillError: If killing the query fails.
References:
- `kill-a-running-aql-query <https://docs.arango.ai/arangodb/stable/develop/http-api/queries/aql-queries/#kill-a-running-aql-query>`__
""" # noqa: E501
request = Request(
method=Method.DELETE,
endpoint=f"/_api/query/{query_id}",
params={"all": all_queries},
)
def response_handler(resp: Response) -> bool:
if resp.is_success:
return True
if resp.status_code == HTTP_NOT_FOUND and ignore_missing:
return False
raise AQLQueryKillError(resp, request)
return await self._executor.execute(request, response_handler)
[docs]
async def explain(
self,
query: str,
bind_vars: Optional[Json] = None,
options: Optional[QueryExplainOptions | Json] = None,
) -> Result[Json]:
"""Inspect the query and return its metadata without executing it.
Args:
query (str): Query string to be explained.
bind_vars (dict | None): An object with key/value pairs representing
the bind parameters.
options (QueryExplainOptions | dict | None): Extra options for the query.
Returns:
dict: Query execution plan.
Raises:
AQLQueryExplainError: If retrieval fails.
References:
- `explain-an-aql-query <https://docs.arango.ai/arangodb/stable/develop/http-api/queries/aql-queries/#explain-an-aql-query>`__
""" # noqa: E501
data: Json = dict(query=query)
if bind_vars is not None:
data["bindVars"] = bind_vars
if options is not None:
if isinstance(options, QueryExplainOptions):
options = options.to_dict()
data["options"] = options
request = Request(
method=Method.POST,
endpoint="/_api/explain",
data=self.serializer.dumps(data),
)
def response_handler(resp: Response) -> Json:
if not resp.is_success:
raise AQLQueryExplainError(resp, request)
return Response.format_body(self.deserializer.loads(resp.raw_body))
return await self._executor.execute(request, response_handler)
[docs]
async def validate(self, query: str) -> Result[Json]:
"""Parse and validate the query without executing it.
Args:
query (str): Query string to be validated.
Returns:
dict: Query information.
Raises:
AQLQueryValidateError: If validation fails.
References:
- `parse-an-aql-query <https://docs.arango.ai/arangodb/stable/develop/http-api/queries/aql-queries/#parse-an-aql-query>`__
""" # noqa: E501
request = Request(
method=Method.POST,
endpoint="/_api/query",
data=self.serializer.dumps(dict(query=query)),
)
def response_handler(resp: Response) -> Json:
if not resp.is_success:
raise AQLQueryValidateError(resp, request)
return Response.format_body(self.deserializer.loads(resp.raw_body))
return await self._executor.execute(request, response_handler)
[docs]
async def query_rules(self) -> Result[Jsons]:
"""A list of all optimizer rules and their properties.
Returns:
list: Available optimizer rules.
Raises:
AQLQueryRulesGetError: If retrieval fails.
References:
- `list-all-aql-optimizer-rules <https://docs.arango.ai/arangodb/stable/develop/http-api/queries/aql-queries/#list-all-aql-optimizer-rules>`__
""" # noqa: E501
request = Request(method=Method.GET, endpoint="/_api/query/rules")
def response_handler(resp: Response) -> Jsons:
if not resp.is_success:
raise AQLQueryRulesGetError(resp, request)
return self.deserializer.loads_many(resp.raw_body)
return await self._executor.execute(request, response_handler)
[docs]
async def functions(self, namespace: Optional[str] = None) -> Result[Jsons]:
"""List the registered used-defined AQL functions.
Warning:
AQL User Functions are no longer available in ArangoDB 4.0.
Args:
namespace (str | None): Returns all registered AQL user functions from
the specified namespace.
Returns:
list: List of the AQL functions defined in the database.
Raises:
AQLFunctionListError: If retrieval fails.
References:
- `list-the-registered-user-defined-aql-functions <https://docs.arango.ai/arangodb/stable/develop/http-api/queries/user-defined-aql-functions/#list-the-registered-user-defined-aql-functions>`__
""" # noqa: E501
params: Json = dict()
if namespace is not None:
params["namespace"] = namespace
request = Request(
method=Method.GET,
endpoint="/_api/aqlfunction",
params=params,
)
def response_handler(resp: Response) -> Jsons:
if not resp.is_success:
raise AQLFunctionListError(resp, request)
result = cast(Jsons, self.deserializer.loads(resp.raw_body).get("result"))
if result is None:
raise AQLFunctionListError(resp, request)
return result
return await self._executor.execute(request, response_handler)
[docs]
async def create_function(
self,
name: str,
code: str,
is_deterministic: Optional[bool] = None,
) -> Result[Json]:
"""Registers a user-defined AQL function (UDF) written in JavaScript.
Warning:
AQL User Functions are no longer available in ArangoDB 4.0.
Args:
name (str): Name of the function.
code (str): JavaScript code of the function.
is_deterministic (bool | None): If set to `True`, the function is
deterministic.
Returns:
dict: Information about the registered function.
Raises:
AQLFunctionCreateError: If registration fails.
References:
- `create-a-user-defined-aql-function <https://docs.arango.ai/arangodb/stable/develop/http-api/queries/user-defined-aql-functions/#create-a-user-defined-aql-function>`__
""" # noqa: E501
request = Request(
method=Method.POST,
endpoint="/_api/aqlfunction",
data=self.serializer.dumps(
dict(name=name, code=code, isDeterministic=is_deterministic)
),
)
def response_handler(resp: Response) -> Json:
if not resp.is_success:
raise AQLFunctionCreateError(resp, request)
return Response.format_body(self.deserializer.loads(resp.raw_body))
return await self._executor.execute(request, response_handler)
[docs]
async def delete_function(
self,
name: str,
group: Optional[bool] = None,
ignore_missing: bool = False,
) -> Result[Json]:
"""Remove a user-defined AQL function.
Warning:
AQL User Functions are no longer available in ArangoDB 4.0.
Args:
name (str): Name of the function.
group (bool | None): If set to `True`, the function name is treated
as a namespace prefix.
ignore_missing (bool): If set to `True`, will not raise an exception
if the function is not found.
Returns:
dict: Information about the removed functions (their count).
Raises:
AQLFunctionDeleteError: If removal fails.
References:
- `remove-a-user-defined-aql-function <https://docs.arango.ai/arangodb/stable/develop/http-api/queries/user-defined-aql-functions/#remove-a-user-defined-aql-function>`__
""" # noqa: E501
params: Json = dict()
if group is not None:
params["group"] = group
request = Request(
method=Method.DELETE,
endpoint=f"/_api/aqlfunction/{name}",
params=params,
)
def response_handler(resp: Response) -> Json:
if not resp.is_success:
if not (resp.status_code == HTTP_NOT_FOUND and ignore_missing):
raise AQLFunctionDeleteError(resp, request)
return Response.format_body(self.deserializer.loads(resp.raw_body))
return await self._executor.execute(request, response_handler)