diff --git a/apps/push/views.py b/apps/push/views.py index a9d7b870e..e16f84968 100644 --- a/apps/push/views.py +++ b/apps/push/views.py @@ -58,7 +58,7 @@ def push_callback(request, push_id): callback=request.build_absolute_uri(), lease_seconds=seconds) - subscription.feed.update(fpf=parsed) + subscription.feed.queue_pushed_feed_xml(request.raw_post_data) updated.send(sender=subscription, update=parsed) return HttpResponse('') diff --git a/apps/reader/models.py b/apps/reader/models.py index 6b1ef1122..8b5d50991 100644 --- a/apps/reader/models.py +++ b/apps/reader/models.py @@ -364,7 +364,8 @@ class UserSubscription(models.Model): # Switch read stories user_stories = MUserStory.objects(user_id=self.user.pk, feed_id=old_feed.pk) - logging.info(" ---> %s read stories" % user_stories.count()) + if user_stories.count() > 0: + logging.info(" ---> %s read stories" % user_stories.count()) for user_story in user_stories: user_story.feed_id = new_feed.pk duplicate_story = user_story.story diff --git a/apps/rss_feeds/models.py b/apps/rss_feeds/models.py index b5c6bebe7..32621b414 100644 --- a/apps/rss_feeds/models.py +++ b/apps/rss_feeds/models.py @@ -16,7 +16,7 @@ from django.conf import settings from django.db.models.query import QuerySet from mongoengine.queryset import OperationError from mongoengine.base import ValidationError -from apps.rss_feeds.tasks import UpdateFeeds +from apps.rss_feeds.tasks import UpdateFeeds, PushFeeds from celery.task import Task from utils import json_functions as json from utils import feedfinder, feedparser @@ -171,18 +171,17 @@ class Feed(models.Model): def by_url(address): feed = cls.objects.filter(**criteria('feed_address', address)).order_by('-num_subscribers') - if not feed: - feed = cls.objects.filter(**criteria('feed_link', address)).order_by('-num_subscribers') if not feed: duplicate_feed = DuplicateFeed.objects.filter(**criteria('duplicate_address', address)) if duplicate_feed and len(duplicate_feed) > offset: feed = [duplicate_feed[offset].feed] + if not feed and aggressive: + feed = cls.objects.filter(**criteria('feed_link', address)).order_by('-num_subscribers') return feed # Normalize and check for feed_address, dupes, and feed_link - if not aggressive: - url = urlnorm.normalize(url) + url = urlnorm.normalize(url) feed = by_url(url) # Create if it looks good @@ -634,6 +633,7 @@ class Feed(models.Model): 'quick': kwargs.get('quick'), 'debug': kwargs.get('debug'), 'fpf': kwargs.get('fpf'), + 'feed_xml': kwargs.get('feed_xml'), } disp = feed_fetcher.Dispatcher(options, 1) disp.add_jobs([[self.pk]]) @@ -1039,7 +1039,8 @@ class Feed(models.Model): if self.active_premium_subscribers > 0: total = min(total, 60) # 1 hour minimum for premiums - + if self.is_push: + total = total * 20 if verbose: print "[%s] %s (%s/%s/%s/%s), %s, %s: %s" % (self, updates_per_day_delay, self.num_subscribers, self.active_subscribers, @@ -1054,7 +1055,7 @@ class Feed(models.Model): if error_count: total = total * error_count - logging.debug(' ---> [%-30s] ~FBScheduling feed fetch geometrically: ~SB%s errors, %s non-errors. Total: %s' % (unicode(self)[:30], error_count, non_error_count, total)) + logging.debug(' ---> [%-30s] ~FBScheduling feed fetch geometrically: ~SB%s/%s errors. Time: %s min' % (unicode(self)[:30], error_count, non_error_count, total)) next_scheduled_update = datetime.datetime.utcnow() + datetime.timedelta( minutes = total + random_factor) @@ -1079,6 +1080,19 @@ class Feed(models.Model): return self.is_push = push.verified self.save() + + def queue_pushed_feed_xml(self, xml): + + logging.debug(' ---> [%-30s] Queuing pushed stories...' % (unicode(self)[:30])) + + publisher = Task.get_publisher() + + self.queued_date = datetime.datetime.utcnow() + self.set_next_scheduled_update() + + PushFeeds.apply_async(args=(self.pk, xml), queue='push_feeds', publisher=publisher) + + publisher.connection.close() # def calculate_collocations_story_content(self, # collocation_measures=TrigramAssocMeasures, diff --git a/apps/rss_feeds/tasks.py b/apps/rss_feeds/tasks.py index 838ffce7f..6a425273f 100644 --- a/apps/rss_feeds/tasks.py +++ b/apps/rss_feeds/tasks.py @@ -43,3 +43,17 @@ class NewFeeds(Task): for feed_pk in feed_pks: feed = Feed.objects.get(pk=feed_pk) feed.update(options=options) + +class PushFeeds(Task): + name = 'push-feeds' + max_retries = 0 + ignore_result = True + + def run(self, feed_id, xml, **kwargs): + from apps.rss_feeds.models import Feed + + options = { + 'feed_xml': xml + } + feed = Feed.objects.get(pk=feed_id) + feed.update(options=options) diff --git a/config/supervisor_celeryd.conf b/config/supervisor_celeryd.conf index e9461d2ac..426bfb709 100644 --- a/config/supervisor_celeryd.conf +++ b/config/supervisor_celeryd.conf @@ -1,5 +1,5 @@ [program:celery] -command=/home/sclay/newsblur/manage.py celeryd --loglevel=INFO -Q new_feeds,update_feeds +command=/home/sclay/newsblur/manage.py celeryd --loglevel=INFO -Q new_feeds,push_feeds,update_feeds directory=/home/sclay/newsblur user=sclay numprocs=1 diff --git a/settings.py b/settings.py index 0919a830d..8b935c637 100644 --- a/settings.py +++ b/settings.py @@ -242,6 +242,10 @@ CELERY_ROUTES = { "queue": "new_feeds", "binding_key": "new_feeds" }, + "push-feeds": { + "queue": "push_feeds", + "binding_key": "push_feeds" + }, "update-feeds": { "queue": "update_feeds", "binding_key": "update_feeds" @@ -253,6 +257,11 @@ CELERY_QUEUES = { "exchange_type": "direct", "binding_key": "new_feeds" }, + "push_feeds": { + "exchange": "push_feeds", + "exchange_type": "direct", + "binding_key": "push_feeds" + }, "update_feeds": { "exchange": "update_feeds", "exchange_type": "direct", diff --git a/utils/feed_fetcher.py b/utils/feed_fetcher.py index 915ee8737..65625b75e 100644 --- a/utils/feed_fetcher.py +++ b/utils/feed_fetcher.py @@ -63,7 +63,10 @@ class FetchFeed: 's' if self.feed.num_subscribers != 1 else '', settings.NEWSBLUR_URL ) - + if self.options.get('feed_xml'): + feed_xml = self.options.get('feed_xml') + logging.debug(u' ---> [%-30s] ~FM~BKFeed has been fat pinged. Ignoring fat: %s' % ( + unicode(self.feed)[:30], len(feed_xml))) if self.options.get('fpf'): self.fpf = self.options.get('fpf') logging.debug(u' ---> [%-30s] ~FM~BKFeed fetched in real-time with fat ping.' % (