mirror of
https://github.com/samuelclay/NewsBlur.git
synced 2025-04-13 09:42:01 +00:00
Adding authentication for mongo. Hopefully this covers everything needed.
This commit is contained in:
parent
e11d4db4ea
commit
0754c0c049
9 changed files with 25 additions and 599 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -44,6 +44,7 @@ vendor/mms-agent/settings.py
|
|||
apps/social/spam.py
|
||||
venv*
|
||||
/backups
|
||||
config/mongodb_keyfile.key
|
||||
|
||||
# Docker Jinja templates
|
||||
docker/haproxy/haproxy.consul.cfg
|
||||
|
|
|
@ -7,6 +7,7 @@ git_secrets_repo: ssh://git@github.com/samuelclay/newsblur-secrets
|
|||
create_user: nb
|
||||
local_key: "{{ lookup('file', lookup('env','HOME') + '/.ssh/id_rsa.pub') }}"
|
||||
copy_local_key: "{{ lookup('file', '/srv/secrets-newsblur/keys/docker.key.pub') }}"
|
||||
mongodb_keyfile: "{{ lookup('file', '/srv/secrets-newsblur/keys/mongodb_keyfile.key') }}"
|
||||
sys_packages: [
|
||||
'git',
|
||||
'python3',
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
networks:
|
||||
- name: newsblurnet
|
||||
env:
|
||||
MONGODB_URI: 'mongodb://db-mongo.service.nyc1.consul:27017/admin?'
|
||||
MONGODB_URI: 'mongodb://nb:KCi@db-mongo.service.nyc1.consul:27017/admin?'
|
||||
ports:
|
||||
- '9216:9216'
|
||||
|
||||
|
|
|
@ -6,6 +6,12 @@
|
|||
mode: 0777
|
||||
path: /var/log/mongodb
|
||||
|
||||
- name: Copy MongoDB keyfile
|
||||
copy:
|
||||
content: "{{ mongodb_keyfile }}"
|
||||
dest: /srv/newsblur/config/mongodb_keyfile.key
|
||||
mode: 0400
|
||||
|
||||
- name: Make docker network for newsblurnet
|
||||
become: yes
|
||||
docker_network:
|
||||
|
@ -30,8 +36,9 @@
|
|||
volumes:
|
||||
- /mnt/{{ inventory_hostname | regex_replace('db-|-', '') }}:/data/db
|
||||
- /srv/newsblur/ansible/roles/mongo/templates/mongo.conf:/etc/mongod.conf
|
||||
- /srv/newsblur/config/mongodb_keyfile.key:/srv/newsblur/config/mongodb_keyfile.key
|
||||
- /var/log/mongodb/:/var/log/mongodb/
|
||||
- /opt/mongo/newsblur/backup:/backup/'
|
||||
- /opt/mongo/newsblur/backup:/backup/
|
||||
|
||||
- name: Register mongo in consul
|
||||
tags: consul
|
||||
|
|
|
@ -27,7 +27,9 @@ net:
|
|||
processManagement:
|
||||
timeZoneInfo: /usr/share/zoneinfo
|
||||
|
||||
# security:
|
||||
security:
|
||||
authorization: enabled
|
||||
keyFile: /srv/newsblur/config/mongodb_keyfile.key
|
||||
|
||||
operationProfiling:
|
||||
mode: slowOp
|
||||
|
|
|
@ -19,7 +19,7 @@ def db_check_postgres():
|
|||
settings.DATABASES['default']['NAME'],
|
||||
settings.DATABASES['default']['USER'],
|
||||
settings.DATABASES['default']['PASSWORD'],
|
||||
'postgres',
|
||||
'db-postgres.service.nyc1.consul',
|
||||
settings.DATABASES['default']['PORT'],
|
||||
)
|
||||
try:
|
||||
|
@ -67,7 +67,7 @@ def db_check_mysql():
|
|||
@app.route("/db_check/mongo")
|
||||
def db_check_mongo():
|
||||
try:
|
||||
client = pymongo.MongoClient('mongodb://mongo')
|
||||
client = pymongo.MongoClient(f'mongodb://{settings.MONGO_DB['username']}:{settings.MONGO_DB['password']}@db-mongo.server.nyc1.consul')
|
||||
db = client.newsblur
|
||||
except:
|
||||
abort(503)
|
||||
|
@ -101,27 +101,10 @@ def db_check_mongo():
|
|||
|
||||
return str(stories)
|
||||
|
||||
@app.route("/db_check/redis")
|
||||
def db_check_redis():
|
||||
try:
|
||||
r = redis.Redis('redis', db=0)
|
||||
except:
|
||||
abort(503)
|
||||
|
||||
try:
|
||||
randkey = r.randomkey()
|
||||
except:
|
||||
abort(504)
|
||||
|
||||
if randkey:
|
||||
return str(randkey)
|
||||
else:
|
||||
abort(505)
|
||||
|
||||
@app.route("/db_check/redis_user")
|
||||
def db_check_redis_user():
|
||||
try:
|
||||
r = redis.Redis('redis', db=0)
|
||||
r = redis.Redis('db-redis-user.server.nyc1.consul', db=0)
|
||||
except:
|
||||
abort(503)
|
||||
|
||||
|
@ -138,7 +121,7 @@ def db_check_redis_user():
|
|||
@app.route("/db_check/redis_story")
|
||||
def db_check_redis_story():
|
||||
try:
|
||||
r = redis.Redis('redis', db=1)
|
||||
r = redis.Redis('db-redis-story.server.nyc1.consul', db=1)
|
||||
except:
|
||||
abort(503)
|
||||
|
||||
|
@ -155,7 +138,7 @@ def db_check_redis_story():
|
|||
@app.route("/db_check/redis_sessions")
|
||||
def db_check_redis_sessions():
|
||||
try:
|
||||
r = redis.Redis('redis', db=5)
|
||||
r = redis.Redis('db-redis-sessions.server.nyc1.consul', db=5)
|
||||
except:
|
||||
abort(503)
|
||||
|
||||
|
@ -172,7 +155,7 @@ def db_check_redis_sessions():
|
|||
@app.route("/db_check/redis_pubsub")
|
||||
def db_check_redis_pubsub():
|
||||
try:
|
||||
r = redis.Redis('redis', db=1)
|
||||
r = redis.Redis('db-redis-pubsub.server.nyc1.consul', db=1)
|
||||
except:
|
||||
abort(503)
|
||||
|
||||
|
|
|
@ -6,6 +6,8 @@ favicons = (app) =>
|
|||
ENV_DEV = process.env.NODE_ENV == 'development' or process.env.NODE_ENV == 'development'
|
||||
ENV_PROD = process.env.NODE_ENV == 'production'
|
||||
ENV_DOCKER = process.env.NODE_ENV == 'docker'
|
||||
MONGODB_USERNAME = process.env.MONGODB_USERNAME
|
||||
MONGODB_PASSWORD = process.env.MONGODB_PASSWORD
|
||||
MONGODB_SERVER = "db_mongo"
|
||||
if ENV_DEV
|
||||
MONGODB_SERVER = 'localhost'
|
||||
|
@ -27,7 +29,7 @@ favicons = (app) =>
|
|||
log.debug "Running as production server"
|
||||
|
||||
if ENV_PROD
|
||||
url = "mongodb://#{MONGODB_SERVER}:#{MONGODB_PORT}/newsblur?replicaSet=nbset&readPreference=secondaryPreferred"
|
||||
url = "mongodb://#{MONGODB_USERNAME}:#{MONGODB_PASSWORD}@#{MONGODB_SERVER}:#{MONGODB_PORT}/newsblur?replicaSet=nbset&readPreference=secondaryPreferred"
|
||||
else
|
||||
url = "mongodb://#{MONGODB_SERVER}:#{MONGODB_PORT}/newsblur"
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ def main():
|
|||
r = redis.Redis(connection_pool=settings.REDIS_ANALYTICS_POOL)
|
||||
|
||||
try:
|
||||
client = pymongo.MongoClient('mongodb://%s' % settings.MONGO_DB['host'])
|
||||
client = pymongo.MongoClient(f'mongodb://{settings.MONGO_DB['username']}:{settings.MONGO_DB['password']}@{settings.MONGO_DB['host']}')
|
||||
feeds_fetched = client.newsblur.statistics.find_one({"key": "feeds_fetched"})['value']
|
||||
redis_task_fetches = int(r.get(monitor_key) or 0)
|
||||
except Exception as e:
|
||||
|
|
570
vendor/mms-agent/mmsAgent.py
vendored
570
vendor/mms-agent/mmsAgent.py
vendored
|
@ -1,570 +0,0 @@
|
|||
"""
|
||||
(C) Copyright 2011, 10gen
|
||||
|
||||
This is a label on a mattress. Do not modify this file!
|
||||
"""
|
||||
|
||||
# App
|
||||
import pymongo, munin, nonBlockingStats, blockingStats, getLogs, traceback
|
||||
|
||||
# Mongo
|
||||
import bson
|
||||
|
||||
# Python
|
||||
import threading, urllib.request, urllib.parse, urllib.error, urllib.request, urllib.error, urllib.parse, socket, zlib, time, sets, re, gc
|
||||
|
||||
mmsAgentVersion = "1.5.7"
|
||||
|
||||
class MmsAgent( object ):
|
||||
""" The mms agent object """
|
||||
|
||||
def __init__( self, settings, agentVersion, pythonVersion, pymongoVersion, pymongoHasC, agentHostname, logger, sessionKey ):
|
||||
""" Constructor """
|
||||
self.logger = logger
|
||||
|
||||
self.sessionKey = sessionKey
|
||||
|
||||
self.settings = settings
|
||||
|
||||
self.pythonVersion = pythonVersion
|
||||
self.pymongoVersion = pymongoVersion
|
||||
self.pymongoHasC = pymongoHasC
|
||||
self.agentHostname = agentHostname
|
||||
self.agentVersion = agentVersion
|
||||
self.srcVersion = self.settings.src_version
|
||||
|
||||
self.collectionInterval = settings.collection_interval
|
||||
self.logCollectionInterval = settings.log_interval
|
||||
self.confInterval = settings.conf_interval
|
||||
|
||||
self.disableProfileDataCollection = settings.disableProfileDataCollection
|
||||
self.disableGetLogsDataCollection = settings.disableGetLogsDataCollection
|
||||
|
||||
self.hostStateLock = threading.Lock()
|
||||
self.hostState = { }
|
||||
self.hostStateLastUpdate = { }
|
||||
|
||||
self.operationFailureUrl = settings.operationFailureUrl % settings.mms_key
|
||||
|
||||
self.serverHostDefs = { }
|
||||
self.serverHostDefsLock = threading.Lock()
|
||||
|
||||
self.serverUniqueHosts = { }
|
||||
self.serverUniqueHostsLock = threading.Lock()
|
||||
|
||||
self.pingUrl = settings.ping_url % settings.mms_key
|
||||
|
||||
self.disableDbstats = False
|
||||
|
||||
self.done = False
|
||||
|
||||
socket.setdefaulttimeout( settings.socket_timeout )
|
||||
|
||||
def closeDbConnection( self, hostKey, connection ):
|
||||
""" Close the connection and reset """
|
||||
self.serverHostDefsLock.acquire()
|
||||
try:
|
||||
try:
|
||||
if hostKey in self.serverHostDefs:
|
||||
if 'availableCmds' in self.serverHostDefs[hostKey]:
|
||||
del self.serverHostDefs[hostKey]['availableCmds']
|
||||
except:
|
||||
pass
|
||||
finally:
|
||||
self.serverHostDefsLock.release()
|
||||
|
||||
if connection is not None:
|
||||
try:
|
||||
connection.disconnect()
|
||||
except:
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
def getDbConnection( self, hostKey ):
|
||||
""" Returns a database connection """
|
||||
|
||||
hostDef = None
|
||||
self.serverHostDefsLock.acquire()
|
||||
try:
|
||||
if hostKey in self.serverHostDefs:
|
||||
hostDef = self.serverHostDefs[hostKey]
|
||||
|
||||
finally:
|
||||
self.serverHostDefsLock.release()
|
||||
|
||||
if not hostDef:
|
||||
return None
|
||||
|
||||
if not self.settings.useSslForAllConnections:
|
||||
useSsl = False
|
||||
if 'ssl' in hostDef:
|
||||
useSsl = hostDef['ssl']
|
||||
|
||||
if useSsl:
|
||||
return pymongo.Connection( hostDef['mongoUri'] , slave_okay=True, ssl=True, document_class=bson.son.SON )
|
||||
else:
|
||||
return pymongo.Connection( hostDef['mongoUri'] , slave_okay=True, document_class=bson.son.SON )
|
||||
else:
|
||||
return pymongo.Connection( hostDef['mongoUri'] , slave_okay=True, ssl=True, document_class=bson.son.SON )
|
||||
|
||||
def handleOperationFailure( self, hostKey, operation, exception ):
|
||||
""" Process a query or command operation failure """
|
||||
try:
|
||||
msg = { }
|
||||
|
||||
exceptionStr = traceback.format_exc( exception )
|
||||
|
||||
# These are excpected and we do not need to log.
|
||||
if exceptionStr.find( 'unrecognized command:' ) > -1:
|
||||
return
|
||||
|
||||
if exceptionStr.find( 'failed: not running with --replSet' ) > -1:
|
||||
return
|
||||
|
||||
if exceptionStr.find( 'failed: ns not found' ) > -1:
|
||||
return
|
||||
|
||||
if exceptionStr.find( 'not master or secondary' ) > -1:
|
||||
return
|
||||
|
||||
msg['hostnameAndPort'] = hostKey
|
||||
msg['operation'] = operation
|
||||
msg['exception'] = exceptionStr
|
||||
|
||||
res = None
|
||||
try:
|
||||
res = urllib.request.urlopen( self.operationFailureUrl, bson.binary.Binary( bson.BSON.encode( msg, check_keys=False ) ) )
|
||||
res.read()
|
||||
finally:
|
||||
if res is not None:
|
||||
res.close()
|
||||
except:
|
||||
pass
|
||||
|
||||
def haveHostDef( self, hostKey ):
|
||||
""" Returns true if this is a known host """
|
||||
self.serverHostDefsLock.acquire()
|
||||
try:
|
||||
return hostKey in self.serverHostDefs
|
||||
finally:
|
||||
self.serverHostDefsLock.release()
|
||||
|
||||
def _removeHostState( self, hostKey ):
|
||||
""" Delete the state for a host """
|
||||
self.hostStateLock.acquire()
|
||||
try:
|
||||
if hostKey in self.hostState:
|
||||
del self.hostState[hostKey]
|
||||
finally:
|
||||
self.hostStateLock.release()
|
||||
|
||||
def extractHostname( self, hostKey ):
|
||||
""" Extract the hostname from the hostname:port hostKey """
|
||||
return hostKey[0 : hostKey.find( ':' )]
|
||||
|
||||
def extractPort( self, hostKey ):
|
||||
""" Extract the port from the hostname:port hostKey """
|
||||
return int( hostKey[ ( hostKey.find(':' ) + 1 ) : ] )
|
||||
|
||||
def setMuninHostState( self, hostname, state ):
|
||||
""" Set the state inside of the lock - there are multiple threads who set state here """
|
||||
if state is None:
|
||||
return
|
||||
|
||||
# Compress only once for munin data
|
||||
stateData = bson.binary.Binary( zlib.compress( bson.BSON.encode( state, check_keys=False ), 9 ) )
|
||||
|
||||
try:
|
||||
self.hostStateLock.acquire()
|
||||
|
||||
for hostKey in list(self.hostState.keys()):
|
||||
if hostname == self.extractHostname( hostKey ):
|
||||
state['port'] = self.extractPort( hostKey )
|
||||
self._setHostStateValue( hostKey, 'munin', stateData )
|
||||
finally:
|
||||
self.hostStateLock.release()
|
||||
|
||||
def cleanHostState( self ):
|
||||
""" Make sure the host state data is current """
|
||||
try:
|
||||
self.hostStateLock.acquire()
|
||||
now = time.time()
|
||||
|
||||
toDel = []
|
||||
|
||||
for hostKey in self.hostState:
|
||||
if hostKey not in self.hostStateLastUpdate:
|
||||
continue
|
||||
|
||||
if ( now - self.hostStateLastUpdate[hostKey] ) > 60:
|
||||
toDel.append( hostKey )
|
||||
|
||||
for hostKey in toDel:
|
||||
del self.hostState[hostKey]
|
||||
|
||||
finally:
|
||||
self.hostStateLock.release()
|
||||
|
||||
def setHostState( self, hostKey, stateType, state ):
|
||||
""" Set the state inside of the lock - there are multiple threads who set state here """
|
||||
if state is None:
|
||||
return
|
||||
|
||||
try:
|
||||
self.hostStateLock.acquire()
|
||||
self._setHostStateValue( hostKey, stateType, state )
|
||||
finally:
|
||||
self.hostStateLock.release()
|
||||
|
||||
def _setHostStateValue( self, hostKey, stateType, state):
|
||||
""" Set the host state. This can only be called when a host state lock is in place """
|
||||
|
||||
if hostKey not in self.hostState:
|
||||
self.hostState[hostKey] = { }
|
||||
|
||||
if stateType == 'logs':
|
||||
if 'logs' not in self.hostState[hostKey] or self.hostState[hostKey][stateType] is None:
|
||||
self.hostState[hostKey][stateType] = []
|
||||
|
||||
self.hostState[hostKey][stateType].append(bson.binary.Binary( zlib.compress( bson.BSON.encode( state, check_keys=False ), 9 ) ) )
|
||||
|
||||
elif stateType == 'munin':
|
||||
|
||||
self.hostState[hostKey][stateType] = state
|
||||
|
||||
else:
|
||||
self.hostState[hostKey][stateType] = bson.binary.Binary( zlib.compress( bson.BSON.encode( state, check_keys=False ), 9 ) )
|
||||
|
||||
self.hostStateLastUpdate[hostKey] = time.time()
|
||||
|
||||
def checkChangedHostDef( self, hostDef, hostDefLast ):
|
||||
""" Check to see if this host definition has changed. If it has, stop the thread and
|
||||
start a new one. This assumes it is called inside of serverHostDefsLock """
|
||||
|
||||
if not hostDefLast:
|
||||
return
|
||||
|
||||
hostKey = hostDef['hostKey']
|
||||
|
||||
changed = False
|
||||
|
||||
# The ssl configuration
|
||||
if hostDef['ssl'] and not hostDefLast['ssl']:
|
||||
changed = True
|
||||
|
||||
if not hostDef['ssl'] and hostDefLast['ssl']:
|
||||
changed = True
|
||||
|
||||
# The profiler configuration
|
||||
if not self.disableProfileDataCollection and hostDef['profiler'] and not hostDefLast['profiler']:
|
||||
changed = True
|
||||
|
||||
if not self.disableProfileDataCollection and not hostDef['profiler'] and hostDefLast['profiler']:
|
||||
changed = True
|
||||
|
||||
# The getLogs configuration
|
||||
if not self.disableProfileDataCollection and hostDef['getLogs'] and not hostDefLast['getLogs']:
|
||||
changed = True
|
||||
|
||||
if not self.disableProfileDataCollection and not hostDef['getLogs'] and hostDefLast['getLogs']:
|
||||
changed = True
|
||||
|
||||
# The mongo URI configuration
|
||||
if hostDef['mongoUri'] != hostDefLast['mongoUri']:
|
||||
changed = True
|
||||
|
||||
hostDefLast = None
|
||||
|
||||
if not changed:
|
||||
return
|
||||
|
||||
self.stopAndClearHost( hostKey )
|
||||
|
||||
# Start the new thread and set the host def.
|
||||
self.startMonitoringThreads( hostDef )
|
||||
|
||||
def hostDefValue( self, hostKey, key ):
|
||||
""" Returns the host def value """
|
||||
|
||||
hostDef = None
|
||||
self.serverHostDefsLock.acquire()
|
||||
try:
|
||||
if hostKey in self.serverHostDefs:
|
||||
hostDef = self.serverHostDefs[hostKey]
|
||||
finally:
|
||||
self.serverHostDefsLock.release()
|
||||
|
||||
if not hostDef or key not in hostDef:
|
||||
return None
|
||||
|
||||
return hostDef[key]
|
||||
|
||||
def hasCommand( self, command, hostKey, connection ):
|
||||
""" Returns True if this command is contained """
|
||||
cmd = command.lower()
|
||||
|
||||
hostDef = None
|
||||
self.serverHostDefsLock.acquire()
|
||||
try:
|
||||
if hostKey in self.serverHostDefs:
|
||||
hostDef = self.serverHostDefs[hostKey]
|
||||
finally:
|
||||
self.serverHostDefsLock.release()
|
||||
|
||||
if not hostDef:
|
||||
return False
|
||||
|
||||
if 'availableCmds' in hostDef:
|
||||
return cmd in hostDef['availableCmds']
|
||||
|
||||
availableCmds = sets.Set()
|
||||
|
||||
for field, value in list(connection.admin.command( 'listCommands' )['commands'].items()):
|
||||
availableCmds.add( field.lower() )
|
||||
|
||||
hostDef['availableCmds'] = availableCmds
|
||||
|
||||
return cmd in hostDef['availableCmds']
|
||||
|
||||
def startMonitoringThreads( self, hostDef ):
|
||||
""" Start server status and munin threads. This assumes it is called inside of serverHostDefsLock """
|
||||
|
||||
hostKey = hostDef['hostKey']
|
||||
|
||||
# Start the non-blocking stats thread
|
||||
hostDef['nonBlockingStatsThread'] = nonBlockingStats.NonBlockingMongoStatsThread( hostKey, self )
|
||||
hostDef['nonBlockingStatsThread'].setName( ( 'NonBlockingMongoStatsThread-' + hostKey ) )
|
||||
hostDef['nonBlockingStatsThread'].start()
|
||||
|
||||
# Start the blocking stats thread
|
||||
hostDef['blockingStatsThread'] = blockingStats.BlockingMongoStatsThread( hostKey, self )
|
||||
hostDef['blockingStatsThread'].setName( ( 'BlockingMongoStatsThread-' + hostKey ) )
|
||||
hostDef['blockingStatsThread'].start()
|
||||
|
||||
if not self.disableProfileDataCollection and hostDef['getLogs']:
|
||||
hostDef['getLogsThread'] = getLogs.GetLogsThread( hostKey, self )
|
||||
hostDef['getLogsThread'].setName( ( 'GetLogsThread-' + hostKey ) )
|
||||
hostDef['getLogsThread'].start()
|
||||
|
||||
# Start the munin thread for the server, if there is not one running.
|
||||
self._startMuninThread( hostDef['hostname'] )
|
||||
|
||||
self.serverHostDefs[hostDef['hostKey']] = hostDef
|
||||
|
||||
def _startMuninThread( self, hostname ):
|
||||
""" Start the munin thread if one is not running """
|
||||
if not getattr(self.settings, 'enableMunin', True):
|
||||
return
|
||||
|
||||
try:
|
||||
self.serverUniqueHostsLock.acquire()
|
||||
if hostname not in self.serverUniqueHosts:
|
||||
self.serverUniqueHosts[hostname] = munin.MuninThread( hostname, self )
|
||||
self.serverUniqueHosts[hostname].setName( ( 'MuninThread-' + hostname ) )
|
||||
self.serverUniqueHosts[hostname].start()
|
||||
finally:
|
||||
self.serverUniqueHostsLock.release()
|
||||
|
||||
def hasUniqueServer( self, hostname ):
|
||||
""" Return true if there hostname is in the list """
|
||||
try:
|
||||
self.serverUniqueHostsLock.acquire()
|
||||
return hostname in self.serverUniqueHosts
|
||||
finally:
|
||||
self.serverUniqueHostsLock.release()
|
||||
|
||||
def extractHostDef( self, host ):
|
||||
""" Return the host def for the host """
|
||||
|
||||
hostKey = host['hostKey']
|
||||
|
||||
hostDef = None
|
||||
hostDefLast = None
|
||||
|
||||
# Check to see if we already have this object.
|
||||
if hostKey not in self.serverHostDefs:
|
||||
hostDef = { }
|
||||
hostDef['hostname'] = host['hostname']
|
||||
hostDef['id'] = host['id']
|
||||
hostDef['hostKey'] = host['hostKey']
|
||||
hostDef['port'] = host['port']
|
||||
else:
|
||||
hostDefLast = { }
|
||||
hostDef = self.serverHostDefs[hostKey]
|
||||
hostDefLast['ssl'] = hostDef['ssl']
|
||||
hostDefLast['profiler'] = hostDef['profiler']
|
||||
hostDefLast['getLogs'] = hostDef['getLogs']
|
||||
hostDefLast['mongoUri'] = hostDef['mongoUri']
|
||||
|
||||
mongoUri = host['uri']
|
||||
|
||||
# Check to see if the global username/password is set in the settings.py file.
|
||||
if getattr( self.settings, 'globalAuthUsername', None ) is not None and getattr( self.settings, 'globalAuthPassword', None ) is not None:
|
||||
|
||||
# Assemble the credentials string.
|
||||
credentials = '%(username)s:%(password)s' % {
|
||||
'username' : urllib.parse.quote_plus( self.settings.globalAuthUsername ),
|
||||
'password' : urllib.parse.quote_plus( self.settings.globalAuthPassword )
|
||||
}
|
||||
|
||||
if not '@' in mongoUri:
|
||||
# This means that we do have an existing password (from the central servers).
|
||||
mongoUri = re.sub( '(^mongodb://)(.+)', 'mongodb://' + credentials + '@\\2', mongoUri )
|
||||
else:
|
||||
# This means a password was sent by the central servers.
|
||||
mongoUri = re.sub( '(^mongodb://.+:.+@)(.+)', 'mongodb://' + credentials + '@\\2', mongoUri )
|
||||
|
||||
hostDef['mongoUri'] = mongoUri
|
||||
|
||||
hostDef['munin'] = host['munin']
|
||||
hostDef['profiler'] = host['profiler']
|
||||
|
||||
if 'sslEnabled' not in host:
|
||||
hostDef['ssl'] = False
|
||||
else:
|
||||
hostDef['ssl'] = host['sslEnabled']
|
||||
|
||||
if 'getLogs' not in host:
|
||||
hostDef['getLogs'] = False
|
||||
else:
|
||||
hostDef['getLogs'] = host['getLogs']
|
||||
|
||||
return ( hostDef, hostDefLast )
|
||||
|
||||
def isValidMonitorConn( self, hostKey, conn ):
|
||||
""" In pymongo <= 1.9, even with slave_okay set, we will re-route to master if the secondary we're talking to goes down - this should work around that """
|
||||
if not conn:
|
||||
return False
|
||||
|
||||
hostDef = None
|
||||
self.serverHostDefsLock.acquire()
|
||||
try:
|
||||
if hostKey in self.serverHostDefs:
|
||||
hostDef = self.serverHostDefs[hostKey]
|
||||
finally:
|
||||
self.serverHostDefsLock.release()
|
||||
|
||||
if not hostDef:
|
||||
return False
|
||||
|
||||
if conn.host is not None:
|
||||
if conn.host != hostDef['hostname'] or str( conn.port ) != str( hostDef['port'] ):
|
||||
self.logger.warning( 'replica set switched hosts, disconnecting - wanted: ' + str( hostDef['mongoUri'] ) + ' - got: mongodb://' + str( conn.host ) + ':' + str( conn.port ) )
|
||||
if conn is not None:
|
||||
conn.disconnect()
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
else:
|
||||
return True
|
||||
|
||||
def _handleRemote( self, req ):
|
||||
""" Send the data to the central MMS servers """
|
||||
try:
|
||||
if req is None:
|
||||
return
|
||||
|
||||
res = None
|
||||
|
||||
try:
|
||||
res = urllib.request.urlopen( self.pingUrl, req )
|
||||
res.read()
|
||||
finally:
|
||||
if res is not None:
|
||||
res.close()
|
||||
|
||||
except Exception:
|
||||
self.logger.warning( "Problem sending data to MMS (check firewall and network)" )
|
||||
|
||||
def _assemblePingRequest( self ):
|
||||
""" Create the ping data request """
|
||||
try:
|
||||
if self.hostState is None:
|
||||
return None
|
||||
|
||||
req = { 'key' : self.settings.mms_key, 'hosts' : self.hostState }
|
||||
|
||||
req['agentVersion'] = self.agentVersion
|
||||
req['agentHostname'] = self.agentHostname
|
||||
req['pythonVersion'] = self.pythonVersion
|
||||
req['pymongoVersion'] = self.pymongoVersion
|
||||
req['pymongoHasC'] = self.pymongoHasC
|
||||
req['agentSessionKey'] = self.sessionKey
|
||||
req['srcVersion'] = self.srcVersion
|
||||
req['dataFormat'] = 1
|
||||
req['disableProfileDataCollection'] = self.disableProfileDataCollection
|
||||
req['disableGetLogsDataCollection'] = self.disableGetLogsDataCollection
|
||||
|
||||
return bson.BSON.encode( req, check_keys=False )
|
||||
|
||||
finally:
|
||||
del self.hostState
|
||||
self.hostState = { }
|
||||
|
||||
def sendDataToMms( self ):
|
||||
""" Assemble the ping request and send the data to MMS """
|
||||
data = None
|
||||
|
||||
try:
|
||||
self.hostStateLock.acquire()
|
||||
# Empty dictionary is False
|
||||
#if self.hostState:
|
||||
if ( self.hostState is not None and len( self.hostState ) > 0 ):
|
||||
data = self._assemblePingRequest()
|
||||
finally:
|
||||
self.hostStateLock.release()
|
||||
|
||||
if data is None:
|
||||
return
|
||||
|
||||
self._handleRemote( data )
|
||||
|
||||
def stopAll( self ):
|
||||
""" Stop all the threads """
|
||||
try:
|
||||
self.serverHostDefsLock.acquire()
|
||||
for hostKey in list(self.serverHostDefs.keys()):
|
||||
self.stopAndClearHost( hostKey )
|
||||
finally:
|
||||
self.serverHostDefsLock.release()
|
||||
|
||||
def stopAndClearHost( self, hostKey ):
|
||||
""" Stop the data collection for this host. This assumes a server host def lock """
|
||||
|
||||
if hostKey not in self.serverHostDefs:
|
||||
return
|
||||
|
||||
# Stop the current thread and delete the definition
|
||||
self.serverHostDefs[hostKey]['nonBlockingStatsThread'].stopThread()
|
||||
self.serverHostDefs[hostKey]['blockingStatsThread'].stopThread()
|
||||
|
||||
if 'getLogsThread' in self.serverHostDefs[hostKey]:
|
||||
self.serverHostDefs[hostKey]['getLogsThread'].stopThread()
|
||||
|
||||
self._removeHostState( hostKey )
|
||||
|
||||
# Check and see if this is the last definition of the unique server
|
||||
self._stopAndClearUniqueHost( self.serverHostDefs[hostKey]['hostname'] )
|
||||
|
||||
# Remove the object from the dictionary.
|
||||
del self.serverHostDefs[hostKey]
|
||||
|
||||
def _stopAndClearUniqueHost( self, hostname ):
|
||||
""" If this is the last reference to a hostname, remove. This requires a host def lock wrapping """
|
||||
try:
|
||||
self.serverUniqueHostsLock.acquire()
|
||||
|
||||
foundCount = 0
|
||||
|
||||
for hostKey in list(self.serverHostDefs.keys()):
|
||||
if self.serverHostDefs[hostKey]['hostname'] == hostname:
|
||||
foundCount = foundCount + 1
|
||||
|
||||
if foundCount <= 1 and hostname in self.serverUniqueHosts:
|
||||
self.serverUniqueHosts[hostname].stopThread()
|
||||
del self.serverUniqueHosts[hostname]
|
||||
finally:
|
||||
self.serverUniqueHostsLock.release()
|
||||
|
||||
|
Loading…
Add table
Reference in a new issue