NewsBlur-viq/utils/feed_fetcher.py

458 lines
20 KiB
Python
Raw Normal View History

# from apps.rss_feeds.models import FeedXML
from django.core.cache import cache
from django.conf import settings
from django.db import IntegrityError
# from mongoengine.queryset import Q
from apps.reader.models import UserSubscription, MUserStory
from apps.rss_feeds.models import Feed, MStory
from apps.rss_feeds.page_importer import PageImporter
from apps.rss_feeds.icon_importer import IconImporter
from utils import feedparser
2009-12-18 20:47:44 +00:00
from utils.story_functions import pre_process_story
from utils import log as logging
2011-02-15 21:08:40 -05:00
from utils.feed_functions import timelimit, TimeoutError, mail_feed_error_to_admin, utf8encode
import time
import datetime
import traceback
import multiprocessing
import urllib2
import xml.sax
import redis
# Refresh feed code adapted from Feedjack.
# http://feedjack.googlecode.com
# Threshold for flagging a slow feed (not referenced in this part of the
# file; presumably seconds -- confirm against callers).
SLOWFEED_WARNING = 10

# Per-entry outcome codes; used as keys of the entry statistics dicts.
ENTRY_NEW = 0
ENTRY_UPDATED = 1
ENTRY_SAME = 2
ENTRY_ERR = 3

# Per-feed outcome codes returned by the fetch and process steps.
FEED_OK = 0
FEED_SAME = 1
FEED_ERRPARSE = 2
FEED_ERRHTTP = 3
FEED_ERREXC = 4
def mtime(ttime):
    """Auxiliary helper: convert a time tuple into a naive ``datetime``.

    ttime: a ``time.struct_time`` or any sequence whose first six items are
    year, month, day, hour, minute, second.

    Returns a naive ``datetime.datetime`` carrying the same calendar fields
    as *ttime*.

    The fields are copied directly instead of round-tripping through
    ``time.mktime()``/``fromtimestamp()``: that round trip interprets the
    tuple in the machine's local time zone, so on a non-UTC host the result
    can be shifted by the DST offset.
    """
    year, month, day, hour, minute, second = ttime[:6]
    # Seconds are added as a timedelta so leap-second values (60, 61) roll
    # over into the next minute instead of raising ValueError.
    return (datetime.datetime(year, month, day, hour, minute) +
            datetime.timedelta(seconds=second))
class FetchFeed:
    """Downloads a single feed with feedparser; parsing/storing happens later."""

    def __init__(self, feed_id, options):
        """Load the Feed row for *feed_id* and remember the fetch options.

        options: dict-like; this class only reads ``options.get('force')``.
        """
        self.feed = Feed.objects.get(pk=feed_id)
        self.options = options
        self.fpf = None

    # NOTE(review): timelimit(20) presumably aborts the call with
    # TimeoutError after 20 seconds -- callers in this file catch
    # TimeoutError around fetch(); confirm against utils.feed_functions.
    @timelimit(20)
    def fetch(self):
        """
        Uses feedparser to download the feed. Will be parsed later.

        Reschedules the feed's next update, then requests the feed address
        with the stored etag / last-modified values unless the fetch is
        forced or the feed has never been fetched.

        Returns a ``(FEED_OK, parsed_feed)`` tuple; the parsed result is also
        kept on ``self.fpf``.
        """
        identity = self.get_identity()
        # last_update is assigned from utcnow() when a feed is processed, so
        # its age must be measured against utcnow() as well; mixing in local
        # now() would skew the logged delta by the UTC offset.
        since_last_update = datetime.datetime.utcnow() - self.feed.last_update
        log_msg = u'%2s ---> [%-30s] ~FYFetching feed (~FB%d~FY), last update: %s' % (
            identity,
            unicode(self.feed)[:30],
            self.feed.id,
            since_last_update)
        logging.debug(log_msg)

        self.feed.set_next_scheduled_update()
        etag = self.feed.etag
        modified = self.feed.last_modified.utctimetuple()[:7] if self.feed.last_modified else None

        # A forced or first-ever fetch must not be conditional.
        if self.options.get('force') or not self.feed.fetched_once:
            modified = None
            etag = None

        USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_1) AppleWebKit/534.48.3 (KHTML, like Gecko) Version/5.1 Safari/534.48.3 (NewsBlur Feed Fetcher - %s subscriber%s - %s)' % (
            self.feed.num_subscribers,
            's' if self.feed.num_subscribers != 1 else '',
            settings.NEWSBLUR_URL
        )

        self.fpf = feedparser.parse(self.feed.feed_address,
                                    agent=USER_AGENT,
                                    etag=etag,
                                    modified=modified)

        return FEED_OK, self.fpf

    def get_identity(self):
        """Return the worker number of the current process, or "X" if it has none.

        The number is the first element of multiprocessing's private
        ``_identity`` tuple, which is empty for a process that was not
        spawned by multiprocessing.
        """
        identity = "X"

        current_process = multiprocessing.current_process()
        if current_process._identity:
            identity = current_process._identity[0]

        return identity
