Source code for thorn.dispatch.celery
"""Celery-based webhook dispatcher."""
from __future__ import absolute_import, unicode_literals
from celery import group
from thorn.tasks import send_event, dispatch_requests
from thorn.utils.functional import chunks
from . import base
__all__ = ['Dispatcher', 'WorkerDispatcher']
class _CeleryDispatcher(base.Dispatcher):
def as_request_group(self, requests):
return group(
dispatch_requests.s([req.as_dict() for req in chunk])
for chunk in self.group_requests(requests)
)
def group_requests(self, requests):
"""Group requests by keep-alive host/port/scheme ident."""
return chunks(iter(requests), self.app.settings.THORN_CHUNKSIZE)
def _compare_requests(self, a, b):
return a.urlident == b.urlident
[docs]class Dispatcher(_CeleryDispatcher):
"""Dispatcher using Celery tasks to dispatch events.
Note:
Overrides what happens when :meth:`thorn.webhook.Event.send` is
called so that dispatching the HTTP request tasks is performed by
a worker, instead of in the current process.
"""
[docs] def send(self, event, payload, sender,
timeout=None, context=None, **kwargs):
return send_event.s(
event, payload,
sender.pk if sender else sender, timeout, context,
).apply_async()
[docs] def flush_buffer(self):
# XXX Not thread-safe
g = self.as_request_group(self.pending_outbound)
self.pending_outbound.clear()
g.delay()
[docs]class WorkerDispatcher(_CeleryDispatcher):
"""Dispatcher used by the :func:`thorn.tasks.send_event` task."""
[docs] def send(self, event, payload, sender,
timeout=None, context=None, **kwargs):
# the requests are sorted by url, so we group them into chunks
# each containing a list of requests for that host/port/scheme pair,
# with up to :setting:`THORN_CHUNKSIZE` requests each.
#
# this way requests have a good chance of reusing keepalive
# connections as requests with the same host are grouped together.
return self.as_request_group(self.prepare_requests(
event, payload, sender, timeout, context, **kwargs)).delay()