diff --git a/apps/feed_import/models.py b/apps/feed_import/models.py index c57a6f460..a6093515b 100644 --- a/apps/feed_import/models.py +++ b/apps/feed_import/models.py @@ -3,6 +3,7 @@ from django.db import models from django.contrib.auth.models import User from apps.rss_feeds.models import Feed, DuplicateFeed from apps.reader.models import UserSubscription, UserSubscriptionFolders +from apps.rss_feeds.tasks import NewFeeds import datetime import lxml.etree from utils import json, urlnorm @@ -26,6 +27,13 @@ class Importer: UserSubscriptionFolders.objects.filter(user=self.user).delete() UserSubscription.objects.filter(user=self.user).delete() + def queue_new_feeds(self): + new_feeds = UserSubscription.objects.filter(user=self.user, feed__fetched_once=False).values('feed_id') + new_feeds = list(set([f['feed_id'] for f in new_feeds])) + logging.info(" ---> [%s] Queueing NewFeeds: (%s) %s" % (self.user, len(new_feeds), new_feeds)) + size = 4 + for t in (new_feeds[pos:pos + size] for pos in xrange(0, len(new_feeds), size)): + NewFeeds.apply_async(args=[t], queue="new_feeds") class OPMLImporter(Importer): @@ -38,7 +46,7 @@ class OPMLImporter(Importer): self.clear_feeds() folders = self.process_outline(outline) UserSubscriptionFolders.objects.create(user=self.user, folders=json.encode(folders)) - + self.queue_new_feeds() return folders def process_outline(self, outline): @@ -106,6 +114,7 @@ class GoogleReaderImporter(Importer): logging.info(" ---> [%s] Google Reader import: %s" % (self.user, self.subscription_folders)) UserSubscriptionFolders.objects.create(user=self.user, folders=json.encode(self.subscription_folders)) + self.queue_new_feeds() def parse(self): self.feeds = lxml.etree.fromstring(self.feeds_xml).xpath('/object/list/object') diff --git a/apps/rss_feeds/management/commands/task_feeds.py b/apps/rss_feeds/management/commands/task_feeds.py index 9691dcf37..b94eab7c6 100644 --- a/apps/rss_feeds/management/commands/task_feeds.py +++ b/apps/rss_feeds/management/commands/task_feeds.py @@ -2,7 +2,7 @@ from django.core.management.base import BaseCommand from django.conf import settings from apps.rss_feeds.models import Feed from optparse import make_option -from apps.rss_feeds.tasks import RefreshFeed +from apps.rss_feeds.tasks import UpdateFeeds import datetime @@ -40,9 +40,9 @@ class Command(BaseCommand): if i == 12: print feed_queue - RefreshFeed.apply_async(args=(feed_queue,)) + UpdateFeeds.apply_async(args=(feed_queue,)) feed_queue = [] i = 0 if feed_queue: print feed_queue - RefreshFeed.apply_async(args=(feed_queue,)) \ No newline at end of file + UpdateFeeds.apply_async(args=(feed_queue,)) \ No newline at end of file diff --git a/apps/rss_feeds/tasks.py b/apps/rss_feeds/tasks.py index fa2490bf9..8621a71c4 100644 --- a/apps/rss_feeds/tasks.py +++ b/apps/rss_feeds/tasks.py @@ -1,8 +1,8 @@ from celery.task import Task from apps.rss_feeds.models import Feed -class RefreshFeed(Task): - name = 'refresh-feed' +class UpdateFeeds(Task): + name = 'update-feeds' max_retries = 0 ignore_result = True @@ -14,3 +14,5 @@ class RefreshFeed(Task): feed = Feed.objects.get(pk=feed_pk) feed.update() +class NewFeeds(UpdateFeeds): + name = 'new-feeds' diff --git a/celeryconfig.py b/celeryconfig.py deleted file mode 100644 index 5c24eefd0..000000000 --- a/celeryconfig.py +++ /dev/null @@ -1,13 +0,0 @@ -BROKER_HOST = "db01.newsblur.com" -BROKER_PORT = 5672 -BROKER_USER = "newsblur" -BROKER_PASSWORD = "newsblur" -BROKER_VHOST = "newsblurvhost" - -CELERY_RESULT_BACKEND = "amqp" - -CELERY_IMPORTS = ("apps.rss_feeds.tasks", ) -CELERYD_CONCURRENCY = 1 -CELERY_IGNORE_RESULT = True -CELERYD_MAX_TASKS_PER_CHILD = 100 -CELERY_DISABLE_RATE_LIMITS = True \ No newline at end of file diff --git a/settings.py b/settings.py index 6afed2872..e941d28a7 100644 --- a/settings.py +++ b/settings.py @@ -223,9 +223,38 @@ DEVSERVER_MODULES = ( # = Celery = # ========== -from celeryconfig import * import djcelery djcelery.setup_loader() +CELERY_ROUTES = { + "apps.rss_feeds.tasks.NewFeeds": { + "queue": "new_feeds" + }, + "apps.rss_feeds.tasks.UpdateFeeds": { + "queue": "update_feeds" + }, +} +CELERY_QUEUES = { + "new_feeds": { + "binding_key": "new_feeds" + }, + "update_feeds": { + "binding_key": "update_feeds" + }, +} +CELERY_DEFAULT_QUEUE = "update_feeds" +BROKER_HOST = "db01.newsblur.com" +BROKER_PORT = 5672 +BROKER_USER = "newsblur" +BROKER_PASSWORD = "newsblur" +BROKER_VHOST = "newsblurvhost" + +CELERY_RESULT_BACKEND = "amqp" + +CELERY_IMPORTS = ("apps.rss_feeds.tasks", ) +CELERYD_CONCURRENCY = 4 +CELERY_IGNORE_RESULT = True +CELERYD_MAX_TASKS_PER_CHILD = 100 +CELERY_DISABLE_RATE_LIMITS = True # ================== # = Configurations =