Parallelized feed fetcher now recognizes other feed fetchers so they do not duplicate work; it is now multi-thread- and multi-process-aware.

This commit is contained in:
Samuel Clay 2010-04-27 13:44:53 -04:00
parent dd588c7c69
commit 2c6433b0de
2 changed files with 53 additions and 35 deletions

View file

@@ -30,7 +30,7 @@ class Command(BaseCommand):
         socket.setdefaulttimeout(options['timeout'])
         now = datetime.datetime.now()
-        feeds = Feed.objects.filter(next_scheduled_update__lte=now).order_by('?')
+        feeds = Feed.objects.filter(next_scheduled_update__lte=now)#.order_by('?')
         num_workers = min(len(feeds), options['workerthreads'])
         if options['single_threaded']:
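The dropped order_by('?') looks like it was a collision-avoidance trick: randomizing the batch made it unlikely that two concurrent fetchers would chew through the same feeds in the same order, and ORDER BY RAND() is also expensive on a large table. With this commit the collision check moves into FetchFeed.fetch itself (next file), so the ordering no longer matters. A minimal sketch, with illustrative names (fetch_feeds, the integer feed ids), of how a batch of due feeds can be fanned out across worker processes:

import multiprocessing

def fetch_feeds(feed_ids):
    # Hypothetical worker body: with the re-check added to FetchFeed.fetch,
    # a feed that lands in two workers' queues is still only fetched once.
    for feed_id in feed_ids:
        print('%s fetching feed %s' % (multiprocessing.current_process().name, feed_id))

if __name__ == '__main__':
    feed_ids = list(range(10))              # stand-in for the feeds queryset
    num_workers = min(len(feed_ids), 4)     # capped, like num_workers above
    chunks = [feed_ids[i::num_workers] for i in range(num_workers)]
    workers = [multiprocessing.Process(target=fetch_feeds, args=(chunk,))
               for chunk in chunks]
    for w in workers:
        w.start()
    for w in workers:
        w.join()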

View file

@@ -1,4 +1,4 @@
-from apps.rss_feeds.models import Story, FeedUpdateHistory
+from apps.rss_feeds.models import Feed, Story, FeedUpdateHistory
 from django.core.cache import cache
 from apps.reader.models import UserSubscription, UserStory
 from apps.rss_feeds.importer import PageImporter
@@ -17,6 +17,7 @@ import multiprocessing
 import Queue
 import datetime
 import random
+import socket

 # Refresh feed code adapted from Feedjack.
 # http://feedjack.googlecode.com
@@ -49,7 +50,6 @@ class FetchFeed:
     def fetch(self):
         """ Downloads and parses a feed.
         """
         current_process = multiprocessing.current_process()
         identity = "X"
         if current_process._identity:
@@ -59,9 +59,21 @@
                                                          self.feed.id)
         logging.info(log_msg)
         print(log_msg)

+        # Check if feed still needs to be updated
+        feed = Feed.objects.get(pk=self.feed.pk)
+        if feed.next_scheduled_update > datetime.datetime.now():
+            log_msg = u' ---> Already fetched %s (%d)' % (self.feed.feed_title,
+                                                          self.feed.id)
+            logging.info(log_msg)
+            print(log_msg)
+            return FEED_SAME, None
+        next_scheduled_update = self.set_next_scheduled_update()

         # we check the etag and the modified time to save bandwith and avoid bans
         try:
+            socket.setdefaulttimeout(10)
             self.fpf = feedparser.parse(self.feed.feed_address,
                                         agent=USER_AGENT,
                                         etag=self.feed.etag)
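This guard is the "recognizes other feed fetchers" behavior from the commit message: before doing any network work, the worker re-reads the feed row and returns FEED_SAME if some other fetcher has already pushed next_scheduled_update into the future, then calls the new set_next_scheduled_update() (next hunk) to push the timestamp forward itself, effectively claiming the feed. A self-contained sketch of the guard, with FeedRow standing in for the Django model:

import datetime

class FeedRow(object):
    # Stand-in for Feed; only the field the guard reads.
    def __init__(self, next_scheduled_update):
        self.next_scheduled_update = next_scheduled_update

def should_fetch(feed_row):
    # Mirrors the check above: skip feeds another worker already rescheduled.
    return feed_row.next_scheduled_update <= datetime.datetime.now()

now = datetime.datetime.now()
due = FeedRow(now - datetime.timedelta(minutes=5))
claimed = FeedRow(now + datetime.timedelta(hours=1))  # another worker's claim
assert should_fetch(due)
assert not should_fetch(claimed)

Note that the read-then-write is not atomic: two workers can both pass the check before either saves a new timestamp. It shrinks the duplicate-work window rather than closing it; a compare-and-set style UPDATE ... WHERE next_scheduled_update <= NOW() would be needed to make the claim exclusive.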
@@ -70,9 +82,22 @@
             logging.error(log_msg)
             print(log_msg)
-            return FEED_ERRPARSE
+            return FEED_ERRPARSE, None

-        return self.fpf
+        return FEED_OK, self.fpf

+    def set_next_scheduled_update(self):
+        # Use stories per month to calculate next feed update
+        updates_per_day = max(30, self.feed.stories_per_month) / 30.0 * 6
+        minutes_to_next_update = 60 * 24 / updates_per_day
+        random_factor = random.randint(0,int(minutes_to_next_update/4))
+        next_scheduled_update = datetime.datetime.now() + datetime.timedelta(
+            minutes=minutes_to_next_update+random_factor
+        )
+        self.feed.next_scheduled_update = next_scheduled_update
+        self.feed.save()
+        return next_scheduled_update

 class FetchPage:
     def __init__(self, feed, options):
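set_next_scheduled_update() ties polling frequency to feed activity: max(30, stories_per_month) puts a floor of one story per day under quiet feeds, the * 6 factor schedules six fetches per daily story, and the random factor adds up to a quarter of the interval as jitter so feeds scheduled together drift apart. Worked through, a feed with 120 stories a month gets 120 / 30.0 * 6 = 24 updates a day, one every 60 minutes plus 0-15 minutes of jitter; an idle feed bottoms out at 6 a day, one every 240 minutes plus up to an hour. The same arithmetic, runnable standalone:

import datetime
import random

def next_update(stories_per_month, now=None):
    # Same formula as set_next_scheduled_update() above.
    updates_per_day = max(30, stories_per_month) / 30.0 * 6
    minutes_to_next_update = 60 * 24 / updates_per_day
    random_factor = random.randint(0, int(minutes_to_next_update / 4))
    now = now or datetime.datetime.now()
    return now + datetime.timedelta(minutes=minutes_to_next_update + random_factor)

print(next_update(120))  # roughly 60-75 minutes out
print(next_update(0))    # roughly 240-300 minutes out

Relocating this out of ProcessFeed (the next hunk removes the old copy) matters for the claim above: the schedule now advances before the fetch starts rather than after processing succeeds.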
@@ -108,15 +133,6 @@ class ProcessFeed:
         logging.debug(u'[%d] Processing %s' % (self.feed.id,
                                                self.feed.feed_title))

-        # Use stories per month to calculate next feed update
-        updates_per_day = max(30, self.feed.stories_per_month) / 30.0 * 6
-        minutes_to_next_update = 60 * 24 / updates_per_day
-        random_factor = random.randint(0,int(minutes_to_next_update/4))
-        next_scheduled_update = datetime.datetime.now() + datetime.timedelta(
-            minutes=minutes_to_next_update+random_factor
-        )
-        self.feed.next_scheduled_update = next_scheduled_update

         if hasattr(self.fpf, 'status'):
             if self.options['verbose']:
@@ -258,7 +274,15 @@ class Dispatcher:
             identity = current_process._identity[0]

         for feed in feed_queue:
             # print "Process Feed: [%s] %s" % (current_process.name, feed)
+            ffeed = None
+            pfeed = None
+            fpage = None
+            ret_entries = {
+                ENTRY_NEW: 0,
+                ENTRY_UPDATED: 0,
+                ENTRY_SAME: 0,
+                ENTRY_ERR: 0
+            }
             start_time = datetime.datetime.now()

             ### Uncomment to test feed fetcher
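Hoisting the ffeed/pfeed/fpage/ret_entries initialization to the top of the loop body (the final hunk removes the old copy from the except handler) means every name the error and cleanup paths touch is bound before the try runs: the finally clause's `if ffeed: del ffeed` can never raise NameError, and ret_entries carries zeroed counters instead of being backfilled after a failure. The shape of the pattern, with illustrative constants:

ENTRY_NEW, ENTRY_UPDATED, ENTRY_SAME, ENTRY_ERR = range(4)  # values illustrative

def process_one(feed_id):
    # Bind everything the except/finally paths touch before the try.
    ffeed = pfeed = fpage = None
    ret_entries = {ENTRY_NEW: 0, ENTRY_UPDATED: 0, ENTRY_SAME: 0, ENTRY_ERR: 0}
    try:
        ffeed = 'fetcher for feed %s' % feed_id  # stand-in for FetchFeed(...)
        raise RuntimeError('simulated fetch failure')
    except Exception:
        ret_entries[ENTRY_ERR] += 1              # counters already exist here
    finally:
        if ffeed:
            del ffeed                            # always bound, never NameError
    return ret_entries

print(process_one(42))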
@@ -268,20 +292,23 @@
             try:
                 ffeed = FetchFeed(feed, self.options)
-                fetched_feed = ffeed.fetch()
+                ret_feed, fetched_feed = ffeed.fetch()

-                pfeed = ProcessFeed(feed, fetched_feed, self.options)
-                ret_feed, ret_entries = pfeed.process()
+                if fetched_feed and ret_feed == FEED_OK:
+                    pfeed = ProcessFeed(feed, fetched_feed, self.options)
+                    ret_feed, ret_entries = pfeed.process()

-                fpage = FetchPage(feed, self.options)
-                fpage.fetch()
+                    fpage = FetchPage(feed, self.options)
+                    fpage.fetch()

-                if ENTRY_NEW in ret_entries and ret_entries[ENTRY_NEW]:
-                    user_subs = UserSubscription.objects.filter(feed=feed)
-                    for sub in user_subs:
-                        logging.info('Deleteing user sub cache: %s' % sub.user_id)
-                        cache.delete('usersub:%s' % sub.user_id)
-                        sub.calculate_feed_scores()
+                    if ENTRY_NEW in ret_entries and ret_entries[ENTRY_NEW]:
+                        user_subs = UserSubscription.objects.filter(feed=feed)
+                        for sub in user_subs:
+                            logging.info('Deleting user sub cache: %s' % sub.user_id)
+                            cache.delete('usersub:%s' % sub.user_id)
+                            sub.calculate_feed_scores()
+                else:
+                    continue
             except:
                 (etype, eobj, etb) = sys.exc_info()
                 print '[%d] ! -------------------------' % (feed.id,)
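fetch() now returns a (status, payload) pair instead of a bare parsed feed, which is what lets the dispatcher gate the expensive work: ProcessFeed, the page fetch, and the per-subscriber cache invalidation run only on FEED_OK, while FEED_SAME (another fetcher got there first) and parse errors fall through to the continue. A toy version of the status-tuple protocol, with illustrative codes and payload:

FEED_OK, FEED_SAME, FEED_ERRPARSE = range(3)  # values illustrative

def fetch(feed):
    # Returns (status, payload); payload only matters on FEED_OK.
    if feed.get('already_claimed'):
        return FEED_SAME, None
    if feed.get('malformed'):
        return FEED_ERRPARSE, None
    return FEED_OK, {'entries': ['story one', 'story two']}

for feed in [{'already_claimed': True}, {'malformed': True}, {}]:
    status, parsed = fetch(feed)
    if status == FEED_OK and parsed:
        print('processing %d entries' % len(parsed['entries']))
    else:
        print('skipping, status=%d' % status)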
@@ -289,15 +316,6 @@
                 traceback.print_exception(etype, eobj, etb)
                 print '[%d] ! -------------------------' % (feed.id,)
                 ret_feed = FEED_ERREXC
-                ffeed = None
-                pfeed = None
-                fpage = None
-                ret_entries = {
-                    ENTRY_NEW: 0,
-                    ENTRY_UPDATED: 0,
-                    ENTRY_SAME: 0,
-                    ENTRY_ERR: 0
-                }
             finally:
                 if ffeed: del ffeed
                 if pfeed: del pfeed
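The finally clause's per-iteration del calls keep a long-lived worker's footprint flat: each pass drops its references to the fetcher, processor, and page objects as soon as it is done, instead of holding the previous feed's parsed document until the names are rebound on the next iteration. A toy version of that discipline, with illustrative names:

def worker_loop(jobs):
    for job in jobs:
        parse_state = None
        try:
            parse_state = ['entry'] * 10000  # stand-in for a parsed feed
            # ... process parse_state here ...
        finally:
            if parse_state:
                del parse_state              # release before the next job

worker_loop(range(3))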