import time
import datetime
import traceback
import multiprocessing
import urllib2
import xml.sax
import redis
import random
import pymongo

from django.conf import settings
from django.db import IntegrityError

from apps.reader.models import UserSubscription
from apps.rss_feeds.models import Feed, MStory
from apps.rss_feeds.page_importer import PageImporter
from apps.rss_feeds.icon_importer import IconImporter
from apps.push.models import PushSubscription
from apps.statistics.models import MAnalyticsFetcher

from utils import feedparser
from utils.story_functions import pre_process_story
from utils import log as logging
from utils.feed_functions import timelimit, TimeoutError, mail_feed_error_to_admin, utf8encode

# Refresh feed code adapted from Feedjack.
# http://feedjack.googlecode.com

ENTRY_NEW, ENTRY_UPDATED, ENTRY_SAME, ENTRY_ERR = range(4)
FEED_OK, FEED_SAME, FEED_ERRPARSE, FEED_ERRHTTP, FEED_ERREXC = range(5)


def mtime(ttime):
    """ datetime auxiliary function: converts a feedparser time tuple
    into a datetime.
    """
    return datetime.datetime.fromtimestamp(time.mktime(ttime))
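# For example (assuming a feedparser-style 9-item struct_time, such as
# entry.published_parsed):
#
#     mtime(time.localtime())  # -> datetime.datetime for "now"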


class FetchFeed:
    def __init__(self, feed_id, options):
        self.feed = Feed.get_by_id(feed_id)
        self.options = options
        self.fpf = None

    @timelimit(20)
    def fetch(self):
        """
        Uses feedparser to download the feed. Will be parsed later.
        """
        start = time.time()
        identity = self.get_identity()
        log_msg = u'%2s ---> [%-30s] ~FYFetching feed (~FB%d~FY), last update: %s' % (
                  identity,
                  self.feed.title[:30],
                  self.feed.id,
                  datetime.datetime.now() - self.feed.last_update)
        logging.debug(log_msg)

        etag = self.feed.etag
        modified = self.feed.last_modified.utctimetuple()[:7] if self.feed.last_modified else None

        if self.options.get('force') or not self.feed.fetched_once or not self.feed.known_good:
            modified = None
            etag = None

        USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) AppleWebKit/536.2.3 (KHTML, like Gecko) Version/5.2 (NewsBlur Feed Fetcher - %s subscriber%s - %s)' % (
            self.feed.num_subscribers,
            's' if self.feed.num_subscribers != 1 else '',
            settings.NEWSBLUR_URL
        )
        if self.options.get('feed_xml'):
            logging.debug(u'   ---> [%-30s] ~FM~BKFeed has been fat pinged. Ignoring fat: %s' % (
                          self.feed.title[:30], len(self.options.get('feed_xml'))))

        if self.options.get('fpf'):
            self.fpf = self.options.get('fpf')
            logging.debug(u'   ---> [%-30s] ~FM~BKFeed fetched in real-time with fat ping.' % (
                          self.feed.title[:30]))
        else:
            self.fpf = feedparser.parse(self.feed.feed_address,
                                        agent=USER_AGENT,
                                        etag=etag,
                                        modified=modified)

        logging.debug(u'   ---> [%-30s] ~FYFeed fetch in ~FM%.4ss' % (
                      self.feed.title[:30], time.time() - start))

        return FEED_OK, self.fpf

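    # The identity is only used to prefix log lines; "X" is the fallback when
    # running outside a multiprocessing pool (e.g. in single-threaded mode).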
    def get_identity(self):
        identity = "X"
        current_process = multiprocessing.current_process()
        if current_process._identity:
            identity = current_process._identity[0]
        return identity
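

# ProcessFeed applies a fetched feedparser result to the Feed model: HTTP
# status and bozo handling, entry de-duping against existing stories, and
# PuSH hub subscription.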
class ProcessFeed:
    def __init__(self, feed_id, fpf, options):
        self.feed_id = feed_id
        self.options = options
        self.fpf = fpf
        self.entry_trans = {
            ENTRY_NEW: 'new',
            ENTRY_UPDATED: 'updated',
            ENTRY_SAME: 'same',
            ENTRY_ERR: 'error'}
        self.entry_keys = sorted(self.entry_trans.keys())
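
    # Feed.get_by_id can resolve to a different feed than the one asked for
    # (presumably after duplicate feeds are merged), so the pk is re-checked.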
    def refresh_feed(self):
        self.feed = Feed.get_by_id(self.feed_id)
        if self.feed_id != self.feed.pk:
            logging.debug(" ***> Feed has changed: from %s to %s" % (self.feed_id, self.feed.pk))
            self.feed_id = self.feed.pk

    def process(self):
        """ Downloads and parses a feed.
        """
        start = time.time()
        self.refresh_feed()

        ret_values = {
            ENTRY_NEW: 0,
            ENTRY_UPDATED: 0,
            ENTRY_SAME: 0,
            ENTRY_ERR: 0}

        # logging.debug(u' ---> [%d] Processing %s' % (self.feed.id, self.feed.feed_title))

        if hasattr(self.fpf, 'status'):
            if self.options['verbose']:
                if self.fpf.bozo and self.fpf.status != 304:
                    logging.debug(u'   ---> [%-30s] ~FRBOZO exception: %s ~SB(%s entries)' % (
                                  self.feed.title[:30],
                                  self.fpf.bozo_exception,
                                  len(self.fpf.entries)))

            if self.fpf.status == 304:
                self.feed = self.feed.save()
                self.feed.save_feed_history(304, "Not modified")
                return FEED_SAME, ret_values

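            # On a 301/302 redirect, adopt the new address, unless it looks
            # like FeedBurner's placeholder redirect target.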
            if self.fpf.status in (302, 301):
                if not self.fpf.href.endswith('feedburner.com/atom.xml'):
                    self.feed.feed_address = self.fpf.href
                if not self.feed.known_good:
                    self.feed.fetched_once = True
                    logging.debug("   ---> [%-30s] ~SB~SK~FRFeed is %s'ing. Refetching..." % (self.feed.title[:30], self.fpf.status))
                    self.feed = self.feed.schedule_feed_fetch_immediately()
                if not self.fpf.entries:
                    self.feed = self.feed.save()
                    self.feed.save_feed_history(self.fpf.status, "HTTP Redirect")
                    return FEED_ERRHTTP, ret_values

            if self.fpf.status >= 400:
                logging.debug("   ---> [%-30s] ~SB~FRHTTP Status code: %s. Checking address..." % (self.feed.title[:30], self.fpf.status))
                fixed_feed = None
                if not self.feed.known_good:
                    fixed_feed = self.feed.check_feed_link_for_feed_address()
                if not fixed_feed:
                    self.feed.save_feed_history(self.fpf.status, "HTTP Error")
                self.feed = self.feed.save()
                return FEED_ERRHTTP, ret_values

        if not self.fpf.entries:
            if self.fpf.bozo and isinstance(self.fpf.bozo_exception, feedparser.NonXMLContentType):
                logging.debug("   ---> [%-30s] ~SB~FRFeed is Non-XML. %s entries. Checking address..." % (self.feed.title[:30], len(self.fpf.entries)))
                fixed_feed = None
                if not self.feed.known_good:
                    fixed_feed = self.feed.check_feed_link_for_feed_address()
                if not fixed_feed:
                    self.feed.save_feed_history(552, 'Non-xml feed', self.fpf.bozo_exception)
                self.feed = self.feed.save()
                return FEED_ERRPARSE, ret_values
            elif self.fpf.bozo and isinstance(self.fpf.bozo_exception, xml.sax._exceptions.SAXException):
                logging.debug("   ---> [%-30s] ~SB~FRFeed has SAX/XML parsing issues. %s entries. Checking address..." % (self.feed.title[:30], len(self.fpf.entries)))
                fixed_feed = None
                if not self.feed.known_good:
                    fixed_feed = self.feed.check_feed_link_for_feed_address()
                if not fixed_feed:
                    self.feed.save_feed_history(553, 'SAX Exception', self.fpf.bozo_exception)
                self.feed = self.feed.save()
                return FEED_ERRPARSE, ret_values

        # The feed has changed (or it is the first time we parse it);
        # save the etag and last_modified fields.
        self.feed.etag = self.fpf.get('etag')
        if self.feed.etag:
            self.feed.etag = self.feed.etag[:255]
        # Sometimes this is None (it never should be) *sigh*
        if self.feed.etag is None:
            self.feed.etag = ''
        try:
            self.feed.last_modified = mtime(self.fpf.modified)
        except:
            pass

        self.fpf.entries = self.fpf.entries[:50]

        if self.fpf.feed.get('title'):
            self.feed.feed_title = self.fpf.feed.get('title')
        tagline = self.fpf.feed.get('tagline', self.feed.data.feed_tagline)
        if tagline:
            self.feed.data.feed_tagline = utf8encode(tagline)
            self.feed.data.save()
        if not self.feed.feed_link_locked:
            self.feed.feed_link = self.fpf.feed.get('link') or self.fpf.feed.get('id') or self.feed.feed_link

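        # Key each entry by the best identifier available: guid, then
        # permalink, then title.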
        guids = []
        for entry in self.fpf.entries:
            if entry.get('id', ''):
                guids.append(entry.get('id', ''))
            elif entry.get('link'):
                guids.append(entry.link)
            elif entry.get('title'):
                guids.append(entry.title)

        self.feed.save()
        self.refresh_feed()

        # Compare new stories to existing stories, adding and updating
        start_date = datetime.datetime.utcnow()
        # end_date = datetime.datetime.utcnow()
        story_guids = []
        stories = []
        for entry in self.fpf.entries:
            story = pre_process_story(entry)
            if story.get('published') < start_date:
                start_date = story.get('published')
            # if story.get('published') > end_date:
            #     end_date = story.get('published')
            stories.append(story)
            story_guids.append(story.get('guid') or story.get('link'))

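        # Fetch a bounded window of recent existing stories to diff against:
        # 1.5x the incoming batch (minimum 10) leaves headroom without
        # scanning the whole feed.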
        existing_stories = list(MStory.objects(
            # story_guid__in=story_guids,
            story_date__gte=start_date,
            story_feed_id=self.feed.pk
        ).limit(max(int(len(story_guids) * 1.5), 10)))

        ret_values = self.feed.add_update_stories(stories, existing_stories,
                                                  verbose=self.options['verbose'])

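        # (Re)subscribe to the feed's advertised PuSH (PubSubHubbub) hub so
        # the hub can push new stories in real time instead of being polled.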
        if ((not self.feed.is_push or self.options.get('force'))
            and hasattr(self.fpf, 'feed') and
            hasattr(self.fpf.feed, 'links') and self.fpf.feed.links):
            hub_url = None
            self_url = self.feed.feed_address
            for link in self.fpf.feed.links:
                if link['rel'] == 'hub':
                    hub_url = link['href']
                elif link['rel'] == 'self':
                    self_url = link['href']
            if hub_url and self_url and not settings.DEBUG:
                logging.debug(u'   ---> [%-30s] ~BB~FWSubscribing to PuSH hub: %s' % (
                              self.feed.title[:30], hub_url))
                PushSubscription.objects.subscribe(self_url, feed=self.feed, hub=hub_url)

        logging.debug(u'   ---> [%-30s] ~FYParsed Feed: %snew=%s~SN~FY %sup=%s~SN same=%s%s~SN %serr=%s~SN~FY total=~SB%s' % (
                      self.feed.title[:30],
                      '~FG~SB' if ret_values[ENTRY_NEW] else '', ret_values[ENTRY_NEW],
                      '~FY~SB' if ret_values[ENTRY_UPDATED] else '', ret_values[ENTRY_UPDATED],
                      '~SB' if ret_values[ENTRY_SAME] else '', ret_values[ENTRY_SAME],
                      '~FR~SB' if ret_values[ENTRY_ERR] else '', ret_values[ENTRY_ERR],
                      len(self.fpf.entries)))
        self.feed.update_all_statistics(full=bool(ret_values[ENTRY_NEW]), force=self.options['force'])
        self.feed.trim_feed()
        self.feed.save_feed_history(200, "OK")

        if self.options['verbose']:
            logging.debug(u'   ---> [%-30s] ~FBTIME: feed parse in ~FM%.4ss' % (
                          self.feed.title[:30], time.time() - start))

        return FEED_OK, ret_values
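

# Dispatcher farms feed ids out to worker processes. Each worker walks its
# queue of feed ids: fetch, process, then (sometimes) page and icon fetching,
# recording per-stage timings for analytics.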
class Dispatcher:
    def __init__(self, options, num_threads):
        self.options = options
        self.entry_stats = {
            ENTRY_NEW: 0,
            ENTRY_UPDATED: 0,
            ENTRY_SAME: 0,
            ENTRY_ERR: 0}
        self.feed_stats = {
            FEED_OK: 0,
            FEED_SAME: 0,
            FEED_ERRPARSE: 0,
            FEED_ERRHTTP: 0,
            FEED_ERREXC: 0}
        self.feed_trans = {
            FEED_OK: 'ok',
            FEED_SAME: 'unchanged',
            FEED_ERRPARSE: 'cant_parse',
            FEED_ERRHTTP: 'http_error',
            FEED_ERREXC: 'exception'}
        self.feed_keys = sorted(self.feed_trans.keys())
        self.num_threads = num_threads
        self.time_start = datetime.datetime.utcnow()
        self.workers = []

    def refresh_feed(self, feed_id):
        """Update feed, since it may have changed"""
        return Feed.objects.using('default').get(pk=feed_id)

    def process_feed_wrapper(self, feed_queue):
        delta = None
        current_process = multiprocessing.current_process()
        identity = "X"
        feed = None

        if current_process._identity:
            identity = current_process._identity[0]

        for feed_id in feed_queue:
            start_duration = time.time()
            feed_fetch_duration = None
            feed_process_duration = None
            page_duration = None
            icon_duration = None
            feed_code = None

            ret_entries = {
                ENTRY_NEW: 0,
                ENTRY_UPDATED: 0,
                ENTRY_SAME: 0,
                ENTRY_ERR: 0
            }
            start_time = time.time()
            ret_feed = FEED_ERREXC
            try:
                feed = self.refresh_feed(feed_id)

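                # In 'quick' mode, known-good feeds with low weight
                # (stories/month x subscribers) are probabilistically skipped
                # so a fast pass concentrates on feeds likely to have news.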
                skip = False
                if self.options.get('fake'):
                    skip = True
                    weight = "-"
                    quick = "-"
                    rand = "-"
                elif (self.options.get('quick') and not self.options['force'] and
                      feed.known_good and feed.fetched_once and not feed.is_push):
                    weight = feed.stories_last_month * feed.num_subscribers
                    random_weight = random.randint(1, max(weight, 1))
                    quick = float(self.options.get('quick', 0))
                    rand = random.random()
                    if random_weight < 100 and rand < quick:
                        skip = True
                if skip:
                    logging.debug('   ---> [%-30s] ~BGFaking fetch, skipping (%s/month, %s subs, %s < %s)...' % (
                                  feed.title[:30],
                                  weight,
                                  feed.num_subscribers,
                                  rand, quick))
                    continue

                ffeed = FetchFeed(feed_id, self.options)
                ret_feed, fetched_feed = ffeed.fetch()
                feed_fetch_duration = time.time() - start_duration

                if ((fetched_feed and ret_feed == FEED_OK) or self.options['force']):
                    pfeed = ProcessFeed(feed_id, fetched_feed, self.options)
                    ret_feed, ret_entries = pfeed.process()
                    feed = pfeed.feed
                    feed_process_duration = time.time() - start_duration

                    if ret_entries.get(ENTRY_NEW) or self.options['force']:
                        start = time.time()
                        if not feed.known_good or not feed.fetched_once:
                            feed.known_good = True
                            feed.fetched_once = True
                            feed = feed.save()
                        # MUserStory.delete_old_stories(feed_id=feed.pk)
                        if random.random() <= 0.01:
                            feed.sync_redis()
                        try:
                            self.count_unreads_for_subscribers(feed)
                        except TimeoutError:
                            logging.debug('   ---> [%-30s] Unread count took too long...' % (feed.title[:30],))
                        if self.options['verbose']:
                            logging.debug(u'   ---> [%-30s] ~FBTIME: unread count in ~FM%.4ss' % (
                                          feed.title[:30], time.time() - start))
                # if ret_entries.get(ENTRY_NEW) or ret_entries.get(ENTRY_UPDATED) or self.options['force']:
                #     feed.get_stories(force=True)
            except KeyboardInterrupt:
                break
            except urllib2.HTTPError, e:
                # Read the error body once; a second fp.read() would be empty.
                body = e.fp.read()
                logging.debug('   ---> [%-30s] ~FRFeed throws HTTP error: ~SB%s' % (unicode(feed_id)[:30], body))
                feed.save_feed_history(e.code, e.msg, body)
                fetched_feed = None
            except Feed.DoesNotExist, e:
                logging.debug('   ---> [%-30s] ~FRFeed is now gone...' % (unicode(feed_id)[:30]))
                continue
            except TimeoutError, e:
                logging.debug('   ---> [%-30s] ~FRFeed fetch timed out...' % (feed.title[:30]))
                feed.save_feed_history(505, 'Timeout', '')
                feed_code = 505
                fetched_feed = None
            except Exception, e:
                logging.debug('[%d] ! -------------------------' % (feed_id,))
                tb = traceback.format_exc()
                logging.error(tb)
                logging.debug('[%d] ! -------------------------' % (feed_id,))
                ret_feed = FEED_ERREXC
                feed = Feed.get_by_id(getattr(feed, 'pk', feed_id))
                feed.save_feed_history(500, "Error", tb)
                feed_code = 500
                fetched_feed = None
                mail_feed_error_to_admin(feed, e, local_vars=locals())
                settings.RAVEN_CLIENT.captureException(e)

            if not feed_code:
                if ret_feed == FEED_OK:
                    feed_code = 200
                elif ret_feed == FEED_SAME:
                    feed_code = 304
                elif ret_feed == FEED_ERRHTTP:
                    feed_code = 400
                elif ret_feed == FEED_ERREXC:
                    feed_code = 500
                elif ret_feed == FEED_ERRPARSE:
                    feed_code = 550

            feed = self.refresh_feed(feed.pk)
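            # Fetch the feed's page (for the original-site view) on forced
            # runs, on a random ~10% of fetches, and whenever a healthy fetch
            # suggests the page is worth refreshing.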
            if ((self.options['force']) or
                (random.random() > .9) or
                (fetched_feed and
                 feed.feed_link and
                 feed.has_page and
                 (ret_feed == FEED_OK or
                  (ret_feed == FEED_SAME and feed.stories_last_month > 10)))):

                logging.debug(u'   ---> [%-30s] ~FYFetching page: %s' % (feed.title[:30], feed.feed_link))
                page_importer = PageImporter(feed)
                try:
                    page_data = page_importer.fetch_page()
                    page_duration = time.time() - start_duration
                except TimeoutError, e:
                    logging.debug('   ---> [%-30s] ~FRPage fetch timed out...' % (feed.title[:30]))
                    page_data = None
                    feed.save_page_history(555, 'Timeout', '')
                except Exception, e:
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    tb = traceback.format_exc()
                    logging.error(tb)
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    feed.save_page_history(550, "Page Error", tb)
                    fetched_feed = None
                    page_data = None
                    mail_feed_error_to_admin(feed, e, local_vars=locals())
                    settings.RAVEN_CLIENT.captureException(e)

                feed = self.refresh_feed(feed.pk)
                logging.debug(u'   ---> [%-30s] ~FYFetching icon: %s' % (feed.title[:30], feed.feed_link))
                icon_importer = IconImporter(feed, page_data=page_data, force=self.options['force'])
                try:
                    icon_importer.save()
                    icon_duration = time.time() - start_duration
                except TimeoutError, e:
                    logging.debug('   ---> [%-30s] ~FRIcon fetch timed out...' % (feed.title[:30]))
                    feed.save_page_history(556, 'Timeout', '')
                except Exception, e:
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    tb = traceback.format_exc()
                    logging.error(tb)
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    # feed.save_feed_history(560, "Icon Error", tb)
                    mail_feed_error_to_admin(feed, e, local_vars=locals())
                    settings.RAVEN_CLIENT.captureException(e)
            else:
                logging.debug(u'   ---> [%-30s] ~FBSkipping page fetch: (%s on %s stories) %s' % (feed.title[:30], self.feed_trans[ret_feed], feed.stories_last_month, '' if feed.has_page else '[HAS NO PAGE]'))

            feed = self.refresh_feed(feed.pk)
            delta = time.time() - start_time

            feed.last_load_time = round(delta)
            feed.fetched_once = True
            try:
                feed = feed.save()
            except IntegrityError:
                logging.debug("   ---> [%-30s] ~FRIntegrityError on feed: %s" % (feed.title[:30], feed.feed_address,))

            if ret_entries[ENTRY_NEW]:
                self.publish_to_subscribers(feed)

            done_msg = (u'%2s ---> [%-30s] ~FYProcessed in ~FM~SB%.4ss~FY~SN (~FB%s~FY) [%s]' % (
                        identity, feed.feed_title[:30], delta,
                        feed.pk, self.feed_trans[ret_feed],))
            logging.debug(done_msg)
            total_duration = time.time() - start_duration
            MAnalyticsFetcher.add(feed_id=feed.pk, feed_fetch=feed_fetch_duration,
                                  feed_process=feed_process_duration,
                                  page=page_duration, icon=icon_duration,
                                  total=total_duration, feed_code=feed_code)

            self.feed_stats[ret_feed] += 1
            for key, val in ret_entries.items():
                self.entry_stats[key] += val

        if len(feed_queue) == 1:
            return feed

        # time_taken = datetime.datetime.utcnow() - self.time_start

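    # Real-time fanout: publish a 'story:new' message on the feed's Redis
    # channel so connected real-time clients hear about new stories at once.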
    def publish_to_subscribers(self, feed):
        try:
            r = redis.Redis(connection_pool=settings.REDIS_POOL)
            listeners_count = r.publish(str(feed.pk), 'story:new')
            if listeners_count:
                logging.debug("   ---> [%-30s] ~FMPublished to %s subscribers" % (feed.title[:30], listeners_count))
        except redis.ConnectionError:
            logging.debug("   ***> [%-30s] ~BMRedis is unavailable for real-time." % (feed.title[:30],))

    def count_unreads_for_subscribers(self, feed):
        UNREAD_CUTOFF = datetime.datetime.utcnow() - datetime.timedelta(days=settings.DAYS_OF_UNREAD)
        user_subs = UserSubscription.objects.filter(feed=feed,
                                                    active=True,
                                                    user__profile__last_seen_on__gte=UNREAD_CUTOFF) \
                                            .order_by('-last_read_date')
        if not user_subs.count():
            return

        for sub in user_subs:
            if not sub.needs_unread_recalc:
                sub.needs_unread_recalc = True
                sub.save()

        if self.options['compute_scores']:
            stories = MStory.objects(story_feed_id=feed.pk,
                                     story_date__gte=UNREAD_CUTOFF) \
                            .read_preference(pymongo.ReadPreference.PRIMARY)
            stories = Feed.format_stories(stories, feed.pk)
            logging.debug(u'   ---> [%-30s] ~FYComputing scores: ~SB%s stories~SN with ~SB%s subscribers ~SN(%s/%s/%s)' % (
                          feed.title[:30], len(stories), user_subs.count(),
                          feed.num_subscribers, feed.active_subscribers, feed.premium_subscribers))
            self.calculate_feed_scores_with_stories(user_subs, stories)
        elif self.options.get('mongodb_replication_lag'):
            logging.debug(u'   ---> [%-30s] ~BR~FYSkipping computing scores: ~SB%s seconds~SN of mongodb lag' % (
                          feed.title[:30], self.options.get('mongodb_replication_lag')))

    @timelimit(10)
    def calculate_feed_scores_with_stories(self, user_subs, stories):
        for sub in user_subs:
            silent = False if self.options['verbose'] >= 2 else True
            sub.calculate_feed_scores(silent=silent, stories=stories)

    def add_jobs(self, feeds_queue, feeds_count=1):
        """ Adds a feed processing job to the pool.
        """
        self.feeds_queue = feeds_queue
        self.feeds_count = feeds_count

    def run_jobs(self):
        if self.options['single_threaded']:
            return self.process_feed_wrapper(self.feeds_queue[0])
        else:
            for i in range(self.num_threads):
                feed_queue = self.feeds_queue[i]
                self.workers.append(multiprocessing.Process(target=self.process_feed_wrapper,
                                                            args=(feed_queue,)))
            for i in range(self.num_threads):
                self.workers[i].start()
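

# A minimal usage sketch (hypothetical option values; callers normally build
# the nested per-worker feed-id queues themselves):
#
#     options = {'force': False, 'single_threaded': True, 'verbose': 1,
#                'compute_scores': True}
#     dispatcher = Dispatcher(options, num_threads=1)
#     dispatcher.add_jobs([[42, 43, 44]])  # one queue of feed ids per worker
#     dispatcher.run_jobs()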