diff --git a/apps/rss_feeds/models.py b/apps/rss_feeds/models.py
index 0b6c034f0..75903fdac 100644
--- a/apps/rss_feeds/models.py
+++ b/apps/rss_feeds/models.py
@@ -329,6 +329,11 @@ class Feed(models.Model):
             feeds = [f.pk for f in feeds]
         
         r.srem('queued_feeds', *feeds)
+        now = datetime.datetime.now().strftime("%s")
+        p = r.pipeline()
+        for feed_id in feeds:
+            p.zadd('tasked_feeds', feed_id, now)
+        p.execute()
         
         for feed_ids in (feeds[pos:pos + queue_size] for pos in xrange(0, len(feeds), queue_size)):
             UpdateFeeds.apply_async(args=(feed_ids,), queue='update_feeds')
@@ -1298,7 +1303,8 @@ class Feed(models.Model):
         if not skip_scheduling and self.active_subscribers >= 1:
             self.next_scheduled_update = next_scheduled_update
             r.zadd('scheduled_updates', self.pk, self.next_scheduled_update.strftime('%s'))
-            
+            r.zrem('tasked_feeds', self.pk)
+        
         self.save()
diff --git a/apps/rss_feeds/tasks.py b/apps/rss_feeds/tasks.py
index 5b027b389..6bf0a8e39 100644
--- a/apps/rss_feeds/tasks.py
+++ b/apps/rss_feeds/tasks.py
@@ -17,6 +17,7 @@ class TaskFeeds(Task):
         now = datetime.datetime.utcnow()
         start = time.time()
         r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)
+        task_feeds_size = r.llen('update_feeds') * 12
         
         hour_ago = now - datetime.timedelta(hours=1)
         r.zremrangebyscore('fetched_feeds_last_hour', 0, int(hour_ago.strftime('%s')))
@@ -31,20 +32,14 @@ class TaskFeeds(Task):
                 r.zcard('scheduled_updates')))
         
         # Regular feeds
-        feeds = r.srandmember('queued_feeds', 1000)
-        Feed.task_feeds(feeds, verbose=True)
-        active_count = len(feeds)
+        if task_feeds_size < 1000:
+            feeds = r.srandmember('queued_feeds', 1000)
+            Feed.task_feeds(feeds, verbose=True)
+            active_count = len(feeds)
+        else:
+            active_count = 0
         cp1 = time.time()
         
-        # Regular feeds
-        # feeds = Feed.objects.filter(
-        #     next_scheduled_update__lte=now,
-        #     active=True,
-        #     active_subscribers__gte=1
-        # ).order_by('?')[:1250]
-        # active_count = feeds.count()
-        # cp1 = time.time()
-        
         # Force refresh feeds
         refresh_feeds = Feed.objects.filter(
             active=True,
@@ -55,13 +50,15 @@ class TaskFeeds(Task):
         cp2 = time.time()
         
         # Mistakenly inactive feeds
-        day = now - datetime.timedelta(days=1)
-        inactive_feeds = Feed.objects.filter(
-            next_scheduled_update__lte=day,
-            min_to_decay__lte=60*24,
-            active_subscribers__gte=1
-        ).order_by('?')[:100]
-        inactive_count = inactive_feeds.count()
+        hours_ago = (now - datetime.timedelta(hours=6)).strftime('%s')
+        old_tasked_feeds = r.zrangebyscore('tasked_feeds', 0, hours_ago)
+        inactive_count = len(old_tasked_feeds)
+        if old_tasked_feeds:
+            r.zremrangebyscore('tasked_feeds', 0, hours_ago)
+            r.sadd('queued_feeds', *old_tasked_feeds)
+        logging.debug(" ---> ~SN~FBRe-queuing ~SB%s~SN dropped feeds (~SB%s~SN queued)" % (
+            inactive_count,
+            r.scard('queued_feeds')))
         cp3 = time.time()
         
         old = now - datetime.timedelta(days=3)
@@ -120,6 +117,8 @@ class UpdateFeeds(Task):
         for feed_pk in feed_pks:
             try:
                 feed = Feed.get_by_id(feed_pk)
+                if not feed:
+                    raise Feed.DoesNotExist
                 feed.update(**options)
             except Feed.DoesNotExist:
                 logging.info(" ---> Feed doesn't exist: [%s]" % feed_pk)
diff --git a/config/supervisor_celeryd.conf b/config/supervisor_celeryd.conf
index 610858266..ebbdeaf66 100644
--- a/config/supervisor_celeryd.conf
+++ b/config/supervisor_celeryd.conf
@@ -1,5 +1,4 @@
 [program:celery]
-# command=/srv/newsblur/manage.py celeryd --loglevel=INFO -Q new_feeds,work_queue,push_feeds,update_feeds
 command=/srv/newsblur/manage.py celeryd --loglevel=INFO -Q new_feeds,push_feeds,update_feeds
 directory=/srv/newsblur
 user=sclay
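The mechanism above replaces the Mongo query for "mistakenly inactive feeds" with Redis bookkeeping. When Feed.task_feeds dispatches a batch, the feed IDs move from the queued_feeds set into the tasked_feeds sorted set, scored by dispatch time; a successful fetch removes the ID again in set_next_scheduled_update; and TaskFeeds re-queues anything still marked after six hours. A minimal sketch of that lifecycle, assuming redis-py 2.x (whose zadd takes member then score, matching the calls in this diff); the connection and batch size are illustrative:

    import time
    import redis

    r = redis.Redis()  # stand-in for settings.REDIS_FEED_POOL

    # Dispatch: move queued feeds into tasked_feeds, scored by dispatch time.
    feeds = r.srandmember('queued_feeds', 1000)
    if feeds:
        r.srem('queued_feeds', *feeds)
        pipe = r.pipeline()
        now = int(time.time())
        for feed_id in feeds:
            pipe.zadd('tasked_feeds', feed_id, now)  # redis-py 2.x: name, member, score
        pipe.execute()

    # Completion: a successful fetch clears its own marker
    # (set_next_scheduled_update in models.py):
    # r.zrem('tasked_feeds', feed_id)

    # Watchdog: feeds tasked over six hours ago were dropped; put them back.
    six_hours_ago = int(time.time()) - 6 * 60 * 60
    dropped = r.zrangebyscore('tasked_feeds', 0, six_hours_ago)
    if dropped:
        r.zremrangebyscore('tasked_feeds', 0, six_hours_ago)
        r.sadd('queued_feeds', *dropped)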
diff --git a/config/supervisor_celeryd_new.conf b/config/supervisor_celeryd_new.conf
new file mode 100644
index 000000000..0ac5d9e10
--- /dev/null
+++ b/config/supervisor_celeryd_new.conf
@@ -0,0 +1,19 @@
+[program:celery]
+command=/srv/newsblur/manage.py celeryd --loglevel=INFO -Q new_feeds,push_feeds
+directory=/srv/newsblur
+user=sclay
+numprocs=1
+stdout_logfile=/var/log/celeryd.log
+stderr_logfile=/var/log/celeryd.log
+autostart=true
+autorestart=true
+startsecs=10
+;process_name=%(program_name)s_%(process_num)03d
+
+; Need to wait for currently executing tasks to finish at shutdown.
+; Increase this if you have very long running tasks.
+stopwaitsecs = 60
+
+; if rabbitmq is supervised, set its priority higher
+; so it starts first
+priority=998
diff --git a/fabfile.py b/fabfile.py
index 15de7a3cf..f0519e564 100644
--- a/fabfile.py
+++ b/fabfile.py
@@ -201,9 +201,12 @@ def celery_fast():
 @parallel
 def celery_stop():
     with cd(env.NEWSBLUR_PATH):
-        run('sudo supervisorctl stop celery')
+        sudo('supervisorctl stop celery')
         with settings(warn_only=True):
-            run('./utils/kill_celery.sh')
+            if env.user == 'ubuntu':
+                sudo('./utils/kill_celery.sh')
+            else:
+                run('./utils/kill_celery.sh')
 
 @parallel
 def celery_start():
@@ -340,14 +343,14 @@ def setup_db(engine=None, skip_common=False):
     # if env.user == 'ubuntu':
     #     setup_db_mdadm()
 
-def setup_task(skip_common=False):
+def setup_task(queue=None, skip_common=False):
     if not skip_common:
         setup_common()
         setup_vps()
     setup_task_firewall()
     setup_task_motd()
     copy_task_settings()
-    enable_celery_supervisor()
+    enable_celery_supervisor(queue)
     setup_gunicorn(supervisor=False)
     update_gunicorn()
     config_monit_task()
@@ -444,7 +447,8 @@ def setup_python():
     sudo('su -c \'echo "import sys; sys.setdefaultencoding(\\\\"utf-8\\\\")" > /usr/lib/python2.7/sitecustomize.py\'')
     
     if env.user == 'ubuntu':
-        sudo('chown -R ubuntu.ubuntu /home/ubuntu/.python-eggs')
+        with settings(warn_only=True):
+            sudo('chown -R ubuntu.ubuntu /home/ubuntu/.python-eggs')
 
 # PIL - Only if python-imaging didn't install through apt-get, like on Mac OS X.
 def setup_imaging():
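The sudo-versus-run branch in celery_stop above is driven by the login user: EC2 task servers log in as ubuntu and need sudo for the kill script, while dedicated servers run it directly as sclay. A hypothetical helper showing the same idiom in isolation (run_maybe_sudo is not in this fabfile, and Fabric 1.x is assumed):

    from fabric.api import env, run, sudo

    def run_maybe_sudo(cmd):
        # EC2 hosts authenticate as the unprivileged 'ubuntu' user and
        # need sudo; dedicated hosts can execute scripts directly.
        if env.user == 'ubuntu':
            return sudo(cmd)
        return run(cmd)

    # run_maybe_sudo('./utils/kill_celery.sh')

With the queue argument threaded through in the hunk below, setup_task(queue='new') installs config/supervisor_celeryd_new.conf as the host's celeryd.conf, so that server consumes only the new_feeds and push_feeds queues.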
@@ -935,16 +939,25 @@ def setup_task_firewall():
 def setup_task_motd():
     put('config/motd_task.txt', '/etc/motd.tail', use_sudo=True)
 
-def enable_celery_supervisor():
-    put('config/supervisor_celeryd.conf', '/etc/supervisor/conf.d/celeryd.conf', use_sudo=True)
+def enable_celery_supervisor(queue=None):
+    if not queue:
+        put('config/supervisor_celeryd.conf', '/etc/supervisor/conf.d/celeryd.conf', use_sudo=True)
+    else:
+        put('config/supervisor_celeryd_%s.conf' % queue, '/etc/supervisor/conf.d/celeryd.conf', use_sudo=True)
+
     sudo('supervisorctl reread')
     sudo('supervisorctl update')
 
 @parallel
 def copy_task_settings():
+    if env.host:
+        host = env.host.split('.', 2)[0]
+    else:
+        host = env.host_string.split('.', 2)[0]
+
     with settings(warn_only=True):
         put('../secrets-newsblur/settings/task_settings.py', '%s/local_settings.py' % env.NEWSBLUR_PATH)
-        run('echo "\nSERVER_NAME = \\\\"`hostname`\\\\"" >> %s/local_settings.py' % env.NEWSBLUR_PATH)
+        run('echo "\nSERVER_NAME = \\\\"%s\\\\"" >> %s/local_settings.py' % (host, env.NEWSBLUR_PATH))
 
 # =========================
 # = Setup - Digital Ocean =
diff --git a/utils/munin/newsblur_updates.py b/utils/munin/newsblur_updates.py
index ca2c766a9..004f6dfae 100755
--- a/utils/munin/newsblur_updates.py
+++ b/utils/munin/newsblur_updates.py
@@ -21,16 +21,13 @@ class NBMuninGraph(MuninGraph):
 
     def calculate_metrics(self):
-        import datetime
-        from apps.rss_feeds.models import Feed
         from django.conf import settings
-        hour_ago = datetime.datetime.utcnow() - datetime.timedelta(hours=1)
         
         r = redis.Redis(connection_pool=settings.REDIS_FEED_POOL)
         
         return {
-            'update_queue': r.llen("queued_feeds"),
-            'feeds_fetched': r.llen("fetched_feeds_last_hour"),
+            'update_queue': r.scard("queued_feeds"),
+            'feeds_fetched': r.zcard("fetched_feeds_last_hour"),
             'celery_update_feeds': r.llen("update_feeds"),
             'celery_new_feeds': r.llen("new_feeds"),
             'celery_push_feeds': r.llen("push_feeds"),
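On the munin plugin fix: earlier in this diff queued_feeds is manipulated as a set (SADD/SREM/SRANDMEMBER) and fetched_feeds_last_hour as a sorted set (ZREMRANGEBYSCORE), so the old LLEN calls were querying the wrong type; only the celery queues (update_feeds, new_feeds, push_feeds) are plain lists. A small sketch that derives the right cardinality command from each key's type — the key names are from this diff, the connection is illustrative:

    import redis

    r = redis.Redis()

    def queue_size(r, key):
        # LLEN, SCARD, and ZCARD each work only on their own type; LLEN
        # against a set or sorted set raises a wrong-type error.
        kind = r.type(key)
        kind = kind.decode() if isinstance(kind, bytes) else kind
        commands = {'list': r.llen, 'set': r.scard, 'zset': r.zcard}
        return commands[kind](key) if kind in commands else 0

    for key in ('queued_feeds', 'tasked_feeds', 'fetched_feeds_last_hour',
                'update_feeds', 'new_feeds', 'push_feeds'):
        print('%s: %s' % (key, queue_size(r, key)))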