diff --git a/apps/rss_feeds/management/commands/refresh_feeds.py b/apps/rss_feeds/management/commands/refresh_feeds.py
index 4578e3dc8..ac1106ab6 100644
--- a/apps/rss_feeds/management/commands/refresh_feeds.py
+++ b/apps/rss_feeds/management/commands/refresh_feeds.py
@@ -31,6 +31,9 @@ class Command(BaseCommand):
         disp = feed_fetcher.Dispatcher(options, options['workerthreads'])
 
         feeds = Feed.objects.all()
+
+        disp.run_jobs()
+
         for feed in feeds:
             disp.add_job(feed)
diff --git a/apps/rss_feeds/models.py b/apps/rss_feeds/models.py
index 00b6515e5..4c19fc3aa 100644
--- a/apps/rss_feeds/models.py
+++ b/apps/rss_feeds/models.py
@@ -183,6 +183,7 @@ class Feed(models.Model):
                     # import pdb
                     # pdb.set_trace()
 
+                    # Title distance + content distance, checking if story changed
                     story_title_difference = levenshtein_distance(story.get('title'),
                                                                   existing_story['story_title'])
diff --git a/utils/feed_fetcher.py b/utils/feed_fetcher.py
index e10a7974e..61f6a3f12 100644
--- a/utils/feed_fetcher.py
+++ b/utils/feed_fetcher.py
@@ -10,8 +10,9 @@ import logging
 import datetime
 import threading
 import traceback
+from Queue import Queue
 
-# threadpool = None
+threadpool = None
 
 # Refresh feed code adapted from Feedjack.
 # http://feedjack.googlecode.com
@@ -55,8 +56,11 @@ class FetchFeed:
             self.fpf = feedparser.parse(self.feed.feed_address,
                                         agent=USER_AGENT,
                                         etag=self.feed.etag)
-        except:
-            logging.error('! ERROR: feed cannot be parsed')
+        except Exception, e:
+            log_msg = '! ERROR: feed cannot be parsed: %s' % e
+            logging.error(log_msg)
+            print(log_msg)
+            return FEED_ERRPARSE
 
         return self.fpf
@@ -82,7 +86,6 @@ class ProcessFeed:
         self.options = options
         self.fpf = fpf
 
-    @transaction.commit_on_success
     def process(self):
         """ Downloads and parses a feed.
         """
@@ -204,6 +207,7 @@ class Dispatcher:
                            FEED_ERREXC:'exception'}
         self.entry_keys = sorted(self.entry_trans.keys())
         self.feed_keys = sorted(self.feed_trans.keys())
+        self.num_threads = num_threads
         if threadpool:
             self.tpool = threadpool.ThreadPool(num_threads)
         else:
@@ -211,57 +215,58 @@ class Dispatcher:
         self.time_start = datetime.datetime.now()
 
-    def process_feed_wrapper(self, feed):
+    def process_feed_wrapper(self, feed_queue):
         """ wrapper for ProcessFeed
         """
-        start_time = datetime.datetime.now()
+        while True:
+            feed = feed_queue.get()
+            start_time = datetime.datetime.now()
 
-        ### Uncomment to test feed fetcher
-        # from random import randint
-        # if randint(0,10) < 10:
-        #     return 5, {}
+            ### Uncomment to test feed fetcher
+            # from random import randint
+            # if randint(0,10) < 10:
+            #     return 5, {}
 
-        try:
-            ffeed = FetchFeed(feed, self.options)
-            fetched_feed = ffeed.fetch()
+            try:
+                ffeed = FetchFeed(feed, self.options)
+                fetched_feed = ffeed.fetch()
 
-            pfeed = ProcessFeed(feed, fetched_feed, self.options)
-            ret_feed, ret_entries = pfeed.process()
+                pfeed = ProcessFeed(feed, fetched_feed, self.options)
+                ret_feed, ret_entries = pfeed.process()
 
-            fpage = FetchPage(feed, self.options)
-            fpage.fetch()
+                fpage = FetchPage(feed, self.options)
+                fpage.fetch()
 
-            del ffeed
-            del pfeed
-            del fpage
-        except:
-            (etype, eobj, etb) = sys.exc_info()
-            print '[%d] ! -------------------------' % (feed.id,)
-            print traceback.format_exception(etype, eobj, etb)
-            traceback.print_exception(etype, eobj, etb)
-            print '[%d] ! -------------------------' % (feed.id,)
-            ret_feed = FEED_ERREXC
-            ret_entries = {}
+                del ffeed
+                del pfeed
+                del fpage
+            except:
+                (etype, eobj, etb) = sys.exc_info()
+                print '[%d] ! -------------------------' % (feed.id,)
+                # print traceback.format_exception(etype, eobj, etb)
+                traceback.print_exception(etype, eobj, etb)
+                print '[%d] ! -------------------------' % (feed.id,)
+                ret_feed = FEED_ERREXC
+                ret_entries = {}
 
-        delta = datetime.datetime.now() - start_time
-        if delta.seconds > SLOWFEED_WARNING:
-            comment = u' (SLOW FEED!)'
-        else:
-            comment = u''
-        done = (u'[%d] Processed %s in %s [%s] [%s]%s' % (
-            feed.id, feed.feed_title, unicode(delta),
-            self.feed_trans[ret_feed],
-            u' '.join(u'%s=%d' % (self.entry_trans[key],
-                      ret_entries[key]) for key in self.entry_keys),
-            comment))
-        logging.debug(done)
-        print(done)
-        self.feed_stats[ret_feed] += 1
-        for key, val in ret_entries.items():
-            self.entry_stats[key] += val
-
-        return ret_feed, ret_entries
+            delta = datetime.datetime.now() - start_time
+            if delta.seconds > SLOWFEED_WARNING:
+                comment = u' (SLOW FEED!)'
+            else:
+                comment = u''
+            done = (u'[%d] Processed %s in %s [%s] [%s]%s' % (
+                feed.id, feed.feed_title, unicode(delta),
+                self.feed_trans[ret_feed],
+                u' '.join(u'%s=%d' % (self.entry_trans[key],
+                          ret_entries[key]) for key in self.entry_keys),
+                comment))
+            logging.debug(done)
+            print(done)
+            self.feed_stats[ret_feed] += 1
+            for key, val in ret_entries.items():
+                self.entry_stats[key] += val
+            feed_queue.task_done()
 
     def add_job(self, feed):
         """ adds a feed processing job to the pool
@@ -272,13 +277,34 @@ class Dispatcher:
             self.tpool.putRequest(req)
         else:
             # no threadpool module, just run the job
-            self.process_feed_wrapper(feed)
+            self.feed_queue.put(feed)
+            # self.process_feed_wrapper(feed)
 
+    def run_jobs(self):
+        self.feed_queue = Queue()
+
+        for i in range(self.num_threads):
+            worker = threading.Thread(target=self.process_feed_wrapper, args=(self.feed_queue,))
+            worker.setDaemon(True)
+            worker.start()
+
     def poll(self):
         """ polls the active threads
         """
         if not self.tpool:
             # no thread pool, nothing to poll
+            self.feed_queue.join()
+            done = (u'* DONE in %s\n* Feeds: %s\n* Entries: %s' % (
+                unicode(datetime.datetime.now() - self.time_start),
+                u' '.join(u'%s=%d' % (self.feed_trans[key],
+                          self.feed_stats[key])
+                          for key in self.feed_keys),
+                u' '.join(u'%s=%d' % (self.entry_trans[key],
+                          self.entry_stats[key])
+                          for key in self.entry_keys)
+            ))
+            print done
+            logging.info(done)
             return
         while True:
             try:
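
Note: the patch above swaps the optional threadpool dependency for the standard-library Queue plus daemon worker threads: run_jobs() starts N workers that loop on feed_queue.get(), add_job() becomes a plain put(), and poll() blocks on join() until every job has called task_done(). A minimal sketch of that producer/consumer pattern, assuming only the Python 2 standard library (the worker function and job names are illustrative, not from the codebase):

    from Queue import Queue
    import threading

    def worker(queue):
        # Each daemon thread loops forever, pulling one job at a time.
        while True:
            job = queue.get()          # blocks until a job is available
            try:
                print 'processing %s' % job
            finally:
                queue.task_done()      # always signal completion so join() cannot hang

    queue = Queue()
    for _ in range(4):                 # 4 workers, analogous to num_threads
        thread = threading.Thread(target=worker, args=(queue,))
        thread.setDaemon(True)         # workers die with the main thread
        thread.start()

    for job in ('feed-1', 'feed-2', 'feed-3'):
        queue.put(job)

    queue.join()                       # block until every job reports task_done()
    print 'all jobs processed'

One design point in this sketch differs from the patch: task_done() is called in a finally block, so an exception raised after get() (for example while formatting the log line) cannot leave join() blocked forever.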