Adding push feeds queue. Ignoring fat ping, since it might not match URLs.

This commit is contained in:
Samuel Clay 2012-03-28 15:49:21 -07:00
parent dad3283d2a
commit a74be1b604
7 changed files with 52 additions and 11 deletions

View file

@ -58,7 +58,7 @@ def push_callback(request, push_id):
callback=request.build_absolute_uri(), callback=request.build_absolute_uri(),
lease_seconds=seconds) lease_seconds=seconds)
subscription.feed.update(fpf=parsed) subscription.feed.queue_pushed_feed_xml(request.raw_post_data)
updated.send(sender=subscription, update=parsed) updated.send(sender=subscription, update=parsed)
return HttpResponse('') return HttpResponse('')

View file

@ -364,7 +364,8 @@ class UserSubscription(models.Model):
# Switch read stories # Switch read stories
user_stories = MUserStory.objects(user_id=self.user.pk, feed_id=old_feed.pk) user_stories = MUserStory.objects(user_id=self.user.pk, feed_id=old_feed.pk)
logging.info(" ---> %s read stories" % user_stories.count()) if user_stories.count() > 0:
logging.info(" ---> %s read stories" % user_stories.count())
for user_story in user_stories: for user_story in user_stories:
user_story.feed_id = new_feed.pk user_story.feed_id = new_feed.pk
duplicate_story = user_story.story duplicate_story = user_story.story

View file

