Migrating to celery 4.0

This commit is contained in:
Samuel Clay 2020-11-13 12:14:37 -05:00
parent 6990e0844d
commit 0f77d1c441
8 changed files with 384 additions and 452 deletions

View file

@ -1,9 +1,8 @@
from celery.task import Task from celery.task import task
from utils import log as logging from utils import log as logging
class EmailPopularityQuery(Task): @task()
def EmailPopularityQuery(pk):
def run(self, pk):
from apps.analyzer.models import MPopularityQuery from apps.analyzer.models import MPopularityQuery
query = MPopularityQuery.objects.get(pk=pk) query = MPopularityQuery.objects.get(pk=pk)

View file

@ -1,13 +1,12 @@
from celery.task import Task from celery.task import task
from django.contrib.auth.models import User from django.contrib.auth.models import User
from apps.feed_import.models import UploadedOPML, OPMLImporter from apps.feed_import.models import UploadedOPML, OPMLImporter
from apps.reader.models import UserSubscription from apps.reader.models import UserSubscription
from utils import log as logging from utils import log as logging
class ProcessOPML(Task): @task()
def ProcessOPML(user_id):
def run(self, user_id):
user = User.objects.get(pk=user_id) user = User.objects.get(pk=user_id)
logging.user(user, "~FR~SBOPML upload (task) starting...") logging.user(user, "~FR~SBOPML upload (task) starting...")

View file

@ -1,10 +1,9 @@
from celery.task import Task from celery.task import task
from django.contrib.auth.models import User from django.contrib.auth.models import User
from apps.notifications.models import MUserFeedNotification from apps.notifications.models import MUserFeedNotification
from utils import log as logging from utils import log as logging
class QueueNotifications(Task): @task()
def QueueNotifications(feed_id, new_stories):
def run(self, feed_id, new_stories):
MUserFeedNotification.push_feed_notifications(feed_id, new_stories) MUserFeedNotification.push_feed_notifications(feed_id, new_stories)

View file

@ -1,26 +1,22 @@
import datetime import datetime
from celery.task import Task from celery.task import task
from apps.profile.models import Profile, RNewUserQueue from apps.profile.models import Profile, RNewUserQueue
from utils import log as logging from utils import log as logging
from apps.reader.models import UserSubscription, UserSubscriptionFolders from apps.reader.models import UserSubscription, UserSubscriptionFolders
from apps.social.models import MSocialServices, MActivity, MInteraction from apps.social.models import MSocialServices, MActivity, MInteraction
class EmailNewUser(Task): @task(name="email-new-user")
def EmailNewUser(user_id):
def run(self, user_id):
user_profile = Profile.objects.get(user__pk=user_id) user_profile = Profile.objects.get(user__pk=user_id)
user_profile.send_new_user_email() user_profile.send_new_user_email()
class EmailNewPremium(Task): @task(name="email-new-premium")
def EmailNewPremium(user_id):
def run(self, user_id):
user_profile = Profile.objects.get(user__pk=user_id) user_profile = Profile.objects.get(user__pk=user_id)
user_profile.send_new_premium_email() user_profile.send_new_premium_email()
class PremiumExpire(Task): @task(name="premium-expire")
name = 'premium-expire' def PremiumExpire(**kwargs):
def run(self, **kwargs):
# Get expired but grace period users # Get expired but grace period users
two_days_ago = datetime.datetime.now() - datetime.timedelta(days=2) two_days_ago = datetime.datetime.now() - datetime.timedelta(days=2)
thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=30) thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=30)
@ -45,18 +41,12 @@ class PremiumExpire(Task):
profile.send_premium_expire_email() profile.send_premium_expire_email()
profile.deactivate_premium() profile.deactivate_premium()
@task(name="activate-next-new-user")
class ActivateNextNewUser(Task): def ActivateNextNewUser():
name = 'activate-next-new-user'
def run(self):
RNewUserQueue.activate_next() RNewUserQueue.activate_next()
@task(name="cleanup-user")
class CleanupUser(Task): def CleanupUser(user_id):
name = 'cleanup-user'
def run(self, user_id):
UserSubscription.trim_user_read_stories(user_id) UserSubscription.trim_user_read_stories(user_id)
UserSubscription.verify_feeds_scheduled(user_id) UserSubscription.verify_feeds_scheduled(user_id)
Profile.count_all_feed_subscribers_for_user(user_id) Profile.count_all_feed_subscribers_for_user(user_id)
@ -73,17 +63,13 @@ class CleanupUser(Task):
return return
ss.sync_twitter_photo() ss.sync_twitter_photo()
class CleanSpam(Task): @task(name="clean-spam")
name = 'clean-spam' def CleanSpam():
def run(self, **kwargs):
logging.debug(" ---> Finding spammers...") logging.debug(" ---> Finding spammers...")
Profile.clear_dead_spammers(confirm=True) Profile.clear_dead_spammers(confirm=True)
class ReimportStripeHistory(Task): @task(name="reimport-stripe-history")
name = 'reimport-stripe-history' def ReimportStripeHistory():
def run(self, **kwargs):
logging.debug(" ---> Reimporting Stripe history...") logging.debug(" ---> Reimporting Stripe history...")
Profile.reimport_stripe_history(limit=10, days=1) Profile.reimport_stripe_history(limit=10, days=1)

