diff --git a/apps/analyzer/tasks.py b/apps/analyzer/tasks.py index 60f7f41bf..d0fb7909e 100644 --- a/apps/analyzer/tasks.py +++ b/apps/analyzer/tasks.py @@ -1,13 +1,12 @@ -from celery.task import Task +from celery.task import task from utils import log as logging -class EmailPopularityQuery(Task): +@task() +def EmailPopularityQuery(pk): + from apps.analyzer.models import MPopularityQuery + + query = MPopularityQuery.objects.get(pk=pk) + logging.debug(" -> ~BB~FCRunning popularity query: ~SB%s" % query) + + query.send_email() - def run(self, pk): - from apps.analyzer.models import MPopularityQuery - - query = MPopularityQuery.objects.get(pk=pk) - logging.debug(" -> ~BB~FCRunning popularity query: ~SB%s" % query) - - query.send_email() - diff --git a/apps/feed_import/tasks.py b/apps/feed_import/tasks.py index d0f6c1c7c..701d8f921 100644 --- a/apps/feed_import/tasks.py +++ b/apps/feed_import/tasks.py @@ -1,21 +1,20 @@ -from celery.task import Task +from celery.task import task from django.contrib.auth.models import User from apps.feed_import.models import UploadedOPML, OPMLImporter from apps.reader.models import UserSubscription from utils import log as logging -class ProcessOPML(Task): +@task() +def ProcessOPML(user_id): + user = User.objects.get(pk=user_id) + logging.user(user, "~FR~SBOPML upload (task) starting...") + + opml = UploadedOPML.objects.filter(user_id=user_id).first() + opml_importer = OPMLImporter(opml.opml_file, user) + opml_importer.process() - def run(self, user_id): - user = User.objects.get(pk=user_id) - logging.user(user, "~FR~SBOPML upload (task) starting...") - - opml = UploadedOPML.objects.filter(user_id=user_id).first() - opml_importer = OPMLImporter(opml.opml_file, user) - opml_importer.process() - - feed_count = UserSubscription.objects.filter(user=user).count() - user.profile.send_upload_opml_finished_email(feed_count) - logging.user(user, "~FR~SBOPML upload (task): ~SK%s~SN~SB~FR feeds" % (feed_count)) + feed_count = UserSubscription.objects.filter(user=user).count() + user.profile.send_upload_opml_finished_email(feed_count) + logging.user(user, "~FR~SBOPML upload (task): ~SK%s~SN~SB~FR feeds" % (feed_count)) diff --git a/apps/notifications/tasks.py b/apps/notifications/tasks.py index 3ad7b80eb..8001b60ec 100644 --- a/apps/notifications/tasks.py +++ b/apps/notifications/tasks.py @@ -1,10 +1,9 @@ -from celery.task import Task +from celery.task import task from django.contrib.auth.models import User from apps.notifications.models import MUserFeedNotification from utils import log as logging -class QueueNotifications(Task): - - def run(self, feed_id, new_stories): - MUserFeedNotification.push_feed_notifications(feed_id, new_stories) \ No newline at end of file +@task() +def QueueNotifications(feed_id, new_stories): + MUserFeedNotification.push_feed_notifications(feed_id, new_stories) diff --git a/apps/profile/tasks.py b/apps/profile/tasks.py index 491fb7302..fd3fa5855 100644 --- a/apps/profile/tasks.py +++ b/apps/profile/tasks.py @@ -1,90 +1,76 @@ import datetime -from celery.task import Task +from celery.task import task from apps.profile.models import Profile, RNewUserQueue from utils import log as logging from apps.reader.models import UserSubscription, UserSubscriptionFolders from apps.social.models import MSocialServices, MActivity, MInteraction -class EmailNewUser(Task): - - def run(self, user_id): - user_profile = Profile.objects.get(user__pk=user_id) - user_profile.send_new_user_email() +@task(name="email-new-user") +def EmailNewUser(user_id): + user_profile 
= Profile.objects.get(user__pk=user_id) + user_profile.send_new_user_email() -class EmailNewPremium(Task): - - def run(self, user_id): - user_profile = Profile.objects.get(user__pk=user_id) - user_profile.send_new_premium_email() +@task(name="email-new-premium") +def EmailNewPremium(user_id): + user_profile = Profile.objects.get(user__pk=user_id) + user_profile.send_new_premium_email() -class PremiumExpire(Task): - name = 'premium-expire' - - def run(self, **kwargs): - # Get expired but grace period users - two_days_ago = datetime.datetime.now() - datetime.timedelta(days=2) - thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=30) - expired_profiles = Profile.objects.filter(is_premium=True, - premium_expire__lte=two_days_ago, - premium_expire__gt=thirty_days_ago) - logging.debug(" ---> %s users have expired premiums, emailing grace..." % expired_profiles.count()) - for profile in expired_profiles: - if profile.grace_period_email_sent(): - continue - profile.setup_premium_history() - if profile.premium_expire < two_days_ago: - profile.send_premium_expire_grace_period_email() - - # Get fully expired users - expired_profiles = Profile.objects.filter(is_premium=True, - premium_expire__lte=thirty_days_ago) - logging.debug(" ---> %s users have expired premiums, deactivating and emailing..." % expired_profiles.count()) - for profile in expired_profiles: - profile.setup_premium_history() - if profile.premium_expire < thirty_days_ago: - profile.send_premium_expire_email() - profile.deactivate_premium() - - -class ActivateNextNewUser(Task): - name = 'activate-next-new-user' - - def run(self): - RNewUserQueue.activate_next() - - -class CleanupUser(Task): - name = 'cleanup-user' - - def run(self, user_id): - UserSubscription.trim_user_read_stories(user_id) - UserSubscription.verify_feeds_scheduled(user_id) - Profile.count_all_feed_subscribers_for_user(user_id) - MInteraction.trim(user_id) - MActivity.trim(user_id) - UserSubscriptionFolders.add_missing_feeds_for_user(user_id) - UserSubscriptionFolders.compact_for_user(user_id) - # UserSubscription.refresh_stale_feeds(user_id) +@task(name="premium-expire") +def PremiumExpire(**kwargs): + # Get expired but grace period users + two_days_ago = datetime.datetime.now() - datetime.timedelta(days=2) + thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=30) + expired_profiles = Profile.objects.filter(is_premium=True, + premium_expire__lte=two_days_ago, + premium_expire__gt=thirty_days_ago) + logging.debug(" ---> %s users have expired premiums, emailing grace..." % expired_profiles.count()) + for profile in expired_profiles: + if profile.grace_period_email_sent(): + continue + profile.setup_premium_history() + if profile.premium_expire < two_days_ago: + profile.send_premium_expire_grace_period_email() - try: - ss = MSocialServices.objects.get(user_id=user_id) - except MSocialServices.DoesNotExist: - logging.debug(" ---> ~FRCleaning up user, can't find social_services for user_id: ~SB%s" % user_id) - return - ss.sync_twitter_photo() + # Get fully expired users + expired_profiles = Profile.objects.filter(is_premium=True, + premium_expire__lte=thirty_days_ago) + logging.debug(" ---> %s users have expired premiums, deactivating and emailing..." 
% expired_profiles.count()) + for profile in expired_profiles: + profile.setup_premium_history() + if profile.premium_expire < thirty_days_ago: + profile.send_premium_expire_email() + profile.deactivate_premium() -class CleanSpam(Task): - name = 'clean-spam' +@task(name="activate-next-new-user") +def ActivateNextNewUser(): + RNewUserQueue.activate_next() - def run(self, **kwargs): - logging.debug(" ---> Finding spammers...") - Profile.clear_dead_spammers(confirm=True) +@task(name="cleanup-user") +def CleanupUser(user_id): + UserSubscription.trim_user_read_stories(user_id) + UserSubscription.verify_feeds_scheduled(user_id) + Profile.count_all_feed_subscribers_for_user(user_id) + MInteraction.trim(user_id) + MActivity.trim(user_id) + UserSubscriptionFolders.add_missing_feeds_for_user(user_id) + UserSubscriptionFolders.compact_for_user(user_id) + # UserSubscription.refresh_stale_feeds(user_id) + + try: + ss = MSocialServices.objects.get(user_id=user_id) + except MSocialServices.DoesNotExist: + logging.debug(" ---> ~FRCleaning up user, can't find social_services for user_id: ~SB%s" % user_id) + return + ss.sync_twitter_photo() -class ReimportStripeHistory(Task): - name = 'reimport-stripe-history' +@task(name="clean-spam") +def CleanSpam(): + logging.debug(" ---> Finding spammers...") + Profile.clear_dead_spammers(confirm=True) - def run(self, **kwargs): - logging.debug(" ---> Reimporting Stripe history...") - Profile.reimport_stripe_history(limit=10, days=1) +@task(name="reimport-stripe-history") +def ReimportStripeHistory(): + logging.debug(" ---> Reimporting Stripe history...") + Profile.reimport_stripe_history(limit=10, days=1) diff --git a/apps/rss_feeds/tasks.py b/apps/rss_feeds/tasks.py index 2e5f40ac1..4a93275e1 100644 --- a/apps/rss_feeds/tasks.py +++ b/apps/rss_feeds/tasks.py @@ -3,7 +3,7 @@ import os import shutil import time import redis -from celery.task import Task +from celery.task import task from celery.exceptions import SoftTimeLimitExceeded from utils import log as logging from utils import s3_utils as s3 @@ -13,259 +13,230 @@ from utils.mongo_raw_log_middleware import MongoDumpMiddleware from utils.redis_raw_log_middleware import RedisDumpMiddleware FEED_TASKING_MAX = 10000 -class TaskFeeds(Task): - name = 'task-feeds' - - def run(self, **kwargs): - from apps.rss_feeds.models import Feed - settings.LOG_TO_STREAM = True - now = datetime.datetime.utcnow() - start = time.time() - r = redis.Redis(connection_pool=settings.REDIS_FEED_UPDATE_POOL) - tasked_feeds_size = r.zcard('tasked_feeds') - - hour_ago = now - datetime.timedelta(hours=1) - r.zremrangebyscore('fetched_feeds_last_hour', 0, int(hour_ago.strftime('%s'))) - - now_timestamp = int(now.strftime("%s")) - queued_feeds = r.zrangebyscore('scheduled_updates', 0, now_timestamp) - r.zremrangebyscore('scheduled_updates', 0, now_timestamp) - if not queued_feeds: - logging.debug(" ---> ~SN~FB~BMNo feeds to queue! Exiting...") - return - - r.sadd('queued_feeds', *queued_feeds) - logging.debug(" ---> ~SN~FBQueuing ~SB%s~SN stale feeds (~SB%s~SN/~FG%s~FB~SN/%s tasked/queued/scheduled)" % ( - len(queued_feeds), - r.zcard('tasked_feeds'), - r.scard('queued_feeds'), - r.zcard('scheduled_updates'))) - - # Regular feeds - if tasked_feeds_size < FEED_TASKING_MAX: - feeds = r.srandmember('queued_feeds', FEED_TASKING_MAX) - Feed.task_feeds(feeds, verbose=True) - active_count = len(feeds) - else: - logging.debug(" ---> ~SN~FBToo many tasked feeds. ~SB%s~SN tasked." 
% tasked_feeds_size) - active_count = 0 - - logging.debug(" ---> ~SN~FBTasking %s feeds took ~SB%s~SN seconds (~SB%s~SN/~FG%s~FB~SN/%s tasked/queued/scheduled)" % ( - active_count, - int((time.time() - start)), - r.zcard('tasked_feeds'), - r.scard('queued_feeds'), - r.zcard('scheduled_updates'))) - -class TaskBrokenFeeds(Task): - name = 'task-broken-feeds' - max_retries = 0 - ignore_result = True +@task(name='task-feeds') +def TaskFeeds(): + from apps.rss_feeds.models import Feed + settings.LOG_TO_STREAM = True + now = datetime.datetime.utcnow() + start = time.time() + r = redis.Redis(connection_pool=settings.REDIS_FEED_UPDATE_POOL) + tasked_feeds_size = r.zcard('tasked_feeds') - def run(self, **kwargs): - from apps.rss_feeds.models import Feed - settings.LOG_TO_STREAM = True - now = datetime.datetime.utcnow() - start = time.time() - r = redis.Redis(connection_pool=settings.REDIS_FEED_UPDATE_POOL) - - logging.debug(" ---> ~SN~FBQueuing broken feeds...") - - # Force refresh feeds - refresh_feeds = Feed.objects.filter( - active=True, - fetched_once=False, - active_subscribers__gte=1 - ).order_by('?')[:100] - refresh_count = refresh_feeds.count() - cp1 = time.time() - - logging.debug(" ---> ~SN~FBFound %s active, unfetched broken feeds" % refresh_count) - - # Mistakenly inactive feeds - hours_ago = (now - datetime.timedelta(minutes=10)).strftime('%s') - old_tasked_feeds = r.zrangebyscore('tasked_feeds', 0, hours_ago) - inactive_count = len(old_tasked_feeds) - if inactive_count: - r.zremrangebyscore('tasked_feeds', 0, hours_ago) - # r.sadd('queued_feeds', *old_tasked_feeds) - for feed_id in old_tasked_feeds: - r.zincrby('error_feeds', 1, feed_id) - feed = Feed.get_by_id(feed_id) - feed.set_next_scheduled_update() - logging.debug(" ---> ~SN~FBRe-queuing ~SB%s~SN dropped/broken feeds (~SB%s/%s~SN queued/tasked)" % ( - inactive_count, - r.scard('queued_feeds'), - r.zcard('tasked_feeds'))) - cp2 = time.time() - - old = now - datetime.timedelta(days=1) - old_feeds = Feed.objects.filter( - next_scheduled_update__lte=old, - active_subscribers__gte=1 - ).order_by('?')[:500] - old_count = old_feeds.count() - cp3 = time.time() - - logging.debug(" ---> ~SN~FBTasking ~SBrefresh:~FC%s~FB inactive:~FC%s~FB old:~FC%s~SN~FB broken feeds... 
(%.4s/%.4s/%.4s)" % ( - refresh_count, - inactive_count, - old_count, - cp1 - start, - cp2 - cp1, - cp3 - cp2, - )) - - Feed.task_feeds(refresh_feeds, verbose=False) - Feed.task_feeds(old_feeds, verbose=False) - - logging.debug(" ---> ~SN~FBTasking broken feeds took ~SB%s~SN seconds (~SB%s~SN/~FG%s~FB~SN/%s tasked/queued/scheduled)" % ( - int((time.time() - start)), - r.zcard('tasked_feeds'), - r.scard('queued_feeds'), - r.zcard('scheduled_updates'))) - -class UpdateFeeds(Task): - name = 'update-feeds' - max_retries = 0 - ignore_result = True - time_limit = 10*60 - soft_time_limit = 9*60 - - def run(self, feed_pks, **kwargs): - from apps.rss_feeds.models import Feed - from apps.statistics.models import MStatistics - r = redis.Redis(connection_pool=settings.REDIS_FEED_UPDATE_POOL) - - mongodb_replication_lag = int(MStatistics.get('mongodb_replication_lag', 0)) - compute_scores = bool(mongodb_replication_lag < 10) - - profiler = DBProfilerMiddleware() - profiler_activated = profiler.process_celery() - if profiler_activated: - mongo_middleware = MongoDumpMiddleware() - mongo_middleware.process_celery(profiler) - redis_middleware = RedisDumpMiddleware() - redis_middleware.process_celery(profiler) - - options = { - 'quick': float(MStatistics.get('quick_fetch', 0)), - 'updates_off': MStatistics.get('updates_off', False), - 'compute_scores': compute_scores, - 'mongodb_replication_lag': mongodb_replication_lag, - } - - if not isinstance(feed_pks, list): - feed_pks = [feed_pks] - - for feed_pk in feed_pks: - feed = Feed.get_by_id(feed_pk) - if not feed or feed.pk != int(feed_pk): - logging.info(" ---> ~FRRemoving feed_id %s from tasked_feeds queue, points to %s..." % (feed_pk, feed and feed.pk)) - r.zrem('tasked_feeds', feed_pk) - if not feed: - continue - try: - feed.update(**options) - except SoftTimeLimitExceeded, e: - feed.save_feed_history(505, 'Timeout', e) - logging.info(" ---> [%-30s] ~BR~FWTime limit hit!~SB~FR Moving on to next feed..." 
% feed) - if profiler_activated: profiler.process_celery_finished() - -class NewFeeds(Task): - name = 'new-feeds' - max_retries = 0 - ignore_result = True - time_limit = 10*60 - soft_time_limit = 9*60 - - def run(self, feed_pks, **kwargs): - from apps.rss_feeds.models import Feed - if not isinstance(feed_pks, list): - feed_pks = [feed_pks] - - options = {} - for feed_pk in feed_pks: - feed = Feed.get_by_id(feed_pk) - if not feed: continue - feed.update(options=options) - -class PushFeeds(Task): - name = 'push-feeds' - max_retries = 0 - ignore_result = True - - def run(self, feed_id, xml, **kwargs): - from apps.rss_feeds.models import Feed - from apps.statistics.models import MStatistics - - mongodb_replication_lag = int(MStatistics.get('mongodb_replication_lag', 0)) - compute_scores = bool(mongodb_replication_lag < 60) - - options = { - 'feed_xml': xml, - 'compute_scores': compute_scores, - 'mongodb_replication_lag': mongodb_replication_lag, - } - feed = Feed.get_by_id(feed_id) - if feed: - feed.update(options=options) - -class BackupMongo(Task): - name = 'backup-mongo' - max_retries = 0 - ignore_result = True + hour_ago = now - datetime.timedelta(hours=1) + r.zremrangebyscore('fetched_feeds_last_hour', 0, int(hour_ago.strftime('%s'))) - def run(self, **kwargs): - COLLECTIONS = "classifier_tag classifier_author classifier_feed classifier_title userstories starred_stories shared_stories category category_site sent_emails social_profile social_subscription social_services statistics feedback" + now_timestamp = int(now.strftime("%s")) + queued_feeds = r.zrangebyscore('scheduled_updates', 0, now_timestamp) + r.zremrangebyscore('scheduled_updates', 0, now_timestamp) + if not queued_feeds: + logging.debug(" ---> ~SN~FB~BMNo feeds to queue! Exiting...") + return + + r.sadd('queued_feeds', *queued_feeds) + logging.debug(" ---> ~SN~FBQueuing ~SB%s~SN stale feeds (~SB%s~SN/~FG%s~FB~SN/%s tasked/queued/scheduled)" % ( + len(queued_feeds), + r.zcard('tasked_feeds'), + r.scard('queued_feeds'), + r.zcard('scheduled_updates'))) + + # Regular feeds + if tasked_feeds_size < FEED_TASKING_MAX: + feeds = r.srandmember('queued_feeds', FEED_TASKING_MAX) + Feed.task_feeds(feeds, verbose=True) + active_count = len(feeds) + else: + logging.debug(" ---> ~SN~FBToo many tasked feeds. ~SB%s~SN tasked." 
% tasked_feeds_size)
+        active_count = 0
+    
+    logging.debug(" ---> ~SN~FBTasking %s feeds took ~SB%s~SN seconds (~SB%s~SN/~FG%s~FB~SN/%s tasked/queued/scheduled)" % (
+        active_count,
+        int((time.time() - start)),
+        r.zcard('tasked_feeds'),
+        r.scard('queued_feeds'),
+        r.zcard('scheduled_updates')))
 
+@task(name='task-broken-feeds', ignore_result=True)
+def TaskBrokenFeeds():
+    from apps.rss_feeds.models import Feed
+    settings.LOG_TO_STREAM = True
+    now = datetime.datetime.utcnow()
+    start = time.time()
+    r = redis.Redis(connection_pool=settings.REDIS_FEED_UPDATE_POOL)
+    
+    logging.debug(" ---> ~SN~FBQueuing broken feeds...")
+    
+    # Force refresh feeds
+    refresh_feeds = Feed.objects.filter(
+        active=True,
+        fetched_once=False,
+        active_subscribers__gte=1
+    ).order_by('?')[:100]
+    refresh_count = refresh_feeds.count()
+    cp1 = time.time()
+    
+    logging.debug(" ---> ~SN~FBFound %s active, unfetched broken feeds" % refresh_count)
 
+    # Mistakenly inactive feeds
+    hours_ago = (now - datetime.timedelta(minutes=10)).strftime('%s')
+    old_tasked_feeds = r.zrangebyscore('tasked_feeds', 0, hours_ago)
+    inactive_count = len(old_tasked_feeds)
+    if inactive_count:
+        r.zremrangebyscore('tasked_feeds', 0, hours_ago)
+        # r.sadd('queued_feeds', *old_tasked_feeds)
+        for feed_id in old_tasked_feeds:
+            r.zincrby('error_feeds', 1, feed_id)
+            feed = Feed.get_by_id(feed_id)
+            feed.set_next_scheduled_update()
+    logging.debug(" ---> ~SN~FBRe-queuing ~SB%s~SN dropped/broken feeds (~SB%s/%s~SN queued/tasked)" % (
+        inactive_count,
+        r.scard('queued_feeds'),
+        r.zcard('tasked_feeds')))
+    cp2 = time.time()
+    
+    old = now - datetime.timedelta(days=1)
+    old_feeds = Feed.objects.filter(
+        next_scheduled_update__lte=old,
+        active_subscribers__gte=1
+    ).order_by('?')[:500]
+    old_count = old_feeds.count()
+    cp3 = time.time()
+    
+    logging.debug(" ---> ~SN~FBTasking ~SBrefresh:~FC%s~FB inactive:~FC%s~FB old:~FC%s~SN~FB broken feeds... 
(%.4s/%.4s/%.4s)" % ( + refresh_count, + inactive_count, + old_count, + cp1 - start, + cp2 - cp1, + cp3 - cp2, + )) + + Feed.task_feeds(refresh_feeds, verbose=False) + Feed.task_feeds(old_feeds, verbose=False) + + logging.debug(" ---> ~SN~FBTasking broken feeds took ~SB%s~SN seconds (~SB%s~SN/~FG%s~FB~SN/%s tasked/queued/scheduled)" % ( + int((time.time() - start)), + r.zcard('tasked_feeds'), + r.scard('queued_feeds'), + r.zcard('scheduled_updates'))) + +@task(name='update-feeds', time_limit=10*60, soft_time_limit=9*60, ignore_result=True) +def UpdateFeeds(feed_pks): + from apps.rss_feeds.models import Feed + from apps.statistics.models import MStatistics + r = redis.Redis(connection_pool=settings.REDIS_FEED_UPDATE_POOL) - for collection in collections: - cmd = 'mongodump --db %s --collection %s -o %s' % (db_name, collection, dir_name) - logging.debug(' ---> ~FMDumping ~SB%s~SN: %s' % (collection, cmd)) - os.system(cmd) + mongodb_replication_lag = int(MStatistics.get('mongodb_replication_lag', 0)) + compute_scores = bool(mongodb_replication_lag < 10) + + profiler = DBProfilerMiddleware() + profiler_activated = profiler.process_celery() + if profiler_activated: + mongo_middleware = MongoDumpMiddleware() + mongo_middleware.process_celery(profiler) + redis_middleware = RedisDumpMiddleware() + redis_middleware.process_celery(profiler) + + options = { + 'quick': float(MStatistics.get('quick_fetch', 0)), + 'updates_off': MStatistics.get('updates_off', False), + 'compute_scores': compute_scores, + 'mongodb_replication_lag': mongodb_replication_lag, + } + + if not isinstance(feed_pks, list): + feed_pks = [feed_pks] + + for feed_pk in feed_pks: + feed = Feed.get_by_id(feed_pk) + if not feed or feed.pk != int(feed_pk): + logging.info(" ---> ~FRRemoving feed_id %s from tasked_feeds queue, points to %s..." % (feed_pk, feed and feed.pk)) + r.zrem('tasked_feeds', feed_pk) + if not feed: + continue + try: + feed.update(**options) + except SoftTimeLimitExceeded, e: + feed.save_feed_history(505, 'Timeout', e) + logging.info(" ---> [%-30s] ~BR~FWTime limit hit!~SB~FR Moving on to next feed..." 
% feed) + if profiler_activated: profiler.process_celery_finished() - cmd = 'tar -jcf %s %s' % (filename, dir_name) +@task(name='new-feeds', time_limit=10*60, soft_time_limit=9*60, ignore_result=True) +def NewFeeds(feed_pks): + from apps.rss_feeds.models import Feed + if not isinstance(feed_pks, list): + feed_pks = [feed_pks] + + options = {} + for feed_pk in feed_pks: + feed = Feed.get_by_id(feed_pk) + if not feed: continue + feed.update(options=options) + +@task(name='push-feeds', ignore_result=True) +def PushFeeds(feed_id, xml): + from apps.rss_feeds.models import Feed + from apps.statistics.models import MStatistics + + mongodb_replication_lag = int(MStatistics.get('mongodb_replication_lag', 0)) + compute_scores = bool(mongodb_replication_lag < 60) + + options = { + 'feed_xml': xml, + 'compute_scores': compute_scores, + 'mongodb_replication_lag': mongodb_replication_lag, + } + feed = Feed.get_by_id(feed_id) + if feed: + feed.update(options=options) + +@task(name='backup-mongo', ignore_result=True) +def BackupMongo(): + COLLECTIONS = "classifier_tag classifier_author classifier_feed classifier_title userstories starred_stories shared_stories category category_site sent_emails social_profile social_subscription social_services statistics feedback" + + date = time.strftime('%Y-%m-%d-%H-%M') + collections = COLLECTIONS.split(' ') + db_name = 'newsblur' + dir_name = 'backup_mongo_%s' % date + filename = '%s.tgz' % dir_name + + os.mkdir(dir_name) + + for collection in collections: + cmd = 'mongodump --db %s --collection %s -o %s' % (db_name, collection, dir_name) + logging.debug(' ---> ~FMDumping ~SB%s~SN: %s' % (collection, cmd)) os.system(cmd) - logging.debug(' ---> ~FRUploading ~SB~FM%s~SN~FR to S3...' % filename) - s3.save_file_in_s3(filename) - shutil.rmtree(dir_name) - os.remove(filename) - logging.debug(' ---> ~FRFinished uploading ~SB~FM%s~SN~FR to S3.' % filename) + cmd = 'tar -jcf %s %s' % (filename, dir_name) + os.system(cmd) + + logging.debug(' ---> ~FRUploading ~SB~FM%s~SN~FR to S3...' % filename) + s3.save_file_in_s3(filename) + shutil.rmtree(dir_name) + os.remove(filename) + logging.debug(' ---> ~FRFinished uploading ~SB~FM%s~SN~FR to S3.' 
% filename) -class ScheduleImmediateFetches(Task): +@task() +def ScheduleImmediateFetches(feed_ids, user_id=None): + from apps.rss_feeds.models import Feed - def run(self, feed_ids, user_id=None, **kwargs): - from apps.rss_feeds.models import Feed - - if not isinstance(feed_ids, list): - feed_ids = [feed_ids] - - Feed.schedule_feed_fetches_immediately(feed_ids, user_id=user_id) + if not isinstance(feed_ids, list): + feed_ids = [feed_ids] + + Feed.schedule_feed_fetches_immediately(feed_ids, user_id=user_id) -class SchedulePremiumSetup(Task): +@task() +def SchedulePremiumSetup(feed_ids): + from apps.rss_feeds.models import Feed - def run(self, feed_ids, **kwargs): - from apps.rss_feeds.models import Feed - - if not isinstance(feed_ids, list): - feed_ids = [feed_ids] - - Feed.setup_feeds_for_premium_subscribers(feed_ids) - -class ScheduleCountTagsForUser(Task): + if not isinstance(feed_ids, list): + feed_ids = [feed_ids] - def run(self, user_id): - from apps.rss_feeds.models import MStarredStoryCounts - - MStarredStoryCounts.count_for_user(user_id) + Feed.setup_feeds_for_premium_subscribers(feed_ids) + +@task() +def ScheduleCountTagsForUser(user_id): + from apps.rss_feeds.models import MStarredStoryCounts + + MStarredStoryCounts.count_for_user(user_id) diff --git a/apps/search/tasks.py b/apps/search/tasks.py index 7dfbbe8b2..51cac61ad 100644 --- a/apps/search/tasks.py +++ b/apps/search/tasks.py @@ -1,26 +1,21 @@ -from celery.task import Task +from celery.task import task -class IndexSubscriptionsForSearch(Task): +@task() +def IndexSubscriptionsForSearch(user_id): + from apps.search.models import MUserSearch - def run(self, user_id): - from apps.search.models import MUserSearch - - user_search = MUserSearch.get_user(user_id) - user_search.index_subscriptions_for_search() + user_search = MUserSearch.get_user(user_id) + user_search.index_subscriptions_for_search() -class IndexSubscriptionsChunkForSearch(Task): +@task() +def IndexSubscriptionsChunkForSearch(feed_ids, user_id): + from apps.search.models import MUserSearch - ignore_result = False - - def run(self, feed_ids, user_id): - from apps.search.models import MUserSearch - - user_search = MUserSearch.get_user(user_id) - user_search.index_subscriptions_chunk_for_search(feed_ids) + user_search = MUserSearch.get_user(user_id) + user_search.index_subscriptions_chunk_for_search(feed_ids) -class IndexFeedsForSearch(Task): +@task() +def IndexFeedsForSearch(feed_ids, user_id): + from apps.search.models import MUserSearch - def run(self, feed_ids, user_id): - from apps.search.models import MUserSearch - - MUserSearch.index_feeds_for_search(feed_ids, user_id) \ No newline at end of file + MUserSearch.index_feeds_for_search(feed_ids, user_id) diff --git a/apps/social/tasks.py b/apps/social/tasks.py index 2d934836d..bde7f3305 100644 --- a/apps/social/tasks.py +++ b/apps/social/tasks.py @@ -1,92 +1,79 @@ from bson.objectid import ObjectId -from celery.task import Task +from celery.task import task from apps.social.models import MSharedStory, MSocialProfile, MSocialServices, MSocialSubscription from django.contrib.auth.models import User from utils import log as logging -class PostToService(Task): - - def run(self, shared_story_id, service): - try: - shared_story = MSharedStory.objects.get(id=ObjectId(shared_story_id)) - shared_story.post_to_service(service) - except MSharedStory.DoesNotExist: - logging.debug(" ---> Shared story not found (%s). 
Can't post to: %s" % (shared_story_id, service))
+@task()
+def PostToService(shared_story_id, service):
+    try:
+        shared_story = MSharedStory.objects.get(id=ObjectId(shared_story_id))
+        shared_story.post_to_service(service)
+    except MSharedStory.DoesNotExist:
+        logging.debug(" ---> Shared story not found (%s). Can't post to: %s" % (shared_story_id, service))
 
-class EmailNewFollower(Task):
-    
-    def run(self, follower_user_id, followee_user_id):
-        user_profile = MSocialProfile.get_user(followee_user_id)
-        user_profile.send_email_for_new_follower(follower_user_id)
+@task()
+def EmailNewFollower(follower_user_id, followee_user_id):
+    user_profile = MSocialProfile.get_user(followee_user_id)
+    user_profile.send_email_for_new_follower(follower_user_id)
 
-class EmailFollowRequest(Task):
-    
-    def run(self, follower_user_id, followee_user_id):
-        user_profile = MSocialProfile.get_user(followee_user_id)
-        user_profile.send_email_for_follow_request(follower_user_id)
+@task()
+def EmailFollowRequest(follower_user_id, followee_user_id):
+    user_profile = MSocialProfile.get_user(followee_user_id)
+    user_profile.send_email_for_follow_request(follower_user_id)
 
-class EmailFirstShare(Task):
-    
-    def run(self, user_id):
-        user = User.objects.get(pk=user_id)
-        user.profile.send_first_share_to_blurblog_email()
+@task()
+def EmailFirstShare(user_id):
+    user = User.objects.get(pk=user_id)
+    user.profile.send_first_share_to_blurblog_email()
 
-class EmailCommentReplies(Task):
-    
-    def run(self, shared_story_id, reply_id):
-        shared_story = MSharedStory.objects.get(id=ObjectId(shared_story_id))
-        shared_story.send_emails_for_new_reply(ObjectId(reply_id))
+@task()
+def EmailCommentReplies(shared_story_id, reply_id):
+    shared_story = MSharedStory.objects.get(id=ObjectId(shared_story_id))
+    shared_story.send_emails_for_new_reply(ObjectId(reply_id))
 
-class EmailStoryReshares(Task):
-    
-    def run(self, shared_story_id):
-        shared_story = MSharedStory.objects.get(id=ObjectId(shared_story_id))
-        shared_story.send_email_for_reshare()
+@task()
+def EmailStoryReshares(shared_story_id):
+    shared_story = MSharedStory.objects.get(id=ObjectId(shared_story_id))
+    shared_story.send_email_for_reshare()
 
-class SyncTwitterFriends(Task):
-    
-    def run(self, user_id):
-        social_services = MSocialServices.objects.get(user_id=user_id)
-        social_services.sync_twitter_friends()
+@task()
+def SyncTwitterFriends(user_id):
+    social_services = MSocialServices.objects.get(user_id=user_id)
+    social_services.sync_twitter_friends()
 
-class SyncFacebookFriends(Task):
-    
-    def run(self, user_id):
-        social_services = MSocialServices.objects.get(user_id=user_id)
-        social_services.sync_facebook_friends()
+@task()
+def SyncFacebookFriends(user_id):
+    social_services = MSocialServices.objects.get(user_id=user_id)
+    social_services.sync_facebook_friends()
 
-class SharePopularStories(Task):
-    name = 'share-popular-stories'
-    
-    def run(self, **kwargs):
-        logging.debug(" ---> Sharing popular stories...")
-        MSharedStory.share_popular_stories(interactive=False)
+@task(name="share-popular-stories")
+def SharePopularStories():
+    logging.debug(" ---> Sharing popular stories...")
+    MSharedStory.share_popular_stories(interactive=False)
 
-class CleanSocialSpam(Task):
-    name = 'clean-social-spam'
-    
-    def run(self, **kwargs):
-        logging.debug(" ---> Finding social spammers...")
-        MSharedStory.count_potential_spammers(destroy=True)
+@task(name='clean-social-spam')
+def CleanSocialSpam():
+    logging.debug(" ---> Finding social spammers...")
+    MSharedStory.count_potential_spammers(destroy=True)
 
-class UpdateRecalcForSubscription(Task):
+@task()
+def UpdateRecalcForSubscription(subscription_user_id, shared_story_id):
+    user = User.objects.get(pk=subscription_user_id)
+    socialsubs = MSocialSubscription.objects.filter(subscription_user_id=subscription_user_id)
+    try:
+        shared_story = MSharedStory.objects.get(id=ObjectId(shared_story_id))
+    except MSharedStory.DoesNotExist:
+        return
-    def run(self, subscription_user_id, shared_story_id):
-        user = User.objects.get(pk=subscription_user_id)
-        socialsubs = MSocialSubscription.objects.filter(subscription_user_id=subscription_user_id)
-        try:
-            shared_story = MSharedStory.objects.get(id=ObjectId(shared_story_id))
-        except MSharedStory.DoesNotExist:
-            return
-        
-        logging.debug(" ---> ~FM~SNFlipping unread recalc for ~SB%s~SN subscriptions to ~SB%s's blurblog~SN" % (
-            socialsubs.count(),
-            user.username
-        ))
-        for socialsub in socialsubs:
-            socialsub.needs_unread_recalc = True
-            socialsub.save()
-    
-        shared_story.publish_update_to_subscribers()
+    logging.debug(" ---> ~FM~SNFlipping unread recalc for ~SB%s~SN subscriptions to ~SB%s's blurblog~SN" % (
+        socialsubs.count(),
+        user.username
+    ))
+    for socialsub in socialsubs:
+        socialsub.needs_unread_recalc = True
+        socialsub.save()
+    
+    shared_story.publish_update_to_subscribers()
diff --git a/apps/statistics/tasks.py b/apps/statistics/tasks.py
index 7a8c5f7e4..e05bbe6d3 100644
--- a/apps/statistics/tasks.py
+++ b/apps/statistics/tasks.py
@@ -1,21 +1,17 @@
-from celery.task import Task
+from celery.task import task
 from apps.statistics.models import MStatistics
 from apps.statistics.models import MFeedback
 # from utils import log as logging
 
-class CollectStats(Task):
-    name = 'collect-stats'
-    
-    def run(self, **kwargs):
-        # logging.debug(" ---> ~FBCollecting stats...")
-        MStatistics.collect_statistics()
+@task(name='collect-stats')
+def CollectStats():
+    # logging.debug(" ---> ~FBCollecting stats...")
+    MStatistics.collect_statistics()
 
-class CollectFeedback(Task):
-    name = 'collect-feedback'
-    
-    def run(self, **kwargs):
-        # logging.debug(" ---> ~FBCollecting feedback...")
-        MFeedback.collect_feedback()
\ No newline at end of file
+@task(name='collect-feedback')
+def CollectFeedback():
+    # logging.debug(" ---> ~FBCollecting feedback...")
+    MFeedback.collect_feedback()
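
A note on call sites: the celery.task.task decorator replaces each function with a registered task object bound to the same CamelCase name, so modules that import these names and call .delay() or .apply_async() should keep working unchanged across this conversion. A minimal sketch of a call site (the user id and countdown values are illustrative, not from this change):

    from apps.profile.tasks import CleanupUser, EmailNewUser

    EmailNewUser.delay(user_id=42)                     # enqueue for a worker
    CleanupUser.apply_async(args=(42,), countdown=60)  # enqueue, run in ~60s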
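
On naming: tasks that were registered under explicit short names (premium-expire, task-feeds, collect-stats, and so on) keep them via name= on the decorator, and EmailNewUser/EmailNewPremium gain explicit names; the rest fall back to Celery's auto-generated module-path names such as apps.search.tasks.IndexSubscriptionsForSearch. Stable short names keep string references resolving, for example a celerybeat entry keyed on one of them (the interval here is assumed for illustration, not part of this change):

    import datetime

    CELERYBEAT_SCHEDULE = {
        'premium-expire': {
            'task': 'premium-expire',  # matches @task(name="premium-expire")
            'schedule': datetime.timedelta(days=1),
        },
    }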
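
UpdateFeeds and NewFeeds carry their old time_limit/soft_time_limit class attributes over as decorator arguments, preserving the timeout pattern UpdateFeeds relies on: the worker raises SoftTimeLimitExceeded inside the task body at the soft limit, leaving the window up to the hard limit for cleanup such as save_feed_history. A stripped-down sketch of that pattern (FetchOne, do_work, and record_timeout are hypothetical names, not part of this diff):

    from celery.task import task
    from celery.exceptions import SoftTimeLimitExceeded

    @task(time_limit=10*60, soft_time_limit=9*60, ignore_result=True)
    def FetchOne(pk):  # hypothetical task, for illustration only
        try:
            do_work(pk)         # may run past the 9-minute soft limit
        except SoftTimeLimitExceeded:
            record_timeout(pk)  # about a minute left before the hard kill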