Updating to altest socket.io redis adapter.

This commit is contained in:
Samuel Clay 2025-02-02 22:56:57 -08:00
parent e2fe7fa620
commit 94044a775e
4 changed files with 459 additions and 2586 deletions

2938
node/package-lock.json generated

File diff suppressed because it is too large Load diff

View file

@ -23,6 +23,7 @@
"redis": "^3.1.2",
"socket.io": "^4.8.1",
"socket.io-client": "^4.8.1",
"@socket.io/redis-adapter": "^7.0.0",
"supervisor": "^0.12.0",
"utf-8-validate": "^5.0.4",
"ws": "^7.4.2"

View file

@ -33,14 +33,43 @@ unread_counts = (server) =>
else
log.debug "Running as production server"
io = require('socket.io')(server, path: "/v3/socket.io")
# Create Redis clients for Socket.IO adapter
pub_client = redis.createClient(REDIS_PORT, REDIS_SERVER)
sub_client = redis.createClient(REDIS_PORT, REDIS_SERVER)
# io.set('transports', ['websocket'])
io = require('socket.io')(server, {
path: "/v3/socket.io",
pingTimeout: 60000, # Increase ping timeout to 60 seconds
pingInterval: 25000, # Send ping every 25 seconds
connectTimeout: 45000, # Connection timeout
transports: ['websocket'], # Prefer websocket transport
adapter: require('@socket.io/redis-adapter').createAdapter(pub_client, sub_client)
})
# io.set 'store', new RedisStore
# redisPub : rpub
# redisSub : rsub
# redisClient : rclient
# Handle Redis adapter client errors
pub_client.on "error", (err) ->
log.debug "Redis Pub Error: #{err}"
sub_client.on "error", (err) ->
log.debug "Redis Sub Error: #{err}"
# Setup Redis error handling and reconnection
setup_redis_client = (socket, username) ->
client = redis.createClient({
host: REDIS_SERVER,
port: REDIS_PORT,
retry_strategy: (options) ->
return Math.min(options.attempt * 100, 3000)
})
client.on "error", (err) =>
log.info username, "Redis Error: #{err}"
# Don't quit on error, let retry strategy handle it
client.on "reconnecting", (attempt) =>
log.info username, "Redis reconnecting... Attempt #{attempt}"
return client
io.on 'connection', (socket) ->
ip = socket.handshake.headers['X-Forwarded-For'] || socket.handshake.address
@ -52,14 +81,13 @@ unread_counts = (server) =>
if not @username
return
socket.on "error", (err) ->
log.debug "Error (socket): #{err}"
socket.subscribe?.quit()
socket.subscribe = redis.createClient REDIS_PORT, REDIS_SERVER
socket.subscribe.on "error", (err) =>
log.info @username, "Error: #{err} (#{@feeds.length} feeds)"
socket.subscribe?.quit()
socket.subscribe = setup_redis_client(socket, @username)
socket.subscribe.on "connect", =>
log.info @username, "Connected (#{@feeds.length} feeds, #{ip})," +
" (#{io.engine.clientsCount} connected) " +

View file

@ -9,7 +9,7 @@
log = require('./log.js');
unread_counts = (server) => {
var ENV_DEV, ENV_DOCKER, ENV_PROD, REDIS_PORT, REDIS_SERVER, SECURE, io;
var ENV_DEV, ENV_DOCKER, ENV_PROD, REDIS_PORT, REDIS_SERVER, SECURE, io, pub_client, setup_redis_client, sub_client;
ENV_DEV = process.env.NODE_ENV === 'development' || process.env.NODE_ENV === 'debug';
ENV_PROD = process.env.NODE_ENV === 'production';
ENV_DOCKER = process.env.NODE_ENV === 'docker';
@ -38,22 +38,51 @@
} else {
log.debug("Running as production server");
}
// Create Redis clients for Socket.IO adapter
pub_client = redis.createClient(REDIS_PORT, REDIS_SERVER);
sub_client = redis.createClient(REDIS_PORT, REDIS_SERVER);
io = require('socket.io')(server, {
path: "/v3/socket.io"
path: "/v3/socket.io",
pingTimeout: 60000, // Increase ping timeout to 60 seconds
pingInterval: 25000, // Send ping every 25 seconds
connectTimeout: 45000, // Connection timeout
transports: ['websocket'], // Prefer websocket transport
adapter: require('@socket.io/redis-adapter').createAdapter(pub_client, sub_client)
});
// io.set('transports', ['websocket'])
// io.set 'store', new RedisStore
// redisPub : rpub
// redisSub : rsub
// redisClient : rclient
// Handle Redis adapter client errors
pub_client.on("error", function(err) {
return log.debug(`Redis Pub Error: ${err}`);
});
sub_client.on("error", function(err) {
return log.debug(`Redis Sub Error: ${err}`);
});
// Setup Redis error handling and reconnection
setup_redis_client = function(socket, username) {
var client;
client = redis.createClient({
host: REDIS_SERVER,
port: REDIS_PORT,
retry_strategy: function(options) {
return Math.min(options.attempt * 100, 3000);
}
});
client.on("error", (err) => {
return log.info(username, `Redis Error: ${err}`);
});
// Don't quit on error, let retry strategy handle it
client.on("reconnecting", (attempt) => {
return log.info(username, `Redis reconnecting... Attempt ${attempt}`);
});
return client;
};
io.on('connection', function(socket) {
var ip;
ip = socket.handshake.headers['X-Forwarded-For'] || socket.handshake.address;
socket.on('subscribe:feeds', (feeds, username) => {
socket.on('subscribe:feeds', (feeds, username1) => {
var ref;
this.feeds = feeds;
this.username = username;
this.username = username1;
log.info(this.username, `Connecting (${this.feeds.length} feeds, ${ip}),` + ` (${io.engine.clientsCount} connected) ` + ` ${SECURE ? "(SSL)" : ""}`);
if (!this.username) {
return;
@ -64,12 +93,7 @@
if ((ref = socket.subscribe) != null) {
ref.quit();
}
socket.subscribe = redis.createClient(REDIS_PORT, REDIS_SERVER);
socket.subscribe.on("error", (err) => {
var ref1;
log.info(this.username, `Error: ${err} (${this.feeds.length} feeds)`);
return (ref1 = socket.subscribe) != null ? ref1.quit() : void 0;
});
socket.subscribe = setup_redis_client(socket, this.username);
socket.subscribe.on("connect", () => {
var feeds_story;
log.info(this.username, `Connected (${this.feeds.length} feeds, ${ip}),` + ` (${io.engine.clientsCount} connected) ` + ` ${SECURE ? "(SSL)" : "(non-SSL)"}`);