View file

@ -3,7 +3,7 @@ import os
import shutil import shutil
import time import time
import redis import redis
from celery.task import Task from celery.task import task
from celery.exceptions import SoftTimeLimitExceeded from celery.exceptions import SoftTimeLimitExceeded
from utils import log as logging from utils import log as logging
from utils import s3_utils as s3 from utils import s3_utils as s3
@ -13,10 +13,8 @@ from utils.mongo_raw_log_middleware import MongoDumpMiddleware
from utils.redis_raw_log_middleware import RedisDumpMiddleware from utils.redis_raw_log_middleware import RedisDumpMiddleware
FEED_TASKING_MAX = 10000 FEED_TASKING_MAX = 10000
class TaskFeeds(Task): @task(name='task-feeds')
name = 'task-feeds' def TaskFeeds():
def run(self, **kwargs):
from apps.rss_feeds.models import Feed from apps.rss_feeds.models import Feed
settings.LOG_TO_STREAM = True settings.LOG_TO_STREAM = True
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()
@ -57,12 +55,8 @@ class TaskFeeds(Task):
r.scard('queued_feeds'), r.scard('queued_feeds'),
r.zcard('scheduled_updates'))) r.zcard('scheduled_updates')))
class TaskBrokenFeeds(Task): @task(name='task-broken-feeds')
name = 'task-broken-feeds' def TaskBrokenFeeds():
max_retries = 0
ignore_result = True
def run(self, **kwargs):
from apps.rss_feeds.models import Feed from apps.rss_feeds.models import Feed
settings.LOG_TO_STREAM = True settings.LOG_TO_STREAM = True
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()
@ -125,14 +119,8 @@ class TaskBrokenFeeds(Task):
r.scard('queued_feeds'), r.scard('queued_feeds'),
r.zcard('scheduled_updates'))) r.zcard('scheduled_updates')))
class UpdateFeeds(Task): @task(name='update-feeds', time_limit=10*60, soft_time_limit=9*60, ignore_result=True)
name = 'update-feeds' def UpdateFeeds(feed_pks):
max_retries = 0
ignore_result = True
time_limit = 10*60
soft_time_limit = 9*60
def run(self, feed_pks, **kwargs):
from apps.rss_feeds.models import Feed from apps.rss_feeds.models import Feed
from apps.statistics.models import MStatistics from apps.statistics.models import MStatistics
r = redis.Redis(connection_pool=settings.REDIS_FEED_UPDATE_POOL) r = redis.Redis(connection_pool=settings.REDIS_FEED_UPDATE_POOL)
@ -172,14 +160,8 @@ class UpdateFeeds(Task):
logging.info(" ---> [%-30s] ~BR~FWTime limit hit!~SB~FR Moving on to next feed..." % feed) logging.info(" ---> [%-30s] ~BR~FWTime limit hit!~SB~FR Moving on to next feed..." % feed)
if profiler_activated: profiler.process_celery_finished() if profiler_activated: profiler.process_celery_finished()
class NewFeeds(Task): @task(name='new-feeds', time_limit=10*60, soft_time_limit=9*60, ignore_result=True)
name = 'new-feeds' def NewFeeds(feed_pks):
max_retries = 0
ignore_result = True
time_limit = 10*60
soft_time_limit = 9*60
def run(self, feed_pks, **kwargs):
from apps.rss_feeds.models import Feed from apps.rss_feeds.models import Feed
if not isinstance(feed_pks, list): if not isinstance(feed_pks, list):
feed_pks = [feed_pks] feed_pks = [feed_pks]
@ -190,12 +172,8 @@ class NewFeeds(Task):
if not feed: continue if not feed: continue
feed.update(options=options) feed.update(options=options)
class PushFeeds(Task): @task(name='push-feeds', ignore_result=True)
name = 'push-feeds' def PushFeeds(feed_id, xml):
max_retries = 0
ignore_result = True
def run(self, feed_id, xml, **kwargs):
from apps.rss_feeds.models import Feed from apps.rss_feeds.models import Feed
from apps.statistics.models import MStatistics from apps.statistics.models import MStatistics
@ -211,12 +189,8 @@ class PushFeeds(Task):
if feed: if feed:
feed.update(options=options) feed.update(options=options)
class BackupMongo(Task): @task(name='backup-mongo', ignore_result=True)
name = 'backup-mongo' def BackupMongo():
max_retries = 0
ignore_result = True
def run(self, **kwargs):
COLLECTIONS = "classifier_tag classifier_author classifier_feed classifier_title userstories starred_stories shared_stories category category_site sent_emails social_profile social_subscription social_services statistics feedback" COLLECTIONS = "classifier_tag classifier_author classifier_feed classifier_title userstories starred_stories shared_stories category category_site sent_emails social_profile social_subscription social_services statistics feedback"
date = time.strftime('%Y-%m-%d-%H-%M') date = time.strftime('%Y-%m-%d-%H-%M')
@ -242,9 +216,8 @@ class BackupMongo(Task):
logging.debug(' ---> ~FRFinished uploading ~SB~FM%s~SN~FR to S3.' % filename) logging.debug(' ---> ~FRFinished uploading ~SB~FM%s~SN~FR to S3.' % filename)
class ScheduleImmediateFetches(Task): @task()
def ScheduleImmediateFetches(feed_ids, user_id=None):
def run(self, feed_ids, user_id=None, **kwargs):
from apps.rss_feeds.models import Feed from apps.rss_feeds.models import Feed
if not isinstance(feed_ids, list): if not isinstance(feed_ids, list):
@ -253,9 +226,8 @@ class ScheduleImmediateFetches(Task):
Feed.schedule_feed_fetches_immediately(feed_ids, user_id=user_id) Feed.schedule_feed_fetches_immediately(feed_ids, user_id=user_id)
class SchedulePremiumSetup(Task): @task()
def SchedulePremiumSetup(feed_ids):
def run(self, feed_ids, **kwargs):
from apps.rss_feeds.models import Feed from apps.rss_feeds.models import Feed
if not isinstance(feed_ids, list): if not isinstance(feed_ids, list):
@ -263,9 +235,8 @@ class SchedulePremiumSetup(Task):
Feed.setup_feeds_for_premium_subscribers(feed_ids) Feed.setup_feeds_for_premium_subscribers(feed_ids)
class ScheduleCountTagsForUser(Task): @task()
def ScheduleCountTagsForUser(user_id):
def run(self, user_id):
from apps.rss_feeds.models import MStarredStoryCounts from apps.rss_feeds.models import MStarredStoryCounts
MStarredStoryCounts.count_for_user(user_id) MStarredStoryCounts.count_for_user(user_id)

