# from apps.rss_feeds.models import FeedXML
from django.core.cache import cache
from django.conf import settings
from django.db import IntegrityError
# from mongoengine.queryset import Q
from apps.reader.models import UserSubscription, MUserStory
from apps.rss_feeds.models import Feed, MStory
from apps.rss_feeds.page_importer import PageImporter
from apps.rss_feeds.icon_importer import IconImporter
from utils import feedparser
from utils.story_functions import pre_process_story
from utils import log as logging
from utils.feed_functions import timelimit, TimeoutError, mail_feed_error_to_admin, utf8encode
from utils.story_functions import bunch
import time
import datetime
import traceback
import multiprocessing
import urllib2
import xml.sax
import redis

# Refresh feed code adapted from Feedjack.
# http://feedjack.googlecode.com

URL = 'http://www.newsblur.com/'
SLOWFEED_WARNING = 10
ENTRY_NEW, ENTRY_UPDATED, ENTRY_SAME, ENTRY_ERR = range(4)
FEED_OK, FEED_SAME, FEED_ERRPARSE, FEED_ERRHTTP, FEED_ERREXC = range(5)
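
# Per-entry results (new/updated/same/error) and per-feed results (ok/same/
# parse error/HTTP error/exception) are plain ints so they can serve as keys
# in the stats and translation dicts below.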

def mtime(ttime):
    """ datetime auxiliary function: converts a time tuple to a datetime.
    """
    return datetime.datetime.fromtimestamp(time.mktime(ttime))

class FetchFeed:
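    """Downloads a feed with feedparser, sending ETag/Last-Modified headers
    so unchanged feeds come back as cheap 304s. Parsing happens later, in
    ProcessFeed."""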
    def __init__(self, feed_id, options):
        self.feed = Feed.objects.get(pk=feed_id)
        self.options = options
        self.fpf = None

    @timelimit(20)
    def fetch(self):
        """
        Uses feedparser to download the feed. Will be parsed later.
        """
        identity = self.get_identity()
        log_msg = u'%2s ---> [%-30s] Fetching feed (%d)' % (identity,
                                                            unicode(self.feed)[:30],
                                                            self.feed.id)
        logging.debug(log_msg)

        self.feed.set_next_scheduled_update()
        etag = self.feed.etag
        modified = self.feed.last_modified.utctimetuple()[:7] if self.feed.last_modified else None

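        # A forced refresh or a first-ever fetch skips the conditional GET,
        # guaranteeing a full response body rather than a 304.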
        if self.options.get('force') or not self.feed.fetched_once:
            modified = None
            etag = None

        USER_AGENT = 'NewsBlur Feed Fetcher (%s subscriber%s) - %s (Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_1) AppleWebKit/534.48.3 (KHTML, like Gecko) Version/5.1 Safari/534.48.3)' % (
            self.feed.num_subscribers,
            's' if self.feed.num_subscribers != 1 else '',
            URL
        )

        self.fpf = feedparser.parse(self.feed.feed_address,
                                    agent=USER_AGENT,
                                    etag=etag,
                                    modified=modified)

        return FEED_OK, self.fpf

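    # The identity is the multiprocessing worker's index, used to prefix log
    # lines so output from parallel fetchers can be told apart.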
    def get_identity(self):
        identity = "X"
        current_process = multiprocessing.current_process()
        if current_process._identity:
            identity = current_process._identity[0]
        return identity


class ProcessFeed:
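    """Applies a feedparser result to the Feed: handles HTTP status codes and
    bozo (malformed XML) errors, then adds or updates the feed's stories."""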
    def __init__(self, feed_id, fpf, options):
        self.feed_id = feed_id
        self.options = options
        self.fpf = fpf
        self.entry_trans = {
            ENTRY_NEW: 'new',
            ENTRY_UPDATED: 'updated',
            ENTRY_SAME: 'same',
            ENTRY_ERR: 'error'}
        self.entry_keys = sorted(self.entry_trans.keys())
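
    # Re-read the feed from the 'default' database alias (assumption: the
    # master, not a read replica) so updates aren't based on a stale copy.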
    def refresh_feed(self):
        self.feed = Feed.objects.using('default').get(pk=self.feed_id)

    def process(self, first_run=True):
        """ Parses the downloaded feed and saves its stories.
        """
        self.refresh_feed()

        ret_values = {
            ENTRY_NEW: 0,
            ENTRY_UPDATED: 0,
            ENTRY_SAME: 0,
            ENTRY_ERR: 0}

        # logging.debug(u' ---> [%d] Processing %s' % (self.feed.id, self.feed.feed_title))

        self.feed.fetched_once = True
        self.feed.last_update = datetime.datetime.utcnow()

        if hasattr(self.fpf, 'status'):
            if self.options['verbose']:
                logging.debug(u'   ---> [%-30s] Fetched feed, HTTP status %d: %s (bozo: %s)' % (
                              unicode(self.feed)[:30],
                              self.fpf.status,
                              self.feed.feed_address,
                              self.fpf.bozo))
                if self.fpf.bozo and self.fpf.status != 304:
                    logging.debug(u'   ---> [%-30s] BOZO exception: %s (%s entries)' % (
                                  unicode(self.feed)[:30],
                                  self.fpf.bozo_exception,
                                  len(self.fpf.entries)))
            if self.fpf.status == 304:
                self.feed.save()
                self.feed.save_feed_history(304, "Not modified")
                return FEED_SAME, ret_values
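
            # On a redirect, adopt the new address (Feedburner's generic
            # atom.xml redirects are skipped as false positives) and refetch
            # immediately if this is the first run.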
            if self.fpf.status in (302, 301):
                if not self.fpf.href.endswith('feedburner.com/atom.xml'):
                    self.feed.feed_address = self.fpf.href
                if first_run:
                    self.feed.schedule_feed_fetch_immediately()
                if not self.fpf.entries:
                    self.feed.save()
                    self.feed.save_feed_history(self.fpf.status, "HTTP Redirect")
                    return FEED_ERRHTTP, ret_values

            if self.fpf.status >= 400:
                logging.debug("   ---> [%-30s] HTTP Status code: %s. Checking address..." % (unicode(self.feed)[:30], self.fpf.status))
                fixed_feed = self.feed.check_feed_address_for_feed_link()
                if not fixed_feed:
                    self.feed.save_feed_history(self.fpf.status, "HTTP Error")
                else:
                    self.feed.schedule_feed_fetch_immediately()
                self.feed.save()
                return FEED_ERRHTTP, ret_values
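
        # Bozo means feedparser hit malformed XML. Treat it as fatal only if
        # no entries were salvaged; then probe the address for a real feed link.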
        if self.fpf.bozo and isinstance(self.fpf.bozo_exception, feedparser.NonXMLContentType):
            if not self.fpf.entries:
                logging.debug("   ---> [%-30s] Feed is Non-XML. %s entries. Checking address..." % (unicode(self.feed)[:30], len(self.fpf.entries)))
                fixed_feed = self.feed.check_feed_address_for_feed_link()
                if not fixed_feed:
                    self.feed.save_feed_history(502, 'Non-xml feed', self.fpf.bozo_exception)
                else:
                    self.feed.schedule_feed_fetch_immediately()
                self.feed.save()
                return FEED_ERRPARSE, ret_values
        elif self.fpf.bozo and isinstance(self.fpf.bozo_exception, xml.sax._exceptions.SAXException):
            logging.debug("   ---> [%-30s] Feed is Bad XML (SAX). %s entries. Checking address..." % (unicode(self.feed)[:30], len(self.fpf.entries)))
            if not self.fpf.entries:
                fixed_feed = self.feed.check_feed_address_for_feed_link()
                if not fixed_feed:
                    self.feed.save_feed_history(503, 'SAX Exception', self.fpf.bozo_exception)
                else:
                    self.feed.schedule_feed_fetch_immediately()
                self.feed.save()
                return FEED_ERRPARSE, ret_values

        # The feed has changed (or this is the first time we parse it),
        # so save the etag and last_modified fields.
        self.feed.etag = self.fpf.get('etag')
        if self.feed.etag:
            self.feed.etag = self.feed.etag[:255]
        # Sometimes this is None (it never should be). *sigh*
        if self.feed.etag is None:
            self.feed.etag = ''
        try:
            self.feed.last_modified = mtime(self.fpf.modified)
        except:
            pass

        # Cap processing at the first 50 entries.
        self.fpf.entries = self.fpf.entries[:50]

        self.feed.feed_title = self.fpf.feed.get('title', self.feed.feed_title)
        tagline = self.fpf.feed.get('tagline', self.feed.data.feed_tagline)
        if tagline:
            self.feed.data.feed_tagline = utf8encode(tagline)
            self.feed.data.save()
        if not self.feed.feed_link_locked:
            self.feed.feed_link = self.fpf.feed.get('link') or self.fpf.feed.get('id') or self.feed.feed_link

        self.feed.last_update = datetime.datetime.utcnow()

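        # Collect a guid per entry, falling back from id to link to title.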
        guids = []
        for entry in self.fpf.entries:
            if entry.get('id', ''):
                guids.append(entry.get('id', ''))
            elif entry.get('link'):
                guids.append(entry.link)
            elif entry.get('title'):
                guids.append(entry.title)
        self.feed.save()

        # Compare new stories to existing stories, adding and updating.
        # start_date walks back to the oldest published date in this batch so
        # the existing-stories query covers every incoming entry.
        start_date = datetime.datetime.utcnow()
        # end_date = datetime.datetime.utcnow()
        story_guids = []
        for entry in self.fpf.entries:
            story = pre_process_story(entry)
            if story.get('published') < start_date:
                start_date = story.get('published')
            # if story.get('published') > end_date:
            #     end_date = story.get('published')
            story_guids.append(story.get('guid') or story.get('link'))
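
        # With a MongoDB read slave configured, pull existing stories from it
        # (wrapped in a bunch for attribute access); otherwise use the MStory ODM.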
        if self.options['slave_db']:
            slave_db = self.options['slave_db']
            stories_db_orig = slave_db.stories.find({
                "story_feed_id": self.feed.pk,
                "story_date": {
                    "$gte": start_date,
                },
            }).limit(len(story_guids))
            existing_stories = []
            for story in stories_db_orig:
                existing_stories.append(bunch(story))
        else:
            existing_stories = list(MStory.objects(
                # story_guid__in=story_guids,
                story_date__gte=start_date,
                story_feed_id=self.feed.pk
            ).limit(len(story_guids)))

        # MStory.objects(
        #     (Q(story_date__gte=start_date) & Q(story_date__lte=end_date))
        #     | (Q(story_guid__in=story_guids)),
        #     story_feed=self.feed
        # ).order_by('-story_date')
        ret_values = self.feed.add_update_stories(self.fpf.entries, existing_stories)
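
        # The ~F*/~S* tokens are color/style escapes consumed by the
        # project's logger (utils.log).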
        logging.debug(u'   ---> [%-30s] ~FYParsed Feed: new~FG=~FG~SB%s~SN~FY up~FG=~FY~SB%s~SN same~FG=~FY%s err~FG=~FR~SB%s' % (
                      unicode(self.feed)[:30],
                      ret_values[ENTRY_NEW], ret_values[ENTRY_UPDATED], ret_values[ENTRY_SAME], ret_values[ENTRY_ERR]))
        self.feed.update_all_statistics()
        self.feed.trim_feed()
        self.feed.save_feed_history(200, "OK")

        return FEED_OK, ret_values


class Dispatcher:
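    """Farms feed fetching out to worker processes: each worker runs
    process_feed_wrapper() over its own queue of feed ids and tallies
    per-entry and per-feed statistics."""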
    def __init__(self, options, num_threads):
        self.options = options
        self.entry_stats = {
            ENTRY_NEW: 0,
            ENTRY_UPDATED: 0,
            ENTRY_SAME: 0,
            ENTRY_ERR: 0}
        self.feed_stats = {
            FEED_OK: 0,
            FEED_SAME: 0,
            FEED_ERRPARSE: 0,
            FEED_ERRHTTP: 0,
            FEED_ERREXC: 0}
        self.feed_trans = {
            FEED_OK: 'ok',
            FEED_SAME: 'unchanged',
            FEED_ERRPARSE: 'cant_parse',
            FEED_ERRHTTP: 'http_error',
            FEED_ERREXC: 'exception'}
        self.feed_keys = sorted(self.feed_trans.keys())
        self.num_threads = num_threads
        self.time_start = datetime.datetime.utcnow()
        self.workers = []

    def refresh_feed(self, feed_id):
        """ Update feed, since it may have changed """
        return Feed.objects.using('default').get(pk=feed_id)

    def process_feed_wrapper(self, feed_queue):
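        """Fetches and processes every feed in feed_queue, isolating each
        feed's failures so one bad feed can't take down the whole worker."""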
        delta = None
        current_process = multiprocessing.current_process()
        identity = "X"
        if current_process._identity:
            identity = current_process._identity[0]

        for feed_id in feed_queue:
            ret_entries = {
                ENTRY_NEW: 0,
                ENTRY_UPDATED: 0,
                ENTRY_SAME: 0,
                ENTRY_ERR: 0
            }
            start_time = datetime.datetime.utcnow()
            ret_feed = FEED_ERREXC
            try:
                feed = self.refresh_feed(feed_id)

                ffeed = FetchFeed(feed_id, self.options)
                ret_feed, fetched_feed = ffeed.fetch()

                if ((fetched_feed and ret_feed == FEED_OK) or self.options['force']):
                    pfeed = ProcessFeed(feed_id, fetched_feed, self.options)
                    ret_feed, ret_entries = pfeed.process()

                    feed = self.refresh_feed(feed_id)

                    if ret_entries.get(ENTRY_NEW) or self.options['force'] or not feed.fetched_once:
                        if not feed.fetched_once:
                            feed.fetched_once = True
                            feed.save()
                        MUserStory.delete_old_stories(feed_id=feed.pk)
                        try:
                            self.count_unreads_for_subscribers(feed)
                        except TimeoutError:
                            logging.debug('   ---> [%-30s] Unread count took too long...' % (unicode(feed)[:30],))
                    cache.delete('feed_stories:%s-%s-%s' % (feed.id, 0, 25))
                    # if ret_entries.get(ENTRY_NEW) or ret_entries.get(ENTRY_UPDATED) or self.options['force']:
                    #     feed.get_stories(force=True)
            except KeyboardInterrupt:
                break
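            # Each failure mode records its own feed_history code: the real
            # HTTP code for HTTPError, 505 for fetch timeouts, 500 otherwise.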
            except urllib2.HTTPError, e:
                feed.save_feed_history(e.code, e.msg, e.fp.read())
                fetched_feed = None
            except Feed.DoesNotExist, e:
                logging.debug('   ---> [%-30s] Feed is now gone...' % (unicode(feed_id)[:30]))
                continue
            except TimeoutError, e:
                logging.debug('   ---> [%-30s] Feed fetch timed out...' % (unicode(feed)[:30]))
                feed.save_feed_history(505, 'Timeout', '')
                fetched_feed = None
            except Exception, e:
                logging.debug('[%d] ! -------------------------' % (feed_id,))
                tb = traceback.format_exc()
                logging.error(tb)
                logging.debug('[%d] ! -------------------------' % (feed_id,))
                ret_feed = FEED_ERREXC
                feed = self.refresh_feed(feed_id)
                feed.save_feed_history(500, "Error", tb)
                fetched_feed = None
                mail_feed_error_to_admin(feed, e)

            feed = self.refresh_feed(feed_id)
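
            # Refetch the feed's original page (and favicon) only when forced,
            # or when the feed changed, or when an unchanged feed is active
            # enough to have published more than 10 stories in the last month.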
            if ((self.options['force']) or
                (fetched_feed and
                 feed.feed_link and
                 feed.has_page and
                 (ret_feed == FEED_OK or
                  (ret_feed == FEED_SAME and feed.stories_last_month > 10)))):

                logging.debug(u'   ---> [%-30s] Fetching page: %s' % (unicode(feed)[:30], feed.feed_link))
                page_importer = PageImporter(feed.feed_link, feed)
                try:
                    page_importer.fetch_page()
                except TimeoutError, e:
                    logging.debug('   ---> [%-30s] Page fetch timed out...' % (unicode(feed)[:30]))
                    feed.save_page_history(555, 'Timeout', '')
                except Exception, e:
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    tb = traceback.format_exc()
                    logging.error(tb)
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    feed.save_page_history(550, "Page Error", tb)
                    fetched_feed = None
                    mail_feed_error_to_admin(feed, e)

                logging.debug(u'   ---> [%-30s] Fetching icon: %s' % (unicode(feed)[:30], feed.feed_link))
                icon_importer = IconImporter(feed, force=self.options['force'])
                try:
                    icon_importer.save()
                except TimeoutError, e:
                    logging.debug('   ---> [%-30s] Icon fetch timed out...' % (unicode(feed)[:30]))
                    feed.save_page_history(556, 'Timeout', '')
                except Exception, e:
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    tb = traceback.format_exc()
                    logging.error(tb)
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    # feed.save_feed_history(560, "Icon Error", tb)
                    mail_feed_error_to_admin(feed, e)

            feed = self.refresh_feed(feed_id)
            delta = datetime.datetime.utcnow() - start_time

            feed.last_load_time = max(1, delta.seconds)
            feed.fetched_once = True
            try:
                feed.save()
            except IntegrityError:
                logging.debug("   ---> [%-30s] IntegrityError on feed: %s" % (unicode(feed)[:30], feed.feed_address,))

            if ret_entries[ENTRY_NEW]:
                self.publish_to_subscribers(feed)

            done_msg = (u'%2s ---> [%-30s] Processed in %s (%s) [%s]' % (
                identity, feed.feed_title[:30], unicode(delta),
                feed.pk, self.feed_trans[ret_feed],))
            logging.debug(done_msg)

            self.feed_stats[ret_feed] += 1
            for key, val in ret_entries.items():
                self.entry_stats[key] += val

        # time_taken = datetime.datetime.utcnow() - self.time_start

    def publish_to_subscribers(self, feed):
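        """Notifies real-time listeners that new stories arrived by publishing
        'story:new' on a Redis channel named after the feed's pk."""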
        try:
            r = redis.Redis(connection_pool=settings.REDIS_POOL)
            listeners_count = r.publish(str(feed.pk), 'story:new')
            if listeners_count:
                logging.debug("   ---> [%-30s] Published to %s subscribers" % (unicode(feed)[:30], listeners_count))
        except redis.ConnectionError:
            logging.debug("   ***> [%-30s] Redis is unavailable for real-time." % (unicode(feed)[:30],))

    @timelimit(20)
    def count_unreads_for_subscribers(self, feed):
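        """Flags every active, recently-seen subscriber's subscription for an
        unread recount, optionally recomputing intelligence scores in-line."""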
        UNREAD_CUTOFF = datetime.datetime.utcnow() - datetime.timedelta(days=settings.DAYS_OF_UNREAD)
        user_subs = UserSubscription.objects.filter(feed=feed,
                                                    active=True,
                                                    user__profile__last_seen_on__gte=UNREAD_CUTOFF)\
                                            .order_by('-last_read_date')
        logging.debug(u'   ---> [%-30s] Computing scores: %s (%s/%s/%s) subscribers' % (
                      unicode(feed)[:30], user_subs.count(),
                      feed.num_subscribers, feed.active_subscribers, feed.premium_subscribers))

        if self.options['slave_db']:
            slave_db = self.options['slave_db']
            stories_db_orig = slave_db.stories.find({
                "story_feed_id": feed.pk,
                "story_date": {
                    "$gte": UNREAD_CUTOFF,
                },
            })
            stories_db = []
            for story in stories_db_orig:
                stories_db.append(bunch(story))
        else:
            stories_db = MStory.objects(story_feed_id=feed.pk,
                                        story_date__gte=UNREAD_CUTOFF)
        for sub in user_subs:
            cache.delete('usersub:%s' % sub.user_id)
            sub.needs_unread_recalc = True
            sub.save()
        if self.options['compute_scores']:
            for sub in user_subs:
                silent = False if self.options['verbose'] >= 2 else True
                sub.calculate_feed_scores(silent=silent, stories_db=stories_db)

    def add_jobs(self, feeds_queue, feeds_count=1):
        """ Adds the queues of feed ids to be processed by run_jobs().
        """
        self.feeds_queue = feeds_queue
        self.feeds_count = feeds_count

    def run_jobs(self):
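        """Runs the first queue in-process when single-threaded; otherwise
        spawns one multiprocessing.Process per queue and starts them all."""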
        if self.options['single_threaded']:
            self.process_feed_wrapper(self.feeds_queue[0])
        else:
            for i in range(self.num_threads):
                feed_queue = self.feeds_queue[i]
                self.workers.append(multiprocessing.Process(target=self.process_feed_wrapper,
                                                            args=(feed_queue,)))
            for i in range(self.num_threads):
                self.workers[i].start()