Source code for arangoasync.auth

__all__ = [
    "Auth",
    "JwtToken",
]

import time
from dataclasses import dataclass
from typing import Optional

import jwt


[docs] @dataclass class Auth: """Authentication details for the ArangoDB instance. Attributes: username (str): Username. password (str): Password. encoding (str): Encoding for the password (default: utf-8) """ username: str = "" password: str = "" encoding: str = "utf-8"
[docs] class JwtToken: """JWT token. Args: token (str): JWT token. Raises: TypeError: If the token type is not str or bytes. jwt.exceptions.ExpiredSignatureError: If the token expired. """ def __init__(self, token: str) -> None: self._token = token self._validate()
[docs] @staticmethod def generate_token( secret: str | bytes, iat: Optional[int] = None, exp: int = 3600, iss: str = "arangodb", server_id: str = "client", ) -> "JwtToken": """Generate and return a JWT token. Args: secret (str | bytes): JWT secret. iat (int): Time the token was issued in seconds. Defaults to current time. exp (int): Time to expire in seconds. iss (str): Issuer. server_id (str): Server ID. Returns: str: JWT token. """ iat = iat or int(time.time()) token = jwt.encode( payload={ "iat": iat, "exp": iat + exp, "iss": iss, "server_id": server_id, }, key=secret, ) return JwtToken(token)
@property def token(self) -> str: """Get token.""" return self._token @token.setter def token(self, token: str) -> None: """Set token. Raises: jwt.exceptions.ExpiredSignatureError: If the token expired. """ self._token = token self._validate()
[docs] def needs_refresh(self, leeway: int = 0) -> bool: """Check if the token needs to be refreshed. Args: leeway (int): Leeway in seconds, before official expiration, when to consider the token expired. Returns: bool: True if the token needs to be refreshed, False otherwise. """ refresh: bool = int(time.time()) > self._token_exp - leeway return refresh
def _validate(self) -> None: """Validate the token.""" if type(self._token) is not str: raise TypeError("Token must be str") jwt_payload = jwt.decode( self._token, issuer="arangodb", algorithms=["HS256"], options={ "require_exp": True, "require_iat": True, "verify_iat": True, "verify_exp": True, "verify_signature": False, }, # type: ignore[arg-type] ) self._token_exp = jwt_payload["exp"]