Stubbing in PuSH support. Need to work it in with Feeds and upgrade rel='hub'.

This commit is contained in:
Samuel Clay 2012-03-27 14:58:13 -07:00
parent 1788d26374
commit daef8f3841
7 changed files with 675 additions and 2 deletions

0
apps/push/__init__.py Normal file
View file

130
apps/push/models.py Normal file
View file

@ -0,0 +1,130 @@
# Adapted from djpubsubhubbub. See License: http://git.participatoryculture.org/djpubsubhubbub/tree/LICENSE
from datetime import datetime, timedelta
import feedparser
from urllib import urlencode
import urllib2
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.urlresolvers import reverse, Resolver404
from django.db import models
from django.utils.hashcompat import sha_constructor
from djpubsubhubbub import signals
DEFAULT_LEASE_SECONDS = 2592000 # 30 days in seconds
class SubscriptionManager(models.Manager):
def subscribe(self, topic, hub=None, callback=None,
lease_seconds=None):
if hub is None:
hub = self._get_hub(topic)
if hub is None:
raise TypeError(
'hub cannot be None if the feed does not provide it')
if lease_seconds is None:
lease_seconds = getattr(settings, 'PUBSUBHUBBUB_LEASE_SECONDS',
DEFAULT_LEASE_SECONDS)
subscription, created = self.get_or_create(
hub=hub, topic=topic)
signals.pre_subscribe.send(sender=subscription, created=created)
subscription.set_expiration(lease_seconds)
if callback is None:
try:
callback_path = reverse('pubsubhubbub_callback',
args=(subscription.pk,))
except Resolver404:
raise TypeError(
'callback cannot be None if there is not a reverable URL')
else:
callback = 'http://' + Site.objects.get_current() + \
callback_path
response = self._send_request(hub, {
'mode': 'subscribe',
'callback': callback,
'topic': topic,
'verify': ('async', 'sync'),
'verify_token': subscription.generate_token('subscribe'),
'lease_seconds': lease_seconds,
})
info = response.info()
if info.status == 204:
subscription.verified = True
elif info.status == 202: # async verification
subscription.verified = False
else:
error = response.read()
raise urllib2.URLError('error subscribing to %s on %s:\n%s' % (
topic, hub, error))
subscription.save()
if subscription.verified:
signals.verified.send(sender=subscription)
return subscription
def _get_hub(self, topic):
parsed = feedparser.parse(topic)
for link in parsed.feed.links:
if link['rel'] == 'hub':
return link['href']
def _send_request(self, url, data):
def data_generator():
for key, value in data.items():
key = 'hub.' + key
if isinstance(value, (basestring, int)):
yield key, str(value)
else:
for subvalue in value:
yield key, value
encoded_data = urlencode(list(data_generator()))
return urllib2.urlopen(url, encoded_data)
class Subscription(models.Model):
hub = models.URLField()
topic = models.URLField()
verified = models.BooleanField(default=False)
verify_token = models.CharField(max_length=60)
lease_expires = models.DateTimeField(default=datetime.now)
objects = SubscriptionManager()
# class Meta:
# unique_together = [
# ('hub', 'topic')
# ]
def set_expiration(self, lease_seconds):
self.lease_expires = datetime.now() + timedelta(
seconds=lease_seconds)
self.save()
def generate_token(self, mode):
assert self.pk is not None, \
'Subscription must be saved before generating token'
token = mode[:20] + sha_constructor('%s%i%s' % (
settings.SECRET_KEY, self.pk, mode)).hexdigest()
self.verify_token = token
self.save()
return token
def __unicode__(self):
if self.verified:
verified = u'verified'
else:
verified = u'unverified'
return u'to %s on %s: %s' % (
self.topic, self.hub, verified)
def __str__(self):
return str(unicode(self))

7
apps/push/signals.py Normal file
View file

@ -0,0 +1,7 @@
# Adapted from djpubsubhubbub. See License: http://git.participatoryculture.org/djpubsubhubbub/tree/LICENSE
from django.dispatch import Signal
pre_subscribe = Signal(providing_args=['created'])
verified = Signal()
updated = Signal(providing_args=['update'])

468
apps/push/tests.py Normal file
View file

