mirror of
https://github.com/viq/NewsBlur.git
synced 2025-08-05 16:49:45 +00:00
Risky business: now capturing db times (sql/mongo/redis) for 1% of all tasks. Also updating munin to recognize new db profiling times.
This commit is contained in:
parent
378c421d8b
commit
4a12c220fe
6 changed files with 59 additions and 22 deletions
|
@ -8,7 +8,6 @@ from django.http import HttpResponse
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.db import connection
|
from django.db import connection
|
||||||
from django.template import Template, Context
|
from django.template import Template, Context
|
||||||
from apps.profile.tasks import CleanupUser
|
|
||||||
from apps.statistics.rstats import round_time
|
from apps.statistics.rstats import round_time
|
||||||
from utils import json_functions as json
|
from utils import json_functions as json
|
||||||
|
|
||||||
|
@ -26,6 +25,7 @@ class LastSeenMiddleware(object):
|
||||||
if request.user.profile.last_seen_on < hour_ago:
|
if request.user.profile.last_seen_on < hour_ago:
|
||||||
logging.user(request, "~FG~BBRepeat visitor: ~SB%s (%s)" % (
|
logging.user(request, "~FG~BBRepeat visitor: ~SB%s (%s)" % (
|
||||||
request.user.profile.last_seen_on, ip))
|
request.user.profile.last_seen_on, ip))
|
||||||
|
from apps.profile.tasks import CleanupUser
|
||||||
CleanupUser.delay(user_id=request.user.pk)
|
CleanupUser.delay(user_id=request.user.pk)
|
||||||
elif settings.DEBUG:
|
elif settings.DEBUG:
|
||||||
logging.user(request, "~FG~BBRepeat visitor (ignored): ~SB%s (%s)" % (
|
logging.user(request, "~FG~BBRepeat visitor (ignored): ~SB%s (%s)" % (
|
||||||
|
@ -45,6 +45,13 @@ class DBProfilerMiddleware:
|
||||||
random.random() < .01):
|
random.random() < .01):
|
||||||
request.activated_segments.append('db_profiler')
|
request.activated_segments.append('db_profiler')
|
||||||
connection.use_debug_cursor = True
|
connection.use_debug_cursor = True
|
||||||
|
|
||||||
|
def process_celery(self):
|
||||||
|
setattr(self, 'activated_segments', [])
|
||||||
|
if random.random() < .01:
|
||||||
|
self.activated_segments.append('db_profiler')
|
||||||
|
connection.use_debug_cursor = True
|
||||||
|
return self
|
||||||
|
|
||||||
def process_exception(self, request, exception):
|
def process_exception(self, request, exception):
|
||||||
if hasattr(request, 'sql_times_elapsed'):
|
if hasattr(request, 'sql_times_elapsed'):
|
||||||
|
@ -55,14 +62,21 @@ class DBProfilerMiddleware:
|
||||||
self._save_times(request.sql_times_elapsed)
|
self._save_times(request.sql_times_elapsed)
|
||||||
return response
|
return response
|
||||||
|
|
||||||
def _save_times(self, db_times):
|
def process_celery_finished(self):
|
||||||
|
middleware = SQLLogToConsoleMiddleware()
|
||||||
|
middleware.process_celery(self)
|
||||||
|
if hasattr(self, 'sql_times_elapsed'):
|
||||||
|
logging.debug(" ---> ~FGProfiling~FB task: %s" % self.sql_times_elapsed)
|
||||||
|
self._save_times(self.sql_times_elapsed, 'task_')
|
||||||
|
|
||||||
|
def _save_times(self, db_times, prefix=""):
|
||||||
if not db_times: return
|
if not db_times: return
|
||||||
|
|
||||||
r = redis.Redis(connection_pool=settings.REDIS_STATISTICS_POOL)
|
r = redis.Redis(connection_pool=settings.REDIS_STATISTICS_POOL)
|
||||||
pipe = r.pipeline()
|
pipe = r.pipeline()
|
||||||
minute = round_time(round_to=60)
|
minute = round_time(round_to=60)
|
||||||
for db, duration in db_times.items():
|
for db, duration in db_times.items():
|
||||||
key = "DB:%s:%s" % (db, minute.strftime('%s'))
|
key = "DB:%s%s:%s" % (prefix, db, minute.strftime('%s'))
|
||||||
pipe.incr("%s:c" % key)
|
pipe.incr("%s:c" % key)
|
||||||
pipe.expireat("%s:c" % key, (minute + datetime.timedelta(days=2)).strftime("%s"))
|
pipe.expireat("%s:c" % key, (minute + datetime.timedelta(days=2)).strftime("%s"))
|
||||||
if duration:
|
if duration:
|
||||||
|
@ -109,6 +123,9 @@ class SQLLogToConsoleMiddleware:
|
||||||
}
|
}
|
||||||
setattr(request, 'sql_times_elapsed', times_elapsed)
|
setattr(request, 'sql_times_elapsed', times_elapsed)
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
def process_celery(self, profiler):
|
||||||
|
self.process_response(profiler, None)
|
||||||
|
|
||||||
SIMPSONS_QUOTES = [
|
SIMPSONS_QUOTES = [
|
||||||
("Homer", "D'oh."),
|
("Homer", "D'oh."),
|
||||||
|
|
|
@ -7,7 +7,9 @@ from celery.task import Task
|
||||||
from utils import log as logging
|
from utils import log as logging
|
||||||
from utils import s3_utils as s3
|
from utils import s3_utils as s3
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
from apps.profile.middleware import DBProfilerMiddleware
|
||||||
|
from utils.mongo_raw_log_middleware import MongoDumpMiddleware
|
||||||
|
from utils.redis_raw_log_middleware import RedisDumpMiddleware
|
||||||
FEED_TASKING_MAX = 10000
|
FEED_TASKING_MAX = 10000
|
||||||
|
|
||||||
class TaskFeeds(Task):
|
class TaskFeeds(Task):
|
||||||
|
@ -131,6 +133,14 @@ class UpdateFeeds(Task):
|
||||||
mongodb_replication_lag = int(MStatistics.get('mongodb_replication_lag', 0))
|
mongodb_replication_lag = int(MStatistics.get('mongodb_replication_lag', 0))
|
||||||
compute_scores = bool(mongodb_replication_lag < 10)
|
compute_scores = bool(mongodb_replication_lag < 10)
|
||||||
|
|
||||||
|
profiler = DBProfilerMiddleware()
|
||||||
|
profiler_activated = profiler.process_celery()
|
||||||
|
if profiler_activated:
|
||||||
|
mongo_middleware = MongoDumpMiddleware()
|
||||||
|
mongo_middleware.process_celery(profiler)
|
||||||
|
redis_middleware = RedisDumpMiddleware()
|
||||||
|
redis_middleware.process_celery(profiler)
|
||||||
|
|
||||||
options = {
|
options = {
|
||||||
'quick': float(MStatistics.get('quick_fetch', 0)),
|
'quick': float(MStatistics.get('quick_fetch', 0)),
|
||||||
'updates_off': MStatistics.get('updates_off', False),
|
'updates_off': MStatistics.get('updates_off', False),
|
||||||
|
@ -148,6 +158,7 @@ class UpdateFeeds(Task):
|
||||||
r.zrem('tasked_feeds', feed_pk)
|
r.zrem('tasked_feeds', feed_pk)
|
||||||
if feed:
|
if feed:
|
||||||
feed.update(**options)
|
feed.update(**options)
|
||||||
|
if profiler_activated: profiler.process_celery_finished()
|
||||||
|
|
||||||
class NewFeeds(Task):
|
class NewFeeds(Task):
|
||||||
name = 'new-feeds'
|
name = 'new-feeds'
|
||||||
|
|
|
@ -186,7 +186,7 @@ class MStatistics(mongo.Document):
|
||||||
db_times = {}
|
db_times = {}
|
||||||
latest_db_times = {}
|
latest_db_times = {}
|
||||||
|
|
||||||
for db in ['sql', 'mongo', 'redis']:
|
for db in ['sql', 'mongo', 'redis', 'task_sql', 'task_mongo', 'task_redis']:
|
||||||
db_times[db] = []
|
db_times[db] = []
|
||||||
for hour in range(24):
|
for hour in range(24):
|
||||||
start_hours_ago = now - datetime.timedelta(hours=hour+1)
|
start_hours_ago = now - datetime.timedelta(hours=hour+1)
|
||||||
|
@ -224,6 +224,9 @@ class MStatistics(mongo.Document):
|
||||||
('latest_sql_avg', latest_db_times['sql']),
|
('latest_sql_avg', latest_db_times['sql']),
|
||||||
('latest_mongo_avg', latest_db_times['mongo']),
|
('latest_mongo_avg', latest_db_times['mongo']),
|
||||||
('latest_redis_avg', latest_db_times['redis']),
|
('latest_redis_avg', latest_db_times['redis']),
|
||||||
|
('latest_task_sql_avg', latest_db_times['task_sql']),
|
||||||
|
('latest_task_mongo_avg', latest_db_times['task_mongo']),
|
||||||
|
('latest_task_redis_avg', latest_db_times['task_redis']),
|
||||||
)
|
)
|
||||||
for key, value in values:
|
for key, value in values:
|
||||||
cls.objects(key=key).update_one(upsert=True, set__key=key, set__value=value)
|
cls.objects(key=key).update_one(upsert=True, set__key=key, set__value=value)
|
||||||
|
|
|
@ -20,22 +20,20 @@ class MongoDumpMiddleware(object):
|
||||||
if not getattr(MongoClient, '_logging', False):
|
if not getattr(MongoClient, '_logging', False):
|
||||||
# save old methods
|
# save old methods
|
||||||
setattr(MongoClient, '_logging', True)
|
setattr(MongoClient, '_logging', True)
|
||||||
# # save old methods
|
|
||||||
# self.orig_send_message = \
|
|
||||||
# MongoClient._send_message
|
|
||||||
# self.orig_send_message_with_response = \
|
|
||||||
# MongoClient._send_message_with_response
|
|
||||||
# self.orig_rs_send_message = \
|
|
||||||
# MongoReplicaSetClient._send_message
|
|
||||||
# self.orig_rs_send_message_with_response = \
|
|
||||||
# MongoReplicaSetClient._send_message_with_response
|
|
||||||
# instrument methods to record messages
|
|
||||||
# MongoClient._send_message = \
|
|
||||||
# self._instrument(MongoClient._send_message)
|
|
||||||
MongoClient._send_message_with_response = \
|
MongoClient._send_message_with_response = \
|
||||||
self._instrument(MongoClient._send_message_with_response)
|
self._instrument(MongoClient._send_message_with_response)
|
||||||
# MongoReplicaSetClient._send_message = \
|
MongoReplicaSetClient._send_message_with_response = \
|
||||||
# self._instrument(MongoReplicaSetClient._send_message)
|
self._instrument(MongoReplicaSetClient._send_message_with_response)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def process_celery(self, profiler):
|
||||||
|
if not self.activated(profiler): return
|
||||||
|
self._used_msg_ids = []
|
||||||
|
if not getattr(MongoClient, '_logging', False):
|
||||||
|
# save old methods
|
||||||
|
setattr(MongoClient, '_logging', True)
|
||||||
|
MongoClient._send_message_with_response = \
|
||||||
|
self._instrument(MongoClient._send_message_with_response)
|
||||||
MongoReplicaSetClient._send_message_with_response = \
|
MongoReplicaSetClient._send_message_with_response = \
|
||||||
self._instrument(MongoReplicaSetClient._send_message_with_response)
|
self._instrument(MongoReplicaSetClient._send_message_with_response)
|
||||||
return None
|
return None
|
||||||
|
|
|
@ -25,6 +25,9 @@ class NBMuninGraph(MuninGraph):
|
||||||
'sql_avg': MStatistics.get('latest_sql_avg'),
|
'sql_avg': MStatistics.get('latest_sql_avg'),
|
||||||
'mongo_avg': MStatistics.get('latest_mongo_avg'),
|
'mongo_avg': MStatistics.get('latest_mongo_avg'),
|
||||||
'redis_avg': MStatistics.get('latest_redis_avg'),
|
'redis_avg': MStatistics.get('latest_redis_avg'),
|
||||||
|
'task_sql_avg': MStatistics.get('latest_task_sql_avg'),
|
||||||
|
'task_mongo_avg': MStatistics.get('latest_task_mongo_avg'),
|
||||||
|
'task_redis_avg': MStatistics.get('latest_task_redis_avg'),
|
||||||
}
|
}
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|
|
@ -15,9 +15,14 @@ class RedisDumpMiddleware(object):
|
||||||
if not getattr(Connection, '_logging', False):
|
if not getattr(Connection, '_logging', False):
|
||||||
# save old methods
|
# save old methods
|
||||||
setattr(Connection, '_logging', True)
|
setattr(Connection, '_logging', True)
|
||||||
# self.orig_pack_command = \
|
Connection.pack_command = \
|
||||||
# Connection.pack_command
|
self._instrument(Connection.pack_command)
|
||||||
# instrument methods to record messages
|
|
||||||
|
def process_celery(self, profiler):
|
||||||
|
if not self.activated(profiler): return
|
||||||
|
if not getattr(Connection, '_logging', False):
|
||||||
|
# save old methods
|
||||||
|
setattr(Connection, '_logging', True)
|
||||||
Connection.pack_command = \
|
Connection.pack_command = \
|
||||||
self._instrument(Connection.pack_command)
|
self._instrument(Connection.pack_command)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue