Mirror of https://github.com/samuelclay/NewsBlur.git, synced 2025-08-05 16:58:59 +00:00

* master:
  Fixing bogus type error, caused by duplicate feeds.
  Correcting styling on site settings loader icon.
  Removing dupe migration from feed_settings.
  Adding migration for unique hashes on feed addresses + links.
  Counting subscribers on branched feeds correctly, so as to maintain update frequency, even though it's branched.
  Hooking up feed address change and branching. Also fixing those infernal feed address change bugs for good.
  Finishing up feed_link corrections. Just need feed_address (which is essentially a refactor and abstraction of this feed_link code), and then duplicate UI, which is pretty easy. Yay.
  Refactoring feed rewriting to take advantage of new feed link changer. Seems to work. Still needs some better duplicate locking during fetch.
  Uniqifying address+link hash.
  Allowing feed_link branches, but it does not yet work. Needs proper dupe detection.
  Beginning new branch feed logic. Needs a huge db migration.
  Swapping titles on feed settings when necessary.
  Swapping rss and website url.
  Stubbing out entire feed settings modal. Needs duplicate feed list. Also needs backend splitting of subscriptions.
  Adding feed setting menu item.
  Cleaning up feed exception dialog.
476 lines
No EOL
20 KiB
Python
# from apps.rss_feeds.models import FeedXML
from django.core.cache import cache
from django.conf import settings
from django.db import IntegrityError
# from mongoengine.queryset import Q
from apps.reader.models import UserSubscription, MUserStory
from apps.rss_feeds.models import Feed, MStory
from apps.rss_feeds.page_importer import PageImporter
from apps.rss_feeds.icon_importer import IconImporter
from utils import feedparser
from utils.story_functions import pre_process_story
from utils import log as logging
from utils.feed_functions import timelimit, TimeoutError, mail_feed_error_to_admin, utf8encode
from utils.story_functions import bunch
import time
import datetime
import traceback
import multiprocessing
import urllib2
import xml.sax
import redis

# Refresh feed code adapted from Feedjack.
# http://feedjack.googlecode.com

SLOWFEED_WARNING = 10
ENTRY_NEW, ENTRY_UPDATED, ENTRY_SAME, ENTRY_ERR = range(4)
FEED_OK, FEED_SAME, FEED_ERRPARSE, FEED_ERRHTTP, FEED_ERREXC = range(5)

def mtime(ttime):
    """ datetime auxiliary function.
    """
    return datetime.datetime.fromtimestamp(time.mktime(ttime))

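# FetchFeed performs the raw download for a single feed: it logs the fetch,
# schedules the feed's next update, and hands feedparser the stored ETag and
# Last-Modified values so unchanged feeds can come back as HTTP 304.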
class FetchFeed:
    def __init__(self, feed_id, options):
        self.feed = Feed.objects.get(pk=feed_id)
        self.options = options
        self.fpf = None

    @timelimit(20)
    def fetch(self):
        """
        Uses feedparser to download the feed. Will be parsed later.
        """
        identity = self.get_identity()
        log_msg = u'%2s ---> [%-30s] Fetching feed (%d), last update: %s' % (identity,
                                                                             unicode(self.feed)[:30],
                                                                             self.feed.id,
                                                                             datetime.datetime.now() - self.feed.last_update)
        logging.debug(log_msg)

        self.feed.set_next_scheduled_update()
        etag = self.feed.etag
        modified = self.feed.last_modified.utctimetuple()[:7] if self.feed.last_modified else None

        if self.options.get('force') or not self.feed.fetched_once:
            modified = None
            etag = None

        USER_AGENT = 'NewsBlur Feed Fetcher (%s subscriber%s) - %s (Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_1) AppleWebKit/534.48.3 (KHTML, like Gecko) Version/5.1 Safari/534.48.3)' % (
            self.feed.num_subscribers,
            's' if self.feed.num_subscribers != 1 else '',
            settings.NEWSBLUR_URL
        )

        self.fpf = feedparser.parse(self.feed.feed_address,
                                    agent=USER_AGENT,
                                    etag=etag,
                                    modified=modified)

        return FEED_OK, self.fpf

    def get_identity(self):
        identity = "X"

        current_process = multiprocessing.current_process()
        if current_process._identity:
            identity = current_process._identity[0]

        return identity

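# ProcessFeed interprets what FetchFeed downloaded: it handles HTTP 304/301/302
# and error statuses, falls back to address discovery when the response is not
# parseable XML, refreshes the feed's metadata (etag, last-modified, title,
# tagline, link), and diffs the fetched entries against recently stored stories.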
class ProcessFeed:
    def __init__(self, feed_id, fpf, options):
        self.feed_id = feed_id
        self.options = options
        self.fpf = fpf
        self.entry_trans = {
            ENTRY_NEW:'new',
            ENTRY_UPDATED:'updated',
            ENTRY_SAME:'same',
            ENTRY_ERR:'error'}
        self.entry_keys = sorted(self.entry_trans.keys())

    def refresh_feed(self):
        self.feed = Feed.objects.using('default').get(pk=self.feed_id)

    def process(self, first_run=True):
        """ Processes the fetched feed, updating feed metadata and saving new or updated stories.
        """
        self.refresh_feed()

        ret_values = {
            ENTRY_NEW:0,
            ENTRY_UPDATED:0,
            ENTRY_SAME:0,
            ENTRY_ERR:0}

        # logging.debug(u' ---> [%d] Processing %s' % (self.feed.id, self.feed.feed_title))

        self.feed.fetched_once = True
        self.feed.last_update = datetime.datetime.utcnow()

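        # HTTP status handling: 304 means nothing changed, 301/302 adopt the
        # redirect target as the new feed address (FeedBurner's atom.xml
        # redirects are ignored), and 4xx/5xx trigger a look at the feed_link
        # page for a replacement feed address.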
        if hasattr(self.fpf, 'status'):
            if self.options['verbose']:
                logging.debug(u' ---> [%-30s] Fetched feed, HTTP status %d: %s (bozo: %s)' % (unicode(self.feed)[:30],
                                                                                              self.fpf.status,
                                                                                              self.feed.feed_address,
                                                                                              self.fpf.bozo))
                if self.fpf.bozo and self.fpf.status != 304:
                    logging.debug(u' ---> [%-30s] BOZO exception: %s (%s entries)' % (
                                  unicode(self.feed)[:30],
                                  self.fpf.bozo_exception,
                                  len(self.fpf.entries)))
            if self.fpf.status == 304:
                self.feed.save()
                self.feed.save_feed_history(304, "Not modified")
                return FEED_SAME, ret_values

            if self.fpf.status in (302, 301):
                if not self.fpf.href.endswith('feedburner.com/atom.xml'):
                    self.feed.feed_address = self.fpf.href
                if first_run:
                    self.feed.has_feed_exception = True
                    self.feed.schedule_feed_fetch_immediately()
                if not self.fpf.entries:
                    self.feed.save()
                    self.feed.save_feed_history(self.fpf.status, "HTTP Redirect")
                    return FEED_ERRHTTP, ret_values

            if self.fpf.status >= 400:
                logging.debug(" ---> [%-30s] HTTP Status code: %s. Checking address..." % (unicode(self.feed)[:30], self.fpf.status))
                fixed_feed = self.feed.check_feed_link_for_feed_address()
                if not fixed_feed:
                    self.feed.save_feed_history(self.fpf.status, "HTTP Error")
                else:
                    self.feed.has_feed_exception = True
                    self.feed.schedule_feed_fetch_immediately()
                self.feed.save()
                return FEED_ERRHTTP, ret_values

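        # Parser-level failures: when feedparser flags the payload as non-XML
        # or raises a SAX exception and no entries were recovered, try the
        # feed_link page for a working feed address before giving up.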
        if self.fpf.bozo and isinstance(self.fpf.bozo_exception, feedparser.NonXMLContentType):
            logging.debug(" ---> [%-30s] Feed is Non-XML. %s entries.%s Checking address..." % (unicode(self.feed)[:30], len(self.fpf.entries), ' Not' if self.fpf.entries else ''))
            if not self.fpf.entries:
                fixed_feed = self.feed.check_feed_link_for_feed_address()
                if not fixed_feed:
                    self.feed.save_feed_history(502, 'Non-xml feed', self.fpf.bozo_exception)
                else:
                    self.feed.has_feed_exception = True
                    self.feed.schedule_feed_fetch_immediately()
                self.feed.save()
                return FEED_ERRPARSE, ret_values
        elif self.fpf.bozo and isinstance(self.fpf.bozo_exception, xml.sax._exceptions.SAXException):
            logging.debug(" ---> [%-30s] Feed has SAX/XML parsing issues. %s entries.%s Checking address..." % (unicode(self.feed)[:30], len(self.fpf.entries), ' Not' if self.fpf.entries else ''))
            if not self.fpf.entries:
                fixed_feed = self.feed.check_feed_link_for_feed_address()
                if not fixed_feed:
                    self.feed.save_feed_history(503, 'SAX Exception', self.fpf.bozo_exception)
                else:
                    self.feed.has_feed_exception = True
                    self.feed.schedule_feed_fetch_immediately()
                self.feed.save()
                return FEED_ERRPARSE, ret_values

        # The feed has changed (or it is the first time we parse it);
        # save the etag and last_modified fields.
        self.feed.etag = self.fpf.get('etag')
        if self.feed.etag:
            self.feed.etag = self.feed.etag[:255]
        # Sometimes this is None (it never should be) *sigh*
        if self.feed.etag is None:
            self.feed.etag = ''

        try:
            self.feed.last_modified = mtime(self.fpf.modified)
        except:
            pass

        self.fpf.entries = self.fpf.entries[:50]

        self.feed.feed_title = self.fpf.feed.get('title', self.feed.feed_title)
        tagline = self.fpf.feed.get('tagline', self.feed.data.feed_tagline)
        if tagline:
            self.feed.data.feed_tagline = utf8encode(tagline)
            self.feed.data.save()
        if not self.feed.feed_link_locked:
            self.feed.feed_link = self.fpf.feed.get('link') or self.fpf.feed.get('id') or self.feed.feed_link

        self.feed.last_update = datetime.datetime.utcnow()

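        # Pick an identifier for each entry, preferring the entry id, then
        # the link, then the title.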
        guids = []
        for entry in self.fpf.entries:
            if entry.get('id', ''):
                guids.append(entry.get('id', ''))
            elif entry.get('link'):
                guids.append(entry.link)
            elif entry.get('title'):
                guids.append(entry.title)
        self.feed.save()

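        # start_date is walked back to the oldest published date in this batch
        # so the existing-story lookup below covers every fetched entry. When a
        # slave MongoDB is configured, existing stories are read from it;
        # otherwise they come straight from MStory.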
        # Compare new stories to existing stories, adding and updating
        start_date = datetime.datetime.utcnow()
        # end_date = datetime.datetime.utcnow()
        story_guids = []
        for entry in self.fpf.entries:
            story = pre_process_story(entry)
            if story.get('published') < start_date:
                start_date = story.get('published')
            # if story.get('published') > end_date:
            #     end_date = story.get('published')
            story_guids.append(story.get('guid') or story.get('link'))

        if self.options['slave_db']:
            slave_db = self.options['slave_db']
            stories_db_orig = slave_db.stories.find({
                "story_feed_id": self.feed.pk,
                "story_date": {
                    "$gte": start_date,
                },
            }).limit(len(story_guids))
            existing_stories = []
            for story in stories_db_orig:
                existing_stories.append(bunch(story))
        else:
            existing_stories = list(MStory.objects(
                # story_guid__in=story_guids,
                story_date__gte=start_date,
                story_feed_id=self.feed.pk
            ).limit(len(story_guids)))

        # MStory.objects(
        #     (Q(story_date__gte=start_date) & Q(story_date__lte=end_date))
        #     | (Q(story_guid__in=story_guids)),
        #     story_feed=self.feed
        # ).order_by('-story_date')
        ret_values = self.feed.add_update_stories(self.fpf.entries, existing_stories, verbose=self.options['verbose'])

        logging.debug(u' ---> [%-30s] ~FYParsed Feed: new=~FG~SB%s~SN~FY up=~FY~SB%s~SN same=~FY%s err=~FR~SB%s' % (
                      unicode(self.feed)[:30],
                      ret_values[ENTRY_NEW], ret_values[ENTRY_UPDATED], ret_values[ENTRY_SAME], ret_values[ENTRY_ERR]))
        self.feed.update_all_statistics()
        self.feed.trim_feed()
        self.feed.save_feed_history(200, "OK")

        return FEED_OK, ret_values

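# Dispatcher fans the fetch work out across worker processes. Each worker
# walks its own queue of feed ids: fetch, process, recount unreads for active
# subscribers, refresh the original page and favicon, record the load time,
# and publish a real-time notification over Redis when new stories arrive.
# Per-feed and per-entry results are tallied in feed_stats and entry_stats.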
class Dispatcher:
    def __init__(self, options, num_threads):
        self.options = options
        self.entry_stats = {
            ENTRY_NEW:0,
            ENTRY_UPDATED:0,
            ENTRY_SAME:0,
            ENTRY_ERR:0}
        self.feed_stats = {
            FEED_OK:0,
            FEED_SAME:0,
            FEED_ERRPARSE:0,
            FEED_ERRHTTP:0,
            FEED_ERREXC:0}
        self.feed_trans = {
            FEED_OK:'ok',
            FEED_SAME:'unchanged',
            FEED_ERRPARSE:'cant_parse',
            FEED_ERRHTTP:'http_error',
            FEED_ERREXC:'exception'}
        self.feed_keys = sorted(self.feed_trans.keys())
        self.num_threads = num_threads
        self.time_start = datetime.datetime.utcnow()
        self.workers = []

    def refresh_feed(self, feed_id):
        """Update feed, since it may have changed"""
        return Feed.objects.using('default').get(pk=feed_id)

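    # process_feed_wrapper is the per-worker loop: for each feed id it fetches
    # and processes the feed, recounts unreads when new stories (or a force)
    # warrant it, optionally refetches the page and favicon, saves timing
    # information, and publishes to Redis if anything new came in.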
    def process_feed_wrapper(self, feed_queue):
        delta = None
        current_process = multiprocessing.current_process()
        identity = "X"
        if current_process._identity:
            identity = current_process._identity[0]

        for feed_id in feed_queue:
            ret_entries = {
                ENTRY_NEW: 0,
                ENTRY_UPDATED: 0,
                ENTRY_SAME: 0,
                ENTRY_ERR: 0
            }
            start_time = datetime.datetime.utcnow()
            ret_feed = FEED_ERREXC
            try:
                feed = self.refresh_feed(feed_id)

                ffeed = FetchFeed(feed_id, self.options)
                ret_feed, fetched_feed = ffeed.fetch()

                if ((fetched_feed and ret_feed == FEED_OK) or self.options['force']):
                    pfeed = ProcessFeed(feed_id, fetched_feed, self.options)
                    ret_feed, ret_entries = pfeed.process()

                    feed = self.refresh_feed(feed_id)

                    if ret_entries.get(ENTRY_NEW) or self.options['force'] or not feed.fetched_once:
                        if not feed.fetched_once:
                            feed.fetched_once = True
                            feed.save()
                        MUserStory.delete_old_stories(feed_id=feed.pk)
                        try:
                            self.count_unreads_for_subscribers(feed)
                        except TimeoutError:
                            logging.debug(' ---> [%-30s] Unread count took too long...' % (unicode(feed)[:30],))
                        cache.delete('feed_stories:%s-%s-%s' % (feed.id, 0, 25))
                    # if ret_entries.get(ENTRY_NEW) or ret_entries.get(ENTRY_UPDATED) or self.options['force']:
                    #     feed.get_stories(force=True)
            except KeyboardInterrupt:
                break
            except urllib2.HTTPError, e:
                feed.save_feed_history(e.code, e.msg, e.fp.read())
                fetched_feed = None
            except Feed.DoesNotExist, e:
                logging.debug(' ---> [%-30s] Feed is now gone...' % (unicode(feed_id)[:30]))
                continue
            except TimeoutError, e:
                logging.debug(' ---> [%-30s] Feed fetch timed out...' % (unicode(feed)[:30]))
                feed.save_feed_history(505, 'Timeout', '')
                fetched_feed = None
            except Exception, e:
                logging.debug('[%d] ! -------------------------' % (feed_id,))
                tb = traceback.format_exc()
                logging.error(tb)
                logging.debug('[%d] ! -------------------------' % (feed_id,))
                ret_feed = FEED_ERREXC
                feed = self.refresh_feed(feed_id)
                feed.save_feed_history(500, "Error", tb)
                fetched_feed = None
                mail_feed_error_to_admin(feed, e)

            feed = self.refresh_feed(feed_id)
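            # Refetch the original page (and below, the favicon) when forced,
            # or when the feed fetch succeeded for a feed that has a page and
            # a feed_link, or when an unchanged feed is still busy enough
            # (more than 10 stories last month) to be worth refreshing.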
            if ((self.options['force']) or
                (fetched_feed and
                 feed.feed_link and
                 feed.has_page and
                 (ret_feed == FEED_OK or
                  (ret_feed == FEED_SAME and feed.stories_last_month > 10)))):

                logging.debug(u' ---> [%-30s] Fetching page: %s' % (unicode(feed)[:30], feed.feed_link))
                page_importer = PageImporter(feed.feed_link, feed)
                try:
                    page_importer.fetch_page()
                except TimeoutError, e:
                    logging.debug(' ---> [%-30s] Page fetch timed out...' % (unicode(feed)[:30]))
                    feed.save_page_history(555, 'Timeout', '')
                except Exception, e:
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    tb = traceback.format_exc()
                    logging.error(tb)
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    feed.save_page_history(550, "Page Error", tb)
                    fetched_feed = None
                    mail_feed_error_to_admin(feed, e)

                logging.debug(u' ---> [%-30s] Fetching icon: %s' % (unicode(feed)[:30], feed.feed_link))
                icon_importer = IconImporter(feed, force=self.options['force'])
                try:
                    icon_importer.save()
                except TimeoutError, e:
                    logging.debug(' ---> [%-30s] Icon fetch timed out...' % (unicode(feed)[:30]))
                    feed.save_page_history(556, 'Timeout', '')
                except Exception, e:
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    tb = traceback.format_exc()
                    logging.error(tb)
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    # feed.save_feed_history(560, "Icon Error", tb)
                    mail_feed_error_to_admin(feed, e)

            feed = self.refresh_feed(feed_id)
            delta = datetime.datetime.utcnow() - start_time

            feed.last_load_time = max(1, delta.seconds)
            feed.fetched_once = True
            try:
                feed.save()
            except IntegrityError:
                logging.debug(" ---> [%-30s] IntegrityError on feed: %s" % (unicode(feed)[:30], feed.feed_address,))

            if ret_entries[ENTRY_NEW]:
                self.publish_to_subscribers(feed)

            done_msg = (u'%2s ---> [%-30s] Processed in %s (%s) [%s]' % (
                identity, feed.feed_title[:30], unicode(delta),
                feed.pk, self.feed_trans[ret_feed],))
            logging.debug(done_msg)

            self.feed_stats[ret_feed] += 1
            for key, val in ret_entries.items():
                self.entry_stats[key] += val

        # time_taken = datetime.datetime.utcnow() - self.time_start

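    # publish_to_subscribers pushes a lightweight 'story:new' message over
    # Redis pub/sub on a channel named after the feed's pk, so connected
    # real-time clients know to refresh.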
    def publish_to_subscribers(self, feed):
        try:
            r = redis.Redis(connection_pool=settings.REDIS_POOL)
            listeners_count = r.publish(str(feed.pk), 'story:new')
            if listeners_count:
                logging.debug(" ---> [%-30s] Published to %s subscribers" % (unicode(feed)[:30], listeners_count))
        except redis.ConnectionError:
            logging.debug(" ***> [%-30s] Redis is unavailable for real-time." % (unicode(feed)[:30],))

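    # count_unreads_for_subscribers flags every active, recently seen
    # subscriber for an unread recount and, when compute_scores is on,
    # recalculates their feed scores against stories newer than the
    # DAYS_OF_UNREAD cutoff (from the slave DB when one is configured).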
    @timelimit(20)
    def count_unreads_for_subscribers(self, feed):
        UNREAD_CUTOFF = datetime.datetime.utcnow() - datetime.timedelta(days=settings.DAYS_OF_UNREAD)
        user_subs = UserSubscription.objects.filter(feed=feed,
                                                    active=True,
                                                    user__profile__last_seen_on__gte=UNREAD_CUTOFF)\
                                            .order_by('-last_read_date')
        logging.debug(u' ---> [%-30s] Computing scores: %s (%s/%s/%s) subscribers' % (
                      unicode(feed)[:30], user_subs.count(),
                      feed.num_subscribers, feed.active_subscribers, feed.premium_subscribers))

        if self.options['slave_db']:
            slave_db = self.options['slave_db']

            stories_db_orig = slave_db.stories.find({
                "story_feed_id": feed.pk,
                "story_date": {
                    "$gte": UNREAD_CUTOFF,
                },
            })
            stories_db = []
            for story in stories_db_orig:
                stories_db.append(bunch(story))
        else:
            stories_db = MStory.objects(story_feed_id=feed.pk,
                                        story_date__gte=UNREAD_CUTOFF)
        for sub in user_subs:
            cache.delete('usersub:%s' % sub.user_id)
            sub.needs_unread_recalc = True
            sub.save()

        if self.options['compute_scores']:
            for sub in user_subs:
                silent = False if self.options['verbose'] >= 2 else True
                sub.calculate_feed_scores(silent=silent, stories_db=stories_db)

    def add_jobs(self, feeds_queue, feeds_count=1):
        """ adds a feed processing job to the pool
        """
        self.feeds_queue = feeds_queue
        self.feeds_count = feeds_count

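    # run_jobs either processes the first queue inline (single_threaded) or
    # starts one multiprocessing.Process per queue, each running
    # process_feed_wrapper over its own slice of feed ids.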
    def run_jobs(self):
        if self.options['single_threaded']:
            self.process_feed_wrapper(self.feeds_queue[0])
        else:
            for i in range(self.num_threads):
                feed_queue = self.feeds_queue[i]
                self.workers.append(multiprocessing.Process(target=self.process_feed_wrapper,
                                                            args=(feed_queue,)))
            for i in range(self.num_threads):
                self.workers[i].start()
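
# ------------------------------------------------------------------
# Usage sketch (not part of the original module): the caller is assumed
# to split feed ids into one queue per worker and drive Dispatcher
# roughly like this, using the option keys this module reads:
#
#   options = {'force': False, 'verbose': 1, 'single_threaded': False,
#              'slave_db': None, 'compute_scores': True}
#   dispatcher = Dispatcher(options, num_threads=4)
#   dispatcher.add_jobs(feeds_queue=[[1, 5], [2, 6], [3, 7], [4, 8]],
#                       feeds_count=8)
#   dispatcher.run_jobs()
#
# The queue-splitting step and the feed ids above are hypothetical; the
# option keys match how self.options is used in this file.
# ------------------------------------------------------------------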