class ProcessFeed:
    """Applies an already-downloaded feedparser result to the stored feed."""

    def __init__(self, feed_id, fpf, options):
        """Remember what to process.

        feed_id: primary key of the Feed row (loaded later by refresh_feed()).
        fpf: the feedparser result to process.
        options: dict-like; ``options['verbose']`` is read by process().
        """
        self.feed_id = feed_id
        self.options = options
        self.fpf = fpf
        self.entry_trans = {
            ENTRY_NEW: 'new',
            ENTRY_UPDATED: 'updated',
            ENTRY_SAME: 'same',
            ENTRY_ERR: 'error',
        }
        self.entry_keys = sorted(self.entry_trans.keys())

    def refresh_feed(self):
        """(Re)load ``self.feed`` from the 'default' database."""
        self.feed = Feed.objects.using('default').get(pk=self.feed_id)

    def _record_parse_failure(self, status_code, message):
        """Handle a feed that could not be parsed and yielded no entries.

        For a feed that has never been fetched, first try to discover a
        working feed address; if one is found, flag the feed and schedule an
        immediate refetch. Otherwise record the failure (with the feedparser
        bozo exception) in the feed history. The feed is saved either way.
        """
        fixed_feed = None
        if not self.feed.fetched_once:
            fixed_feed = self.feed.check_feed_link_for_feed_address()
        if not fixed_feed:
            self.feed.save_feed_history(status_code, message, self.fpf.bozo_exception)
        else:
            self.feed.has_feed_exception = True
            self.feed.schedule_feed_fetch_immediately()
        self.feed.save()

    def process(self):
        """Process the fetched feed: handle HTTP/parse errors, store stories.

        Returns a ``(feed_status, entry_counts)`` tuple where *feed_status*
        is one of the FEED_* codes and *entry_counts* maps each ENTRY_* code
        to a number of stories (all zeros on the early-return error paths).
        """
        self.refresh_feed()

        ret_values = {
            ENTRY_NEW: 0,
            ENTRY_UPDATED: 0,
            ENTRY_SAME: 0,
            ENTRY_ERR: 0,
        }

        self.feed.last_update = datetime.datetime.utcnow()

        if hasattr(self.fpf, 'status'):
            if self.options['verbose']:
                logging.debug(u' ---> [%-30s] Fetched feed, HTTP status %d: %s (bozo: %s)' % (
                    unicode(self.feed)[:30],
                    self.fpf.status,
                    self.feed.feed_address,
                    self.fpf.bozo))
                if self.fpf.bozo and self.fpf.status != 304:
                    logging.debug(u' ---> [%-30s] BOZO exception: %s (%s entries)' % (
                        unicode(self.feed)[:30],
                        self.fpf.bozo_exception,
                        len(self.fpf.entries)))

            # Not modified: nothing to parse.
            if self.fpf.status == 304:
                self.feed.save()
                self.feed.save_feed_history(304, "Not modified")
                return FEED_SAME, ret_values

            # Redirect: follow the new address (except the generic
            # feedburner placeholder).
            if self.fpf.status in (302, 301):
                if not self.fpf.href.endswith('feedburner.com/atom.xml'):
                    self.feed.feed_address = self.fpf.href
                if not self.feed.fetched_once:
                    self.feed.has_feed_exception = True
                    self.feed.fetched_once = True
                    self.feed.schedule_feed_fetch_immediately()
                if not self.fpf.entries:
                    self.feed.save()
                    self.feed.save_feed_history(self.fpf.status, "HTTP Redirect")
                    return FEED_ERRHTTP, ret_values

            if self.fpf.status >= 400:
                logging.debug(" ---> [%-30s] HTTP Status code: %s.%s Checking address..." % (unicode(self.feed)[:30], self.fpf.status, ' Not' if self.feed.fetched_once else ''))
                fixed_feed = None
                if not self.feed.fetched_once:
                    fixed_feed = self.feed.check_feed_link_for_feed_address()
                if not fixed_feed:
                    self.feed.save_feed_history(self.fpf.status, "HTTP Error")
                else:
                    self.feed.has_feed_exception = True
                    self.feed.schedule_feed_fetch_immediately()
                self.feed.save()
                return FEED_ERRHTTP, ret_values

        # A bozo feed is only treated as a failure when it produced no
        # entries at all; otherwise whatever was parsed is processed below.
        if self.fpf.bozo and isinstance(self.fpf.bozo_exception, feedparser.NonXMLContentType):
            logging.debug(" ---> [%-30s] Feed is Non-XML. %s entries.%s Checking address..." % (unicode(self.feed)[:30], len(self.fpf.entries), ' Not' if self.fpf.entries else ''))
            if not self.fpf.entries:
                self._record_parse_failure(502, 'Non-xml feed')
                return FEED_ERRPARSE, ret_values
        elif self.fpf.bozo and isinstance(self.fpf.bozo_exception, xml.sax._exceptions.SAXException):
            logging.debug(" ---> [%-30s] Feed has SAX/XML parsing issues. %s entries.%s Checking address..." % (unicode(self.feed)[:30], len(self.fpf.entries), ' Not' if self.fpf.entries else ''))
            if not self.fpf.entries:
                self._record_parse_failure(503, 'SAX Exception')
                return FEED_ERRPARSE, ret_values

        # the feed has changed (or it is the first time we parse it)
        # saving the etag and last_modified fields
        # (the etag is sometimes None, so normalise it to '' and cap at 255)
        self.feed.etag = (self.fpf.get('etag') or '')[:255]

        try:
            self.feed.last_modified = mtime(self.fpf.modified)
        except Exception:
            # Best effort: a missing or malformed modified value keeps the
            # previous last_modified. Deliberately not a bare except, so
            # KeyboardInterrupt/SystemExit still propagate.
            pass

        # Only the first 50 entries are considered.
        self.fpf.entries = self.fpf.entries[:50]

        self.feed.feed_title = self.fpf.feed.get('title', self.feed.feed_title)
        tagline = self.fpf.feed.get('tagline', self.feed.data.feed_tagline)
        if tagline:
            self.feed.data.feed_tagline = utf8encode(tagline)
            self.feed.data.save()
        if not self.feed.feed_link_locked:
            self.feed.feed_link = self.fpf.feed.get('link') or self.fpf.feed.get('id') or self.feed.feed_link

        self.feed.last_update = datetime.datetime.utcnow()
        self.feed.save()

        # Compare new stories to existing stories, adding and updating.
        # start_date ends up as the oldest publish date among the entries.
        start_date = datetime.datetime.utcnow()
        story_guids = []
        for entry in self.fpf.entries:
            story = pre_process_story(entry)
            if story.get('published') < start_date:
                start_date = story.get('published')
            story_guids.append(story.get('guid') or story.get('link'))

        existing_stories = list(MStory.objects(
            story_date__gte=start_date,
            story_feed_id=self.feed.pk
        ).limit(len(story_guids)))

        ret_values = self.feed.add_update_stories(self.fpf.entries, existing_stories, verbose=self.options['verbose'])

        logging.debug(u' ---> [%-30s] ~FYParsed Feed: new=~FG~SB%s~SN~FY up=~FY~SB%s~SN same=~FY%s err=~FR~SB%s' % (
            unicode(self.feed)[:30],
            ret_values[ENTRY_NEW], ret_values[ENTRY_UPDATED], ret_values[ENTRY_SAME], ret_values[ENTRY_ERR]))
        # Full statistics are recomputed only when new stories arrived.
        self.feed.update_all_statistics(full=bool(ret_values[ENTRY_NEW]))
        self.feed.trim_feed()
        self.feed.save_feed_history(200, "OK")

        return FEED_OK, ret_values
