# Source code for tinydb.queries

"""
Contains the querying interface.

Starting with :class:`~tinydb.queries.Query` you can construct complex
queries:

>>> ((where('f1') == 5) & (where('f2') != 2)) | where('s').matches(r'^\\w+$')
(('f1' == 5) and ('f2' != 2)) or ('s' ~= ^\\w+$ )

Queries are executed by calling them (which invokes ``__call__``):

>>> q = where('val') == 5
>>> q({'val': 5})
True
>>> q({'val': 1})
False
"""

import re
import sys
from typing import Mapping, Tuple, Callable, Any, Union, List, Optional

from .utils import freeze

if sys.version_info >= (3, 8):
    from typing import Protocol
else:
    from typing_extensions import Protocol

__all__ = ('Query', 'QueryLike', 'where')


def is_sequence(obj):
    """
    Tell whether ``obj`` exposes an ``__iter__`` attribute.

    Note that this is a pure attribute probe: strings, dicts and any other
    object defining ``__iter__`` count as "sequences" here.

    :param obj: The object to inspect.
    :return: ``True`` if ``obj`` has an ``__iter__`` attribute, else ``False``.
    """
    try:
        getattr(obj, '__iter__')
    except AttributeError:
        return False
    return True


class QueryLike(Protocol):
    """
    A typing protocol that acts like a query.

    Something that we use as a query must have three properties:

    1. It must be callable, accepting a `Mapping` object and returning a
       boolean that indicates whether the value matches the query.
    2. It must have a stable hash that will be used for query caching.
    3. It must declare whether it is cacheable (that is, whether it is
       immutable).

    This query protocol is used to make MyPy correctly support the query
    pattern that TinyDB uses.

    See also https://mypy.readthedocs.io/en/stable/protocols.html#simple-user-defined-protocols
    """
    # Evaluate the query against ``value``; ``True`` means "matches".
    def __call__(self, value: Mapping) -> bool: ...

    # Hash used as the key when query results are cached; it has to be
    # stable for the same query.
    def __hash__(self) -> int: ...

    # Whether results of this query may be cached at all.
    def is_cacheable(self) -> bool: ...


class QueryInstance:
    """
    A query instance.

    This is the object on which the actual query operations are performed. The
    :class:`~tinydb.queries.Query` class acts like a query builder and
    generates :class:`~tinydb.queries.QueryInstance` objects which will
    evaluate their query against a given document when called.

    Query instances can be combined using logical OR and AND and inverted using
    logical NOT.

    In order to be usable in a query cache, a query needs to have a stable hash
    value with the same query always returning the same hash. That way a query
    instance can be used as a key in a dictionary.
    """

    def __init__(self, test: Callable[[Mapping], bool], hashval: Optional[Tuple]):
        """
        :param test: The callable that decides whether a value matches.
        :param hashval: A hashable description of the query, or ``None`` to
                        mark the query as not cacheable.
        """
        self._test = test
        self._hash = hashval

    def is_cacheable(self) -> bool:
        """
        Tell whether this query carries a hash value (``hashval`` was not
        ``None``) and may therefore be used as a cache key.
        """
        return self._hash is not None

    def __call__(self, value: Mapping) -> bool:
        """
        Evaluate the query to check if it matches a specified value.

        :param value: The value to check.
        :return: Whether the value matches this query.
        """
        return self._test(value)

    def __hash__(self):
        # We calculate the query hash by using the ``hashval`` object which
        # describes this query uniquely so we can calculate a stable hash value
        # by simply hashing it
        return hash(self._hash)

    def __repr__(self):
        return 'QueryImpl{}'.format(self._hash)

    def __eq__(self, other: object):
        """
        Compare two query instances by their hash description.

        A query without a hash description (not cacheable) has nothing to be
        compared by, so it is only considered equal to itself. Comparing the
        ``None`` markers directly would make *every* pair of non-cacheable
        queries equal.
        """
        if not isinstance(other, QueryInstance):
            return False

        if self._hash is None or other._hash is None:
            return self is other

        return self._hash == other._hash

    # --- Query modifiers -----------------------------------------------------

    def __and__(self, other: 'QueryInstance') -> 'QueryInstance':
        """
        Combine two queries with a logical AND.

        The result is cacheable only if both operands are.
        """
        # We use a frozenset for the hash as the AND operation is commutative
        # (a & b == b & a) and the frozenset does not consider the order of
        # elements
        if self.is_cacheable() and other.is_cacheable():
            hashval = ('and', frozenset([self._hash, other._hash]))
        else:
            hashval = None
        return QueryInstance(lambda value: self(value) and other(value), hashval)

    def __or__(self, other: 'QueryInstance') -> 'QueryInstance':
        """
        Combine two queries with a logical OR.

        The result is cacheable only if both operands are.
        """
        # We use a frozenset for the hash as the OR operation is commutative
        # (a | b == b | a) and the frozenset does not consider the order of
        # elements
        if self.is_cacheable() and other.is_cacheable():
            hashval = ('or', frozenset([self._hash, other._hash]))
        else:
            hashval = None
        return QueryInstance(lambda value: self(value) or other(value), hashval)

    def __invert__(self) -> 'QueryInstance':
        """
        Negate the query. The result is cacheable only if this query is.
        """
        hashval = ('not', self._hash) if self.is_cacheable() else None
        return QueryInstance(lambda value: not self(value), hashval)
