Source code for thorn.tasks
"""Tasks used by the Celery dispatcher."""
from __future__ import absolute_import, unicode_literals
from celery import shared_task
from celery.utils.functional import memoize
from ._state import app_or_default
__all__ = ['send_event', 'dispatch_requests', 'dispatch_request']
@memoize()
def _worker_dispatcher():
    # type: () -> Dispatcher
    """Build the dispatcher used by tasks running inside the worker.

    The function is wrapped in ``memoize()``, so the result is presumably
    cached and the same instance handed back on later calls (see the
    celery ``memoize`` helper to confirm cache semantics).
    """
    # Imported inside the function rather than at module import time.
    from .dispatch.celery import WorkerDispatcher as dispatcher_cls
    dispatcher = dispatcher_cls()
    return dispatcher
@shared_task(ignore_result=True)
def send_event(event, payload, sender, timeout, context=None):
    # type: (str, Dict, Any, float, Optional[Dict]) -> None
    """Task called by process dispatching the event.

    Arguments:
        event: Forwarded as the first positional argument to the worker
            dispatcher's ``send``.
        payload: Forwarded as the second positional argument.
        sender: Forwarded as the third positional argument.
        timeout: Forwarded as the ``timeout`` keyword argument.
        context: Forwarded as the ``context`` keyword argument.  When
            omitted (or ``None``) a new empty dict is created for this
            call.

    Note:
        This will use the WorkerDispatcher to dispatch the individual
        HTTP requests in batches (``dispatch_requests -> dispatch_request``).
    """
    # A ``{}`` default would be a single dict object shared by every
    # invocation in the process; create a fresh one per call instead.
    if context is None:
        context = {}
    _worker_dispatcher().send(
        event, payload, sender, timeout=timeout, context=context)
@shared_task(ignore_result=True)
def dispatch_requests(reqs, app=None):
    # type: (Sequence[Dict], App) -> None
    """Process a batch of HTTP requests.

    Arguments:
        reqs: Sequence of mappings; each one is expanded into keyword
            arguments for :func:`dispatch_request`.
        app: App instance, resolved through ``app_or_default`` (so it
            may be left as ``None``).

    A single session is created with ``app.Request.Session()`` and the
    same object is passed to every request in the batch.
    """
    app = app_or_default(app)
    session = app.Request.Session()
    # Plain loop: the calls are made for their side effects only, so
    # there is no reason to collect the return values into a list.
    for req in reqs:
        dispatch_request(session=session, app=app, **req)
@shared_task(bind=True, ignore_result=True)
def dispatch_request(self, event, data, sender, subscriber,
                     session=None, app=None, **kwargs):
    # type: (str, Dict, Any, Dict, requests.Session, App, **Any) -> None
    """Process a single HTTP request.

    Arguments:
        event: Passed through to ``app.Request``.
        data: Passed through to ``app.Request``.
        sender: Passed through to ``app.Request``.
        subscriber: Mapping of keyword arguments for ``app.Subscriber``.
            The ``'user'`` key, if present, is removed from this mapping
            in place before the subscriber is constructed.
        session: Passed to ``request.dispatch`` as ``session``.
        app: App instance, resolved through ``app_or_default``.
        **kwargs: Extra keyword arguments for ``app.Request``.

    Raises:
        The connection/timeout error raised by ``request.dispatch`` when
        ``request.retry`` is false; otherwise whatever ``self.retry``
        returns is raised to reschedule the task.
    """
    current_app = app_or_default(app)

    # The user arrives serialized as its pk, which cannot be handed
    # straight to Subscriber -- and it is not needed here anyway.
    subscriber.pop('user', None)
    target = current_app.Subscriber(**subscriber)
    request = current_app.Request(event, data, sender, target, **kwargs)

    try:
        request.dispatch(session=session, propagate=request.retry)
    except request.connection_errors + request.timeout_errors as exc:
        if not request.retry:
            # Retries disabled for this request: let the error surface.
            raise
        raise self.retry(
            exc=exc,
            max_retries=request.retry_max,
            countdown=request.retry_delay,
        )