NewsBlur-viq/utils/redis_raw_log_middleware.py

152 lines
6.2 KiB
Python
Raw Normal View History

from django.core.exceptions import MiddlewareNotUsed
from django.conf import settings
from django.db import connection
from redis.connection import Connection
from redis.client import Redis, Pipeline
from time import time
from pprint import pprint
2020-06-17 03:24:16 -04:00
2024-04-24 09:43:56 -04:00
class RedisDumpMiddleware(object):
2020-07-01 18:38:37 -04:00
def __init__(self, get_response=None):
2020-06-17 03:24:16 -04:00
self.get_response = get_response
def activated(self, request):
2024-04-24 09:43:56 -04:00
return settings.DEBUG_QUERIES or (
hasattr(request, "activated_segments") and "db_profiler" in request.activated_segments
)
2020-06-30 19:46:32 -04:00
def process_view(self, request, callback, callback_args, callback_kwargs):
2024-04-24 09:43:56 -04:00
if not self.activated(request):
return
if not getattr(Connection, "_logging", False):
# save old methods
2024-04-24 09:43:56 -04:00
setattr(Connection, "_logging", True)
connection.queriesx = []
2024-04-24 09:43:56 -04:00
Redis.execute_command = self._instrument(Redis.execute_command)
Pipeline._execute_transaction = self._instrument_pipeline(Pipeline._execute_transaction)
2020-06-30 19:46:32 -04:00
def process_celery(self, profiler):
2024-04-24 09:43:56 -04:00
if not self.activated(profiler):
return
if not getattr(Connection, "_logging", False):
# save old methods
2024-04-24 09:43:56 -04:00
setattr(Connection, "_logging", True)
Redis.execute_command = self._instrument(Redis.execute_command)
Pipeline._execute_transaction = self._instrument_pipeline(Pipeline._execute_transaction)
2020-06-30 19:46:32 -04:00
def process_response(self, request, response):
# if settings.DEBUG and hasattr(self, 'orig_pack_command'):
# # remove instrumentation from redis
# setattr(Connection, '_logging', False)
# Connection.pack_command = \
# self.orig_pack_command
return response
2020-06-30 19:46:32 -04:00
def _instrument(self, original_method):
def instrumented_method(*args, **kwargs):
message = self.process_message(*args, **kwargs)
if not message:
return original_method(*args, **kwargs)
start = time()
result = original_method(*args, **kwargs)
stop = time()
duration = stop - start
2024-04-24 09:43:56 -04:00
if not getattr(connection, "queriesx", False):
connection.queriesx = []
2024-04-24 09:43:56 -04:00
connection.queriesx.append(
{
message["redis_server_name"]: message,
"time": "%.6f" % duration,
}
)
return result
2024-04-24 09:43:56 -04:00
return instrumented_method
def _instrument_pipeline(self, original_method):
def instrumented_method(*args, **kwargs):
message = self.process_pipeline(*args, **kwargs)
if not message:
return original_method(*args, **kwargs)
start = time()
result = original_method(*args, **kwargs)
stop = time()
duration = stop - start
2024-04-24 09:43:56 -04:00
if not getattr(connection, "queriesx", False):
connection.queriesx = []
2024-04-24 09:43:56 -04:00
connection.queriesx.append(
{
message["redis_server_name"]: message,
"time": "%.6f" % duration,
}
)
return result
2024-04-24 09:43:56 -04:00
return instrumented_method
2024-04-24 09:43:56 -04:00
def process_message(self, *args, **kwargs):
query = []
redis_server_name = None
for a, arg in enumerate(args):
if isinstance(arg, Redis):
redis_connection = arg
2024-04-24 09:43:56 -04:00
redis_server_name = redis_connection.connection_pool.connection_kwargs["host"]
if "db-redis-user" in redis_server_name:
redis_server_name = "redis_user"
elif "db-redis-session" in redis_server_name:
redis_server_name = "redis_session"
elif "db-redis-story" in redis_server_name:
redis_server_name = "redis_story"
elif "db-redis-pubsub" in redis_server_name:
redis_server_name = "redis_pubsub"
elif "db_redis" in redis_server_name:
redis_server_name = "redis_user"
continue
2013-10-08 10:10:07 -07:00
if len(str(arg)) > 100:
2014-12-01 16:31:13 -08:00
arg = "[%s bytes]" % len(str(arg))
2024-04-24 09:43:56 -04:00
query.append(str(arg).replace("\n", ""))
return {"query": f"{redis_server_name}: {' '.join(query)}", "redis_server_name": redis_server_name}
def process_pipeline(self, *args, **kwargs):
queries = []
redis_server_name = None
for a, arg in enumerate(args):
if isinstance(arg, Connection):
continue
if isinstance(arg, Pipeline):
redis_connection = arg
2024-04-24 09:43:56 -04:00
redis_server_name = redis_connection.connection_pool.connection_kwargs["host"]
if "db-redis-user" in redis_server_name:
redis_server_name = "redis_user"
elif "db-redis-session" in redis_server_name:
redis_server_name = "redis_session"
elif "db-redis-story" in redis_server_name:
redis_server_name = "redis_story"
elif "db-redis-pubsub" in redis_server_name:
redis_server_name = "redis_pubsub"
elif "db_redis" in redis_server_name:
redis_server_name = "redis_user"
continue
if not isinstance(arg, list):
continue
for command in arg:
command_query = " ".join([str(c) for c in command[0]])
queries.append(command_query)
if len(str(arg)) > 10000:
arg = "[%s bytes]" % len(str(arg))
# query.append(str(arg).replace('\n', ''))
2024-04-24 09:43:56 -04:00
queries_str = "\n\t\t\t\t\t\t~FC".join(queries)
return {"query": f"{redis_server_name}: {queries_str}", "redis_server_name": redis_server_name}
2020-06-17 03:24:16 -04:00
def __call__(self, request):
2020-06-30 19:46:32 -04:00
response = None
2024-04-24 09:43:56 -04:00
if hasattr(self, "process_request"):
2020-06-30 19:46:32 -04:00
response = self.process_request(request)
if not response:
response = self.get_response(request)
2024-04-24 09:43:56 -04:00
if hasattr(self, "process_response"):
2020-06-30 19:46:32 -04:00
response = self.process_response(request, response)
2020-06-17 03:24:16 -04:00
2020-06-30 19:46:32 -04:00
return response