Source code for redis_func_cache.policies.base

"""Base class for single-pair cache."""

from __future__ import annotations

import hashlib
import sys
from collections.abc import Callable
from typing import TYPE_CHECKING, Any, Optional

if sys.version_info < (3, 12):  # pragma: no cover
    from typing_extensions import override
else:  # pragma: no cover
    from typing import override

if TYPE_CHECKING:  # pragma: no cover
    pass

from ..typing import is_redis_async_client, is_redis_sync_client
from ..utils import b64digest, get_callable_bytecode
from .abstract import AbstractPolicy

if TYPE_CHECKING:  # pragma: no cover
    pass

__all__ = ("BaseSinglePolicy", "BaseClusterSinglePolicy", "BaseMultiplePolicy", "BaseClusterMultiplePolicy")


[docs] class BaseSinglePolicy(AbstractPolicy): """ Base policy for a single sorted-set or hash-map key pair. .. inheritance-diagram:: BaseSinglePolicy :parts: 1 All decorated functions using this policy share the same Redis key pair. Not intended for direct use. """ __key__: str @override def __init__(self) -> None: super().__init__() self._keys: Optional[tuple[str, str]] = None
[docs] @override def calc_keys( self, f: Optional[Callable] = None, args: Optional[tuple[Any, ...]] = None, kwds: Optional[dict[str, Any]] = None, ) -> tuple[str, str]: """ Return the static Redis key pair for this cache policy. Returns: Tuple of (sorted set key, hash map key). """ if self._keys is None: k = f"{self.cache.prefix}{self.cache.name}:{self.__key__}" self._keys = f"{k}:0", f"{k}:1" return self._keys
[docs] @override def purge(self) -> int: """ Delete the cache's Redis keys synchronously. Returns: Number of keys deleted. """ client = self.cache.get_client() if not is_redis_sync_client(client): raise RuntimeError("Can not perform a synchronous operation with an asynchronous redis client") return client.delete(*self.calc_keys())
[docs] @override async def apurge(self) -> int: """ Delete the cache's Redis keys asynchronously. Returns: Number of keys deleted. """ client = self.cache.get_client() if not is_redis_async_client(client): raise RuntimeError("Can not perform an asynchronous operation with a synchronous redis client") return await client.delete(*self.calc_keys()) # type: ignore[union-attr]
[docs] @override def get_size(self) -> int: """ Get the number of items in the cache synchronously. Returns: Number of items in the cache. """ client = self.cache.get_client() if not is_redis_sync_client(client): raise RuntimeError("Can not perform a synchronous operation with an asynchronous redis client") return client.hlen(self.calc_keys()[1])
[docs] @override async def aget_size(self) -> int: """ Get the number of items in the cache asynchronously. Returns: Number of items in the cache. """ client = self.cache.get_client() if not is_redis_async_client(client): raise RuntimeError("Can not perform an asynchronous operation with a synchronous redis client") keys = self.calc_keys() return await client.hlen(keys[1]) # type: ignore[union-attr, return-value]
[docs] class BaseClusterSinglePolicy(BaseSinglePolicy): """ Base policy for a single sorted-set or hash-map key pair with Redis cluster support. .. inheritance-diagram:: BaseClusterSinglePolicy :parts: 1 All decorated functions using this policy share the same Redis key pair. Not intended for direct use. """
[docs] @override def calc_keys( self, f: Optional[Callable] = None, args: Optional[tuple[Any, ...]] = None, kwds: Optional[dict[str, Any]] = None, ) -> tuple[str, str]: """ Return the static Redis key pair for this cache policy, using cluster hash tags. Returns: Tuple of (sorted set key, hash map key). """ if self._keys is None: k = f"{self.cache.prefix}{{{self.cache.name}:{self.__key__}}}" self._keys = f"{k}:0", f"{k}:1" return self._keys
[docs] class BaseMultiplePolicy(AbstractPolicy): """ Base policy for multiple sorted-set or hash-map key pairs. .. inheritance-diagram:: BaseMultiplePolicy :parts: 1 Each decorated function using this policy has its own Redis key pair. Not intended for direct use. """
[docs] @override def calc_keys( self, f: Optional[Callable] = None, args: Optional[tuple[Any, ...]] = None, kwds: Optional[dict[str, Any]] = None, ) -> tuple[str, str]: """ Calculate a unique Redis key pair for the given function. Args: f: The decorated function. Returns: Tuple of (sorted set key, hash map key). """ if not callable(f): raise TypeError("Can not calculate hash for a non-callable object") fullname = f"{f.__module__}:{f.__qualname__}" h = hashlib.md5(fullname.encode()) h.update(get_callable_bytecode(f)) checksum = b64digest(h).decode() k = f"{self.cache.prefix}{self.cache.name}:{self.__key__}:{fullname}#{checksum}" return f"{k}:0", f"{k}:1"
[docs] @override def purge(self) -> int: """ Delete all Redis keys for this policy synchronously. Returns: Number of keys deleted. """ client = self.cache.get_client() if not is_redis_sync_client(client): raise RuntimeError("Can not perform a synchronous operation with an asynchronous redis client") pat = f"{self.cache.prefix}{self.cache.name}:{self.__key__}:*" if keys := client.keys(pat): return client.delete(*keys) return 0
[docs] @override async def apurge(self) -> int: """ Delete all Redis keys for this policy asynchronously. Returns: Number of keys deleted. """ client = self.cache.get_client() if not is_redis_async_client(client): raise RuntimeError("Can not perform an asynchronous operation with a synchronous redis client") pat = f"{self.cache.prefix}{self.cache.name}:{self.__key__}:*" if keys := await client.keys(pat): # type: ignore[union-attr] return await client.delete(*keys) # type: ignore[union-attr] return 0
[docs] class BaseClusterMultiplePolicy(BaseMultiplePolicy): """ Base policy for multiple sorted-set or hash-map key pairs with Redis cluster support. .. inheritance-diagram:: BaseClusterMultiplePolicy :parts: 1 Each decorated function using this policy has its own Redis key pair. Not intended for direct use. """
[docs] @override def calc_keys( self, f: Optional[Callable] = None, args: Optional[tuple[Any, ...]] = None, kwds: Optional[dict[str, Any]] = None, ) -> tuple[str, str]: """ Calculate a unique Redis key pair for the given function, using cluster hash tags. Args: f: The decorated function. Returns: Tuple of (sorted set key, hash map key). """ if not callable(f): raise TypeError("Can not calculate hash for a non-callable object") fullname = f"{f.__module__}:{f.__qualname__}" h = hashlib.md5(fullname.encode()) h.update(get_callable_bytecode(f)) checksum = b64digest(h).decode() k = f"{self.cache.prefix}{self.cache.name}:{self.__key__}:{fullname}#{{{checksum}}}" return f"{k}:0", f"{k}:1"