"""Base-class for functional test suites.
Extends Cyanide stress test suite with utilities used
to test Thorn.
"""
from __future__ import absolute_import, unicode_literals
import json
import requests
from contextlib import contextmanager
from six import iteritems as items, itervalues as values
from uuid import uuid4
from django.contrib.auth import get_user_model
from django.core.urlresolvers import reverse
from cyanide.suite import Suite, testcase
from thorn.django.models import Subscriber
from thorn.utils import hmac
from testapp.models import Article, SubscriberLog, Tag
# Base URL that ``url()`` prepends to every path it builds.
BASE_URL = 'http://localhost:8000'
# Public names exported by this module.
__all__ = ['WebhookSuite', 'new_ref', 'url', 'event_url', 'testcase']
def new_ref():
    """Create new reference ID.

    Returns:
        str: the hexadecimal form of a freshly generated random UUID
        (32 characters, no dashes).
    """
    return uuid4().hex
def url(*s):
    """Create URL by components in *s.

    The components are appended to ``BASE_URL`` and everything is
    joined with ``/``; no escaping or slash normalisation is performed.

    Arguments:
        *s (str): path components.

    Returns:
        str: ``BASE_URL`` followed by each component, ``/``-separated.
    """
    return '/'.join((BASE_URL,) + s)
def event_url(event, ref=None, rest=None):
    """Return url for event.

    Arguments:
        event (str): event name, used as a path component after ``r``.
        ref: reference ID embedded as the ``ref`` query parameter.
            It is formatted as-is, so ``None`` yields ``?ref=None``.
        rest (str): optional text appended verbatim after the ``ref``
            value (nothing is appended when falsy).

    Returns:
        str: URL of the form ``<BASE_URL>/r/<event>/?ref=<ref><rest>``.
    """
    query = '?ref={0}{1}'.format(ref, rest or '')
    return url('r', event, query)
