Abstracting feed queueing so I can queue a queryset of feeds from the command line. Also adding in queueing of feeds that have not been updated in a week (for those that fall through the cracks).

This commit is contained in:
Samuel Clay 2010-12-23 12:32:24 -05:00
parent 1808dcf332
commit b98c63dd14
3 changed files with 33 additions and 27 deletions

View file

@@ -1,48 +1,34 @@
from django.core.management.base import BaseCommand
from django.conf import settings
from apps.rss_feeds.models import Feed
from optparse import make_option
from apps.rss_feeds.tasks import UpdateFeeds
from celery.task import Task
import datetime
class Command(BaseCommand):
option_list = BaseCommand.option_list + (
make_option("-f", "--feed", default=None),
make_option("-F", "--force", dest="force", action="store_true"),
make_option('-V', '--verbose', action='store_true',
dest='verbose', default=False, help='Verbose output.'),
)
def handle(self, *args, **options):
from apps.rss_feeds.models import Feed
settings.LOG_TO_STREAM = True
now = datetime.datetime.utcnow()
# Active feeds
feeds = Feed.objects.filter(
next_scheduled_update__lte=now,
next_scheduled_update__lte=now,
active=True
).exclude(
active_subscribers=0
).order_by('?')
Feed.task_feeds(feeds)
if options['force']:
feeds = Feed.objects.all().order_by('pk')
print " ---> Tasking %s feeds..." % feeds.count()
publisher = Task.get_publisher()
feed_queue = []
size = 12
for f in feeds:
f.queued_date = datetime.datetime.utcnow()
f.set_next_scheduled_update()
for feed_queue in (feeds[pos:pos + size] for pos in xrange(0, len(feeds), size)):
print feed_queue
feed_ids = [feed.pk for feed in feed_queue]
print feed_ids
UpdateFeeds.apply_async(args=(feed_ids,), queue='update_feeds', publisher=publisher)
publisher.connection.close()
# Mistakenly inactive feeds
week = datetime.datetime.now() - datetime.timedelta(days=7)
feeds = Feed.objects.filter(
last_update__lte=week,
active_subscribers__gte=1
).order_by('?')
if feeds: Feed.task_feeds(feeds)

View file

@@ -16,6 +16,8 @@ from django.db import IntegrityError
from django.core.cache import cache
from django.conf import settings
from mongoengine.queryset import OperationError
from apps.rss_feeds.tasks import UpdateFeeds
from celery.task import Task
from utils import json_functions as json
from utils import feedfinder
from utils.feed_functions import levenshtein_distance
@@ -85,6 +87,23 @@ class Feed(models.Model):
# Feed has been deleted. Just ignore it.
pass
@classmethod
def task_feeds(cls, feeds, queue_size=12):
print " ---> Tasking %s feeds..." % feeds.count()
publisher = Task.get_publisher()
feed_queue = []
for f in feeds:
f.queued_date = datetime.datetime.utcnow()
f.set_next_scheduled_update()
for feed_queue in (feeds[pos:pos + queue_size] for pos in xrange(0, len(feeds), queue_size)):
feed_ids = [feed.pk for feed in feed_queue]
UpdateFeeds.apply_async(args=(feed_ids,), queue='update_feeds', publisher=publisher)
publisher.connection.close()
def update_all_statistics(self):
self.count_subscribers()
self.count_stories()
@@ -353,7 +372,7 @@ class Feed(models.Model):
s.save()
ret_values[ENTRY_NEW] += 1
cache.set('updated_feed:%s' % self.id, 1)
except (IntegrityError, OperationError), e:
except (IntegrityError, OperationError):
ret_values[ENTRY_ERR] += 1
# logging.info('Saving new story, IntegrityError: %s - %s: %s' % (self.feed_title, story.get('title'), e))
elif existing_story and story_has_changed:

View file

@@ -1,5 +1,4 @@
from celery.task import Task
from apps.rss_feeds.models import Feed
# from utils import log as logging
class UpdateFeeds(Task):
@@ -8,6 +7,7 @@ class UpdateFeeds(Task):
ignore_result = True
def run(self, feed_pks, **kwargs):
from apps.rss_feeds.models import Feed
if not isinstance(feed_pks, list):
feed_pks = [feed_pks]
@@ -22,6 +22,7 @@ class NewFeeds(Task):
ignore_result = True
def run(self, feed_pks, **kwargs):
from apps.rss_feeds.models import Feed
if not isinstance(feed_pks, list):
feed_pks = [feed_pks]