Moving cronjob tasks over to celerybeat. Starting with task_feeds, collect_feedback, collect_stats, and mark_read for homepage freshening.

This commit is contained in:
Samuel Clay 2012-07-16 23:40:17 -07:00
parent 7be638a735
commit d722e9c4eb
5 changed files with 147 additions and 10 deletions

43
apps/reader/tasks.py Normal file
View file

@ -0,0 +1,43 @@
import datetime
from celery.task import Task
from utils import log as logging
from django.contrib.auth.models import User
from apps.reader.models import UserSubscription
from apps.statistics.models import MStatistics
from apps.statistics.models import MFeedback
class FreshenHomepage(Task):
    """Celerybeat task that keeps the showcase accounts looking fresh.

    Touches each demo user's last_seen_on timestamp so the account reads
    as active, then rolls every subscription's mark-read date back to 24
    hours ago so only the last day of stories shows as unread.
    """
    name = 'freshen-homepage'

    def run(self, **kwargs):
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=1)
        for username in ('conesus', 'popular'):
            user = User.objects.get(username=username)
            # Pretend the user just visited the site.
            user.profile.last_seen_on = datetime.datetime.utcnow()
            user.profile.save()

            subscriptions = UserSubscription.objects.filter(user=user)
            logging.debug(" Marking %s feeds day old read." % subscriptions.count())
            for subscription in subscriptions:
                subscription.mark_read_date = cutoff
                subscription.needs_unread_recalc = True
                subscription.save()
class CollectStats(Task):
    """Periodic task: recompute site-wide statistics, then prune stale ones."""
    name = 'collect-stats'

    def run(self, **kwargs):
        # Gather fresh numbers first, then drop the expired rows.
        MStatistics.collect_statistics()
        MStatistics.delete_old_stats()
class CollectFeedback(Task):
    """Periodic task: delegate feedback collection to MFeedback."""
    name = 'collect-feedback'

    def run(self, **kwargs):
        MFeedback.collect_feedback()

View file

@ -1,7 +1,46 @@
import datetime
from celery.task import Task
from utils import log as logging
from django.conf import settings
class TaskFeeds(Task):
    """Celerybeat dispatcher that queues batches of feeds for fetching.

    Runs three passes: feeds due on their normal schedule, feeds that
    appear stuck for a day, and feeds untouched for a whole week.
    """
    name = 'task-feeds'

    def run(self, **kwargs):
        from apps.rss_feeds.models import Feed
        settings.LOG_TO_STREAM = True
        now = datetime.datetime.utcnow()
        day = now - datetime.timedelta(days=1)
        week = now - datetime.timedelta(days=7)

        # Pass 1: feeds whose scheduled update time has arrived.
        due_feeds = Feed.objects.filter(
            next_scheduled_update__lte=now,
            active=True
        ).exclude(
            active_subscribers=0
        ).order_by('?')
        Feed.task_feeds(due_feeds)

        # Pass 2: feeds that look mistakenly inactive — not updated or
        # queued in a day despite a decay interval of a day or less.
        stuck_feeds = Feed.objects.filter(
            last_update__lte=day,
            queued_date__lte=day,
            min_to_decay__lte=60*24,
            active_subscribers__gte=1,
            active=True
        ).order_by('?')
        if stuck_feeds:
            Feed.task_feeds(stuck_feeds)

        # Pass 3: feeds untouched for a week, regardless of decay interval.
        # NOTE(review): queued_date is compared against `day`, not `week`,
        # matching the original — confirm that asymmetry is intentional.
        stale_feeds = Feed.objects.filter(
            last_update__lte=week,
            queued_date__lte=day,
            active_subscribers__gte=1,
            active=True
        ).order_by('?')
        if stale_feeds:
            Feed.task_feeds(stale_feeds)
class UpdateFeeds(Task):
name = 'update-feeds'
max_retries = 0

View file

@ -0,0 +1,14 @@
; Supervisor program definition for celerybeat, the periodic-task scheduler
; that drives CELERYBEAT_SCHEDULE (task-feeds, collect-stats, etc.).
[program:celerybeat]
; Persist the schedule state in data/ so restarts don't lose timing info.
command=/home/sclay/newsblur/manage.py celerybeat --schedule=/home/sclay/newsblur/data/celerybeat-schedule.db --loglevel=INFO
directory=/home/sclay/newsblur
user=sclay
; Only ever run a single beat process — multiple beats would duplicate tasks.
numprocs=1
stdout_logfile=/var/log/celerybeat.log
stderr_logfile=/var/log/celerybeat.log
autostart=true
autorestart=true
; Consider the process started only after it survives 10 seconds.
startsecs=10

; if rabbitmq is supervised, set its priority higher
; so it starts first
priority=998

9
fabfile.py vendored
View file

@ -243,7 +243,7 @@ def setup_common():
config_pgbouncer()
# setup_mongoengine()
# setup_forked_mongoengine()
setup_pymongo_repo()
# setup_pymongo_repo()
setup_logrotate()
setup_nginx()
configure_nginx()
@ -362,7 +362,7 @@ def setup_psycopg():
def setup_python():
# sudo('easy_install -U pip')
sudo('easy_install -U fabric django==1.3.1 readline pyflakes iconv celery django-celery django-celery-with-redis django-compress South django-extensions pymongo stripe BeautifulSoup pyyaml nltk lxml oauth2 pytz boto seacucumber django_ses mongoengine redis requests')
sudo('easy_install -U fabric django==1.3.1 readline pyflakes iconv celery django-celery django-celery-with-redis django-compress South django-extensions pymongo==2.2.0 stripe BeautifulSoup pyyaml nltk lxml oauth2 pytz boto seacucumber django_ses mongoengine redis requests')
put('config/pystartup.py', '.pystartup')
with cd(os.path.join(env.NEWSBLUR_PATH, 'vendor/cjson')):
@ -637,6 +637,11 @@ def setup_task_motd():
def enable_celery_supervisor():
    # Install the celeryd worker config into supervisord on the remote host.
    put('config/supervisor_celeryd.conf', '/etc/supervisor/conf.d/celeryd.conf', use_sudo=True)
