Moving from threadpool to a Queue. Much simpler.

Samuel Clay 2009-09-10 03:33:05 +00:00
parent 0caf49e9a4
commit 02a4d292e8
3 changed files with 77 additions and 47 deletions


@@ -31,6 +31,9 @@ class Command(BaseCommand):
         disp = feed_fetcher.Dispatcher(options, options['workerthreads'])
         feeds = Feed.objects.all()
+        disp.run_jobs()
         for feed in feeds:
             disp.add_job(feed)
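Taken together with the fetcher changes below, the command's calling convention becomes: build the Dispatcher, start the worker threads with run_jobs(), then enqueue each feed with add_job(). A minimal sketch of that producer-side flow (Python 2; assumes the command's existing feed_fetcher and Feed imports, and an options dict whose other keys are omitted here):

    options = {'workerthreads': 4}
    disp = feed_fetcher.Dispatcher(options, options['workerthreads'])
    disp.run_jobs()                  # start the daemon workers on a shared Queue
    for feed in Feed.objects.all():
        disp.add_job(feed)           # producer: one queued job per feed
    disp.poll()                      # blocks until the queue drains (see below)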


@@ -183,6 +183,7 @@ class Feed(models.Model):
             # import pdb
             # pdb.set_trace()
+            # Title distance + content distance, checking if story changed
             story_title_difference = levenshtein_distance(story.get('title'),
                                                           existing_story['story_title'])
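The title check above calls a levenshtein_distance helper whose implementation is not part of this diff. For reference, a plausible pure-Python sketch with the same call signature (illustrative only, not necessarily the project's actual helper):

    def levenshtein_distance(first, second):
        # Count of single-character inserts, deletes, and substitutions
        # needed to turn one string into the other (row-wise DP).
        if len(first) > len(second):
            first, second = second, first
        distances = range(len(first) + 1)
        for i2, c2 in enumerate(second):
            new_distances = [i2 + 1]
            for i1, c1 in enumerate(first):
                if c1 == c2:
                    new_distances.append(distances[i1])
                else:
                    new_distances.append(1 + min(distances[i1],
                                                 distances[i1 + 1],
                                                 new_distances[-1]))
            distances = new_distances
        return distances[-1]

    # A lightly edited headline scores a small, nonzero distance:
    # levenshtein_distance('Breaking news', 'Breaking news!')  ->  1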


@@ -10,8 +10,9 @@ import logging
 import datetime
 import threading
 import traceback
+from Queue import Queue
 
-# threadpool = None
+threadpool = None
 
 # Refresh feed code adapted from Feedjack.
 # http://feedjack.googlecode.com
@@ -55,8 +56,11 @@ class FetchFeed:
             self.fpf = feedparser.parse(self.feed.feed_address,
                                         agent=USER_AGENT,
                                         etag=self.feed.etag)
-        except:
-            logging.error('! ERROR: feed cannot be parsed')
+        except Exception, e:
+            log_msg = '! ERROR: feed cannot be parsed: %s' % e
+            logging.error(log_msg)
+            print(log_msg)
             return FEED_ERRPARSE
         return self.fpf
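Passing the stored etag back into feedparser.parse() makes the fetch a conditional GET: a server whose content is unchanged answers 304 Not Modified and feedparser returns no entries. A small standalone sketch of that round trip (the URL, user-agent string, and saved_etag are hypothetical stand-ins):

    import feedparser

    USER_AGENT = 'my-fetcher/1.0'   # stand-in for the module constant
    saved_etag = None               # would come from the stored Feed row

    fpf = feedparser.parse('http://example.com/feed.xml',
                           agent=USER_AGENT,
                           etag=saved_etag)
    if getattr(fpf, 'status', None) == 304:
        pass                                     # unchanged; skip reprocessing
    else:
        saved_etag = getattr(fpf, 'etag', None)  # persist for the next fetch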
@@ -82,7 +86,6 @@ class ProcessFeed:
         self.options = options
         self.fpf = fpf
 
-    @transaction.commit_on_success
     def process(self):
         """ Downloads and parses a feed.
         """
@@ -204,6 +207,7 @@ class Dispatcher:
                            FEED_ERREXC:'exception'}
         self.entry_keys = sorted(self.entry_trans.keys())
         self.feed_keys = sorted(self.feed_trans.keys())
+        self.num_threads = num_threads
         if threadpool:
             self.tpool = threadpool.ThreadPool(num_threads)
         else:
@@ -211,57 +215,58 @@ class Dispatcher:
         self.time_start = datetime.datetime.now()
 
-    def process_feed_wrapper(self, feed):
+    def process_feed_wrapper(self, feed_queue):
         """ wrapper for ProcessFeed
         """
-        start_time = datetime.datetime.now()
+        while True:
+            feed = feed_queue.get()
+            start_time = datetime.datetime.now()
 
-        ### Uncomment to test feed fetcher
-        # from random import randint
-        # if randint(0,10) < 10:
-        #     return 5, {}
+            ### Uncomment to test feed fetcher
+            # from random import randint
+            # if randint(0,10) < 10:
+            #     return 5, {}
 
-        try:
-            ffeed = FetchFeed(feed, self.options)
-            fetched_feed = ffeed.fetch()
-
-            pfeed = ProcessFeed(feed, fetched_feed, self.options)
-            ret_feed, ret_entries = pfeed.process()
-
-            fpage = FetchPage(feed, self.options)
-            fpage.fetch()
-
-            del ffeed
-            del pfeed
-            del fpage
-        except:
-            (etype, eobj, etb) = sys.exc_info()
-            print '[%d] ! -------------------------' % (feed.id,)
-            print traceback.format_exception(etype, eobj, etb)
-            traceback.print_exception(etype, eobj, etb)
-            print '[%d] ! -------------------------' % (feed.id,)
-            ret_feed = FEED_ERREXC
-            ret_entries = {}
+            try:
+                ffeed = FetchFeed(feed, self.options)
+                fetched_feed = ffeed.fetch()
+
+                pfeed = ProcessFeed(feed, fetched_feed, self.options)
+                ret_feed, ret_entries = pfeed.process()
+
+                fpage = FetchPage(feed, self.options)
+                fpage.fetch()
+
+                del ffeed
+                del pfeed
+                del fpage
+            except:
+                (etype, eobj, etb) = sys.exc_info()
+                print '[%d] ! -------------------------' % (feed.id,)
+                # print traceback.format_exception(etype, eobj, etb)
+                traceback.print_exception(etype, eobj, etb)
+                print '[%d] ! -------------------------' % (feed.id,)
+                ret_feed = FEED_ERREXC
+                ret_entries = {}
 
-        delta = datetime.datetime.now() - start_time
-        if delta.seconds > SLOWFEED_WARNING:
-            comment = u' (SLOW FEED!)'
-        else:
-            comment = u''
-        done = (u'[%d] Processed %s in %s [%s] [%s]%s' % (
-            feed.id, feed.feed_title, unicode(delta),
-            self.feed_trans[ret_feed],
-            u' '.join(u'%s=%d' % (self.entry_trans[key],
-                      ret_entries[key]) for key in self.entry_keys),
-            comment))
-        logging.debug(done)
-        print(done)
-        self.feed_stats[ret_feed] += 1
-        for key, val in ret_entries.items():
-            self.entry_stats[key] += val
-        return ret_feed, ret_entries
+            delta = datetime.datetime.now() - start_time
+            if delta.seconds > SLOWFEED_WARNING:
+                comment = u' (SLOW FEED!)'
+            else:
+                comment = u''
+            done = (u'[%d] Processed %s in %s [%s] [%s]%s' % (
+                feed.id, feed.feed_title, unicode(delta),
+                self.feed_trans[ret_feed],
+                u' '.join(u'%s=%d' % (self.entry_trans[key],
+                          ret_entries[key]) for key in self.entry_keys),
+                comment))
+            logging.debug(done)
+            print(done)
+            self.feed_stats[ret_feed] += 1
+            for key, val in ret_entries.items():
+                self.entry_stats[key] += val
+            feed_queue.task_done()
 
     def add_job(self, feed):
         """ adds a feed processing job to the pool
@@ -272,13 +277,34 @@ class Dispatcher:
             self.tpool.putRequest(req)
         else:
             # no threadpool module, just run the job
-            self.process_feed_wrapper(feed)
+            self.feed_queue.put(feed)
+            # self.process_feed_wrapper(feed)
+
+    def run_jobs(self):
+        self.feed_queue = Queue()
+        for i in range(self.num_threads):
+            worker = threading.Thread(target=self.process_feed_wrapper, args=(self.feed_queue,))
+            worker.setDaemon(True)
+            worker.start()
 
     def poll(self):
         """ polls the active threads
         """
         if not self.tpool:
             # no thread pool, nothing to poll
+            self.feed_queue.join()
+            done = (u'* DONE in %s\n* Feeds: %s\n* Entries: %s' % (
+                unicode(datetime.datetime.now() - self.time_start),
+                u' '.join(u'%s=%d' % (self.feed_trans[key],
+                          self.feed_stats[key])
+                          for key in self.feed_keys),
+                u' '.join(u'%s=%d' % (self.entry_trans[key],
+                          self.entry_stats[key])
+                          for key in self.entry_keys)
+                ))
+            print done
+            logging.info(done)
             return
         while True:
             try:
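The pattern underlying the whole commit is the standard Queue worker pool: every put() must eventually be matched by a task_done() before join() returns, so poll() now waits for exactly the work that was enqueued, and the daemon worker threads simply die with the process instead of needing a shutdown signal. A self-contained Python 2 sketch of the same pattern (in Python 3 the module is queue and setDaemon(True) becomes the daemon attribute):

    import threading
    from Queue import Queue

    def worker(job_queue):
        while True:
            job = job_queue.get()       # blocks until a job is available
            try:
                print 'processing %r' % (job,)
            finally:
                job_queue.task_done()   # pairs with the get() above

    job_queue = Queue()
    for _ in range(4):
        t = threading.Thread(target=worker, args=(job_queue,))
        t.setDaemon(True)               # workers die with the main thread
        t.start()

    for job in ('feed-1', 'feed-2', 'feed-3'):
        job_queue.put(job)
    job_queue.join()                    # returns once every job saw task_done()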