View file

@ -1,26 +1,21 @@
from celery.task import Task from celery.task import task
class IndexSubscriptionsForSearch(Task): @task()
def IndexSubscriptionsForSearch(user_id):
def run(self, user_id):
from apps.search.models import MUserSearch from apps.search.models import MUserSearch
user_search = MUserSearch.get_user(user_id) user_search = MUserSearch.get_user(user_id)
user_search.index_subscriptions_for_search() user_search.index_subscriptions_for_search()
class IndexSubscriptionsChunkForSearch(Task): @task()
def IndexSubscriptionsChunkForSearch(feed_ids, user_id):
ignore_result = False
def run(self, feed_ids, user_id):
from apps.search.models import MUserSearch from apps.search.models import MUserSearch
user_search = MUserSearch.get_user(user_id) user_search = MUserSearch.get_user(user_id)
user_search.index_subscriptions_chunk_for_search(feed_ids) user_search.index_subscriptions_chunk_for_search(feed_ids)
class IndexFeedsForSearch(Task): @task()
def IndexFeedsForSearch(feed_ids, user_id):
def run(self, feed_ids, user_id):
from apps.search.models import MUserSearch from apps.search.models import MUserSearch
MUserSearch.index_feeds_for_search(feed_ids, user_id) MUserSearch.index_feeds_for_search(feed_ids, user_id)

View file

