Reading stories from slave db during feed update.

Samuel Clay 2011-11-04 09:45:10 -07:00
parent dd440c0080
commit 040b755694
5 changed files with 59 additions and 14 deletions
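
Note: the change below routes story reads during a feed update through a MongoDB slave instead of the primary. A minimal sketch of that read path, assuming the pymongo 1.x/2.x-era Connection API that the diff itself uses (the feed id and date cutoff here are illustrative, not from the commit):

    import datetime
    import pymongo
    from django.conf import settings

    # Open a connection against the slave host; slave_okay=True allows reads
    # to be served by a secondary in the 'nbset' replica set (as in the diff).
    db = pymongo.Connection(settings.MONGODB_SLAVE['host'],
                            slave_okay=True, replicaset='nbset').newsblur

    # Illustrative read: recent stories for one feed, answered by the slave.
    start_date = datetime.datetime.utcnow() - datetime.timedelta(days=30)
    recent_stories = db.stories.find({
        "story_feed_id": 42,                 # hypothetical feed id
        "story_date": {"$gte": start_date},
    })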

View file

@@ -63,6 +63,12 @@ class Command(BaseCommand):
         options['compute_scores'] = True
+        import pymongo
+        db = pymongo.Connection(settings.MONGODB_SLAVE['host'], slave_okay=True, replicaset='nbset').newsblur
+        options['slave_db'] = db
         disp = feed_fetcher.Dispatcher(options, num_workers)
         feeds_queue = []

View file

@@ -8,7 +8,6 @@ import zlib
 import urllib
 from collections import defaultdict
 from operator import itemgetter
-from BeautifulSoup import BeautifulStoneSoup
 # from nltk.collocations import TrigramCollocationFinder, BigramCollocationFinder, TrigramAssocMeasures, BigramAssocMeasures
 from django.db import models
 from django.db import IntegrityError
@@ -550,7 +549,7 @@ class Feed(models.Model):
         self.data.feed_classifier_counts = json.encode(scores)
         self.data.save()
 
-    def update(self, force=False, single_threaded=True, compute_scores=True):
+    def update(self, force=False, single_threaded=True, compute_scores=True, slave_db=None):
         from utils import feed_fetcher
         try:
             self.feed_address = self.feed_address % {'NEWSBLUR_DIR': settings.NEWSBLUR_DIR}
@@ -566,6 +565,7 @@ class Feed(models.Model):
             'single_threaded': single_threaded,
             'force': force,
             'compute_scores': compute_scores,
+            'slave_db': slave_db,
         }
         disp = feed_fetcher.Dispatcher(options, 1)
         disp.add_jobs([[self.pk]])
@@ -624,6 +624,7 @@ class Feed(models.Model):
                 # logging.debug('- Updated story in feed (%s - %s): %s / %s' % (self.feed_title, story.get('title'), len(existing_story.story_content), len(story_content)))
                 story_guid = story.get('guid') or story.get('id') or story.get('link')
                 original_content = None
+                existing_story = MStory.objects.get(story_feed_id=existing_story.story_feed_id, story_guid=existing_story.story_guid)
                 if existing_story.story_original_content_z:
                     original_content = zlib.decompress(existing_story.story_original_content_z)
                 elif existing_story.story_content_z:

View file

@@ -1,5 +1,6 @@
 from celery.task import Task
 from utils import log as logging
+from django.conf import settings
 
 class UpdateFeeds(Task):
     name = 'update-feeds'
@@ -11,10 +12,13 @@ class UpdateFeeds(Task):
         if not isinstance(feed_pks, list):
             feed_pks = [feed_pks]
+        import pymongo
+        db = pymongo.Connection(settings.MONGODB_SLAVE['host'], slave_okay=True, replicaset='nbset').newsblur
+
         for feed_pk in feed_pks:
             try:
                 feed = Feed.objects.get(pk=feed_pk)
-                feed.update()
+                feed.update(slave_db=db)
             except Feed.DoesNotExist:
                 logging.info(" ---> Feed doesn't exist: [%s]" % feed_pk)
         # logging.debug(' Updating: [%s] %s' % (feed_pks, feed))

View file

@@ -421,6 +421,14 @@ class MasterSlaveRouter(object):
         "Explicitly put all models on all databases."
         return True
 
+# ===========
+# = MongoDB =
+# ===========
+
+MONGODB_SLAVE = {
+    'host': 'db01'
+}
+
 # ==================
 # = Configurations =
 # ==================

View file

@@ -11,6 +11,7 @@ from utils import feedparser
 from utils.story_functions import pre_process_story
 from utils import log as logging
 from utils.feed_functions import timelimit, TimeoutError, mail_feed_error_to_admin, utf8encode
+from utils.story_functions import bunch
 import time
 import datetime
 import traceback
@@ -214,6 +215,19 @@ class ProcessFeed:
             # if story.get('published') > end_date:
             #     end_date = story.get('published')
             story_guids.append(story.get('guid') or story.get('link'))
+        if self.options['slave_db']:
+            slave_db = self.options['slave_db']
+            stories_db_orig = slave_db.stories.find({
+                "story_feed_id": self.feed.pk,
+                "story_date": {
+                    "$gte": start_date,
+                },
+            }).limit(len(story_guids))
+            existing_stories = []
+            for story in stories_db_orig:
+                existing_stories.append(bunch(story))
+        else:
+            existing_stories = list(MStory.objects(
                 # story_guid__in=story_guids,
                 story_date__gte=start_date,
@@ -227,10 +241,9 @@ class ProcessFeed:
             # ).order_by('-story_date')
         ret_values = self.feed.add_update_stories(self.fpf.entries, existing_stories)
-        logging.debug(u' ---> [%-30s] Parsed Feed: %s' % (
+        logging.debug(u' ---> [%-30s] ~FYParsed Feed: new~FG=~FG~SB%s~SN~FY up~FG=~FY~SB%s~SN same~FG=~FY%s err~FG=~FR~SB%s' % (
             unicode(self.feed)[:30],
-            u' '.join(u'%s=%d' % (self.entry_trans[key],
-                                  ret_values[key]) for key in self.entry_keys),))
+            ret_values[ENTRY_NEW], ret_values[ENTRY_UPDATED], ret_values[ENTRY_SAME], ret_values[ENTRY_ERR]))
         self.feed.update_all_statistics()
         self.feed.trim_feed()
         self.feed.save_feed_history(200, "OK")
@@ -401,6 +414,19 @@ class Dispatcher:
                 unicode(feed)[:30], user_subs.count(),
                 feed.num_subscribers, feed.active_subscribers, feed.premium_subscribers))
+            if self.options['slave_db']:
+                slave_db = self.options['slave_db']
+                stories_db_orig = slave_db.stories.find({
+                    "story_feed_id": feed.pk,
+                    "story_date": {
+                        "$gte": UNREAD_CUTOFF,
+                    },
+                })
+                stories_db = []
+                for story in stories_db_orig:
+                    stories_db.append(bunch(story))
+            else:
+                stories_db = MStory.objects(story_feed_id=feed.pk,
+                                            story_date__gte=UNREAD_CUTOFF)
             for sub in user_subs:
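
Note: utils/story_functions.bunch, imported in the last file above, is not shown in this diff. From its use here (wrapping the raw dicts returned by pymongo so downstream code can keep using attribute access, as it does with MongoEngine MStory documents), it is presumably a dict-with-attribute-access helper. A minimal sketch of such a helper, as an assumption rather than the actual NewsBlur implementation:

    class bunch(dict):
        # Assumed helper: expose dict keys as attributes, so a raw pymongo
        # document can stand in for a MongoEngine MStory in read-only code.
        def __getattr__(self, name):
            try:
                return self[name]
            except KeyError:
                raise AttributeError(name)

    story = bunch({'story_guid': 'abc', 'story_feed_id': 42})
    story.story_guid  # -> 'abc', the attribute shape downstream code expects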