@ -0,0 +1,468 @@
# Copyright 2009 - Participatory Culture Foundation
#
# This file is part of djpubsubhubbub.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
# NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
# THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from datetime import datetime, timedelta
import urllib2
from django.core.urlresolvers import reverse
from django.test import TestCase
from djpubsubhubbub.models import Subscription, SubscriptionManager
from djpubsubhubbub.signals import pre_subscribe, verified, updated
class MockResponse(object):
def __init__(self, status, data=None):
self.status = status
self.data = data
def info(self):
return self
def read(self):
if self.data is None:
return ''
data, self.data = self.data, None
return data
class PSHBTestBase:
urls = 'djpubsubhubbub.urls'
def setUp(self):
self._old_send_request = SubscriptionManager._send_request
SubscriptionManager._send_request = self._send_request
self.responses = []
self.requests = []
self.signals = []
for connecter in pre_subscribe, verified, updated:
def callback(signal=None, **kwargs):
self.signals.append((signal, kwargs))
connecter.connect(callback, dispatch_uid=connecter, weak=False)
def tearDown(self):
SubscriptionManager._send_request = self._old_send_request
del self._old_send_request
for signal in pre_subscribe, verified:
signal.disconnect(dispatch_uid=signal)
def _send_request(self, url, data):
self.requests.append((url, data))
return self.responses.pop()
class PSHBSubscriptionManagerTest(PSHBTestBase, TestCase):
def test_sync_verify(self):
"""
If the hub returns a 204 response, the subscription is verified and
active.
"""
self.responses.append(MockResponse(204))
sub = Subscription.objects.subscribe('topic', 'hub', 'callback', 2000)
self.assertEquals(len(self.signals), 2)
self.assertEquals(self.signals[0], (pre_subscribe, {'sender': sub,
'created': True}))
self.assertEquals(self.signals[1], (verified, {'sender': sub}))
self.assertEquals(sub.hub, 'hub')
self.assertEquals(sub.topic, 'topic')
self.assertEquals(sub.verified, True)
rough_expires = datetime.now() + timedelta(seconds=2000)
self.assert_(abs(sub.lease_expires - rough_expires).seconds < 5,
'lease more than 5 seconds off')
self.assertEquals(len(self.requests), 1)
request = self.requests[0]
self.assertEquals(request[0], 'hub')
self.assertEquals(request[1]['mode'], 'subscribe')
self.assertEquals(request[1]['topic'], 'topic')
self.assertEquals(request[1]['callback'], 'callback')
self.assertEquals(request[1]['verify'], ('async', 'sync'))
self.assertEquals(request[1]['verify_token'], sub.verify_token)
self.assertEquals(request[1]['lease_seconds'], 2000)
def test_async_verify(self):
"""
If the hub returns a 202 response, we should not assume the
subscription is verified.
"""
self.responses.append(MockResponse(202))
sub = Subscription.objects.subscribe('topic', 'hub', 'callback', 2000)
self.assertEquals(len(self.signals), 1)
self.assertEquals(self.signals[0], (pre_subscribe, {'sender': sub,
'created': True}))
self.assertEquals(sub.hub, 'hub')
self.assertEquals(sub.topic, 'topic')
self.assertEquals(sub.verified, False)
rough_expires = datetime.now() + timedelta(seconds=2000)
self.assert_(abs(sub.lease_expires - rough_expires).seconds < 5,
'lease more than 5 seconds off')
self.assertEquals(len(self.requests), 1)
request = self.requests[0]
self.assertEquals(request[0], 'hub')
self.assertEquals(request[1]['mode'], 'subscribe')
self.assertEquals(request[1]['topic'], 'topic')
self.assertEquals(request[1]['callback'], 'callback')
self.assertEquals(request[1]['verify'], ('async', 'sync'))
self.assertEquals(request[1]['verify_token'], sub.verify_token)
self.assertEquals(request[1]['lease_seconds'], 2000)
def test_least_seconds_default(self):
"""
If the number of seconds to lease the subscription is not specified, it
should default to 2592000 (30 days).
"""
self.responses.append(MockResponse(202))
sub = Subscription.objects.subscribe('topic', 'hub', 'callback')
rough_expires = datetime.now() + timedelta(seconds=2592000)
self.assert_(abs(sub.lease_expires - rough_expires).seconds < 5,
'lease more than 5 seconds off')
self.assertEquals(len(self.requests), 1)
request = self.requests[0]
self.assertEquals(request[1]['lease_seconds'], 2592000)
def test_error_on_subscribe_raises_URLError(self):
"""
If a non-202/204 status is returned, raise a URLError.
"""
self.responses.append(MockResponse(500, 'error data'))
try:
Subscription.objects.subscribe('topic', 'hub', 'callback')
except urllib2.URLError, e:
self.assertEquals(e.reason,
'error subscribing to topic on hub:\nerror data')
else:
self.fail('subscription did not raise URLError exception')
class PSHBCallbackViewTestCase(PSHBTestBase, TestCase):
def test_verify(self):
"""
Getting the callback from the server should verify the subscription.
"""
sub = Subscription.objects.create(
topic='topic',
hub='hub',
verified=False)
verify_token = sub.generate_token('subscribe')
response = self.client.get(reverse('pubsubhubbub_callback',
args=(sub.pk,)),
{'hub.mode': 'subscribe',
'hub.topic': sub.topic,
'hub.challenge': 'challenge',
'hub.lease_seconds': 2000,
'hub.verify_token': verify_token})
self.assertEquals(response.status_code, 200)
self.assertEquals(response.content, 'challenge')
sub = Subscription.objects.get(pk=sub.pk)
self.assertEquals(sub.verified, True)
self.assertEquals(len(self.signals), 1)
self.assertEquals(self.signals[0], (verified, {'sender': sub}))
def test_404(self):
"""
Various things sould return a 404:
* invalid primary key in the URL
* token doesn't start with 'subscribe'
* subscription doesn't exist
* token doesn't match the subscription
"""
sub = Subscription.objects.create(
topic='topic',
hub='hub',
verified=False)
verify_token = sub.generate_token('subscribe')
response = self.client.get(reverse('pubsubhubbub_callback',
args=(0,)),
{'hub.mode': 'subscribe',
'hub.topic': sub.topic,
'hub.challenge': 'challenge',
'hub.lease_seconds': 2000,
'hub.verify_token': verify_token[1:]})
self.assertEquals(response.status_code, 404)
self.assertEquals(len(self.signals), 0)
response = self.client.get(reverse('pubsubhubbub_callback',
args=(sub.pk,)),
{'hub.mode': 'subscribe',
'hub.topic': sub.topic,
'hub.challenge': 'challenge',
'hub.lease_seconds': 2000,
'hub.verify_token': verify_token[1:]})
self.assertEquals(response.status_code, 404)
self.assertEquals(len(self.signals), 0)
response = self.client.get(reverse('pubsubhubbub_callback',
args=(sub.pk,)),
{'hub.mode': 'subscribe',
'hub.topic': sub.topic + 'extra',
'hub.challenge': 'challenge',
'hub.lease_seconds': 2000,
'hub.verify_token': verify_token})
self.assertEquals(response.status_code, 404)
self.assertEquals(len(self.signals), 0)
response = self.client.get(reverse('pubsubhubbub_callback',
args=(sub.pk,)),
{'hub.mode': 'subscribe',
'hub.topic': sub.topic,
'hub.challenge': 'challenge',
'hub.lease_seconds': 2000,
'hub.verify_token': verify_token[:-5]})
self.assertEquals(response.status_code, 404)
self.assertEquals(len(self.signals), 0)
class PSHBUpdateTestCase(PSHBTestBase, TestCase):
def test_update(self):
# this data comes from
# http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.1.html#anchor3
update_data = """<?xml version="1.0"?>
<atom:feed>
<!-- Normally here would be source, title, etc ... -->
<link rel="hub" href="http://myhub.example.com/endpoint" />
<link rel="self" href="http://publisher.example.com/happycats.xml" />
<updated>2008-08-11T02:15:01Z</updated>
<!-- Example of a full entry. -->
<entry>
<title>Heathcliff</title>
<link href="http://publisher.example.com/happycat25.xml" />
<id>http://publisher.example.com/happycat25.xml</id>
<updated>2008-08-11T02:15:01Z</updated>
<content>
What a happy cat. Full content goes here.
</content>
</entry>
<!-- Example of an entity that isn't full/is truncated. This is implied
by the lack of a <content> element and a <summary> element instead. -->
<entry >
<title>Heathcliff</title>
<link href="http://publisher.example.com/happycat25.xml" />
<id>http://publisher.example.com/happycat25.xml</id>
<updated>2008-08-11T02:15:01Z</updated>
<summary>
What a happy cat!
</summary>
</entry>
<!-- Meta-data only; implied by the lack of <content> and
<summary> elements. -->
<entry>
<title>Garfield</title>
<link rel="alternate" href="http://publisher.example.com/happycat24.xml" />
<id>http://publisher.example.com/happycat25.xml</id>
<updated>2008-08-11T02:15:01Z</updated>
</entry>
<!-- Context entry that's meta-data only and not new. Implied because the
update time on this entry is before the //atom:feed/updated time. -->
<entry>
<title>Nermal</title>
<link rel="alternate" href="http://publisher.example.com/happycat23s.xml" />
<id>http://publisher.example.com/happycat25.xml</id>
<updated>2008-07-10T12:28:13Z</updated>
</entry>
</atom:feed>
"""
sub = Subscription.objects.create(
hub="http://myhub.example.com/endpoint",
topic="http://publisher.example.com/happycats.xml")
callback_data = []
updated.connect(
lambda sender=None, update=None, **kwargs: callback_data.append(
(sender, update)),
weak=False)
response = self.client.post(reverse('pubsubhubbub_callback',
args=(sub.pk,)),
update_data, 'application/atom+xml')
self.assertEquals(response.status_code, 200)
self.assertEquals(len(callback_data), 1)
sender, update = callback_data[0]
self.assertEquals(sender, sub)
self.assertEquals(len(update.entries), 4)
self.assertEquals(update.entries[0].id,
'http://publisher.example.com/happycat25.xml')
self.assertEquals(update.entries[1].id,
'http://publisher.example.com/happycat25.xml')
self.assertEquals(update.entries[2].id,
'http://publisher.example.com/happycat25.xml')
self.assertEquals(update.entries[3].id,
'http://publisher.example.com/happycat25.xml')
def test_update_with_changed_hub(self):
update_data = """<?xml version="1.0"?>
<atom:feed>
<!-- Normally here would be source, title, etc ... -->
<link rel="hub" href="http://myhub.example.com/endpoint" />
<link rel="self" href="http://publisher.example.com/happycats.xml" />
<updated>2008-08-11T02:15:01Z</updated>
<entry>
<title>Heathcliff</title>
<link href="http://publisher.example.com/happycat25.xml" />
<id>http://publisher.example.com/happycat25.xml</id>
<updated>2008-08-11T02:15:01Z</updated>
<content>
What a happy cat. Full content goes here.
</content>
</entry>
</atom:feed>
"""
sub = Subscription.objects.create(
hub="hub",
topic="http://publisher.example.com/happycats.xml",
lease_expires=datetime.now() + timedelta(days=1))
callback_data = []
updated.connect(
lambda sender=None, update=None, **kwargs: callback_data.append(
(sender, update)),
weak=False)
self.responses.append(MockResponse(204))
response = self.client.post(reverse('pubsubhubbub_callback',
args=(sub.pk,)),
update_data, 'application/atom+xml')
self.assertEquals(response.status_code, 200)
self.assertEquals(
Subscription.objects.filter(
hub='http://myhub.example.com/endpoint',
topic='http://publisher.example.com/happycats.xml',
verified=True).count(), 1)
self.assertEquals(len(self.requests), 1)
self.assertEquals(self.requests[0][0],
'http://myhub.example.com/endpoint')
self.assertEquals(self.requests[0][1]['callback'],
'http://testserver/1/')
self.assert_((self.requests[0][1]['lease_seconds'] - 86400) < 5)
def test_update_with_changed_self(self):
update_data = """<?xml version="1.0"?>
<atom:feed>
<!-- Normally here would be source, title, etc ... -->
<link rel="hub" href="http://myhub.example.com/endpoint" />
<link rel="self" href="http://publisher.example.com/happycats.xml" />
<updated>2008-08-11T02:15:01Z</updated>
<entry>
<title>Heathcliff</title>
<link href="http://publisher.example.com/happycat25.xml" />
<id>http://publisher.example.com/happycat25.xml</id>
<updated>2008-08-11T02:15:01Z</updated>
<content>
What a happy cat. Full content goes here.
</content>
</entry>
</atom:feed>
"""
sub = Subscription.objects.create(
hub="http://myhub.example.com/endpoint",
topic="topic",
lease_expires=datetime.now() + timedelta(days=1))
callback_data = []
updated.connect(
lambda sender=None, update=None, **kwargs: callback_data.append(
(sender, update)),
weak=False)
self.responses.append(MockResponse(204))
response = self.client.post(reverse('pubsubhubbub_callback',
args=(sub.pk,)),
update_data, 'application/atom+xml')
self.assertEquals(response.status_code, 200)
self.assertEquals(
Subscription.objects.filter(
hub='http://myhub.example.com/endpoint',
topic='http://publisher.example.com/happycats.xml',
verified=True).count(), 1)
self.assertEquals(len(self.requests), 1)
self.assertEquals(self.requests[0][0],
'http://myhub.example.com/endpoint')
self.assertEquals(self.requests[0][1]['callback'],
'http://testserver/1/')
self.assert_((self.requests[0][1]['lease_seconds'] - 86400) < 5)
def test_update_with_changed_hub_and_self(self):
update_data = """<?xml version="1.0"?>
<atom:feed>
<!-- Normally here would be source, title, etc ... -->
<link rel="hub" href="http://myhub.example.com/endpoint" />
<link rel="self" href="http://publisher.example.com/happycats.xml" />
<updated>2008-08-11T02:15:01Z</updated>
<entry>
<title>Heathcliff</title>
<link href="http://publisher.example.com/happycat25.xml" />
<id>http://publisher.example.com/happycat25.xml</id>
<updated>2008-08-11T02:15:01Z</updated>
<content>
What a happy cat. Full content goes here.
</content>
</entry>
</atom:feed>
"""
sub = Subscription.objects.create(
hub="hub",
topic="topic",
lease_expires=datetime.now() + timedelta(days=1))
callback_data = []
updated.connect(
lambda sender=None, update=None, **kwargs: callback_data.append(
(sender, update)),
weak=False)
self.responses.append(MockResponse(204))
response = self.client.post(reverse('pubsubhubbub_callback',
args=(sub.pk,)),
update_data, 'application/atom+xml')
self.assertEquals(response.status_code, 200)
self.assertEquals(
Subscription.objects.filter(
hub='http://myhub.example.com/endpoint',
topic='http://publisher.example.com/happycats.xml',
verified=True).count(), 1)
self.assertEquals(len(self.requests), 1)
self.assertEquals(self.requests[0][0],
'http://myhub.example.com/endpoint')
self.assertEquals(self.requests[0][1]['callback'],
'http://testserver/1/')
self.assert_((self.requests[0][1]['lease_seconds'] - 86400) < 5)

