NewsBlur/utils/mongo_raw_log_middleware.py

129 lines
4.4 KiB
Python
Raw Normal View History

2024-04-24 09:50:42 -04:00
import struct
from time import time
import bson
import pymongo
from bson.errors import InvalidBSON
2012-04-06 18:28:26 -07:00
from django.conf import settings
2024-04-24 09:50:42 -04:00
from django.core.exceptions import MiddlewareNotUsed
2012-04-06 18:28:26 -07:00
from django.db import connection
from pymongo.mongo_client import MongoClient
from pymongo.mongo_replica_set_client import MongoReplicaSetClient
2024-04-24 09:50:42 -04:00
from utils import log as logging
2012-04-06 18:28:26 -07:00
2020-06-17 03:24:16 -04:00
2024-04-24 09:43:56 -04:00
class MongoDumpMiddleware(object):
    """Record raw MongoDB operations by monkey-patching pymongo's send method.

    When ``activated`` (``settings.DEBUG_QUERIES`` is truthy, or the request /
    profiler carries the ``"db_profiler"`` segment), the first activation wraps
    ``MongoClient._send_message_with_response`` and
    ``MongoReplicaSetClient._send_message_with_response``. The wrapper decodes
    each outgoing wire message, times the real call, and appends
    ``{"mongo": <decoded message>, "time": "<seconds>"}`` to
    ``connection.queriesx``.

    NOTE(review): the patch is process-wide and is never removed, so once it
    is installed every Mongo call is recorded, including calls made during
    requests that are not activated. ``_used_msg_ids`` is only reset when an
    activated request/profiler comes through. Whatever reads
    ``connection.queriesx`` is presumably responsible for clearing it; that
    consumer is not visible here.
    """

    def __init__(self, get_response=None):
        self.get_response = get_response

    def activated(self, request):
        """Return whether Mongo profiling is enabled for ``request``.

        ``request`` may be a Django request or a Celery profiler object; both
        are checked for an ``activated_segments`` collection.
        """
        return settings.DEBUG_QUERIES or (
            hasattr(request, "activated_segments") and "db_profiler" in request.activated_segments
        )

    def process_view(self, request, callback, callback_args, callback_kwargs):
        """Django view hook: install the pymongo instrumentation when activated.

        On the very first installation, ``connection.queriesx`` is also reset.
        Always returns ``None`` so Django continues to the view.
        """
        if not self.activated(request):
            return None
        self._install_instrumentation(reset_queries=True)
        return None

    def process_celery(self, profiler):
        """Celery counterpart of ``process_view``.

        Unlike ``process_view``, this does not reset ``connection.queriesx``
        on installation.
        """
        if not self.activated(profiler):
            return None
        self._install_instrumentation()
        return None

    def process_response(self, request, response):
        """Pass the response through unchanged."""
        return response

    def _install_instrumentation(self, reset_queries=False):
        """Reset dedup state, then wrap pymongo's send method once per process.

        ``MongoClient._logging`` acts as the process-wide "already patched"
        flag. It is set before wrapping, so later calls only reset
        ``_used_msg_ids``.
        """
        self._used_msg_ids = set()
        if getattr(MongoClient, "_logging", False):
            return
        MongoClient._logging = True
        if hasattr(MongoClient, "_send_message_with_response"):
            if reset_queries:
                connection.queriesx = []
            MongoClient._send_message_with_response = self._instrument(
                MongoClient._send_message_with_response
            )
            MongoReplicaSetClient._send_message_with_response = self._instrument(
                MongoReplicaSetClient._send_message_with_response
            )

    def _instrument(self, original_method):
        """Return a wrapper around ``original_method`` that logs and times calls.

        The wrapper expects to be called as ``method(client, operation, ...)``.
        It renders ``operation`` to wire bytes through a socket from
        ``client._socket_for_writes()``, decodes them, and skips messages whose
        ``msg_id`` was already recorded.

        The wrapper closes over *this* instance, so it reads and updates this
        instance's ``_used_msg_ids`` regardless of which middleware instance
        later resets its own.
        """

        def instrumented_method(*args, **kwargs):
            try:
                # The socket is only needed to render the message for
                # inspection. Release it before the real call below, so the
                # wrapper does not keep a socket checked out while the
                # original method does its own work.
                with args[0]._socket_for_writes() as sock_info:
                    query = args[1].get_message(False, sock_info, False)
                message = _mongodb_decode_wire_protocol(query[1])
            except Exception:
                # Introspection is debugging-only. If anything goes wrong
                # (unexpected pymongo API, malformed message), skip logging
                # this call rather than failing the real database operation.
                message = None

            if not message or message["msg_id"] in self._used_msg_ids:
                return original_method(*args, **kwargs)
            self._used_msg_ids.add(message["msg_id"])

            start = time()
            result = original_method(*args, **kwargs)
            duration = time() - start

            if not getattr(connection, "queriesx", False):
                connection.queriesx = []
            connection.queriesx.append(
                {
                    "mongo": message,
                    "time": "%.6f" % duration,
                }
            )
            return result

        return instrumented_method

    def __call__(self, request):
        response = self.get_response(request)
        response = self.process_response(request, response)
        return response
2024-04-24 09:43:56 -04:00
2012-04-06 18:28:26 -07:00
def _mongodb_decode_wire_protocol(message):
2024-04-24 09:43:56 -04:00
"""http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol"""
2012-04-06 18:28:26 -07:00
MONGO_OPS = {
2024-04-24 09:43:56 -04:00
1000: "msg",
2001: "update",
2002: "insert",
2003: "reserved",
2004: "query",
2005: "get_more",
2006: "delete",
2007: "kill_cursors",
2012-04-06 18:28:26 -07:00
}
2024-04-24 09:43:56 -04:00
_, msg_id, _, opcode, _ = struct.unpack("<iiiii", message[:20])
op = MONGO_OPS.get(opcode, "unknown")
2012-04-06 18:28:26 -07:00
zidx = 20
2024-04-24 09:43:56 -04:00
collection_name_size = message[zidx:].find(b"\0")
collection_name = message[zidx : zidx + collection_name_size].decode("utf-8")
if ".system." in collection_name:
return
2012-04-06 18:28:26 -07:00
zidx += collection_name_size + 1
2024-04-24 09:43:56 -04:00
skip, limit = struct.unpack("<ii", message[zidx : zidx + 8])
2012-04-06 18:28:26 -07:00
zidx += 8
msg = ""
try:
if message[zidx:]:
msg = bson.decode_all(message[zidx:])
2012-10-30 08:31:37 -07:00
except:
2024-04-24 09:43:56 -04:00
msg = "invalid bson"
return {
"op": op,
"collection": collection_name,
"msg_id": msg_id,
"skip": skip,
"limit": limit,
"query": msg,
}