import datetime
import os
import shutil
import time

import redis
from celery import Task
from celery.exceptions import SoftTimeLimitExceeded
from django.conf import settings

from apps.profile.middleware import DBProfilerMiddleware
from utils import log as logging
from utils import s3_utils as s3
from utils.mongo_raw_log_middleware import MongoDumpMiddleware
from utils.redis_raw_log_middleware import RedisDumpMiddleware

FEED_TASKING_MAX = 10000


class TaskFeeds(Task):
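    """Queue stale feeds for updating.

    Moves feeds whose scheduled update time has arrived from the
    'scheduled_updates' zset onto the 'queued_feeds' set, then tasks a
    random batch of queued feeds unless the 'tasked_feeds' backlog is
    already over FEED_TASKING_MAX."""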
    name = 'task-feeds'

    def run(self, **kwargs):
        from apps.rss_feeds.models import Feed
        settings.LOG_TO_STREAM = True
        now = datetime.datetime.utcnow()
        start = time.time()
        r = redis.Redis(connection_pool=settings.REDIS_FEED_UPDATE_POOL)
        tasked_feeds_size = r.zcard('tasked_feeds')

        # Expire entries older than an hour from the fetched-feeds counter.
        hour_ago = now - datetime.timedelta(hours=1)
        r.zremrangebyscore('fetched_feeds_last_hour', 0, int(hour_ago.strftime('%s')))

        # Pop every feed whose scheduled update time has arrived.
        now_timestamp = int(now.strftime("%s"))
        queued_feeds = r.zrangebyscore('scheduled_updates', 0, now_timestamp)
        r.zremrangebyscore('scheduled_updates', 0, now_timestamp)
        if not queued_feeds:
            logging.debug(" ---> ~SN~FB~BMNo feeds to queue! Exiting...")
            return

        r.sadd('queued_feeds', *queued_feeds)
        logging.debug(" ---> ~SN~FBQueuing ~SB%s~SN stale feeds (~SB%s~SN/~FG%s~FB~SN/%s tasked/queued/scheduled)" % (
            len(queued_feeds),
            r.zcard('tasked_feeds'),
            r.scard('queued_feeds'),
            r.zcard('scheduled_updates')))

        # Regular feeds: only task a new batch if the backlog is under the cap.
        if tasked_feeds_size < FEED_TASKING_MAX:
            feeds = r.srandmember('queued_feeds', FEED_TASKING_MAX)
            Feed.task_feeds(feeds, verbose=True)
            active_count = len(feeds)
        else:
            logging.debug(" ---> ~SN~FBToo many tasked feeds. ~SB%s~SN tasked." % tasked_feeds_size)
            active_count = 0

        logging.debug(" ---> ~SN~FBTasking %s feeds took ~SB%s~SN seconds (~SB%s~SN/~FG%s~FB~SN/%s tasked/queued/scheduled)" % (
            active_count,
            int(time.time() - start),
            r.zcard('tasked_feeds'),
            r.scard('queued_feeds'),
            r.zcard('scheduled_updates')))


class TaskBrokenFeeds(Task):
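    """Find feeds that have fallen out of the update cycle and re-task them.

    Covers three cases: active feeds that have never been fetched, feeds
    stuck in 'tasked_feeds' for over ten minutes, and feeds whose next
    scheduled update is more than a day overdue."""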
    name = 'task-broken-feeds'
    max_retries = 0
    ignore_result = True

    def run(self, **kwargs):
        from apps.rss_feeds.models import Feed
        settings.LOG_TO_STREAM = True
        now = datetime.datetime.utcnow()
        start = time.time()
        r = redis.Redis(connection_pool=settings.REDIS_FEED_UPDATE_POOL)

        logging.debug(" ---> ~SN~FBQueuing broken feeds...")

        # Force-refresh feeds: active feeds with subscribers that have
        # never been fetched successfully.
        refresh_feeds = Feed.objects.filter(
            active=True,
            fetched_once=False,
            active_subscribers__gte=1
        ).order_by('?')[:100]
        refresh_count = refresh_feeds.count()
        cp1 = time.time()

        logging.debug(" ---> ~SN~FBFound %s active, unfetched broken feeds" % refresh_count)

        # Mistakenly inactive feeds: tasked over ten minutes ago but never
        # finished, so they are still stuck in the 'tasked_feeds' zset.
        minutes_ago = (now - datetime.timedelta(minutes=10)).strftime('%s')
        old_tasked_feeds = r.zrangebyscore('tasked_feeds', 0, minutes_ago)
        inactive_count = len(old_tasked_feeds)
        if inactive_count:
            r.zremrangebyscore('tasked_feeds', 0, minutes_ago)
            # r.sadd('queued_feeds', *old_tasked_feeds)
            for feed_id in old_tasked_feeds:
                r.zincrby('error_feeds', 1, feed_id)
                feed = Feed.get_by_id(feed_id)
                feed.set_next_scheduled_update()
        logging.debug(" ---> ~SN~FBRe-queuing ~SB%s~SN dropped/broken feeds (~SB%s/%s~SN queued/tasked)" % (
            inactive_count,
            r.scard('queued_feeds'),
            r.zcard('tasked_feeds')))
        cp2 = time.time()

        # Old feeds: active feeds whose next scheduled update is more than
        # a day overdue.
        old = now - datetime.timedelta(days=1)
        old_feeds = Feed.objects.filter(
            next_scheduled_update__lte=old,
            active_subscribers__gte=1
        ).order_by('?')[:500]
        old_count = old_feeds.count()
        cp3 = time.time()

        logging.debug(" ---> ~SN~FBTasking ~SBrefresh:~FC%s~FB inactive:~FC%s~FB old:~FC%s~SN~FB broken feeds... (%.4s/%.4s/%.4s)" % (
            refresh_count,
            inactive_count,
            old_count,
            cp1 - start,
            cp2 - cp1,
            cp3 - cp2,
        ))

        Feed.task_feeds(refresh_feeds, verbose=False)
        Feed.task_feeds(old_feeds, verbose=False)

        logging.debug(" ---> ~SN~FBTasking broken feeds took ~SB%s~SN seconds (~SB%s~SN/~FG%s~FB~SN/%s tasked/queued/scheduled)" % (
            int(time.time() - start),
            r.zcard('tasked_feeds'),
            r.scard('queued_feeds'),
            r.zcard('scheduled_updates')))


class UpdateFeeds(Task):
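    """Fetch and update a batch of feeds by primary key.

    Feed ids that no longer resolve are dropped from the 'tasked_feeds'
    zset; a feed that exceeds the soft time limit gets the timeout
    recorded in its fetch history and the batch moves on."""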
    name = 'update-feeds'
    max_retries = 0
    ignore_result = True
    time_limit = 10*60
    soft_time_limit = 9*60

    def run(self, feed_pks, **kwargs):
        from apps.rss_feeds.models import Feed
        from apps.statistics.models import MStatistics
        r = redis.Redis(connection_pool=settings.REDIS_FEED_UPDATE_POOL)

        # Skip score computation when mongodb replication lag is too high.
        mongodb_replication_lag = int(MStatistics.get('mongodb_replication_lag', 0))
        compute_scores = bool(mongodb_replication_lag < 10)

        # Optionally instrument this task with the DB profiler and
        # mongo/redis query dumps.
        profiler = DBProfilerMiddleware()
        profiler_activated = profiler.process_celery()
        if profiler_activated:
            mongo_middleware = MongoDumpMiddleware()
            mongo_middleware.process_celery(profiler)
            redis_middleware = RedisDumpMiddleware()
            redis_middleware.process_celery(profiler)

        options = {
            'quick': float(MStatistics.get('quick_fetch', 0)),
            'updates_off': MStatistics.get('updates_off', False),
            'compute_scores': compute_scores,
            'mongodb_replication_lag': mongodb_replication_lag,
        }

        if not isinstance(feed_pks, list):
            feed_pks = [feed_pks]

        for feed_pk in feed_pks:
            feed = Feed.get_by_id(feed_pk)
            if not feed or feed.pk != int(feed_pk):
                logging.info(" ---> ~FRRemoving feed_id %s from tasked_feeds queue, points to %s..." % (feed_pk, feed and feed.pk))
                r.zrem('tasked_feeds', feed_pk)
            if not feed:
                continue
            try:
                feed.update(**options)
            except SoftTimeLimitExceeded as e:
                feed.save_feed_history(505, 'Timeout', e)
                logging.info(" ---> [%-30s] ~BR~FWTime limit hit!~SB~FR Moving on to next feed..." % feed)
            if profiler_activated:
                profiler.process_celery_finished()


class NewFeeds(Task):
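    """Fetch newly added feeds for the first time."""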
    name = 'new-feeds'
    max_retries = 0
    ignore_result = True
    time_limit = 10*60
    soft_time_limit = 9*60

    def run(self, feed_pks, **kwargs):
        from apps.rss_feeds.models import Feed
        if not isinstance(feed_pks, list):
            feed_pks = [feed_pks]

        options = {}
        for feed_pk in feed_pks:
            feed = Feed.get_by_id(feed_pk)
            if not feed:
                continue
            feed.update(options=options)


class PushFeeds(Task):
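    """Update a single feed with XML that was pushed to it, rather than
    waiting for its next scheduled fetch."""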
    name = 'push-feeds'
    max_retries = 0
    ignore_result = True

    def run(self, feed_id, xml, **kwargs):
        from apps.rss_feeds.models import Feed
        from apps.statistics.models import MStatistics

        # Pushed updates tolerate more replication lag than regular fetches.
        mongodb_replication_lag = int(MStatistics.get('mongodb_replication_lag', 0))
        compute_scores = bool(mongodb_replication_lag < 60)

        options = {
            'feed_xml': xml,
            'compute_scores': compute_scores,
            'mongodb_replication_lag': mongodb_replication_lag,
        }
        feed = Feed.get_by_id(feed_id)
        if feed:
            feed.update(options=options)


class BackupMongo(Task):
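    """Dump a whitelist of mongo collections, tar the dump, upload the
    archive to S3, and remove the local files."""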
    name = 'backup-mongo'
    max_retries = 0
    ignore_result = True

    def run(self, **kwargs):
        COLLECTIONS = ("classifier_tag classifier_author classifier_feed "
                       "classifier_title userstories starred_stories "
                       "shared_stories category category_site sent_emails "
                       "social_profile social_subscription social_services "
                       "statistics feedback")

        date = time.strftime('%Y-%m-%d-%H-%M')
        collections = COLLECTIONS.split(' ')
        db_name = 'newsblur'
        dir_name = 'backup_mongo_%s' % date
        filename = '%s.tgz' % dir_name

        os.mkdir(dir_name)

        # Back up only the whitelisted collections, one mongodump per collection.
        for collection in collections:
            cmd = 'mongodump --db %s --collection %s -o %s' % (db_name, collection, dir_name)
            logging.debug(' ---> ~FMDumping ~SB%s~SN: %s' % (collection, cmd))
            os.system(cmd)

        # Use gzip (-z) so the archive matches its .tgz extension.
        cmd = 'tar -zcf %s %s' % (filename, dir_name)
        os.system(cmd)

        logging.debug(' ---> ~FRUploading ~SB~FM%s~SN~FR to S3...' % filename)
        s3.save_file_in_s3(filename)
        shutil.rmtree(dir_name)
        os.remove(filename)
        logging.debug(' ---> ~FRFinished uploading ~SB~FM%s~SN~FR to S3.' % filename)


class ScheduleImmediateFetches(Task):
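    """Schedule the given feeds to be fetched immediately, optionally
    attributed to the requesting user."""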
    def run(self, feed_ids, user_id=None, **kwargs):
        from apps.rss_feeds.models import Feed

        if not isinstance(feed_ids, list):
            feed_ids = [feed_ids]

        Feed.schedule_feed_fetches_immediately(feed_ids, user_id=user_id)


class SchedulePremiumSetup(Task):
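    """Set up the given feeds with premium-subscriber fetch settings."""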
    def run(self, feed_ids, **kwargs):
        from apps.rss_feeds.models import Feed

        if not isinstance(feed_ids, list):
            feed_ids = [feed_ids]

        Feed.setup_feeds_for_premium_subscribers(feed_ids)


class ScheduleCountTagsForUser(Task):
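    """Recount a user's starred-story tags."""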
    def run(self, user_id):
        from apps.rss_feeds.models import MStarredStoryCounts

        MStarredStoryCounts.count_for_user(user_id)