Adding a new queue that handles new feeds, so they can be prioritized over updating old feeds.

This commit is contained in:
Samuel Clay 2010-09-20 19:22:19 -04:00
parent 0143fee6c8
commit 9d265e9db0
5 changed files with 47 additions and 20 deletions

View file

@@ -3,6 +3,7 @@ from django.db import models
from django.contrib.auth.models import User
from apps.rss_feeds.models import Feed, DuplicateFeed
from apps.reader.models import UserSubscription, UserSubscriptionFolders
from apps.rss_feeds.tasks import NewFeeds
import datetime
import lxml.etree
from utils import json, urlnorm
@@ -26,6 +27,13 @@ class Importer:
UserSubscriptionFolders.objects.filter(user=self.user).delete()
UserSubscription.objects.filter(user=self.user).delete()
def queue_new_feeds(self):
    """Queue background fetch tasks for every feed of this user's that has
    never been fetched, batching feed ids in groups of 4 onto the dedicated
    "new_feeds" Celery queue."""
    # Distinct feed ids among this user's subscriptions to never-fetched feeds.
    unfetched = UserSubscription.objects.filter(user=self.user, feed__fetched_once=False).values('feed_id')
    new_feeds = list(set(sub['feed_id'] for sub in unfetched))
    logging.info(" ---> [%s] Queueing NewFeeds: (%s) %s" % (self.user, len(new_feeds), new_feeds))
    chunk_size = 4
    pos = 0
    while pos < len(new_feeds):
        NewFeeds.apply_async(args=[new_feeds[pos:pos + chunk_size]], queue="new_feeds")
        pos += chunk_size
class OPMLImporter(Importer):
@@ -38,7 +46,7 @@ class OPMLImporter(Importer):
self.clear_feeds()
folders = self.process_outline(outline)
UserSubscriptionFolders.objects.create(user=self.user, folders=json.encode(folders))
self.queue_new_feeds()
return folders
def process_outline(self, outline):
@@ -106,6 +114,7 @@ class GoogleReaderImporter(Importer):
logging.info(" ---> [%s] Google Reader import: %s" % (self.user, self.subscription_folders))
UserSubscriptionFolders.objects.create(user=self.user,
folders=json.encode(self.subscription_folders))
self.queue_new_feeds()
def parse(self):
self.feeds = lxml.etree.fromstring(self.feeds_xml).xpath('/object/list/object')

View file

@@ -2,7 +2,7 @@ from django.core.management.base import BaseCommand
from django.conf import settings
from apps.rss_feeds.models import Feed
from optparse import make_option
from apps.rss_feeds.tasks import RefreshFeed
from apps.rss_feeds.tasks import UpdateFeeds
import datetime
@@ -40,9 +40,9 @@ class Command(BaseCommand):
if i == 12:
print feed_queue
RefreshFeed.apply_async(args=(feed_queue,))
UpdateFeeds.apply_async(args=(feed_queue,))
feed_queue = []
i = 0
if feed_queue:
print feed_queue
RefreshFeed.apply_async(args=(feed_queue,))
UpdateFeeds.apply_async(args=(feed_queue,))

View file

@@ -1,8 +1,8 @@
from celery.task import Task
from apps.rss_feeds.models import Feed
class RefreshFeed(Task):
name = 'refresh-feed'
class UpdateFeeds(Task):
name = 'update-feeds'
max_retries = 0
ignore_result = True
@@ -14,3 +14,5 @@ class RefreshFeed(Task):
feed = Feed.objects.get(pk=feed_pk)
feed.update()
# Identical task body to UpdateFeeds, but registered under its own task name
# so it can be routed to the separate "new_feeds" queue (see CELERY_ROUTES in
# settings), letting never-fetched feeds jump ahead of routine refreshes.
class NewFeeds(UpdateFeeds):
    name = 'new-feeds'

View file

@@ -1,13 +0,0 @@
# Celery configuration for NewsBlur.
# Message broker — AMQP (port 5672, presumably RabbitMQ) on db01.
BROKER_HOST = "db01.newsblur.com"
BROKER_PORT = 5672
BROKER_USER = "newsblur"
BROKER_PASSWORD = "newsblur"
BROKER_VHOST = "newsblurvhost"
# Task results go back over AMQP (though results are ignored below).
CELERY_RESULT_BACKEND = "amqp"
# Modules the worker imports at startup so its tasks get registered.
CELERY_IMPORTS = ("apps.rss_feeds.tasks", )
CELERYD_CONCURRENCY = 1
# Fire-and-forget: task return values are discarded by default.
CELERY_IGNORE_RESULT = True
# Recycle worker children after 100 tasks to bound memory growth.
CELERYD_MAX_TASKS_PER_CHILD = 100
CELERY_DISABLE_RATE_LIMITS = True

View file

@@ -223,9 +223,38 @@ DEVSERVER_MODULES = (
# = Celery =
# ==========
# NOTE(review): celeryconfig.py is deleted in this commit and its settings are
# inlined below — if this wildcard import survives the diff it will raise
# ImportError at startup; confirm this line was removed as well.
from celeryconfig import *
import djcelery
djcelery.setup_loader()
# Route each task class to its own queue so freshly-subscribed feeds
# (NewFeeds) can be fetched ahead of routine refreshes (UpdateFeeds).
CELERY_ROUTES = {
    "apps.rss_feeds.tasks.NewFeeds": {
        "queue": "new_feeds"
    },
    "apps.rss_feeds.tasks.UpdateFeeds": {
        "queue": "update_feeds"
    },
}
# Declare the two queues, each bound with a matching routing key.
CELERY_QUEUES = {
    "new_feeds": {
        "binding_key": "new_feeds"
    },
    "update_feeds": {
        "binding_key": "update_feeds"
    },
}
# Tasks without an explicit route land on the refresh queue.
CELERY_DEFAULT_QUEUE = "update_feeds"
# Broker/result settings, inlined here from the former celeryconfig.py.
BROKER_HOST = "db01.newsblur.com"
BROKER_PORT = 5672
BROKER_USER = "newsblur"
BROKER_PASSWORD = "newsblur"
BROKER_VHOST = "newsblurvhost"
CELERY_RESULT_BACKEND = "amqp"
CELERY_IMPORTS = ("apps.rss_feeds.tasks", )
CELERYD_CONCURRENCY = 4  # raised from 1 in the old celeryconfig.py
CELERY_IGNORE_RESULT = True
CELERYD_MAX_TASKS_PER_CHILD = 100
CELERY_DISABLE_RATE_LIMITS = True
# ==================
# = Configurations =