"""Functional-style utilities."""
from __future__ import absolute_import, unicode_literals
import operator
from collections import Callable, deque
from functools import partial
from itertools import islice
from six import string_types
from celery.utils import cached_property
from celery.utils.functional import is_list, maybe_list
from celery.utils.imports import symbol_by_name
try:
from django.db.models.query import Q as _Q_
except ImportError: # pragma: no cover
from .django.query_utils import Q as _Q_ # noqa
__all__ = ['chunks', 'Q']
E_FILTER_FIELD_MISSING_OP = (
"filter field argument {0!r} not allowed: did you mean '{0}__eq'?"
)
def not_contains(a, b):
"""Operator for ``b not in a``.
not_contains(a, b) -> b not in a``
"""
return b not in a
def startswith(a, b):
"""Function calling obj.startsiwth.
``startswith(a, b) -> a.startswith(b)``
"""
return a.startswith(b)
def endswith(a, b):
"""Function calling obj.endswith.
``endswith(a, b) -> a.endswith(b)``
"""
return a.endswith(b)
def reverse_n(N, tup):
"""Reverse n first elements in a tuple."""
return tuple(reversed(tup[:N])) + tup[N:] if N else tuple(reversed(tup))
def negate(fun):
"""Return function negating the value of a boolean function."""
def negated(*args, **kwargs):
return not fun(*args, **kwargs)
return negated
def reverse_arguments(n):
"""Return transformed function where first N arguments are reversed."""
def _inner(fun):
def reversed(*args, **kwargs):
return fun(*reverse_n(n, args), **kwargs)
return reversed
return _inner
def traverse_subscribers(it, *args, **kwargs):
stream = deque([it])
while stream:
for node in maybe_list(stream.popleft()):
if isinstance(node, string_types) and node.startswith('!'):
node = symbol_by_name(node[1:])
if isinstance(node, Callable):
node = node(*args, **kwargs)
if is_list(node):
stream.append(node)
elif node:
yield node
def wrap_transition(op, did_change):
"""Transform operator into a transition operator.
That is, an operator that
only returns true if the ``did_change`` operator also returns true.
Note:
E.g. ``wrap_transition(operator.eq, operator.ne)`` returns function
with signature ``(new_value, needle, old_value)`` and only returns
true if new_value is equal to needle, but old_value was not equal
to needle.
"""
def compare(new_value, needle, old_value):
return did_change(old_value, needle) and op(new_value, needle)
return compare
[docs]def chunks(it, n):
"""Split an iterator into chunks with `n` elements each.
Example:
# n == 2
>>> x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 2)
>>> list(x)
[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10]]
# n == 3
>>> x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 3)
>>> list(x)
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]]
"""
for first in it:
yield [first] + list(islice(it, n - 1))
[docs]class Q(_Q_):
"""Object query node.
This class works like :class:`django.db.models.Q`, but is used for
filtering regular Python objects instead of database rows.
Examples:
>>> # Match object with `last_name` attribute set to "Costanza":
>>> Q(last_name__eq="Costanza")
>>> # Match object with `author.last_name` attribute set to "Benes":
>>> Q(author__last_name__eq="Benes")
>>> # You are not allowed to specify any key without an operator,
>>> # even when the following would be fine using Django`s Q objects:
>>> Q(author__last_name="Benes") # <-- ERROR, will raise ValueError
>>> # Attributes can be nested arbitrarily deep:
>>> Q(a__b__c__d__e__f__g__x__gt=3.03)
>>> # The special `*__eq=True` means "match any *true-ish* value":
>>> Q(author__account__is_staff__eq=True)
>>> # Similarly the `*__eq=False` means "match any *false-y*" value":
>>> Q(author__account__is_disabled=False)
See Also:
:ref:`events-model-filtering-operators`.
Returns:
Callable: to match an object with the given predicates,
call the return value with the object to match:
``Q(x__eq==808)(obj)``.
"""
#: The gate decides the boolean operator of this tree node.
#: A node can either be *OR* (``a | b``), or an *AND* note (``a & b``).
#: - Default is *AND*.
gates = {
_Q_.OR: any,
_Q_.AND: all,
}
#: If the node is negated (``~a`` / ``a.negate()``), branch will be True,
#: and we reverse the query into a ``not a`` one.
branches = {
True: operator.not_,
False: operator.truth,
}
#: Mapping of opcode to binary operator function: ``f(a, b)``.
#: Operators may return any true-ish or false-y value.
operators = {
'eq': operator.eq,
'now_eq': wrap_transition(operator.eq, operator.ne),
'ne': operator.ne,
'now_ne': wrap_transition(operator.ne, operator.ne),
'gt': operator.gt,
'now_gt': wrap_transition(operator.gt, operator.lt),
'lt': operator.lt,
'now_lt': wrap_transition(operator.lt, operator.gt),
'gte': operator.ge,
'now_gte': wrap_transition(operator.ge, operator.le),
'lte': operator.le,
'now_lte': wrap_transition(operator.le, operator.ge),
'in': reverse_arguments(2)(operator.contains),
'now_in': wrap_transition(
reverse_arguments(2)(operator.contains),
reverse_arguments(2)(not_contains),
),
'not_in': reverse_arguments(2)(not_contains),
'now_not_in': wrap_transition(
reverse_arguments(2)(not_contains),
reverse_arguments(2)(operator.contains),
),
'is': operator.is_,
'now_is': wrap_transition(operator.is_, operator.is_not),
'is_not': operator.is_not,
'now_is_not': wrap_transition(operator.is_not, lambda a, _: a is None),
'contains': operator.contains,
'now_contains': wrap_transition(
operator.contains, negate(operator.contains),
),
'not': lambda x, _: operator.not_(x),
'true': lambda x, _: operator.truth(x),
'startswith': startswith,
'now_startswith': wrap_transition(startswith, negate(startswith)),
'endswith': endswith,
'now_endswith': wrap_transition(endswith, negate(endswith)),
}
def __call__(self, obj):
# NOT?( AND|OR(...) )
return self.branches[self.negated](
self.gate(f(obj) for f in self.stack)
)
[docs] def compile(self, fields):
# this does not traverse the tree, but compiles the nodes
# in ``self.children`` only. The nodes below will be compiled
# and cached when they are called.
return [self.compile_node(field) for field in fields]
[docs] def compile_node(self, field):
"""Compile node into a cached function that performs the match.
Returns:
Callable: taking the object to match.
"""
# can embed other Q objects
if isinstance(field, type(self)):
return field
# convert Django Q objects in-place.
elif isinstance(field, _Q_):
field.__class__ = type(self)
return field
# or it's a key, value pair.
lhs, rhs = field
lhs, opcode = self.prepare_statement(lhs, rhs)
# this creates the new matching function to be added to the stack.
return self.compile_op(lhs, rhs, opcode)
[docs] def prepare_statement(self, lhs, rhs):
lhs, _, opcode = lhs.rpartition('__')
if not opcode or opcode not in self.operators:
raise ValueError(E_FILTER_FIELD_MISSING_OP.format(lhs))
return lhs.replace('__', '.'), self.prepare_opcode(opcode, rhs)
[docs] def prepare_opcode(self, O, rhs):
# eq=True and friends are special, as they should match any
# true-ish value (__bool__), not check for equality.
if (O == 'eq' and rhs is True) or O == 'ne' and rhs is False:
return 'true'
elif (O == 'eq' and rhs is False) or O == 'ne' and rhs is True:
return 'not'
return O
[docs] def compile_op(self, lhs, rhs, opcode):
return self._compile_op(
self.apply_trans_op if 'now' in opcode else self.apply_op,
lhs, rhs, opcode,
)
def _compile_op(self, apply, lhs, rhs, opcode, *args):
return partial(
apply,
operator.attrgetter(lhs), self.operators[opcode], rhs, *args
)
[docs] def apply_op(self, getter, op, rhs, obj, *args):
# compiled nodes end up being partial versions of this method,
# with the getter, op and rhs arguments already set.
return op(getter(obj), rhs, *args)
[docs] def apply_trans_op(self, getter, op, rhs, obj):
# transition op (e.g. now_eq) only matches if the
# value differs from the previous version.
return self.apply_op(
getter, op, rhs, obj, self._get_from_prev_version(getter, obj),
)
def _get_from_prev_version(self, getter, obj):
try:
prev = obj._previous_version
except AttributeError:
pass
else:
return getter(prev)
@property
def gate(self):
return self.gates[self.connector]
[docs] @cached_property
def stack(self):
# the stack is cached on first call.
return self.compile(self.children)