# Source code for redis_func_cache.policies.abstract

from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Callable, Iterable, Mapping, Sequence
from typing import TYPE_CHECKING, Any, Optional, Union

if TYPE_CHECKING:  # pragma: no cover
    from weakref import CallableProxyType

from redis.commands.core import AsyncScript, Script

from ..utils import clean_lua_script, read_lua_file

if TYPE_CHECKING:  # pragma: no cover
    from redis.typing import EncodableT, KeyT, ScriptTextT

    from ..cache import RedisFuncCache


# Public API of this module: only the abstract policy base class is exported
# by ``from ... import *``.
__all__ = ("AbstractPolicy",)


class AbstractPolicy(ABC):
    """
    Abstract base class for cache eviction policies used by :class:`RedisFuncCache`.

    .. inheritance-diagram:: AbstractPolicy
        :parts: 1

    Subclasses **MUST** implement:

    - :meth:`calc_keys`
    - :meth:`calc_hash`

    Optionally, subclasses may define:

    - ``__key__``: A string component used in Redis key naming.
    - ``__scripts__``: A tuple of two Lua script filenames (get, put).

    The use of :attr:`__key__` or :attr:`__scripts__` depends on the implementation of
    :meth:`calc_keys` and :meth:`calc_hash`.
    """

    # Declared only; concrete policies are expected to assign these.
    __key__: str
    __scripts__: tuple[str, str]

    def __init__(self) -> None:
        """
        Create a policy that is not yet bound to any cache.

        The constructor takes no arguments. Both the cache reference and the
        registered Lua scripts start as ``None``.

        Note:
            The :class:`RedisFuncCache` is expected to bind itself to the policy
            instance by setting ``_cache`` to a weakref proxy during cache
            construction. Until then, :attr:`cache` raises :class:`RuntimeError`.
        """
        self._cache: Optional[CallableProxyType[RedisFuncCache]] = None
        self._lua_scripts: Union[None, tuple[Script, Script], tuple[AsyncScript, AsyncScript]] = None

    @property
    def cache(self) -> CallableProxyType[RedisFuncCache]:
        """
        The :class:`RedisFuncCache` instance (via weakref proxy) that uses this policy.

        Returns:
            The bound cache proxy.

        Raises:
            RuntimeError: If the policy has not been bound to a cache yet.
        """
        # Identity check on purpose: truth-testing a weakref proxy would be
        # forwarded to the referent instead of testing "is it bound?".
        if self._cache is None:
            raise RuntimeError("Policy instance is not bound to a RedisFuncCache")
        return self._cache

    @abstractmethod
    def calc_keys(
        self,
        f: Optional[Callable] = None,
        args: Optional[tuple[Any, ...]] = None,
        kwds: Optional[dict[str, Any]] = None,
    ) -> tuple[str, str]:
        """
        Calculate the Redis key pair for caching.

        Args:
            f: The function being cached.
            args: Positional arguments.
            kwds: Keyword arguments.

        Returns:
            Tuple of two Redis key names (e.g., for set and hash).
        """
        raise NotImplementedError()  # pragma: no cover

    @abstractmethod
    def calc_hash(
        self,
        f: Optional[Callable] = None,
        args: Optional[tuple[Any, ...]] = None,
        kwds: Optional[dict[str, Any]] = None,
    ) -> KeyT:
        """
        Calculate a unique hash for the function and its arguments.

        Args:
            f: The function being cached.
            args: Positional arguments.
            kwds: Keyword arguments.

        Returns:
            The calculated hash value.
        """
        raise NotImplementedError()  # pragma: no cover

    def calc_ext_args(
        self,
        f: Optional[Callable] = None,
        args: Optional[Sequence] = None,
        kwds: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Iterable[EncodableT]]:
        """
        Optionally calculate extra arguments to pass to the Lua script.

        The base implementation ignores its inputs and returns ``None``;
        subclasses may override it.

        Args:
            f: The function being cached.
            args: Positional arguments.
            kwds: Keyword arguments.

        Returns:
            Iterable of extra encodable arguments, or None.
        """
        return None

    def read_lua_scripts(self) -> tuple[ScriptTextT, ScriptTextT]:
        """
        Read and clean the two Lua scripts named by :attr:`__scripts__`.

        Each filename is loaded with ``read_lua_file`` and the result is passed
        through ``clean_lua_script``.

        Returns:
            Tuple of cleaned Lua script texts (get, put).
        """
        return (
            clean_lua_script(read_lua_file(self.__scripts__[0])),
            clean_lua_script(read_lua_file(self.__scripts__[1])),
        )

    @property
    def lua_scripts(self) -> Union[tuple[Script, Script], tuple[AsyncScript, AsyncScript]]:
        """
        Register and return Lua scripts as Redis Script/AsyncScript objects.

        Registration happens on first access only; the resulting pair is stored
        on the instance and returned unchanged on later accesses.

        Returns:
            Tuple of registered Script or AsyncScript objects.

        Raises:
            RuntimeError: If the policy is not bound to a cache (raised by :attr:`cache`).
        """
        if self._lua_scripts is None:
            # Resolve the client first so an unbound policy fails before any
            # script file is read.
            client = self.cache.get_client()
            script_texts = self.read_lua_scripts()
            self._lua_scripts = (
                client.register_script(script_texts[0]),
                client.register_script(script_texts[1]),
            )
        return self._lua_scripts

    def purge(self) -> int:
        """
        Purge the cache.

        Returns:
            Number of items removed (if implemented).

        Raises:
            NotImplementedError: If not implemented by subclass.
        """
        raise NotImplementedError()  # pragma: no cover

    async def apurge(self) -> int:
        """
        Asynchronously purge the cache.

        Returns:
            Number of items removed (if implemented).

        Raises:
            NotImplementedError: If not implemented by subclass.
        """
        raise NotImplementedError()  # pragma: no cover

    def get_size(self) -> int:
        """
        Get the number of items in the cache.

        Returns:
            The cache size.

        Raises:
            NotImplementedError: If not implemented by subclass.
        """
        raise NotImplementedError()  # pragma: no cover

    async def aget_size(self) -> int:
        """
        Asynchronously get the number of items in the cache.

        Returns:
            The cache size.

        Raises:
            NotImplementedError: If not implemented by subclass.
        """
        raise NotImplementedError()  # pragma: no cover