@ -1,79 +1,66 @@
from bson.objectid import ObjectId from bson.objectid import ObjectId
from celery.task import Task from celery.task import task
from apps.social.models import MSharedStory, MSocialProfile, MSocialServices, MSocialSubscription from apps.social.models import MSharedStory, MSocialProfile, MSocialServices, MSocialSubscription
from django.contrib.auth.models import User from django.contrib.auth.models import User
from utils import log as logging from utils import log as logging
class PostToService(Task): @task()
def PostToService(shared_story_id, service):
def run(self, shared_story_id, service):
try: try:
shared_story = MSharedStory.objects.get(id=ObjectId(shared_story_id)) shared_story = MSharedStory.objects.get(id=ObjectId(shared_story_id))
shared_story.post_to_service(service) shared_story.post_to_service(service)
except MSharedStory.DoesNotExist: except MSharedStory.DoesNotExist:
logging.debug(" ---> Shared story not found (%s). Can't post to: %s" % (shared_story_id, service)) logging.debug(" ---> Shared story not found (%s). Can't post to: %s" % (shared_story_id, service))
class EmailNewFollower(Task): @task()
def EmailNewFollower(follower_user_id, followee_user_id):
def run(self, follower_user_id, followee_user_id):
user_profile = MSocialProfile.get_user(followee_user_id) user_profile = MSocialProfile.get_user(followee_user_id)
user_profile.send_email_for_new_follower(follower_user_id) user_profile.send_email_for_new_follower(follower_user_id)
class EmailFollowRequest(Task): @task()
def EmailFollowRequest(follower_user_id, followee_user_id):
def run(self, follower_user_id, followee_user_id):
user_profile = MSocialProfile.get_user(followee_user_id) user_profile = MSocialProfile.get_user(followee_user_id)
user_profile.send_email_for_follow_request(follower_user_id) user_profile.send_email_for_follow_request(follower_user_id)
class EmailFirstShare(Task): @task()
def EmailFirstShare(user_id):
def run(self, user_id):
user = User.objects.get(pk=user_id) user = User.objects.get(pk=user_id)
user.profile.send_first_share_to_blurblog_email() user.profile.send_first_share_to_blurblog_email()
class EmailCommentReplies(Task): @task()
def EmailCommentReplies(shared_story_id, reply_id):
def run(self, shared_story_id, reply_id):
shared_story = MSharedStory.objects.get(id=ObjectId(shared_story_id)) shared_story = MSharedStory.objects.get(id=ObjectId(shared_story_id))
shared_story.send_emails_for_new_reply(ObjectId(reply_id)) shared_story.send_emails_for_new_reply(ObjectId(reply_id))
class EmailStoryReshares(Task): @task
def EmailStoryReshares(shared_story_id):
def run(self, shared_story_id):
shared_story = MSharedStory.objects.get(id=ObjectId(shared_story_id)) shared_story = MSharedStory.objects.get(id=ObjectId(shared_story_id))
shared_story.send_email_for_reshare() shared_story.send_email_for_reshare()
class SyncTwitterFriends(Task): @task()
def SyncTwitterFriends(user_id):
def run(self, user_id):
social_services = MSocialServices.objects.get(user_id=user_id) social_services = MSocialServices.objects.get(user_id=user_id)
social_services.sync_twitter_friends() social_services.sync_twitter_friends()
class SyncFacebookFriends(Task): @task()
def SyncFacebookFriends(user_id):
def run(self, user_id):
social_services = MSocialServices.objects.get(user_id=user_id) social_services = MSocialServices.objects.get(user_id=user_id)
social_services.sync_facebook_friends() social_services.sync_facebook_friends()
class SharePopularStories(Task): @task(name="share-popular-stories")
name = 'share-popular-stories' def SharePopularStories():
def run(self, **kwargs):
logging.debug(" ---> Sharing popular stories...") logging.debug(" ---> Sharing popular stories...")
MSharedStory.share_popular_stories(interactive=False) MSharedStory.share_popular_stories(interactive=False)
class CleanSocialSpam(Task): @task(name='clean-social-spam')
name = 'clean-social-spam' def CleanSocialSpam():
def run(self, **kwargs):
logging.debug(" ---> Finding social spammers...") logging.debug(" ---> Finding social spammers...")
MSharedStory.count_potential_spammers(destroy=True) MSharedStory.count_potential_spammers(destroy=True)
class UpdateRecalcForSubscription(Task): @task()
def UpdateRecalcForSubscription(subscription_user_id, shared_story_id):
def run(self, subscription_user_id, shared_story_id):
user = User.objects.get(pk=subscription_user_id) user = User.objects.get(pk=subscription_user_id)
socialsubs = MSocialSubscription.objects.filter(subscription_user_id=subscription_user_id) socialsubs = MSocialSubscription.objects.filter(subscription_user_id=subscription_user_id)
try: try:

View file

@ -1,21 +1,17 @@
from celery.task import Task from celery.task import task
from apps.statistics.models import MStatistics from apps.statistics.models import MStatistics
from apps.statistics.models import MFeedback from apps.statistics.models import MFeedback
# from utils import log as logging # from utils import log as logging
class CollectStats(Task): @task(name='collect-stats')
name = 'collect-stats' def CollectStats():
logging.debug(" ---> ~FBCollecting stats...")
def run(self, **kwargs):
# logging.debug(" ---> ~FBCollecting stats...")
MStatistics.collect_statistics() MStatistics.collect_statistics()
class CollectFeedback(Task): @task(name='collect-feedback')
name = 'collect-feedback' def CollectFeedback():
logging.debug(" ---> ~FBCollecting feedback...")
def run(self, **kwargs):
# logging.debug(" ---> ~FBCollecting feedback...")
MFeedback.collect_feedback() MFeedback.collect_feedback()