# NewsBlur-viq/utils/feed_fetcher.py


import datetime
import multiprocessing
import time
import traceback
import django
django.setup()
import http
import http.client
import urllib.error
import urllib.parse
import urllib.request
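# Raise http.client's header cap (default is 100) so feeds that send an unusual number of headers still parse.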
http.client._MAXHEADERS = 10000
import random
import re
import xml.sax
import feedparser
import pymongo
import redis
import requests
from django.conf import settings
from django.core.cache import cache
from django.db import IntegrityError
from sentry_sdk import set_user
from apps.notifications.models import MUserFeedNotification
from apps.notifications.tasks import QueueNotifications
from apps.push.models import PushSubscription
from apps.reader.models import UserSubscription
from apps.rss_feeds.icon_importer import IconImporter
from apps.rss_feeds.models import Feed, MStory
from apps.rss_feeds.page_importer import PageImporter
from apps.statistics.models import MAnalyticsFetcher, MStatistics
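# Let a couple of extra elements through feedparser's HTML sanitizer.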
feedparser.sanitizer._HTMLSanitizer.acceptable_elements.update(['iframe'])
feedparser.sanitizer._HTMLSanitizer.acceptable_elements.update(['text'])
from bs4 import BeautifulSoup
from celery.exceptions import SoftTimeLimitExceeded
from django.utils import feedgenerator
from django.utils.encoding import smart_str
from django.utils.html import linebreaks
from mongoengine import connect, connection
from qurl import qurl
from sentry_sdk import capture_exception, flush
from utils import json_functions as json
from utils import log as logging
from utils.facebook_fetcher import FacebookFetcher
from utils.feed_functions import TimeoutError, timelimit
from utils.json_fetcher import JSONFetcher
from utils.story_functions import linkify, pre_process_story, strip_tags
from utils.twitter_fetcher import TwitterFetcher
from utils.youtube_fetcher import YoutubeFetcher
# from utils.feed_functions import mail_feed_error_to_admin
# Refresh feed code adapted from Feedjack.
# http://feedjack.googlecode.com
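# Fetch/process status codes; FeedFetcherWorker maps these to human-readable labels and HTTP-style feed_codes.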
FEED_OK, FEED_SAME, FEED_ERRPARSE, FEED_ERRHTTP, FEED_ERREXC = list(range(5))
class FetchFeed:
def __init__(self, feed_id, options):
self.feed = Feed.get_by_id(feed_id)
self.options = options
self.fpf = None
self.raw_feed = None
@timelimit(45)
def fetch(self):
"""
        Uses requests to download the feed, parsing it with feedparser. Stories are extracted and saved later in ProcessFeed.
"""
start = time.time()
identity = self.get_identity()
if self.options.get('archive_page', None):
log_msg = '%2s ---> [%-30s] ~FYFetching feed (~FB%d~FY) ~BG~FMarchive page~ST~FY: ~SB%s' % (
identity,
self.feed.log_title[:30],
self.feed.id,
self.options['archive_page'],
)
else:
log_msg = '%2s ---> [%-30s] ~FYFetching feed (~FB%d~FY), last update: %s' % (
identity,
self.feed.log_title[:30],
self.feed.id,
datetime.datetime.now() - self.feed.last_update,
)
logging.debug(log_msg)
etag = self.feed.etag
modified = self.feed.last_modified.utctimetuple()[:7] if self.feed.last_modified else None
address = self.feed.feed_address
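        # Bypass conditional GET (clear ETag / Last-Modified) when forced, when paging through an archive,
        # or randomly on ~1% of fetches; forced fetches also get a cache-busting "_" query parameter.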
if self.options.get('force') or self.options.get('archive_page', None) or random.random() <= 0.01:
self.options['force'] = True
modified = None
etag = None
if self.options.get('archive_page', None) == "rfc5005" and self.options.get('archive_page_link', None):
address = self.options['archive_page_link']
elif self.options.get('archive_page', None):
address = qurl(address, add={self.options['archive_page_key']: self.options['archive_page']})
elif address.startswith('http'):
address = qurl(address, add={"_": random.randint(0, 10000)})
logging.debug(' ---> [%-30s] ~FBForcing fetch: %s' % (self.feed.log_title[:30], address))
elif not self.feed.fetched_once or not self.feed.known_good:
modified = None
etag = None
if self.options.get('feed_xml'):
logging.debug(
' ---> [%-30s] ~FM~BKFeed has been fat pinged. Ignoring fat: %s'
% (self.feed.log_title[:30], len(self.options.get('feed_xml')))
)
if self.options.get('fpf'):
self.fpf = self.options.get('fpf')
logging.debug(
' ---> [%-30s] ~FM~BKFeed fetched in real-time with fat ping.' % (self.feed.log_title[:30])
)
return FEED_OK, self.fpf
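        # YouTube, Twitter, and Facebook addresses are handled by dedicated fetchers that
        # build feed content feedparser can parse.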
if 'youtube.com' in address:
youtube_feed = self.fetch_youtube()
if not youtube_feed:
logging.debug(
' ***> [%-30s] ~FRYouTube fetch failed: %s.' % (self.feed.log_title[:30], address)
)
return FEED_ERRHTTP, None
self.fpf = feedparser.parse(youtube_feed, sanitize_html=False)
elif re.match(r'(https?)?://twitter.com/\w+/?', qurl(address, remove=['_'])):
twitter_feed = self.fetch_twitter(address)
if not twitter_feed:
logging.debug(
' ***> [%-30s] ~FRTwitter fetch failed: %s' % (self.feed.log_title[:30], address)
)
return FEED_ERRHTTP, None
self.fpf = feedparser.parse(twitter_feed)
elif re.match(r'(.*?)facebook.com/\w+/?$', qurl(address, remove=['_'])):
facebook_feed = self.fetch_facebook()
if not facebook_feed:
logging.debug(
' ***> [%-30s] ~FRFacebook fetch failed: %s' % (self.feed.log_title[:30], address)
)
return FEED_ERRHTTP, None
self.fpf = feedparser.parse(facebook_feed)
if not self.fpf and 'json' in address:
try:
headers = self.feed.fetch_headers()
if etag:
headers['If-None-Match'] = etag
if modified:
# format into an RFC 1123-compliant timestamp. We can't use
# time.strftime() since the %a and %b directives can be affected
# by the current locale, but RFC 2616 states that dates must be
# in English.
short_weekdays = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
months = [
'Jan',
'Feb',
'Mar',
'Apr',
'May',
'Jun',
'Jul',
'Aug',
'Sep',
'Oct',
'Nov',
'Dec',
]
modified_header = '%s, %02d %s %04d %02d:%02d:%02d GMT' % (
short_weekdays[modified[6]],
modified[2],
months[modified[1] - 1],
modified[0],
modified[3],
modified[4],
modified[5],
)
headers['If-Modified-Since'] = modified_header
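                # RFC 3229 delta encoding: "A-IM: feed" asks supporting servers to return only
                # the entries that changed since the last fetch.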
if etag or modified:
headers['A-IM'] = 'feed'
try:
raw_feed = requests.get(address, headers=headers, timeout=15)
except (requests.adapters.ConnectionError, TimeoutError):
raw_feed = None
if not raw_feed or raw_feed.status_code >= 400:
if raw_feed:
logging.debug(
" ***> [%-30s] ~FRFeed fetch was %s status code, trying fake user agent: %s"
% (self.feed.log_title[:30], raw_feed.status_code, raw_feed.headers)
)
else:
logging.debug(" ***> [%-30s] ~FRJson feed fetch timed out, trying fake headers: %s" % (self.feed.log_title[:30], address))
raw_feed = requests.get(
self.feed.feed_address,
headers=self.feed.fetch_headers(fake=True),
timeout=15,
)
json_feed_content_type = any(
json_feed in raw_feed.headers.get('Content-Type', "")
for json_feed in ['application/feed+json', 'application/json']
)
if raw_feed.content and json_feed_content_type:
# JSON Feed
json_feed = self.fetch_json_feed(address, raw_feed)
if not json_feed:
logging.debug(
' ***> [%-30s] ~FRJSON fetch failed: %s' % (self.feed.log_title[:30], address)
)
return FEED_ERRHTTP, None
self.fpf = feedparser.parse(json_feed)
elif raw_feed.content and raw_feed.status_code < 400:
response_headers = raw_feed.headers
response_headers['Content-Location'] = raw_feed.url
self.raw_feed = smart_str(raw_feed.content)
self.fpf = feedparser.parse(self.raw_feed, response_headers=response_headers)
if self.options['verbose']:
logging.debug(
" ---> [%-30s] ~FBFeed fetch status %s: %s length / %s"
% (
self.feed.log_title[:30],
raw_feed.status_code,
len(smart_str(raw_feed.content)),
raw_feed.headers,
)
)
except Exception as e:
logging.debug(
" ***> [%-30s] ~FRFeed failed to fetch with request, trying feedparser: %s"
% (self.feed.log_title[:30], str(e))
)
# raise e
if not self.fpf or self.options.get('force_fp', False):
try:
self.fpf = feedparser.parse(address, agent=self.feed.user_agent, etag=etag, modified=modified)
except (
TypeError,
ValueError,
KeyError,
EOFError,
MemoryError,
urllib.error.URLError,
http.client.InvalidURL,
http.client.BadStatusLine,
http.client.IncompleteRead,
ConnectionResetError,
TimeoutError,
) as e:
logging.debug(' ***> [%-30s] ~FRFeed fetch error: %s' % (self.feed.log_title[:30], e))
pass
if not self.fpf:
try:
logging.debug(' ***> [%-30s] ~FRTurning off headers: %s' % (self.feed.log_title[:30], address))
self.fpf = feedparser.parse(address, agent=self.feed.user_agent)
except (
TypeError,
ValueError,
KeyError,
EOFError,
MemoryError,
urllib.error.URLError,
http.client.InvalidURL,
http.client.BadStatusLine,
http.client.IncompleteRead,
ConnectionResetError,
) as e:
logging.debug(' ***> [%-30s] ~FRFetch failed: %s.' % (self.feed.log_title[:30], e))
return FEED_ERRHTTP, None
logging.debug(
' ---> [%-30s] ~FYFeed fetch in ~FM%.4ss' % (self.feed.log_title[:30], time.time() - start)
)
return FEED_OK, self.fpf
def get_identity(self):
identity = "X"
current_process = multiprocessing.current_process()
if current_process._identity:
identity = current_process._identity[0]
return identity
def fetch_twitter(self, address=None):
twitter_fetcher = TwitterFetcher(self.feed, self.options)
return twitter_fetcher.fetch(address)
def fetch_facebook(self):
facebook_fetcher = FacebookFetcher(self.feed, self.options)
return facebook_fetcher.fetch()
def fetch_json_feed(self, address, headers):
json_fetcher = JSONFetcher(self.feed, self.options)
return json_fetcher.fetch(address, headers)
def fetch_youtube(self):
youtube_fetcher = YoutubeFetcher(self.feed, self.options)
return youtube_fetcher.fetch()
class ProcessFeed:
def __init__(self, feed_id, fpf, options, raw_feed=None):
self.feed_id = feed_id
self.options = options
self.fpf = fpf
self.raw_feed = raw_feed
self.archive_seen_story_hashes = set()
def refresh_feed(self):
self.feed = Feed.get_by_id(self.feed_id)
if self.feed_id != self.feed.pk:
logging.debug(" ***> Feed has changed: from %s to %s" % (self.feed_id, self.feed.pk))
self.feed_id = self.feed.pk
def process(self):
"""Downloads and parses a feed."""
start = time.time()
self.refresh_feed()
if not self.options.get('archive_page', None):
feed_status, ret_values = self.verify_feed_integrity()
if feed_status and ret_values:
return feed_status, ret_values
self.fpf.entries = self.fpf.entries[:100]
if not self.options.get('archive_page', None):
self.compare_feed_attribute_changes()
        # Determine whether guids and permalinks are unusable (duplicated across entries) so broken guids can be replaced below
guids_seen = set()
permalinks_seen = set()
for entry in self.fpf.entries:
guids_seen.add(entry.get('guid'))
permalinks_seen.add(Feed.get_permalink(entry))
guid_difference = len(guids_seen) != len(self.fpf.entries)
single_guid = len(guids_seen) == 1
replace_guids = single_guid and guid_difference
permalink_difference = len(permalinks_seen) != len(self.fpf.entries)
single_permalink = len(permalinks_seen) == 1
replace_permalinks = single_permalink and permalink_difference
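        # If every entry shares a single guid, permalinks are used as guids instead; if permalinks
        # are also all identical, the published timestamp is used (applied in the loop below).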
# Compare new stories to existing stories, adding and updating
start_date = datetime.datetime.utcnow()
day_ago = datetime.datetime.now() - datetime.timedelta(days=1)
story_hashes = []
stories = []
for entry in self.fpf.entries:
story = pre_process_story(entry, self.fpf.encoding)
if not story['title'] and not story['story_content']:
continue
if self.options.get('archive_page', None) and story.get('published') > day_ago:
                # Archive only: arbitrary but necessary to keep feeds from creating an unlimited number of stories.
                # Stories without a guid get one auto-generated from their date, and stories missing a date fall
                # back to the latest date, so reject anything newer than 24 hours old when filling out the archive.
# logging.debug(f" ---> [%-30s] ~FBTossing story because it's too new for the archive: ~SB{story}")
continue
if story.get('published') < start_date:
start_date = story.get('published')
if replace_guids:
if replace_permalinks:
new_story_guid = str(story.get('published'))
if self.options['verbose']:
logging.debug(
' ---> [%-30s] ~FBReplacing guid (%s) with timestamp: %s'
% (self.feed.log_title[:30], story.get('guid'), new_story_guid)
)
story['guid'] = new_story_guid
else:
new_story_guid = Feed.get_permalink(story)
if self.options['verbose']:
logging.debug(
' ---> [%-30s] ~FBReplacing guid (%s) with permalink: %s'
% (self.feed.log_title[:30], story.get('guid'), new_story_guid)
)
story['guid'] = new_story_guid
story['story_hash'] = MStory.feed_guid_hash_unsaved(self.feed.pk, story.get('guid'))
stories.append(story)
story_hashes.append(story.get('story_hash'))
original_story_hash_count = len(story_hashes)
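        # Mix in hashes of stories already stored within the unread cutoff so the existing-story lookup
        # (and add_update_stories) can match incoming entries against recently seen stories instead of recreating them.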
story_hashes_in_unread_cutoff = self.feed.story_hashes_in_unread_cutoff[:original_story_hash_count]
story_hashes.extend(story_hashes_in_unread_cutoff)
story_hashes = list(set(story_hashes))
if self.options['verbose'] or settings.DEBUG:
logging.debug(
' ---> [%-30s] ~FBFound ~SB%s~SN guids, adding ~SB%s~SN/%s guids from db'
% (
self.feed.log_title[:30],
original_story_hash_count,
len(story_hashes) - original_story_hash_count,
len(story_hashes_in_unread_cutoff),
)
)
existing_stories = dict(
(s.story_hash, s)
for s in MStory.objects(
story_hash__in=story_hashes,
# story_date__gte=start_date,
# story_feed_id=self.feed.pk
)
)
# if len(existing_stories) == 0:
# existing_stories = dict((s.story_hash, s) for s in MStory.objects(
# story_date__gte=start_date,
# story_feed_id=self.feed.pk
# ))
ret_values = self.feed.add_update_stories(
stories,
existing_stories,
verbose=self.options['verbose'],
updates_off=self.options['updates_off'],
)
# PubSubHubbub
if not self.options.get('archive_page', None):
self.check_feed_for_push()
# Push notifications
if ret_values['new'] > 0 and MUserFeedNotification.feed_has_users(self.feed.pk) > 0:
QueueNotifications.delay(self.feed.pk, ret_values['new'])
# All Done
logging.debug(
' ---> [%-30s] ~FYParsed Feed: %snew=%s~SN~FY %sup=%s~SN same=%s%s~SN %serr=%s~SN~FY total=~SB%s'
% (
self.feed.log_title[:30],
'~FG~SB' if ret_values['new'] else '',
ret_values['new'],
'~FY~SB' if ret_values['updated'] else '',
ret_values['updated'],
'~SB' if ret_values['same'] else '',
ret_values['same'],
'~FR~SB' if ret_values['error'] else '',
ret_values['error'],
len(self.fpf.entries),
)
)
self.feed.update_all_statistics(has_new_stories=bool(ret_values['new']), force=self.options['force'])
fetch_date = datetime.datetime.now()
if ret_values['new']:
if not getattr(settings, 'TEST_DEBUG', False):
self.feed.trim_feed()
self.feed.expire_redis()
if MStatistics.get('raw_feed', None) == self.feed.pk:
self.feed.save_raw_feed(self.raw_feed, fetch_date)
self.feed.save_feed_history(200, "OK", date=fetch_date)
if self.options['verbose']:
logging.debug(
' ---> [%-30s] ~FBTIME: feed parse in ~FM%.4ss'
% (self.feed.log_title[:30], time.time() - start)
)
if self.options.get('archive_page', None):
self.archive_seen_story_hashes.update(story_hashes)
return FEED_OK, ret_values
def verify_feed_integrity(self):
"""Ensures stories come through and any abberant status codes get saved
Returns:
FEED_STATUS: enum
ret_values: dictionary of counts of new, updated, same, and error stories
"""
ret_values = dict(new=0, updated=0, same=0, error=0)
if not self.feed:
return FEED_ERREXC, ret_values
if hasattr(self.fpf, 'status'):
if self.options['verbose']:
if self.fpf.bozo and self.fpf.status != 304:
logging.debug(
' ---> [%-30s] ~FRBOZO exception: %s ~SB(%s entries)'
% (self.feed.log_title[:30], self.fpf.bozo_exception, len(self.fpf.entries))
)
if self.fpf.status == 304:
self.feed = self.feed.save()
self.feed.save_feed_history(304, "Not modified")
return FEED_SAME, ret_values
# 302 and 307: Temporary redirect: ignore
# 301 and 308: Permanent redirect: save it (after 10 tries)
if self.fpf.status == 301 or self.fpf.status == 308:
if self.fpf.href.endswith('feedburner.com/atom.xml'):
return FEED_ERRHTTP, ret_values
redirects, non_redirects = self.feed.count_redirects_in_history('feed')
self.feed.save_feed_history(
self.fpf.status, "HTTP Redirect (%d to go)" % (10 - len(redirects))
)
if len(redirects) >= 10 or len(non_redirects) == 0:
address = self.fpf.href
if self.options['force'] and address:
address = qurl(address, remove=['_'])
self.feed.feed_address = address
if not self.feed.known_good:
self.feed.fetched_once = True
logging.debug(
" ---> [%-30s] ~SB~SK~FRFeed is %s'ing. Refetching..."
% (self.feed.log_title[:30], self.fpf.status)
)
self.feed = self.feed.schedule_feed_fetch_immediately()
if not self.fpf.entries:
self.feed = self.feed.save()
self.feed.save_feed_history(self.fpf.status, "HTTP Redirect")
return FEED_ERRHTTP, ret_values
if self.fpf.status >= 400:
logging.debug(
" ---> [%-30s] ~SB~FRHTTP Status code: %s. Checking address..."
% (self.feed.log_title[:30], self.fpf.status)
)
fixed_feed = None
if not self.feed.known_good:
fixed_feed, feed = self.feed.check_feed_link_for_feed_address()
if not fixed_feed:
self.feed.save_feed_history(self.fpf.status, "HTTP Error")
else:
self.feed = feed
self.feed = self.feed.save()
return FEED_ERRHTTP, ret_values
if not self.fpf:
logging.debug(
" ---> [%-30s] ~SB~FRFeed is Non-XML. No feedparser feed either!"
% (self.feed.log_title[:30])
)
self.feed.save_feed_history(551, "Broken feed")
return FEED_ERRHTTP, ret_values
if self.fpf and not self.fpf.entries:
if self.fpf.bozo and isinstance(self.fpf.bozo_exception, feedparser.NonXMLContentType):
logging.debug(
" ---> [%-30s] ~SB~FRFeed is Non-XML. %s entries. Checking address..."
% (self.feed.log_title[:30], len(self.fpf.entries))
)
fixed_feed = None
if not self.feed.known_good:
fixed_feed, feed = self.feed.check_feed_link_for_feed_address()
if not fixed_feed:
self.feed.save_feed_history(552, 'Non-xml feed', self.fpf.bozo_exception)
else:
self.feed = feed
self.feed = self.feed.save()
return FEED_ERRPARSE, ret_values
elif self.fpf.bozo and isinstance(self.fpf.bozo_exception, xml.sax._exceptions.SAXException):
logging.debug(
" ---> [%-30s] ~SB~FRFeed has SAX/XML parsing issues. %s entries. Checking address..."
% (self.feed.log_title[:30], len(self.fpf.entries))
)
fixed_feed = None
if not self.feed.known_good:
fixed_feed, feed = self.feed.check_feed_link_for_feed_address()
if not fixed_feed:
self.feed.save_feed_history(553, 'Not an RSS feed', self.fpf.bozo_exception)
else:
self.feed = feed
self.feed = self.feed.save()
return FEED_ERRPARSE, ret_values
return None, None
def compare_feed_attribute_changes(self):
"""
        The feed has changed (or this is the first time it is being parsed), so save the etag,
        last_modified, title, tagline, and feed link fields if they differ.
"""
if not self.feed:
logging.debug(f"Missing feed: {self.feed}")
return
original_etag = self.feed.etag
self.feed.etag = self.fpf.get('etag')
if self.feed.etag:
self.feed.etag = self.feed.etag[:255]
        # Sometimes this is None (it never should be) *sigh*
if self.feed.etag is None:
self.feed.etag = ''
if self.feed.etag != original_etag:
self.feed.save(update_fields=['etag'])
original_last_modified = self.feed.last_modified
if hasattr(self.fpf, 'modified') and self.fpf.modified:
try:
self.feed.last_modified = datetime.datetime.strptime(
self.fpf.modified, '%a, %d %b %Y %H:%M:%S %Z'
)
except Exception as e:
self.feed.last_modified = None
logging.debug("Broken mtime %s: %s" % (self.feed.last_modified, e))
pass
if self.feed.last_modified != original_last_modified:
self.feed.save(update_fields=['last_modified'])
original_title = self.feed.feed_title
if self.fpf.feed.get('title'):
self.feed.feed_title = strip_tags(self.fpf.feed.get('title'))
if self.feed.feed_title != original_title:
self.feed.save(update_fields=['feed_title'])
tagline = self.fpf.feed.get('tagline', self.feed.data.feed_tagline)
if tagline:
original_tagline = self.feed.data.feed_tagline
self.feed.data.feed_tagline = smart_str(tagline)
if self.feed.data.feed_tagline != original_tagline:
self.feed.data.save(update_fields=['feed_tagline'])
if not self.feed.feed_link_locked:
new_feed_link = self.fpf.feed.get('link') or self.fpf.feed.get('id') or self.feed.feed_link
if self.options['force'] and new_feed_link:
new_feed_link = qurl(new_feed_link, remove=['_'])
if new_feed_link != self.feed.feed_link:
logging.debug(
" ---> [%-30s] ~SB~FRFeed's page is different: %s to %s"
% (self.feed.log_title[:30], self.feed.feed_link, new_feed_link)
)
redirects, non_redirects = self.feed.count_redirects_in_history('page')
self.feed.save_page_history(301, "HTTP Redirect (%s to go)" % (10 - len(redirects)))
if len(redirects) >= 10 or len(non_redirects) == 0:
self.feed.feed_link = new_feed_link
self.feed.save(update_fields=['feed_link'])
def check_feed_for_push(self):
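        # PubSubHubbub/WebSub: the feed advertises its hub (rel=hub) and canonical address (rel=self).
        # Subscribe when there are active subscribers, and re-subscribe once a lease expires.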
if not (hasattr(self.fpf, 'feed') and hasattr(self.fpf.feed, 'links') and self.fpf.feed.links):
return
hub_url = None
self_url = self.feed.feed_address
for link in self.fpf.feed.links:
if link['rel'] == 'hub' and not hub_url:
hub_url = link['href']
elif link['rel'] == 'self':
self_url = link['href']
push_expired = False
if self.feed.is_push:
try:
push_expired = self.feed.push.lease_expires < datetime.datetime.now()
except PushSubscription.DoesNotExist:
self.feed.is_push = False
if (
hub_url
and self_url
and not settings.DEBUG
and self.feed.active_subscribers > 0
and (push_expired or not self.feed.is_push or self.options.get('force'))
):
logging.debug(
' ---> [%-30s] ~BB~FW%sSubscribing to PuSH hub: %s'
% (self.feed.log_title[:30], "~SKRe-~SN" if push_expired else "", hub_url)
)
try:
if settings.ENABLE_PUSH:
PushSubscription.objects.subscribe(self_url, feed=self.feed, hub=hub_url)
except TimeoutError:
logging.debug(
' ---> [%-30s] ~BB~FW~FRTimed out~FW subscribing to PuSH hub: %s'
% (self.feed.log_title[:30], hub_url)
)
elif self.feed.is_push and (self.feed.active_subscribers <= 0 or not hub_url):
logging.debug(
' ---> [%-30s] ~BB~FWTurning off PuSH, no hub found' % (self.feed.log_title[:30])
)
self.feed.is_push = False
self.feed = self.feed.save()
class FeedFetcherWorker:
def __init__(self, options):
self.options = options
self.feed_stats = {
FEED_OK: 0,
FEED_SAME: 0,
FEED_ERRPARSE: 0,
FEED_ERRHTTP: 0,
FEED_ERREXC: 0,
}
self.feed_trans = {
FEED_OK: 'ok',
FEED_SAME: 'unchanged',
FEED_ERRPARSE: 'cant_parse',
FEED_ERRHTTP: 'http_error',
FEED_ERREXC: 'exception',
}
self.feed_keys = sorted(self.feed_trans.keys())
self.time_start = datetime.datetime.utcnow()
def refresh_feed(self, feed_id):
"""Update feed, since it may have changed"""
return Feed.get_by_id(feed_id)
def reset_database_connections(self):
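        # Workers run in separate processes, so clear mongoengine's cached connections and
        # re-establish them rather than reusing handles inherited from the parent process.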
connection._connections = {}
connection._connection_settings = {}
connection._dbs = {}
settings.MONGODB = connect(settings.MONGO_DB_NAME, **settings.MONGO_DB)
if 'username' in settings.MONGO_ANALYTICS_DB:
settings.MONGOANALYTICSDB = connect(
db=settings.MONGO_ANALYTICS_DB['name'],
host=f"mongodb://{settings.MONGO_ANALYTICS_DB['username']}:{settings.MONGO_ANALYTICS_DB['password']}@{settings.MONGO_ANALYTICS_DB['host']}/?authSource=admin",
alias="nbanalytics",
)
else:
settings.MONGOANALYTICSDB = connect(
db=settings.MONGO_ANALYTICS_DB['name'],
host=f"mongodb://{settings.MONGO_ANALYTICS_DB['host']}/",
alias="nbanalytics",
)
def process_feed_wrapper(self, feed_queue):
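        # Main worker loop: fetch each feed in the queue, process its entries, then optionally
        # fetch the feed's page and favicon, recording per-stage timings for analytics.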
self.reset_database_connections()
delta = None
current_process = multiprocessing.current_process()
identity = "X"
feed = None
if current_process._identity:
identity = current_process._identity[0]
# If fetching archive pages, come back once the archive scaffolding is built
if self.options.get('archive_page', None):
for feed_id in feed_queue:
feed = self.refresh_feed(feed_id)
try:
self.fetch_and_process_archive_pages(feed_id)
except SoftTimeLimitExceeded:
logging.debug(
' ---> [%-30s] ~FRTime limit reached while fetching ~FGarchive pages~FR. Made it to ~SB%s'
% (feed.log_title[:30], self.options['archive_page'])
)
pass
if len(feed_queue) == 1:
feed = self.refresh_feed(feed_queue[0])
return feed
return
for feed_id in feed_queue:
start_duration = time.time()
feed_fetch_duration = None
feed_process_duration = None
page_duration = None
icon_duration = None
feed_code = None
ret_entries = None
start_time = time.time()
ret_feed = FEED_ERREXC
set_user({"id": feed_id})
try:
feed = self.refresh_feed(feed_id)
set_user({"id": feed_id, "username": feed.feed_title})
skip = False
if self.options.get('fake'):
skip = True
weight = "-"
quick = "-"
rand = "-"
elif (
self.options.get('quick')
and not self.options['force']
and feed.known_good
and feed.fetched_once
and not feed.is_push
):
weight = feed.stories_last_month * feed.num_subscribers
random_weight = random.randint(1, max(weight, 1))
quick = float(self.options.get('quick', 0))
rand = random.random()
if random_weight < 1000 and rand < quick:
skip = True
elif False and feed.feed_address.startswith("http://news.google.com/news"):
skip = True
weight = "-"
quick = "-"
rand = "-"
if skip:
logging.debug(
' ---> [%-30s] ~BGFaking fetch, skipping (%s/month, %s subs, %s < %s)...'
% (feed.log_title[:30], weight, feed.num_subscribers, rand, quick)
)
continue
ffeed = FetchFeed(feed_id, self.options)
ret_feed, fetched_feed = ffeed.fetch()
feed_fetch_duration = time.time() - start_duration
raw_feed = ffeed.raw_feed
if fetched_feed and (ret_feed == FEED_OK or self.options['force']):
pfeed = ProcessFeed(feed_id, fetched_feed, self.options, raw_feed=raw_feed)
ret_feed, ret_entries = pfeed.process()
feed = pfeed.feed
feed_process_duration = time.time() - start_duration
if (ret_entries and ret_entries['new']) or self.options['force']:
start = time.time()
if not feed.known_good or not feed.fetched_once:
feed.known_good = True
feed.fetched_once = True
feed = feed.save()
if self.options['force'] or random.random() <= 0.02:
logging.debug(
' ---> [%-30s] ~FBPerforming feed cleanup...' % (feed.log_title[:30],)
)
start_cleanup = time.time()
feed.count_fs_size_bytes()
logging.debug(
' ---> [%-30s] ~FBDone with feed cleanup. Took ~SB%.4s~SN sec.'
% (feed.log_title[:30], time.time() - start_cleanup)
)
try:
self.count_unreads_for_subscribers(feed)
except TimeoutError:
logging.debug(
' ---> [%-30s] Unread count took too long...' % (feed.log_title[:30],)
)
if self.options['verbose']:
logging.debug(
' ---> [%-30s] ~FBTIME: unread count in ~FM%.4ss'
% (feed.log_title[:30], time.time() - start)
)
except (urllib.error.HTTPError, urllib.error.URLError) as e:
logging.debug(
' ---> [%-30s] ~FRFeed throws HTTP error: ~SB%s' % (str(feed_id)[:30], e.reason)
)
feed_code = 404
feed.save_feed_history(feed_code, str(e.reason), e)
fetched_feed = None
except Feed.DoesNotExist:
logging.debug(' ---> [%-30s] ~FRFeed is now gone...' % (str(feed_id)[:30]))
continue
except SoftTimeLimitExceeded as e:
logging.debug(" ---> [%-30s] ~BR~FWTime limit hit!~SB~FR Moving on to next feed..." % feed)
ret_feed = FEED_ERREXC
fetched_feed = None
feed_code = 559
feed.save_feed_history(feed_code, 'Timeout', e)
except TimeoutError as e:
logging.debug(' ---> [%-30s] ~FRFeed fetch timed out...' % (feed.log_title[:30]))
feed_code = 505
feed.save_feed_history(feed_code, 'Timeout', e)
fetched_feed = None
except Exception as e:
logging.debug('[%d] ! -------------------------' % (feed_id,))
tb = traceback.format_exc()
logging.error(tb)
logging.debug('[%d] ! -------------------------' % (feed_id,))
ret_feed = FEED_ERREXC
feed = Feed.get_by_id(getattr(feed, 'pk', feed_id))
if not feed:
continue
feed.save_feed_history(500, "Error", tb)
feed_code = 500
fetched_feed = None
# mail_feed_error_to_admin(feed, e, local_vars=locals())
if not settings.DEBUG and hasattr(settings, 'SENTRY_DSN') and settings.SENTRY_DSN:
capture_exception(e)
flush()
if not feed_code:
if ret_feed == FEED_OK:
feed_code = 200
elif ret_feed == FEED_SAME:
feed_code = 304
elif ret_feed == FEED_ERRHTTP:
feed_code = 400
if ret_feed == FEED_ERREXC:
feed_code = 500
elif ret_feed == FEED_ERRPARSE:
feed_code = 550
if not feed:
continue
feed = self.refresh_feed(feed.pk)
if not feed:
continue
if (
(self.options['force'])
or (random.random() > 0.9)
or (
fetched_feed
and feed.feed_link
and feed.has_page
and (ret_feed == FEED_OK or (ret_feed == FEED_SAME and feed.stories_last_month > 10))
)
):
logging.debug(' ---> [%-30s] ~FYFetching page: %s' % (feed.log_title[:30], feed.feed_link))
page_importer = PageImporter(feed)
try:
page_data = page_importer.fetch_page()
page_duration = time.time() - start_duration
except SoftTimeLimitExceeded as e:
logging.debug(
" ---> [%-30s] ~BR~FWTime limit hit!~SB~FR Moving on to next feed..." % feed
)
page_data = None
feed.save_feed_history(557, 'Timeout', e)
except TimeoutError:
logging.debug(' ---> [%-30s] ~FRPage fetch timed out...' % (feed.log_title[:30]))
page_data = None
feed.save_page_history(555, 'Timeout', '')
except Exception as e:
logging.debug('[%d] ! -------------------------' % (feed_id,))
tb = traceback.format_exc()
logging.error(tb)
logging.debug('[%d] ! -------------------------' % (feed_id,))
feed.save_page_history(550, "Page Error", tb)
fetched_feed = None
page_data = None
# mail_feed_error_to_admin(feed, e, local_vars=locals())
if not settings.DEBUG and hasattr(settings, 'SENTRY_DSN') and settings.SENTRY_DSN:
capture_exception(e)
flush()
feed = self.refresh_feed(feed.pk)
logging.debug(' ---> [%-30s] ~FYFetching icon: %s' % (feed.log_title[:30], feed.feed_link))
force = self.options['force']
if random.random() > 0.99:
force = True
icon_importer = IconImporter(feed, page_data=page_data, force=force)
try:
icon_importer.save()
icon_duration = time.time() - start_duration
except SoftTimeLimitExceeded as e:
logging.debug(
" ---> [%-30s] ~BR~FWTime limit hit!~SB~FR Moving on to next feed..." % feed
)
feed.save_feed_history(558, 'Timeout', e)
except TimeoutError:
logging.debug(' ---> [%-30s] ~FRIcon fetch timed out...' % (feed.log_title[:30]))
feed.save_page_history(556, 'Timeout', '')
except Exception as e:
logging.debug('[%d] ! -------------------------' % (feed_id,))
tb = traceback.format_exc()
logging.error(tb)
logging.debug('[%d] ! -------------------------' % (feed_id,))
# feed.save_feed_history(560, "Icon Error", tb)
# mail_feed_error_to_admin(feed, e, local_vars=locals())
if not settings.DEBUG and hasattr(settings, 'SENTRY_DSN') and settings.SENTRY_DSN:
capture_exception(e)
flush()
else:
logging.debug(
' ---> [%-30s] ~FBSkipping page fetch: (%s on %s stories) %s'
% (
feed.log_title[:30],
self.feed_trans[ret_feed],
feed.stories_last_month,
'' if feed.has_page else ' [HAS NO PAGE]',
)
)
feed = self.refresh_feed(feed.pk)
delta = time.time() - start_time
feed.last_load_time = round(delta)
feed.fetched_once = True
try:
feed = feed.save(update_fields=['last_load_time', 'fetched_once'])
except IntegrityError:
logging.debug(
" ***> [%-30s] ~FRIntegrityError on feed: %s"
% (
feed.log_title[:30],
feed.feed_address,
)
)
if ret_entries and ret_entries['new']:
self.publish_to_subscribers(feed, ret_entries['new'])
done_msg = '%2s ---> [%-30s] ~FYProcessed in ~FM~SB%.4ss~FY~SN (~FB%s~FY) [%s]' % (
identity,
feed.log_title[:30],
delta,
feed.pk,
self.feed_trans[ret_feed],
)
logging.debug(done_msg)
total_duration = time.time() - start_duration
MAnalyticsFetcher.add(
feed_id=feed.pk,
feed_fetch=feed_fetch_duration,
feed_process=feed_process_duration,
page=page_duration,
icon=icon_duration,
total=total_duration,
feed_code=feed_code,
)
self.feed_stats[ret_feed] += 1
if len(feed_queue) == 1:
return feed
# time_taken = datetime.datetime.utcnow() - self.time_start
def fetch_and_process_archive_pages(self, feed_id):
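        # Backfill the archive by walking pages via ?page=/?paged= query parameters or
        # RFC 5005 prev-archive links, stopping once a page yields no new story hashes.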
feed = Feed.get_by_id(feed_id)
first_seen_feed = None
original_starting_page = self.options['archive_page']
for archive_page_key in ["page", "paged", "rfc5005"]:
seen_story_hashes = set()
failed_pages = 0
self.options['archive_page_key'] = archive_page_key
if archive_page_key == "rfc5005":
self.options['archive_page'] = "rfc5005"
link_prev_archive = None
if first_seen_feed:
for link in getattr(first_seen_feed.feed, 'links', []):
if link['rel'] == 'prev-archive' or link['rel'] == 'next':
link_prev_archive = link['href']
logging.debug(' ---> [%-30s] ~FGFeed has ~SBRFC5005~SN links, filling out archive: %s' % (feed.log_title[:30], link_prev_archive))
break
else:
logging.debug(' ---> [%-30s] ~FBFeed has no RFC5005 links...' % (feed.log_title[:30]))
else:
self.options['archive_page_link'] = link_prev_archive
ffeed = FetchFeed(feed_id, self.options)
try:
ret_feed, fetched_feed = ffeed.fetch()
except TimeoutError:
logging.debug(' ---> [%-30s] ~FRArchive feed fetch timed out...' % (feed.log_title[:30]))
# Timeout means don't bother to keep checking...
continue
raw_feed = ffeed.raw_feed
if fetched_feed and ret_feed == FEED_OK:
pfeed = ProcessFeed(feed_id, fetched_feed, self.options, raw_feed=raw_feed)
if not pfeed.fpf or not pfeed.fpf.entries:
continue
for link in getattr(pfeed.fpf.feed, 'links', []):
if link['rel'] == 'prev-archive' or link['rel'] == 'next':
link_prev_archive = link['href']
if not link_prev_archive:
continue
while True:
if not link_prev_archive:
break
if link_prev_archive == self.options.get('archive_page_link', None):
logging.debug(' ---> [%-30s] ~FRNo change in archive page link: %s' % (feed.log_title[:30], link_prev_archive))
break
self.options['archive_page_link'] = link_prev_archive
link_prev_archive = None
ffeed = FetchFeed(feed_id, self.options)
try:
ret_feed, fetched_feed = ffeed.fetch()
except TimeoutError as e:
logging.debug(' ---> [%-30s] ~FRArchive feed fetch timed out...' % (feed.log_title[:30]))
# Timeout means don't bother to keep checking...
break
raw_feed = ffeed.raw_feed
if fetched_feed and ret_feed == FEED_OK:
pfeed = ProcessFeed(feed_id, fetched_feed, self.options, raw_feed=raw_feed)
if not pfeed.fpf or not pfeed.fpf.entries:
logging.debug(' ---> [%-30s] ~FRFeed parse failed, no entries' % (feed.log_title[:30]))
continue
for link in getattr(pfeed.fpf.feed, 'links', []):
if link['rel'] == 'prev-archive' or link['rel'] == 'next':
link_prev_archive = link['href']
logging.debug(' ---> [%-30s] ~FGFeed still has ~SBRFC5005~SN links, continuing filling out archive: %s' % (feed.log_title[:30], link_prev_archive))
break
else:
logging.debug(' ---> [%-30s] ~FBFeed has no more RFC5005 links...' % (feed.log_title[:30]))
break
before_story_hashes = len(seen_story_hashes)
pfeed.process()
seen_story_hashes.update(pfeed.archive_seen_story_hashes)
after_story_hashes = len(seen_story_hashes)
if before_story_hashes == after_story_hashes:
logging.debug(' ---> [%-30s] ~FRNo change in story hashes, but has archive link: %s' % (feed.log_title[:30], link_prev_archive))
failed_color = "~FR" if not link_prev_archive else ""
logging.debug(f" ---> [{feed.log_title[:30]:<30}] ~FGStory hashes found, archive RFC5005 ~SB{link_prev_archive}~SN: ~SB~FG{failed_color}{len(seen_story_hashes):,} stories~SN~FB")
else:
for page in range(3 if settings.DEBUG and False else 150):
if page < original_starting_page:
continue
if failed_pages >= 1:
break
self.options['archive_page'] = page+1
ffeed = FetchFeed(feed_id, self.options)
try:
ret_feed, fetched_feed = ffeed.fetch()
except TimeoutError as e:
logging.debug(' ---> [%-30s] ~FRArchive feed fetch timed out...' % (feed.log_title[:30]))
# Timeout means don't bother to keep checking...
break
raw_feed = ffeed.raw_feed
if fetched_feed and ret_feed == FEED_OK:
pfeed = ProcessFeed(feed_id, fetched_feed, self.options, raw_feed=raw_feed)
if not pfeed.fpf or not pfeed.fpf.entries:
failed_pages += 1
continue
if not first_seen_feed:
first_seen_feed = pfeed.fpf
before_story_hashes = len(seen_story_hashes)
pfeed.process()
seen_story_hashes.update(pfeed.archive_seen_story_hashes)
after_story_hashes = len(seen_story_hashes)
if before_story_hashes == after_story_hashes:
failed_pages += 1
else:
failed_pages += 1
failed_color = "~FR" if failed_pages > 0 else ""
logging.debug(f" ---> [{feed.log_title[:30]:<30}] ~FGStory hashes found, archive page ~SB{page+1}~SN: ~SB~FG{len(seen_story_hashes):,} stories~SN~FB, {failed_color}{failed_pages} failures")
def publish_to_subscribers(self, feed, new_count):
try:
r = redis.Redis(connection_pool=settings.REDIS_PUBSUB_POOL)
listeners_count = r.publish(str(feed.pk), 'story:new_count:%s' % new_count)
if listeners_count:
logging.debug(
" ---> [%-30s] ~FMPublished to %s subscribers" % (feed.log_title[:30], listeners_count)
)
except redis.ConnectionError:
logging.debug(" ***> [%-30s] ~BMRedis is unavailable for real-time." % (feed.log_title[:30],))
def count_unreads_for_subscribers(self, feed):
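        # Mark each active subscriber for an unread recalculation; when compute_scores is on,
        # preload and cache the feed's recent stories so subscriber scores can be computed immediately.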
subscriber_expire = datetime.datetime.now() - datetime.timedelta(days=settings.SUBSCRIBER_EXPIRE)
user_subs = UserSubscription.objects.filter(
feed=feed, active=True, user__profile__last_seen_on__gte=subscriber_expire
).order_by('-last_read_date')
if not user_subs.count():
return
for sub in user_subs:
if not sub.needs_unread_recalc:
sub.needs_unread_recalc = True
sub.save()
if self.options['compute_scores']:
r = redis.Redis(connection_pool=settings.REDIS_STORY_HASH_POOL)
stories = MStory.objects(story_feed_id=feed.pk, story_date__gte=feed.unread_cutoff)
stories = Feed.format_stories(stories, feed.pk)
story_hashes = r.zrangebyscore(
'zF:%s' % feed.pk,
int(feed.unread_cutoff.strftime('%s')),
int(time.time() + 60 * 60 * 24),
)
missing_story_hashes = set(story_hashes) - set([s['story_hash'] for s in stories])
if missing_story_hashes:
missing_stories = MStory.objects(
story_feed_id=feed.pk, story_hash__in=missing_story_hashes
).read_preference(pymongo.ReadPreference.PRIMARY)
missing_stories = Feed.format_stories(missing_stories, feed.pk)
stories = missing_stories + stories
logging.debug(
' ---> [%-30s] ~FYFound ~SB~FC%s(of %s)/%s~FY~SN un-secondaried stories while computing scores'
% (
feed.log_title[:30],
len(missing_stories),
len(missing_story_hashes),
len(stories),
)
)
cache.set("S:v3:%s" % feed.pk, stories, 60)
logging.debug(
' ---> [%-30s] ~FYComputing scores: ~SB%s stories~SN with ~SB%s subscribers ~SN(%s/%s/%s)'
% (
feed.log_title[:30],
len(stories),
user_subs.count(),
feed.num_subscribers,
feed.active_subscribers,
feed.premium_subscribers,
)
)
self.calculate_feed_scores_with_stories(user_subs, stories)
elif self.options.get('mongodb_replication_lag'):
logging.debug(
' ---> [%-30s] ~BR~FYSkipping computing scores: ~SB%s seconds~SN of mongodb lag'
% (feed.log_title[:30], self.options.get('mongodb_replication_lag'))
)
@timelimit(10)
def calculate_feed_scores_with_stories(self, user_subs, stories):
for sub in user_subs:
            silent = False if self.options.get('verbose', 0) >= 2 else True
sub.calculate_feed_scores(silent=silent, stories=stories)
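# Dispatcher spreads per-process feed queues across worker processes; with a single
# thread it runs the worker inline and returns its result.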
class Dispatcher:
def __init__(self, options, num_threads):
self.options = options
self.num_threads = num_threads
self.workers = []
def add_jobs(self, feeds_queue, feeds_count=1):
"""adds a feed processing job to the pool"""
self.feeds_queue = feeds_queue
self.feeds_count = feeds_count
def run_jobs(self):
if self.options['single_threaded'] or self.num_threads == 1:
return dispatch_workers(self.feeds_queue[0], self.options)
else:
for i in range(self.num_threads):
feed_queue = self.feeds_queue[i]
self.workers.append(
multiprocessing.Process(target=dispatch_workers, args=(feed_queue, self.options))
)
for i in range(self.num_threads):
self.workers[i].start()
def dispatch_workers(feed_queue, options):
worker = FeedFetcherWorker(options)
return worker.process_feed_wrapper(feed_queue)