fix websockets not working

This commit is contained in:
Hazelnoot 2025-06-26 09:32:14 -04:00
parent 088fe15be5
commit d7b94e756d

View file

@ -4,7 +4,7 @@
*/
import { EventEmitter } from 'events';
import { Inject, Injectable } from '@nestjs/common';
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import * as Redis from 'ioredis';
import * as WebSocket from 'ws';
import proxyAddr from 'proxy-addr';
@ -32,11 +32,12 @@ import type * as http from 'node:http';
const MAX_CONNECTIONS_PER_CLIENT = 32;
@Injectable()
export class StreamingApiServerService {
export class StreamingApiServerService implements OnApplicationShutdown {
#wss: WebSocket.WebSocketServer;
#connections = new Map<WebSocket.WebSocket, number>();
#connectionsByClient = new Map<string, Set<WebSocket.WebSocket>>(); // key: IP / user ID -> value: connection
#cleanConnectionsIntervalId: NodeJS.Timeout | null = null;
readonly #globalEv = new EventEmitter();
constructor(
@Inject(DI.redisForSub)
@ -57,6 +58,14 @@ export class StreamingApiServerService {
@Inject(DI.config)
private config: Config,
) {
this.redisForSub.on('message', this.onRedis);
}
@bindThis
onApplicationShutdown() {
this.redisForSub.off('message', this.onRedis);
this.#globalEv.removeAllListeners();
// Other shutdown logic is handled by detach(), which gets called by ServerServer's own shutdown handler.
}
@bindThis
@ -69,6 +78,12 @@ export class StreamingApiServerService {
return rateLimit.blocked;
}
@bindThis
private onRedis(_: string, data: string) {
const parsed = JSON.parse(data);
this.#globalEv.emit('message', parsed);
}
@bindThis
public attach(server: http.Server): void {
this.#wss = new WebSocket.WebSocketServer({
@ -199,15 +214,6 @@ export class StreamingApiServerService {
});
});
const globalEv = new EventEmitter();
const onRedis = (_: string, data: string) => {
const parsed = JSON.parse(data);
globalEv.emit('message', parsed);
};
this.redisForSub.on('message', onRedis);
this.#wss.on('connection', async (connection: WebSocket.WebSocket, request: http.IncomingMessage, ctx: {
stream: MainStreamConnection,
user: MiLocalUser | null;
@ -221,7 +227,7 @@ export class StreamingApiServerService {
ev.emit(data.channel, data.message);
}
globalEv.on('message', onRedisMessage);
this.#globalEv.on('message', onRedisMessage);
await stream.listen(ev, connection);
@ -238,8 +244,7 @@ export class StreamingApiServerService {
connection.once('close', () => {
ev.removeAllListeners();
stream.dispose();
this.redisForSub.off('message', onRedis);
globalEv.off('message', onRedisMessage);
this.#globalEv.off('message', onRedisMessage);
this.#connections.delete(connection);
if (userUpdateIntervalId) clearInterval(userUpdateIntervalId);
});