2010-08-31 23:19:06 -04:00
|
|
|
from apps.rss_feeds.models import FeedUpdateHistory
|
2010-07-08 11:37:54 -04:00
|
|
|
# from apps.rss_feeds.models import FeedXML
|
2009-08-29 19:34:42 +00:00
|
|
|
from django.core.cache import cache
|
2010-08-21 13:57:39 -04:00
|
|
|
from django.conf import settings
|
2010-09-19 11:30:18 -04:00
|
|
|
from apps.reader.models import UserSubscription, MUserStory
|
|
|
|
from apps.rss_feeds.models import MStory
|
2009-08-29 19:34:42 +00:00
|
|
|
from apps.rss_feeds.importer import PageImporter
|
2010-04-09 16:37:19 -04:00
|
|
|
from utils import feedparser
|
2010-07-28 19:05:03 -04:00
|
|
|
from django.db import IntegrityError
|
2009-12-18 20:47:44 +00:00
|
|
|
from utils.story_functions import pre_process_story
|
2010-08-16 12:52:39 -04:00
|
|
|
from utils import log as logging
|
2010-09-27 11:44:23 -04:00
|
|
|
from utils.feed_functions import timelimit
|
2009-08-29 19:34:42 +00:00
|
|
|
import sys
|
|
|
|
import time
|
|
|
|
import datetime
|
|
|
|
import traceback
|
2009-09-16 02:22:27 +00:00
|
|
|
import multiprocessing
|
2010-07-05 14:26:35 -04:00
|
|
|
import urllib2
|
2010-07-21 11:38:33 -04:00
|
|
|
import xml.sax
|
2010-08-21 13:57:39 -04:00
|
|
|
import socket
|
2010-09-17 12:42:44 -04:00
|
|
|
import mongoengine
|
2009-08-29 19:34:42 +00:00
|
|
|
|
|
|
|
# Refresh feed code adapted from Feedjack.
|
|
|
|
# http://feedjack.googlecode.com
|
|
|
|
|
2010-08-30 13:33:29 -04:00
|
|
|
VERSION = '0.9'
|
2010-06-08 11:19:07 -04:00
|
|
|
URL = 'http://www.newsblur.com/'
|
2010-08-03 21:53:08 -04:00
|
|
|
USER_AGENT = 'NewsBlur Fetcher %s - %s' % (VERSION, URL)
|
2009-08-29 19:34:42 +00:00
|
|
|
SLOWFEED_WARNING = 10
|
|
|
|
ENTRY_NEW, ENTRY_UPDATED, ENTRY_SAME, ENTRY_ERR = range(4)
|
|
|
|
FEED_OK, FEED_SAME, FEED_ERRPARSE, FEED_ERRHTTP, FEED_ERREXC = range(5)
|
|
|
|
|
2010-09-19 11:30:18 -04:00
|
|
|
UNREAD_CUTOFF = datetime.datetime.utcnow() - datetime.timedelta(days=settings.DAYS_OF_UNREAD)
|
|
|
|
|
2009-08-29 19:34:42 +00:00
|
|
|
def mtime(ttime):
|
|
|
|
""" datetime auxiliar function.
|
|
|
|
"""
|
|
|
|
return datetime.datetime.fromtimestamp(time.mktime(ttime))
|
|
|
|
|
2010-08-30 23:55:24 -04:00
|
|
|
|
2009-08-29 19:34:42 +00:00
|
|
|
class FetchFeed:
|
|
|
|
def __init__(self, feed, options):
|
|
|
|
self.feed = feed
|
|
|
|
self.options = options
|
|
|
|
self.fpf = None
|
2010-08-30 23:55:24 -04:00
|
|
|
|
2010-08-31 20:17:27 -04:00
|
|
|
@timelimit(20)
|
2009-08-29 19:34:42 +00:00
|
|
|
def fetch(self):
|
|
|
|
""" Downloads and parses a feed.
|
|
|
|
"""
|
2010-08-21 13:57:39 -04:00
|
|
|
socket.setdefaulttimeout(30)
|
2010-08-17 17:45:51 -04:00
|
|
|
identity = self.get_identity()
|
2010-09-07 14:41:11 -07:00
|
|
|
log_msg = u'%2s ---> [%-30s] Fetching feed (%d)' % (identity,
|
|
|
|
unicode(self.feed)[:30],
|
2010-04-06 16:56:47 -04:00
|
|
|
self.feed.id)
|
2010-08-15 12:04:26 -04:00
|
|
|
logging.debug(log_msg)
|
2010-04-27 13:44:53 -04:00
|
|
|
|
|
|
|
# Check if feed still needs to be updated
|
2010-08-31 23:19:06 -04:00
|
|
|
# feed = Feed.objects.get(pk=self.feed.pk)
|
|
|
|
# if feed.next_scheduled_update > datetime.datetime.now() and not self.options.get('force'):
|
|
|
|
# log_msg = u' ---> Already fetched %s (%d)' % (self.feed.feed_title,
|
|
|
|
# self.feed.id)
|
|
|
|
# logging.debug(log_msg)
|
|
|
|
# feed.save_feed_history(303, "Already fetched")
|
|
|
|
# return FEED_SAME, None
|
|
|
|
# else:
|
2010-09-08 18:30:46 -07:00
|
|
|
self.feed.set_next_scheduled_update()
|
2010-08-30 23:37:39 -04:00
|
|
|
|
2010-08-30 19:57:27 -04:00
|
|
|
etag=self.feed.etag
|
2010-07-08 01:26:03 -04:00
|
|
|
modified = self.feed.last_modified.utctimetuple()[:7] if self.feed.last_modified else None
|
2010-08-30 19:57:27 -04:00
|
|
|
|
2010-08-30 22:42:44 -04:00
|
|
|
if self.options.get('force'):
|
2010-08-30 19:57:27 -04:00
|
|
|
modified = None
|
|
|
|
etag = None
|
|
|
|
|
2010-07-08 01:26:03 -04:00
|
|
|
self.fpf = feedparser.parse(self.feed.feed_address,
|
|
|
|
agent=USER_AGENT,
|
2010-08-30 19:57:27 -04:00
|
|
|
etag=etag,
|
2010-07-08 01:26:03 -04:00
|
|
|
modified=modified)
|
2010-07-08 11:37:54 -04:00
|
|
|
|
2010-04-27 13:44:53 -04:00
|
|
|
return FEED_OK, self.fpf
|
2010-08-17 17:45:51 -04:00
|
|
|
|
|
|
|
def get_identity(self):
|
|
|
|
identity = "X"
|
|
|
|
|
|
|
|
current_process = multiprocessing.current_process()
|
|
|
|
if current_process._identity:
|
|
|
|
identity = current_process._identity[0]
|
|
|
|
|
|
|
|
return identity
|
|
|
|
|
2009-08-29 19:34:42 +00:00
|
|
|
class ProcessFeed:
|
2010-08-21 20:42:38 -04:00
|
|
|
def __init__(self, feed, fpf, db, options):
|
2009-08-29 19:34:42 +00:00
|
|
|
self.feed = feed
|
|
|
|
self.options = options
|
|
|
|
self.fpf = fpf
|
2009-09-16 02:34:04 +00:00
|
|
|
self.lock = multiprocessing.Lock()
|
2010-08-21 20:42:38 -04:00
|
|
|
self.db = db
|
2010-09-07 14:41:11 -07:00
|
|
|
self.entry_trans = {
|
|
|
|
ENTRY_NEW:'new',
|
|
|
|
ENTRY_UPDATED:'updated',
|
|
|
|
ENTRY_SAME:'same',
|
|
|
|
ENTRY_ERR:'error'}
|
|
|
|
self.entry_keys = sorted(self.entry_trans.keys())
|
2009-08-29 19:34:42 +00:00
|
|
|
|
|
|
|
def process(self):
|
|
|
|
""" Downloads and parses a feed.
|
|
|
|
"""
|
|
|
|
|
|
|
|
ret_values = {
|
|
|
|
ENTRY_NEW:0,
|
|
|
|
ENTRY_UPDATED:0,
|
|
|
|
ENTRY_SAME:0,
|
|
|
|
ENTRY_ERR:0}
|
|
|
|
|
2010-08-29 13:23:50 -04:00
|
|
|
# logging.debug(u' ---> [%d] Processing %s' % (self.feed.id, self.feed.feed_title))
|
2010-07-21 11:38:33 -04:00
|
|
|
|
2009-08-29 19:34:42 +00:00
|
|
|
if hasattr(self.fpf, 'status'):
|
|
|
|
if self.options['verbose']:
|
2010-09-07 14:41:11 -07:00
|
|
|
logging.debug(u' ---> [%-30s] Fetched feed, HTTP status %d: %s (bozo: %s)' % (unicode(self.feed)[:30],
|
2009-08-29 19:34:42 +00:00
|
|
|
self.fpf.status,
|
2010-08-25 19:37:07 -04:00
|
|
|
self.feed.feed_address,
|
|
|
|
self.fpf.bozo))
|
2010-09-07 15:42:22 -07:00
|
|
|
if self.fpf.bozo and self.fpf.status != 304:
|
2010-09-07 14:41:11 -07:00
|
|
|
logging.debug(u' ---> [%-30s] BOZO exception: %s' % (
|
|
|
|
unicode(self.feed)[:30],
|
|
|
|
self.fpf.bozo_exception,))
|
2009-08-29 19:34:42 +00:00
|
|
|
if self.fpf.status == 304:
|
2010-04-19 12:09:04 -04:00
|
|
|
self.feed.save()
|
2010-07-08 11:37:54 -04:00
|
|
|
self.feed.save_feed_history(304, "Not modified")
|
2009-08-29 19:34:42 +00:00
|
|
|
return FEED_SAME, ret_values
|
|
|
|
|
|
|
|
if self.fpf.status >= 400:
|
2010-04-19 12:09:04 -04:00
|
|
|
self.feed.save()
|
2010-07-08 11:37:54 -04:00
|
|
|
self.feed.save_feed_history(self.fpf.status, "HTTP Error")
|
2009-08-29 19:34:42 +00:00
|
|
|
return FEED_ERRHTTP, ret_values
|
2010-08-04 18:51:29 -04:00
|
|
|
|
|
|
|
if self.fpf.bozo and isinstance(self.fpf.bozo_exception, feedparser.NonXMLContentType):
|
|
|
|
if not self.fpf.entries:
|
2010-09-07 14:41:11 -07:00
|
|
|
logging.debug(" ---> [%-30s] Feed is Non-XML. Checking address..." % unicode(self.feed)[:30])
|
2010-08-25 19:13:28 -04:00
|
|
|
fixed_feed = self.feed.check_feed_address_for_feed_link()
|
|
|
|
if not fixed_feed:
|
|
|
|
self.feed.save_feed_history(502, 'Non-xml feed', self.fpf.bozo_exception)
|
2010-08-04 18:51:29 -04:00
|
|
|
return FEED_ERRPARSE, ret_values
|
|
|
|
elif self.fpf.bozo and isinstance(self.fpf.bozo_exception, xml.sax._exceptions.SAXException):
|
2010-09-07 14:41:11 -07:00
|
|
|
logging.debug(" ---> [%-30s] Feed is Bad XML (SAX). Checking address..." % unicode(self.feed)[:30])
|
2010-08-04 18:51:29 -04:00
|
|
|
if not self.fpf.entries:
|
2010-08-25 19:10:55 -04:00
|
|
|
fixed_feed = self.feed.check_feed_address_for_feed_link()
|
|
|
|
if not fixed_feed:
|
|
|
|
self.feed.save_feed_history(503, 'SAX Exception', self.fpf.bozo_exception)
|
2010-08-04 18:51:29 -04:00
|
|
|
return FEED_ERRPARSE, ret_values
|
|
|
|
|
2009-08-29 19:34:42 +00:00
|
|
|
# the feed has changed (or it is the first time we parse it)
|
|
|
|
# saving the etag and last_modified fields
|
2010-09-17 13:24:23 -04:00
|
|
|
self.feed.etag = self.fpf.get('etag')
|
|
|
|
if self.feed.etag:
|
|
|
|
self.feed.etag = self.feed.etag[:255]
|
2009-08-29 19:34:42 +00:00
|
|
|
# some times this is None (it never should) *sigh*
|
|
|
|
if self.feed.etag is None:
|
|
|
|
self.feed.etag = ''
|
|
|
|
|
|
|
|
try:
|
|
|
|
self.feed.last_modified = mtime(self.fpf.modified)
|
|
|
|
except:
|
|
|
|
pass
|
2010-01-26 20:39:11 -05:00
|
|
|
|
|
|
|
self.feed.feed_title = self.fpf.feed.get('title', self.feed.feed_title)
|
2010-07-08 01:07:37 -04:00
|
|
|
self.feed.feed_tagline = self.fpf.feed.get('tagline', self.feed.feed_tagline)
|
2010-01-26 20:39:11 -05:00
|
|
|
self.feed.feed_link = self.fpf.feed.get('link', self.feed.feed_link)
|
2009-08-29 19:34:42 +00:00
|
|
|
self.feed.last_update = datetime.datetime.now()
|
2010-08-25 19:22:53 -04:00
|
|
|
|
2009-08-29 19:34:42 +00:00
|
|
|
guids = []
|
|
|
|
for entry in self.fpf.entries:
|
|
|
|
if entry.get('id', ''):
|
|
|
|
guids.append(entry.get('id', ''))
|
|
|
|
elif entry.title:
|
|
|
|
guids.append(entry.title)
|
|
|
|
elif entry.link:
|
|
|
|
guids.append(entry.link)
|
2010-07-05 14:26:35 -04:00
|
|
|
|
|
|
|
self.lock.acquire()
|
|
|
|
try:
|
|
|
|
self.feed.save()
|
|
|
|
finally:
|
|
|
|
self.lock.release()
|
2009-08-29 19:34:42 +00:00
|
|
|
|
|
|
|
# Compare new stories to existing stories, adding and updating
|
2009-12-18 20:47:44 +00:00
|
|
|
start_date = datetime.datetime.now()
|
|
|
|
end_date = datetime.datetime.now()
|
2010-01-26 20:23:41 -05:00
|
|
|
story_guids = []
|
2009-12-18 20:47:44 +00:00
|
|
|
for entry in self.fpf.entries:
|
|
|
|
story = pre_process_story(entry)
|
2010-01-28 13:28:27 -05:00
|
|
|
if story.get('published') < start_date:
|
2009-12-18 20:47:44 +00:00
|
|
|
start_date = story.get('published')
|
2010-01-28 13:28:27 -05:00
|
|
|
if story.get('published') > end_date:
|
2009-12-18 20:47:44 +00:00
|
|
|
end_date = story.get('published')
|
2010-01-28 13:28:27 -05:00
|
|
|
story_guids.append(story.get('guid') or story.get('link'))
|
2010-08-21 13:57:39 -04:00
|
|
|
existing_stories = self.db.stories.find({
|
|
|
|
'story_feed_id': self.feed.pk,
|
2010-09-21 18:58:21 -04:00
|
|
|
'story_date': {'$gte': start_date},
|
|
|
|
'story_guid': {'$in': story_guids}
|
|
|
|
}).limit(len(story_guids))
|
2010-08-21 13:57:39 -04:00
|
|
|
# MStory.objects(
|
|
|
|
# (Q(story_date__gte=start_date) & Q(story_date__lte=end_date))
|
|
|
|
# | (Q(story_guid__in=story_guids)),
|
|
|
|
# story_feed=self.feed
|
|
|
|
# ).order_by('-story_date')
|
|
|
|
ret_values = self.feed.add_update_stories(self.fpf.entries, existing_stories, self.db)
|
2010-09-07 14:41:11 -07:00
|
|
|
|
|
|
|
logging.debug(u' ---> [%-30s] Parsed Feed: %s' % (
|
|
|
|
unicode(self.feed)[:30],
|
|
|
|
u' '.join(u'%s=%d' % (self.entry_trans[key],
|
|
|
|
ret_values[key]) for key in self.entry_keys),))
|
2010-08-23 09:55:21 -04:00
|
|
|
self.feed.update_all_statistics(lock=self.lock)
|
2010-09-22 15:57:55 -04:00
|
|
|
self.feed.trim_feed()
|
2010-07-23 18:56:29 -04:00
|
|
|
self.feed.save_feed_history(200, "OK")
|
|
|
|
|
2009-08-29 19:34:42 +00:00
|
|
|
return FEED_OK, ret_values
|
|
|
|
|
2010-01-21 13:12:29 -05:00
|
|
|
|
2009-08-29 19:34:42 +00:00
|
|
|
class Dispatcher:
|
|
|
|
def __init__(self, options, num_threads):
|
|
|
|
self.options = options
|
|
|
|
self.entry_stats = {
|
|
|
|
ENTRY_NEW:0,
|
|
|
|
ENTRY_UPDATED:0,
|
|
|
|
ENTRY_SAME:0,
|
|
|
|
ENTRY_ERR:0}
|
|
|
|
self.feed_stats = {
|
|
|
|
FEED_OK:0,
|
|
|
|
FEED_SAME:0,
|
|
|
|
FEED_ERRPARSE:0,
|
|
|
|
FEED_ERRHTTP:0,
|
|
|
|
FEED_ERREXC:0}
|
|
|
|
self.feed_trans = {
|
|
|
|
FEED_OK:'ok',
|
|
|
|
FEED_SAME:'unchanged',
|
|
|
|
FEED_ERRPARSE:'cant_parse',
|
|
|
|
FEED_ERRHTTP:'http_error',
|
|
|
|
FEED_ERREXC:'exception'}
|
|
|
|
self.feed_keys = sorted(self.feed_trans.keys())
|
2009-09-10 03:33:05 +00:00
|
|
|
self.num_threads = num_threads
|
2009-08-29 19:34:42 +00:00
|
|
|
self.time_start = datetime.datetime.now()
|
2009-09-16 03:54:33 +00:00
|
|
|
self.workers = []
|
2009-08-29 19:34:42 +00:00
|
|
|
|
2009-09-16 03:54:33 +00:00
|
|
|
def process_feed_wrapper(self, feed_queue):
|
2009-08-29 19:34:42 +00:00
|
|
|
""" wrapper for ProcessFeed
|
|
|
|
"""
|
2010-09-01 15:29:35 -04:00
|
|
|
if not self.options['single_threaded']:
|
|
|
|
# Close the DB so the connection can be re-opened on a per-process basis
|
|
|
|
from django.db import connection
|
|
|
|
connection.close()
|
2010-08-25 19:10:55 -04:00
|
|
|
delta = None
|
2009-09-16 02:39:45 +00:00
|
|
|
|
2010-08-21 20:42:38 -04:00
|
|
|
MONGO_DB = settings.MONGO_DB
|
2010-09-17 12:42:44 -04:00
|
|
|
db = mongoengine.connection.connect(db=MONGO_DB['NAME'], host=MONGO_DB['HOST'], port=MONGO_DB['PORT'])
|
2010-08-21 20:42:38 -04:00
|
|
|
|
2009-09-16 02:22:27 +00:00
|
|
|
current_process = multiprocessing.current_process()
|
2010-07-05 23:17:36 -04:00
|
|
|
|
2010-04-09 16:37:19 -04:00
|
|
|
identity = "X"
|
|
|
|
if current_process._identity:
|
|
|
|
identity = current_process._identity[0]
|
2009-09-16 03:54:33 +00:00
|
|
|
for feed in feed_queue:
|
2010-04-27 13:44:53 -04:00
|
|
|
ret_entries = {
|
|
|
|
ENTRY_NEW: 0,
|
|
|
|
ENTRY_UPDATED: 0,
|
|
|
|
ENTRY_SAME: 0,
|
|
|
|
ENTRY_ERR: 0
|
|
|
|
}
|
2009-09-10 03:33:05 +00:00
|
|
|
start_time = datetime.datetime.now()
|
2010-08-30 23:37:39 -04:00
|
|
|
|
2009-09-10 03:33:05 +00:00
|
|
|
### Uncomment to test feed fetcher
|
2009-09-16 03:56:58 +00:00
|
|
|
# from random import randint
|
|
|
|
# if randint(0,10) < 10:
|
|
|
|
# continue
|
2010-04-19 12:37:15 -04:00
|
|
|
|
2009-09-10 03:33:05 +00:00
|
|
|
try:
|
|
|
|
ffeed = FetchFeed(feed, self.options)
|
2010-04-27 13:44:53 -04:00
|
|
|
ret_feed, fetched_feed = ffeed.fetch()
|
2010-08-18 21:54:33 -04:00
|
|
|
|
2010-08-30 22:42:44 -04:00
|
|
|
if ((fetched_feed and ret_feed == FEED_OK) or self.options['force']):
|
2010-08-21 20:42:38 -04:00
|
|
|
pfeed = ProcessFeed(feed, fetched_feed, db, self.options)
|
2010-04-27 13:44:53 -04:00
|
|
|
ret_feed, ret_entries = pfeed.process()
|
2010-08-30 22:42:44 -04:00
|
|
|
|
2010-09-19 11:30:18 -04:00
|
|
|
if ret_entries.get(ENTRY_NEW) or self.options['force'] or not feed.fetched_once:
|
|
|
|
if not feed.fetched_once:
|
|
|
|
feed.fetched_once = True
|
|
|
|
feed.save()
|
2010-09-22 11:15:56 -04:00
|
|
|
MUserStory.delete_old_stories(feed_id=feed.pk)
|
2010-04-27 13:44:53 -04:00
|
|
|
user_subs = UserSubscription.objects.filter(feed=feed)
|
2010-09-07 14:41:11 -07:00
|
|
|
logging.debug(u' ---> [%-30s] Computing scores for all feed subscribers: %s subscribers' % (unicode(feed)[:30], user_subs.count()))
|
2010-09-19 11:30:18 -04:00
|
|
|
stories_db = MStory.objects(story_feed_id=feed.pk,
|
|
|
|
story_date__gte=UNREAD_CUTOFF)
|
2010-04-27 13:44:53 -04:00
|
|
|
for sub in user_subs:
|
|
|
|
cache.delete('usersub:%s' % sub.user_id)
|
2010-09-07 14:02:48 -07:00
|
|
|
silent = False if self.options['verbose'] >= 2 else True
|
2010-09-19 11:30:18 -04:00
|
|
|
sub.calculate_feed_scores(silent=silent, stories_db=stories_db)
|
2010-09-07 15:42:22 -07:00
|
|
|
cache.delete('feed_stories:%s-%s-%s' % (feed.id, 0, 25))
|
|
|
|
# if ret_entries.get(ENTRY_NEW) or ret_entries.get(ENTRY_UPDATED) or self.options['force']:
|
|
|
|
# feed.get_stories(force=True)
|
2010-07-07 18:40:10 -04:00
|
|
|
except KeyboardInterrupt:
|
|
|
|
break
|
2010-07-08 01:07:37 -04:00
|
|
|
except urllib2.HTTPError, e:
|
2010-07-08 11:37:54 -04:00
|
|
|
feed.save_feed_history(e.code, e.msg, e.fp.read())
|
2010-08-23 07:58:09 -04:00
|
|
|
fetched_feed = None
|
2010-07-08 01:07:37 -04:00
|
|
|
except Exception, e:
|
2010-08-15 12:04:26 -04:00
|
|
|
logging.debug('[%d] ! -------------------------' % (feed.id,))
|
2010-07-06 13:21:12 -04:00
|
|
|
tb = traceback.format_exc()
|
2010-08-15 12:04:26 -04:00
|
|
|
logging.debug(tb)
|
|
|
|
logging.debug('[%d] ! -------------------------' % (feed.id,))
|
2010-04-19 12:42:32 -04:00
|
|
|
ret_feed = FEED_ERREXC
|
2010-07-08 11:37:54 -04:00
|
|
|
feed.save_feed_history(500, "Error", tb)
|
2010-08-23 07:58:09 -04:00
|
|
|
fetched_feed = None
|
|
|
|
|
2010-08-30 22:42:44 -04:00
|
|
|
if ((self.options['force']) or
|
|
|
|
(fetched_feed and
|
|
|
|
feed.feed_link and
|
|
|
|
(ret_feed == FEED_OK or
|
|
|
|
(ret_feed == FEED_SAME and feed.stories_last_month > 10)))):
|
2010-09-07 14:41:11 -07:00
|
|
|
|
|
|
|
logging.debug(u' ---> [%-30s] Fetching page' % (unicode(feed)[:30]))
|
2010-08-23 07:58:09 -04:00
|
|
|
page_importer = PageImporter(feed.feed_link, feed)
|
|
|
|
page_importer.fetch_page()
|
2009-08-29 19:34:42 +00:00
|
|
|
|
2010-08-27 18:35:33 -04:00
|
|
|
delta = datetime.datetime.now() - start_time
|
2010-04-29 13:35:46 -04:00
|
|
|
|
2010-06-28 08:06:12 -04:00
|
|
|
feed.last_load_time = max(1, delta.seconds)
|
2010-08-09 20:44:36 -04:00
|
|
|
feed.fetched_once = True
|
2010-07-25 23:13:27 -04:00
|
|
|
try:
|
|
|
|
feed.save()
|
2010-07-28 01:14:25 -04:00
|
|
|
except IntegrityError:
|
2010-09-07 14:41:11 -07:00
|
|
|
logging.debug(" ---> [%-30s] IntegrityError on feed: %s" % (unicode(feed)[:30], feed.feed_address,))
|
2010-04-29 13:35:46 -04:00
|
|
|
|
2010-09-07 14:41:11 -07:00
|
|
|
done_msg = (u'%2s ---> [%-30s] Processed in %s [%s]' % (
|
|
|
|
identity, feed.feed_title[:30], unicode(delta),
|
|
|
|
self.feed_trans[ret_feed],))
|
2010-04-06 16:56:47 -04:00
|
|
|
logging.debug(done_msg)
|
|
|
|
|
2009-09-10 03:33:05 +00:00
|
|
|
self.feed_stats[ret_feed] += 1
|
|
|
|
for key, val in ret_entries.items():
|
|
|
|
self.entry_stats[key] += val
|
2010-08-17 17:45:51 -04:00
|
|
|
|
|
|
|
time_taken = datetime.datetime.now() - self.time_start
|
|
|
|
history = FeedUpdateHistory(
|
|
|
|
number_of_feeds=len(feed_queue),
|
|
|
|
seconds_taken=time_taken.seconds
|
|
|
|
)
|
|
|
|
history.save()
|
2010-04-09 16:37:19 -04:00
|
|
|
if not self.options['single_threaded']:
|
2010-08-15 12:04:26 -04:00
|
|
|
logging.debug("---> DONE WITH PROCESS: %s" % current_process.name)
|
2010-04-09 16:37:19 -04:00
|
|
|
sys.exit()
|
2009-08-29 19:34:42 +00:00
|
|
|
|
2010-04-25 18:31:54 -04:00
|
|
|
def add_jobs(self, feeds_queue, feeds_count=1):
|
2009-08-29 19:34:42 +00:00
|
|
|
""" adds a feed processing job to the pool
|
|
|
|
"""
|
2010-04-09 16:37:19 -04:00
|
|
|
self.feeds_queue = feeds_queue
|
2010-04-23 21:19:19 -04:00
|
|
|
self.feeds_count = feeds_count
|
2009-08-29 19:34:42 +00:00
|
|
|
|
2009-09-10 03:33:05 +00:00
|
|
|
def run_jobs(self):
|
2010-04-09 16:37:19 -04:00
|
|
|
if self.options['single_threaded']:
|
|
|
|
self.process_feed_wrapper(self.feeds_queue[0])
|
|
|
|
else:
|
|
|
|
for i in range(self.num_threads):
|
|
|
|
feed_queue = self.feeds_queue[i]
|
2010-07-05 14:26:35 -04:00
|
|
|
self.workers.append(multiprocessing.Process(target=self.process_feed_wrapper,
|
|
|
|
args=(feed_queue,)))
|
2010-04-09 16:37:19 -04:00
|
|
|
# worker.setName("Thread #%s" % (i+1))
|
|
|
|
# worker.setDaemon(True)
|
|
|
|
for i in range(self.num_threads):
|
|
|
|
self.workers[i].start()
|
2009-09-16 03:54:33 +00:00
|
|
|
|
2009-08-29 19:34:42 +00:00
|
|
|
def poll(self):
|
|
|
|
""" polls the active threads
|
|
|
|
"""
|
2010-04-09 16:37:19 -04:00
|
|
|
if not self.options['single_threaded']:
|
2009-09-16 03:54:33 +00:00
|
|
|
for i in range(self.num_threads):
|
|
|
|
self.workers[i].join()
|
2009-09-10 03:33:05 +00:00
|
|
|
done = (u'* DONE in %s\n* Feeds: %s\n* Entries: %s' % (
|
|
|
|
unicode(datetime.datetime.now() - self.time_start),
|
|
|
|
u' '.join(u'%s=%d' % (self.feed_trans[key],
|
|
|
|
self.feed_stats[key])
|
|
|
|
for key in self.feed_keys),
|
|
|
|
u' '.join(u'%s=%d' % (self.entry_trans[key],
|
|
|
|
self.entry_stats[key])
|
|
|
|
for key in self.entry_keys)
|
|
|
|
))
|
2010-08-15 12:04:26 -04:00
|
|
|
logging.debug(done)
|
2009-08-29 19:34:42 +00:00
|
|
|
return
|
2010-04-09 16:37:19 -04:00
|
|
|
|
2009-09-12 20:42:38 +00:00
|
|
|
|