# NewsBlur-viq/utils/feed_fetcher.py
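"""Feed fetching pipeline for NewsBlur.

FetchFeed downloads a feed with feedparser, ProcessFeed parses the result
and stores new and updated stories, and Dispatcher fans batches of feeds
out across worker processes.
"""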
# from apps.rss_feeds.models import FeedXML
from django.core.cache import cache
from django.conf import settings
from django.db import IntegrityError
# from mongoengine.queryset import Q
from apps.reader.models import UserSubscription, MUserStory
from apps.rss_feeds.models import Feed, MStory
from apps.rss_feeds.page_importer import PageImporter
from apps.rss_feeds.icon_importer import IconImporter
from utils import feedparser
from utils.story_functions import pre_process_story
from utils import log as logging
from utils.feed_functions import timelimit, TimeoutError, mail_feed_error_to_admin, utf8encode
from utils.story_functions import bunch
import time
import datetime
import traceback
import multiprocessing
import urllib2
import xml.sax
import redis

# Refresh feed code adapted from Feedjack.
# http://feedjack.googlecode.com

URL = 'http://www.newsblur.com/'
SLOWFEED_WARNING = 10
ENTRY_NEW, ENTRY_UPDATED, ENTRY_SAME, ENTRY_ERR = range(4)
FEED_OK, FEED_SAME, FEED_ERRPARSE, FEED_ERRHTTP, FEED_ERREXC = range(5)
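# ENTRY_* codes classify each story in a fetched feed; FEED_* codes
# classify the fetch as a whole. Their human-readable labels live in
# ProcessFeed.entry_trans and Dispatcher.feed_trans below.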


def mtime(ttime):
    """ datetime auxiliary function: converts feedparser's time.struct_time
        into a datetime.
    """
    return datetime.datetime.fromtimestamp(time.mktime(ttime))
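# Usage: ProcessFeed.process() below calls mtime(self.fpf.modified) to turn
# the feed's parsed Last-Modified header into its last_modified datetime.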


class FetchFeed:
    def __init__(self, feed_id, options):
        self.feed = Feed.objects.get(pk=feed_id)
        self.options = options
        self.fpf = None

    @timelimit(20)
    def fetch(self):
        """ Uses feedparser to download the feed. Will be parsed later.
        """
        identity = self.get_identity()
        log_msg = u'%2s ---> [%-30s] Fetching feed (%d), last update: %s' % (identity,
                                                                             unicode(self.feed)[:30],
                                                                             self.feed.id,
                                                                             datetime.datetime.now() - self.feed.last_update)
        logging.debug(log_msg)

        self.feed.set_next_scheduled_update()
        etag = self.feed.etag
        modified = self.feed.last_modified.utctimetuple()[:7] if self.feed.last_modified else None

        if self.options.get('force') or not self.feed.fetched_once:
            modified = None
            etag = None

        USER_AGENT = 'NewsBlur Feed Fetcher (%s subscriber%s) - %s (Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_1) AppleWebKit/534.48.3 (KHTML, like Gecko) Version/5.1 Safari/534.48.3)' % (
            self.feed.num_subscribers,
            's' if self.feed.num_subscribers != 1 else '',
            URL
        )

        self.fpf = feedparser.parse(self.feed.feed_address,
                                    agent=USER_AGENT,
                                    etag=etag,
                                    modified=modified)

        return FEED_OK, self.fpf

    def get_identity(self):
        identity = "X"
        current_process = multiprocessing.current_process()
        if current_process._identity:
            identity = current_process._identity[0]
        return identity
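

# FetchFeed and ProcessFeed are used in tandem by the Dispatcher below,
# roughly:
#   ffeed = FetchFeed(feed_id, options)
#   ret_feed, fetched_feed = ffeed.fetch()
#   pfeed = ProcessFeed(feed_id, fetched_feed, options)
#   ret_feed, ret_entries = pfeed.process()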
class ProcessFeed:
    def __init__(self, feed_id, fpf, options):
        self.feed_id = feed_id
        self.options = options
        self.fpf = fpf
        self.entry_trans = {
            ENTRY_NEW:     'new',
            ENTRY_UPDATED: 'updated',
            ENTRY_SAME:    'same',
            ENTRY_ERR:     'error',
        }
        self.entry_keys = sorted(self.entry_trans.keys())

    def refresh_feed(self):
        self.feed = Feed.objects.using('default').get(pk=self.feed_id)

    def process(self, first_run=True):
        """ Downloads and parses a feed.
        """
        self.refresh_feed()

        ret_values = {
            ENTRY_NEW:     0,
            ENTRY_UPDATED: 0,
            ENTRY_SAME:    0,
            ENTRY_ERR:     0,
        }

        # logging.debug(u' ---> [%d] Processing %s' % (self.feed.id, self.feed.feed_title))

        self.feed.fetched_once = True
        self.feed.last_update = datetime.datetime.utcnow()

        if hasattr(self.fpf, 'status'):
            if self.options['verbose']:
                logging.debug(u' ---> [%-30s] Fetched feed, HTTP status %d: %s (bozo: %s)' % (unicode(self.feed)[:30],
                                                                                              self.fpf.status,
                                                                                              self.feed.feed_address,
                                                                                              self.fpf.bozo))
                if self.fpf.bozo and self.fpf.status != 304:
                    logging.debug(u' ---> [%-30s] BOZO exception: %s (%s entries)' % (
                                  unicode(self.feed)[:30],
                                  self.fpf.bozo_exception,
                                  len(self.fpf.entries)))

            if self.fpf.status == 304:
                self.feed.save()
                self.feed.save_feed_history(304, "Not modified")
                return FEED_SAME, ret_values

            if self.fpf.status in (302, 301):
                if not self.fpf.href.endswith('feedburner.com/atom.xml'):
                    self.feed.feed_address = self.fpf.href
                    if first_run:
                        self.feed.has_feed_exception = True
                        self.feed.schedule_feed_fetch_immediately()
                    if not self.fpf.entries:
                        self.feed.save()
                        self.feed.save_feed_history(self.fpf.status, "HTTP Redirect")
                        return FEED_ERRHTTP, ret_values

            if self.fpf.status >= 400:
                logging.debug(" ---> [%-30s] HTTP Status code: %s. Checking address..." % (unicode(self.feed)[:30], self.fpf.status))
                fixed_feed = self.feed.check_feed_link_for_feed_address()
                if not fixed_feed:
                    self.feed.save_feed_history(self.fpf.status, "HTTP Error")
                else:
                    self.feed.has_feed_exception = True
                    self.feed.schedule_feed_fetch_immediately()
                self.feed.save()
                return FEED_ERRHTTP, ret_values

        if self.fpf.bozo and isinstance(self.fpf.bozo_exception, feedparser.NonXMLContentType):
            logging.debug(" ---> [%-30s] Feed is Non-XML. %s entries.%s Checking address..." % (unicode(self.feed)[:30], len(self.fpf.entries), ' Not' if self.fpf.entries else ''))
            if not self.fpf.entries:
                fixed_feed = self.feed.check_feed_link_for_feed_address()
                if not fixed_feed:
                    self.feed.save_feed_history(502, 'Non-xml feed', self.fpf.bozo_exception)
                else:
                    self.feed.has_feed_exception = True
                    self.feed.schedule_feed_fetch_immediately()
                self.feed.save()
                return FEED_ERRPARSE, ret_values
        elif self.fpf.bozo and isinstance(self.fpf.bozo_exception, xml.sax._exceptions.SAXException):
            logging.debug(" ---> [%-30s] Feed has SAX/XML parsing issues. %s entries.%s Checking address..." % (unicode(self.feed)[:30], len(self.fpf.entries), ' Not' if self.fpf.entries else ''))
            if not self.fpf.entries:
                fixed_feed = self.feed.check_feed_link_for_feed_address()
                if not fixed_feed:
                    self.feed.save_feed_history(503, 'SAX Exception', self.fpf.bozo_exception)
                else:
                    self.feed.has_feed_exception = True
                    self.feed.schedule_feed_fetch_immediately()
                self.feed.save()
                return FEED_ERRPARSE, ret_values

        # The feed has changed (or it is the first time we parse it).
        # Save the etag and last_modified fields.
        self.feed.etag = self.fpf.get('etag')
        if self.feed.etag:
            self.feed.etag = self.feed.etag[:255]
        # Sometimes this is None (it never should be). *sigh*
        if self.feed.etag is None:
            self.feed.etag = ''

        try:
            self.feed.last_modified = mtime(self.fpf.modified)
        except:
            pass
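
        # etag and last_modified feed back into FetchFeed.fetch() above, so
        # the next poll can be a conditional GET answered with 304 Not Modified.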

        self.fpf.entries = self.fpf.entries[:50]

        self.feed.feed_title = self.fpf.feed.get('title', self.feed.feed_title)
        tagline = self.fpf.feed.get('tagline', self.feed.data.feed_tagline)
        if tagline:
            self.feed.data.feed_tagline = utf8encode(tagline)
            self.feed.data.save()
        if not self.feed.feed_link_locked:
            self.feed.feed_link = self.fpf.feed.get('link') or self.fpf.feed.get('id') or self.feed.feed_link
        self.feed.last_update = datetime.datetime.utcnow()

        guids = []
        for entry in self.fpf.entries:
            if entry.get('id', ''):
                guids.append(entry.get('id', ''))
            elif entry.get('link'):
                guids.append(entry.link)
            elif entry.get('title'):
                guids.append(entry.title)
        self.feed.save()

        # Compare new stories to existing stories, adding and updating
        start_date = datetime.datetime.utcnow()
        # end_date = datetime.datetime.utcnow()
        story_guids = []
        for entry in self.fpf.entries:
            story = pre_process_story(entry)
            if story.get('published') < start_date:
                start_date = story.get('published')
            # if story.get('published') > end_date:
            #     end_date = story.get('published')
            story_guids.append(story.get('guid') or story.get('link'))

        if self.options['slave_db']:
            slave_db = self.options['slave_db']

            stories_db_orig = slave_db.stories.find({
                "story_feed_id": self.feed.pk,
                "story_date": {
                    "$gte": start_date,
                },
            }).limit(len(story_guids))
            existing_stories = []
            for story in stories_db_orig:
                existing_stories.append(bunch(story))
        else:
            existing_stories = list(MStory.objects(
                # story_guid__in=story_guids,
                story_date__gte=start_date,
                story_feed_id=self.feed.pk
            ).limit(len(story_guids)))
        # MStory.objects(
        #     (Q(story_date__gte=start_date) & Q(story_date__lte=end_date))
        #     | (Q(story_guid__in=story_guids)),
        #     story_feed=self.feed
        # ).order_by('-story_date')
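
        # Candidate stories are pulled by date window (everything since the
        # oldest published date in this batch) rather than by guid; the actual
        # new/updated/same classification happens in add_update_stories().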
        ret_values = self.feed.add_update_stories(self.fpf.entries, existing_stories, verbose=self.options['verbose'])

        logging.debug(u' ---> [%-30s] ~FYParsed Feed: new=~FG~SB%s~SN~FY up=~FY~SB%s~SN same=~FY%s err=~FR~SB%s' % (
                      unicode(self.feed)[:30],
                      ret_values[ENTRY_NEW], ret_values[ENTRY_UPDATED], ret_values[ENTRY_SAME], ret_values[ENTRY_ERR]))

        self.feed.update_all_statistics()
        self.feed.trim_feed()
        self.feed.save_feed_history(200, "OK")

        return FEED_OK, ret_values
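

# The Dispatcher fans a batch of feed ids out across worker processes, one
# queue per worker, tracking per-feed outcomes in feed_stats and per-story
# outcomes in entry_stats.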
class Dispatcher:
    def __init__(self, options, num_threads):
        self.options = options
        self.entry_stats = {
            ENTRY_NEW:     0,
            ENTRY_UPDATED: 0,
            ENTRY_SAME:    0,
            ENTRY_ERR:     0,
        }
        self.feed_stats = {
            FEED_OK:       0,
            FEED_SAME:     0,
            FEED_ERRPARSE: 0,
            FEED_ERRHTTP:  0,
            FEED_ERREXC:   0,
        }
        self.feed_trans = {
            FEED_OK:       'ok',
            FEED_SAME:     'unchanged',
            FEED_ERRPARSE: 'cant_parse',
            FEED_ERRHTTP:  'http_error',
            FEED_ERREXC:   'exception',
        }
        self.feed_keys = sorted(self.feed_trans.keys())
        self.num_threads = num_threads
        self.time_start = datetime.datetime.utcnow()
        self.workers = []

    def refresh_feed(self, feed_id):
        """ Reloads the feed from the master database, since it may have changed. """
        return Feed.objects.using('default').get(pk=feed_id)

    def process_feed_wrapper(self, feed_queue):
        delta = None
        current_process = multiprocessing.current_process()
        identity = "X"
        if current_process._identity:
            identity = current_process._identity[0]

        for feed_id in feed_queue:
            ret_entries = {
                ENTRY_NEW:     0,
                ENTRY_UPDATED: 0,
                ENTRY_SAME:    0,
                ENTRY_ERR:     0,
            }
            start_time = datetime.datetime.utcnow()
            ret_feed = FEED_ERREXC
            try:
                feed = self.refresh_feed(feed_id)

                ffeed = FetchFeed(feed_id, self.options)
                ret_feed, fetched_feed = ffeed.fetch()

                if ((fetched_feed and ret_feed == FEED_OK) or self.options['force']):
                    pfeed = ProcessFeed(feed_id, fetched_feed, self.options)
                    ret_feed, ret_entries = pfeed.process()

                    feed = self.refresh_feed(feed_id)

                    if ret_entries.get(ENTRY_NEW) or self.options['force'] or not feed.fetched_once:
                        if not feed.fetched_once:
                            feed.fetched_once = True
                            feed.save()
                        MUserStory.delete_old_stories(feed_id=feed.pk)
                        try:
                            self.count_unreads_for_subscribers(feed)
                        except TimeoutError:
                            logging.debug(' ---> [%-30s] Unread count took too long...' % (unicode(feed)[:30],))
                    cache.delete('feed_stories:%s-%s-%s' % (feed.id, 0, 25))
                    # if ret_entries.get(ENTRY_NEW) or ret_entries.get(ENTRY_UPDATED) or self.options['force']:
                    #     feed.get_stories(force=True)
            except KeyboardInterrupt:
                break
            except urllib2.HTTPError, e:
                feed.save_feed_history(e.code, e.msg, e.fp.read())
                fetched_feed = None
            except Feed.DoesNotExist, e:
                logging.debug(' ---> [%-30s] Feed is now gone...' % (unicode(feed_id)[:30]))
                continue
            except TimeoutError, e:
                logging.debug(' ---> [%-30s] Feed fetch timed out...' % (unicode(feed)[:30]))
                feed.save_feed_history(505, 'Timeout', '')
                fetched_feed = None
            except Exception, e:
                logging.debug('[%d] ! -------------------------' % (feed_id,))
                tb = traceback.format_exc()
                logging.error(tb)
                logging.debug('[%d] ! -------------------------' % (feed_id,))
                ret_feed = FEED_ERREXC
                feed = self.refresh_feed(feed_id)
                feed.save_feed_history(500, "Error", tb)
                fetched_feed = None
                mail_feed_error_to_admin(feed, e)

            feed = self.refresh_feed(feed_id)
            if ((self.options['force']) or
                (fetched_feed and
                 feed.feed_link and
                 feed.has_page and
                 (ret_feed == FEED_OK or
                  (ret_feed == FEED_SAME and feed.stories_last_month > 10)))):

                logging.debug(u' ---> [%-30s] Fetching page: %s' % (unicode(feed)[:30], feed.feed_link))
                page_importer = PageImporter(feed.feed_link, feed)
                try:
                    page_importer.fetch_page()
                except TimeoutError, e:
                    logging.debug(' ---> [%-30s] Page fetch timed out...' % (unicode(feed)[:30]))
                    feed.save_page_history(555, 'Timeout', '')
                except Exception, e:
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    tb = traceback.format_exc()
                    logging.error(tb)
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    feed.save_page_history(550, "Page Error", tb)
                    fetched_feed = None
                    mail_feed_error_to_admin(feed, e)

                logging.debug(u' ---> [%-30s] Fetching icon: %s' % (unicode(feed)[:30], feed.feed_link))
                icon_importer = IconImporter(feed, force=self.options['force'])
                try:
                    icon_importer.save()
                except TimeoutError, e:
                    logging.debug(' ---> [%-30s] Icon fetch timed out...' % (unicode(feed)[:30]))
                    feed.save_page_history(556, 'Timeout', '')
                except Exception, e:
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    tb = traceback.format_exc()
                    logging.error(tb)
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    # feed.save_feed_history(560, "Icon Error", tb)
                    mail_feed_error_to_admin(feed, e)

            feed = self.refresh_feed(feed_id)
            delta = datetime.datetime.utcnow() - start_time

            feed.last_load_time = max(1, delta.seconds)
            feed.fetched_once = True
            try:
                feed.save()
            except IntegrityError:
                logging.debug(" ---> [%-30s] IntegrityError on feed: %s" % (unicode(feed)[:30], feed.feed_address,))

            if ret_entries[ENTRY_NEW]:
                self.publish_to_subscribers(feed)

            done_msg = (u'%2s ---> [%-30s] Processed in %s (%s) [%s]' % (
                identity, feed.feed_title[:30], unicode(delta),
                feed.pk, self.feed_trans[ret_feed],))
            logging.debug(done_msg)

            self.feed_stats[ret_feed] += 1
            for key, val in ret_entries.items():
                self.entry_stats[key] += val

        # time_taken = datetime.datetime.utcnow() - self.time_start

    def publish_to_subscribers(self, feed):
        try:
            r = redis.Redis(connection_pool=settings.REDIS_POOL)
            listeners_count = r.publish(str(feed.pk), 'story:new')
            if listeners_count:
                logging.debug(" ---> [%-30s] Published to %s subscribers" % (unicode(feed)[:30], listeners_count))
        except redis.ConnectionError:
            logging.debug(" ***> [%-30s] Redis is unavailable for real-time." % (unicode(feed)[:30],))

    @timelimit(20)
    def count_unreads_for_subscribers(self, feed):
        UNREAD_CUTOFF = datetime.datetime.utcnow() - datetime.timedelta(days=settings.DAYS_OF_UNREAD)
        user_subs = UserSubscription.objects.filter(feed=feed,
                                                    active=True,
                                                    user__profile__last_seen_on__gte=UNREAD_CUTOFF)\
                                            .order_by('-last_read_date')
        logging.debug(u' ---> [%-30s] Computing scores: %s (%s/%s/%s) subscribers' % (
                      unicode(feed)[:30], user_subs.count(),
                      feed.num_subscribers, feed.active_subscribers, feed.premium_subscribers))

        if self.options['slave_db']:
            slave_db = self.options['slave_db']

            stories_db_orig = slave_db.stories.find({
                "story_feed_id": feed.pk,
                "story_date": {
                    "$gte": UNREAD_CUTOFF,
                },
            })
            stories_db = []
            for story in stories_db_orig:
                stories_db.append(bunch(story))
        else:
            stories_db = MStory.objects(story_feed_id=feed.pk,
                                        story_date__gte=UNREAD_CUTOFF)

        for sub in user_subs:
            cache.delete('usersub:%s' % sub.user_id)
            sub.needs_unread_recalc = True
            sub.save()

        if self.options['compute_scores']:
            for sub in user_subs:
                silent = False if self.options['verbose'] >= 2 else True
                sub.calculate_feed_scores(silent=silent, stories_db=stories_db)

    def add_jobs(self, feeds_queue, feeds_count=1):
        """ Registers the per-worker feed queues to be processed by run_jobs().
        """
        self.feeds_queue = feeds_queue
        self.feeds_count = feeds_count

    def run_jobs(self):
        if self.options['single_threaded']:
            self.process_feed_wrapper(self.feeds_queue[0])
        else:
            for i in range(self.num_threads):
                feed_queue = self.feeds_queue[i]
                self.workers.append(multiprocessing.Process(target=self.process_feed_wrapper,
                                                            args=(feed_queue,)))
            for i in range(self.num_threads):
                self.workers[i].start()
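

# A minimal usage sketch (hypothetical; the real entry point is NewsBlur's
# feed-refresh task, which builds the queues and options elsewhere). The
# option keys match the ones read above: verbose, force, slave_db,
# compute_scores, and single_threaded.
#
#     options = {
#         'verbose': 1,
#         'force': False,
#         'slave_db': None,
#         'compute_scores': True,
#         'single_threaded': True,
#     }
#     feed_ids = list(Feed.objects.values_list('pk', flat=True)[:10])
#     dispatcher = Dispatcher(options, num_threads=1)
#     dispatcher.add_jobs([feed_ids], feeds_count=len(feed_ids))
#     dispatcher.run_jobs()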