class WebhookSuite(Suite):
    """Thorn integration test suite."""

    # Users created by ``setup()``.
    user = user2 = None
    # API token returned by ``_login()``; assigned in ``setup()``.
    token = None
    # NOTE(review): ``token``/``token_type`` are not read anywhere in this
    # class as shown; presumably consumed by ``self.headers()``, which is
    # called by ``_request()`` but not defined in this view -- confirm.
    token_type = 'Token'

    def setup(self):
        """Reset test data, create the test users and log in.

        Deletes every Article, Tag, SubscriberLog and Subscriber row,
        then stores two users, an API token and a fresh reference ID
        on the instance.
        """
        Article.objects.all().delete()
        Tag.objects.all().delete()
        SubscriberLog.objects.all().delete()
        Subscriber.objects.all().delete()
        self.user = self._get_user('test', 'test')
        self.user2 = self._get_user('test2', 'test2')
        self.token = self._login()
        self.ref = new_ref()
        # A brand new ref must not have any log entries yet.
        assert not SubscriberLog.objects.filter(ref=self.ref)

    def create_article(self, title, state='PENDING', author=None):
        """Create and return an Article (author defaults to ``self.user``)."""
        return Article.objects.create(
            title=title, state=state, author=author or self.user,
        )

    def assert_webhook_not_received(self, ref=None):
        """Check that no SubscriberLog appears for ``ref``.

        Delegates to ``self.ensure_not_for_a_while`` with
        ``SubscriberLog.objects.get`` and ``SubscriberLog.DoesNotExist``;
        ``ref`` defaults to ``self.ref``.
        """
        ref = ref or self.ref
        return self.ensure_not_for_a_while(
            SubscriberLog.objects.get, SubscriberLog.DoesNotExist,
            desc='webhook received [ref={0}]'.format(ref),
            kwargs={'ref': ref},
        )

    def subscribe(self, event, ref=None, rest=None):
        """POST a new subscription for ``event`` to ``hooks/``.

        The callback URL is built by ``_event_url(event, ref, rest)``.
        Returns the decoded response of the POST request.
        """
        return self.post(
            'hooks/',
            event=event, url=self._event_url(event, ref, rest),
        )

    def unsubscribe(self, url):
        """Send a DELETE request to the (absolute) subscription ``url``."""
        return self._delete(url)

    def list_subscriptions(self):
        """Return the decoded response of ``GET hooks/``."""
        return self.get('hooks/')

    def _event_url(self, event, ref=None, rest=None):
        """Like :func:`event_url`, but ``ref`` defaults to ``self.ref``."""
        return event_url(event, ref=ref or self.ref, rest=rest)

    def assert_article_event_received(self, article, event, sub=None,
                                      reverse=None, ref=None, n=1):
        """Assert that ``n`` webhooks for ``article`` were logged.

        Arguments:
            article: the Article the event was sent for.
            event (str): expected event name.
            sub (dict): optional subscription mapping; when given its
                ``hmac_secret`` (if truthy) is used to verify the first
                log entry's HMAC, and its ``id`` must equal the log
                entry's ``subscription``.
            reverse (str): expected ``ref`` value in the payload;
                defaults to ``self.reverse_article(article)``.
            ref: reference ID to look up; defaults to ``self.ref``.
            n (int): exact number of log entries expected.

        Raises:
            AssertionError: if any of the checks fail.
        """
        ref = ref or self.ref
        logs = self.wait_for_webhook_received(ref, maxlen=n)
        assert len(logs) == n
        if sub is not None:
            hmac_secret = sub['hmac_secret']
            log = SubscriberLog.objects.filter(ref=ref)[0]
            if hmac_secret:
                assert hmac.verify(log.hmac, 'sha256', hmac_secret, log.data)
            assert log.subscription == sub['id']
        # ``reverse`` here is the parameter, which shadows the module-level
        # Django ``reverse`` import inside this method.
        self.assert_log_matches(
            logs[0],
            event=event,
            ref=reverse or self.reverse_article(article),
            data=article.webhooks.payload(article),
        )

    def reverse_article(self, article):
        """Return the ``article:detail`` URL path for ``article``."""
        return reverse('article:detail', kwargs={'id': article.pk})

    def wait_for_webhook_received(self, ref=None, maxlen=1):
        """Wait for log entries for ``ref`` via ``self.wait_for``.

        ``self.assert_log`` is passed as the callable, together with
        ``Exception`` and the arguments ``(ref, maxlen)``; ``ref``
        defaults to ``self.ref``.  Returns whatever ``wait_for`` returns
        (presumably the result of ``assert_log`` -- confirm).
        """
        ref = ref or self.ref
        return self.wait_for(
            self.assert_log, Exception,
            'webhook (ref={0})'.format(ref),
            args=(ref, maxlen),
        )

    def assert_log(self, ref=None, maxlen=1):
        """Return the decoded payloads of all log entries for ``ref``.

        Raises:
            AssertionError: if there are no entries, or if ``maxlen`` is
                truthy and the number of entries differs from it.
        """
        logs = SubscriberLog.objects.filter(ref=ref or self.ref)
        assert logs and (len(logs) == maxlen if maxlen else True)
        return [json.loads(entry.data) for entry in logs]

    def assert_log_matches(self, log, **expected):
        """Assert that ``log[key] == value`` for every expected pair."""
        for k, v in items(expected):
            assert log[k] == v, 'key={0} expected {1} == {2}'.format(
                k, v, log[k])

    def _login(self, username='test', password='test'):
        """POST credentials to ``api-token-auth/`` and return the token."""
        return self.post(
            'api-token-auth/',
            username=username,
            password=password,
        )['token']

    def _get_user(self, username, password, email='test@example.com'):
        """Create the user, or fetch the existing one by username."""
        user_model = get_user_model()
        try:
            return user_model.objects.create_user(
                username=username, password=password, email=email,
            )
        except Exception:
            # Deliberate best-effort get-or-create: any failure to create
            # falls back to a lookup, which raises if the user is missing.
            # NOTE(review): presumably only an integrity error (duplicate
            # username) is expected here -- consider narrowing.
            return user_model.objects.get(username=username)

    @contextmanager
    def override_worker_setting(self, setting_name, new_value):
        """Context manager: set a worker setting, restore it on exit.

        The previous value is taken from the first reply returned by
        :meth:`setenv`.
        """
        old_value = list(values(self.setenv(setting_name, new_value)))[0]
        try:
            yield
        finally:
            self.setenv(setting_name, old_value)

    @contextmanager
    def worker_subscribe_to(self, event, url=None, callback=None):
        """Context manager: subscribe the worker to ``event``.

        Calls :meth:`hook_subscribe` on entry and :meth:`hook_clear`
        for the event on exit.
        """
        self.hook_subscribe(event, url=url, callback=callback)
        try:
            yield
        finally:
            self.hook_clear(event)

    def setenv(self, setting_name, new_value):
        """Send the ``setenv`` remote control request; return the replies."""
        return self.app.control.inspect()._request(
            'setenv', setting_name=setting_name, new_value=new_value)

    def assert_ok_pidbox_response(self, replies):
        """Return ``replies`` unchanged if every reply has a truthy ``ok``.

        Raises:
            RuntimeError: for the first reply whose ``ok`` is falsy; the
                message includes the reply's ``error`` (or the whole
                reply when there is no ``error`` key).
        """
        for reply in values(replies):
            if not reply['ok']:
                raise RuntimeError(
                    'Worker remote control command raised: {0!r}'.format(
                        reply.get('error', reply)))
        return replies

    def hook_subscribe(self, event, url, callback=None):
        """Send ``hook_subscribe`` to the workers and check the replies."""
        return self.assert_ok_pidbox_response(
            self.app.control.inspect()._request(
                'hook_subscribe', event=event, url=url, callback=callback,
            ),
        )

    def hook_unsubscribe(self, event, url):
        """Send ``hook_unsubscribe`` to the workers and check the replies."""
        return self.assert_ok_pidbox_response(
            self.app.control.inspect()._request(
                'hook_unsubscribe', event=event, url=url,
            ),
        )

    def hook_clear(self, event):
        """Send ``hook_clear`` to the workers and check the replies."""
        return self.assert_ok_pidbox_response(
            self.app.control.inspect()._request(
                'hook_clear', event=event,
            )
        )

    def get(self, *path, **data):
        """GET ``url(*path)``; ``data`` is passed as ``data=``."""
        # NOTE(review): GET payload is sent as a request body (``data``)
        # rather than as query ``params`` -- confirm this is intended.
        return self._request(requests.get, 'data', url(*path), data)

    def post(self, *path, **data):
        """POST ``data`` as JSON to ``url(*path)``."""
        return self._request(requests.post, 'json', url(*path), data)

    def delete(self, *path, **data):
        """Send DELETE with ``data`` as JSON to ``url(*path)``."""
        return self._request(requests.delete, 'json', url(*path), data)

    def _delete(self, url, **data):
        """Like :meth:`delete`, but takes an already complete URL."""
        return self._request(requests.delete, 'json', url, data)

    def _request(self, fun, data_key, url, data):
        """Perform an HTTP request and return the decoded JSON response.

        Arguments:
            fun (Callable): ``requests`` function to call.
            data_key (str): keyword argument name used to pass ``data``
                to ``fun`` (``'data'`` or ``'json'`` in this class).
            url (str): absolute URL.
            data (dict): request payload.

        Returns:
            The decoded JSON body, or None if the response body is empty.

        Raises:
            Exception: whatever ``response.raise_for_status()`` raises;
                the response body is reported via ``self.error`` first.
        """
        response = fun(url, headers=self.headers(), **{data_key: data})
        try:
            response.raise_for_status()
        except Exception:
            self.error('HTTP response body was: {0!r}'.format(
                response.content))
            raise
        return response.json() if response.content else None