Merge branch 'master' into circular

* master:
  Adding tasked_feeds set to handle dropped feeds.
  Fixing missing feed bug.
  Adding new task server role: only new and push feeds.
  Fixing munin counts.
Samuel Clay 2013-04-03 17:26:10 -07:00
commit 20698e9bc5
6 changed files with 67 additions and 34 deletions
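
The common thread in these four changes is a new Redis accounting scheme for feed fetches. In outline (key names are from the diffs below; the labels are descriptive only):

    # queued_feeds      (SET)  feeds awaiting dispatch
    # tasked_feeds      (ZSET) feed -> epoch when handed to a worker (new)
    # scheduled_updates (ZSET) feed -> epoch of the next planned fetch
    #
    # dispatch: SREM from queued_feeds, ZADD to tasked_feeds (score = now)
    # success:  ZREM from tasked_feeds, ZADD to scheduled_updates
    # dropped:  tasked_feeds entries older than 6h are SADDed back to queued_feeds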

@@ -329,6 +329,11 @@ class Feed(models.Model):
         feeds = [f.pk for f in feeds]
         r.srem('queued_feeds', *feeds)
+        now = datetime.datetime.now().strftime("%s")
+        p = r.pipeline()
+        for feed_id in feeds:
+            p.zadd('tasked_feeds', feed_id, now)
+        p.execute()
         for feed_ids in (feeds[pos:pos + queue_size] for pos in xrange(0, len(feeds), queue_size)):
             UpdateFeeds.apply_async(args=(feed_ids,), queue='update_feeds')
@@ -1298,7 +1303,8 @@ class Feed(models.Model):
         if not skip_scheduling and self.active_subscribers >= 1:
             self.next_scheduled_update = next_scheduled_update
             r.zadd('scheduled_updates', self.pk, self.next_scheduled_update.strftime('%s'))
+            r.zrem('tasked_feeds', self.pk)
         self.save()
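
Taken together, the two hunks above are the bookends of the new accounting: a feed is stamped into the tasked_feeds sorted set when it is dispatched to a worker, and removed again once a fetch reschedules it, so whatever lingers in the set belongs to a dropped task. A minimal standalone sketch of that lifecycle, assuming a local Redis and redis-py 3+ (the diff targets the 2013-era redis-py 2.x positional zadd); mark_tasked and mark_scheduled are hypothetical helper names, not NewsBlur code:

    import time
    import redis

    r = redis.Redis()

    def mark_tasked(feed_ids):
        # Dispatch: drop feeds from the unordered queue and stamp each
        # into the 'tasked_feeds' zset with the dispatch time, so feeds
        # whose tasks are dropped can later be found by score.
        r.srem('queued_feeds', *feed_ids)
        now = int(time.time())  # the diff uses strftime("%s"), which is Linux-only
        p = r.pipeline()
        for feed_id in feed_ids:
            p.zadd('tasked_feeds', {str(feed_id): now})
        p.execute()

    def mark_scheduled(feed_id, next_update_ts):
        # Success: a completed fetch reschedules the feed and clears its
        # tasked_feeds entry; leftovers belong to dropped tasks.
        r.zadd('scheduled_updates', {str(feed_id): next_update_ts})
        r.zrem('tasked_feeds', feed_id)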

@@ -17,6 +17,7 @@ class TaskFeeds(Task):
         now = datetime.datetime.utcnow()
         start = time.time()
         r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)
+        task_feeds_size = r.llen('update_feeds') * 12
         hour_ago = now - datetime.timedelta(hours=1)
         r.zremrangebyscore('fetched_feeds_last_hour', 0, int(hour_ago.strftime('%s')))
@@ -31,20 +32,14 @@ class TaskFeeds(Task):
             r.zcard('scheduled_updates')))
 
         # Regular feeds
-        feeds = r.srandmember('queued_feeds', 1000)
-        Feed.task_feeds(feeds, verbose=True)
-        active_count = len(feeds)
+        if task_feeds_size < 1000:
+            feeds = r.srandmember('queued_feeds', 1000)
+            Feed.task_feeds(feeds, verbose=True)
+            active_count = len(feeds)
+        else:
+            active_count = 0
         cp1 = time.time()
 
-        # Regular feeds
-        # feeds = Feed.objects.filter(
-        #     next_scheduled_update__lte=now,
-        #     active=True,
-        #     active_subscribers__gte=1
-        # ).order_by('?')[:1250]
-        # active_count = feeds.count()
-        # cp1 = time.time()
-
         # Force refresh feeds
         refresh_feeds = Feed.objects.filter(
             active=True,
@@ -55,13 +50,15 @@ class TaskFeeds(Task):
         cp2 = time.time()
 
         # Mistakenly inactive feeds
-        day = now - datetime.timedelta(days=1)
-        inactive_feeds = Feed.objects.filter(
-            next_scheduled_update__lte=day,
-            min_to_decay__lte=60*24,
-            active_subscribers__gte=1
-        ).order_by('?')[:100]
-        inactive_count = inactive_feeds.count()
+        hours_ago = (now - datetime.timedelta(hours=6)).strftime('%s')
+        old_tasked_feeds = r.zrangebyscore('tasked_feeds', 0, hours_ago)
+        inactive_count = len(old_tasked_feeds)
+        if old_tasked_feeds:
+            r.zremrangebyscore('tasked_feeds', 0, hours_ago)
+            r.sadd('queued_feeds', *old_tasked_feeds)
+        logging.debug(" ---> ~SN~FBRe-queuing ~SB%s~SN dropped feeds (~SB%s~SN queued)" % (
+            inactive_count,
+            r.scard('queued_feeds')))
         cp3 = time.time()
 
         old = now - datetime.timedelta(days=3)
@@ -120,6 +117,8 @@ class UpdateFeeds(Task):
         for feed_pk in feed_pks:
             try:
                 feed = Feed.get_by_id(feed_pk)
+                if not feed:
+                    raise Feed.DoesNotExist
                 feed.update(**options)
             except Feed.DoesNotExist:
                 logging.info(" ---> Feed doesn't exist: [%s]" % feed_pk)
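
These hunks make TaskFeeds the watchdog for the new set: once the update_feeds backlog is large (task_feeds_size >= 1000), no fresh feeds are queued, and anything still sitting in tasked_feeds after six hours is presumed dropped and pushed back onto queued_feeds; the UpdateFeeds change likewise turns a None from get_by_id into the already-handled DoesNotExist path. A minimal sketch of the re-queue pass, assuming a local Redis; requeue_dropped_feeds is a hypothetical name:

    import time
    import redis

    r = redis.Redis()

    def requeue_dropped_feeds(max_age_hours=6):
        # Entries older than the cutoff were dispatched but never
        # rescheduled, i.e. their worker task was dropped.
        cutoff = int(time.time()) - max_age_hours * 3600
        dropped = r.zrangebyscore('tasked_feeds', 0, cutoff)
        if dropped:
            r.zremrangebyscore('tasked_feeds', 0, cutoff)
            r.sadd('queued_feeds', *dropped)
        return len(dropped)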

@@ -1,5 +1,4 @@
 [program:celery]
-# command=/srv/newsblur/manage.py celeryd --loglevel=INFO -Q new_feeds,work_queue,push_feeds,update_feeds
 command=/srv/newsblur/manage.py celeryd --loglevel=INFO -Q new_feeds,push_feeds,update_feeds
 directory=/srv/newsblur
 user=sclay

@@ -0,0 +1,19 @@
+[program:celery]
+command=/srv/newsblur/manage.py celeryd --loglevel=INFO -Q new_feeds,push_feeds
+directory=/srv/newsblur
+user=sclay
+numprocs=1
+stdout_logfile=/var/log/celeryd.log
+stderr_logfile=/var/log/celeryd.log
+autostart=true
+autorestart=true
+startsecs=10
+;process_name=%(program_name)s_%(process_num)03d
+
+; Need to wait for currently executing tasks to finish at shutdown.
+; Increase this if you have very long running tasks.
+stopwaitsecs = 60
+
+; if rabbitmq is supervised, set its priority higher
+; so it starts first
+priority=998
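
For reference, the queue split the two supervisor configs now encode: the default task role consumes new_feeds, push_feeds, and update_feeds, while the new role drops update_feeds and handles only new-subscription and PuSH fetches, leaving steady-state updates to other servers. (Queue names are from the configs above; the role labels below are descriptive only, and the new file's actual name is not shown in this diff.)

    CELERY_QUEUES_BY_ROLE = {
        'task (default)':       ['new_feeds', 'push_feeds', 'update_feeds'],
        'task (new/push only)': ['new_feeds', 'push_feeds'],
    }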

fabfile.py

@@ -201,9 +201,12 @@ def celery_fast():
 @parallel
 def celery_stop():
     with cd(env.NEWSBLUR_PATH):
-        run('sudo supervisorctl stop celery')
+        sudo('supervisorctl stop celery')
     with settings(warn_only=True):
-        run('./utils/kill_celery.sh')
+        if env.user == 'ubuntu':
+            sudo('./utils/kill_celery.sh')
+        else:
+            run('./utils/kill_celery.sh')
 
 @parallel
 def celery_start():
@@ -340,14 +343,14 @@ def setup_db(engine=None, skip_common=False):
 #     if env.user == 'ubuntu':
 #         setup_db_mdadm()
 
-def setup_task(skip_common=False):
+def setup_task(queue=None, skip_common=False):
     if not skip_common:
         setup_common()
         setup_vps()
     setup_task_firewall()
     setup_task_motd()
     copy_task_settings()
-    enable_celery_supervisor()
+    enable_celery_supervisor(queue)
     setup_gunicorn(supervisor=False)
     update_gunicorn()
     config_monit_task()
@@ -444,7 +447,8 @@ def setup_python():
     sudo('su -c \'echo "import sys; sys.setdefaultencoding(\\\\"utf-8\\\\")" > /usr/lib/python2.7/sitecustomize.py\'')
 
     if env.user == 'ubuntu':
-        sudo('chown -R ubuntu.ubuntu /home/ubuntu/.python-eggs')
+        with settings(warn_only=True):
+            sudo('chown -R ubuntu.ubuntu /home/ubuntu/.python-eggs')
 
 # PIL - Only if python-imaging didn't install through apt-get, like on Mac OS X.
 def setup_imaging():
@@ -935,16 +939,25 @@ def setup_task_firewall():
 def setup_task_motd():
     put('config/motd_task.txt', '/etc/motd.tail', use_sudo=True)
 
-def enable_celery_supervisor():
-    put('config/supervisor_celeryd.conf', '/etc/supervisor/conf.d/celeryd.conf', use_sudo=True)
+def enable_celery_supervisor(queue=None):
+    if not queue:
+        put('config/supervisor_celeryd.conf', '/etc/supervisor/conf.d/celeryd.conf', use_sudo=True)
+    else:
+        put('config/supervisor_celeryd_%s.conf' % queue, '/etc/supervisor/conf.d/celeryd.conf', use_sudo=True)
     sudo('supervisorctl reread')
     sudo('supervisorctl update')
 
 @parallel
 def copy_task_settings():
+    if env.host:
+        host = env.host.split('.', 2)[0]
+    else:
+        host = env.host_string.split('.', 2)[0]
+
     with settings(warn_only=True):
         put('../secrets-newsblur/settings/task_settings.py', '%s/local_settings.py' % env.NEWSBLUR_PATH)
-        run('echo "\nSERVER_NAME = \\\\"`hostname`\\\\"" >> %s/local_settings.py' % env.NEWSBLUR_PATH)
+        run('echo "\nSERVER_NAME = \\\\"%s\\\\"" >> %s/local_settings.py' % (host, env.NEWSBLUR_PATH))
 
 # =========================
 # = Setup - Digital Ocean =
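
The new queue parameter threads straight through: setup_task(queue=...) hands it to enable_celery_supervisor, which installs config/supervisor_celeryd_<queue>.conf in place of the default. A hypothetical Fabric 1.x invocation (the queue name here is illustrative; the new config file's real name is not shown in this diff):

    # fab -H task01.newsblur.com setup_task:queue=work
    # ...which in effect runs:
    #   enable_celery_supervisor('work')
    #   -> put('config/supervisor_celeryd_work.conf',
    #          '/etc/supervisor/conf.d/celeryd.conf', use_sudo=True)

The copy_task_settings change swaps the remote `hostname` backtick expansion for a name derived from the Fabric host, e.g.:

    >>> 'task01.newsblur.com'.split('.', 2)[0]
    'task01'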

@@ -21,16 +21,13 @@ class NBMuninGraph(MuninGraph):
     def calculate_metrics(self):
         import datetime
         from apps.rss_feeds.models import Feed
         from django.conf import settings
 
         hour_ago = datetime.datetime.utcnow() - datetime.timedelta(hours=1)
         r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)
 
         return {
-            'update_queue': r.llen("queued_feeds"),
-            'feeds_fetched': r.llen("fetched_feeds_last_hour"),
+            'update_queue': r.scard("queued_feeds"),
+            'feeds_fetched': r.zcard("fetched_feeds_last_hour"),
             'celery_update_feeds': r.llen("update_feeds"),
             'celery_new_feeds': r.llen("new_feeds"),
             'celery_push_feeds': r.llen("push_feeds"),
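
The count fix follows from the key types: queued_feeds is a set and fetched_feeds_last_hour is a sorted set, so the list-length command the graph used before cannot count them. A quick demonstration against a throwaway local Redis:

    import redis

    r = redis.Redis()
    r.sadd('queued_feeds', 1, 2, 3)
    print(r.scard('queued_feeds'))   # 3, the correct cardinality
    try:
        r.llen('queued_feeds')       # LLEN against a SET key
    except redis.exceptions.ResponseError as err:
        print(err)  # WRONGTYPE Operation against a key holding the wrong kind of value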