Geometrically delaying the fetching of bad feeds, while they're still somewhat fresh.

Samuel Clay 2012-02-24 11:47:38 -08:00
parent 7203d69e42
commit 1ffefaa419
2 changed files with 20 additions and 26 deletions
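In short: instead of deactivating a feed whose recent fetch history is all errors, the commit pushes its next fetch out by a factor of the consecutive error count, so the feed keeps getting retried while its content is still somewhat fresh. A minimal sketch of the scheduling rule the diff below implements (base_minutes and jitter_minutes are illustrative stand-ins; the real values come from Feed.get_next_scheduled_update):

    import datetime
    import random

    def next_fetch_time(base_minutes, error_count, jitter_minutes=5):
        # No recent errors: keep the normal interval. Otherwise multiply the
        # interval by the error count, so a feed that failed 5 fetches in a
        # row waits 5x as long -- backed off, but not marked inactive.
        total = base_minutes
        if error_count:
            total = total * error_count
        delay = datetime.timedelta(minutes=total + random.uniform(0, jitter_minutes))
        return datetime.datetime.utcnow() + delay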

View file

@@ -4,7 +4,6 @@ import random
 import re
 import math
 import mongoengine as mongo
-import redis
 import zlib
 import urllib
 import hashlib
@@ -13,7 +12,6 @@ from operator import itemgetter
 # from nltk.collocations import TrigramCollocationFinder, BigramCollocationFinder, TrigramAssocMeasures, BigramAssocMeasures
 from django.db import models
 from django.db import IntegrityError
-from django.core.cache import cache
 from django.conf import settings
 from django.db.models.query import QuerySet
 from mongoengine.queryset import OperationError
@@ -287,6 +285,10 @@ class Feed(models.Model):
             self.save_feed_history(505, 'Timeout', '')
             feed_address = None
+        if feed_address:
+            self.feed.has_feed_exception = True
+            self.feed.schedule_feed_fetch_immediately()
         return not not feed_address
 
     def save_feed_history(self, status_code, message, exception=None):
@@ -304,7 +306,8 @@ class Feed(models.Model):
         # for history in old_fetch_histories:
         #     history.delete()
         if status_code not in (200, 304):
-            self.count_errors_in_history('feed', status_code)
+            errors, non_errors = self.count_errors_in_history('feed', status_code)
+            self.set_next_scheduled_update(error_count=len(errors), non_error_count=len(non_errors))
         elif self.has_feed_exception:
             self.has_feed_exception = False
             self.active = True
@@ -334,7 +337,7 @@ class Feed(models.Model):
         non_errors = [h for h in fetch_history if int(h) in (200, 304)]
         errors = [h for h in fetch_history if int(h) not in (200, 304)]
-        if len(non_errors) == 0 and len(errors) >= 1:
+        if len(non_errors) == 0 and len(errors) > 1:
             if exception_type == 'feed':
                 self.has_feed_exception = True
                 self.active = False
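A side effect of the loosened threshold above: a single failed fetch no longer deactivates a feed; it now takes more than one consecutive error, with no successes in the recent history, before the feed is flagged and paused.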
@@ -345,6 +348,10 @@ class Feed(models.Model):
         elif self.exception_code > 0:
             self.active = True
             self.exception_code = 0
+            if exception_type == 'feed':
+                self.has_feed_exception = False
+            elif exception_type == 'page':
+                self.has_page_exception = False
         self.save()
 
         return errors, non_errors
@@ -1007,11 +1014,12 @@ class Feed(models.Model):
         return total, random_factor*2
 
-    def set_next_scheduled_update(self, multiplier=1):
+    def set_next_scheduled_update(self, error_count=0, non_error_count=0):
         total, random_factor = self.get_next_scheduled_update(force=True, verbose=False)
 
-        if multiplier > 1:
-            total = total * multiplier
+        if error_count:
+            logging.debug(' ---> [%-30s] ~FBScheduling feed fetch geometrically: ~SB%s errors, %s non-errors' % (unicode(self)[:30], error_count, non_error_count))
+            total = total * error_count
 
         next_scheduled_update = datetime.datetime.utcnow() + datetime.timedelta(
             minutes = total + random_factor)
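Worked example for the hunk above: with a computed base of 60 minutes and a fetch history holding 5 errors and no successes, the next fetch lands roughly 60 * 5 = 300 minutes out, plus the random factor, where a healthy feed would keep its plain 60.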
@@ -1022,14 +1030,11 @@ class Feed(models.Model):
         self.save()
 
     def schedule_feed_fetch_immediately(self):
+        logging.debug(' ---> [%-30s] Scheduling feed fetch immediately...' % (unicode(self)[:30]))
         self.next_scheduled_update = datetime.datetime.utcnow()
 
         self.save()
 
-    def schedule_feed_fetch_geometrically(self):
-        errors, non_errors = self.count_errors_in_history('feed')
-        self.set_next_scheduled_update(multiplier=len(errors))
-
     # def calculate_collocations_story_content(self,
     #         collocation_measures=TrigramAssocMeasures,
     #         collocation_finder=TrigramCollocationFinder):

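Design note on the first file: with save_feed_history now feeding error counts straight into set_next_scheduled_update, the schedule_feed_fetch_geometrically helper loses its last callers and is deleted. The backoff applies on every non-200/304 fetch rather than only in the fetcher's HTTP-error branch, which is also why the else branches in the second file below can go.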
View file

@@ -47,11 +47,10 @@ class FetchFeed:
                 datetime.datetime.now() - self.feed.last_update)
             logging.debug(log_msg)
 
-            self.feed.set_next_scheduled_update()
             etag=self.feed.etag
             modified = self.feed.last_modified.utctimetuple()[:7] if self.feed.last_modified else None
 
-            if self.options.get('force') or not self.feed.fetched_once:
+            if self.options.get('force') or not self.feed.fetched_once or not self.feed.known_good:
                 modified = None
                 etag = None
@@ -126,10 +125,9 @@ class ProcessFeed:
         if self.fpf.status in (302, 301):
             if not self.fpf.href.endswith('feedburner.com/atom.xml'):
                 self.feed.feed_address = self.fpf.href
-            if not self.feed.fetched_once:
-                self.feed.has_feed_exception = True
+            if not self.feed.known_good:
                 self.feed.fetched_once = True
-                logging.debug(" ---> [%-30s] Feed is 302'ing, but it's not new. Refetching..." % (unicode(self.feed)[:30]))
+                logging.debug(" ---> [%-30s] Feed is %s'ing. Refetching..." % (unicode(self.feed)[:30], self.fpf.status))
                 self.feed.schedule_feed_fetch_immediately()
             if not self.fpf.entries:
                 self.feed.save()
@@ -142,9 +140,6 @@ class ProcessFeed:
             fixed_feed = self.feed.check_feed_link_for_feed_address()
             if not fixed_feed:
                 self.feed.save_feed_history(self.fpf.status, "HTTP Error")
-            else:
-                self.feed.has_feed_exception = True
-                self.feed.schedule_feed_fetch_geometrically()
             self.feed.save()
             return FEED_ERRHTTP, ret_values
@@ -156,9 +151,6 @@ class ProcessFeed:
             fixed_feed = self.feed.check_feed_link_for_feed_address()
             if not fixed_feed:
                 self.feed.save_feed_history(502, 'Non-xml feed', self.fpf.bozo_exception)
-            else:
-                self.feed.has_feed_exception = True
-                self.feed.schedule_feed_fetch_immediately()
             self.feed.save()
             return FEED_ERRPARSE, ret_values
         elif self.fpf.bozo and isinstance(self.fpf.bozo_exception, xml.sax._exceptions.SAXException):
@@ -169,9 +161,6 @@ class ProcessFeed:
             fixed_feed = self.feed.check_feed_link_for_feed_address()
             if not fixed_feed:
                 self.feed.save_feed_history(503, 'SAX Exception', self.fpf.bozo_exception)
-            else:
-                self.feed.has_feed_exception = True
-                self.feed.schedule_feed_fetch_immediately()
             self.feed.save()
             return FEED_ERRPARSE, ret_values
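The fetcher-side half of the change: conditional-GET headers are withheld until a feed is known_good, so a feed still in its error-prone phase is always fetched in full and a cached 304 can't mask a bad address. A minimal sketch, assuming a feed object with etag, last_modified, fetched_once, and known_good attributes (feedparser's etag/modified keywords are real; everything else here is illustrative):

    import feedparser

    def fetch(feed, force=False):
        etag = feed.etag
        modified = feed.last_modified.utctimetuple()[:7] if feed.last_modified else None
        if force or not feed.fetched_once or not feed.known_good:
            # Unproven feeds skip If-None-Match/If-Modified-Since, forcing a
            # full response instead of a possibly misleading 304.
            etag = None
            modified = None
        return feedparser.parse(feed.feed_address, etag=etag, modified=modified)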