Refactored the feed fetcher into its own utils/feed_fetcher.py module. Added integration tests for duplicate-story matching. Also stubbed in AI score calculations so I can start doing something with the output soon.
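
The score calculation is only a placeholder here: calculate_feed_scores() fills the new UserSubscription.scores CharField with twenty random integers serialized to JSON, and get_scores() decodes them back. Below is a minimal standalone sketch of that round-trip, using the stdlib json module in place of the project's utils.json wrapper; the free-standing function names are illustrative, not part of this commit.

import json
import random

def calculate_feed_scores():
    # Placeholder scoring, mirroring UserSubscription.calculate_feed_scores:
    # twenty random integers in [0, 20], serialized to the JSON string that
    # would be stored in the new `scores` CharField.
    scores = [random.randint(0, 20) for _ in range(20)]
    return json.dumps(scores)

def get_scores(stored):
    # Mirrors UserSubscription.get_scores: decode the stored JSON back to a list.
    return json.loads(stored)

stored = calculate_feed_scores()
print(get_scores(stored))  # e.g. [0, 0, 2, 4, 5, ...]

load_feeds already decodes the stored string the same way when building the feed list, so real scores can later be swapped in behind the same field.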

Samuel Clay 2009-08-29 19:34:42 +00:00
parent 3bd43a8cb4
commit bdd91d714d
16 changed files with 3427 additions and 418 deletions


@@ -1,9 +1,10 @@
from django.db import models
from django.contrib.auth.models import User
import datetime
import random
from django.core.cache import cache
from apps.rss_feeds.models import Feed, Story
from utils import feedparser, object_manager
from utils import feedparser, object_manager, json
class UserSubscription(models.Model):
user = models.ForeignKey(User)
@@ -14,6 +15,7 @@ class UserSubscription(models.Model):
unread_count_updated = models.DateTimeField(
default=datetime.datetime(2000,1,1)
)
scores = models.CharField(max_length=256)
def __unicode__(self):
return '[' + self.feed.feed_title + '] '
@@ -79,6 +81,21 @@ class UserSubscription(models.Model):
new_subscription = UserSubscription(user=self.user, feed=feed)
new_subscription.save()
def calculate_feed_scores(self):
scores = []
for i in range(20):
# [0, 0, 2, 4, 5 ..]
scores.append(random.randint(0, 20))
self.scores = json.encode(scores)
self.save()
return scores
def get_scores(self):
scores = json.decode(self.scores)
return scores
class Meta:
unique_together = ("user", "feed")


@@ -5,6 +5,7 @@ urlpatterns = patterns('apps.reader.views',
(r'^load_single_feed', 'load_single_feed'),
(r'^load_feed_page', 'load_feed_page'),
(r'^load_feeds', 'load_feeds'),
(r'^refresh_feed', 'refresh_feed'),
(r'^mark_story_as_read', 'mark_story_as_read'),
(r'^mark_story_as_like', 'mark_story_as_like'),
(r'^mark_story_as_dislike', 'mark_story_as_dislike'),


@@ -1,10 +1,13 @@
from django.shortcuts import render_to_response, get_list_or_404, get_object_or_404
from django.contrib.auth.decorators import login_required
from django.template import RequestContext
from apps.rss_feeds.models import Feed, Story
try:
from apps.rss_feeds.models import Feed, Story
except:
pass
from django.core.cache import cache
from apps.reader.models import UserSubscription, UserSubscriptionFolders, UserStory
from utils.json import json_encode
from utils import json
from utils.user_functions import get_user
from django.contrib.auth.models import User
from django.http import HttpResponse, HttpRequest
@@ -15,6 +18,7 @@ from djangologging.decorators import suppress_logging_output
import logging
import datetime
import threading
import random
SINGLE_DAY = 60*60*24
@@ -44,7 +48,8 @@ def load_feeds(request):
feeds = []
folders = []
for sub in us:
# logging.info("UserSub: %s" % sub)
# logging.info("UserSub Scores: %s" % sub.user_sub.scores)
sub.feed.scores = json.decode(sub.user_sub.scores)
try:
sub.feed.unread_count = sub.user_sub.unread_count
except:
@@ -68,7 +73,7 @@ def load_feeds(request):
cache.set('usersub:%s' % user, feeds, SINGLE_DAY)
data = json_encode(feeds)
data = json.encode(feeds)
return HttpResponse(data, mimetype='text/html')
def load_single_feed(request):
@@ -109,9 +114,38 @@ def load_single_feed(request):
# logging.debug("Story: %s" % story)
context = stories
data = json_encode(context)
data = json.encode(context)
return HttpResponse(data, mimetype='text/html')
def refresh_feed(request):
feed_id = request.REQUEST['feed_id']
force_update = request.GET.get('force', False)
feeds = Feed.objects.filter(id=feed_id)
feeds = refresh_feeds(feeds, force_update)
context = {}
user = request.user
user_info = _parse_user_info(user)
context.update(user_info)
return render_to_response('reader/feeds.xhtml', context,
context_instance=RequestContext(request))
def refresh_feeds(feeds, force=False):
for f in feeds:
logging.debug('Feed Updating: %s' % f)
f.update(force)
usersubs = UserSubscription.objects.filter(
feed=f.id
)
for us in usersubs:
us.count_unread()
logging.info('Deleting user sub cache: %s' % us.user_id)
cache.delete('usersub:%s' % us.user_id)
return
@suppress_logging_output
def load_feed_page(request):
feed = Feed.objects.get(id=request.REQUEST.get('feed_id'))
@@ -121,27 +155,16 @@ def load_feed_page(request):
@login_required
def mark_story_as_read(request):
story_id = request.REQUEST['story_id']
story = Story.objects.select_related("story_feed").get(id=story_id)
story_id = int(request.REQUEST['story_id'])
feed_id = int(request.REQUEST['feed_id'])
read_story = UserStory.objects.filter(story=story_id, user=request.user, feed=story.story_feed).count()
logging.debug('Marking as read: %s' % read_story)
if read_story:
data = json_encode(dict(code=1))
else:
us = UserSubscription.objects.get(
feed=story.story_feed,
user=request.user
)
us.mark_read()
logging.debug("Marked Read: " + str(story_id) + ' ' + str(story.id))
m = UserStory(story=story, user=request.user, feed=story.story_feed)
data = json_encode(dict(code=0))
try:
m.save()
except:
data = json_encode(dict(code=2))
logging.debug("Marked Read: %s (%s)" % (story_id, feed_id))
m = UserStory(story_id=story_id, user=request.user, feed_id=feed_id)
data = json.encode(dict(code=0))
try:
m.save()
except:
data = json.encode(dict(code=2))
return HttpResponse(data)
@login_required
@@ -153,11 +176,11 @@ def mark_feed_as_read(request):
us.mark_feed_read()
UserStory.objects.filter(user=request.user, feed=feed_id).delete()
data = json_encode(dict(code=0))
data = json.encode(dict(code=0))
try:
m.save()
except:
data = json_encode(dict(code=1))
data = json.encode(dict(code=1))
return HttpResponse(data)
@login_required
@@ -173,33 +196,33 @@ def mark_story_with_opinion(request, opinion):
story_id = request.REQUEST['story_id']
story = Story.objects.select_related("story_feed").get(id=story_id)
previous_opinion = StoryOpinions.objects.get(story=story,
previous_opinion = UserStory.objects.get(story=story,
user=request.user,
feed=story.story_feed)
if previous_opinion and previous_opinion.opinion != opinion:
previous_opinion.opinion = opinion
data = json_encode(dict(code=0))
data = json.encode(dict(code=0))
previous_opinion.save()
logging.debug("Changed Opinion: " + str(previous_opinion.opinion) + ' ' + str(opinion))
else:
logging.debug("Marked Opinion: " + str(story_id) + ' ' + str(opinion))
m = StoryOpinions(story=story, user=request.user, feed=story.story_feed, opinion=opinion)
data = json_encode(dict(code=0))
m = UserStory(story=story, user=request.user, feed=story.story_feed, opinion=opinion)
data = json.encode(dict(code=0))
try:
m.save()
except:
data = json_encode(dict(code=2))
data = json.encode(dict(code=2))
return HttpResponse(data)
@login_required
def get_read_feed_items(request, username):
feeds = get_list_or_404(Feed)
def _parse_user_info(user):
return {
'user_info': {
'is_anonymous': json_encode(user.is_anonymous()),
'is_authenticated': json_encode(user.is_authenticated()),
'username': json_encode(user.username if user.is_authenticated() else 'Anonymous')
'is_anonymous': json.encode(user.is_anonymous()),
'is_authenticated': json.encode(user.is_authenticated()),
'username': json.encode(user.username if user.is_authenticated() else 'Anonymous')
}
}


@@ -0,0 +1,19 @@
[
{
"pk": 4,
"model": "rss_feeds.feed",
"fields": {
"feed_address": "%(NEWSBLUR_DIR)s/apps/rss_feeds/fixtures/gothamist1.xml",
"days_to_trim": 90,
"feed_link": "http://gothamist.com",
"num_subscribers": 0,
"creation": "2009-01-12",
"feed_title": "Gothamist",
"last_update": "2009-07-06 22:30:03",
"min_to_decay": 1,
"etag": "",
"last_modified": "2009-07-06 22:30:03",
"active": 1
}
}
]

File diff suppressed because it is too large.


@@ -0,0 +1,19 @@
[
{
"pk": 4,
"model": "rss_feeds.feed",
"fields": {
"feed_address": "%(NEWSBLUR_DIR)s/apps/rss_feeds/fixtures/gothamist2.xml",
"days_to_trim": 90,
"feed_link": "http://gothamist.com",
"num_subscribers": 0,
"creation": "2009-01-12",
"feed_title": "Gothamist",
"last_update": "2009-07-06 22:30:03",
"min_to_decay": 1,
"etag": "",
"last_modified": "2009-07-06 22:30:03",
"active": 1
}
}
]

File diff suppressed because it is too large.


@@ -123,6 +123,29 @@
"last_read_date": "2009-07-28 23:17:27"
}
},
{
"pk": 4,
"model": "reader.usersubscriptionfolders",
"fields": {
"feed": 4,
"user_sub": 4,
"folder": "Blogs",
"user": 1
}
},
{
"pk": 4,
"model": "reader.usersubscription",
"fields": {
"feed": 4,
"unread_count_updated": "2009-08-01 00:23:42",
"mark_read_date": "2009-07-28 23:17:27",
"unread_count": 0,
"user": 1,
"last_read_date": "2009-07-28 23:17:27"
}
},


@@ -0,0 +1,44 @@
from django.core.management.base import BaseCommand
from django.core.handlers.wsgi import WSGIHandler
from apps.rss_feeds.models import Feed, Story
from django.core.cache import cache
from apps.reader.models import UserSubscription, UserSubscriptionFolders, UserStory
from optparse import OptionParser, make_option
import os
import logging
import errno
class Command(BaseCommand):
option_list = BaseCommand.option_list + (
make_option("-f", "--feed", dest="feed", default=None),
make_option("-d", "--daemon", dest="daemonize", action="store_true"),
)
def handle(self, *args, **options):
if options['daemonize']:
daemonize()
feeds = UserSubscription.objects.all()
for f in feeds:
f.calculate_feed_scores()
def daemonize():
"""
Detach from the terminal and continue as a daemon.
"""
# swiped from twisted/scripts/twistd.py
# See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16
if os.fork(): # launch child and...
os._exit(0) # kill off parent
os.setsid()
if os.fork(): # launch child and...
os._exit(0) # kill off parent again.
os.umask(077)
null = os.open("/dev/null", os.O_RDWR)
for i in range(3):
try:
os.dup2(null, i)
except OSError, e:
if e.errno != errno.EBADF:
raise
os.close(null)


@@ -1,44 +1,13 @@
from django.core.management.base import BaseCommand
from django.core.handlers.wsgi import WSGIHandler
from apps.rss_feeds.models import Feed, Story
from django.core.cache import cache
from apps.reader.models import UserSubscription, UserSubscriptionFolders, UserStory
from optparse import OptionParser, make_option
from apps.rss_feeds.importer import PageImporter
from utils import feedparser, threadpool
import os
import sys
import time
from utils import feed_fetcher
import logging
import errno
import datetime
import threading
import traceback
import os
import socket
import errno
threadpool = None
# Refresh feed code adapted from Feedjack.
# http://feedjack.googlecode.com
VERSION = '0.2'
URL = 'http://www.newsblur.com/'
USER_AGENT = 'NewsBlur %s - %s' % (VERSION, URL)
SLOWFEED_WARNING = 10
ENTRY_NEW, ENTRY_UPDATED, ENTRY_SAME, ENTRY_ERR = range(4)
FEED_OK, FEED_SAME, FEED_ERRPARSE, FEED_ERRHTTP, FEED_ERREXC = range(5)
def prints(tstr):
""" lovely unicode
"""
sys.stdout.write('%s\n' % (tstr.encode(sys.getdefaultencoding(),
'replace')))
sys.stdout.flush()
def mtime(ttime):
""" datetime auxiliar function.
"""
return datetime.datetime.fromtimestamp(time.mktime(ttime))
class Command(BaseCommand):
option_list = BaseCommand.option_list + (
@@ -59,7 +28,7 @@ class Command(BaseCommand):
# setting socket timeout (default: 10 seconds)
socket.setdefaulttimeout(options['timeout'])
disp = Dispatcher(options, options['workerthreads'])
disp = feed_fetcher.Dispatcher(options, options['workerthreads'])
feeds = Feed.objects.all()
for feed in feeds:
@@ -67,279 +36,6 @@ class Command(BaseCommand):
disp.poll()
class FetchFeed:
def __init__(self, feed, options):
self.feed = feed
self.options = options
self.fpf = None
def fetch(self):
""" Downloads and parses a feed.
"""
logging.debug(u'[%d] Fetching %s' % (self.feed.id,
self.feed.feed_title))
# we check the etag and the modified time to save bandwidth and
# avoid bans
try:
self.fpf = feedparser.parse(self.feed.feed_address,
agent=USER_AGENT,
etag=self.feed.etag)
except:
logging.error('! ERROR: feed cannot be parsed')
return FEED_ERRPARSE
return self.fpf
class FetchPage:
def __init__(self, feed, options):
self.feed = feed
self.options = options
def fetch(self):
logging.debug(u'[%d] Fetching page from %s' % (self.feed.id,
self.feed.feed_title))
if self.feed.feed_link:
page_importer = PageImporter(self.feed.feed_link, self.feed)
self.feed.page = page_importer.fetch_page()
self.feed.save()
class ProcessFeed:
def __init__(self, feed, fpf, options):
self.feed = feed
self.options = options
self.fpf = fpf
def process(self):
""" Downloads and parses a feed.
"""
ret_values = {
ENTRY_NEW:0,
ENTRY_UPDATED:0,
ENTRY_SAME:0,
ENTRY_ERR:0}
logging.debug(u'[%d] Processing %s' % (self.feed.id,
self.feed.feed_title))
if hasattr(self.fpf, 'status'):
if self.options['verbose']:
logging.debug(u'[%d] HTTP status %d: %s' % (self.feed.id,
self.fpf.status,
self.feed.feed_address))
if self.fpf.status == 304:
# this means the feed has not changed
if self.options['verbose']:
logging.debug('[%d] Feed has not changed since ' \
'last check: %s' % (self.feed.id,
self.feed.feed_address))
return FEED_SAME, ret_values
if self.fpf.status >= 400:
# http error, ignore
logging.error('[%d] !HTTP_ERROR! %d: %s' % (self.feed.id,
self.fpf.status,
self.feed.feed_address))
return FEED_ERRHTTP, ret_values
if hasattr(self.fpf, 'bozo') and self.fpf.bozo:
logging.debug('[%d] !BOZO! Feed is not well formed: %s' % (
self.feed.id, self.feed.feed_address))
# the feed has changed (or it is the first time we parse it)
# saving the etag and last_modified fields
self.feed.etag = self.fpf.get('etag', '')
# sometimes this is None (though it never should be) *sigh*
if self.feed.etag is None:
self.feed.etag = ''
try:
self.feed.last_modified = mtime(self.fpf.modified)
except:
pass
self.feed.feed_title = self.fpf.feed.get('title', '')[0:254]
self.feed.feed_tagline = self.fpf.feed.get('tagline', '')
self.feed.feed_link = self.fpf.feed.get('link', '')
self.feed.last_update = datetime.datetime.now()
if False and self.options['verbose']:
logging.debug(u'[%d] Feed info for: %s\n' \
u' title %s\n' \
u' tagline %s\n' \
u' link %s\n' \
u' last_checked %s' % (
self.feed.id, self.feed.feed_address, self.feed.feed_title,
self.feed.feed_tagline, self.feed.feed_link, self.feed.last_update))
guids = []
for entry in self.fpf.entries:
if entry.get('id', ''):
guids.append(entry.get('id', ''))
elif entry.title:
guids.append(entry.title)
elif entry.link:
guids.append(entry.link)
self.feed.save()
# Compare new stories to existing stories, adding and updating
try:
num_entries = len(self.fpf.entries)
existing_stories = Story.objects.filter(
story_feed=self.feed
).order_by('-story_date').values()[:num_entries]
ret_values = self.feed.add_update_stories(self.fpf.entries, existing_stories)
except:
(etype, eobj, etb) = sys.exc_info()
print '[%d] ! -------------------------' % (self.feed.id,)
print traceback.format_exception(etype, eobj, etb)
traceback.print_exception(etype, eobj, etb)
print '[%d] ! -------------------------' % (self.feed.id,)
return FEED_OK, ret_values
class Dispatcher:
def __init__(self, options, num_threads):
self.options = options
self.entry_stats = {
ENTRY_NEW:0,
ENTRY_UPDATED:0,
ENTRY_SAME:0,
ENTRY_ERR:0}
self.feed_stats = {
FEED_OK:0,
FEED_SAME:0,
FEED_ERRPARSE:0,
FEED_ERRHTTP:0,
FEED_ERREXC:0}
self.entry_trans = {
ENTRY_NEW:'new',
ENTRY_UPDATED:'updated',
ENTRY_SAME:'same',
ENTRY_ERR:'error'}
self.feed_trans = {
FEED_OK:'ok',
FEED_SAME:'unchanged',
FEED_ERRPARSE:'cant_parse',
FEED_ERRHTTP:'http_error',
FEED_ERREXC:'exception'}
self.entry_keys = sorted(self.entry_trans.keys())
self.feed_keys = sorted(self.feed_trans.keys())
if threadpool:
self.tpool = threadpool.ThreadPool(num_threads)
else:
self.tpool = None
self.time_start = datetime.datetime.now()
def process_feed_wrapper(self, feed):
""" wrapper for ProcessFeed
"""
start_time = datetime.datetime.now()
try:
ffeed = FetchFeed(feed, self.options)
fetched_feed = ffeed.fetch()
pfeed = ProcessFeed(feed, fetched_feed, self.options)
ret_feed, ret_entries = pfeed.process()
fpage = FetchPage(feed, self.options)
fpage.fetch()
del ffeed
del pfeed
del fpage
except:
(etype, eobj, etb) = sys.exc_info()
print '[%d] ! -------------------------' % (feed.id,)
print traceback.format_exception(etype, eobj, etb)
traceback.print_exception(etype, eobj, etb)
print '[%d] ! -------------------------' % (feed.id,)
ret_feed = FEED_ERREXC
ret_entries = {}
delta = datetime.datetime.now() - start_time
if delta.seconds > SLOWFEED_WARNING:
comment = u' (SLOW FEED!)'
else:
comment = u''
logging.debug(u'[%d] Processed %s in %s [%s] [%s]%s' % (
feed.id, feed.feed_title, unicode(delta),
self.feed_trans[ret_feed],
u' '.join(u'%s=%d' % (self.entry_trans[key],
ret_entries[key]) for key in self.entry_keys),
comment))
self.feed_stats[ret_feed] += 1
for key, val in ret_entries.items():
self.entry_stats[key] += val
return ret_feed, ret_entries
def add_job(self, feed):
""" adds a feed processing job to the pool
"""
if self.tpool:
req = threadpool.WorkRequest(self.process_feed_wrapper,
(feed,))
self.tpool.putRequest(req)
else:
# no threadpool module, just run the job
self.process_feed_wrapper(feed)
def poll(self):
""" polls the active threads
"""
if not self.tpool:
# no thread pool, nothing to poll
return
while True:
try:
time.sleep(0.2)
self.tpool.poll()
except KeyboardInterrupt:
logging.debug('! Cancelled by user')
break
except threadpool.NoResultsPending:
logging.info(u'* DONE in %s\n* Feeds: %s\n* Entries: %s' % (
unicode(datetime.datetime.now() - self.time_start),
u' '.join(u'%s=%d' % (self.feed_trans[key],
self.feed_stats[key])
for key in self.feed_keys),
u' '.join(u'%s=%d' % (self.entry_trans[key],
self.entry_stats[key])
for key in self.entry_keys)
))
break
except Exception, e:
logging.error(u'I DONT KNOW')
class FeedFetcher(threading.Thread):
def __init__(self, feed):
threading.Thread.__init__(self)
self.feed = feed
def run(self):
print self.feed
self.feed.update(True)
usersubs = UserSubscription.objects.filter(
feed=self.feed.id
)
for us in usersubs:
us.count_unread()
cache.delete('usersub:%s' % us.user_id)
def daemonize():
"""


@@ -11,10 +11,9 @@ from django.utils.safestring import mark_safe
from utils.story_functions import format_story_link_date__short
from utils.story_functions import format_story_link_date__long
from django.db.models import Q
from utils.diff import HTMLDiff
from apps.rss_feeds.importer import PageImporter
import settings
import logging
from utils.diff import HTMLDiff
USER_AGENT = 'NewsBlur v1.0 - newsblur.com'
@@ -51,70 +50,19 @@ class Feed(models.Model):
print locals()
def update(self, force=False, feed=None):
from utils import feed_fetcher
try:
self.feed_address = self.feed_address % {'NEWSBLUR_DIR': settings.NEWSBLUR_DIR}
except:
pass
last_updated = self.last_updated() / 60
min_to_decay = self.min_to_decay + (random.random()*self.min_to_decay)
if last_updated < min_to_decay and not force:
# logging.debug('Feed unchanged: ' + self.feed_title)
return
feed_updated, feed = cache.get("feed:" + self.feed_address, (None, None,))
if feed and not force:
# logging.debug('Feed Cached: ' + self.feed_title)
pass
if not feed or force:
last_modified = None
now = datetime.datetime.now()
if self.last_modified:
last_modified = datetime.datetime.timetuple(self.last_modified)
if not feed:
logging.debug('[%d] Retrieving Feed: %s'
% (self.id, self.feed_title))
feed = feedparser.parse(self.feed_address,
etag=self.etag,
modified=last_modified,
agent=USER_AGENT)
logging.debug('\t- [%d] Retrieved Feed: %s'
% (self.id, self.feed_title))
cache.set("feed:" + self.feed_address, (now, feed), min_to_decay)
# check for movement or disappearance
if hasattr(feed, 'status'):
if feed.status == 301:
self.feed_url = feed.href
if feed.status == 410:
self.active = False
if feed.status >= 400:
return
# Fill in optional fields
if not self.feed_title:
self.feed_title = feed.feed.get('title', feed.feed.get('link'))
if not self.feed_link:
self.feed_link = feed.feed.get('link')
self.etag = feed.get('etag', '')
self.last_update = datetime.datetime.now()
self.last_modified = mtime(feed.get('modified',
datetime.datetime.timetuple(datetime.datetime.now())))
page_importer = PageImporter(self.feed_link, self)
self.page = page_importer.fetch_page()
self.save()
num_entries = len(feed['entries'])
# Compare new stories to existing stories, adding and updating
existing_stories = Story.objects.filter(
story_feed=self
).order_by('-story_date').values()[:num_entries]
self.add_update_stories(feed['entries'], existing_stories)
self.trim_feed();
options = {
'verbose': 0,
'timeout': 10
}
disp = feed_fetcher.Dispatcher(options, 1)
disp.add_job(self)
disp.poll()
return
@@ -155,7 +103,7 @@ class Feed(models.Model):
pass
elif existing_story and is_different:
# update story
logging.debug('- Updated story in feed (%s - %s/%s): %s / %s' % (self.feed_title, len(existing_story['story_content']), len(story.get('title')), len(existing_story['story_content']), len(story_content)))
logging.debug('- Updated story in feed (%s - %s): %s / %s' % (self.feed_title, story.get('title'), len(existing_story['story_content']), len(story_content)))
original_content = None
if existing_story['story_original_content']:


@@ -9,15 +9,28 @@ class FeedTest(TestCase):
def setUp(self):
self.client = Client()
def test_load_feeds__changed_story_title(self):
# def test_load_feeds__changed_story_title(self):
# self.client.login(username='conesus', password='test')
#
# management.call_command('loaddata', 'gawker1.json', verbosity=0)
# response = self.client.get('/reader/refresh_feed', { "feed_id": 1, "force": True })
#
# management.call_command('loaddata', 'gawker2.json', verbosity=0)
# response = self.client.get('/reader/refresh_feed', { "feed_id": 1, "force": True })
#
# response = self.client.get('/reader/load_single_feed', { "feed_id": 1 })
# print [c['story_title'] for c in json.loads(response.content)]
# # print json.loads(response.content)[0]
def test_load_feeds__gothamist__changed_story_title(self):
self.client.login(username='conesus', password='test')
management.call_command('loaddata', 'gawker1.json', verbosity=0)
response = self.client.get('/reader/refresh_feed', { "feed_id": 1, "force": True })
management.call_command('loaddata', 'gothamist1.json', verbosity=0)
response = self.client.get('/reader/refresh_feed', { "feed_id": 4, "force": True })
management.call_command('loaddata', 'gawker2.json', verbosity=0)
response = self.client.get('/reader/refresh_feed', { "feed_id": 1, "force": True })
management.call_command('loaddata', 'gothamist2.json', verbosity=0)
response = self.client.get('/reader/refresh_feed', { "feed_id": 4, "force": True })
response = self.client.get('/reader/load_single_feed', { "feed_id": 1 })
response = self.client.get('/reader/load_single_feed', { "feed_id": 4 })
print [c['story_title'] for c in json.loads(response.content)]
# print json.loads(response.content)[0]


@@ -49,7 +49,7 @@ NEWSBLUR.AssetModel.Reader.prototype = {
var raw_data = o.substring(0, log_index);
data = eval('(' + raw_data + ')');
if (log) {
var log_js_index_begin = log.indexOf('<script type="text/javascript">');
var log_js_index_begin = log.indexOf('<script type=\"text\/javascript\">');
var log_js_index_end = log.indexOf('</script>');
var log_html = log.substring(0, log_js_index_begin);
var log_js = log.substring(log_js_index_begin+31, log_js_index_end);
@@ -71,7 +71,7 @@ NEWSBLUR.AssetModel.Reader.prototype = {
});
},
mark_story_as_read: function(story_id, callback) {
mark_story_as_read: function(story_id, feed_id, callback) {
var self = this;
var read = false;
@@ -86,7 +86,8 @@ NEWSBLUR.AssetModel.Reader.prototype = {
if (!read) {
this.make_request('/reader/mark_story_as_read',
{
story_id: story_id
story_id: story_id,
feed_id: feed_id
}, callback
);
}
@@ -233,6 +234,7 @@ NEWSBLUR.AssetModel.Preferences.prototype = {
},
make_request: function(url, data, callback) {
$.ajaxStop();
$.ajax({
url: url,
data: data,


@@ -297,6 +297,8 @@
// this.model.load_feed_page(feed_id, 0, $.rescope(this.show_feed_page_contents, this));
this.show_feed_page_contents(feed_id);
this.show_correct_story_view(feed_id);
this.active_feed = feed_id;
},
show_correct_story_view: function(feed_id) {
@@ -548,7 +550,6 @@
var title = story_title.replace('^\s+|\s+$', '');
var $story, $stories = [], title_words, shortened_title, $reduced_stories = [];
NEWSBLUR.log(['SS:', story_title, title]);
$stories = $iframe.contents().find(':contains('+title+')');
NEWSBLUR.log(['SS 1:', {'stories': $stories}, $stories.slice(-1), $stories.length]);
@@ -634,6 +635,7 @@
mark_story_as_read: function(story_id, $story_title) {
var self = this;
var feed_id = this.active_feed;
var callback = function() {
return;
@@ -641,7 +643,7 @@
$story_title.addClass('read');
if (NEWSBLUR.Globals.is_authenticated) {
this.model.mark_story_as_read(story_id, callback);
this.model.mark_story_as_read(story_id, feed_id, callback);
}
},

utils/feed_fetcher.py (new file, 307 lines)

@@ -0,0 +1,307 @@
from apps.rss_feeds.models import Story
from django.core.cache import cache
from apps.reader.models import UserSubscription, UserSubscriptionFolders, UserStory
from apps.rss_feeds.importer import PageImporter
from utils import feedparser, threadpool
import sys
import time
import logging
import datetime
import threading
import traceback
threadpool = None
# Refresh feed code adapted from Feedjack.
# http://feedjack.googlecode.com
VERSION = '0.2'
URL = 'http://www.newsblur.com/'
USER_AGENT = 'NewsBlur %s - %s' % (VERSION, URL)
SLOWFEED_WARNING = 10
ENTRY_NEW, ENTRY_UPDATED, ENTRY_SAME, ENTRY_ERR = range(4)
FEED_OK, FEED_SAME, FEED_ERRPARSE, FEED_ERRHTTP, FEED_ERREXC = range(5)
def prints(tstr):
""" lovely unicode
"""
sys.stdout.write('%s\n' % (tstr.encode(sys.getdefaultencoding(),
'replace')))
sys.stdout.flush()
def mtime(ttime):
""" datetime auxiliar function.
"""
return datetime.datetime.fromtimestamp(time.mktime(ttime))
class FetchFeed:
def __init__(self, feed, options):
self.feed = feed
self.options = options
self.fpf = None
def fetch(self):
""" Downloads and parses a feed.
"""
logging.debug(u'[%d] Fetching %s' % (self.feed.id,
self.feed.feed_title))
# we check the etag and the modified time to save bandwidth and
# avoid bans
try:
self.fpf = feedparser.parse(self.feed.feed_address,
agent=USER_AGENT,
etag=self.feed.etag)
except:
logging.error('! ERROR: feed cannot be parsed')
return FEED_ERRPARSE
return self.fpf
class FetchPage:
def __init__(self, feed, options):
self.feed = feed
self.options = options
def fetch(self):
logging.debug(u'[%d] Fetching page from %s' % (self.feed.id,
self.feed.feed_title))
if self.feed.feed_link:
page_importer = PageImporter(self.feed.feed_link, self.feed)
self.feed.page = page_importer.fetch_page()
self.feed.save()
class ProcessFeed:
def __init__(self, feed, fpf, options):
self.feed = feed
self.options = options
self.fpf = fpf
def process(self):
""" Downloads and parses a feed.
"""
ret_values = {
ENTRY_NEW:0,
ENTRY_UPDATED:0,
ENTRY_SAME:0,
ENTRY_ERR:0}
logging.debug(u'[%d] Processing %s' % (self.feed.id,
self.feed.feed_title))
if hasattr(self.fpf, 'status'):
if self.options['verbose']:
logging.debug(u'[%d] HTTP status %d: %s' % (self.feed.id,
self.fpf.status,
self.feed.feed_address))
if self.fpf.status == 304:
# this means the feed has not changed
if self.options['verbose']:
logging.debug('[%d] Feed has not changed since ' \
'last check: %s' % (self.feed.id,
self.feed.feed_address))
return FEED_SAME, ret_values
if self.fpf.status >= 400:
# http error, ignore
logging.error('[%d] !HTTP_ERROR! %d: %s' % (self.feed.id,
self.fpf.status,
self.feed.feed_address))
return FEED_ERRHTTP, ret_values
if hasattr(self.fpf, 'bozo') and self.fpf.bozo:
logging.debug('[%d] !BOZO! Feed is not well formed: %s' % (
self.feed.id, self.feed.feed_address))
# the feed has changed (or it is the first time we parse it)
# saving the etag and last_modified fields
self.feed.etag = self.fpf.get('etag', '')
# sometimes this is None (though it never should be) *sigh*
if self.feed.etag is None:
self.feed.etag = ''
try:
self.feed.last_modified = mtime(self.fpf.modified)
except:
pass
self.feed.feed_title = self.fpf.feed.get('title', '')[0:254]
self.feed.feed_tagline = self.fpf.feed.get('tagline', '')
self.feed.feed_link = self.fpf.feed.get('link', '')
self.feed.last_update = datetime.datetime.now()
if False and self.options['verbose']:
logging.debug(u'[%d] Feed info for: %s\n' \
u' title %s\n' \
u' tagline %s\n' \
u' link %s\n' \
u' last_checked %s' % (
self.feed.id, self.feed.feed_address, self.feed.feed_title,
self.feed.feed_tagline, self.feed.feed_link, self.feed.last_update))
guids = []
for entry in self.fpf.entries:
if entry.get('id', ''):
guids.append(entry.get('id', ''))
elif entry.title:
guids.append(entry.title)
elif entry.link:
guids.append(entry.link)
self.feed.save()
# Compare new stories to existing stories, adding and updating
try:
num_entries = len(self.fpf.entries)
existing_stories = Story.objects.filter(
story_feed=self.feed
).order_by('-story_date').values()[:num_entries]
ret_values = self.feed.add_update_stories(self.fpf.entries, existing_stories)
except:
(etype, eobj, etb) = sys.exc_info()
print '[%d] ! -------------------------' % (self.feed.id,)
print traceback.format_exception(etype, eobj, etb)
traceback.print_exception(etype, eobj, etb)
print '[%d] ! -------------------------' % (self.feed.id,)
return FEED_OK, ret_values
class Dispatcher:
def __init__(self, options, num_threads):
self.options = options
self.entry_stats = {
ENTRY_NEW:0,
ENTRY_UPDATED:0,
ENTRY_SAME:0,
ENTRY_ERR:0}
self.feed_stats = {
FEED_OK:0,
FEED_SAME:0,
FEED_ERRPARSE:0,
FEED_ERRHTTP:0,
FEED_ERREXC:0}
self.entry_trans = {
ENTRY_NEW:'new',
ENTRY_UPDATED:'updated',
ENTRY_SAME:'same',
ENTRY_ERR:'error'}
self.feed_trans = {
FEED_OK:'ok',
FEED_SAME:'unchanged',
FEED_ERRPARSE:'cant_parse',
FEED_ERRHTTP:'http_error',
FEED_ERREXC:'exception'}
self.entry_keys = sorted(self.entry_trans.keys())
self.feed_keys = sorted(self.feed_trans.keys())
if threadpool:
self.tpool = threadpool.ThreadPool(num_threads)
else:
self.tpool = None
self.time_start = datetime.datetime.now()
def process_feed_wrapper(self, feed):
""" wrapper for ProcessFeed
"""
start_time = datetime.datetime.now()
try:
ffeed = FetchFeed(feed, self.options)
fetched_feed = ffeed.fetch()
pfeed = ProcessFeed(feed, fetched_feed, self.options)
ret_feed, ret_entries = pfeed.process()
fpage = FetchPage(feed, self.options)
fpage.fetch()
del ffeed
del pfeed
del fpage
except:
(etype, eobj, etb) = sys.exc_info()
print '[%d] ! -------------------------' % (feed.id,)
print traceback.format_exception(etype, eobj, etb)
traceback.print_exception(etype, eobj, etb)
print '[%d] ! -------------------------' % (feed.id,)
ret_feed = FEED_ERREXC
ret_entries = {}
delta = datetime.datetime.now() - start_time
if delta.seconds > SLOWFEED_WARNING:
comment = u' (SLOW FEED!)'
else:
comment = u''
logging.debug(u'[%d] Processed %s in %s [%s] [%s]%s' % (
feed.id, feed.feed_title, unicode(delta),
self.feed_trans[ret_feed],
u' '.join(u'%s=%d' % (self.entry_trans[key],
ret_entries[key]) for key in self.entry_keys),
comment))
self.feed_stats[ret_feed] += 1
for key, val in ret_entries.items():
self.entry_stats[key] += val
return ret_feed, ret_entries
def add_job(self, feed):
""" adds a feed processing job to the pool
"""
if self.tpool:
req = threadpool.WorkRequest(self.process_feed_wrapper,
(feed,))
self.tpool.putRequest(req)
else:
# no threadpool module, just run the job
self.process_feed_wrapper(feed)
def poll(self):
""" polls the active threads
"""
if not self.tpool:
# no thread pool, nothing to poll
return
while True:
try:
time.sleep(0.2)
self.tpool.poll()
except KeyboardInterrupt:
logging.debug('! Cancelled by user')
break
except threadpool.NoResultsPending:
logging.info(u'* DONE in %s\n* Feeds: %s\n* Entries: %s' % (
unicode(datetime.datetime.now() - self.time_start),
u' '.join(u'%s=%d' % (self.feed_trans[key],
self.feed_stats[key])
for key in self.feed_keys),
u' '.join(u'%s=%d' % (self.entry_trans[key],
self.entry_stats[key])
for key in self.entry_keys)
))
break
except Exception, e:
logging.error(u'I DONT KNOW')
class FeedFetcher(threading.Thread):
def __init__(self, feed):
threading.Thread.__init__(self)
self.feed = feed
def run(self):
print self.feed
self.feed.update(True)
usersubs = UserSubscription.objects.filter(
feed=self.feed.id
)
for us in usersubs:
us.count_unread()
cache.delete('usersub:%s' % us.user_id)


@@ -5,6 +5,11 @@ from django.utils.encoding import force_unicode
from django.utils import simplejson as json
from decimal import Decimal
def decode(data):
return json.loads(data)
def encode(data):
return json_encode(data)
def json_encode(data):
"""