Moving the task queue from a Postgres-backed to a Redis-backed implementation.

This commit is contained in:
Samuel Clay 2013-03-30 19:05:13 -07:00
parent 25d21c9f69
commit ba5919d760
5 changed files with 151 additions and 38 deletions

View file

@ -0,0 +1,85 @@
# -*- coding: utf-8 -*-
import redis
from south.v2 import DataMigration
from django.conf import settings
from apps.rss_feeds.models import Feed
class Migration(DataMigration):
    """One-off South data migration that seeds the Redis ``scheduled_updates``
    sorted set from the Postgres-backed feed schedule, as part of moving the
    fetch task queue from the database into Redis.

    Members of the sorted set are feed pks; scores are the feed's
    ``next_scheduled_update`` as a unix timestamp.
    """

    def forwards(self, orm):
        # Single connection reused across all batches.
        r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)
        # Resume point: bump this by hand if the migration is interrupted,
        # since completed batches are idempotent (ZADD overwrites scores).
        start = 0
        # Walk the pk space in windows of 1000 to bound query size; inactive
        # feeds are skipped and never enter the Redis schedule.
        for f in xrange(start, Feed.objects.latest('pk').pk, 1000):
            print " ---> %s" % f
            feed = Feed.objects.filter(pk__in=range(f, f+1000), active=True)\
                .values_list('pk', 'next_scheduled_update', 'min_to_decay')
            # One pipeline per batch: a single round-trip for up to 1000 ZADDs.
            p = r.pipeline()
            for pk, s, m in feed:
                # redis-py < 3.0 argument order: zadd(key, value, score).
                # NOTE(review): strftime('%s') is a platform-dependent (glibc)
                # way to get a unix timestamp — confirm it behaves on the
                # deployment OS. `m` (min_to_decay) is fetched but unused here.
                p.zadd('scheduled_updates', pk, s.strftime('%s'))
            p.execute()

    def backwards(self, orm):
        # Intentionally a no-op: the Redis set can simply be ignored/deleted.
        "Write your backwards methods here."

    # Auto-generated South model freeze — do not edit by hand.
    models = {
        u'rss_feeds.duplicatefeed': {
            'Meta': {'object_name': 'DuplicateFeed'},
            'duplicate_address': ('django.db.models.fields.CharField', [], {'max_length': '764', 'db_index': 'True'}),
            'duplicate_feed_id': ('django.db.models.fields.CharField', [], {'max_length': '255', 'null': 'True', 'db_index': 'True'}),
            'duplicate_link': ('django.db.models.fields.CharField', [], {'max_length': '764', 'null': 'True', 'db_index': 'True'}),
            'feed': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'duplicate_addresses'", 'to': u"orm['rss_feeds.Feed']"}),
            u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'})
        },
        u'rss_feeds.feed': {
            'Meta': {'ordering': "['feed_title']", 'object_name': 'Feed', 'db_table': "'feeds'"},
            'active': ('django.db.models.fields.BooleanField', [], {'default': 'True', 'db_index': 'True'}),
            'active_premium_subscribers': ('django.db.models.fields.IntegerField', [], {'default': '-1'}),
            'active_subscribers': ('django.db.models.fields.IntegerField', [], {'default': '-1', 'db_index': 'True'}),
            'average_stories_per_month': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
            'branch_from_feed': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['rss_feeds.Feed']", 'null': 'True', 'blank': 'True'}),
            'creation': ('django.db.models.fields.DateField', [], {'auto_now_add': 'True', 'blank': 'True'}),
            'days_to_trim': ('django.db.models.fields.IntegerField', [], {'default': '90'}),
            'errors_since_good': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
            'etag': ('django.db.models.fields.CharField', [], {'max_length': '255', 'null': 'True', 'blank': 'True'}),
            'exception_code': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
            'favicon_color': ('django.db.models.fields.CharField', [], {'max_length': '6', 'null': 'True', 'blank': 'True'}),
            'favicon_not_found': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
            'feed_address': ('django.db.models.fields.URLField', [], {'max_length': '764', 'db_index': 'True'}),
            'feed_address_locked': ('django.db.models.fields.NullBooleanField', [], {'default': 'False', 'null': 'True', 'blank': 'True'}),
            'feed_link': ('django.db.models.fields.URLField', [], {'default': "''", 'max_length': '1000', 'null': 'True', 'blank': 'True'}),
            'feed_link_locked': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
            'feed_title': ('django.db.models.fields.CharField', [], {'default': "'[Untitled]'", 'max_length': '255', 'null': 'True', 'blank': 'True'}),
            'fetched_once': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
            'has_feed_exception': ('django.db.models.fields.BooleanField', [], {'default': 'False', 'db_index': 'True'}),
            'has_page': ('django.db.models.fields.BooleanField', [], {'default': 'True'}),
            'has_page_exception': ('django.db.models.fields.BooleanField', [], {'default': 'False', 'db_index': 'True'}),
            'hash_address_and_link': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64'}),
            u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'is_push': ('django.db.models.fields.NullBooleanField', [], {'default': 'False', 'null': 'True', 'blank': 'True'}),
            'known_good': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
            'last_load_time': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
            'last_modified': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}),
            'last_update': ('django.db.models.fields.DateTimeField', [], {'db_index': 'True'}),
            'min_to_decay': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
            'next_scheduled_update': ('django.db.models.fields.DateTimeField', [], {}),
            'num_subscribers': ('django.db.models.fields.IntegerField', [], {'default': '-1'}),
            'premium_subscribers': ('django.db.models.fields.IntegerField', [], {'default': '-1'}),
            'queued_date': ('django.db.models.fields.DateTimeField', [], {'db_index': 'True'}),
            's3_icon': ('django.db.models.fields.NullBooleanField', [], {'default': 'False', 'null': 'True', 'blank': 'True'}),
            's3_page': ('django.db.models.fields.NullBooleanField', [], {'default': 'False', 'null': 'True', 'blank': 'True'}),
            'stories_last_month': ('django.db.models.fields.IntegerField', [], {'default': '0'})
        },
        u'rss_feeds.feeddata': {
            'Meta': {'object_name': 'FeedData'},
            'feed': ('utils.fields.AutoOneToOneField', [], {'related_name': "'data'", 'unique': 'True', 'to': u"orm['rss_feeds.Feed']"}),
            'feed_classifier_counts': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
            'feed_tagline': ('django.db.models.fields.CharField', [], {'max_length': '1024', 'null': 'True', 'blank': 'True'}),
            u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'popular_authors': ('django.db.models.fields.CharField', [], {'max_length': '2048', 'null': 'True', 'blank': 'True'}),
            'popular_tags': ('django.db.models.fields.CharField', [], {'max_length': '1024', 'null': 'True', 'blank': 'True'}),
            'story_count_history': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'})
        }
    }

    complete_apps = ['rss_feeds']
    symmetrical = True