@ -16,7 +16,7 @@ from django.conf import settings
from django.db.models.query import QuerySet from django.db.models.query import QuerySet
from mongoengine.queryset import OperationError from mongoengine.queryset import OperationError
from mongoengine.base import ValidationError from mongoengine.base import ValidationError
from apps.rss_feeds.tasks import UpdateFeeds from apps.rss_feeds.tasks import UpdateFeeds, PushFeeds
from celery.task import Task from celery.task import Task
from utils import json_functions as json from utils import json_functions as json
from utils import feedfinder, feedparser from utils import feedfinder, feedparser
@ -171,18 +171,17 @@ class Feed(models.Model):
def by_url(address): def by_url(address):
feed = cls.objects.filter(**criteria('feed_address', address)).order_by('-num_subscribers') feed = cls.objects.filter(**criteria('feed_address', address)).order_by('-num_subscribers')
if not feed:
feed = cls.objects.filter(**criteria('feed_link', address)).order_by('-num_subscribers')
if not feed: if not feed:
duplicate_feed = DuplicateFeed.objects.filter(**criteria('duplicate_address', address)) duplicate_feed = DuplicateFeed.objects.filter(**criteria('duplicate_address', address))
if duplicate_feed and len(duplicate_feed) > offset: if duplicate_feed and len(duplicate_feed) > offset:
feed = [duplicate_feed[offset].feed] feed = [duplicate_feed[offset].feed]
if not feed and aggressive:
feed = cls.objects.filter(**criteria('feed_link', address)).order_by('-num_subscribers')
return feed return feed
# Normalize and check for feed_address, dupes, and feed_link # Normalize and check for feed_address, dupes, and feed_link
if not aggressive: url = urlnorm.normalize(url)
url = urlnorm.normalize(url)
feed = by_url(url) feed = by_url(url)
# Create if it looks good # Create if it looks good
@ -634,6 +633,7 @@ class Feed(models.Model):
'quick': kwargs.get('quick'), 'quick': kwargs.get('quick'),
'debug': kwargs.get('debug'), 'debug': kwargs.get('debug'),
'fpf': kwargs.get('fpf'), 'fpf': kwargs.get('fpf'),
'feed_xml': kwargs.get('feed_xml'),
} }
disp = feed_fetcher.Dispatcher(options, 1) disp = feed_fetcher.Dispatcher(options, 1)
disp.add_jobs([[self.pk]]) disp.add_jobs([[self.pk]])
@ -1039,7 +1039,8 @@ class Feed(models.Model):
if self.active_premium_subscribers > 0: if self.active_premium_subscribers > 0:
total = min(total, 60) # 1 hour minimum for premiums total = min(total, 60) # 1 hour minimum for premiums
if self.is_push:
total = total * 20
if verbose: if verbose:
print "[%s] %s (%s/%s/%s/%s), %s, %s: %s" % (self, updates_per_day_delay, print "[%s] %s (%s/%s/%s/%s), %s, %s: %s" % (self, updates_per_day_delay,
self.num_subscribers, self.active_subscribers, self.num_subscribers, self.active_subscribers,
@ -1054,7 +1055,7 @@ class Feed(models.Model):
if error_count: if error_count:
total = total * error_count total = total * error_count
logging.debug(' ---> [%-30s] ~FBScheduling feed fetch geometrically: ~SB%s errors, %s non-errors. Total: %s' % (unicode(self)[:30], error_count, non_error_count, total)) logging.debug(' ---> [%-30s] ~FBScheduling feed fetch geometrically: ~SB%s/%s errors. Time: %s min' % (unicode(self)[:30], error_count, non_error_count, total))
next_scheduled_update = datetime.datetime.utcnow() + datetime.timedelta( next_scheduled_update = datetime.datetime.utcnow() + datetime.timedelta(
minutes = total + random_factor) minutes = total + random_factor)
@ -1079,6 +1080,19 @@ class Feed(models.Model):
return return
self.is_push = push.verified self.is_push = push.verified
self.save() self.save()
def queue_pushed_feed_xml(self, xml):
logging.debug(' ---> [%-30s] Queuing pushed stories...' % (unicode(self)[:30]))
publisher = Task.get_publisher()
self.queued_date = datetime.datetime.utcnow()
self.set_next_scheduled_update()
PushFeeds.apply_async(args=(self.pk, xml), queue='push_feeds', publisher=publisher)
publisher.connection.close()
# def calculate_collocations_story_content(self, # def calculate_collocations_story_content(self,
# collocation_measures=TrigramAssocMeasures, # collocation_measures=TrigramAssocMeasures,

View file

@ -43,3 +43,17 @@ class NewFeeds(Task):
for feed_pk in feed_pks: for feed_pk in feed_pks:
feed = Feed.objects.get(pk=feed_pk) feed = Feed.objects.get(pk=feed_pk)
feed.update(options=options) feed.update(options=options)
class PushFeeds(Task):
name = 'push-feeds'
max_retries = 0
ignore_result = True
def run(self, feed_id, xml, **kwargs):
from apps.rss_feeds.models import Feed
options = {
'feed_xml': xml
}
feed = Feed.objects.get(pk=feed_id)
feed.update(options=options)

View file

@ -1,5 +1,5 @@
[program:celery] [program:celery]
command=/home/sclay/newsblur/manage.py celeryd --loglevel=INFO -Q new_feeds,update_feeds command=/home/sclay/newsblur/manage.py celeryd --loglevel=INFO -Q new_feeds,push_feeds,update_feeds
directory=/home/sclay/newsblur directory=/home/sclay/newsblur
user=sclay user=sclay
numprocs=1 numprocs=1

View file

@ -242,6 +242,10 @@ CELERY_ROUTES = {
"queue": "new_feeds", "queue": "new_feeds",
"binding_key": "new_feeds" "binding_key": "new_feeds"
}, },
"push-feeds": {
"queue": "push_feeds",
"binding_key": "push_feeds"
},
"update-feeds": { "update-feeds": {
"queue": "update_feeds", "queue": "update_feeds",
"binding_key": "update_feeds" "binding_key": "update_feeds"
@ -253,6 +257,11 @@ CELERY_QUEUES = {
"exchange_type": "direct", "exchange_type": "direct",
"binding_key": "new_feeds" "binding_key": "new_feeds"
}, },
"push_feeds": {
"exchange": "push_feeds",
"exchange_type": "direct",
"binding_key": "push_feeds"
},
"update_feeds": { "update_feeds": {
"exchange": "update_feeds", "exchange": "update_feeds",
"exchange_type": "direct", "exchange_type": "direct",

View file

@ -63,7 +63,10 @@ class FetchFeed:
's' if self.feed.num_subscribers != 1 else '', 's' if self.feed.num_subscribers != 1 else '',
settings.NEWSBLUR_URL settings.NEWSBLUR_URL
) )
if self.options.get('feed_xml'):
feed_xml = self.options.get('feed_xml')
logging.debug(u' ---> [%-30s] ~FM~BKFeed has been fat pinged. Ignoring fat: %s' % (
unicode(self.feed)[:30], len(feed_xml)))
if self.options.get('fpf'): if self.options.get('fpf'):
self.fpf = self.options.get('fpf') self.fpf = self.options.get('fpf')
logging.debug(u' ---> [%-30s] ~FM~BKFeed fetched in real-time with fat ping.' % ( logging.debug(u' ---> [%-30s] ~FM~BKFeed fetched in real-time with fat ping.' % (