From ae0916762f116dbbd80c101e599a77eee1643e14 Mon Sep 17 00:00:00 2001 From: Samuel Clay Date: Fri, 24 Jun 2022 14:04:36 -0400 Subject: [PATCH] Pulling in redis db changes from pro branch, includes pipeline and command execution duration that actually works. --- utils/redis_raw_log_middleware.py | 69 +++++++++++++++++++++++++++---- 1 file changed, 62 insertions(+), 7 deletions(-) diff --git a/utils/redis_raw_log_middleware.py b/utils/redis_raw_log_middleware.py index cb1a4d02d..c1049c7ae 100644 --- a/utils/redis_raw_log_middleware.py +++ b/utils/redis_raw_log_middleware.py @@ -2,7 +2,9 @@ from django.core.exceptions import MiddlewareNotUsed from django.conf import settings from django.db import connection from redis.connection import Connection +from redis.client import Redis, Pipeline from time import time +from pprint import pprint class RedisDumpMiddleware(object): @@ -20,16 +22,20 @@ class RedisDumpMiddleware(object): # save old methods setattr(Connection, '_logging', True) connection.queriesx = [] - Connection.pack_command = \ - self._instrument(Connection.pack_command) + Redis.execute_command = \ + self._instrument(Redis.execute_command) + Pipeline._execute_transaction = \ + self._instrument_pipeline(Pipeline._execute_transaction) def process_celery(self, profiler): if not self.activated(profiler): return if not getattr(Connection, '_logging', False): # save old methods setattr(Connection, '_logging', True) - Connection.pack_command = \ - self._instrument(Connection.pack_command) + Redis.execute_command = \ + self._instrument(Redis.execute_command) + Pipeline._execute_transaction = \ + self._instrument_pipeline(Pipeline._execute_transaction) def process_response(self, request, response): # if settings.DEBUG and hasattr(self, 'orig_pack_command'): @@ -56,14 +62,32 @@ class RedisDumpMiddleware(object): }) return result return instrumented_method + + def _instrument_pipeline(self, original_method): + def instrumented_method(*args, **kwargs): + message = self.process_pipeline(*args, **kwargs) + if not message: + return original_method(*args, **kwargs) + start = time() + result = original_method(*args, **kwargs) + stop = time() + duration = stop - start + if not getattr(connection, 'queriesx', False): + connection.queriesx = [] + connection.queriesx.append({ + message['redis_server_name']: message, + 'time': '%.6f' % duration, + }) + return result + return instrumented_method def process_message(self, *args, **kwargs): query = [] redis_server_name = None for a, arg in enumerate(args): - if isinstance(arg, Connection): + if isinstance(arg, Redis): redis_connection = arg - redis_server_name = redis_connection.host + redis_server_name = redis_connection.connection_pool.connection_kwargs['host'] if 'db-redis-user' in redis_server_name: redis_server_name = 'redis_user' elif 'db-redis-session' in redis_server_name: @@ -78,7 +102,38 @@ class RedisDumpMiddleware(object): if len(str(arg)) > 100: arg = "[%s bytes]" % len(str(arg)) query.append(str(arg).replace('\n', '')) - return { 'query': ' '.join(query), 'redis_server_name': redis_server_name } + return { 'query': f"{redis_server_name}: {' '.join(query)}", 'redis_server_name': redis_server_name } + + def process_pipeline(self, *args, **kwargs): + queries = [] + redis_server_name = None + for a, arg in enumerate(args): + if isinstance(arg, Connection): + continue + if isinstance(arg, Pipeline): + redis_connection = arg + redis_server_name = redis_connection.connection_pool.connection_kwargs['host'] + if 'db-redis-user' in redis_server_name: + redis_server_name = 'redis_user' + elif 'db-redis-session' in redis_server_name: + redis_server_name = 'redis_session' + elif 'db-redis-story' in redis_server_name: + redis_server_name = 'redis_story' + elif 'db-redis-pubsub' in redis_server_name: + redis_server_name = 'redis_pubsub' + elif 'db_redis' in redis_server_name: + redis_server_name = 'redis_user' + continue + if not isinstance(arg, list): + continue + for command in arg: + command_query = " ".join([str(c) for c in command[0]]) + queries.append(command_query) + if len(str(arg)) > 10000: + arg = "[%s bytes]" % len(str(arg)) + # query.append(str(arg).replace('\n', '')) + queries_str = '\n\t\t\t\t\t\t~FC'.join(queries) + return { 'query': f"{redis_server_name}: {queries_str}", 'redis_server_name': redis_server_name } def __call__(self, request): response = None