diff --git a/apps/rss_feeds/management/commands/refresh_feeds.py b/apps/rss_feeds/management/commands/refresh_feeds.py
index 82a1c9c26..5ed63536d 100644
--- a/apps/rss_feeds/management/commands/refresh_feeds.py
+++ b/apps/rss_feeds/management/commands/refresh_feeds.py
@@ -30,7 +30,7 @@ class Command(BaseCommand):
         socket.setdefaulttimeout(options['timeout'])
         now = datetime.datetime.now()
         
-        feeds = Feed.objects.filter(next_scheduled_update__lte=now).order_by('?')
+        feeds = Feed.objects.filter(next_scheduled_update__lte=now)#.order_by('?')
         
         num_workers = min(len(feeds), options['workerthreads'])
         if options['single_threaded']:
diff --git a/utils/feed_fetcher.py b/utils/feed_fetcher.py
index 66d5b4076..629de8ac0 100644
--- a/utils/feed_fetcher.py
+++ b/utils/feed_fetcher.py
@@ -1,4 +1,4 @@
-from apps.rss_feeds.models import Story, FeedUpdateHistory
+from apps.rss_feeds.models import Feed, Story, FeedUpdateHistory
 from django.core.cache import cache
 from apps.reader.models import UserSubscription, UserStory
 from apps.rss_feeds.importer import PageImporter
@@ -17,6 +17,7 @@ import multiprocessing
 import Queue
 import datetime
 import random
+import socket
 
 # Refresh feed code adapted from Feedjack.
 # http://feedjack.googlecode.com
@@ -49,7 +50,6 @@ class FetchFeed:
 
     def fetch(self):
        """ Downloads and parses a feed. """
-        
        current_process = multiprocessing.current_process()
        identity = "X"
        if current_process._identity:
@@ -59,9 +59,21 @@
                                                self.feed.id)
        logging.info(log_msg)
        print(log_msg)
+        
+        # Check if feed still needs to be updated
+        feed = Feed.objects.get(pk=self.feed.pk)
+        if feed.next_scheduled_update > datetime.datetime.now():
+            log_msg = u' ---> Already fetched %s (%d)' % (self.feed.feed_title,
+                                                          self.feed.id)
+            logging.info(log_msg)
+            print(log_msg)
+            return FEED_SAME, None
+        
+        next_scheduled_update = self.set_next_scheduled_update()
 
        # we check the etag and the modified time to save bandwith and avoid bans
        try:
+            socket.setdefaulttimeout(10)
            self.fpf = feedparser.parse(self.feed.feed_address,
                                        agent=USER_AGENT,
                                        etag=self.feed.etag)
@@ -70,9 +82,22 @@
            logging.error(log_msg)
            print(log_msg)
-            return FEED_ERRPARSE
+            return FEED_ERRPARSE, None
        
-        return self.fpf
+        return FEED_OK, self.fpf
+        
+    def set_next_scheduled_update(self):
+        # Use stories per month to calculate next feed update
+        updates_per_day = max(30, self.feed.stories_per_month) / 30.0 * 6
+        minutes_to_next_update = 60 * 24 / updates_per_day
+        random_factor = random.randint(0,int(minutes_to_next_update/4))
+        next_scheduled_update = datetime.datetime.now() + datetime.timedelta(
+            minutes=minutes_to_next_update+random_factor
+        )
+        self.feed.next_scheduled_update = next_scheduled_update
+        self.feed.save()
+        
+        return next_scheduled_update
 
 class FetchPage:
     def __init__(self, feed, options):
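
The scheduling math in set_next_scheduled_update() floors every feed at six fetches per day (max(30, ...) treats anything quieter than 30 stories/month as 30), scales the rate linearly with story volume, and adds up to 25% random jitter so feeds don't bunch onto the same tick. A standalone sketch of the same calculation; next_update_window is an illustrative name, not part of the patch:

    import datetime
    import random

    def next_update_window(stories_per_month):
        # Floor of 6 fetches/day, scaling linearly with story volume.
        updates_per_day = max(30, stories_per_month) / 30.0 * 6
        minutes_to_next_update = 60 * 24 / updates_per_day
        # Up to 25% jitter spreads feeds apart within each window.
        random_factor = random.randint(0, int(minutes_to_next_update / 4))
        return datetime.datetime.now() + datetime.timedelta(
            minutes=minutes_to_next_update + random_factor)

    # A quiet feed (<= 30 stories/month) lands 240-300 minutes out;
    # a busy one (120 stories/month) lands 60-75 minutes out.
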
@@ -108,15 +133,6 @@
 
        logging.debug(u'[%d] Processing %s' % (self.feed.id,
                                               self.feed.feed_title))
-        
-        # Use stories per month to calculate next feed update
-        updates_per_day = max(30, self.feed.stories_per_month) / 30.0 * 6
-        minutes_to_next_update = 60 * 24 / updates_per_day
-        random_factor = random.randint(0,int(minutes_to_next_update/4))
-        next_scheduled_update = datetime.datetime.now() + datetime.timedelta(
-            minutes=minutes_to_next_update+random_factor
-        )
-        self.feed.next_scheduled_update = next_scheduled_update
 
        if hasattr(self.fpf, 'status'):
            if self.options['verbose']:
@@ -258,7 +274,15 @@
            identity = current_process._identity[0]
        for feed in feed_queue:
            # print "Process Feed: [%s] %s" % (current_process.name, feed)
-            
+            ffeed = None
+            pfeed = None
+            fpage = None
+            ret_entries = {
+                ENTRY_NEW: 0,
+                ENTRY_UPDATED: 0,
+                ENTRY_SAME: 0,
+                ENTRY_ERR: 0
+            }
            start_time = datetime.datetime.now()
            
            ### Uncomment to test feed fetcher
@@ -268,20 +292,23 @@
 
            try:
                ffeed = FetchFeed(feed, self.options)
-                fetched_feed = ffeed.fetch()
+                ret_feed, fetched_feed = ffeed.fetch()
                
-                pfeed = ProcessFeed(feed, fetched_feed, self.options)
-                ret_feed, ret_entries = pfeed.process()
+                if fetched_feed and ret_feed == FEED_OK:
+                    pfeed = ProcessFeed(feed, fetched_feed, self.options)
+                    ret_feed, ret_entries = pfeed.process()
                
-                fpage = FetchPage(feed, self.options)
-                fpage.fetch()
+                    fpage = FetchPage(feed, self.options)
+                    fpage.fetch()
                
-                if ENTRY_NEW in ret_entries and ret_entries[ENTRY_NEW]:
-                    user_subs = UserSubscription.objects.filter(feed=feed)
-                    for sub in user_subs:
-                        logging.info('Deleteing user sub cache: %s' % sub.user_id)
-                        cache.delete('usersub:%s' % sub.user_id)
-                        sub.calculate_feed_scores()
+                    if ENTRY_NEW in ret_entries and ret_entries[ENTRY_NEW]:
+                        user_subs = UserSubscription.objects.filter(feed=feed)
+                        for sub in user_subs:
+                            logging.info('Deleting user sub cache: %s' % sub.user_id)
+                            cache.delete('usersub:%s' % sub.user_id)
+                            sub.calculate_feed_scores()
+                else:
+                    continue
            except:
                (etype, eobj, etb) = sys.exc_info()
                print '[%d] ! -------------------------' % (feed.id,)
@@ -289,15 +316,6 @@
                traceback.print_exception(etype, eobj, etb)
                print '[%d] ! -------------------------' % (feed.id,)
                ret_feed = FEED_ERREXC
-                ffeed = None
-                pfeed = None
-                fpage = None
-                ret_entries = {
-                    ENTRY_NEW: 0,
-                    ENTRY_UPDATED: 0,
-                    ENTRY_SAME: 0,
-                    ENTRY_ERR: 0
-                }
            finally:
                if ffeed: del ffeed
                if pfeed: del pfeed
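
Taken together, the two files implement a check-then-claim pattern: refresh_feeds.py only queues feeds whose next_scheduled_update has passed, and each worker re-reads the feed row inside fetch(), bailing out with FEED_SAME if another worker has already pushed the schedule into the future; otherwise it claims the feed by writing the next window before fetching. A minimal sketch of that pattern, assuming the patch's Feed model; claim_feed is an illustrative name, not in the patch:

    import datetime

    def claim_feed(fetcher):
        # Re-read the row: another worker may have fetched this feed
        # between queueing and dispatch.
        feed = Feed.objects.get(pk=fetcher.feed.pk)
        if feed.next_scheduled_update > datetime.datetime.now():
            return False  # already claimed; fetch() returns FEED_SAME, None
        # Claiming = writing the *next* window before the fetch starts, so
        # the feed immediately stops matching next_scheduled_update__lte=now.
        fetcher.set_next_scheduled_update()
        return True

Note that the read and the save are not atomic, so two workers can still collide in the gap between them; the pattern narrows the duplicate-fetch window rather than closing it.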