View file

@ -313,21 +313,21 @@ class Feed(models.Model):
@classmethod
def task_feeds(cls, feeds, queue_size=12, verbose=True):
if not feeds: return
r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)
if isinstance(feeds, Feed):
if verbose:
logging.debug(" ---> Tasking feed: %s" % feeds)
feeds = [feeds]
logging.debug(" ---> ~SN~FBTasking feed: ~SB%s" % feeds)
feeds = [feeds.pk]
elif verbose:
logging.debug(" ---> Tasking %s feeds..." % len(feeds))
logging.debug(" ---> ~SN~FBTasking ~SB%s~SN feeds..." % len(feeds))
feed_queue = []
for f in feeds:
f.queued_date = datetime.datetime.utcnow()
f.set_next_scheduled_update(verbose=False)
for feed_queue in (feeds[pos:pos + queue_size] for pos in xrange(0, len(feeds), queue_size)):
feed_ids = [feed.pk for feed in feed_queue]
if isinstance(feeds, QuerySet):
feeds = [f.pk for f in feeds]
r.srem('queued_feeds', *feeds)
for feed_ids in (feeds[pos:pos + queue_size] for pos in xrange(0, len(feeds), queue_size)):
UpdateFeeds.apply_async(args=(feed_ids,), queue='update_feeds')
def update_all_statistics(self, full=True, force=False):
@ -384,7 +384,7 @@ class Feed(models.Model):
return False
try:
self.feed_address = feed_address
self.next_scheduled_update = datetime.datetime.utcnow()
self.schedule_feed_fetch_immediately()
self.has_feed_exception = False
self.active = True
self.save()
@ -739,6 +739,8 @@ class Feed(models.Model):
def update(self, **kwargs):
from utils import feed_fetcher
r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)
if getattr(settings, 'TEST_DEBUG', False):
self.feed_address = self.feed_address % {'NEWSBLUR_DIR': settings.NEWSBLUR_DIR}
self.feed_link = self.feed_link % {'NEWSBLUR_DIR': settings.NEWSBLUR_DIR}
@ -766,6 +768,7 @@ class Feed(models.Model):
feed.last_update = datetime.datetime.utcnow()
feed.set_next_scheduled_update()
r.zadd('fetched_feeds_last_hour', self.pk, int(datetime.datetime.now().strftime('%s')))
if options['force']:
feed.sync_redis()
@ -1274,6 +1277,7 @@ class Feed(models.Model):
return total, random_factor*8
def set_next_scheduled_update(self, verbose=False, skip_scheduling=False):
r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)
total, random_factor = self.get_next_scheduled_update(force=True, verbose=verbose)
if self.errors_since_good:
@ -1285,18 +1289,23 @@ class Feed(models.Model):
next_scheduled_update = datetime.datetime.utcnow() + datetime.timedelta(
minutes = total + random_factor)
self.min_to_decay = total
if not skip_scheduling:
self.next_scheduled_update = next_scheduled_update
r.zadd('scheduled_updates', self.pk, self.next_scheduled_update.strftime('%s'))
self.save()
def schedule_feed_fetch_immediately(self, verbose=True):
r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)
if verbose:
logging.debug(' ---> [%-30s] Scheduling feed fetch immediately...' % (unicode(self)[:30]))
self.next_scheduled_update = datetime.datetime.utcnow()
r.zadd('scheduled_updates', self.pk, self.next_scheduled_update.strftime('%s'))
return self.save()
@ -1318,7 +1327,7 @@ class Feed(models.Model):
self.schedule_feed_fetch_immediately()
else:
logging.debug(' ---> [%-30s] [%s] ~FBQueuing pushed stories...' % (unicode(self)[:30], self.pk))
self.queued_date = datetime.datetime.utcnow()
# self.queued_date = datetime.datetime.utcnow()
self.set_next_scheduled_update()
PushFeeds.apply_async(args=(self.pk, xml), queue='push_feeds')

