// Mirror of https://github.com/samuelclay/NewsBlur.git
// Synced 2025-04-13 09:42:01 +00:00 (240 lines, 9.5 KiB, JavaScript)
// Generated by CoffeeScript 2.6.1
(function() {
  // `fs` is not referenced below; the require is kept so the module's
  // dependency list is unchanged.
  const fs = require('fs');
  const redis = require('redis');
  const log = require('./log.js');

  // Delay in ms before the next Redis reconnect attempt. Grows linearly by
  // 100ms per attempt and is capped at 3 seconds.
  const redis_retry_delay = (options) => Math.min(options.attempt * 100, 3000);

  // The redis client's "reconnecting" event hands listeners an object carrying
  // `attempt` (see the node_redis v3 README); a bare number is tolerated too so
  // the log line stays readable either way.
  const reconnect_attempt = (info) => {
    if (info !== null && typeof info === 'object') {
      return info.attempt;
    }
    return info;
  };

  // Picks the Socket.IO event name used to forward a Redis pub/sub message:
  // the user's own channel, a "<feed>:story" channel, or a plain feed channel.
  const event_name_for_channel = (channel, username) => {
    if (channel === username) {
      return 'user:update';
    }
    if (channel.indexOf(':story') >= 0) {
      return 'feed:story:new';
    }
    return 'feed:update';
  };

  /**
   * Starts the realtime unread-count Socket.IO server on top of `server`.
   *
   * Each browser socket that emits `subscribe:feeds` gets its own Redis
   * subscriber; messages published on the feed channels, the matching
   * `<feed>:story` channels and the user's own channel are re-emitted to that
   * socket as `feed:update`, `feed:story:new` and `user:update`.
   *
   * @param {object} server - Handed unchanged to the `socket.io` factory
   *   (presumably a Node HTTP(S) server - confirm against the caller).
   * @returns {object|undefined} The Socket.IO server instance, or `undefined`
   *   when NODE_ENV is not set (nothing is started in that case).
   */
  const unread_counts = (server) => {
    const NODE_ENV = process.env.NODE_ENV;
    const ENV_DEV = NODE_ENV === 'development' || NODE_ENV === 'debug';
    const ENV_PROD = NODE_ENV === 'production';
    const ENV_DOCKER = NODE_ENV === 'docker';
    const SECURE = !!process.env.NODE_SSL;
    const REDIS_PORT = ENV_DOCKER ? 6579 : 6383;

    let REDIS_SERVER = "db_redis";
    if (ENV_DEV) {
      REDIS_SERVER = 'localhost';
    } else if (ENV_PROD) {
      REDIS_SERVER = 'db-redis-pubsub.service.nyc1.consul';
    }

    log.debug("Starting NewsBlur unread count server...");
    if (!NODE_ENV) {
      log.debug("Specify NODE_ENV=<development,production>");
      return;
    } else if (ENV_DEV) {
      log.debug("Running as development server");
    } else if (ENV_DOCKER) {
      log.debug("Running as docker server");
    } else {
      log.debug("Running as production server");
    }

    // Every Redis client in this module connects with the same settings; a
    // fresh object is built per client so none of them share mutable options.
    const make_redis_opts = () => ({
      host: REDIS_SERVER,
      port: REDIS_PORT,
      retry_strategy: redis_retry_delay,
      connect_timeout: 10000
    });

    // Dedicated pub/sub pair for the Socket.IO Redis adapter.
    const pub_client = redis.createClient(make_redis_opts());
    const sub_client = redis.createClient(make_redis_opts());

    pub_client.on("error", (err) => {
      log.debug(`Redis Pub Error: ${err}`);
    });
    sub_client.on("error", (err) => {
      log.debug(`Redis Sub Error: ${err}`);
    });
    pub_client.on("reconnecting", (info) => {
      log.debug(`Redis Pub reconnecting... Attempt ${reconnect_attempt(info)}`);
    });
    sub_client.on("reconnecting", (info) => {
      log.debug(`Redis Sub reconnecting... Attempt ${reconnect_attempt(info)}`);
    });

    const io = require('socket.io')(server, {
      path: "/v3/socket.io",
      pingTimeout: 120000, // Increased from 60s to 120s
      pingInterval: 30000, // Increased from 25s to 30s
      connectTimeout: 60000, // Increased from 45s to 60s
      transports: ['websocket'], // Websocket only: no other transport is allowed
      maxHttpBufferSize: 1e8, // Increase buffer size to 100MB
      cors: {
        origin: "*",
        methods: ["GET", "POST"]
      },
      allowEIO3: true, // Accept Engine.IO v3 (i.e. Socket.IO v2) clients
      adapter: require('@socket.io/redis-adapter').createAdapter(pub_client, sub_client)
    });

    // Creates the per-socket Redis subscriber. Errors are only logged: the
    // retry strategy is left to re-establish the connection.
    const setup_redis_client = (username) => {
      const client = redis.createClient(make_redis_opts());
      client.on("error", (err) => {
        log.info(username, `Redis Error: ${err}`);
      });
      client.on("reconnecting", (info) => {
        log.info(username, `Redis reconnecting... Attempt ${reconnect_attempt(info)}`);
      });
      return client;
    };

    // username -> { socket_id -> connection info }, for debugging only.
    // Usernames come from the client, so the registry has no prototype:
    // a name such as "__proto__" is then just another key.
    const active_connections = Object.create(null);

    // Removes one socket from the registry and drops the user's entry once it
    // has no sockets left.
    const untrack_connection = (username, socket_id) => {
      const sockets = active_connections[username];
      if (!sockets) {
        return;
      }
      if (sockets[socket_id]) {
        delete sockets[socket_id];
        log.debug(`${username} now has ${Object.keys(sockets).length} active connections after removing ${socket_id}`);
      } else {
        log.debug(`Socket ${socket_id} not found in active connections for ${username}`);
      }
      if (Object.keys(sockets).length === 0) {
        delete active_connections[username];
      }
    };

    // Log engine events for debugging
    io.engine.on('connection', (socket) => {
      log.debug(`Engine connection established: ${socket.id}`);
    });
    io.engine.on('close', (socket) => {
      log.debug(`Engine connection closed: ${socket.id}`);
    });

    io.on('connection', (socket) => {
      // Node lower-cases incoming header names, so the key must be lower-case.
      const ip = socket.handshake.headers['x-forwarded-for'] || socket.handshake.address;
      const socket_id = socket.id;
      log.debug(`Socket connected: ${socket_id} from ${ip}`);

      // Store socket data for tracking
      socket.data = {
        ip: ip,
        socket_id: socket_id,
        connected_at: Date.now()
      };

      // Set a longer ping timeout for this socket
      socket.conn.pingTimeout = 120000;
      socket.conn.on('error', (err) => {
        log.debug(`Socket ${socket_id} - connection error: ${err}`);
      });
      socket.conn.on('close', (reason) => {
        log.debug(`Socket ${socket_id} - connection closed: ${reason}`);
      });

      // Registered once per socket; adding it inside the subscribe handler
      // would stack up one more listener on every re-subscribe.
      socket.on("error", (err) => {
        log.debug(`Error (socket): ${err}`);
      });

      socket.on('subscribe:feeds', (feeds, username) => {
        // The payload is client-controlled: reject it before touching any
        // state so a bad message can neither throw nor leave a registry entry
        // that disconnect would never clean up.
        if (!username) {
          log.debug(`Socket ${socket_id} - ignoring subscribe:feeds without a username`);
          return;
        }
        if (!Array.isArray(feeds)) {
          log.debug(`Socket ${socket_id} - ignoring subscribe:feeds with a non-array feed list`);
          return;
        }

        // A socket that re-subscribes under another name must not stay
        // tracked under the old one.
        const previous_username = socket.data.username;
        if (previous_username && previous_username !== username) {
          untrack_connection(previous_username, socket_id);
        }

        // Store user data directly on the socket for access during disconnect
        socket.data.feeds = feeds;
        socket.data.username = username;
        socket.data.subscribed_at = Date.now();
        log.info(username, `Connecting (${feeds.length} feeds, ${ip}), (${io.engine.clientsCount} connected) ${SECURE ? "(SSL)" : ""}`);

        // Track connections by username for debugging
        active_connections[username] = active_connections[username] || {};
        active_connections[username][socket_id] = {
          connected_at: socket.data.connected_at,
          subscribed_at: socket.data.subscribed_at,
          feed_count: feeds.length
        };
        log.debug(`${username} now has ${Object.keys(active_connections[username]).length} active connections, adding ${socket_id}`);

        // Replace any subscriber left over from an earlier subscribe:feeds.
        if (socket.subscribe != null) {
          socket.subscribe.quit();
        }
        const client = setup_redis_client(username);
        socket.subscribe = client;

        client.on("connect", () => {
          // Skip if this client was replaced or the socket disconnected
          // before Redis finished connecting.
          if (socket.subscribe !== client) {
            return;
          }
          log.info(username, `Connected (${feeds.length} feeds, ${ip}), (${io.engine.clientsCount} connected) ${SECURE ? "(SSL)" : "(non-SSL)"}`);
          // SUBSCRIBE with no channels is a Redis error, so skip empty lists.
          if (feeds.length > 0) {
            client.subscribe(feeds);
            client.subscribe(feeds.map((f) => `${f}:story`));
          }
          client.subscribe(username);
        });

        client.on('message', (channel, message) => {
          if (socket.subscribe !== client) {
            return;
          }
          const event_name = event_name_for_channel(channel, username);
          log.info(username, `Update on ${channel}: ${event_name} - ${message}`);
          socket.emit(event_name, channel, message);
        });
      });

      socket.on('disconnect', (reason) => {
        // Use the data stored on the socket
        const username = socket.data.username;
        const feeds = socket.data.feeds;
        const connected_at = socket.data.connected_at;
        const subscribed_at = socket.data.subscribed_at;

        // Calculate connection duration
        const now = Date.now();
        const connection_duration = now - (connected_at || now);
        const subscription_duration = subscribed_at ? now - subscribed_at : 0;
        log.debug(`Socket ${socket_id} disconnected: ${reason}, username: ${username}, connection duration: ${connection_duration}ms, subscription duration: ${subscription_duration}ms`);

        // Update connection tracking
        if (username) {
          untrack_connection(username, socket_id);
        }

        if (socket.subscribe != null) {
          socket.subscribe.quit();
          // Marks the subscriber as stale for any late connect/message event.
          socket.subscribe = null;
        }

        if (username && feeds) {
          log.info(username, `Disconnect (${feeds.length} feeds, ${ip}), there are now ${io.engine.clientsCount} users. ${SECURE ? "(SSL)" : "(non-SSL)"}`);
        }
      });
    });

    io.engine.on('connection_error', (err) => {
      log.debug(`Connection Error: ${err.code} - ${err.message}`);
    });
    io.sockets.on('error', (err) => {
      log.debug(`Error (sockets): ${err}`);
    });

    // Periodically log connection stats
    setInterval(() => {
      const usernames = Object.keys(active_connections);
      const total_users = usernames.length;
      const total_connections = io.engine.clientsCount;
      const total_tracked = usernames.reduce((sum, name) => {
        return sum + Object.keys(active_connections[name]).length;
      }, 0);
      log.debug(`Connection stats: ${total_users} users with ${total_connections} total connections (${total_tracked} tracked)`);
    }, 60000);

    return io;
  };

  exports.unread_counts = unread_counts;

}).call(this);