import time
import datetime
import traceback
import multiprocessing
import urllib2
import xml.sax
import redis
import random
import pymongo
import re
import requests
import dateutil.parser
import isodate
import urlparse
from django.conf import settings
from django.db import IntegrityError
from django.core.cache import cache
from apps.reader.models import UserSubscription
from apps.rss_feeds.models import Feed, MStory
from apps.rss_feeds.page_importer import PageImporter
from apps.rss_feeds.icon_importer import IconImporter
from apps.notifications.tasks import QueueNotifications, MUserFeedNotification
from apps.push.models import PushSubscription
from apps.statistics.models import MAnalyticsFetcher, MStatistics
from utils import feedparser
from utils.story_functions import pre_process_story, strip_tags, linkify
from utils import log as logging
from utils.feed_functions import timelimit, TimeoutError
from qurl import qurl
from BeautifulSoup import BeautifulSoup
from django.utils import feedgenerator
from django.utils.html import linebreaks
from django.utils.encoding import smart_unicode
from utils import json_functions as json
from celery.exceptions import SoftTimeLimitExceeded
from utils.twitter_fetcher import TwitterFetcher
from utils.json_fetcher import JSONFetcher
# from utils.feed_functions import mail_feed_error_to_admin

# Refresh feed code adapted from Feedjack.
# http://feedjack.googlecode.com
FEED_OK, FEED_SAME, FEED_ERRPARSE, FEED_ERRHTTP, FEED_ERREXC = range(5)
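
# The fetch pipeline below is split into three stages: FetchFeed downloads a feed
# (with special handling for YouTube, Twitter, and JSON feeds), ProcessFeed parses
# the result and writes new/updated stories, and Dispatcher drives both across a
# queue of feeds, along with page/icon fetching and per-fetch analytics.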


class FetchFeed:
    def __init__(self, feed_id, options):
        self.feed = Feed.get_by_id(feed_id)
        self.options = options
        self.fpf = None
        self.raw_feed = None

    @timelimit(30)
    def fetch(self):
        """
        Uses requests to download the feed, parsing it in feedparser. Will be storified later.
        """
        start = time.time()
        identity = self.get_identity()
        log_msg = u'%2s ---> [%-30s] ~FYFetching feed (~FB%d~FY), last update: %s' % (identity,
                  self.feed.log_title[:30],
                  self.feed.id,
                  datetime.datetime.now() - self.feed.last_update)
        logging.debug(log_msg)

        etag = self.feed.etag
        modified = self.feed.last_modified.utctimetuple()[:7] if self.feed.last_modified else None
        address = self.feed.feed_address

        if (self.options.get('force') or random.random() <= .01):
            self.options['force'] = True
            modified = None
            etag = None
            address = qurl(address, add={"_": random.randint(0, 10000)})
            logging.debug(u'   ---> [%-30s] ~FBForcing fetch: %s' % (
                          self.feed.log_title[:30], address))
        elif (not self.feed.fetched_once or not self.feed.known_good):
            modified = None
            etag = None

        if self.options.get('feed_xml'):
            logging.debug(u'   ---> [%-30s] ~FM~BKFeed has been fat pinged. Ignoring fat: %s' % (
                          self.feed.log_title[:30], len(self.options.get('feed_xml'))))

        if self.options.get('fpf'):
            self.fpf = self.options.get('fpf')
            logging.debug(u'   ---> [%-30s] ~FM~BKFeed fetched in real-time with fat ping.' % (
                          self.feed.log_title[:30]))
            return FEED_OK, self.fpf
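
        # Some sources don't expose a usable RSS/Atom feed directly: YouTube
        # addresses are converted into an Atom feed via fetch_youtube, and bare
        # Twitter profile URLs are handed to fetch_twitter.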
        if 'youtube.com' in address:
            try:
                youtube_feed = self.fetch_youtube(address)
            except requests.adapters.ConnectionError:
                youtube_feed = None
            if not youtube_feed:
                logging.debug(u'   ***> [%-30s] ~FRYouTube fetch failed: %s.' %
                              (self.feed.log_title[:30], address))
                return FEED_ERRHTTP, None
            self.fpf = feedparser.parse(youtube_feed)
        elif re.match('(https?)?://twitter.com/\w+/?$', qurl(address, remove=['_'])):
            twitter_feed = self.fetch_twitter(address)
            if not twitter_feed:
                logging.debug(u'   ***> [%-30s] ~FRTwitter fetch failed: %s' %
                              (self.feed.log_title[:30], address))
                return FEED_ERRHTTP, None
            self.fpf = feedparser.parse(twitter_feed)

        if not self.fpf:
            try:
                headers = self.feed.fetch_headers()
                if etag:
                    headers['If-None-Match'] = etag
                if modified:
                    # format into an RFC 1123-compliant timestamp. We can't use
                    # time.strftime() since the %a and %b directives can be affected
                    # by the current locale, but RFC 2616 states that dates must be
                    # in English.
                    short_weekdays = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
                    months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
                    modified_header = '%s, %02d %s %04d %02d:%02d:%02d GMT' % (short_weekdays[modified[6]], modified[2], months[modified[1] - 1], modified[0], modified[3], modified[4], modified[5])
                    headers['If-Modified-Since'] = modified_header
                if etag or modified:
                    headers['A-IM'] = 'feed'
                raw_feed = requests.get(address, headers=headers)
                if raw_feed.status_code >= 400:
                    logging.debug("   ***> [%-30s] ~FRFeed fetch was %s status code, trying fake user agent: %s" % (self.feed.log_title[:30], raw_feed.status_code, raw_feed.headers))
                    raw_feed = requests.get(address, headers=self.feed.fetch_headers(fake=True))

                if raw_feed.content and 'application/json' in raw_feed.headers.get('Content-Type', ""):
                    # JSON Feed
                    json_feed = self.fetch_json_feed(address, raw_feed)
                    if not json_feed:
                        logging.debug(u'   ***> [%-30s] ~FRJSON fetch failed: %s' %
                                      (self.feed.log_title[:30], address))
                        return FEED_ERRHTTP, None
                    self.fpf = feedparser.parse(json_feed)
                elif raw_feed.content and raw_feed.status_code < 400:
                    response_headers = raw_feed.headers
                    response_headers['Content-Location'] = raw_feed.url
                    self.raw_feed = smart_unicode(raw_feed.content)
                    self.fpf = feedparser.parse(self.raw_feed,
                                                response_headers=response_headers)
                    if self.options.get('debug', False):
                        logging.debug("   ---> [%-30s] ~FBFeed fetch status %s: %s length / %s" % (self.feed.log_title[:30], raw_feed.status_code, len(smart_unicode(raw_feed.content)), raw_feed.headers))
            except Exception, e:
                logging.debug("   ***> [%-30s] ~FRFeed failed to fetch with request, trying feedparser: %s" % (self.feed.log_title[:30], unicode(e)[:100]))

        if not self.fpf or self.options.get('force_fp', False):
            try:
                self.fpf = feedparser.parse(address,
                                            agent=self.feed.user_agent,
                                            etag=etag,
                                            modified=modified)
            except (TypeError, ValueError, KeyError, EOFError, MemoryError), e:
                logging.debug(u'   ***> [%-30s] ~FRFeed fetch error: %s' %
                              (self.feed.log_title[:30], e))
                pass

        if not self.fpf:
            try:
                logging.debug(u'   ***> [%-30s] ~FRTurning off headers...' %
                              (self.feed.log_title[:30]))
                self.fpf = feedparser.parse(address, agent=self.feed.user_agent)
            except (TypeError, ValueError, KeyError, EOFError, MemoryError), e:
                logging.debug(u'   ***> [%-30s] ~FRFetch failed: %s.' %
                              (self.feed.log_title[:30], e))
                return FEED_ERRHTTP, None

        logging.debug(u'   ---> [%-30s] ~FYFeed fetch in ~FM%.4ss' % (
                      self.feed.log_title[:30], time.time() - start))

        return FEED_OK, self.fpf
2010-08-17 17:45:51 -04:00
def get_identity ( self ) :
identity = " X "
current_process = multiprocessing . current_process ( )
if current_process . _identity :
identity = current_process . _identity [ 0 ]
return identity
2015-04-29 16:30:44 -07:00
2017-05-06 19:38:36 -07:00
def fetch_twitter ( self , address = None ) :
twitter_fetcher = TwitterFetcher ( self . feed , self . options )
return twitter_fetcher . fetch ( address )
2017-05-22 16:46:56 -07:00
def fetch_json_feed ( self , address , headers ) :
json_fetcher = JSONFetcher ( self . feed , self . options )
return json_fetcher . fetch ( address , headers )
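
    # fetch_youtube resolves a YouTube username, channel_id, or playlist id from
    # the feed address, collects video ids (from videos.xml or the playlistItems
    # API), asks the YouTube Data API v3 for snippets and durations, and rebuilds
    # the result as an Atom feed that the regular feedparser pipeline can consume.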
    def fetch_youtube(self, address):
        username = None
        channel_id = None
        list_id = None

        if 'gdata.youtube.com' in address:
            try:
                username_groups = re.search('gdata.youtube.com/feeds/\w+/users/(\w+)/', address)
                if not username_groups:
                    return
                username = username_groups.group(1)
            except IndexError:
                return
        elif 'youtube.com/feeds/videos.xml?user=' in address:
            try:
                username = urlparse.parse_qs(urlparse.urlparse(address).query)['user'][0]
            except IndexError:
                return
        elif 'youtube.com/feeds/videos.xml?channel_id=' in address:
            try:
                channel_id = urlparse.parse_qs(urlparse.urlparse(address).query)['channel_id'][0]
            except (IndexError, KeyError):
                return
        elif 'youtube.com/playlist' in address:
            try:
                list_id = urlparse.parse_qs(urlparse.urlparse(address).query)['list'][0]
            except IndexError:
                return

        if channel_id:
            video_ids_xml = requests.get("https://www.youtube.com/feeds/videos.xml?channel_id=%s" % channel_id)
            channel_json = requests.get("https://www.googleapis.com/youtube/v3/channels?part=snippet&id=%s&key=%s" %
                                        (channel_id, settings.YOUTUBE_API_KEY))
            channel = json.decode(channel_json.content)
            try:
                username = channel['items'][0]['snippet']['title']
                description = channel['items'][0]['snippet']['description']
            except (IndexError, KeyError):
                return
        elif list_id:
            playlist_json = requests.get("https://www.googleapis.com/youtube/v3/playlists?part=snippet&id=%s&key=%s" %
                                         (list_id, settings.YOUTUBE_API_KEY))
            playlist = json.decode(playlist_json.content)
            try:
                username = playlist['items'][0]['snippet']['title']
                description = playlist['items'][0]['snippet']['description']
            except (IndexError, KeyError):
                return
            channel_url = "https://www.youtube.com/playlist?list=%s" % list_id
        elif username:
            video_ids_xml = requests.get("https://www.youtube.com/feeds/videos.xml?user=%s" % username)
            description = "YouTube videos uploaded by %s" % username
        else:
            return

        if list_id:
            playlist_json = requests.get("https://www.googleapis.com/youtube/v3/playlistItems?part=snippet&playlistId=%s&key=%s" %
                                         (list_id, settings.YOUTUBE_API_KEY))
            playlist = json.decode(playlist_json.content)
            try:
                video_ids = [video['snippet']['resourceId']['videoId'] for video in playlist['items']]
            except (IndexError, KeyError):
                return
        else:
            if video_ids_xml.status_code != 200:
                return
            video_ids_soup = BeautifulSoup(video_ids_xml.content)
            channel_url = video_ids_soup.find('author').find('uri').getText()
            video_ids = []
            for video_id in video_ids_soup.findAll('yt:videoid'):
                video_ids.append(video_id.getText())

        videos_json = requests.get("https://www.googleapis.com/youtube/v3/videos?part=contentDetails%%2Csnippet&id=%s&key=%s" %
                                   (','.join(video_ids), settings.YOUTUBE_API_KEY))
        videos = json.decode(videos_json.content)
        if 'error' in videos:
            logging.debug(" ***> ~FRYoutube returned an error: ~FM~SB%s" % (videos))
            return

        data = {}
        data['title'] = ("%s's YouTube Videos" % username if 'Uploads' not in username else username)
        data['link'] = channel_url
        data['description'] = description
        data['lastBuildDate'] = datetime.datetime.utcnow()
        data['generator'] = 'NewsBlur YouTube API v3 Decrapifier - %s' % settings.NEWSBLUR_URL
        data['docs'] = None
        data['feed_url'] = address
        rss = feedgenerator.Atom1Feed(**data)

        for video in videos['items']:
            thumbnail = video['snippet']['thumbnails'].get('maxres')
            if not thumbnail:
                thumbnail = video['snippet']['thumbnails'].get('high')
            if not thumbnail:
                thumbnail = video['snippet']['thumbnails'].get('medium')
            duration_sec = isodate.parse_duration(video['contentDetails']['duration']).seconds
            if duration_sec >= 3600:
                hours = (duration_sec / 3600)
                minutes = (duration_sec - (hours * 3600)) / 60
                seconds = duration_sec - (hours * 3600) - (minutes * 60)
                duration = "%s:%s:%s" % (hours, '{0:02d}'.format(minutes), '{0:02d}'.format(seconds))
            else:
                minutes = duration_sec / 60
                seconds = duration_sec - (minutes * 60)
                duration = "%s:%s" % ('{0:02d}'.format(minutes), '{0:02d}'.format(seconds))
            content = """<div class="NB-youtube-player"><iframe allowfullscreen="true" src="%s?iv_load_policy=3"></iframe></div>
                         <div class="NB-youtube-stats"><small>
                             <b>From:</b> <a href="%s">%s</a><br />
                             <b>Duration:</b> %s<br />
                         </small></div><hr>
                         <div class="NB-youtube-description">%s</div>
                         <img src="%s" style="display:none" />""" % (
                ("https://www.youtube.com/embed/" + video['id']),
                channel_url, username,
                duration,
                linkify(linebreaks(video['snippet']['description'])),
                thumbnail['url'] if thumbnail else "",
            )

            link = "http://www.youtube.com/watch?v=%s" % video['id']
            story_data = {
                'title': video['snippet']['title'],
                'link': link,
                'description': content,
                'author_name': username,
                'categories': [],
                'unique_id': "tag:youtube.com,2008:video:%s" % video['id'],
                'pubdate': dateutil.parser.parse(video['snippet']['publishedAt']),
            }
            rss.add_item(**story_data)
        return rss.writeString('utf-8')


class ProcessFeed:
    def __init__(self, feed_id, fpf, options, raw_feed=None):
        self.feed_id = feed_id
        self.options = options
        self.fpf = fpf
        self.raw_feed = raw_feed

    def refresh_feed(self):
        self.feed = Feed.get_by_id(self.feed_id)
        if self.feed_id != self.feed.pk:
            logging.debug(" ***> Feed has changed: from %s to %s" % (self.feed_id, self.feed.pk))
            self.feed_id = self.feed.pk

    def process(self):
        """ Downloads and parses a feed.
        """
        start = time.time()
        self.refresh_feed()

        ret_values = dict(new=0, updated=0, same=0, error=0)

        if hasattr(self.fpf, 'status'):
            if self.options['verbose']:
                if self.fpf.bozo and self.fpf.status != 304:
                    logging.debug(u'   ---> [%-30s] ~FRBOZO exception: %s ~SB(%s entries)' % (
                                  self.feed.log_title[:30],
                                  self.fpf.bozo_exception,
                                  len(self.fpf.entries)))

            if self.fpf.status == 304:
                self.feed = self.feed.save()
                self.feed.save_feed_history(304, "Not modified")
                return FEED_SAME, ret_values

            # 302: Temporary redirect: ignore
            # 301: Permanent redirect: save it (after 10 tries)
            if self.fpf.status == 301:
                if self.fpf.href.endswith('feedburner.com/atom.xml'):
                    return FEED_ERRHTTP, ret_values
                redirects, non_redirects = self.feed.count_redirects_in_history('feed')
                self.feed.save_feed_history(self.fpf.status, "HTTP Redirect (%d to go)" % (10 - len(redirects)))
                if len(redirects) >= 10 or len(non_redirects) == 0:
                    address = self.fpf.href
                    if self.options['force'] and address:
                        address = qurl(address, remove=['_'])
                    self.feed.feed_address = address
                if not self.feed.known_good:
                    self.feed.fetched_once = True
                    logging.debug("   ---> [%-30s] ~SB~SK~FRFeed is %s'ing. Refetching..." % (self.feed.log_title[:30], self.fpf.status))
                    self.feed = self.feed.schedule_feed_fetch_immediately()
                if not self.fpf.entries:
                    self.feed = self.feed.save()
                    self.feed.save_feed_history(self.fpf.status, "HTTP Redirect")
                    return FEED_ERRHTTP, ret_values

            if self.fpf.status >= 400:
                logging.debug("   ---> [%-30s] ~SB~FRHTTP Status code: %s. Checking address..." % (self.feed.log_title[:30], self.fpf.status))
                fixed_feed = None
                if not self.feed.known_good:
                    fixed_feed, feed = self.feed.check_feed_link_for_feed_address()
                if not fixed_feed:
                    self.feed.save_feed_history(self.fpf.status, "HTTP Error")
                else:
                    self.feed = feed
                self.feed = self.feed.save()
                return FEED_ERRHTTP, ret_values

        if not self.fpf:
            logging.debug("   ---> [%-30s] ~SB~FRFeed is Non-XML. No feedparser feed either!" % (self.feed.log_title[:30]))
            self.feed.save_feed_history(551, "Broken feed")
            return FEED_ERRHTTP, ret_values

        if self.fpf and not self.fpf.entries:
            if self.fpf.bozo and isinstance(self.fpf.bozo_exception, feedparser.NonXMLContentType):
                logging.debug("   ---> [%-30s] ~SB~FRFeed is Non-XML. %s entries. Checking address..." % (self.feed.log_title[:30], len(self.fpf.entries)))
                fixed_feed = None
                if not self.feed.known_good:
                    fixed_feed, feed = self.feed.check_feed_link_for_feed_address()
                if not fixed_feed:
                    self.feed.save_feed_history(552, 'Non-xml feed', self.fpf.bozo_exception)
                else:
                    self.feed = feed
                self.feed = self.feed.save()
                return FEED_ERRPARSE, ret_values
            elif self.fpf.bozo and isinstance(self.fpf.bozo_exception, xml.sax._exceptions.SAXException):
                logging.debug("   ---> [%-30s] ~SB~FRFeed has SAX/XML parsing issues. %s entries. Checking address..." % (self.feed.log_title[:30], len(self.fpf.entries)))
                fixed_feed = None
                if not self.feed.known_good:
                    fixed_feed, feed = self.feed.check_feed_link_for_feed_address()
                if not fixed_feed:
                    self.feed.save_feed_history(553, 'Not an RSS feed', self.fpf.bozo_exception)
                else:
                    self.feed = feed
                self.feed = self.feed.save()
                return FEED_ERRPARSE, ret_values

        # the feed has changed (or it is the first time we parse it)
        # saving the etag and last_modified fields
        original_etag = self.feed.etag
        self.feed.etag = self.fpf.get('etag')
        if self.feed.etag:
            self.feed.etag = self.feed.etag[:255]
        # sometimes this is None (it never should be) *sigh*
        if self.feed.etag is None:
            self.feed.etag = ''
        if self.feed.etag != original_etag:
            self.feed.save(update_fields=['etag'])

        original_last_modified = self.feed.last_modified
        if hasattr(self.fpf, 'modified') and self.fpf.modified:
            try:
                self.feed.last_modified = datetime.datetime.strptime(self.fpf.modified, '%a, %d %b %Y %H:%M:%S %Z')
            except Exception, e:
                self.feed.last_modified = None
                logging.debug("Broken mtime %s: %s" % (self.feed.last_modified, e))
                pass
        if self.feed.last_modified != original_last_modified:
            self.feed.save(update_fields=['last_modified'])

        self.fpf.entries = self.fpf.entries[:100]

        original_title = self.feed.feed_title
        if self.fpf.feed.get('title'):
            self.feed.feed_title = strip_tags(self.fpf.feed.get('title'))
        if self.feed.feed_title != original_title:
            self.feed.save(update_fields=['feed_title'])

        tagline = self.fpf.feed.get('tagline', self.feed.data.feed_tagline)
        if tagline:
            original_tagline = self.feed.data.feed_tagline
            self.feed.data.feed_tagline = smart_unicode(tagline)
            if self.feed.data.feed_tagline != original_tagline:
                self.feed.data.save(update_fields=['feed_tagline'])

        if not self.feed.feed_link_locked:
            new_feed_link = self.fpf.feed.get('link') or self.fpf.feed.get('id') or self.feed.feed_link
            if self.options['force'] and new_feed_link:
                new_feed_link = qurl(new_feed_link, remove=['_'])
            if new_feed_link != self.feed.feed_link:
                logging.debug("   ---> [%-30s] ~SB~FRFeed's page is different: %s to %s" % (self.feed.log_title[:30], self.feed.feed_link, new_feed_link))
                redirects, non_redirects = self.feed.count_redirects_in_history('page')
                self.feed.save_page_history(301, "HTTP Redirect (%s to go)" % (10 - len(redirects)))
                if len(redirects) >= 10 or len(non_redirects) == 0:
                    self.feed.feed_link = new_feed_link
                    self.feed.save(update_fields=['feed_link'])
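
        # If every story in the feed shares a single guid (a common publisher bug),
        # the guid can't be used for deduplication. The checks below detect that
        # case; the story loop further down then swaps in the permalink (or, if
        # permalinks are also all identical, the published timestamp) as the guid.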
        # Determine if stories aren't valid and replace broken guids
        guids_seen = set()
        permalinks_seen = set()
        for entry in self.fpf.entries:
            guids_seen.add(entry.get('guid'))
            permalinks_seen.add(Feed.get_permalink(entry))
        guid_difference = len(guids_seen) != len(self.fpf.entries)
        single_guid = len(guids_seen) == 1
        replace_guids = single_guid and guid_difference
        permalink_difference = len(permalinks_seen) != len(self.fpf.entries)
        single_permalink = len(permalinks_seen) == 1
        replace_permalinks = single_permalink and permalink_difference

        # Compare new stories to existing stories, adding and updating
        start_date = datetime.datetime.utcnow()
        story_hashes = []
        stories = []
        for entry in self.fpf.entries:
            story = pre_process_story(entry, self.fpf.encoding)
            if story.get('published') < start_date:
                start_date = story.get('published')
            if replace_guids:
                if replace_permalinks:
                    new_story_guid = unicode(story.get('published'))
                    if self.options['verbose']:
                        logging.debug(u'   ---> [%-30s] ~FBReplacing guid (%s) with timestamp: %s' % (
                                      self.feed.log_title[:30],
                                      story.get('guid'), new_story_guid))
                    story['guid'] = new_story_guid
                else:
                    new_story_guid = Feed.get_permalink(story)
                    if self.options['verbose']:
                        logging.debug(u'   ---> [%-30s] ~FBReplacing guid (%s) with permalink: %s' % (
                                      self.feed.log_title[:30],
                                      story.get('guid'), new_story_guid))
                    story['guid'] = new_story_guid
            story['story_hash'] = MStory.feed_guid_hash_unsaved(self.feed.pk, story.get('guid'))
            stories.append(story)
            story_hashes.append(story.get('story_hash'))

        original_story_hash_count = len(story_hashes)
        story_hashes_in_unread_cutoff = self.feed.story_hashes_in_unread_cutoff[:original_story_hash_count]
        story_hashes.extend(story_hashes_in_unread_cutoff)
        story_hashes = list(set(story_hashes))
        if self.options['verbose'] or settings.DEBUG:
            logging.debug(u'   ---> [%-30s] ~FBFound ~SB%s~SN guids, adding ~SB%s~SN/%s guids from db' % (
                          self.feed.log_title[:30],
                          original_story_hash_count, len(story_hashes) - original_story_hash_count,
                          len(story_hashes_in_unread_cutoff)))

        existing_stories = dict((s.story_hash, s) for s in MStory.objects(
            story_hash__in=story_hashes,
            # story_date__gte=start_date,
            # story_feed_id=self.feed.pk
        ))
        # if len(existing_stories) == 0:
        #     existing_stories = dict((s.story_hash, s) for s in MStory.objects(
        #         story_date__gte=start_date,
        #         story_feed_id=self.feed.pk
        #     ))

        ret_values = self.feed.add_update_stories(stories, existing_stories,
                                                  verbose=self.options['verbose'],
                                                  updates_off=self.options['updates_off'])

        # PubSubHubbub
        if (hasattr(self.fpf, 'feed') and
            hasattr(self.fpf.feed, 'links') and self.fpf.feed.links):
            hub_url = None
            self_url = self.feed.feed_address
            for link in self.fpf.feed.links:
                if link['rel'] == 'hub' and not hub_url:
                    hub_url = link['href']
                elif link['rel'] == 'self':
                    self_url = link['href']
            push_expired = False
            if self.feed.is_push:
                try:
                    push_expired = self.feed.push.lease_expires < datetime.datetime.now()
                except PushSubscription.DoesNotExist:
                    self.feed.is_push = False
            if (hub_url and self_url and not settings.DEBUG and
                self.feed.active_subscribers > 0 and
                (push_expired or not self.feed.is_push or self.options.get('force'))):
                logging.debug(u'   ---> [%-30s] ~BB~FW%sSubscribing to PuSH hub: %s' % (
                              self.feed.log_title[:30],
                              "~SKRe-~SN" if push_expired else "", hub_url))
                try:
                    PushSubscription.objects.subscribe(self_url, feed=self.feed, hub=hub_url)
                except TimeoutError:
                    logging.debug(u'   ---> [%-30s] ~BB~FW~FRTimed out~FW subscribing to PuSH hub: %s' % (
                                  self.feed.log_title[:30], hub_url))
            elif (self.feed.is_push and
                  (self.feed.active_subscribers <= 0 or not hub_url)):
                logging.debug(u'   ---> [%-30s] ~BB~FWTurning off PuSH, no hub found' % (
                              self.feed.log_title[:30]))
                self.feed.is_push = False
                self.feed = self.feed.save()

        # Push notifications
        if ret_values['new'] > 0 and MUserFeedNotification.feed_has_users(self.feed.pk) > 0:
            QueueNotifications.delay(self.feed.pk, ret_values['new'])

        # All Done
        logging.debug(u'   ---> [%-30s] ~FYParsed Feed: %snew=%s~SN~FY %sup=%s~SN same=%s%s~SN %serr=%s~SN~FY total=~SB%s' % (
                      self.feed.log_title[:30],
                      '~FG~SB' if ret_values['new'] else '', ret_values['new'],
                      '~FY~SB' if ret_values['updated'] else '', ret_values['updated'],
                      '~SB' if ret_values['same'] else '', ret_values['same'],
                      '~FR~SB' if ret_values['error'] else '', ret_values['error'],
                      len(self.fpf.entries)))
        self.feed.update_all_statistics(has_new_stories=bool(ret_values['new']), force=self.options['force'])
        fetch_date = datetime.datetime.now()
        if ret_values['new']:
            if not getattr(settings, 'TEST_DEBUG', False):
                self.feed.trim_feed()
                self.feed.expire_redis()
            if MStatistics.get('raw_feed', None) == self.feed.pk:
                self.feed.save_raw_feed(self.raw_feed, fetch_date)
        self.feed.save_feed_history(200, "OK", date=fetch_date)

        if self.options['verbose']:
            logging.debug(u'   ---> [%-30s] ~FBTIME: feed parse in ~FM%.4ss' % (
                          self.feed.log_title[:30], time.time() - start))

        return FEED_OK, ret_values


class Dispatcher:
    def __init__(self, options, num_threads):
        self.options = options
        self.feed_stats = {
            FEED_OK: 0,
            FEED_SAME: 0,
            FEED_ERRPARSE: 0,
            FEED_ERRHTTP: 0,
            FEED_ERREXC: 0}
        self.feed_trans = {
            FEED_OK: 'ok',
            FEED_SAME: 'unchanged',
            FEED_ERRPARSE: 'cant_parse',
            FEED_ERRHTTP: 'http_error',
            FEED_ERREXC: 'exception'}
        self.feed_keys = sorted(self.feed_trans.keys())
        self.num_threads = num_threads
        self.time_start = datetime.datetime.utcnow()
        self.workers = []

    def refresh_feed(self, feed_id):
        """ Update feed, since it may have changed """
        return Feed.get_by_id(feed_id)
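
    # For each feed id in the queue: fetch the feed, process its stories, and
    # optionally recalculate unread counts, then (probabilistically) fetch the
    # feed's page and icon, and finally record per-stage timings in MAnalyticsFetcher.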
    def process_feed_wrapper(self, feed_queue):
        delta = None
        current_process = multiprocessing.current_process()
        identity = "X"
        feed = None

        if current_process._identity:
            identity = current_process._identity[0]

        for feed_id in feed_queue:
            start_duration = time.time()
            feed_fetch_duration = None
            feed_process_duration = None
            page_duration = None
            icon_duration = None
            feed_code = None
            ret_entries = None
            start_time = time.time()
            ret_feed = FEED_ERREXC
            try:
                feed = self.refresh_feed(feed_id)

                skip = False
                if self.options.get('fake'):
                    skip = True
                    weight = "-"
                    quick = "-"
                    rand = "-"
                elif (self.options.get('quick') and not self.options['force'] and
                      feed.known_good and feed.fetched_once and not feed.is_push):
                    weight = feed.stories_last_month * feed.num_subscribers
                    random_weight = random.randint(1, max(weight, 1))
                    quick = float(self.options.get('quick', 0))
                    rand = random.random()
                    if random_weight < 1000 and rand < quick:
                        skip = True
                elif False and feed.feed_address.startswith("http://news.google.com/news"):
                    skip = True
                    weight = "-"
                    quick = "-"
                    rand = "-"
                if skip:
                    logging.debug('   ---> [%-30s] ~BGFaking fetch, skipping (%s/month, %s subs, %s < %s)...' % (
                                  feed.log_title[:30],
                                  weight,
                                  feed.num_subscribers,
                                  rand, quick))
                    continue

                ffeed = FetchFeed(feed_id, self.options)
                ret_feed, fetched_feed = ffeed.fetch()

                feed_fetch_duration = time.time() - start_duration
                raw_feed = ffeed.raw_feed

                if ((fetched_feed and ret_feed == FEED_OK) or self.options['force']):
                    pfeed = ProcessFeed(feed_id, fetched_feed, self.options, raw_feed=raw_feed)
                    ret_feed, ret_entries = pfeed.process()
                    feed = pfeed.feed
                    feed_process_duration = time.time() - start_duration

                    if (ret_entries and ret_entries['new']) or self.options['force']:
                        start = time.time()
                        if not feed.known_good or not feed.fetched_once:
                            feed.known_good = True
                            feed.fetched_once = True
                            feed = feed.save()
                        if self.options['force'] or random.random() <= 0.02:
                            logging.debug('   ---> [%-30s] ~FBPerforming feed cleanup...' % (feed.log_title[:30],))
                            start_cleanup = time.time()
                            feed.sync_redis()
                            logging.debug('   ---> [%-30s] ~FBDone with feed cleanup. Took ~SB%.4s~SN sec.' % (feed.log_title[:30], time.time() - start_cleanup))
                        try:
                            self.count_unreads_for_subscribers(feed)
                        except TimeoutError:
                            logging.debug('   ---> [%-30s] Unread count took too long...' % (feed.log_title[:30],))
                        if self.options['verbose']:
                            logging.debug(u'   ---> [%-30s] ~FBTIME: unread count in ~FM%.4ss' % (
                                          feed.log_title[:30], time.time() - start))
            except urllib2.HTTPError, e:
                logging.debug('   ---> [%-30s] ~FRFeed throws HTTP error: ~SB%s' % (unicode(feed_id)[:30], e.fp.read()))
                feed_code = e.code
                feed.save_feed_history(feed_code, e.msg, e.fp.read())
                fetched_feed = None
            except Feed.DoesNotExist, e:
                logging.debug('   ---> [%-30s] ~FRFeed is now gone...' % (unicode(feed_id)[:30]))
                continue
            except SoftTimeLimitExceeded, e:
                logging.debug(" ---> [%-30s] ~BR~FWTime limit hit!~SB~FR Moving on to next feed..." % feed)
                ret_feed = FEED_ERREXC
                fetched_feed = None
                feed_code = 559
                feed.save_feed_history(feed_code, 'Timeout', e)
            except TimeoutError, e:
                logging.debug('   ---> [%-30s] ~FRFeed fetch timed out...' % (feed.log_title[:30]))
                feed_code = 505
                feed.save_feed_history(feed_code, 'Timeout', e)
                fetched_feed = None
            except Exception, e:
                logging.debug('[%d] ! -------------------------' % (feed_id,))
                tb = traceback.format_exc()
                logging.error(tb)
                logging.debug('[%d] ! -------------------------' % (feed_id,))
                ret_feed = FEED_ERREXC
                feed = Feed.get_by_id(getattr(feed, 'pk', feed_id))
                if not feed: continue
                feed.save_feed_history(500, "Error", tb)
                feed_code = 500
                fetched_feed = None
                # mail_feed_error_to_admin(feed, e, local_vars=locals())
                if (not settings.DEBUG and hasattr(settings, 'RAVEN_CLIENT') and
                    settings.RAVEN_CLIENT):
                    settings.RAVEN_CLIENT.captureException()

            if not feed_code:
                if ret_feed == FEED_OK:
                    feed_code = 200
                elif ret_feed == FEED_SAME:
                    feed_code = 304
                elif ret_feed == FEED_ERRHTTP:
                    feed_code = 400
                if ret_feed == FEED_ERREXC:
                    feed_code = 500
                elif ret_feed == FEED_ERRPARSE:
                    feed_code = 550

            if not feed: continue
            feed = self.refresh_feed(feed.pk)
            if not feed: continue
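
            # Fetch the feed's page and icon when forced, for a random ~10% of
            # fetches, or when the fetch succeeded and the feed has a page worth
            # scraping.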
            if ((self.options['force']) or
                (random.random() > .9) or
                (fetched_feed and
                 feed.feed_link and
                 feed.has_page and
                 (ret_feed == FEED_OK or
                  (ret_feed == FEED_SAME and feed.stories_last_month > 10)))):

                logging.debug(u'   ---> [%-30s] ~FYFetching page: %s' % (feed.log_title[:30], feed.feed_link))
                page_importer = PageImporter(feed)
                try:
                    page_data = page_importer.fetch_page()
                    page_duration = time.time() - start_duration
                except SoftTimeLimitExceeded, e:
                    logging.debug(" ---> [%-30s] ~BR~FWTime limit hit!~SB~FR Moving on to next feed..." % feed)
                    page_data = None
                    feed.save_feed_history(557, 'Timeout', e)
                except TimeoutError, e:
                    logging.debug('   ---> [%-30s] ~FRPage fetch timed out...' % (feed.log_title[:30]))
                    page_data = None
                    feed.save_page_history(555, 'Timeout', '')
                except Exception, e:
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    tb = traceback.format_exc()
                    logging.error(tb)
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    feed.save_page_history(550, "Page Error", tb)
                    fetched_feed = None
                    page_data = None
                    # mail_feed_error_to_admin(feed, e, local_vars=locals())
                    if (not settings.DEBUG and hasattr(settings, 'RAVEN_CLIENT') and
                        settings.RAVEN_CLIENT):
                        settings.RAVEN_CLIENT.captureException()

                feed = self.refresh_feed(feed.pk)
                logging.debug(u'   ---> [%-30s] ~FYFetching icon: %s' % (feed.log_title[:30], feed.feed_link))
                force = self.options['force']
                if random.random() > .99:
                    force = True
                icon_importer = IconImporter(feed, page_data=page_data, force=force)
                try:
                    icon_importer.save()
                    icon_duration = time.time() - start_duration
                except SoftTimeLimitExceeded, e:
                    logging.debug(" ---> [%-30s] ~BR~FWTime limit hit!~SB~FR Moving on to next feed..." % feed)
                    feed.save_feed_history(558, 'Timeout', e)
                except TimeoutError, e:
                    logging.debug('   ---> [%-30s] ~FRIcon fetch timed out...' % (feed.log_title[:30]))
                    feed.save_page_history(556, 'Timeout', '')
                except Exception, e:
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    tb = traceback.format_exc()
                    logging.error(tb)
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    # feed.save_feed_history(560, "Icon Error", tb)
                    # mail_feed_error_to_admin(feed, e, local_vars=locals())
                    if (not settings.DEBUG and hasattr(settings, 'RAVEN_CLIENT') and
                        settings.RAVEN_CLIENT):
                        settings.RAVEN_CLIENT.captureException()
            else:
                logging.debug(u'   ---> [%-30s] ~FBSkipping page fetch: (%s on %s stories) %s' % (feed.log_title[:30], self.feed_trans[ret_feed], feed.stories_last_month, '' if feed.has_page else ' [HAS NO PAGE]'))

            feed = self.refresh_feed(feed.pk)
            delta = time.time() - start_time

            feed.last_load_time = round(delta)
            feed.fetched_once = True
            try:
                feed = feed.save(update_fields=['last_load_time', 'fetched_once'])
            except IntegrityError:
                logging.debug("   ***> [%-30s] ~FRIntegrityError on feed: %s" % (feed.log_title[:30], feed.feed_address,))

            if ret_entries and ret_entries['new']:
                self.publish_to_subscribers(feed, ret_entries['new'])

            done_msg = (u'%2s ---> [%-30s] ~FYProcessed in ~FM~SB%.4ss~FY~SN (~FB%s~FY) [%s]' % (
                        identity, feed.log_title[:30], delta,
                        feed.pk, self.feed_trans[ret_feed],))
            logging.debug(done_msg)
            total_duration = time.time() - start_duration
            MAnalyticsFetcher.add(feed_id=feed.pk, feed_fetch=feed_fetch_duration,
                                  feed_process=feed_process_duration,
                                  page=page_duration, icon=icon_duration,
                                  total=total_duration, feed_code=feed_code)

            self.feed_stats[ret_feed] += 1

        if len(feed_queue) == 1:
            return feed

        # time_taken = datetime.datetime.utcnow() - self.time_start

    def publish_to_subscribers(self, feed, new_count):
        try:
            r = redis.Redis(connection_pool=settings.REDIS_PUBSUB_POOL)
            listeners_count = r.publish(str(feed.pk), 'story:new_count:%s' % new_count)
            if listeners_count:
                logging.debug("   ---> [%-30s] ~FMPublished to %s subscribers" % (feed.log_title[:30], listeners_count))
        except redis.ConnectionError:
            logging.debug("   ***> [%-30s] ~BMRedis is unavailable for real-time." % (feed.log_title[:30],))

    def count_unreads_for_subscribers(self, feed):
        user_subs = UserSubscription.objects.filter(feed=feed,
                                                    active=True,
                                                    user__profile__last_seen_on__gte=feed.unread_cutoff)\
                                            .order_by('-last_read_date')
        if not user_subs.count():
            return

        for sub in user_subs:
            if not sub.needs_unread_recalc:
                sub.needs_unread_recalc = True
                sub.save()

        if self.options['compute_scores']:
            r = redis.Redis(connection_pool=settings.REDIS_STORY_HASH_POOL)
            stories = MStory.objects(story_feed_id=feed.pk,
                                     story_date__gte=feed.unread_cutoff)
            stories = Feed.format_stories(stories, feed.pk)
            story_hashes = r.zrangebyscore('zF:%s' % feed.pk, int(feed.unread_cutoff.strftime('%s')),
                                           int(time.time() + 60*60*24))
            missing_story_hashes = set(story_hashes) - set([s['story_hash'] for s in stories])
            if missing_story_hashes:
                missing_stories = MStory.objects(story_feed_id=feed.pk,
                                                 story_hash__in=missing_story_hashes)\
                                        .read_preference(pymongo.ReadPreference.PRIMARY)
                missing_stories = Feed.format_stories(missing_stories, feed.pk)
                stories = missing_stories + stories
                logging.debug(u'   ---> [%-30s] ~FYFound ~SB~FC%s (of %s)/%s~FY~SN un-secondaried stories while computing scores' % (feed.log_title[:30], len(missing_stories), len(missing_story_hashes), len(stories)))
            cache.set("S:%s" % feed.pk, stories, 60)
            logging.debug(u'   ---> [%-30s] ~FYComputing scores: ~SB%s stories~SN with ~SB%s subscribers ~SN(%s/%s/%s)' % (
                          feed.log_title[:30], len(stories), user_subs.count(),
                          feed.num_subscribers, feed.active_subscribers, feed.premium_subscribers))
            self.calculate_feed_scores_with_stories(user_subs, stories)
        elif self.options.get('mongodb_replication_lag'):
            logging.debug(u'   ---> [%-30s] ~BR~FYSkipping computing scores: ~SB%s seconds~SN of mongodb lag' % (
                          feed.log_title[:30], self.options.get('mongodb_replication_lag')))

    @timelimit(10)
    def calculate_feed_scores_with_stories(self, user_subs, stories):
        for sub in user_subs:
            silent = False if self.options['verbose'] >= 2 else True
            sub.calculate_feed_scores(silent=silent, stories=stories)

    def add_jobs(self, feeds_queue, feeds_count=1):
        """ adds a feed processing job to the pool
        """
        self.feeds_queue = feeds_queue
        self.feeds_count = feeds_count

    def run_jobs(self):
        if self.options['single_threaded']:
            return self.process_feed_wrapper(self.feeds_queue[0])
        else:
            for i in range(self.num_threads):
                feed_queue = self.feeds_queue[i]
                self.workers.append(multiprocessing.Process(target=self.process_feed_wrapper,
                                                            args=(feed_queue,)))
            for i in range(self.num_threads):
                self.workers[i].start()
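

# Example of how the pieces above fit together (a sketch only; the real entry
# point is NewsBlur's feed-update task/management command, which builds the
# options dict and feed queues itself, so the values below are illustrative):
#
#     options = {
#         'force': False, 'quick': 0, 'fake': False, 'verbose': 1,
#         'single_threaded': True, 'compute_scores': True, 'updates_off': False,
#     }
#     dispatcher = Dispatcher(options, num_threads=1)
#     dispatcher.add_jobs([[feed_id]], feeds_count=1)
#     dispatcher.run_jobs()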