View file

@ -2,6 +2,7 @@ import datetime
import os
import shutil
import time
import redis
from celery.task import Task
from utils import log as logging
from utils import s3_utils as s3
@ -15,19 +16,37 @@ class TaskFeeds(Task):
settings.LOG_TO_STREAM = True
now = datetime.datetime.utcnow()
start = time.time()
r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)
hour_ago = now - datetime.timedelta(hours=1)
r.zremrangebyscore('fetched_feeds_last_hour', 0, int(hour_ago.strftime('%s')))
now_timestamp = int(now.strftime("%s"))
queued_feeds = r.zrangebyscore('scheduled_updates', 0, now_timestamp)
r.zremrangebyscore('scheduled_updates', 0, now_timestamp)
r.sadd('queued_feeds', *queued_feeds)
logging.debug(" ---> ~SN~FBQueuing ~SB%s~SN stale feeds (~SB%s~SN/%s queued/scheduled)" % (
len(queued_feeds),
r.scard('queued_feeds'),
r.zcard('scheduled_updates')))
# Regular feeds
feeds = Feed.objects.filter(
next_scheduled_update__lte=now,
active=True,
active_subscribers__gte=1
).order_by('?')[:1250]
active_count = feeds.count()
feeds = r.srandmember('queued_feeds', 1000)
Feed.task_feeds(feeds, verbose=True)
active_count = len(feeds)
cp1 = time.time()
# Regular feeds
# feeds = Feed.objects.filter(
# next_scheduled_update__lte=now,
# active=True,
# active_subscribers__gte=1
# ).order_by('?')[:1250]
# active_count = feeds.count()
# cp1 = time.time()
# Force refresh feeds
refresh_feeds = Feed.objects.filter(
next_scheduled_update__lte=now,
active=True,
fetched_once=False,
active_subscribers__gte=1
@ -38,18 +57,16 @@ class TaskFeeds(Task):
# Mistakenly inactive feeds
day = now - datetime.timedelta(days=1)
inactive_feeds = Feed.objects.filter(
last_update__lte=day,
queued_date__lte=day,
next_scheduled_update__lte=day,
min_to_decay__lte=60*24,
active_subscribers__gte=1
).order_by('?')[:100]
inactive_count = inactive_feeds.count()
cp3 = time.time()
week = now - datetime.timedelta(days=7)
old = now - datetime.timedelta(days=3)
old_feeds = Feed.objects.filter(
last_update__lte=week,
queued_date__lte=day,
next_scheduled_update__lte=old,
active_subscribers__gte=1
).order_by('?')[:500]
old_count = old_feeds.count()
@ -66,12 +83,16 @@ class TaskFeeds(Task):
cp4 - cp3
))
Feed.task_feeds(feeds, verbose=False)
# Feed.task_feeds(feeds, verbose=False)
Feed.task_feeds(refresh_feeds, verbose=False)
Feed.task_feeds(inactive_feeds, verbose=False)
Feed.task_feeds(old_feeds, verbose=False)
logging.debug(" ---> ~SN~FBTasking took ~SB%s~SN seconds" % int((time.time() - start)))
logging.debug(" ---> ~SN~FBTasking took ~SB%s~SN seconds (~SB%s~SN/~SB%s~SN/%s tasked/queued/scheduled)" % (
int((time.time() - start)),
r.llen('update_feeds'),
r.scard('queued_feeds'),
r.zcard('scheduled_updates')))
class UpdateFeeds(Task):