class Dispatcher:
    """Fetches and processes queues of feeds, in-process or in worker processes."""

    def __init__(self, options, num_threads):
        """Set up counters and bookkeeping.

        options: dict-like; keys read by this class are 'force', 'verbose',
            'compute_scores' and 'single_threaded'.
        num_threads: number of worker processes run_jobs() may start.
        """
        self.options = options
        self.entry_stats = {
            ENTRY_NEW: 0,
            ENTRY_UPDATED: 0,
            ENTRY_SAME: 0,
            ENTRY_ERR: 0,
        }
        self.feed_stats = {
            FEED_OK: 0,
            FEED_SAME: 0,
            FEED_ERRPARSE: 0,
            FEED_ERRHTTP: 0,
            FEED_ERREXC: 0,
        }
        # Human-readable names of the FEED_* codes, used in log lines.
        self.feed_trans = {
            FEED_OK: 'ok',
            FEED_SAME: 'unchanged',
            FEED_ERRPARSE: 'cant_parse',
            FEED_ERRHTTP: 'http_error',
            FEED_ERREXC: 'exception',
        }
        self.feed_keys = sorted(self.feed_trans.keys())
        self.num_threads = num_threads
        self.time_start = datetime.datetime.utcnow()
        self.workers = []

    def refresh_feed(self, feed_id):
        """Update feed, since it may have changed"""
        return Feed.objects.using('default').get(pk=feed_id)

    def _log_traceback(self, feed_id):
        """Log the exception currently being handled and return its traceback text.

        Must be called from inside an ``except`` block.
        """
        logging.debug('[%d] ! -------------------------' % (feed_id,))
        tb = traceback.format_exc()
        logging.error(tb)
        logging.debug('[%d] ! -------------------------' % (feed_id,))
        return tb

    def process_feed_wrapper(self, feed_queue):
        """Fetch, process and post-process every feed id in *feed_queue*.

        For each feed: download it, process the result when the download
        succeeded (or 'force' is set), recount unreads when there is
        something new, optionally fetch the site page and icon, save the
        load time, publish a real-time notification for new stories and
        update the per-feed / per-entry counters. Errors for one feed are
        recorded in its history and do not stop the queue; a
        KeyboardInterrupt stops the loop.
        """
        delta = None
        current_process = multiprocessing.current_process()
        identity = "X"
        if current_process._identity:
            identity = current_process._identity[0]

        for feed_id in feed_queue:
            ret_entries = {
                ENTRY_NEW: 0,
                ENTRY_UPDATED: 0,
                ENTRY_SAME: 0,
                ENTRY_ERR: 0
            }
            start_time = time.time()
            # Stays FEED_ERREXC unless the fetch/process steps report otherwise.
            ret_feed = FEED_ERREXC
            try:
                feed = self.refresh_feed(feed_id)

                ffeed = FetchFeed(feed_id, self.options)
                ret_feed, fetched_feed = ffeed.fetch()

                if ((fetched_feed and ret_feed == FEED_OK) or self.options['force']):
                    pfeed = ProcessFeed(feed_id, fetched_feed, self.options)
                    ret_feed, ret_entries = pfeed.process()

                    # Processing saved the feed; pick up its changes.
                    feed = self.refresh_feed(feed_id)

                    if ret_entries.get(ENTRY_NEW) or self.options['force'] or not feed.fetched_once:
                        if not feed.fetched_once:
                            feed.fetched_once = True
                            feed.save()
                        MUserStory.delete_old_stories(feed_id=feed.pk)
                        try:
                            self.count_unreads_for_subscribers(feed)
                        except TimeoutError:
                            logging.debug(' ---> [%-30s] Unread count took too long...' % (unicode(feed)[:30],))
                    cache.delete('feed_stories:%s-%s-%s' % (feed.id, 0, 25))
            except KeyboardInterrupt:
                break
            except urllib2.HTTPError as e:
                feed.save_feed_history(e.code, e.msg, e.fp.read())
                fetched_feed = None
            except Feed.DoesNotExist:
                logging.debug(' ---> [%-30s] Feed is now gone...' % (unicode(feed_id)[:30]))
                continue
            except TimeoutError:
                logging.debug(' ---> [%-30s] Feed fetch timed out...' % (unicode(feed)[:30]))
                feed.save_feed_history(505, 'Timeout', '')
                fetched_feed = None
            except Exception as e:
                tb = self._log_traceback(feed_id)
                ret_feed = FEED_ERREXC
                feed = self.refresh_feed(feed_id)
                feed.save_feed_history(500, "Error", tb)
                fetched_feed = None
                mail_feed_error_to_admin(feed, e)

            feed = self.refresh_feed(feed_id)
            if ((self.options['force']) or
                (fetched_feed and
                 feed.feed_link and
                 feed.has_page and
                 (ret_feed == FEED_OK or
                  (ret_feed == FEED_SAME and feed.stories_last_month > 10)))):

                logging.debug(u' ---> [%-30s] ~FYFetching page: %s' % (unicode(feed)[:30], feed.feed_link))
                page_importer = PageImporter(feed)
                try:
                    page_importer.fetch_page()
                except TimeoutError:
                    logging.debug(' ---> [%-30s] ~FRPage fetch timed out...' % (unicode(feed)[:30]))
                    feed.save_page_history(555, 'Timeout', '')
                except Exception as e:
                    tb = self._log_traceback(feed_id)
                    feed.save_page_history(550, "Page Error", tb)
                    fetched_feed = None
                    mail_feed_error_to_admin(feed, e)

                logging.debug(u' ---> [%-30s] ~FYFetching icon: %s' % (unicode(feed)[:30], feed.feed_link))
                icon_importer = IconImporter(feed, force=self.options['force'])
                try:
                    icon_importer.save()
                except TimeoutError:
                    logging.debug(' ---> [%-30s] ~FRIcon fetch timed out...' % (unicode(feed)[:30]))
                    feed.save_page_history(556, 'Timeout', '')
                except Exception as e:
                    # Icon errors are logged and mailed but not written to
                    # the feed history.
                    self._log_traceback(feed_id)
                    mail_feed_error_to_admin(feed, e)
            else:
                logging.debug(u' ---> [%-30s] ~FBSkipping page fetch: (%s on %s stories) %s' % (unicode(feed)[:30], self.feed_trans[ret_feed], feed.stories_last_month, '' if feed.has_page else ' [HAS NO PAGE]'))

            feed = self.refresh_feed(feed_id)
            delta = time.time() - start_time

            feed.last_load_time = round(delta)
            feed.fetched_once = True
            try:
                feed.save()
            except IntegrityError:
                logging.debug(" ---> [%-30s] ~FRIntegrityError on feed: %s" % (unicode(feed)[:30], feed.feed_address,))

            if ret_entries[ENTRY_NEW]:
                self.publish_to_subscribers(feed)

            done_msg = (u'%2s ---> [%-30s] ~FYProcessed in ~FG~SB%.4ss~FY~SN (~FB%s~FY) [%s]' % (
                identity, feed.feed_title[:30], delta,
                feed.pk, self.feed_trans[ret_feed],))
            logging.debug(done_msg)

            self.feed_stats[ret_feed] += 1
            for key, val in ret_entries.items():
                self.entry_stats[key] += val

    def publish_to_subscribers(self, feed):
        """Publish a 'story:new' message on the Redis channel named after the feed pk.

        A Redis connection error is logged and otherwise ignored.
        """
        try:
            r = redis.Redis(connection_pool=settings.REDIS_POOL)
            listeners_count = r.publish(str(feed.pk), 'story:new')
            if listeners_count:
                logging.debug(" ---> [%-30s] Published to %s subscribers" % (unicode(feed)[:30], listeners_count))
        except redis.ConnectionError:
            logging.debug(" ***> [%-30s] Redis is unavailable for real-time." % (unicode(feed)[:30],))

    # NOTE(review): timelimit(20) presumably raises TimeoutError after 20
    # seconds -- process_feed_wrapper catches TimeoutError around this call;
    # confirm against utils.feed_functions.
    @timelimit(20)
    def count_unreads_for_subscribers(self, feed):
        """Flag active, recently-seen subscriptions of *feed* for an unread recount.

        Every matching subscription gets its 'usersub:<user_id>' cache entry
        deleted and ``needs_unread_recalc`` set. When the 'compute_scores'
        option is on, feed scores are also recalculated for each of them
        against the feed's stories newer than the unread cutoff.
        """
        UNREAD_CUTOFF = datetime.datetime.utcnow() - datetime.timedelta(days=settings.DAYS_OF_UNREAD)
        user_subs = UserSubscription.objects.filter(feed=feed,
                                                    active=True,
                                                    user__profile__last_seen_on__gte=UNREAD_CUTOFF)\
                                            .order_by('-last_read_date')
        logging.debug(u' ---> [%-30s] Computing scores: %s (%s/%s/%s) subscribers' % (
            unicode(feed)[:30], user_subs.count(),
            feed.num_subscribers, feed.active_subscribers, feed.premium_subscribers))

        stories_db = MStory.objects(story_feed_id=feed.pk,
                                    story_date__gte=UNREAD_CUTOFF)
        for sub in user_subs:
            cache.delete('usersub:%s' % sub.user_id)
            sub.needs_unread_recalc = True
            sub.save()

        if self.options['compute_scores']:
            # Score calculation is silent unless verbosity is 2 or higher.
            silent = not (self.options['verbose'] >= 2)
            for sub in user_subs:
                sub.calculate_feed_scores(silent=silent, stories_db=stories_db)

    def add_jobs(self, feeds_queue, feeds_count=1):
        """ adds a feed processing job to the pool

        feeds_queue: indexable collection of feed-id queues, one per worker.
        feeds_count: stored on the instance; not used elsewhere in this class.
        """
        self.feeds_queue = feeds_queue
        self.feeds_count = feeds_count

    def run_jobs(self):
        """Process the queued feeds.

        With the 'single_threaded' option the first queue is processed in
        the current process. Otherwise one worker process is started per
        queue, up to ``num_threads`` workers; this method does not wait for
        them to finish.
        """
        if self.options['single_threaded']:
            self.process_feed_wrapper(self.feeds_queue[0])
        else:
            # Never index past the queues that actually exist.
            worker_count = min(self.num_threads, len(self.feeds_queue))
            new_workers = [
                multiprocessing.Process(target=self.process_feed_wrapper,
                                        args=(self.feeds_queue[i],))
                for i in range(worker_count)
            ]
            self.workers.extend(new_workers)
            # Start only the processes created by this call: a Process
            # object cannot be started twice.
            for worker in new_workers:
                worker.start()
2009-09-12 20:42:38 +00:00