Mirror of https://github.com/viq/NewsBlur.git
Merge branch 'queue'

* queue:
  Cleaning up queue migration.
  Handling decayed feeds with no more active subscribers. They don't get returned to the scheduled_updates queue until there are enough active users.
  Only need next scheduled update.
  Fixing feeds that fall through.
  Moving task queue from postgres db backed to redis backed.
commit 92fedb0d3a
6 changed files with 160 additions and 40 deletions
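The change replaces the Postgres columns that drove fetch scheduling (next_scheduled_update, queued_date) with three Redis structures: a scheduled_updates sorted set scored by next-fetch epoch time, a queued_feeds set of feeds that are due now, and the existing update_feeds Celery queue. A minimal sketch of the lifecycle the hunks below implement, assuming redis-py 3.x (where zadd takes a member-to-score mapping; the diff itself uses the older zadd(key, member, score) form) and a hypothetical feed id:

    import time
    import redis

    r = redis.Redis()  # the diff draws this from settings.REDIS_FEED_POOL

    # 1. Schedule: zadd the feed id into 'scheduled_updates', scored by
    #    the epoch time of its next fetch (here: 5 minutes out).
    r.zadd('scheduled_updates', {'42': time.time() + 300})

    # 2. Drain: TaskFeeds moves every member whose score has passed
    #    into the 'queued_feeds' set.
    now = int(time.time())
    due = r.zrangebyscore('scheduled_updates', 0, now)
    if due:
        r.zremrangebyscore('scheduled_updates', 0, now)
        r.sadd('queued_feeds', *due)

    # 3. Dispatch: task_feeds pulls a random batch from the set, srem's
    #    the dispatched ids, and hands them to the 'update_feeds' workers.
    batch = r.srandmember('queued_feeds', 1000)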
@@ -226,15 +226,20 @@ def load_feeds(request):
     user_subs = UserSubscription.objects.select_related('feed').filter(user=user)

+    day_ago = datetime.datetime.now() - datetime.timedelta(days=1)
     scheduled_feeds = []
     for sub in user_subs:
         pk = sub.feed_id
         if update_counts:
             sub.calculate_feed_scores(silent=True)
         feeds[pk] = sub.canonical(include_favicon=include_favicons)

+        if not sub.active: continue
         if not sub.feed.active and not sub.feed.has_feed_exception and not sub.feed.has_page_exception:
             scheduled_feeds.append(sub.feed.pk)
-        elif sub.active and sub.feed.active_subscribers <= 0:
+        elif sub.feed.active_subscribers <= 0:
             scheduled_feeds.append(sub.feed.pk)
+        elif sub.feed.next_scheduled_update < day_ago:
+            scheduled_feeds.append(sub.feed.pk)

     if len(scheduled_feeds) > 0 and request.user.is_authenticated():
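Per the commit message, a feed that decays to zero active subscribers is dropped from scheduled_updates entirely; it only re-enters the queue when a subscriber actually loads it, which is what these new load_feeds checks do. A toy restatement of the predicate (function name hypothetical):

    import datetime

    def needs_requeue(sub):
        """Mirrors the checks above: skip inactive subscriptions, then
        re-queue broken-but-recoverable feeds, feeds with no counted
        active subscribers, and feeds whose next fetch silently slipped
        more than a day into the past ("feeds that fall through")."""
        day_ago = datetime.datetime.now() - datetime.timedelta(days=1)
        feed = sub.feed
        if not sub.active:
            return False
        if not feed.active and not feed.has_feed_exception and not feed.has_page_exception:
            return True
        if feed.active_subscribers <= 0:
            return True
        return feed.next_scheduled_update < day_ago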
apps/rss_feeds/migrations/0067_feed_next_update_redis.py (new file, 87 lines)
@@ -0,0 +1,87 @@
+# -*- coding: utf-8 -*-
+import redis
+from south.v2 import DataMigration
+from django.conf import settings
+from apps.rss_feeds.models import Feed
+
+class Migration(DataMigration):
+
+    def forwards(self, orm):
+        r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)
+        start = 0
+        for f in xrange(start, Feed.objects.latest('pk').pk, 1000):
+            print " ---> %s" % f
+            feed = Feed.objects.filter(pk__in=range(f, f+1000),
+                                       active=True,
+                                       active_subscribers__gte=1)\
+                               .values_list('pk', 'next_scheduled_update')
+            p = r.pipeline()
+            for pk, s in feed:
+                p.zadd('scheduled_updates', pk, s.strftime('%s'))
+            p.execute()
+
+    def backwards(self, orm):
+        "Write your backwards methods here."
+
+    models = {
+        u'rss_feeds.duplicatefeed': {
+            'Meta': {'object_name': 'DuplicateFeed'},
+            'duplicate_address': ('django.db.models.fields.CharField', [], {'max_length': '764', 'db_index': 'True'}),
+            'duplicate_feed_id': ('django.db.models.fields.CharField', [], {'max_length': '255', 'null': 'True', 'db_index': 'True'}),
+            'duplicate_link': ('django.db.models.fields.CharField', [], {'max_length': '764', 'null': 'True', 'db_index': 'True'}),
+            'feed': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'duplicate_addresses'", 'to': u"orm['rss_feeds.Feed']"}),
+            u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'})
+        },
+        u'rss_feeds.feed': {
+            'Meta': {'ordering': "['feed_title']", 'object_name': 'Feed', 'db_table': "'feeds'"},
+            'active': ('django.db.models.fields.BooleanField', [], {'default': 'True', 'db_index': 'True'}),
+            'active_premium_subscribers': ('django.db.models.fields.IntegerField', [], {'default': '-1'}),
+            'active_subscribers': ('django.db.models.fields.IntegerField', [], {'default': '-1', 'db_index': 'True'}),
+            'average_stories_per_month': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
+            'branch_from_feed': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['rss_feeds.Feed']", 'null': 'True', 'blank': 'True'}),
+            'creation': ('django.db.models.fields.DateField', [], {'auto_now_add': 'True', 'blank': 'True'}),
+            'days_to_trim': ('django.db.models.fields.IntegerField', [], {'default': '90'}),
+            'errors_since_good': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
+            'etag': ('django.db.models.fields.CharField', [], {'max_length': '255', 'null': 'True', 'blank': 'True'}),
+            'exception_code': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
+            'favicon_color': ('django.db.models.fields.CharField', [], {'max_length': '6', 'null': 'True', 'blank': 'True'}),
+            'favicon_not_found': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+            'feed_address': ('django.db.models.fields.URLField', [], {'max_length': '764', 'db_index': 'True'}),
+            'feed_address_locked': ('django.db.models.fields.NullBooleanField', [], {'default': 'False', 'null': 'True', 'blank': 'True'}),
+            'feed_link': ('django.db.models.fields.URLField', [], {'default': "''", 'max_length': '1000', 'null': 'True', 'blank': 'True'}),
+            'feed_link_locked': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+            'feed_title': ('django.db.models.fields.CharField', [], {'default': "'[Untitled]'", 'max_length': '255', 'null': 'True', 'blank': 'True'}),
+            'fetched_once': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+            'has_feed_exception': ('django.db.models.fields.BooleanField', [], {'default': 'False', 'db_index': 'True'}),
+            'has_page': ('django.db.models.fields.BooleanField', [], {'default': 'True'}),
+            'has_page_exception': ('django.db.models.fields.BooleanField', [], {'default': 'False', 'db_index': 'True'}),
+            'hash_address_and_link': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64'}),
+            u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+            'is_push': ('django.db.models.fields.NullBooleanField', [], {'default': 'False', 'null': 'True', 'blank': 'True'}),
+            'known_good': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+            'last_load_time': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
+            'last_modified': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}),
+            'last_update': ('django.db.models.fields.DateTimeField', [], {'db_index': 'True'}),
+            'min_to_decay': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
+            'next_scheduled_update': ('django.db.models.fields.DateTimeField', [], {}),
+            'num_subscribers': ('django.db.models.fields.IntegerField', [], {'default': '-1'}),
+            'premium_subscribers': ('django.db.models.fields.IntegerField', [], {'default': '-1'}),
+            'queued_date': ('django.db.models.fields.DateTimeField', [], {'db_index': 'True'}),
+            's3_icon': ('django.db.models.fields.NullBooleanField', [], {'default': 'False', 'null': 'True', 'blank': 'True'}),
+            's3_page': ('django.db.models.fields.NullBooleanField', [], {'default': 'False', 'null': 'True', 'blank': 'True'}),
+            'stories_last_month': ('django.db.models.fields.IntegerField', [], {'default': '0'})
+        },
+        u'rss_feeds.feeddata': {
+            'Meta': {'object_name': 'FeedData'},
+            'feed': ('utils.fields.AutoOneToOneField', [], {'related_name': "'data'", 'unique': 'True', 'to': u"orm['rss_feeds.Feed']"}),
+            'feed_classifier_counts': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
+            'feed_tagline': ('django.db.models.fields.CharField', [], {'max_length': '1024', 'null': 'True', 'blank': 'True'}),
+            u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+            'popular_authors': ('django.db.models.fields.CharField', [], {'max_length': '2048', 'null': 'True', 'blank': 'True'}),
+            'popular_tags': ('django.db.models.fields.CharField', [], {'max_length': '1024', 'null': 'True', 'blank': 'True'}),
+            'story_count_history': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'})
+        }
+    }
+
+    complete_apps = ['rss_feeds']
+    symmetrical = True
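The forwards migration walks pks in blocks of 1,000 and pipelines one ZADD per feed, so each block costs a single round trip. Under redis-py 3.x the same backfill would pass a mapping instead of the positional (member, score) pair; roughly (a sketch, not the project's code):

    import redis

    def backfill_scheduled_updates(rows, r=None, batch=1000):
        """rows: iterable of (pk, next_scheduled_update) pairs, as
        produced by the values_list() call above."""
        r = r or redis.Redis()
        pipe = r.pipeline()
        for i, (pk, when) in enumerate(rows, 1):
            # redis-py 3.x signature: zadd(key, {member: score})
            pipe.zadd('scheduled_updates', {str(pk): int(when.strftime('%s'))})
            if i % batch == 0:
                pipe.execute()  # flush one round trip per batch
                pipe = r.pipeline()
        pipe.execute()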
@@ -313,21 +313,21 @@ class Feed(models.Model):
     @classmethod
     def task_feeds(cls, feeds, queue_size=12, verbose=True):
         if not feeds: return

+        r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)
+
         if isinstance(feeds, Feed):
             if verbose:
-                logging.debug(" ---> Tasking feed: %s" % feeds)
-            feeds = [feeds]
+                logging.debug(" ---> ~SN~FBTasking feed: ~SB%s" % feeds)
+            feeds = [feeds.pk]
         elif verbose:
-            logging.debug(" ---> Tasking %s feeds..." % len(feeds))
+            logging.debug(" ---> ~SN~FBTasking ~SB%s~SN feeds..." % len(feeds))

-        feed_queue = []
-        for f in feeds:
-            f.queued_date = datetime.datetime.utcnow()
-            f.set_next_scheduled_update(verbose=False)
-
-        for feed_queue in (feeds[pos:pos + queue_size] for pos in xrange(0, len(feeds), queue_size)):
-            feed_ids = [feed.pk for feed in feed_queue]
+        if isinstance(feeds, QuerySet):
+            feeds = [f.pk for f in feeds]
+
+        r.srem('queued_feeds', *feeds)
+
+        for feed_ids in (feeds[pos:pos + queue_size] for pos in xrange(0, len(feeds), queue_size)):
             UpdateFeeds.apply_async(args=(feed_ids,), queue='update_feeds')

     def update_all_statistics(self, full=True, force=False):
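task_feeds now normalizes its argument (a single Feed, a QuerySet, or a plain list) down to pks, SREMs the dispatched ids out of queued_feeds, and fans the work out in queue_size chunks. The chunking is the plain slice-generator idiom, isolated here for clarity:

    def chunks(seq, size):
        """Yield consecutive slices of seq, at most size items each --
        the (feeds[pos:pos + queue_size] for pos in xrange(...)) pattern
        used above to build UpdateFeeds batches."""
        for pos in range(0, len(seq), size):
            yield seq[pos:pos + size]

    # e.g. 30 feed ids with queue_size=12 -> batches of 12, 12, 6
    batches = list(chunks(list(range(30)), 12))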
@@ -384,7 +384,7 @@ class Feed(models.Model):
             return False
         try:
             self.feed_address = feed_address
-            self.next_scheduled_update = datetime.datetime.utcnow()
+            self.schedule_feed_fetch_immediately()
             self.has_feed_exception = False
             self.active = True
             self.save()
@@ -739,6 +739,8 @@ class Feed(models.Model):

     def update(self, **kwargs):
         from utils import feed_fetcher
+        r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)
+
         if getattr(settings, 'TEST_DEBUG', False):
             self.feed_address = self.feed_address % {'NEWSBLUR_DIR': settings.NEWSBLUR_DIR}
             self.feed_link = self.feed_link % {'NEWSBLUR_DIR': settings.NEWSBLUR_DIR}
@@ -766,6 +768,7 @@ class Feed(models.Model):

         feed.last_update = datetime.datetime.utcnow()
         feed.set_next_scheduled_update()
+        r.zadd('fetched_feeds_last_hour', self.pk, int(datetime.datetime.now().strftime('%s')))

         if options['force']:
             feed.sync_redis()
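Every completed fetch now ZADDs the feed into fetched_feeds_last_hour with the current epoch as its score, and TaskFeeds (below) ZREMRANGEBYSCOREs everything older than an hour, so the zset's cardinality is a rolling one-hour fetch counter. The pattern in isolation (a sketch, redis-py 3.x signatures):

    import time
    import redis

    r = redis.Redis()

    def record_fetch(feed_id):
        # One member per feed, re-scored with the current time on each fetch.
        r.zadd('fetched_feeds_last_hour', {str(feed_id): int(time.time())})

    def fetches_last_hour():
        hour_ago = int(time.time()) - 3600
        r.zremrangebyscore('fetched_feeds_last_hour', 0, hour_ago)  # trim the stale tail
        return r.zcard('fetched_feeds_last_hour')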
@@ -1274,6 +1277,7 @@ class Feed(models.Model):
         return total, random_factor*8

     def set_next_scheduled_update(self, verbose=False, skip_scheduling=False):
+        r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)
         total, random_factor = self.get_next_scheduled_update(force=True, verbose=verbose)

         if self.errors_since_good:
@@ -1285,18 +1289,23 @@ class Feed(models.Model):

         next_scheduled_update = datetime.datetime.utcnow() + datetime.timedelta(
                                 minutes = total + random_factor)


         self.min_to_decay = total
-        if not skip_scheduling:
+        if not skip_scheduling and self.active_subscribers >= 1:
             self.next_scheduled_update = next_scheduled_update
+            r.zadd('scheduled_updates', self.pk, self.next_scheduled_update.strftime('%s'))

         self.save()

     def schedule_feed_fetch_immediately(self, verbose=True):
+        r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)
         if verbose:
             logging.debug('   ---> [%-30s] Scheduling feed fetch immediately...' % (unicode(self)[:30]))

         self.next_scheduled_update = datetime.datetime.utcnow()
+        r.zadd('scheduled_updates', self.pk, self.next_scheduled_update.strftime('%s'))

         return self.save()
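Both ZADD scores here come from strftime('%s'). Two caveats worth flagging: '%s' is a platform-specific (glibc) extension rather than standard C or Python strftime, and on a naive utcnow() value it is interpreted in the server's local timezone. A portable equivalent for naive UTC datetimes (a sketch, not what the commit uses):

    import calendar
    import datetime

    def utc_epoch(dt):
        """Epoch seconds for a naive datetime known to hold UTC,
        avoiding the non-portable strftime('%s')."""
        return calendar.timegm(dt.timetuple())

    score = utc_epoch(datetime.datetime.utcnow())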
@@ -1318,7 +1327,7 @@ class Feed(models.Model):
             self.schedule_feed_fetch_immediately()
         else:
             logging.debug('   ---> [%-30s] [%s] ~FBQueuing pushed stories...' % (unicode(self)[:30], self.pk))
-            self.queued_date = datetime.datetime.utcnow()
+            # self.queued_date = datetime.datetime.utcnow()
             self.set_next_scheduled_update()
             PushFeeds.apply_async(args=(self.pk, xml), queue='push_feeds')
@@ -2,6 +2,7 @@ import datetime
 import os
 import shutil
 import time
+import redis
 from celery.task import Task
 from utils import log as logging
 from utils import s3_utils as s3
@@ -15,19 +16,37 @@ class TaskFeeds(Task):
         settings.LOG_TO_STREAM = True
         now = datetime.datetime.utcnow()
         start = time.time()
+        r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)
+
+        hour_ago = now - datetime.timedelta(hours=1)
+        r.zremrangebyscore('fetched_feeds_last_hour', 0, int(hour_ago.strftime('%s')))
+
+        now_timestamp = int(now.strftime("%s"))
+        queued_feeds = r.zrangebyscore('scheduled_updates', 0, now_timestamp)
+        r.zremrangebyscore('scheduled_updates', 0, now_timestamp)
+        r.sadd('queued_feeds', *queued_feeds)
+        logging.debug(" ---> ~SN~FBQueuing ~SB%s~SN stale feeds (~SB%s~SN/%s queued/scheduled)" % (
+                      len(queued_feeds),
+                      r.scard('queued_feeds'),
+                      r.zcard('scheduled_updates')))
+
-        # Regular feeds
-        feeds = Feed.objects.filter(
-            next_scheduled_update__lte=now,
-            active=True,
-            active_subscribers__gte=1
-        ).order_by('?')[:1250]
-        active_count = feeds.count()
+        feeds = r.srandmember('queued_feeds', 1000)
+        Feed.task_feeds(feeds, verbose=True)
+        active_count = len(feeds)
         cp1 = time.time()

+        # Regular feeds
+        # feeds = Feed.objects.filter(
+        #     next_scheduled_update__lte=now,
+        #     active=True,
+        #     active_subscribers__gte=1
+        # ).order_by('?')[:1250]
+        # active_count = feeds.count()
+        # cp1 = time.time()

         # Force refresh feeds
         refresh_feeds = Feed.objects.filter(
             next_scheduled_update__lte=now,
             active=True,
             fetched_once=False,
             active_subscribers__gte=1
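The drain reads the due members and then deletes the same score range in two separate commands, so a feed ZADDed between the two calls (say, by schedule_feed_fetch_immediately) could be deleted without ever reaching queued_feeds. Queuing both commands in one MULTI/EXEC pipeline closes that window; a sketch with redis-py 3.x:

    import time
    import redis

    r = redis.Redis()

    def drain_due_feeds():
        now = int(time.time())
        # Both commands run atomically inside MULTI/EXEC, so nothing can
        # land in the score range between the read and the delete.
        pipe = r.pipeline(transaction=True)
        pipe.zrangebyscore('scheduled_updates', 0, now)
        pipe.zremrangebyscore('scheduled_updates', 0, now)
        due, _removed = pipe.execute()
        if due:
            r.sadd('queued_feeds', *due)
        return due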
@@ -38,18 +57,16 @@ class TaskFeeds(Task):
         # Mistakenly inactive feeds
         day = now - datetime.timedelta(days=1)
         inactive_feeds = Feed.objects.filter(
             last_update__lte=day,
-            queued_date__lte=day,
             next_scheduled_update__lte=day,
             min_to_decay__lte=60*24,
             active_subscribers__gte=1
         ).order_by('?')[:100]
         inactive_count = inactive_feeds.count()
         cp3 = time.time()

         week = now - datetime.timedelta(days=7)
+        old = now - datetime.timedelta(days=3)
         old_feeds = Feed.objects.filter(
             last_update__lte=week,
-            queued_date__lte=day,
+            next_scheduled_update__lte=old,
             active_subscribers__gte=1
         ).order_by('?')[:500]
         old_count = old_feeds.count()
@@ -66,12 +83,16 @@ class TaskFeeds(Task):
                         cp4 - cp3
         ))

-        Feed.task_feeds(feeds, verbose=False)
+        # Feed.task_feeds(feeds, verbose=False)
         Feed.task_feeds(refresh_feeds, verbose=False)
         Feed.task_feeds(inactive_feeds, verbose=False)
         Feed.task_feeds(old_feeds, verbose=False)

-        logging.debug(" ---> ~SN~FBTasking took ~SB%s~SN seconds" % int((time.time() - start)))
+        logging.debug(" ---> ~SN~FBTasking took ~SB%s~SN seconds (~SB%s~SN/~SB%s~SN/%s tasked/queued/scheduled)" % (
+            int((time.time() - start)),
+            r.llen('update_feeds'),
+            r.scard('queued_feeds'),
+            r.zcard('scheduled_updates')))


 class UpdateFeeds(Task):
@@ -211,7 +211,7 @@ def exception_retry(request):
     if not feed:
         raise Http404

-    feed.next_scheduled_update = datetime.datetime.utcnow()
+    feed.schedule_feed_fetch_immediately()
     feed.has_page_exception = False
     feed.has_feed_exception = False
     feed.active = True
@@ -258,13 +258,12 @@ def exception_change_feed_address(request):
         feed.active = True
         feed.fetched_once = False
         feed.feed_address = feed_address
-        feed.next_scheduled_update = datetime.datetime.utcnow()
-        duplicate_feed = feed.save()
+        duplicate_feed = feed.schedule_feed_fetch_immediately()
         code = 1
         if duplicate_feed:
             new_feed = Feed.objects.get(pk=duplicate_feed.pk)
             feed = new_feed
-            new_feed.next_scheduled_update = datetime.datetime.utcnow()
+            new_feed.schedule_feed_fetch_immediately()
             new_feed.has_feed_exception = False
             new_feed.active = True
             new_feed.save()
@@ -339,12 +338,11 @@ def exception_change_feed_link(request):
         feed.fetched_once = False
         feed.feed_link = feed_link
         feed.feed_address = feed_address
-        feed.next_scheduled_update = datetime.datetime.utcnow()
-        duplicate_feed = feed.save()
+        duplicate_feed = feed.schedule_feed_fetch_immediately()
         if duplicate_feed:
             new_feed = Feed.objects.get(pk=duplicate_feed.pk)
             feed = new_feed
-            new_feed.next_scheduled_update = datetime.datetime.utcnow()
+            new_feed.schedule_feed_fetch_immediately()
             new_feed.has_page_exception = False
             new_feed.active = True
             new_feed.save()
@@ -11,7 +11,7 @@ class NBMuninGraph(MuninGraph):
         'graph_title' : 'NewsBlur Updates',
         'graph_vlabel' : '# of updates',
         'graph_args' : '-l 0',
-        'update_queue.label': 'Queued Feeds last hour',
+        'update_queue.label': 'Queued Feeds',
         'feeds_fetched.label': 'Fetched feeds last hour',
         'celery_update_feeds.label': 'Celery - Update Feeds',
         'celery_new_feeds.label': 'Celery - New Feeds',
@@ -29,8 +29,8 @@ class NBMuninGraph(MuninGraph):
         r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)

         return {
-            'update_queue': Feed.objects.filter(queued_date__gte=hour_ago).count(),
-            'feeds_fetched': Feed.objects.filter(last_update__gte=hour_ago).count(),
+            'update_queue': r.llen("queued_feeds"),
+            'feeds_fetched': r.llen("fetched_feeds_last_hour"),
             'celery_update_feeds': r.llen("update_feeds"),
             'celery_new_feeds': r.llen("new_feeds"),
             'celery_push_feeds': r.llen("push_feeds"),
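One mismatch to check against the other hunks: the TaskFeeds changes populate queued_feeds with SADD and size it with SCARD, and fetched_feeds_last_hour is built with ZADD, so LLEN (a list command) against either key would raise a WRONGTYPE error in Redis. With type-matched size commands the stats would look roughly like this (a sketch, assuming the keys keep the types the other hunks give them):

    import redis
    from django.conf import settings

    def queue_stats():
        r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)
        return {
            'update_queue': r.scard('queued_feeds'),              # set -> SCARD
            'feeds_fetched': r.zcard('fetched_feeds_last_hour'),  # zset -> ZCARD
            'celery_update_feeds': r.llen('update_feeds'),        # Celery lists -> LLEN
            'celery_new_feeds': r.llen('new_feeds'),
            'celery_push_feeds': r.llen('push_feeds'),
        }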