View file

@ -211,7 +211,7 @@ def exception_retry(request):
if not feed:
raise Http404
feed.next_scheduled_update = datetime.datetime.utcnow()
feed.schedule_feed_fetch_immediately()
feed.has_page_exception = False
feed.has_feed_exception = False
feed.active = True
@ -258,13 +258,12 @@ def exception_change_feed_address(request):
feed.active = True
feed.fetched_once = False
feed.feed_address = feed_address
feed.next_scheduled_update = datetime.datetime.utcnow()
duplicate_feed = feed.save()
duplicate_feed = feed.schedule_feed_fetch_immediately()
code = 1
if duplicate_feed:
new_feed = Feed.objects.get(pk=duplicate_feed.pk)
feed = new_feed
new_feed.next_scheduled_update = datetime.datetime.utcnow()
new_feed.schedule_feed_fetch_immediately()
new_feed.has_feed_exception = False
new_feed.active = True
new_feed.save()
@ -339,12 +338,11 @@ def exception_change_feed_link(request):
feed.fetched_once = False
feed.feed_link = feed_link
feed.feed_address = feed_address
feed.next_scheduled_update = datetime.datetime.utcnow()
duplicate_feed = feed.save()
duplicate_feed = feed.schedule_feed_fetch_immediately()
if duplicate_feed:
new_feed = Feed.objects.get(pk=duplicate_feed.pk)
feed = new_feed
new_feed.next_scheduled_update = datetime.datetime.utcnow()
new_feed.schedule_feed_fetch_immediately()
new_feed.has_page_exception = False
new_feed.active = True
new_feed.save()

View file

@ -11,7 +11,7 @@ class NBMuninGraph(MuninGraph):
'graph_title' : 'NewsBlur Updates',
'graph_vlabel' : '# of updates',
'graph_args' : '-l 0',
'update_queue.label': 'Queued Feeds last hour',
'update_queue.label': 'Queued Feeds',
'feeds_fetched.label': 'Fetched feeds last hour',
'celery_update_feeds.label': 'Celery - Update Feeds',
'celery_new_feeds.label': 'Celery - New Feeds',
@ -29,8 +29,8 @@ class NBMuninGraph(MuninGraph):
r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)
return {
'update_queue': Feed.objects.filter(queued_date__gte=hour_ago).count(),
'feeds_fetched': Feed.objects.filter(last_update__gte=hour_ago).count(),
'update_queue': r.llen("queued_feeds"),
'feeds_fetched': r.llen("fetched_feeds_last_hour"),
'celery_update_feeds': r.llen("update_feeds"),
'celery_new_feeds': r.llen("new_feeds"),
'celery_push_feeds': r.llen("push_feeds"),