From d722e9c4ebd83e94e2dd649f4b3fb79dd0c23ac8 Mon Sep 17 00:00:00 2001 From: Samuel Clay Date: Mon, 16 Jul 2012 23:40:17 -0700 Subject: [PATCH] Moving cronjob tasks over to celerybeat. Starting with task_feeds, collect_feedback, collect_stats, and mark_read for homepage freshening. --- apps/reader/tasks.py | 43 +++++++++++++++++++++++++ apps/rss_feeds/tasks.py | 39 +++++++++++++++++++++++ config/supervisor_celerybeat.conf | 14 +++++++++ fabfile.py | 9 ++++-- settings.py | 52 ++++++++++++++++++++++++++----- 5 files changed, 147 insertions(+), 10 deletions(-) create mode 100644 apps/reader/tasks.py create mode 100644 config/supervisor_celerybeat.conf diff --git a/apps/reader/tasks.py b/apps/reader/tasks.py new file mode 100644 index 000000000..2d30e5492 --- /dev/null +++ b/apps/reader/tasks.py @@ -0,0 +1,43 @@ +import datetime +from celery.task import Task +from utils import log as logging +from django.contrib.auth.models import User +from apps.reader.models import UserSubscription +from apps.statistics.models import MStatistics +from apps.statistics.models import MFeedback + + +class FreshenHomepage(Task): + name = 'freshen-homepage' + + def run(self, **kwargs): + day_ago = datetime.datetime.utcnow() - datetime.timedelta(days=1) + users = ['conesus', 'popular'] + + for username in users: + user = User.objects.get(username=username) + user.profile.last_seen_on = datetime.datetime.utcnow() + user.profile.save() + + feeds = UserSubscription.objects.filter(user=user) + logging.debug(" Marking %s feeds day old read." % feeds.count()) + for sub in feeds: + sub.mark_read_date = day_ago + sub.needs_unread_recalc = True + sub.save() + + +class CollectStats(Task): + name = 'collect-stats' + + def run(self, **kwargs): + MStatistics.collect_statistics() + MStatistics.delete_old_stats() + + +class CollectFeedback(Task): + name = 'collect-feedback' + + def run(self, **kwargs): + MFeedback.collect_feedback() + \ No newline at end of file diff --git a/apps/rss_feeds/tasks.py b/apps/rss_feeds/tasks.py index 6a425273f..907a3a15a 100644 --- a/apps/rss_feeds/tasks.py +++ b/apps/rss_feeds/tasks.py @@ -1,7 +1,46 @@ +import datetime from celery.task import Task from utils import log as logging from django.conf import settings +class TaskFeeds(Task): + name = 'task-feeds' + + def run(self, **kwargs): + from apps.rss_feeds.models import Feed + settings.LOG_TO_STREAM = True + now = datetime.datetime.utcnow() + + # Active feeds + feeds = Feed.objects.filter( + next_scheduled_update__lte=now, + active=True + ).exclude( + active_subscribers=0 + ).order_by('?') + Feed.task_feeds(feeds) + + # Mistakenly inactive feeds + day = now - datetime.timedelta(days=1) + feeds = Feed.objects.filter( + last_update__lte=day, + queued_date__lte=day, + min_to_decay__lte=60*24, + active_subscribers__gte=1, + active=True + ).order_by('?') + if feeds: Feed.task_feeds(feeds) + + week = now - datetime.timedelta(days=7) + feeds = Feed.objects.filter( + last_update__lte=week, + queued_date__lte=day, + active_subscribers__gte=1, + active=True + ).order_by('?') + if feeds: Feed.task_feeds(feeds) + + class UpdateFeeds(Task): name = 'update-feeds' max_retries = 0 diff --git a/config/supervisor_celerybeat.conf b/config/supervisor_celerybeat.conf new file mode 100644 index 000000000..9d7947232 --- /dev/null +++ b/config/supervisor_celerybeat.conf @@ -0,0 +1,14 @@ +[program:celerybeat] +command=/home/sclay/newsblur/manage.py celerybeat --schedule=/home/sclay/newsblur/data/celerybeat-schedule.db --loglevel=INFO +directory=/home/sclay/newsblur +user=sclay +numprocs=1 +stdout_logfile=/var/log/celerybeat.log +stderr_logfile=/var/log/celerybeat.log +autostart=true +autorestart=true +startsecs=10 + +; if rabbitmq is supervised, set its priority higher +; so it starts first +priority=998 \ No newline at end of file diff --git a/fabfile.py b/fabfile.py index 8acce93ad..7acc66cf8 100644 --- a/fabfile.py +++ b/fabfile.py @@ -243,7 +243,7 @@ def setup_common(): config_pgbouncer() # setup_mongoengine() # setup_forked_mongoengine() - setup_pymongo_repo() + # setup_pymongo_repo() setup_logrotate() setup_nginx() configure_nginx() @@ -362,7 +362,7 @@ def setup_psycopg(): def setup_python(): # sudo('easy_install -U pip') - sudo('easy_install -U fabric django==1.3.1 readline pyflakes iconv celery django-celery django-celery-with-redis django-compress South django-extensions pymongo stripe BeautifulSoup pyyaml nltk lxml oauth2 pytz boto seacucumber django_ses mongoengine redis requests') + sudo('easy_install -U fabric django==1.3.1 readline pyflakes iconv celery django-celery django-celery-with-redis django-compress South django-extensions pymongo==2.2.0 stripe BeautifulSoup pyyaml nltk lxml oauth2 pytz boto seacucumber django_ses mongoengine redis requests') put('config/pystartup.py', '.pystartup') with cd(os.path.join(env.NEWSBLUR_PATH, 'vendor/cjson')): @@ -637,6 +637,11 @@ def setup_task_motd(): def enable_celery_supervisor(): put('config/supervisor_celeryd.conf', '/etc/supervisor/conf.d/celeryd.conf', use_sudo=True) +def enable_celerybeat(): + with cd(env.NEWSBLUR_PATH): + run('mkdir -p data') + put('config/supervisor_celerybeat.conf', '/etc/supervisor/conf.d/celerybeat.conf', use_sudo=True) + def copy_task_settings(): with settings(warn_only=True): put('config/settings/task_settings.py', '%s/local_settings.py' % env.NEWSBLUR_PATH) diff --git a/settings.py b/settings.py index 78557d0a8..378747e50 100644 --- a/settings.py +++ b/settings.py @@ -1,6 +1,7 @@ import sys import logging import os +import datetime from mongoengine import connect import redis from utils import jammit @@ -21,7 +22,7 @@ NEWSBLUR_URL = 'http://www.newsblur.com' # = Directory Declaractions = # =========================== -CURRENT_DIR = os.path.dirname(__file__) +CURRENT_DIR = os.getcwd() NEWSBLUR_DIR = CURRENT_DIR TEMPLATE_DIRS = (os.path.join(CURRENT_DIR, 'templates'),) MEDIA_ROOT = os.path.join(CURRENT_DIR, 'media') @@ -37,9 +38,10 @@ IMAGE_MASK = os.path.join(CURRENT_DIR, 'media/img/mask.png') if '/utils' not in ' '.join(sys.path): sys.path.append(UTILS_ROOT) + if '/vendor' not in ' '.join(sys.path): sys.path.append(VENDOR_ROOT) - + # =================== # = Global Settings = # =================== @@ -238,7 +240,7 @@ if not DEVELOPMENT: INSTALLED_APPS += ( 'gunicorn', ) - + # ========== # = Stripe = # ========== @@ -270,6 +272,10 @@ CELERY_ROUTES = { "queue": "update_feeds", "binding_key": "update_feeds" }, + "beat-tasks": { + "queue": "beat_tasks", + "binding_key": "beat_tasks" + }, } CELERY_QUEUES = { "work_queue": { @@ -292,6 +298,11 @@ CELERY_QUEUES = { "exchange_type": "direct", "binding_key": "update_feeds" }, + "beat_tasks": { + "exchange": "beat_tasks", + "exchange_type": "direct", + "binding_key": "beat_tasks" + }, } CELERY_DEFAULT_QUEUE = "work_queue" BROKER_BACKEND = "redis" @@ -299,7 +310,7 @@ BROKER_URL = "redis://db01:6379/0" CELERY_REDIS_HOST = "db01" CELERYD_PREFETCH_MULTIPLIER = 1 -CELERY_IMPORTS = ("apps.rss_feeds.tasks", "apps.social.tasks", ) +CELERY_IMPORTS = ("apps.rss_feeds.tasks", "apps.social.tasks", "apps.reader.tasks",) CELERYD_CONCURRENCY = 4 CELERY_IGNORE_RESULT = True CELERY_ACKS_LATE = True # Retry if task fails @@ -307,28 +318,51 @@ CELERYD_MAX_TASKS_PER_CHILD = 10 CELERYD_TASK_TIME_LIMIT = 12 * 30 CELERY_DISABLE_RATE_LIMITS = True +CELERYBEAT_SCHEDULE = { + 'freshen-homepage': { + 'task': 'freshen-homepage', + 'schedule': datetime.timedelta(hours=1), + 'options': {'queue': 'beat_tasks'}, + }, + 'task-feeds': { + 'task': 'task-feeds', + 'schedule': datetime.timedelta(minutes=1), + 'options': {'queue': 'beat_tasks'}, + }, + 'collect-stats': { + 'task': 'collect-stats', + 'schedule': datetime.timedelta(minutes=1), + 'options': {'queue': 'beat_tasks'}, + }, + 'collect-feedback': { + 'task': 'collect-feedback', + 'schedule': datetime.timedelta(minutes=1), + 'options': {'queue': 'beat_tasks'}, + }, +} + # ==================== # = Database Routers = # ==================== class MasterSlaveRouter(object): """A router that sets up a simple master/slave configuration""" - + def db_for_read(self, model, **hints): "Point all read operations to a random slave" return 'slave' - + def db_for_write(self, model, **hints): "Point all write operations to the master" return 'default' - + def allow_relation(self, obj1, obj2, **hints): "Allow any relation between two objects in the db pool" db_list = ('slave','default') if obj1._state.db in db_list and obj2._state.db in db_list: return True return None - + def allow_syncdb(self, db, model): "Explicitly put all models on all databases." return True @@ -357,6 +391,7 @@ try: from gunicorn_conf import * except ImportError, e: pass + from local_settings import * COMPRESS = not DEBUG @@ -397,3 +432,4 @@ JAMMIT = jammit.JammitAssets(NEWSBLUR_DIR) if DEBUG: MIDDLEWARE_CLASSES += ('utils.mongo_raw_log_middleware.SqldumpMiddleware',) +