Pulling in redis db changes from pro branch, includes pipeline and command execution duration that actually works.

This commit is contained in:
Samuel Clay 2022-06-24 14:04:36 -04:00
parent c249de0168
commit ae0916762f

View file

@ -2,7 +2,9 @@ from django.core.exceptions import MiddlewareNotUsed
from django.conf import settings from django.conf import settings
from django.db import connection from django.db import connection
from redis.connection import Connection from redis.connection import Connection
from redis.client import Redis, Pipeline
from time import time from time import time
from pprint import pprint
class RedisDumpMiddleware(object): class RedisDumpMiddleware(object):
@ -20,16 +22,20 @@ class RedisDumpMiddleware(object):
# save old methods # save old methods
setattr(Connection, '_logging', True) setattr(Connection, '_logging', True)
connection.queriesx = [] connection.queriesx = []
Connection.pack_command = \ Redis.execute_command = \
self._instrument(Connection.pack_command) self._instrument(Redis.execute_command)
Pipeline._execute_transaction = \
self._instrument_pipeline(Pipeline._execute_transaction)
def process_celery(self, profiler): def process_celery(self, profiler):
if not self.activated(profiler): return if not self.activated(profiler): return
if not getattr(Connection, '_logging', False): if not getattr(Connection, '_logging', False):
# save old methods # save old methods
setattr(Connection, '_logging', True) setattr(Connection, '_logging', True)
Connection.pack_command = \ Redis.execute_command = \
self._instrument(Connection.pack_command) self._instrument(Redis.execute_command)
Pipeline._execute_transaction = \
self._instrument_pipeline(Pipeline._execute_transaction)
def process_response(self, request, response): def process_response(self, request, response):
# if settings.DEBUG and hasattr(self, 'orig_pack_command'): # if settings.DEBUG and hasattr(self, 'orig_pack_command'):
@ -57,13 +63,31 @@ class RedisDumpMiddleware(object):
return result return result
return instrumented_method return instrumented_method
def _instrument_pipeline(self, original_method):
def instrumented_method(*args, **kwargs):
message = self.process_pipeline(*args, **kwargs)
if not message:
return original_method(*args, **kwargs)
start = time()
result = original_method(*args, **kwargs)
stop = time()
duration = stop - start
if not getattr(connection, 'queriesx', False):
connection.queriesx = []
connection.queriesx.append({
message['redis_server_name']: message,
'time': '%.6f' % duration,
})
return result
return instrumented_method
def process_message(self, *args, **kwargs): def process_message(self, *args, **kwargs):
query = [] query = []
redis_server_name = None redis_server_name = None
for a, arg in enumerate(args): for a, arg in enumerate(args):
if isinstance(arg, Connection): if isinstance(arg, Redis):
redis_connection = arg redis_connection = arg
redis_server_name = redis_connection.host redis_server_name = redis_connection.connection_pool.connection_kwargs['host']
if 'db-redis-user' in redis_server_name: if 'db-redis-user' in redis_server_name:
redis_server_name = 'redis_user' redis_server_name = 'redis_user'
elif 'db-redis-session' in redis_server_name: elif 'db-redis-session' in redis_server_name:
@ -78,7 +102,38 @@ class RedisDumpMiddleware(object):
if len(str(arg)) > 100: if len(str(arg)) > 100:
arg = "[%s bytes]" % len(str(arg)) arg = "[%s bytes]" % len(str(arg))
query.append(str(arg).replace('\n', '')) query.append(str(arg).replace('\n', ''))
return { 'query': ' '.join(query), 'redis_server_name': redis_server_name } return { 'query': f"{redis_server_name}: {' '.join(query)}", 'redis_server_name': redis_server_name }
def process_pipeline(self, *args, **kwargs):
queries = []
redis_server_name = None
for a, arg in enumerate(args):
if isinstance(arg, Connection):
continue
if isinstance(arg, Pipeline):
redis_connection = arg
redis_server_name = redis_connection.connection_pool.connection_kwargs['host']
if 'db-redis-user' in redis_server_name:
redis_server_name = 'redis_user'
elif 'db-redis-session' in redis_server_name:
redis_server_name = 'redis_session'
elif 'db-redis-story' in redis_server_name:
redis_server_name = 'redis_story'
elif 'db-redis-pubsub' in redis_server_name:
redis_server_name = 'redis_pubsub'
elif 'db_redis' in redis_server_name:
redis_server_name = 'redis_user'
continue
if not isinstance(arg, list):
continue
for command in arg:
command_query = " ".join([str(c) for c in command[0]])
queries.append(command_query)
if len(str(arg)) > 10000:
arg = "[%s bytes]" % len(str(arg))
# query.append(str(arg).replace('\n', ''))
queries_str = '\n\t\t\t\t\t\t~FC'.join(queries)
return { 'query': f"{redis_server_name}: {queries_str}", 'redis_server_name': redis_server_name }
def __call__(self, request): def __call__(self, request):
response = None response = None