6
apps/push/urls.py Normal file
View file

@ -0,0 +1,6 @@
from django.conf.urls.defaults import *
from apps.push import views
urlpatterns = patterns('',
url(r'^(\d+)/?$', views.pubsubhubbub_callback),
)

62
apps/push/views.py Normal file
View file

@ -0,0 +1,62 @@
# Adapted from djpubsubhubbub. See License: http://git.participatoryculture.org/djpubsubhubbub/tree/LICENSE
from datetime import datetime
import feedparser
from django.http import HttpResponse, Http404
from django.shortcuts import get_object_or_404
from djpubsubhubbub.models import Subscription
from djpubsubhubbub.signals import verified, updated
def callback(request, pk):
if request.method == 'GET':
mode = request.GET['hub.mode']
topic = request.GET['hub.topic']
challenge = request.GET['hub.challenge']
lease_seconds = request.GET.get('hub.lease_seconds')
verify_token = request.GET.get('hub.verify_token', '')
if mode == 'subscribe':
if not verify_token.startswith('subscribe'):
raise Http404
subscription = get_object_or_404(Subscription,
pk=pk,
topic=topic,
verify_token=verify_token)
subscription.verified = True
subscription.set_expiration(int(lease_seconds))
verified.send(sender=subscription)
return HttpResponse(challenge, content_type='text/plain')
elif request.method == 'POST':
subscription = get_object_or_404(Subscription, pk=pk)
parsed = feedparser.parse(request.raw_post_data)
if parsed.feed.links: # single notification
hub_url = subscription.hub
self_url = subscription.topic
for link in parsed.feed.links:
if link['rel'] == 'hub':
hub_url = link['href']
elif link['rel'] == 'self':
self_url = link['href']
needs_update = False
if hub_url and subscription.hub != hub_url:
# hub URL has changed; let's update our subscription
needs_update = True
elif self_url != subscription.topic:
# topic URL has changed
needs_update = True
if needs_update:
expiration_time = subscription.lease_expires - datetime.now()
seconds = expiration_time.days*86400 + expiration_time.seconds
Subscription.objects.subscribe(
self_url, hub_url,
callback=request.build_absolute_uri(),
lease_seconds=seconds)
updated.send(sender=subscription, update=parsed)
return HttpResponse('')
return Http404

View file

@ -435,9 +435,9 @@ class Dispatcher:
r = redis.Redis(connection_pool=settings.REDIS_POOL)
listeners_count = r.publish(str(feed.pk), 'story:new')
if listeners_count:
logging.debug(" ---> [%-30s] Published to %s subscribers" % (unicode(feed)[:30], listeners_count))
logging.debug(" ---> [%-30s] ~FMPublished to %s subscribers" % (unicode(feed)[:30], listeners_count))
except redis.ConnectionError:
logging.debug(" ***> [%-30s] Redis is unavailable for real-time." % (unicode(feed)[:30],))
logging.debug(" ***> [%-30s] ~BMRedis is unavailable for real-time." % (unicode(feed)[:30],))
@timelimit(20)
def count_unreads_for_subscribers(self, feed):