Updating celery calls in prep for upgrade to celery 3.0.

This commit is contained in:
Samuel Clay 2012-07-12 23:58:29 -07:00
parent 3d8f206d60
commit 803278bb08
2 changed files with 3 additions and 15 deletions

View file

@ -11,7 +11,6 @@ from django.core.mail import mail_admins
from django.core.mail import EmailMultiAlternatives
from django.core.urlresolvers import reverse
from django.template.loader import render_to_string
from celery.task import Task
from apps.reader.models import UserSubscription
from apps.rss_feeds.models import Feed
from apps.rss_feeds.tasks import NewFeeds
@ -82,10 +81,8 @@ class Profile(models.Model):
new_feeds = list(set([f['feed_id'] for f in new_feeds]))
logging.user(self.user, "~BB~FW~SBQueueing NewFeeds: ~FC(%s) %s" % (len(new_feeds), new_feeds))
size = 4
publisher = Task.get_publisher(exchange="new_feeds")
for t in (new_feeds[pos:pos + size] for pos in xrange(0, len(new_feeds), size)):
NewFeeds.apply_async(args=(t,), queue="new_feeds", publisher=publisher)
publisher.connection.close()
NewFeeds.apply_async(args=(t,), queue="new_feeds")
def refresh_stale_feeds(self, exclude_new=False):
stale_cutoff = datetime.datetime.now() - datetime.timedelta(days=7)

View file

@ -18,7 +18,6 @@ from django.contrib.sites.models import Site
from mongoengine.queryset import OperationError
from mongoengine.base import ValidationError
from apps.rss_feeds.tasks import UpdateFeeds, PushFeeds
from celery.task import Task
from utils import json_functions as json
from utils import feedfinder, feedparser
from utils import urlnorm
@ -267,8 +266,6 @@ class Feed(models.Model):
def task_feeds(cls, feeds, queue_size=12):
logging.debug(" ---> Tasking %s feeds..." % feeds.count())
publisher = Task.get_publisher()
feed_queue = []
for f in feeds:
f.queued_date = datetime.datetime.utcnow()
@ -276,9 +273,7 @@ class Feed(models.Model):
for feed_queue in (feeds[pos:pos + queue_size] for pos in xrange(0, len(feeds), queue_size)):
feed_ids = [feed.pk for feed in feed_queue]
UpdateFeeds.apply_async(args=(feed_ids,), queue='update_feeds', publisher=publisher)
publisher.connection.close()
UpdateFeeds.apply_async(args=(feed_ids,), queue='update_feeds')
def update_all_statistics(self, full=True, force=False):
self.count_subscribers()
@ -1141,14 +1136,10 @@ class Feed(models.Model):
def queue_pushed_feed_xml(self, xml):
logging.debug(' ---> [%-30s] [%s] ~FBQueuing pushed stories...' % (unicode(self)[:30], self.pk))
publisher = Task.get_publisher()
self.queued_date = datetime.datetime.utcnow()
self.set_next_scheduled_update()
PushFeeds.apply_async(args=(self.pk, xml), queue='push_feeds', publisher=publisher)
publisher.connection.close()
PushFeeds.apply_async(args=(self.pk, xml), queue='push_feeds')
# def calculate_collocations_story_content(self,
# collocation_measures=TrigramAssocMeasures,