NewsBlur/utils/mongo_raw_log_middleware.py
Samuel Clay 0d6cb69548 Merge branch 'django1.11' into django2.0
* django1.11: (73 commits)
  Switching to new celery 4 standalone binary.
  Fixing various mongo data calls.
  Upgrading to latest celery 4 (holy moly), which required some big changes to project layout. Still needs supervisor scripts updated.
  Removing unused log on cookies.
  I believe this Context wrapping is still preserved. See this django ticket: https://code.djangoproject.com/ticket/28125. Reverting this fixes the error, so I'm assuming this is that type of render.
  Have to revert 3f122d5e03 because this broke existing sessions (logged me out) because the model has changed and the serialized model stored in redis no longer matches. Whew, this took a while to figure out.
  Upgrading redis cache.
  Adding cookies to path inspector.
  Removing dupe db log.
  Fixing missing DB logs (redis and mongo) due to this change in django 1.8: "connections.queries is now a read-only attribute."
  Removing migrations that set a default date of 2020-05-08. Not sure why this was committed. I thought we resolved the issue with default datetimes?
  Fixing CallableBool.
  Missing import
  Fixing runtime errors on django 1.10
  Fixing OAuth connect.
  Fixing various django1.9 issues, mainly around templates.
  BASE_DIR
  Not every story is from a feed.
  Styling background colors for newsletters.
  Styling more newsletter elements.
  ...
2020-06-30 12:34:59 -04:00

116 lines
No EOL
4.5 KiB
Python

from django.core.exceptions import MiddlewareNotUsed
from django.conf import settings
from django.db import connection
from pymongo.mongo_client import MongoClient
from pymongo.mongo_replica_set_client import MongoReplicaSetClient
from time import time
import struct
import bson
import pymongo
from bson.errors import InvalidBSON
class MongoDumpMiddleware(object):
    """Django middleware that logs raw MongoDB wire messages for profiling.

    When activated, pymongo's ``_send_message_with_response`` is
    monkey-patched on ``MongoClient`` and ``MongoReplicaSetClient`` so that
    each outgoing message is decoded, timed, and appended to
    ``connection.queriesx`` as ``{'mongo': <decoded dict>, 'time': '<secs>'}``.

    The patch is installed at most once per process (guarded by the
    ``MongoClient._logging`` flag) and is never removed afterwards.
    """

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        response = self.get_response(request)
        return response

    def activated(self, request):
        """Return a truthy value when Mongo query logging should run.

        Logging is on when ``settings.DEBUG_QUERIES`` is truthy, or when
        ``request`` has an ``activated_segments`` attribute that contains
        ``'db_profiler'``. ``request`` may be any object (``process_celery``
        passes its profiler here).
        """
        return (settings.DEBUG_QUERIES or
                (hasattr(request, 'activated_segments') and
                 'db_profiler' in request.activated_segments))

    def process_view(self, request, callback, callback_args, callback_kwargs):
        """Start Mongo logging for this request if activated; returns None."""
        if not self.activated(request):
            return None
        self._start_logging()
        return None

    def process_celery(self, profiler):
        """Start Mongo logging for a celery task if activated; returns None."""
        if not self.activated(profiler):
            return None
        self._start_logging()
        return None

    def process_response(self, request, response):
        """Return ``response`` unchanged.

        The pymongo instrumentation is deliberately left in place here: the
        code that used to restore the original methods was disabled, so the
        patch stays installed for the life of the process.
        """
        return response

    def _start_logging(self):
        """Reset the seen-message-id set and install the patch if needed."""
        # Reset on every activation so ids only de-duplicate within one
        # request/task.
        self._used_msg_ids = set()
        if getattr(MongoClient, '_logging', False):
            return
        setattr(MongoClient, '_logging', True)
        MongoClient._send_message_with_response = \
            self._instrument(MongoClient._send_message_with_response)
        MongoReplicaSetClient._send_message_with_response = \
            self._instrument(MongoReplicaSetClient._send_message_with_response)

    def _instrument(self, original_method):
        """Wrap a pymongo send method so each message is decoded and timed.

        The returned wrapper closes over *this* middleware instance, so the
        de-duplication set consulted is the one belonging to the instance
        that installed the patch.
        """
        from functools import wraps

        @wraps(original_method)
        def instrumented_method(*args, **kwargs):
            # args[0] is the client, args[1] the operation; build the wire
            # message only to decode it for logging.
            with args[0]._socket_for_writes() as sock_info:
                query = args[1].get_message(False, sock_info, False)
                message = _mongodb_decode_wire_protocol(query[1])
            # message = _mongodb_decode_wire_protocol(args[1][1])

            # Skip messages that could not (or should not) be decoded and
            # messages already recorded (e.g. when wrappers are stacked).
            if not message or message['msg_id'] in self._used_msg_ids:
                return original_method(*args, **kwargs)
            self._used_msg_ids.add(message['msg_id'])

            start = time()
            result = original_method(*args, **kwargs)
            duration = time() - start

            if not getattr(connection, 'queriesx', False):
                connection.queriesx = []
            connection.queriesx.append({
                'mongo': message,
                'time': '%.3f' % duration,
            })
            return result
        return instrumented_method
# Legacy MongoDB wire-protocol opcodes mapped to readable operation names.
_MONGO_OPS = {
    1000: 'msg',
    2001: 'update',
    2002: 'insert',
    2003: 'reserved',
    2004: 'query',
    2005: 'get_more',
    2006: 'delete',
    2007: 'kill_cursors',
}


def _mongodb_decode_wire_protocol(message):
    """Decode a raw MongoDB wire-protocol message into a loggable dict.

    See http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol

    The message is read as: five little-endian int32s (the second is taken
    as the message id, the fourth as the opcode), a NUL-terminated
    collection name, two little-endian int32s (skip, limit), and then any
    remaining bytes as BSON documents.

    Args:
        message: the raw wire message as ``bytes``.

    Returns:
        A dict with keys ``op``, ``collection``, ``msg_id``, ``skip``,
        ``limit`` and ``query``; ``query`` is the decoded BSON document
        list, ``""`` when there is no payload, or ``'invalid bson'`` when
        decoding fails. Returns ``None`` for ``.system.`` collections and
        for messages with no NUL-terminated collection name.

    Raises:
        struct.error: if the message is too short for the fixed-size fields.
    """
    _, msg_id, _, opcode, _ = struct.unpack('<iiiii', message[:20])
    op = _MONGO_OPS.get(opcode, 'unknown')

    name_start = 20
    # Search with a bytes needle: the wire message is bytes, and a str
    # needle raises TypeError on Python 3.
    name_end = message.find(b'\0', name_start)
    if name_end == -1:
        # No terminator: the offsets below would be meaningless.
        return None
    # Decode so the substring test and the logged value are text.
    collection_name = message[name_start:name_end].decode('utf-8', 'replace')
    if '.system.' in collection_name:
        return None

    zidx = name_end + 1
    skip, limit = struct.unpack('<ii', message[zidx:zidx + 8])
    zidx += 8

    msg = ""
    payload = message[zidx:]
    if payload:
        try:
            msg = bson.decode_all(payload)
        except Exception:
            # Best effort: this is diagnostic logging, never fail the query.
            msg = 'invalid bson'

    return {
        'op': op,
        'collection': collection_name,
        'msg_id': msg_id,
        'skip': skip,
        'limit': limit,
        'query': msg,
    }