Merge branch 'master' into circular

* master:
  Using error_feeds to push broken feeds further out in the pipeline.
This commit is contained in:
Samuel Clay 2013-04-08 10:51:00 -07:00
commit 0ec997f930
2 changed files with 18 additions and 8 deletions

View file

@ -780,8 +780,10 @@ class Feed(models.Model):
if not feed or original_feed_id != feed.pk:
logging.info(" ---> ~FRFeed changed id, removing %s from tasked_feeds queue..." % original_feed_id)
r.zrem('tasked_feeds', original_feed_id)
r.zrem('error_feeds', original_feed_id)
if feed:
r.zrem('tasked_feeds', feed.pk)
r.zrem('error_feeds', feed.pk)
return feed
@ -1293,9 +1295,10 @@ class Feed(models.Model):
def set_next_scheduled_update(self, verbose=False, skip_scheduling=False):
r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)
total, random_factor = self.get_next_scheduled_update(force=True, verbose=verbose)
error_count = self.error_count
if self.errors_since_good:
total = total * self.errors_since_good
if error_count:
total = total * error_count
if verbose:
logging.debug(' ---> [%-30s] ~FBScheduling feed fetch geometrically: '
'~SB%s errors. Time: %s min' % (
@ -1304,16 +1307,23 @@ class Feed(models.Model):
next_scheduled_update = datetime.datetime.utcnow() + datetime.timedelta(
minutes = total + random_factor)
self.min_to_decay = total
if not skip_scheduling and self.active_subscribers >= 1:
self.next_scheduled_update = next_scheduled_update
r.zadd('scheduled_updates', self.pk, self.next_scheduled_update.strftime('%s'))
r.zrem('tasked_feeds', self.pk)
r.srem('queued_feeds', self.pk)
self.save()
@property
def error_count(self):
r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)
fetch_errors = int(r.zscore('error_feeds', self.pk) or 0)
return fetch_errors + self.errors_since_good
def schedule_feed_fetch_immediately(self, verbose=True):
r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)
if verbose:

View file

@ -57,11 +57,11 @@ class TaskFeeds(Task):
inactive_count = len(old_tasked_feeds)
if inactive_count:
r.zremrangebyscore('tasked_feeds', 0, hours_ago)
r.sadd('queued_feeds', *old_tasked_feeds)
p = r.pipeline()
# r.sadd('queued_feeds', *old_tasked_feeds)
for feed_id in old_tasked_feeds:
p.zincrby('error_feeds', feed_id, 1)
p.execute()
r.zincrby('error_feeds', feed_id, 1)
feed = Feed.get_by_id(feed_id)
feed.set_next_scheduled_update()
logging.debug(" ---> ~SN~FBRe-queuing ~SB%s~SN dropped feeds (~SB%s/%s~SN queued/tasked)" % (
inactive_count,
r.scard('queued_feeds'),