From d7b94e756d8a69501015b8725243fb1bf18c5356 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Thu, 26 Jun 2025 09:32:14 -0400 Subject: [PATCH] fix websockets not working --- .../server/api/StreamingApiServerService.ts | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index 35855ab9bf..a4ddf0d4b2 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -4,7 +4,7 @@ */ import { EventEmitter } from 'events'; -import { Inject, Injectable } from '@nestjs/common'; +import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import * as Redis from 'ioredis'; import * as WebSocket from 'ws'; import proxyAddr from 'proxy-addr'; @@ -32,11 +32,12 @@ import type * as http from 'node:http'; const MAX_CONNECTIONS_PER_CLIENT = 32; @Injectable() -export class StreamingApiServerService { +export class StreamingApiServerService implements OnApplicationShutdown { #wss: WebSocket.WebSocketServer; #connections = new Map(); #connectionsByClient = new Map>(); // key: IP / user ID -> value: connection #cleanConnectionsIntervalId: NodeJS.Timeout | null = null; + readonly #globalEv = new EventEmitter(); constructor( @Inject(DI.redisForSub) @@ -57,6 +58,14 @@ export class StreamingApiServerService { @Inject(DI.config) private config: Config, ) { + this.redisForSub.on('message', this.onRedis); + } + + @bindThis + onApplicationShutdown() { + this.redisForSub.off('message', this.onRedis); + this.#globalEv.removeAllListeners(); + // Other shutdown logic is handled by detach(), which gets called by ServerServer's own shutdown handler. } @bindThis @@ -69,6 +78,12 @@ export class StreamingApiServerService { return rateLimit.blocked; } + @bindThis + private onRedis(_: string, data: string) { + const parsed = JSON.parse(data); + this.#globalEv.emit('message', parsed); + } + @bindThis public attach(server: http.Server): void { this.#wss = new WebSocket.WebSocketServer({ @@ -199,15 +214,6 @@ export class StreamingApiServerService { }); }); - const globalEv = new EventEmitter(); - - const onRedis = (_: string, data: string) => { - const parsed = JSON.parse(data); - globalEv.emit('message', parsed); - }; - - this.redisForSub.on('message', onRedis); - this.#wss.on('connection', async (connection: WebSocket.WebSocket, request: http.IncomingMessage, ctx: { stream: MainStreamConnection, user: MiLocalUser | null; @@ -221,7 +227,7 @@ export class StreamingApiServerService { ev.emit(data.channel, data.message); } - globalEv.on('message', onRedisMessage); + this.#globalEv.on('message', onRedisMessage); await stream.listen(ev, connection); @@ -238,8 +244,7 @@ export class StreamingApiServerService { connection.once('close', () => { ev.removeAllListeners(); stream.dispose(); - this.redisForSub.off('message', onRedis); - globalEv.off('message', onRedisMessage); + this.#globalEv.off('message', onRedisMessage); this.#connections.delete(connection); if (userUpdateIntervalId) clearInterval(userUpdateIntervalId); });