diff --git a/apps/push/models.py b/apps/push/models.py
index 12e71ebe9..b625f26e5 100644
--- a/apps/push/models.py
+++ b/apps/push/models.py
@@ -2,7 +2,7 @@
 from datetime import datetime, timedelta
 
 import feedparser
-from urllib import urlencode
+import requests
 import urllib2
 
 from django.conf import settings
@@ -38,7 +38,7 @@ class PushSubscriptionManager(models.Manager):
 
         if callback is None:
             try:
-                callback_path = reverse('pubsubhubbub_callback',
+                callback_path = reverse('push-callback',
                                         args=(subscription.pk,))
             except Resolver404:
                 raise TypeError(
@@ -49,21 +49,19 @@ class PushSubscriptionManager(models.Manager):
                 callback_path
 
         response = self._send_request(hub, {
-            'mode': 'subscribe',
-            'callback': callback,
-            'topic': topic,
-            'verify': ('async', 'sync'),
-            'verify_token': subscription.generate_token('subscribe'),
-            'lease_seconds': lease_seconds,
-        })
-
-        info = response.info()
-        if info.status == 204:
+            'hub.mode': 'subscribe',
+            'hub.callback': callback,
+            'hub.topic': topic,
+            'hub.verify': ['async', 'sync'],
+            'hub.verify_token': subscription.generate_token('subscribe'),
+            'hub.lease_seconds': lease_seconds,
+        })
+        if response.status_code == 204:
             subscription.verified = True
-        elif info.status == 202: # async verification
+        elif response.status_code == 202: # async verification
             subscription.verified = False
         else:
-            error = response.read()
+            error = response.content
             raise urllib2.URLError('error subscribing to %s on %s:\n%s' % (
                 topic, hub, error))
 
@@ -82,16 +80,7 @@ class PushSubscriptionManager(models.Manager):
         return link['href']
 
     def _send_request(self, url, data):
-        def data_generator():
-            for key, value in data.items():
-                key = 'hub.' + key
-                if isinstance(value, (basestring, int)):
-                    yield key, str(value)
-                else:
-                    for subvalue in value:
-                        yield key, value
-        encoded_data = urlencode(list(data_generator()))
-        return urllib2.urlopen(url, encoded_data)
+        return requests.post(url, data=data)
 
 class PushSubscription(models.Model):
     feed = models.OneToOneField(Feed, db_index=True, related_name='push')
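For context on the `_send_request` change: `requests` form-encodes list values as repeated fields, which is exactly what the removed `data_generator` was hand-rolling (and getting wrong, since it yielded `value` instead of `subvalue` for list entries). A minimal sketch of the resulting payload, using the public reference hub and hypothetical callback/topic URLs:

```python
import requests

# Stand-in values; not taken from this patch.
data = {
    'hub.mode': 'subscribe',
    'hub.callback': 'http://example.com/push/1/',
    'hub.topic': 'http://example.com/feed/',
    'hub.verify': ['async', 'sync'],  # the list becomes two hub.verify fields
}

# requests encodes the body as:
#   hub.mode=subscribe&hub.callback=...&hub.topic=...&hub.verify=async&hub.verify=sync
response = requests.post('http://pubsubhubbub.appspot.com/', data=data)
print response.status_code  # 204: verified synchronously, 202: verification pending
```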
diff --git a/apps/push/tests.py b/apps/push/tests.py
index 2005b472e..f3d5a9484 100644
--- a/apps/push/tests.py
+++ b/apps/push/tests.py
@@ -402,8 +402,7 @@ class PSHBUpdateTestCase(PSHBTestBase, TestCase):
 
         self.responses.append(MockResponse(204))
 
-        response = self.client.post(reverse('pubsubhubbub_callback',
-                                            args=(sub.pk,)),
+        response = self.client.post(reverse('push-callback', kwargs={'push_id': sub.pk}),
                                     update_data, 'application/atom+xml')
         self.assertEquals(response.status_code, 200)
         self.assertEquals(
diff --git a/apps/push/urls.py b/apps/push/urls.py
index 50988fa7e..e7bad94fc 100644
--- a/apps/push/urls.py
+++ b/apps/push/urls.py
@@ -2,5 +2,5 @@ from django.conf.urls.defaults import *
 from apps.push import views
 
 urlpatterns = patterns('',
-    url(r'^(\d+)/?$', views.pubsubhubbub_callback),
+    url(r'^(?P<push_id>\d+)/?$', views.push_callback, name='push-callback'),
 )
diff --git a/apps/push/views.py b/apps/push/views.py
index 35393883b..0f811295a 100644
--- a/apps/push/views.py
+++ b/apps/push/views.py
@@ -9,7 +9,7 @@ from django.shortcuts import get_object_or_404
 from apps.push.models import PushSubscription
 from apps.push.signals import verified, updated
 
-def callback(request, pk):
+def push_callback(request, push_id):
     if request.method == 'GET':
         mode = request.GET['hub.mode']
         topic = request.GET['hub.topic']
@@ -21,7 +21,7 @@
         if not verify_token.startswith('subscribe'):
             raise Http404
         subscription = get_object_or_404(PushSubscription,
-                                         pk=pk,
+                                         pk=push_id,
                                          topic=topic,
                                          verify_token=verify_token)
         subscription.verified = True
@@ -30,7 +30,7 @@
         return HttpResponse(challenge, content_type='text/plain')
 
     elif request.method == 'POST':
-        subscription = get_object_or_404(PushSubscription, pk=pk)
+        subscription = get_object_or_404(PushSubscription, pk=push_id)
         parsed = feedparser.parse(request.raw_post_data)
         if parsed.feed.links: # single notification
             hub_url = subscription.hub
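The GET branch of `push_callback` is the hub verification handshake: the view looks up the subscription by `push_id`, `topic`, and `verify_token`, marks it verified, and echoes `hub.challenge` back as plain text. A hedged sketch of driving it with Django's test client; the parameter values are hypothetical and must match an existing `PushSubscription`, otherwise the view raises `Http404`:

```python
from django.core.urlresolvers import reverse
from django.test.client import Client

client = Client()
url = reverse('push-callback', kwargs={'push_id': 1})
response = client.get(url, {
    'hub.mode': 'subscribe',
    'hub.topic': 'http://example.com/feed/',  # must equal subscription.topic
    'hub.challenge': 'challenge-token',
    'hub.verify_token': 'subscribe-abc123',   # must equal subscription.verify_token
})
assert response.status_code == 200
assert response.content == 'challenge-token'  # the challenge, echoed verbatim
```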
diff --git a/apps/rss_feeds/models.py b/apps/rss_feeds/models.py
index 064b264b3..76d6a27a3 100644
--- a/apps/rss_feeds/models.py
+++ b/apps/rss_feeds/models.py
@@ -145,7 +145,8 @@ class Feed(models.Model):
                 logging.debug("%s: %s" % (self.feed_address, duplicate_feed))
                 logging.debug(' ***> [%-30s] Feed deleted. Could not save: %s' % (unicode(self)[:30], e))
                 if duplicate_feed:
-                    merge_feeds(self.pk, duplicate_feed[0].pk)
+                    if self.pk != duplicate_feed[0].pk:
+                        merge_feeds(self.pk, duplicate_feed[0].pk)
                     return duplicate_feed[0]
                 # Feed has been deleted. Just ignore it.
                 return
@@ -1066,7 +1067,7 @@ class Feed(models.Model):
         logging.debug('   ---> [%-30s] Scheduling feed fetch immediately...' % (unicode(self)[:30]))
         self.next_scheduled_update = datetime.datetime.utcnow()
 
-        self.save()
+        return self.save()
 
     # def calculate_collocations_story_content(self,
     #         collocation_measures=TrigramAssocMeasures,
diff --git a/urls.py b/urls.py
index 02375722d..d489f1c7d 100644
--- a/urls.py
+++ b/urls.py
@@ -15,6 +15,7 @@ urlpatterns = patterns('',
     (r'^statistics/', include('apps.statistics.urls')),
     (r'^mobile/', include('apps.mobile.urls')),
     (r'^m/', include('apps.mobile.urls')),
+    (r'^push/', include('apps.push.urls')),
     url(r'^about/?', static_views.about, name='about'),
     url(r'^faq/?', static_views.faq, name='faq'),
     url(r'^api/?', static_views.api, name='api'),
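The `feed = feed.save()` rebinding that runs through the fetcher changes below only works if `Feed.save()` returns a `Feed` instance: `self` on a clean save, or the surviving duplicate when an `IntegrityError` forces a merge (the newly guarded branch in the first hunk above). A condensed, hedged sketch of that contract; the duplicate lookup is a simplified stand-in for the real logic around `merge_feeds`:

```python
from django.db import models, IntegrityError

class Feed(models.Model):
    feed_address = models.URLField(max_length=255)

    def save(self, *args, **kwargs):
        try:
            super(Feed, self).save(*args, **kwargs)
            return self
        except IntegrityError:
            # Another feed already owns this address: merge into it and hand
            # the survivor back so callers can rebind their reference.
            duplicate_feed = Feed.objects.filter(feed_address=self.feed_address)
            if duplicate_feed:
                if self.pk != duplicate_feed[0].pk:
                    merge_feeds(self.pk, duplicate_feed[0].pk)
                return duplicate_feed[0]
            return  # feed was deleted out from under us; nothing to return
```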
diff --git a/utils/feed_fetcher.py b/utils/feed_fetcher.py
index d32570e54..003b65c39 100644
--- a/utils/feed_fetcher.py
+++ b/utils/feed_fetcher.py
@@ -125,20 +125,9 @@ class ProcessFeed:
                               unicode(self.feed)[:30],
                               self.fpf.bozo_exception,
                               len(self.fpf.entries)))
 
-        if (not self.feed.is_push and hasattr(self.fpf, 'feeds') and
-            hasattr(self.fpf.feeds, 'links') and self.fpf.feed.links):
-            hub_url = None
-            self_url = self.feed.feed_link
-            for link in self.fpf.feed.links:
-                if link['rel'] == 'hub':
-                    hub_url = link['href']
-                elif link['rel'] == 'self':
-                    self_url = link['href']
-            if hub_url and self_url:
-                PushSubscription.objects.subscribe(self_url, hub_url, self.feed)
 
         if self.fpf.status == 304:
-            self.feed.save()
+            self.feed = self.feed.save()
             self.feed.save_feed_history(304, "Not modified")
             return FEED_SAME, ret_values
@@ -148,9 +137,9 @@
             if not self.feed.known_good:
                 self.feed.fetched_once = True
             logging.debug("   ---> [%-30s] ~SB~SK~FRFeed is %s'ing. Refetching..." % (unicode(self.feed)[:30], self.fpf.status))
-            self.feed.schedule_feed_fetch_immediately()
+            self.feed = self.feed.schedule_feed_fetch_immediately()
             if not self.fpf.entries:
-                self.feed.save()
+                self.feed = self.feed.save()
                 self.feed.save_feed_history(self.fpf.status, "HTTP Redirect")
                 return FEED_ERRHTTP, ret_values
         if self.fpf.status >= 400:
@@ -160,7 +149,7 @@
             fixed_feed = self.feed.check_feed_link_for_feed_address()
             if not fixed_feed:
                 self.feed.save_feed_history(self.fpf.status, "HTTP Error")
-            self.feed.save()
+            self.feed = self.feed.save()
             return FEED_ERRHTTP, ret_values
         if not self.fpf.entries:
             if self.fpf.bozo and isinstance(self.fpf.bozo_exception, feedparser.NonXMLContentType):
@@ -170,7 +159,7 @@
                 fixed_feed = self.feed.check_feed_link_for_feed_address()
                 if not fixed_feed:
                     self.feed.save_feed_history(552, 'Non-xml feed', self.fpf.bozo_exception)
-                self.feed.save()
+                self.feed = self.feed.save()
                 return FEED_ERRPARSE, ret_values
             elif self.fpf.bozo and isinstance(self.fpf.bozo_exception, xml.sax._exceptions.SAXException):
                 logging.debug("   ---> [%-30s] ~SB~FRFeed has SAX/XML parsing issues. %s entries. Checking address..." % (unicode(self.feed)[:30], len(self.fpf.entries)))
@@ -179,7 +168,7 @@
                 fixed_feed = self.feed.check_feed_link_for_feed_address()
                 if not fixed_feed:
                     self.feed.save_feed_history(553, 'SAX Exception', self.fpf.bozo_exception)
-                self.feed.save()
+                self.feed = self.feed.save()
                 return FEED_ERRPARSE, ret_values
 
         # the feed has changed (or it is the first time we parse it)
@@ -215,7 +204,7 @@
                     guids.append(entry.link)
                 elif entry.get('title'):
                     guids.append(entry.title)
-        self.feed.save()
+        self.feed = self.feed.save()
 
         # Compare new stories to existing stories, adding and updating
         start_date = datetime.datetime.utcnow()
@@ -241,6 +230,20 @@
         #     story_feed=self.feed
         # ).order_by('-story_date')
         ret_values = self.feed.add_update_stories(self.fpf.entries, existing_stories, verbose=self.options['verbose'])
+
+        if (not self.feed.is_push and hasattr(self.fpf, 'feed') and
+            hasattr(self.fpf.feed, 'links') and self.fpf.feed.links):
+            hub_url = None
+            self_url = self.feed.feed_link
+            for link in self.fpf.feed.links:
+                if link['rel'] == 'hub':
+                    hub_url = link['href']
+                elif link['rel'] == 'self':
+                    self_url = link['href']
+            if hub_url and self_url:
+                logging.debug(u'   ---> [%-30s] ~BB~SK~FWSubscribing to PuSH hub: %s' % (
+                              unicode(self.feed)[:30], hub_url))
+                PushSubscription.objects.subscribe(self_url, feed=self.feed, hub=hub_url)
 
         logging.debug(u'   ---> [%-30s] ~FYParsed Feed: new=~FG~SB%s~SN~FY up=~FY~SB%s~SN same=~FY%s err=~FR~SB%s' % (
                       unicode(self.feed)[:30],
@@ -289,6 +292,7 @@ class Dispatcher:
         delta = None
         current_process = multiprocessing.current_process()
         identity = "X"
+        feed = None
         if current_process._identity:
             identity = current_process._identity[0]
 
@@ -331,14 +335,13 @@
                 if ((fetched_feed and ret_feed == FEED_OK) or self.options['force']):
                     pfeed = ProcessFeed(feed_id, fetched_feed, self.options)
                     ret_feed, ret_entries = pfeed.process()
-
-                    feed = self.refresh_feed(feed_id)
+                    feed = pfeed.feed
 
                     if ret_entries.get(ENTRY_NEW) or self.options['force']:
                         start = time.time()
                         if not feed.known_good:
                             feed.known_good = True
-                            feed.save()
+                            feed = feed.save()
                         MUserStory.delete_old_stories(feed_id=feed.pk)
                         try:
                             self.count_unreads_for_subscribers(feed)
@@ -353,13 +356,15 @@
             except KeyboardInterrupt:
                 break
             except urllib2.HTTPError, e:
-                feed.save_feed_history(e.code, e.msg, e.fp.read())
+                error_body = e.fp.read()
+                logging.debug('   ---> [%-30s] ~FRFeed throws HTTP error: ~SB%s' % (unicode(feed_id)[:30], error_body))
+                feed.save_feed_history(e.code, e.msg, error_body)
                 fetched_feed = None
             except Feed.DoesNotExist, e:
-                logging.debug('   ---> [%-30s] Feed is now gone...' % (unicode(feed_id)[:30]))
+                logging.debug('   ---> [%-30s] ~FRFeed is now gone...' % (unicode(feed_id)[:30]))
                 continue
             except TimeoutError, e:
-                logging.debug('   ---> [%-30s] Feed fetch timed out...' % (unicode(feed)[:30]))
+                logging.debug('   ---> [%-30s] ~FRFeed fetch timed out...' % (unicode(feed)[:30]))
                 feed.save_feed_history(505, 'Timeout', '')
                 fetched_feed = None
             except Exception, e:
@@ -368,12 +373,12 @@
                 logging.error(tb)
                 logging.debug('[%d] ! -------------------------' % (feed_id,))
                 ret_feed = FEED_ERREXC
-                feed = self.refresh_feed(feed_id)
+                feed = self.refresh_feed(feed.pk)
                 feed.save_feed_history(500, "Error", tb)
                 fetched_feed = None
                 mail_feed_error_to_admin(feed, e)
 
-            feed = self.refresh_feed(feed_id)
+            feed = self.refresh_feed(feed.pk)
             if ((self.options['force']) or
                 (random.random() > .9) or
                 (fetched_feed and
@@ -415,13 +420,13 @@
             else:
                 logging.debug(u'   ---> [%-30s] ~FBSkipping page fetch: (%s on %s stories) %s' % (unicode(feed)[:30], self.feed_trans[ret_feed], feed.stories_last_month, '' if feed.has_page else ' [HAS NO PAGE]'))
 
-            feed = self.refresh_feed(feed_id)
+            feed = self.refresh_feed(feed.pk)
             delta = time.time() - start_time
 
             feed.last_load_time = round(delta)
             feed.fetched_once = True
             try:
-                feed.save()
+                feed = feed.save()
             except IntegrityError:
                 logging.debug("   ---> [%-30s] ~FRIntegrityError on feed: %s" % (unicode(feed)[:30],
                                                                                  feed.feed_address,))
@@ -438,7 +443,6 @@
                 self.entry_stats[key] += val
 
         if len(feed_queue) == 1:
-            feed = self.refresh_feed(feed_queue[0])
            return feed
 
        # time_taken = datetime.datetime.utcnow() - self.time_start
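The block relocated to the end of `ProcessFeed.process` (and corrected to check `self.fpf.feed` rather than the nonexistent `self.fpf.feeds`) performs standard PuSH hub discovery. A standalone sketch of the same discovery against a hypothetical feed URL; `feedparser` surfaces `<link rel="hub">` and `<link rel="self">` as entries in `parsed.feed.links`:

```python
import feedparser

parsed = feedparser.parse('http://example.com/feed/')

hub_url = None
self_url = None
for link in parsed.feed.get('links', []):
    if link['rel'] == 'hub':
        hub_url = link['href']
    elif link['rel'] == 'self':
        self_url = link['href']

if hub_url and self_url:
    # In the patch, this is where PushSubscription.objects.subscribe()
    # registers the callback with the hub.
    print 'Subscribe to %s at hub %s' % (self_url, hub_url)
```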