DEFAULT_LEASE_SECONDS = 2592000  # 30 days in seconds


class SubscriptionManager(models.Manager):
    """Manager that creates Subscription rows and registers them with a hub."""

    def subscribe(self, topic, hub=None, callback=None,
                  lease_seconds=None):
        """Subscribe to ``topic`` on ``hub`` and return the Subscription.

        ``hub`` defaults to the hub advertised by the feed at ``topic``.
        ``callback`` defaults to the reversed ``pubsubhubbub_callback`` URL on
        the current Site.  ``lease_seconds`` defaults to
        ``settings.PUBSUBHUBBUB_LEASE_SECONDS`` or ``DEFAULT_LEASE_SECONDS``.

        Sends ``pre_subscribe`` before contacting the hub and ``verified``
        when the hub confirms synchronously (HTTP 204).

        Raises TypeError when no hub or callback can be determined, and
        urllib2.URLError when the hub answers with anything but 202/204.
        """
        if hub is None:
            hub = self._get_hub(topic)

        if hub is None:
            raise TypeError(
                'hub cannot be None if the feed does not provide it')

        if lease_seconds is None:
            lease_seconds = getattr(settings, 'PUBSUBHUBBUB_LEASE_SECONDS',
                                    DEFAULT_LEASE_SECONDS)

        subscription, created = self.get_or_create(
            hub=hub, topic=topic)
        signals.pre_subscribe.send(sender=subscription, created=created)
        subscription.set_expiration(lease_seconds)

        if callback is None:
            # reverse() signals failure with NoReverseMatch; Resolver404 is
            # only raised by resolve(), so catching it alone never triggered.
            from django.core.urlresolvers import NoReverseMatch
            try:
                callback_path = reverse('pubsubhubbub_callback',
                                        args=(subscription.pk,))
            except (NoReverseMatch, Resolver404):
                raise TypeError(
                    'callback cannot be None if there is not a reverable URL')
            else:
                # get_current() returns a Site object; only its domain can be
                # concatenated into the URL.
                callback = ('http://' + Site.objects.get_current().domain +
                            callback_path)

        response = self._send_request(hub, {
            'mode': 'subscribe',
            'callback': callback,
            'topic': topic,
            'verify': ('async', 'sync'),
            'verify_token': subscription.generate_token('subscribe'),
            'lease_seconds': lease_seconds,
        })

        # On a real urllib2 response, info().status is the header-parsing
        # status string, not the HTTP code, so prefer getcode().  The fallback
        # keeps test doubles that only expose info().status working.
        getcode = getattr(response, 'getcode', None)
        if getcode is not None:
            status = getcode()
        else:
            status = response.info().status

        if status == 204:
            subscription.verified = True
        elif status == 202:  # async verification
            subscription.verified = False
        else:
            error = response.read()
            raise urllib2.URLError('error subscribing to %s on %s:\n%s' % (
                topic, hub, error))

        subscription.save()
        if subscription.verified:
            signals.verified.send(sender=subscription)
        return subscription

    def _get_hub(self, topic):
        """Return the first ``rel="hub"`` link of the feed at ``topic``.

        Returns None when the feed has no links or advertises no hub.
        """
        parsed = feedparser.parse(topic)
        for link in parsed.feed.get('links', []):
            if link.get('rel') == 'hub' and link.get('href'):
                return link['href']
        return None

    def _send_request(self, url, data):
        """POST ``data`` to ``url`` with every key prefixed by ``hub.``.

        Values that are strings or ints are sent once; any other iterable is
        expanded into one ``hub.<key>`` pair per element.
        """
        def data_generator():
            for key, value in data.items():
                key = 'hub.' + key
                if isinstance(value, (basestring, int)):
                    yield key, str(value)
                else:
                    for subvalue in value:
                        # Emit each element, not the whole container.
                        yield key, str(subvalue)
        encoded_data = urlencode(list(data_generator()))
        return urllib2.urlopen(url, encoded_data)


