Fixing the shit out of the feed fetcher's deleted feed handling. Also fixing PuSH support to correctly pass parameters to the PuSH server.

Samuel Clay 2012-03-27 17:34:39 -07:00
parent a745e654ba
commit fd1e26a5b0
7 changed files with 54 additions and 60 deletions

View file

@@ -2,7 +2,7 @@
from datetime import datetime, timedelta
import feedparser
-from urllib import urlencode
+import requests
import urllib2
from django.conf import settings
@@ -38,7 +38,7 @@ class PushSubscriptionManager(models.Manager):
if callback is None:
try:
-callback_path = reverse('pubsubhubbub_callback',
+callback_path = reverse('push-callback',
args=(subscription.pk,))
except Resolver404:
raise TypeError(
@@ -49,21 +49,20 @@ class PushSubscriptionManager(models.Manager):
callback_path
response = self._send_request(hub, {
-'mode': 'subscribe',
-'callback': callback,
-'topic': topic,
-'verify': ('async', 'sync'),
-'verify_token': subscription.generate_token('subscribe'),
-'lease_seconds': lease_seconds,
-})
-info = response.info()
-if info.status == 204:
+'hub.mode': 'subscribe',
+'hub.callback': callback,
+'hub.topic': topic,
+'hub.verify': ['async', 'sync'],
+'hub.verify_token': subscription.generate_token('subscribe'),
+'hub.lease_seconds': lease_seconds,
+})
+if response.status_code == 204:
subscription.verified = True
-elif info.status == 202: # async verification
+elif response.status_code == 202: # async verification
subscription.verified = False
else:
-error = response.read()
+error = response.content
raise urllib2.URLError('error subscribing to %s on %s:\n%s' % (
topic, hub, error))
@@ -82,16 +81,7 @@ class PushSubscriptionManager(models.Manager):
return link['href']
def _send_request(self, url, data):
-def data_generator():
-    for key, value in data.items():
-        key = 'hub.' + key
-        if isinstance(value, (basestring, int)):
-            yield key, str(value)
-        else:
-            for subvalue in value:
-                yield key, subvalue
-encoded_data = urlencode(list(data_generator()))
-return urllib2.urlopen(url, encoded_data)
+return requests.post(url, data=data)
class PushSubscription(models.Model):
feed = models.OneToOneField(Feed, db_index=True, related_name='push')
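For context, a minimal sketch (not part of the commit) of what the new _send_request does: requests form-encodes the hub.-prefixed dictionary directly, and the list value for hub.verify becomes repeated form fields, which is what PuSH hubs expect. The hub, callback, and topic URLs below are placeholders.

import requests

# Hypothetical hub/callback/topic URLs, for illustration only.
response = requests.post('http://pubsubhubbub.appspot.com/', data={
    'hub.mode': 'subscribe',
    'hub.callback': 'http://example.com/push/1/',
    'hub.topic': 'http://example.com/feed.xml',
    'hub.verify': ['async', 'sync'],  # sent as hub.verify=async&hub.verify=sync
    'hub.lease_seconds': 2592000,
})
print response.status_code  # 204: verified immediately; 202: async verification pending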

View file

@@ -402,8 +402,7 @@ class PSHBUpdateTestCase(PSHBTestBase, TestCase):
self.responses.append(MockResponse(204))
-response = self.client.post(reverse('pubsubhubbub_callback',
-                            args=(sub.pk,)),
+response = self.client.post(reverse('pubsubhubbub_callback', kwargs={'push_id': sub.pk}),
update_data, 'application/atom+xml')
self.assertEquals(response.status_code, 200)
self.assertEquals(

View file

@@ -2,5 +2,5 @@ from django.conf.urls.defaults import *
from apps.push import views
urlpatterns = patterns('',
-url(r'^(\d+)/?$', views.pubsubhubbub_callback),
+url(r'^(?P<push_id>\d+)/?$', views.push_callback, name='push-callback'),
)
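Because the pattern is now named, callers resolve the callback path by name rather than by view function. A small sketch of the reverse() call the models.py hunk above relies on (the pk value 42 is made up):

from django.core.urlresolvers import reverse

callback_path = reverse('push-callback', args=(42,))  # e.g. '/push/42/'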

View file

@@ -9,7 +9,7 @@ from django.shortcuts import get_object_or_404
from apps.push.models import PushSubscription
from apps.push.signals import verified, updated
-def callback(request, pk):
+def push_callback(request, push_id):
if request.method == 'GET':
mode = request.GET['hub.mode']
topic = request.GET['hub.topic']
@@ -21,7 +21,7 @@ def callback(request, pk):
if not verify_token.startswith('subscribe'):
raise Http404
subscription = get_object_or_404(PushSubscription,
-pk=pk,
+pk=push_id,
topic=topic,
verify_token=verify_token)
subscription.verified = True
@@ -30,7 +30,7 @@ def callback(request, pk):
return HttpResponse(challenge, content_type='text/plain')
elif request.method == 'POST':
-subscription = get_object_or_404(PushSubscription, pk=pk)
+subscription = get_object_or_404(PushSubscription, pk=push_id)
parsed = feedparser.parse(request.raw_post_data)
if parsed.feed.links: # single notification
hub_url = subscription.hub
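To make the GET branch concrete, here is a hedged sketch of the verification handshake it implements: the hub calls the callback with a challenge, and the view must echo the challenge back verbatim as text/plain to prove the subscription was requested. It simulates the hub with Django's test client and assumes a saved PushSubscription `sub`; the challenge value is invented.

response = self.client.get(reverse('push-callback', kwargs={'push_id': sub.pk}), {
    'hub.mode': 'subscribe',
    'hub.topic': sub.topic,
    'hub.challenge': 'abc123',        # hub-chosen nonce, placeholder value
    'hub.verify_token': sub.verify_token,
})
assert response.status_code == 200
assert response.content == 'abc123'  # challenge echoed back verbatim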

View file

@@ -145,7 +145,8 @@ class Feed(models.Model):
logging.debug("%s: %s" % (self.feed_address, duplicate_feed))
logging.debug(' ***> [%-30s] Feed deleted. Could not save: %s' % (unicode(self)[:30], e))
if duplicate_feed:
-merge_feeds(self.pk, duplicate_feed[0].pk)
+if self.pk != duplicate_feed[0].pk:
+    merge_feeds(self.pk, duplicate_feed[0].pk)
return duplicate_feed[0]
# Feed has been deleted. Just ignore it.
return
@@ -1066,7 +1067,7 @@ class Feed(models.Model):
logging.debug(' ---> [%-30s] Scheduling feed fetch immediately...' % (unicode(self)[:30]))
self.next_scheduled_update = datetime.datetime.utcnow()
-self.save()
+return self.save()
# def calculate_collocations_story_content(self,
# collocation_measures=TrigramAssocMeasures,
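The calling convention these two hunks establish, and which the feed_fetcher hunks below adopt, is that Feed.save() and schedule_feed_fetch_immediately() hand back the surviving Feed object: possibly self, possibly the duplicate the feed was merged into, or None when the row is gone. A condensed sketch of the pattern, using only names from the diff:

feed = feed.save()   # may return a different Feed if it merged with a duplicate
if feed is None:
    return           # feed row was deleted; stop using the stale object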

View file

@@ -15,6 +15,7 @@ urlpatterns = patterns('',
(r'^statistics/', include('apps.statistics.urls')),
(r'^mobile/', include('apps.mobile.urls')),
(r'^m/', include('apps.mobile.urls')),
+(r'^push/', include('apps.push.urls')),
url(r'^about/?', static_views.about, name='about'),
url(r'^faq/?', static_views.faq, name='faq'),
url(r'^api/?', static_views.api, name='api'),

View file

@@ -125,20 +125,9 @@ class ProcessFeed:
unicode(self.feed)[:30],
self.fpf.bozo_exception,
len(self.fpf.entries)))
-if (not self.feed.is_push and hasattr(self.fpf, 'feeds') and
-    hasattr(self.fpf.feeds, 'links') and self.fpf.feed.links):
-    hub_url = None
-    self_url = self.feed.feed_link
-    for link in self.fpf.feed.links:
-        if link['rel'] == 'hub':
-            hub_url = link['href']
-        elif link['rel'] == 'self':
-            self_url = link['href']
-    if hub_url and self_url:
-        PushSubscription.objects.subscribe(self_url, hub_url, self.feed)
if self.fpf.status == 304:
-self.feed.save()
+self.feed = self.feed.save()
self.feed.save_feed_history(304, "Not modified")
return FEED_SAME, ret_values
@@ -148,9 +137,9 @@ class ProcessFeed:
if not self.feed.known_good:
self.feed.fetched_once = True
logging.debug(" ---> [%-30s] ~SB~SK~FRFeed is %s'ing. Refetching..." % (unicode(self.feed)[:30], self.fpf.status))
-self.feed.schedule_feed_fetch_immediately()
+self.feed = self.feed.schedule_feed_fetch_immediately()
if not self.fpf.entries:
-self.feed.save()
+self.feed = self.feed.save()
self.feed.save_feed_history(self.fpf.status, "HTTP Redirect")
return FEED_ERRHTTP, ret_values
if self.fpf.status >= 400:
@@ -160,7 +149,7 @@ class ProcessFeed:
fixed_feed = self.feed.check_feed_link_for_feed_address()
if not fixed_feed:
self.feed.save_feed_history(self.fpf.status, "HTTP Error")
-self.feed.save()
+self.feed = self.feed.save()
return FEED_ERRHTTP, ret_values
if not self.fpf.entries:
if self.fpf.bozo and isinstance(self.fpf.bozo_exception, feedparser.NonXMLContentType):
@@ -170,7 +159,7 @@ class ProcessFeed:
fixed_feed = self.feed.check_feed_link_for_feed_address()
if not fixed_feed:
self.feed.save_feed_history(552, 'Non-xml feed', self.fpf.bozo_exception)
-self.feed.save()
+self.feed = self.feed.save()
return FEED_ERRPARSE, ret_values
elif self.fpf.bozo and isinstance(self.fpf.bozo_exception, xml.sax._exceptions.SAXException):
logging.debug(" ---> [%-30s] ~SB~FRFeed has SAX/XML parsing issues. %s entries. Checking address..." % (unicode(self.feed)[:30], len(self.fpf.entries)))
@@ -179,7 +168,7 @@ class ProcessFeed:
fixed_feed = self.feed.check_feed_link_for_feed_address()
if not fixed_feed:
self.feed.save_feed_history(553, 'SAX Exception', self.fpf.bozo_exception)
-self.feed.save()
+self.feed = self.feed.save()
return FEED_ERRPARSE, ret_values
# the feed has changed (or it is the first time we parse it)
@@ -215,7 +204,7 @@ class ProcessFeed:
guids.append(entry.link)
elif entry.get('title'):
guids.append(entry.title)
-self.feed.save()
+self.feed = self.feed.save()
# Compare new stories to existing stories, adding and updating
start_date = datetime.datetime.utcnow()
@@ -241,6 +230,20 @@ class ProcessFeed:
# story_feed=self.feed
# ).order_by('-story_date')
ret_values = self.feed.add_update_stories(self.fpf.entries, existing_stories, verbose=self.options['verbose'])
+if (not self.feed.is_push and hasattr(self.fpf, 'feed') and
+    hasattr(self.fpf.feed, 'links') and self.fpf.feed.links):
+    hub_url = None
+    self_url = self.feed.feed_link
+    for link in self.fpf.feed.links:
+        if link['rel'] == 'hub':
+            hub_url = link['href']
+        elif link['rel'] == 'self':
+            self_url = link['href']
+    if hub_url and self_url:
+        logging.debug(u' ---> [%-30s] ~BB~SK~FWSubscribing to PuSH hub: %s' % (
+            unicode(self.feed)[:30], hub_url))
+        PushSubscription.objects.subscribe(self_url, feed=self.feed, hub=hub_url)
logging.debug(u' ---> [%-30s] ~FYParsed Feed: new=~FG~SB%s~SN~FY up=~FY~SB%s~SN same=~FY%s err=~FR~SB%s' % (
unicode(self.feed)[:30],
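The relocated block implements standard PuSH hub discovery: a feed advertises its hub and its canonical self URL via <link rel="hub"> and <link rel="self"> entries. A standalone sketch with feedparser (the feed URL is a placeholder):

import feedparser

parsed = feedparser.parse('http://example.com/feed.xml')
hub_url = None
self_url = 'http://example.com/feed.xml'
for link in parsed.feed.get('links', []):
    if link['rel'] == 'hub':
        hub_url = link['href']
    elif link['rel'] == 'self':
        self_url = link['href']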
@@ -289,6 +292,7 @@ class Dispatcher:
delta = None
current_process = multiprocessing.current_process()
identity = "X"
+feed = None
if current_process._identity:
identity = current_process._identity[0]
@@ -331,14 +335,13 @@ class Dispatcher:
if ((fetched_feed and ret_feed == FEED_OK) or self.options['force']):
pfeed = ProcessFeed(feed_id, fetched_feed, self.options)
ret_feed, ret_entries = pfeed.process()
-feed = self.refresh_feed(feed_id)
+feed = pfeed.feed
if ret_entries.get(ENTRY_NEW) or self.options['force']:
start = time.time()
if not feed.known_good:
feed.known_good = True
-feed.save()
+feed = feed.save()
MUserStory.delete_old_stories(feed_id=feed.pk)
try:
self.count_unreads_for_subscribers(feed)
@@ -353,13 +356,14 @@ class Dispatcher:
except KeyboardInterrupt:
break
except urllib2.HTTPError, e:
logging.debug(' ---> [%-30s] ~FRFeed throws HTTP error: ~SB%s' % (unicode(feed_id)[:30], e.fp.read()))
feed.save_feed_history(e.code, e.msg, e.fp.read())
fetched_feed = None
except Feed.DoesNotExist, e:
-logging.debug(' ---> [%-30s] Feed is now gone...' % (unicode(feed_id)[:30]))
+logging.debug(' ---> [%-30s] ~FRFeed is now gone...' % (unicode(feed_id)[:30]))
continue
except TimeoutError, e:
-logging.debug(' ---> [%-30s] Feed fetch timed out...' % (unicode(feed)[:30]))
+logging.debug(' ---> [%-30s] ~FRFeed fetch timed out...' % (unicode(feed)[:30]))
feed.save_feed_history(505, 'Timeout', '')
fetched_feed = None
except Exception, e:
@@ -368,12 +372,12 @@ class Dispatcher:
logging.error(tb)
logging.debug('[%d] ! -------------------------' % (feed_id,))
ret_feed = FEED_ERREXC
-feed = self.refresh_feed(feed_id)
+feed = self.refresh_feed(feed.pk)
feed.save_feed_history(500, "Error", tb)
fetched_feed = None
mail_feed_error_to_admin(feed, e)
-feed = self.refresh_feed(feed_id)
+feed = self.refresh_feed(feed.pk)
if ((self.options['force']) or
(random.random() > .9) or
(fetched_feed and
@@ -415,13 +419,13 @@ class Dispatcher:
else:
logging.debug(u' ---> [%-30s] ~FBSkipping page fetch: (%s on %s stories) %s' % (unicode(feed)[:30], self.feed_trans[ret_feed], feed.stories_last_month, '' if feed.has_page else ' [HAS NO PAGE]'))
-feed = self.refresh_feed(feed_id)
+feed = self.refresh_feed(feed.pk)
delta = time.time() - start_time
feed.last_load_time = round(delta)
feed.fetched_once = True
try:
-feed.save()
+feed = feed.save()
except IntegrityError:
logging.debug(" ---> [%-30s] ~FRIntegrityError on feed: %s" % (unicode(feed)[:30], feed.feed_address,))
@@ -438,7 +442,6 @@ class Dispatcher:
self.entry_stats[key] += val
if len(feed_queue) == 1:
-feed = self.refresh_feed(feed_queue[0])
return feed
# time_taken = datetime.datetime.utcnow() - self.time_start
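Taken together, the Dispatcher changes stop trusting the original feed_id once a fetch may have merged or deleted the feed: the feed object is rebound after every save and refreshed by its current pk. A condensed sketch of the loop's pattern, with names taken from the diff:

feed = None
for feed_id in feed_queue:
    feed = self.refresh_feed(feed_id)    # initial load by the queued id
    # ... fetch and process the feed ...
    feed = feed.save()                   # save() may hand back a merged feed
    feed = self.refresh_feed(feed.pk)    # always refresh by the current pk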