class Query(QueryInstance):
    """
    TinyDB Queries.

    Allows building queries for TinyDB databases. There are two main ways of
    using queries:

    1) ORM-like usage:

    >>> User = Query()
    >>> db.search(User.name == 'John Doe')
    >>> db.search(User['logged-in'] == True)

    2) Classical usage:

    >>> db.search(where('value') == True)

    Note that ``where(key)`` is a shorthand for ``Query()[key]`` allowing for
    a more fluent syntax.

    Besides the methods documented here you can combine queries using the
    binary AND and OR operators:

    >>> # Binary AND:
    >>> db.search((where('field1').exists()) & (where('field2') == 5))
    >>> # Binary OR:
    >>> db.search((where('field1').exists()) | (where('field2') == 5))

    Queries are executed by calling the resulting object. They expect to get
    the document to test as the first argument and return ``True`` or
    ``False`` depending on whether the document matches the query or not.
    """

    def __init__(self) -> None:
        # The current path of fields to access when evaluating the object
        self._path = ()  # type: Tuple[Union[str, Callable], ...]

        # Prevent empty queries from being evaluated
        def notest(_):
            raise RuntimeError('Empty query was evaluated')

        super().__init__(
            test=notest,
            hashval=(None,)
        )

    def __repr__(self):
        return '{}()'.format(type(self).__name__)

    def __hash__(self):
        # Defining ``__eq__`` in a class body disables the inherited
        # ``__hash__``, so it has to be restored explicitly.
        return super().__hash__()

    def __getattr__(self, item: str):
        # Generate a new query object with the new query path
        # We use type(self) to get the class of the current query in case
        # someone uses a subclass of ``Query``
        query = type(self)()

        # Now we add the accessed item to the query path ...
        query._path = self._path + (item,)

        # ... and update the query hash
        query._hash = ('path', query._path) if self.is_cacheable() else None

        return query

    def __getitem__(self, item: str):
        # A different syntax for ``__getattr__``

        # We cannot call ``getattr(item)`` here as it would try to resolve
        # the name as a method name first, only then call our ``__getattr__``
        # method. By calling ``__getattr__`` directly, we make sure that
        # calling e.g. ``Query()['test']`` will always generate a query for a
        # document's ``test`` field instead of returning a reference to the
        # ``Query.test`` method
        return self.__getattr__(item)

    def _generate_test(
            self,
            test: Callable[[Any], bool],
            hashval: Tuple,
            allow_empty_path: bool = False
    ) -> QueryInstance:
        """
        Generate a query based on a test function that first resolves the query
        path.

        :param test: The test the query executes.
        :param hashval: The hash of the query.
        :param allow_empty_path: Whether the test may run against the document
                                 itself, i.e. with an empty query path.
        :return: A :class:`~tinydb.queries.QueryInstance` object
        :raises ValueError: If the query path is empty and
                            ``allow_empty_path`` is false.
        """
        if not self._path and not allow_empty_path:
            raise ValueError('Query has no path')

        def runner(value):
            try:
                # Resolve the path
                for part in self._path:
                    if isinstance(part, str):
                        value = value[part]
                    else:
                        value = part(value)
            except (KeyError, TypeError):
                # The path does not exist in this document: no match
                return False
            else:
                # Perform the specified test
                return test(value)

        return QueryInstance(
            runner,
            (hashval if self.is_cacheable() else None)
        )

    def __eq__(self, rhs: Any):
        """
        Test a dict value for equality.

        >>> Query().f1 == 42

        :param rhs: The value to compare against
        """
        return self._generate_test(
            lambda value: value == rhs,
            ('==', self._path, freeze(rhs))
        )

    def __ne__(self, rhs: Any):
        """
        Test a dict value for inequality.

        >>> Query().f1 != 42

        :param rhs: The value to compare against
        """
        return self._generate_test(
            lambda value: value != rhs,
            ('!=', self._path, freeze(rhs))
        )

    # NOTE(review): the ordering comparisons below pass ``rhs`` through
    # ``freeze`` for the hash exactly like ``==`` / ``!=`` do, so that all
    # comparison queries describe their operand the same way. ``freeze`` is
    # presumably what makes container operands hashable - confirm in
    # ``.utils``.

    def __lt__(self, rhs: Any) -> QueryInstance:
        """
        Test a dict value for being lower than another value.

        >>> Query().f1 < 42

        :param rhs: The value to compare against
        """
        return self._generate_test(
            lambda value: value < rhs,
            ('<', self._path, freeze(rhs))
        )

    def __le__(self, rhs: Any) -> QueryInstance:
        """
        Test a dict value for being lower than or equal to another value.

        >>> where('f1') <= 42

        :param rhs: The value to compare against
        """
        return self._generate_test(
            lambda value: value <= rhs,
            ('<=', self._path, freeze(rhs))
        )

    def __gt__(self, rhs: Any) -> QueryInstance:
        """
        Test a dict value for being greater than another value.

        >>> Query().f1 > 42

        :param rhs: The value to compare against
        """
        return self._generate_test(
            lambda value: value > rhs,
            ('>', self._path, freeze(rhs))
        )

    def __ge__(self, rhs: Any) -> QueryInstance:
        """
        Test a dict value for being greater than or equal to another value.

        >>> Query().f1 >= 42

        :param rhs: The value to compare against
        """
        return self._generate_test(
            lambda value: value >= rhs,
            ('>=', self._path, freeze(rhs))
        )

    def exists(self) -> QueryInstance:
        """
        Test for a dict where a provided key exists.

        >>> Query().f1.exists()
        """
        return self._generate_test(
            lambda _: True,
            ('exists', self._path)
        )

    def matches(self, regex: str, flags: int = 0) -> QueryInstance:
        """
        Run a regex test against a dict value using ``re.match`` (the pattern
        has to match at the beginning of the string). Values that are not
        strings never match.

        >>> Query().f1.matches(r'^\\w+$')

        :param regex: The regular expression to use for matching
        :param flags: regex flags to pass to ``re.match``
        """
        def test(value):
            if not isinstance(value, str):
                return False

            return re.match(regex, value, flags) is not None

        # ``flags`` changes what the query matches, so it has to be part of
        # the hash; otherwise queries differing only in their flags would be
        # treated as the same query.
        return self._generate_test(
            test,
            ('matches', self._path, regex, flags)
        )

    def search(self, regex: str, flags: int = 0) -> QueryInstance:
        """
        Run a regex test against a dict value using ``re.search`` (only a
        substring has to match). Values that are not strings never match.

        >>> Query().f1.search(r'^\\w+$')

        :param regex: The regular expression to use for matching
        :param flags: regex flags to pass to ``re.search``
        """
        def test(value):
            if not isinstance(value, str):
                return False

            return re.search(regex, value, flags) is not None

        # See ``matches``: ``flags`` is part of the query's identity.
        return self._generate_test(
            test,
            ('search', self._path, regex, flags)
        )

    def test(self, func: Callable[..., bool], *args) -> QueryInstance:
        """
        Run a user-defined test function against a dict value.

        >>> def test_func(val):
        ...     return val == 42
        ...
        >>> Query().f1.test(test_func)

        .. warning::

            The test function provided needs to be deterministic (returning the
            same value when provided with the same arguments), otherwise this
            may mess up the query cache that :class:`~tinydb.table.Table`
            implements.

        :param func: The function to call, passing the dict as the first
                     argument
        :param args: Additional arguments to pass to the test function
        """
        return self._generate_test(
            lambda value: func(value, *args),
            ('test', self._path, func, args)
        )

    def any(self, cond: Union[QueryInstance, List[Any]]) -> QueryInstance:
        """
        Check if a condition is met by any document in a list,
        where a condition can also be a sequence (e.g. list).

        >>> Query().f1.any(Query().f2 == 1)

        Matches::

            {'f1': [{'f2': 1}, {'f2': 0}]}

        >>> Query().f1.any([1, 2, 3])

        Matches::

            {'f1': [1, 2]}
            {'f1': [3, 4, 5]}

        :param cond: Either a query that at least one document has to match or
                     a list of which at least one document has to be contained
                     in the tested document.
        """
        if callable(cond):
            def test(value):
                return is_sequence(value) and any(cond(e) for e in value)

        else:
            def test(value):
                return is_sequence(value) and any(e in cond for e in value)

        return self._generate_test(
            test,
            ('any', self._path, freeze(cond))
        )

    def all(self, cond: Union['QueryInstance', List[Any]]) -> QueryInstance:
        """
        Check if a condition is met by all documents in a list,
        where a condition can also be a sequence (e.g. list).

        >>> Query().f1.all(Query().f2 == 1)

        Matches::

            {'f1': [{'f2': 1}, {'f2': 1}]}

        >>> Query().f1.all([1, 2, 3])

        Matches::

            {'f1': [1, 2, 3, 4, 5]}

        :param cond: Either a query that all documents have to match or a list
                     which has to be contained in the tested document.
        """
        if callable(cond):
            def test(value):
                return is_sequence(value) and all(cond(e) for e in value)

        else:
            def test(value):
                return is_sequence(value) and all(e in value for e in cond)

        return self._generate_test(
            test,
            ('all', self._path, freeze(cond))
        )

    def one_of(self, items: List[Any]) -> QueryInstance:
        """
        Check if the value is contained in a list or generator.

        >>> Query().f1.one_of(['value 1', 'value 2'])

        :param items: The list of items to check with
        """
        return self._generate_test(
            lambda value: value in items,
            ('one_of', self._path, freeze(items))
        )

    def fragment(self, document: Mapping) -> QueryInstance:
        """
        Check that the tested value contains every key of ``document`` with
        an equal value. Unlike the other tests this may be used on a query
        without a path, in which case the whole document is tested.

        >>> Query().fragment({'f1': 1})

        :param document: The keys and values that have to be present
        """
        def test(value):
            for key in document:
                if key not in value or value[key] != document[key]:
                    return False

            return True

        # The path is part of the hash: the same fragment tested at two
        # different paths is two different queries.
        return self._generate_test(
            test,
            ('fragment', self._path, freeze(document)),
            allow_empty_path=True
        )

    def noop(self) -> QueryInstance:
        """
        Always evaluate to ``True``.

        Useful for having a base value when composing queries dynamically.
        """

        return QueryInstance(
            lambda value: True,
            ()
        )

    def map(self, fn: Callable[[Any], Any]) -> 'Query':
        """
        Add a function to the query path. Similar to __getattr__ but for
        arbitrary functions.

        :param fn: The function applied to the value resolved so far
        :return: A new, non-cacheable query with ``fn`` appended to its path
        """
        query = type(self)()

        # Now we add the callable to the query path ...
        query._path = self._path + (fn,)

        # ... and kill the hash - callable objects can be mutable so it's
        # harmful to cache their results.
        query._hash = None

        return query
def where(key: str) -> Query:
    """
    A shorthand for ``Query()[key]``

    :param key: The name of the document field to build the query for.
    :return: A query whose path consists of ``key``.
    """
    builder = Query()
    return builder[key]