class Subscription(models.Model):
    """A subscription to ``topic`` registered on ``hub``."""

    # URL of the hub the subscription request is sent to.
    hub = models.URLField()
    # URL of the subscribed feed.
    topic = models.URLField()
    # True once the hub has confirmed the subscription.
    verified = models.BooleanField(default=False)
    # Token from generate_token(): up to 20 chars of mode + 40 hex chars.
    verify_token = models.CharField(max_length=60)
    lease_expires = models.DateTimeField(default=datetime.now)

    objects = SubscriptionManager()

    # class Meta:
    #     unique_together = [
    #         ('hub', 'topic')
    #     ]

    def set_expiration(self, lease_seconds):
        """Set ``lease_expires`` to now + ``lease_seconds`` and save."""
        self.lease_expires = datetime.now() + timedelta(
            seconds=lease_seconds)
        self.save()

    def generate_token(self, mode):
        """Create, store and return the verify token for ``mode``.

        The token is the first 20 characters of ``mode`` followed by the SHA-1
        hex digest of SECRET_KEY, the primary key and ``mode``.  The instance
        must already be saved because the primary key is part of the digest.
        """
        assert self.pk is not None, \
            'Subscription must be saved before generating token'
        token = mode[:20] + sha_constructor('%s%i%s' % (
            settings.SECRET_KEY, self.pk, mode)).hexdigest()
        self.verify_token = token
        self.save()
        return token

    def __unicode__(self):
        if self.verified:
            verified = u'verified'
        else:
            verified = u'unverified'
        return u'to %s on %s: %s' % (
            self.topic, self.hub, verified)

    def __str__(self):
        # Encode explicitly: str(unicode) uses ASCII and raises on
        # non-ASCII hub/topic URLs.
        return unicode(self).encode('utf-8')
# Sent by SubscriptionManager.subscribe() after the Subscription row has been
# fetched or created and before the hub is contacted.  The sender is the
# Subscription; ``created`` is the flag returned by get_or_create().
pre_subscribe = Signal(providing_args=['created'])
# Sent with the Subscription as sender once it is marked verified, either by
# a 204 answer in subscribe() or by the hub's GET request to the callback view.
verified = Signal()
# Sent by the callback view for a POSTed notification.  The sender is the
# Subscription; ``update`` is the feedparser result for the request body.
updated = Signal(providing_args=['update'])
class MockResponse(object):
    """Minimal stand-in for the response object used by subscribe().

    ``info()`` returns the object itself so ``info().status`` yields the
    status passed to the constructor; ``read()`` returns the data once and
    an empty string afterwards.
    """

    def __init__(self, status, data=None):
        self.status = status
        self.data = data

    def info(self):
        return self

    def read(self):
        if self.data is None:
            return ''
        data, self.data = self.data, None
        return data


class PSHBTestBase:
    """Mixin that stubs out hub requests and records every signal sent."""

    # NOTE(review): the app lives under apps/push in this patch while the
    # imports still reference djpubsubhubbub -- confirm which URLconf is meant.
    urls = 'djpubsubhubbub.urls'

    def setUp(self):
        self._old_send_request = SubscriptionManager._send_request
        SubscriptionManager._send_request = self._send_request
        self.responses = []
        self.requests = []
        self.signals = []

        def callback(signal=None, **kwargs):
            self.signals.append((signal, kwargs))

        # The signal itself is used as dispatch_uid so tearDown can
        # disconnect the receiver without holding a reference to it.
        for connecter in (pre_subscribe, verified, updated):
            connecter.connect(callback, dispatch_uid=connecter, weak=False)

    def tearDown(self):
        SubscriptionManager._send_request = self._old_send_request
        del self._old_send_request
        # Disconnect every signal connected in setUp, including `updated`;
        # otherwise its strong receiver leaks into later tests.
        for signal in (pre_subscribe, verified, updated):
            signal.disconnect(dispatch_uid=signal)

    def _send_request(self, url, data):
        """Record the request and return the most recently queued response."""
        self.requests.append((url, data))
        return self.responses.pop()