def enable_celerybeat():
    # Create the data/ directory that holds the celerybeat schedule DB,
    # then install the supervisord config so beat runs under supervision.
    # NOTE(review): indentation was lost in extraction; put() is assumed to
    # sit at function level (it needs no remote cwd) — confirm in fabfile.
    with cd(env.NEWSBLUR_PATH):
        run('mkdir -p data')
    put('config/supervisor_celerybeat.conf', '/etc/supervisor/conf.d/celerybeat.conf', use_sudo=True)
def copy_task_settings():
with settings(warn_only=True):
put('config/settings/task_settings.py', '%s/local_settings.py' % env.NEWSBLUR_PATH)

View file

@ -1,6 +1,7 @@
import sys
import logging
import os
import datetime
from mongoengine import connect
import redis
from utils import jammit
@ -21,7 +22,7 @@ NEWSBLUR_URL = 'http://www.newsblur.com'
# = Directory Declarations =
# ===========================
CURRENT_DIR = os.path.dirname(__file__)
CURRENT_DIR = os.getcwd()
NEWSBLUR_DIR = CURRENT_DIR
TEMPLATE_DIRS = (os.path.join(CURRENT_DIR, 'templates'),)
MEDIA_ROOT = os.path.join(CURRENT_DIR, 'media')
@ -37,9 +38,10 @@ IMAGE_MASK = os.path.join(CURRENT_DIR, 'media/img/mask.png')
if '/utils' not in ' '.join(sys.path):
sys.path.append(UTILS_ROOT)
if '/vendor' not in ' '.join(sys.path):
sys.path.append(VENDOR_ROOT)
# ===================
# = Global Settings =
# ===================
@ -238,7 +240,7 @@ if not DEVELOPMENT:
INSTALLED_APPS += (
'gunicorn',
)
# ==========
# = Stripe =
# ==========
@ -270,6 +272,10 @@ CELERY_ROUTES = {
"queue": "update_feeds",
"binding_key": "update_feeds"
},
"beat-tasks": {
"queue": "beat_tasks",
"binding_key": "beat_tasks"
},
}
CELERY_QUEUES = {
"work_queue": {
@ -292,6 +298,11 @@ CELERY_QUEUES = {
"exchange_type": "direct",
"binding_key": "update_feeds"
},
"beat_tasks": {
"exchange": "beat_tasks",
"exchange_type": "direct",
"binding_key": "beat_tasks"
},
}
CELERY_DEFAULT_QUEUE = "work_queue"
BROKER_BACKEND = "redis"
@ -299,7 +310,7 @@ BROKER_URL = "redis://db01:6379/0"
CELERY_REDIS_HOST = "db01"
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_IMPORTS = ("apps.rss_feeds.tasks", "apps.social.tasks", )
CELERY_IMPORTS = ("apps.rss_feeds.tasks", "apps.social.tasks", "apps.reader.tasks",)
CELERYD_CONCURRENCY = 4
CELERY_IGNORE_RESULT = True
CELERY_ACKS_LATE = True # Retry if task fails
@ -307,28 +318,51 @@ CELERYD_MAX_TASKS_PER_CHILD = 10
CELERYD_TASK_TIME_LIMIT = 12 * 30
CELERY_DISABLE_RATE_LIMITS = True
# Periodic tasks dispatched by celerybeat. Every entry is routed to the
# dedicated beat_tasks queue so scheduled jobs never contend with the
# feed-fetching worker queues.
CELERYBEAT_SCHEDULE = {}
for _task_name, _interval in (
    ('freshen-homepage', datetime.timedelta(hours=1)),
    ('task-feeds', datetime.timedelta(minutes=1)),
    ('collect-stats', datetime.timedelta(minutes=1)),
    ('collect-feedback', datetime.timedelta(minutes=1)),
):
    CELERYBEAT_SCHEDULE[_task_name] = {
        'task': _task_name,
        'schedule': _interval,
        'options': {'queue': 'beat_tasks'},
    }
# ====================
# = Database Routers =
# ====================
class MasterSlaveRouter(object):
    """Django database router implementing a plain master/slave split.

    Reads go to the 'slave' alias, writes to 'default' (the master);
    relations are allowed whenever both objects live in the pool, and
    every model syncs to every database.
    """

    def db_for_read(self, model, **hints):
        """Route every read to the slave."""
        return 'slave'

    def db_for_write(self, model, **hints):
        """Route every write to the master ('default')."""
        return 'default'

    def allow_relation(self, obj1, obj2, **hints):
        """Allow the relation if both objects are in the pool; else abstain."""
        pool = ('slave', 'default')
        in_pool = obj1._state.db in pool and obj2._state.db in pool
        # Returning None (rather than False) lets other routers decide.
        return True if in_pool else None

    def allow_syncdb(self, db, model):
        """Every model may be synced onto every database."""
        return True
@ -357,6 +391,7 @@ try:
from gunicorn_conf import *
except ImportError, e:
pass
from local_settings import *
COMPRESS = not DEBUG
@ -397,3 +432,4 @@ JAMMIT = jammit.JammitAssets(NEWSBLUR_DIR)
if DEBUG:
MIDDLEWARE_CLASSES += ('utils.mongo_raw_log_middleware.SqldumpMiddleware',)