// Generated by CoffeeScript 2.6.1
(function() {
  const fs = require('fs');
  const redis = require('redis');
  const log = require('./log.js');

  /**
   * Start the NewsBlur unread-count push server on top of `server`.
   *
   * Each connected socket may emit `subscribe:feeds(feeds, username)`. The
   * server then opens a dedicated Redis subscriber for that socket and relays
   * every published message back to the client as one of:
   *   - `user:update`     when the channel equals the username,
   *   - `feed:story:new`  when the channel name contains ":story",
   *   - `feed:update`     for every other channel.
   *
   * @param {object} server - Passed straight to socket.io as the server to
   *   attach to (presumably an http/https Server; confirm against the caller).
   * @returns {object|undefined} The socket.io server instance, or `undefined`
   *   when NODE_ENV is not set (nothing is started in that case).
   */
  const unread_counts = (server) => {
    const ENV_DEV = process.env.NODE_ENV === 'development' || process.env.NODE_ENV === 'debug';
    const ENV_PROD = process.env.NODE_ENV === 'production';
    const ENV_DOCKER = process.env.NODE_ENV === 'docker';

    let REDIS_SERVER = "db_redis";
    if (ENV_DEV) {
      REDIS_SERVER = 'localhost';
    } else if (ENV_PROD) {
      REDIS_SERVER = 'db-redis-pubsub.service.nyc1.consul';
    }

    const SECURE = !!process.env.NODE_SSL;
    const REDIS_PORT = ENV_DOCKER ? 6579 : 6383;

    log.debug("Starting NewsBlur unread count server...");
    // ENV_DEV can only be true when NODE_ENV is set, so checking NODE_ENV alone
    // is equivalent to the previous `!ENV_DEV && !NODE_ENV` test.
    if (!process.env.NODE_ENV) {
      log.debug("Specify NODE_ENV=");
      return;
    } else if (ENV_DEV) {
      log.debug("Running as development server");
    } else if (ENV_DOCKER) {
      log.debug("Running as docker server");
    } else {
      log.debug("Running as production server");
    }

    // Connection options shared by every Redis client created here.
    const redis_opts = {
      host: REDIS_SERVER,
      port: REDIS_PORT,
      // Linear backoff (100ms per attempt) capped at 3 seconds.
      retry_strategy: (options) => Math.min(options.attempt * 100, 3000),
      connect_timeout: 10000
    };

    // Dedicated publish/subscribe clients for the Socket.IO Redis adapter.
    const pub_client = redis.createClient(redis_opts);
    const sub_client = redis.createClient(redis_opts);

    // Log adapter client errors and reconnects instead of letting them go unhandled.
    [['Pub', pub_client], ['Sub', sub_client]].forEach(([label, client]) => {
      client.on("error", (err) => {
        log.debug(`Redis ${label} Error: ${err}`);
      });
      client.on("reconnecting", (attempt) => {
        log.debug(`Redis ${label} reconnecting... Attempt ${attempt}`);
      });
    });

    const io = require('socket.io')(server, {
      path: "/v3/socket.io",
      pingTimeout: 120000, // Increased from 60s to 120s
      pingInterval: 30000, // Increased from 25s to 30s
      connectTimeout: 60000, // Increased from 45s to 60s
      transports: ['websocket'], // Only the websocket transport is listed
      maxHttpBufferSize: 1e8, // Increase buffer size to 100MB
      cors: {
        origin: "*",
        methods: ["GET", "POST"]
      },
      allowEIO3: true, // Allow compatibility with Socket.IO v3 clients
      adapter: require('@socket.io/redis-adapter').createAdapter(pub_client, sub_client)
    });

    /**
     * Create the per-socket Redis subscriber for `username`.
     * Errors are only logged; reconnection is left to `retry_strategy`.
     */
    const setup_redis_client = (username) => {
      // Pass a copy so the shared options object is never handed out twice by reference.
      const client = redis.createClient(Object.assign({}, redis_opts));
      client.on("error", (err) => {
        log.info(username, `Redis Error: ${err}`);
      });
      client.on("reconnecting", (attempt) => {
        log.info(username, `Redis reconnecting... Attempt ${attempt}`);
      });
      return client;
    };

    // Active connections for debugging: username -> Map(socket id -> details).
    // Maps are used because usernames come from clients; a plain object keyed by
    // e.g. "__proto__" would resolve to (and write onto) Object.prototype.
    const active_connections = new Map();

    /** Record a socket under `username`; returns that user's connection count. */
    const track_connection = (username, socket_id, details) => {
      let sockets = active_connections.get(username);
      if (!sockets) {
        sockets = new Map();
        active_connections.set(username, sockets);
      }
      sockets.set(socket_id, details);
      return sockets.size;
    };

    /**
     * Remove a socket from `username`'s entry, dropping the user once empty.
     * Returns null when the user is not tracked at all, otherwise
     * `{ removed, remaining }`.
     */
    const untrack_connection = (username, socket_id) => {
      const sockets = active_connections.get(username);
      if (!sockets) {
        return null;
      }
      const removed = sockets.delete(socket_id);
      const remaining = sockets.size;
      if (remaining === 0) {
        active_connections.delete(username);
      }
      return { removed, remaining };
    };

    // Log engine events for debugging
    io.engine.on('connection', (socket) => {
      log.debug(`Engine connection established: ${socket.id}`);
    });
    io.engine.on('close', (socket) => {
      log.debug(`Engine connection closed: ${socket.id}`);
    });

    io.on('connection', (socket) => {
      // Node lowercases incoming header names, so the lookup key must be lowercase.
      const ip = socket.handshake.headers['x-forwarded-for'] || socket.handshake.address;
      const socket_id = socket.id;
      log.debug(`Socket connected: ${socket_id} from ${ip}`);

      // Store socket data for tracking
      socket.data = {
        ip: ip,
        socket_id: socket_id,
        connected_at: Date.now()
      };

      // Set a longer ping timeout for this socket
      socket.conn.pingTimeout = 120000;
      socket.conn.on('error', (err) => {
        log.debug(`Socket ${socket_id} - connection error: ${err}`);
      });
      socket.conn.on('close', (reason) => {
        log.debug(`Socket ${socket_id} - connection closed: ${reason}`);
      });
      // Registered once per socket; registering inside 'subscribe:feeds' stacked
      // a new listener on every re-subscribe.
      socket.on("error", (err) => {
        log.debug(`Error (socket): ${err}`);
      });

      socket.on('subscribe:feeds', (feeds, username) => {
        // The payload is client-controlled: validate it before touching
        // feeds.length or recording anything under the username.
        if (!username || !Array.isArray(feeds)) {
          log.debug(`Socket ${socket_id} - ignoring subscribe:feeds with invalid payload`);
          return;
        }

        // A re-subscribe under another username must not leave the old entry behind.
        const previous_username = socket.data.username;
        if (previous_username && previous_username !== username) {
          untrack_connection(previous_username, socket_id);
        }

        // Store user data directly on the socket for access during disconnect
        socket.data.feeds = feeds;
        socket.data.username = username;
        socket.data.subscribed_at = Date.now();
        log.info(username, `Connecting (${feeds.length} feeds, ${ip}), (${io.engine.clientsCount} connected) ${SECURE ? "(SSL)" : ""}`);

        // Track connections by username for debugging
        const connection_count = track_connection(username, socket_id, {
          connected_at: socket.data.connected_at,
          subscribed_at: socket.data.subscribed_at,
          feed_count: feeds.length
        });
        log.debug(`${username} now has ${connection_count} active connections, adding ${socket_id}`);

        // Replace any subscriber left over from an earlier subscribe on this socket.
        if (socket.subscribe != null) {
          socket.subscribe.quit();
        }
        // Captured locally so the callbacks below always act on the client created
        // for THIS subscribe call, even if socket.subscribe is replaced later.
        const subscriber = setup_redis_client(username);
        socket.subscribe = subscriber;

        subscriber.on("connect", () => {
          log.info(username, `Connected (${feeds.length} feeds, ${ip}), (${io.engine.clientsCount} connected) ${SECURE ? "(SSL)" : "(non-SSL)"}`);
          // Skip the feed subscriptions when there are no feeds, rather than
          // issuing SUBSCRIBE with an empty channel list.
          if (feeds.length > 0) {
            subscriber.subscribe(feeds);
            subscriber.subscribe(feeds.map((feed) => `${feed}:story`));
          }
          subscriber.subscribe(username);
        });

        subscriber.on('message', (channel, message) => {
          let event_name = 'feed:update';
          if (channel === username) {
            event_name = 'user:update';
          } else if (channel.includes(':story')) {
            event_name = 'feed:story:new';
          }
          log.info(username, `Update on ${channel}: ${event_name} - ${message}`);
          socket.emit(event_name, channel, message);
        });
      });

      socket.on('disconnect', (reason) => {
        // Use the data stored on the socket
        const username = socket.data.username;
        const feeds = socket.data.feeds;
        const connected_at = socket.data.connected_at;
        const subscribed_at = socket.data.subscribed_at;

        // Calculate connection duration
        const now = Date.now();
        const connection_duration = now - (connected_at || now);
        const subscription_duration = subscribed_at ? now - subscribed_at : 0;
        log.debug(`Socket ${socket_id} disconnected: ${reason}, username: ${username}, connection duration: ${connection_duration}ms, subscription duration: ${subscription_duration}ms`);

        // Update connection tracking
        if (username) {
          const result = untrack_connection(username, socket_id);
          if (result && result.removed) {
            log.debug(`${username} now has ${result.remaining} active connections after removing ${socket_id}`);
          } else if (result) {
            log.debug(`Socket ${socket_id} not found in active connections for ${username}`);
          }
        }

        // Close this socket's dedicated Redis subscriber, if one was ever created.
        if (socket.subscribe != null) {
          socket.subscribe.quit();
        }
        if (username && feeds) {
          log.info(username, `Disconnect (${feeds.length} feeds, ${ip}), there are now ${io.engine.clientsCount} users. ${SECURE ? "(SSL)" : "(non-SSL)"}`);
        }
      });
    });

    io.engine.on('connection_error', (err) => {
      log.debug(`Connection Error: ${err.code} - ${err.message}`);
    });
    io.sockets.on('error', (err) => {
      log.debug(`Error (sockets): ${err}`);
    });

    // Periodically log connection stats
    setInterval(() => {
      const total_users = active_connections.size;
      const total_connections = io.engine.clientsCount;
      let total_tracked = 0;
      active_connections.forEach((sockets) => {
        total_tracked += sockets.size;
      });
      log.debug(`Connection stats: ${total_users} users with ${total_connections} total connections (${total_tracked} tracked)`);
    }, 60000);

    return io;
  };

  exports.unread_counts = unread_counts;

}).call(this);