class PSHBSubscriptionManagerTest(PSHBTestBase, TestCase):

    def _assert_lease_close_to(self, sub, seconds):
        """Check that the lease expires roughly ``seconds`` from now."""
        rough_expires = datetime.now() + timedelta(seconds=seconds)
        # Compare whole timedeltas: `.seconds` alone ignores the days part
        # and would accept a lease that is off by a multiple of 24 hours.
        self.assertTrue(
            abs(sub.lease_expires - rough_expires) < timedelta(seconds=5),
            'lease more than 5 seconds off')

    def _assert_subscribe_request(self, sub, lease_seconds):
        """Check that exactly one well-formed subscribe request was sent."""
        self.assertEqual(len(self.requests), 1)
        url, data = self.requests[0]
        self.assertEqual(url, 'hub')
        self.assertEqual(data['mode'], 'subscribe')
        self.assertEqual(data['topic'], 'topic')
        self.assertEqual(data['callback'], 'callback')
        self.assertEqual(data['verify'], ('async', 'sync'))
        self.assertEqual(data['verify_token'], sub.verify_token)
        self.assertEqual(data['lease_seconds'], lease_seconds)

    def test_sync_verify(self):
        """
        If the hub returns a 204 response, the subscription is verified and
        active.
        """
        self.responses.append(MockResponse(204))
        sub = Subscription.objects.subscribe('topic', 'hub', 'callback', 2000)
        self.assertEqual(len(self.signals), 2)
        self.assertEqual(self.signals[0], (pre_subscribe, {'sender': sub,
                                                           'created': True}))
        self.assertEqual(self.signals[1], (verified, {'sender': sub}))
        self.assertEqual(sub.hub, 'hub')
        self.assertEqual(sub.topic, 'topic')
        self.assertEqual(sub.verified, True)
        self._assert_lease_close_to(sub, 2000)
        self._assert_subscribe_request(sub, 2000)

    def test_async_verify(self):
        """
        If the hub returns a 202 response, we should not assume the
        subscription is verified.
        """
        self.responses.append(MockResponse(202))
        sub = Subscription.objects.subscribe('topic', 'hub', 'callback', 2000)
        self.assertEqual(len(self.signals), 1)
        self.assertEqual(self.signals[0], (pre_subscribe, {'sender': sub,
                                                           'created': True}))
        self.assertEqual(sub.hub, 'hub')
        self.assertEqual(sub.topic, 'topic')
        self.assertEqual(sub.verified, False)
        self._assert_lease_close_to(sub, 2000)
        self._assert_subscribe_request(sub, 2000)

    def test_least_seconds_default(self):
        """
        If the number of seconds to lease the subscription is not specified, it
        should default to 2592000 (30 days).
        """
        self.responses.append(MockResponse(202))
        sub = Subscription.objects.subscribe('topic', 'hub', 'callback')
        self._assert_lease_close_to(sub, 2592000)
        self.assertEqual(len(self.requests), 1)
        request = self.requests[0]
        self.assertEqual(request[1]['lease_seconds'], 2592000)

    def test_error_on_subscribe_raises_URLError(self):
        """
        If a non-202/204 status is returned, raise a URLError.
        """
        self.responses.append(MockResponse(500, 'error data'))
        try:
            Subscription.objects.subscribe('topic', 'hub', 'callback')
        except urllib2.URLError as e:
            self.assertEqual(
                e.reason,
                'error subscribing to topic on hub:\nerror data')
        else:
            self.fail('subscription did not raise URLError exception')
'hub.lease_seconds': 2000, + 'hub.verify_token': verify_token[1:]}) + self.assertEquals(response.status_code, 404) + self.assertEquals(len(self.signals), 0) + + response = self.client.get(reverse('pubsubhubbub_callback', + args=(sub.pk,)), + {'hub.mode': 'subscribe', + 'hub.topic': sub.topic + 'extra', + 'hub.challenge': 'challenge', + 'hub.lease_seconds': 2000, + 'hub.verify_token': verify_token}) + self.assertEquals(response.status_code, 404) + self.assertEquals(len(self.signals), 0) + + response = self.client.get(reverse('pubsubhubbub_callback', + args=(sub.pk,)), + {'hub.mode': 'subscribe', + 'hub.topic': sub.topic, + 'hub.challenge': 'challenge', + 'hub.lease_seconds': 2000, + 'hub.verify_token': verify_token[:-5]}) + self.assertEquals(response.status_code, 404) + self.assertEquals(len(self.signals), 0) + +class PSHBUpdateTestCase(PSHBTestBase, TestCase): + + def test_update(self): + # this data comes from + # http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.1.html#anchor3 + update_data = """ + + + + + + 2008-08-11T02:15:01Z + + + + Heathcliff + + http://publisher.example.com/happycat25.xml + 2008-08-11T02:15:01Z + + What a happy cat. Full content goes here. + + + + + + Heathcliff + + http://publisher.example.com/happycat25.xml + 2008-08-11T02:15:01Z + + What a happy cat! 
+ + + + + + Garfield + + http://publisher.example.com/happycat25.xml + 2008-08-11T02:15:01Z + + + + + Nermal + + http://publisher.example.com/happycat25.xml + 2008-07-10T12:28:13Z + + + +""" + + sub = Subscription.objects.create( + hub="http://myhub.example.com/endpoint", + topic="http://publisher.example.com/happycats.xml") + + callback_data = [] + updated.connect( + lambda sender=None, update=None, **kwargs: callback_data.append( + (sender, update)), + weak=False) + + response = self.client.post(reverse('pubsubhubbub_callback', + args=(sub.pk,)), + update_data, 'application/atom+xml') + self.assertEquals(response.status_code, 200) + + self.assertEquals(len(callback_data), 1) + sender, update = callback_data[0] + self.assertEquals(sender, sub) + self.assertEquals(len(update.entries), 4) + self.assertEquals(update.entries[0].id, + 'http://publisher.example.com/happycat25.xml') + self.assertEquals(update.entries[1].id, + 'http://publisher.example.com/happycat25.xml') + self.assertEquals(update.entries[2].id, + 'http://publisher.example.com/happycat25.xml') + self.assertEquals(update.entries[3].id, + 'http://publisher.example.com/happycat25.xml') + + def test_update_with_changed_hub(self): + update_data = """ + + + + + + 2008-08-11T02:15:01Z + + + Heathcliff + + http://publisher.example.com/happycat25.xml + 2008-08-11T02:15:01Z + + What a happy cat. Full content goes here. 
+ + + +""" + sub = Subscription.objects.create( + hub="hub", + topic="http://publisher.example.com/happycats.xml", + lease_expires=datetime.now() + timedelta(days=1)) + + callback_data = [] + updated.connect( + lambda sender=None, update=None, **kwargs: callback_data.append( + (sender, update)), + weak=False) + + self.responses.append(MockResponse(204)) + + response = self.client.post(reverse('pubsubhubbub_callback', + args=(sub.pk,)), + update_data, 'application/atom+xml') + self.assertEquals(response.status_code, 200) + self.assertEquals( + Subscription.objects.filter( + hub='http://myhub.example.com/endpoint', + topic='http://publisher.example.com/happycats.xml', + verified=True).count(), 1) + self.assertEquals(len(self.requests), 1) + self.assertEquals(self.requests[0][0], + 'http://myhub.example.com/endpoint') + self.assertEquals(self.requests[0][1]['callback'], + 'http://testserver/1/') + self.assert_((self.requests[0][1]['lease_seconds'] - 86400) < 5) + + def test_update_with_changed_self(self): + update_data = """ + + + + + + 2008-08-11T02:15:01Z + + + Heathcliff + + http://publisher.example.com/happycat25.xml + 2008-08-11T02:15:01Z + + What a happy cat. Full content goes here. 
+ + + +""" + sub = Subscription.objects.create( + hub="http://myhub.example.com/endpoint", + topic="topic", + lease_expires=datetime.now() + timedelta(days=1)) + + callback_data = [] + updated.connect( + lambda sender=None, update=None, **kwargs: callback_data.append( + (sender, update)), + weak=False) + + self.responses.append(MockResponse(204)) + + response = self.client.post(reverse('pubsubhubbub_callback', + args=(sub.pk,)), + update_data, 'application/atom+xml') + self.assertEquals(response.status_code, 200) + self.assertEquals( + Subscription.objects.filter( + hub='http://myhub.example.com/endpoint', + topic='http://publisher.example.com/happycats.xml', + verified=True).count(), 1) + self.assertEquals(len(self.requests), 1) + self.assertEquals(self.requests[0][0], + 'http://myhub.example.com/endpoint') + self.assertEquals(self.requests[0][1]['callback'], + 'http://testserver/1/') + self.assert_((self.requests[0][1]['lease_seconds'] - 86400) < 5) + + def test_update_with_changed_hub_and_self(self): + update_data = """ + + + + + + 2008-08-11T02:15:01Z + + + Heathcliff + + http://publisher.example.com/happycat25.xml + 2008-08-11T02:15:01Z + + What a happy cat. Full content goes here. 
urlpatterns = patterns('',
    # The view in views.py is called `callback`.  The pattern is named so that
    # reverse('pubsubhubbub_callback', args=(pk,)) used by the subscription
    # manager and the tests can resolve it.
    url(r'^(\d+)/?$', views.callback, name='pubsubhubbub_callback'),
)
def callback(request, pk):
    """Handle hub traffic for the Subscription with primary key ``pk``.

    GET is the hub's verification request: for ``hub.mode=subscribe`` the
    subscription matching ``pk``, ``hub.topic`` and ``hub.verify_token`` is
    marked verified, its lease is updated when ``hub.lease_seconds`` is
    given, and ``verified`` is sent.  The response body echoes
    ``hub.challenge``.

    POST is a content notification: the body is parsed with feedparser, the
    subscription is renewed if the feed advertises a different hub or self
    URL, and ``updated`` is sent with the parsed feed.

    Raises Http404 for unknown subscriptions, missing or invalid
    verification parameters, and any other HTTP method.
    """
    if request.method == 'GET':
        try:
            mode = request.GET['hub.mode']
            topic = request.GET['hub.topic']
            challenge = request.GET['hub.challenge']
        except KeyError:
            # A missing required parameter is a bad verification request,
            # not a server error.
            raise Http404
        lease_seconds = request.GET.get('hub.lease_seconds')
        verify_token = request.GET.get('hub.verify_token', '')

        if mode == 'subscribe':
            if not verify_token.startswith('subscribe'):
                raise Http404
            if lease_seconds is not None:
                try:
                    lease_seconds = int(lease_seconds)
                except ValueError:
                    raise Http404
            subscription = get_object_or_404(Subscription,
                                             pk=pk,
                                             topic=topic,
                                             verify_token=verify_token)
            subscription.verified = True
            if lease_seconds is None:
                # hub.lease_seconds is optional; keep the current expiry.
                subscription.save()
            else:
                subscription.set_expiration(lease_seconds)
            verified.send(sender=subscription)

        # NOTE(review): any other hub.mode is echoed back without checking
        # the token -- confirm that is intended.
        return HttpResponse(challenge, content_type='text/plain')
    elif request.method == 'POST':
        subscription = get_object_or_404(Subscription, pk=pk)
        parsed = feedparser.parse(request.raw_post_data)
        links = parsed.feed.get('links')
        if links:  # single notification
            hub_url = subscription.hub
            self_url = subscription.topic
            for link in links:
                href = link.get('href')
                if not href:
                    continue
                rel = link.get('rel')
                if rel == 'hub':
                    hub_url = href
                elif rel == 'self':
                    self_url = href

            # Resubscribe when the feed now advertises a different hub or a
            # different topic (self) URL.
            needs_update = bool(
                (hub_url and subscription.hub != hub_url) or
                self_url != subscription.topic)

            if needs_update:
                expiration_time = subscription.lease_expires - datetime.now()
                seconds = (expiration_time.days * 86400 +
                           expiration_time.seconds)
                if seconds <= 0:
                    # Lease already expired: let subscribe() pick its
                    # default rather than request a non-positive lease.
                    seconds = None
                Subscription.objects.subscribe(
                    self_url, hub_url,
                    callback=request.build_absolute_uri(),
                    lease_seconds=seconds)

        updated.send(sender=subscription, update=parsed)
        return HttpResponse('')
    # Http404 is an exception: returning the class is not a valid response.
    raise Http404
a/utils/feed_fetcher.py b/utils/feed_fetcher.py index 2fa4cf61b..29bd00f41 100644 --- a/utils/feed_fetcher.py +++ b/utils/feed_fetcher.py @@ -435,9 +435,9 @@ class Dispatcher: r = redis.Redis(connection_pool=settings.REDIS_POOL) listeners_count = r.publish(str(feed.pk), 'story:new') if listeners_count: - logging.debug(" ---> [%-30s] Published to %s subscribers" % (unicode(feed)[:30], listeners_count)) + logging.debug(" ---> [%-30s] ~FMPublished to %s subscribers" % (unicode(feed)[:30], listeners_count)) except redis.ConnectionError: - logging.debug(" ***> [%-30s] Redis is unavailable for real-time." % (unicode(feed)[:30],)) + logging.debug(" ***> [%-30s] ~BMRedis is unavailable for real-time." % (unicode(feed)[:30],)) @timelimit(20) def count_unreads_for_subscribers(self, feed):