From b924a2ffe5b806c0fc6fcbcbe602a560d3296bd8 Mon Sep 17 00:00:00 2001 From: Rory& Date: Tue, 26 May 2026 10:23:42 +0200 Subject: [PATCH 1/6] Gateway: send reconnect on shutdown --- src/gateway/events/Connection.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/gateway/events/Connection.ts b/src/gateway/events/Connection.ts index 5876c3e15e..6f67c0c597 100644 --- a/src/gateway/events/Connection.ts +++ b/src/gateway/events/Connection.ts @@ -43,6 +43,17 @@ export async function Connection(this: WS.Server, socket: WebSocket, request: In if (index !== -1) openConnections.splice(index, 1); }); + for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { + process.on(sig, async () => { + await Send(socket, { + op: OPCODES.Reconnect, + s: socket.sequence++, + d: Math.round(Math.random() * 5000), + }); + socket.close(1000); + }); + } + const forwardedFor = Config.get().security.forwardedFor; const ipAddress = forwardedFor ? (request.headers[forwardedFor.toLowerCase()] as string) : request.socket.remoteAddress; From ced707abc728eff250317590aa5d148b3e436c57 Mon Sep 17 00:00:00 2001 From: Rory& Date: Thu, 28 May 2026 09:17:01 +0200 Subject: [PATCH 2/6] IPC: shutdown hooks #Conflicts: # src/util/util/ipc/listener/UnixSocketListener.ts --- .../util/ipc/listener/RabbitMqSingleListener.ts | 2 +- src/util/util/ipc/listener/UnixSocketListener.ts | 14 +++++++++----- src/util/util/ipc/writer/RabbitMqSingleWriter.ts | 2 +- src/util/util/ipc/writer/UnixSocketWriter.ts | 4 ++++ 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/src/util/util/ipc/listener/RabbitMqSingleListener.ts b/src/util/util/ipc/listener/RabbitMqSingleListener.ts index b5fd5efd50..d120e70ac5 100644 --- a/src/util/util/ipc/listener/RabbitMqSingleListener.ts +++ b/src/util/util/ipc/listener/RabbitMqSingleListener.ts @@ -51,7 +51,7 @@ export class RabbitMqSingleListener extends BaseEventListener { this.channel = await this.connection.createChannel(); for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { - process.on(sig, this.close); + process.on(sig, () => this.close()); } this.connection.on("error", (err) => { diff --git a/src/util/util/ipc/listener/UnixSocketListener.ts b/src/util/util/ipc/listener/UnixSocketListener.ts index 8ead85bda5..9465e9cadc 100644 --- a/src/util/util/ipc/listener/UnixSocketListener.ts +++ b/src/util/util/ipc/listener/UnixSocketListener.ts @@ -26,6 +26,7 @@ export class UnixSocketListener extends BaseEventListener { eventEmitter: EventEmitter; socketPath: string; server: Server; + isInitialized = false; constructor(socketPath: string) { super(); @@ -47,7 +48,7 @@ export class UnixSocketListener extends BaseEventListener { this.server = net.createServer((socket) => { socket.on("connect", () => { - console.log("[UnixSocketListener] Unix socket client connected"); + console.log("[UnixSocketListener] Unix socket client connected, now at", this.server.connections, "connections..."); }); let buffer = Buffer.alloc(0); socket.on("data", (data: Buffer) => { @@ -74,15 +75,20 @@ export class UnixSocketListener extends BaseEventListener { }); this.server.listen(this.socketPath, () => { - console.log(`Unix socket server listening on ${this.socketPath}`); + console.log(`[UnixSocketListener] Listening on ${this.socketPath}`); }); for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { - process.on(sig, this.close); + process.on(sig, () => this.close()); } + this.isInitialized = true; } async close(): Promise { + if (!this.isInitialized) { + console.log("[UnixSocketListener] close() called before init! - Path:", this.socketPath, " - server:", this.server, " - this:", this); + } + console.log("[UnixSocketListener] Closing unix socket server"); this.server.close(); @@ -92,8 +98,6 @@ export class UnixSocketListener extends BaseEventListener { } catch (e) { console.error("[UnixSocketListener] Failed to unlink socket file:", e); } - - process.exit(0); } async listen(event: string, callback: (event: EventOpts) => unknown): Promise<() => Promise> { diff --git a/src/util/util/ipc/writer/RabbitMqSingleWriter.ts b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts index 28781c6dbd..72a0d08ce7 100644 --- a/src/util/util/ipc/writer/RabbitMqSingleWriter.ts +++ b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts @@ -47,7 +47,7 @@ export class RabbitMqSingleWriter extends BaseEventWriter { this.channel = await this.connection.createChannel(); for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { - process.on(sig, this.close); + process.on(sig, () => this.close()); } this.connection.on("error", (err) => { diff --git a/src/util/util/ipc/writer/UnixSocketWriter.ts b/src/util/util/ipc/writer/UnixSocketWriter.ts index 53b9564ce2..e3b91de8fd 100644 --- a/src/util/util/ipc/writer/UnixSocketWriter.ts +++ b/src/util/util/ipc/writer/UnixSocketWriter.ts @@ -142,6 +142,10 @@ export class UnixSocketWriter extends BaseEventWriter { console.error("[UnixSocketWriter] Unix socket writer failed to read directory:", err); } + for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { + process.on(sig, () => this.close()); + } + this.isInitializing = false; } From ac3160e8331fb0d283d60b9fc66c8e98eec240df Mon Sep 17 00:00:00 2001 From: Rory& Date: Wed, 27 May 2026 23:51:18 +0200 Subject: [PATCH 3/6] Concurrent identify stress script --- scripts/stress/identify.js | 6 +- scripts/stress/identifyConcurrent.js | 86 ++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 2 deletions(-) create mode 100644 scripts/stress/identifyConcurrent.js diff --git a/scripts/stress/identify.js b/scripts/stress/identify.js index 08d8d5db82..7aaca529c8 100644 --- a/scripts/stress/identify.js +++ b/scripts/stress/identify.js @@ -3,9 +3,10 @@ require("dotenv").config({ quiet: true }); const { OPCODES } = require("../../dist/gateway/util/Constants.js"); const WebSocket = require("ws"); -const ENDPOINT = `ws://localhost:3001?v=9&encoding=json`; const TOKEN = process.env.TOKEN; const TOTAL_ITERATIONS = process.env.ITER ? parseInt(process.env.ITER) : 500; +const PORT = process.env.PORT ?? 3002; +const ENDPOINT = `ws://localhost:${PORT}?v=9&encoding=json`; const doTimedIdentify = () => new Promise((resolve) => { @@ -44,7 +45,8 @@ const doTimedIdentify = () => while (perfs.length < TOTAL_ITERATIONS) { const ret = await doTimedIdentify(); perfs.push(ret); - // console.log(`${perfs.length}/${TOTAL_ITERATIONS} - this: ${Math.floor(ret)}ms`) + const avg = perfs.reduce((prev, curr) => prev + curr) / (perfs.length - 1); + console.log(`${perfs.length}/${TOTAL_ITERATIONS} - this: ${Math.floor(ret)}ms - avg: ${Math.floor(avg * 100) / 100}ms`); } const avg = perfs.reduce((prev, curr) => prev + curr) / (perfs.length - 1); diff --git a/scripts/stress/identifyConcurrent.js b/scripts/stress/identifyConcurrent.js new file mode 100644 index 0000000000..41d5d63f9e --- /dev/null +++ b/scripts/stress/identifyConcurrent.js @@ -0,0 +1,86 @@ +/* eslint-env node */ + +require("dotenv").config({ quiet: true }); +const { OPCODES } = require("../../dist/gateway/util/Constants.js"); +const WebSocket = require("ws"); +const { sleep } = require("../../dist/util/util/extensions/index.js"); +const TOKEN = process.env.TOKEN; +const TOTAL_ITERATIONS = process.env.ITER ? parseInt(process.env.ITER) : 500; +const PORT = process.env.PORT ?? 3002; +const ENDPOINT = `ws://localhost:${PORT}?v=9&encoding=json`; +const KEEPALIVE = !!process.env.KEEPALIVE; +const SLEEP_EVERY = process.env.SLEEP_EVERY ? parseInt(process.env.SLEEP_EVERY) : 100; + +const doTimedIdentify = () => + new Promise((resolve, reject) => { + let start; + const ws = new WebSocket(ENDPOINT); + ws.setMaxListeners(TOTAL_ITERATIONS); + ws.on("message", (data) => { + const parsed = JSON.parse(data); + let heartbeat; + + switch (parsed.op) { + case OPCODES.Hello: + // send identify + start = performance.now(); + ws.send( + JSON.stringify({ + op: OPCODES.Identify, + d: { + token: TOKEN, + properties: {}, + }, + }), + ); + + if (KEEPALIVE) + heartbeat = setInterval(() => { + ws.send( + JSON.stringify({ + op: OPCODES.Heartbeat, + d: null, + }), + ); + process.stdout.write("."); + }, parsed.d.heartbeat_interval); + break; + case OPCODES.Dispatch: + if (parsed.t == "READY") { + if (!KEEPALIVE) ws.close(); + else process.stdout.write("R"); + return resolve(performance.now() - start); + } + + break; + } + ws.on("error", reject); + if (KEEPALIVE) + ws.on("close", () => { + process.stdout.write("C"); + clearTimeout(heartbeat); + }); + }); + }); + +(async () => { + const promises = []; + for (let i = 0; i < TOTAL_ITERATIONS; i++) { + promises.push(doTimedIdentify()); + process.stdout.write("+"); + // await sleep(Math.random() * 250); + if (promises.length % SLEEP_EVERY === 0) await Promise.all(promises); + } + + const perfs = []; + console.log("Identifies started"); + for (const prom of promises) { + const ret = await prom; + perfs.push(ret); + const avg = perfs.reduce((prev, curr) => prev + curr) / (perfs.length - 1); + console.log(`${perfs.length}/${promises.length}: Identified in ${Math.floor(ret)}ms - avg: ${Math.floor(avg * 100) / 100}ms`); + } + + const avg = perfs.reduce((prev, curr) => prev + curr) / (perfs.length - 1); + console.log(`Average identify time: ${Math.floor(avg * 100) / 100}ms`); +})().catch((e) => console.error("Fail:", e)); From b1b73d45e7ab798975400fa8be68a2ba2238e782 Mon Sep 17 00:00:00 2001 From: Rory& Date: Thu, 28 May 2026 10:38:39 +0200 Subject: [PATCH 4/6] Process lifecycle scripts #Conflicts: # src/api/Server.ts # src/bundle/Server.ts # src/cdn/Server.ts # src/gateway/Server.ts # src/gateway/events/Connection.ts # src/util/util/ipc/listener/RabbitMqSingleListener.ts # src/util/util/ipc/listener/UnixSocketListener.ts # src/util/util/ipc/writer/UnixSocketWriter.ts # src/webrtc/Server.ts --- package-lock.json | 16 ++++- package.json | 3 +- src/api/Server.ts | 10 +-- src/api/routes/stop.ts | 7 ++- src/bundle/Server.ts | 18 +++--- src/cdn/Server.ts | 8 ++- src/gateway/Server.ts | 30 +++++---- src/gateway/events/Close.ts | 60 +++++++++--------- src/gateway/events/Connection.ts | 29 ++++++--- src/util/util/Database.ts | 7 ++- src/util/util/ProcessLifecycle.ts | 62 +++++++++++++++++++ .../ipc/listener/RabbitMqSingleListener.ts | 7 +-- .../util/ipc/listener/UnixSocketListener.ts | 6 +- .../util/ipc/writer/RabbitMqSingleWriter.ts | 6 +- src/util/util/ipc/writer/UnixSocketWriter.ts | 6 +- src/webrtc/Server.ts | 15 +++-- 16 files changed, 195 insertions(+), 95 deletions(-) create mode 100644 src/util/util/ProcessLifecycle.ts diff --git a/package-lock.json b/package-lock.json index 0cd867fdbe..05ebdfacd4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -86,7 +86,8 @@ "pretty-quick": "^4.2.2", "ts-node": "^10.9.2", "typescript": "^6.0.3", - "typescript-json-schema": "^0.67.4" + "typescript-json-schema": "^0.67.4", + "why-is-node-running": "^3.2.2" }, "optionalDependencies": { "@sendgrid/mail": "^8.1.6", @@ -7387,6 +7388,19 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/why-is-node-running": { + "version": "3.2.2", + "resolved": "https://registry.npmjs.org/why-is-node-running/-/why-is-node-running-3.2.2.tgz", + "integrity": "sha512-NKUzAelcoCXhXL4dJzKIwXeR8iEVqsA0Lq6Vnd0UXvgaKbzVo4ZTHROF2Jidrv+SgxOQ03fMinnNhzZATxOD3A==", + "dev": true, + "license": "MIT", + "bin": { + "why-is-node-running": "cli.js" + }, + "engines": { + "node": ">=20.11" + } + }, "node_modules/word-wrap": { "version": "1.2.5", "resolved": "https://registry.npmjs.org/word-wrap/-/word-wrap-1.2.5.tgz", diff --git a/package.json b/package.json index 35108b7160..82d159c04f 100644 --- a/package.json +++ b/package.json @@ -75,7 +75,8 @@ "pretty-quick": "^4.2.2", "ts-node": "^10.9.2", "typescript": "^6.0.3", - "typescript-json-schema": "^0.67.4" + "typescript-json-schema": "^0.67.4", + "why-is-node-running": "^3.2.2" }, "dependencies": { "@spacebarchat/pion-webrtc": "^0.0.4", diff --git a/src/api/Server.ts b/src/api/Server.ts index 1d9fe2ab22..a1eb1716f3 100644 --- a/src/api/Server.ts +++ b/src/api/Server.ts @@ -16,15 +16,16 @@ along with this program. If not, see . */ -import { Config, ConnectionConfig, ConnectionLoader, Email, JSONReplacer, WebAuthn, initDatabase, initEvent, registerRoutes, getDatabase, getRevInfoOrFail } from "@spacebar/util"; -import { Authentication, CORS, ImageProxy, BodyParser, ErrorHandler, initRateLimits, initTranslation } from "./middlewares"; +import path from "node:path"; import { Request, Response, Router } from "express"; -import { Server, ServerOptions } from "lambert-server"; import morgan from "morgan"; -import path from "node:path"; +import { Server, ServerOptions } from "lambert-server"; import { red } from "picocolors"; +import { Config, ConnectionConfig, ConnectionLoader, Email, JSONReplacer, WebAuthn, initDatabase, initEvent, registerRoutes, getDatabase, getRevInfoOrFail } from "@spacebar/util"; +import { Authentication, CORS, ImageProxy, BodyParser, ErrorHandler, initRateLimits, initTranslation } from "./middlewares"; import { initInstance } from "./util/handlers/Instance"; import { route } from "./util"; +import { ProcessLifecycle } from "../util/util/ProcessLifecycle"; const ASSETS_FOLDER = path.join(__dirname, "..", "..", "assets"); const PUBLIC_ASSETS_FOLDER = path.join(ASSETS_FOLDER, "public"); @@ -196,6 +197,7 @@ export class SpacebarServer extends Server { if (logRequests) console.log(red(`Warning: Request logging is enabled! This will spam your console!\nTo disable this, unset the 'LOG_REQUESTS' environment variable!`)); + await ProcessLifecycle.Ready(); return super.start(); } } diff --git a/src/api/routes/stop.ts b/src/api/routes/stop.ts index 4f86973508..e04b46ba6c 100644 --- a/src/api/routes/stop.ts +++ b/src/api/routes/stop.ts @@ -16,8 +16,9 @@ along with this program. If not, see . */ -import { route } from "@spacebar/api"; +import { route, SpacebarServer } from "@spacebar/api"; import { Request, Response, Router } from "express"; +import { ProcessLifecycle } from "../../util/util/ProcessLifecycle"; const router: Router = Router({ mergeParams: true }); @@ -35,7 +36,9 @@ router.post( (req: Request, res: Response) => { console.log(`/stop was called by ${req.user_id} at ${new Date()}`); res.sendStatus(200); - process.kill(process.pid, "SIGTERM"); + ProcessLifecycle.Shutdown().catch((e) => { + console.error("Failed to shut down:", e); + }); }, ); diff --git a/src/bundle/Server.ts b/src/bundle/Server.ts index 91c3dcf423..f743497a12 100644 --- a/src/bundle/Server.ts +++ b/src/bundle/Server.ts @@ -16,21 +16,18 @@ along with this program. If not, see . */ -import morgan from "morgan"; - -process.on("unhandledRejection", console.error); -process.on("uncaughtException", console.error); - import http from "node:http"; +import fs from "node:fs"; +import cluster from "node:cluster"; +import morgan from "morgan"; +import express from "express"; +import { green, bold } from "picocolors"; import * as Api from "@spacebar/api"; import * as Gateway from "@spacebar/gateway"; import * as Webrtc from "@spacebar/webrtc"; import { CDNServer } from "@spacebar/cdn"; -import express from "express"; -import { green, bold } from "picocolors"; import { Config, initDatabase } from "@spacebar/util"; -import fs from "node:fs"; -import cluster from "node:cluster"; +import { ProcessLifecycle } from "../util/util/ProcessLifecycle"; const app = express(); const server = http.createServer(); @@ -48,8 +45,7 @@ const webrtc = new Webrtc.Server({ production, }); -process.on("SIGTERM", async () => { - console.log("Shutting down due to SIGTERM"); +ProcessLifecycle.eventEmitter.on("stopping", async () => { await gateway.stop(); await cdn.stop(); await api.stop(); diff --git a/src/cdn/Server.ts b/src/cdn/Server.ts index 8260407ec3..5d87a86ce2 100644 --- a/src/cdn/Server.ts +++ b/src/cdn/Server.ts @@ -16,13 +16,14 @@ along with this program. If not, see . */ +import path from "node:path"; +import morgan from "morgan"; import { Server, ServerOptions } from "lambert-server"; import { Attachment, Config, initDatabase, registerRoutes } from "@spacebar/util"; import { CORS, BodyParser } from "@spacebar/api"; -import path from "node:path"; import guildProfilesRoute from "./routes/guild-profiles"; -import morgan from "morgan"; import { storage } from "./util"; +import { ProcessLifecycle } from "../util/util/ProcessLifecycle"; export type CDNServerOptions = ServerOptions; @@ -71,6 +72,7 @@ export class CDNServer extends Server { this.app.use("/guilds/:guild_id/users/:user_id/banners", guildProfilesRoute); if (process.env.LOG_ROUTES !== "false") console.log("[Server] Route /guilds/:guild_id/users/:user_id/banners registered"); + await ProcessLifecycle.Ready(); return super.start(); } @@ -89,6 +91,8 @@ export class CDNServer extends Server { } async stop() { + await ProcessLifecycle.Shutdown(); + await ProcessLifecycle.Finalize(); return super.stop(); } } diff --git a/src/gateway/Server.ts b/src/gateway/Server.ts index 632fbc5295..5ac94a8857 100644 --- a/src/gateway/Server.ts +++ b/src/gateway/Server.ts @@ -16,21 +16,21 @@ along with this program. If not, see . */ -import dotenv from "dotenv"; -dotenv.config({ quiet: true }); -import { checkToken, closeDatabase, Config, initDatabase, initEvent, Rights } from "@spacebar/util"; -import ws from "ws"; -import { Connection, openConnections } from "./events/Connection"; import http from "node:http"; -import { cleanupOnStartup } from "./util"; -import { randomString } from "@spacebar/api"; import { setInterval } from "node:timers"; +import ws from "ws"; +import { checkToken, Config, initDatabase, initEvent, Rights } from "@spacebar/util"; +import { randomString } from "@spacebar/api"; // TODO: move to util +import { Connection, openConnections } from "./events/Connection"; +import { cleanupOnStartup, OPCODES, Send } from "./util"; +import { ProcessLifecycle } from "../util/util/ProcessLifecycle"; export class Server { public ws: ws.Server; public port: number; public server: http.Server; public production: boolean; + private monitoringLoop: NodeJS.Timeout; constructor({ port, server, production }: { port: number; server?: http.Server; production?: boolean }) { this.port = port; @@ -42,7 +42,7 @@ export class Server { const eluP = [1, 5, 15].map(() => performance.eventLoopUtilization()); const cpu = [1, 5, 15].map(() => process.cpuUsage()); let sec = 0; - setInterval(() => { + const monitoringLoop = setInterval(() => { sec += 1; // for some reason this behaves differently from cpuUsage, so we need an absolute reference as "previous" const eluC = performance.eventLoopUtilization(); @@ -150,6 +150,8 @@ export class Server { res.writeHead(200).end("Online"); }); + + ProcessLifecycle.eventEmitter.on("stopping", () => clearTimeout(monitoringLoop)); } this.server.on("upgrade", (request, socket, head) => { @@ -177,14 +179,16 @@ export class Server { this.server.listen(this.port); console.log(`[Gateway] online on 0.0.0.0:${this.port}`); } + + await ProcessLifecycle.Ready(); } async stop() { + await ProcessLifecycle.Shutdown(); + clearInterval(this.monitoringLoop); this.ws.clients.forEach((x) => x.close()); - this.ws.close(() => { - this.server.close(() => { - closeDatabase(); - }); - }); + this.ws.close(); + this.server.close(); + await ProcessLifecycle.Finalize(); } } diff --git a/src/gateway/events/Close.ts b/src/gateway/events/Close.ts index 594149057c..6a3c6cb324 100644 --- a/src/gateway/events/Close.ts +++ b/src/gateway/events/Close.ts @@ -19,6 +19,7 @@ import { WebSocket } from "@spacebar/gateway"; import { emitEvent, Member, PresenceUpdateEvent, Session, SessionsReplace, User, VoiceState, VoiceStateUpdateEvent, distributePresenceUpdate } from "@spacebar/util"; import { randomString } from "@spacebar/api"; +import { ProcessLifecycle } from "../../util/util/ProcessLifecycle"; export async function Close(this: WebSocket, code: number, reason: Buffer) { console.log("[WebSocket] closed", code, reason.toString()); @@ -32,36 +33,37 @@ export async function Close(this: WebSocket, code: number, reason: Buffer) { const authSessionId = this.session?.session_id; const closedAt = Date.now(); - setTimeout(async () => { - console.log("Handling presence update after disconnect"); - try { - if (authSessionId && this.user_id) { - const s = await Session.findOne({ - where: { user_id: this.user_id, session_id: authSessionId }, - }); - if (s && (s.last_seen?.getTime() ?? 0) <= closedAt) { - console.log("... updating session"); - await Session.update({ user_id: this.user_id, session_id: authSessionId }, { status: "offline", activities: [], client_status: {} }); - this.session = await Session.findOneOrFail({ where: { session_id: this.session_id } }); - console.log("... distributing PRESENCE_UPDATE"); - await distributePresenceUpdate(this.user_id, { - event: "PRESENCE_UPDATE", - data: { - user: (await User.findOneOrFail({ where: { id: this.user_id } })).toPublicUser(), - status: this.session!.getPublicStatus(), - client_status: this.session!.client_status, - activities: this.session!.activities, - }, - origin: "GATEWAY_CLOSE", - transaction_id: `IDENT_${this.user_id}_${randomString()}`, - } satisfies PresenceUpdateEvent); - console.log("... done!"); - } else console.log("... Discarding presence update as the session reactivated"); + if (!(ProcessLifecycle.state === "stopping" || ProcessLifecycle.state === "stopped")) + setTimeout(async () => { + console.log("Handling presence update after disconnect"); + try { + if (authSessionId && this.user_id) { + const s = await Session.findOne({ + where: { user_id: this.user_id, session_id: authSessionId }, + }); + if (s && (s.last_seen?.getTime() ?? 0) <= closedAt) { + console.log("... updating session"); + await Session.update({ user_id: this.user_id, session_id: authSessionId }, { status: "offline", activities: [], client_status: {} }); + this.session = await Session.findOneOrFail({ where: { session_id: this.session_id } }); + console.log("... distributing PRESENCE_UPDATE"); + await distributePresenceUpdate(this.user_id, { + event: "PRESENCE_UPDATE", + data: { + user: (await User.findOneOrFail({ where: { id: this.user_id } })).toPublicUser(), + status: this.session!.getPublicStatus(), + client_status: this.session!.client_status, + activities: this.session!.activities, + }, + origin: "GATEWAY_CLOSE", + transaction_id: `IDENT_${this.user_id}_${randomString()}`, + } satisfies PresenceUpdateEvent); + console.log("... done!"); + } else console.log("... Discarding presence update as the session reactivated"); + } + } catch (e) { + console.error("[WebSocket] Close session cleanup failed", code, e); } - } catch (e) { - console.error("[WebSocket] Close session cleanup failed", code, e); - } - }, 10_000); + }, 10_000); const voiceState = await VoiceState.findOne({ where: { user_id: this.user_id }, diff --git a/src/gateway/events/Connection.ts b/src/gateway/events/Connection.ts index 6f67c0c597..d795d23627 100644 --- a/src/gateway/events/Connection.ts +++ b/src/gateway/events/Connection.ts @@ -29,6 +29,7 @@ import { Deflate, Inflate } from "fast-zlib"; import { URL } from "node:url"; import { Config } from "@spacebar/util"; import { Decoder, Encoder } from "@toondepauw/node-zstd"; +import { ProcessLifecycle } from "../../util/util/ProcessLifecycle"; // TODO: check rate limit // TODO: specify rate limit in config @@ -43,16 +44,26 @@ export async function Connection(this: WS.Server, socket: WebSocket, request: In if (index !== -1) openConnections.splice(index, 1); }); - for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { - process.on(sig, async () => { - await Send(socket, { - op: OPCODES.Reconnect, - s: socket.sequence++, - d: Math.round(Math.random() * 5000), - }); - socket.close(1000); + const onShutdown = async () => { + await Send(socket, { + op: OPCODES.Reconnect, + s: socket.sequence++, + d: Math.round(Math.random() * 5000), }); - } + + const closeListeners = socket.listeners("close"); + for (const listener of closeListeners) { + socket.off("close", listener); + // noinspection JSVoidFunctionReturnValueUsed - awaiting results + const res = listener.call(socket, 1000, 0) as void | Promise; + if (res) await res; + } + + socket.close(1000); + }; + + if (ProcessLifecycle.state == "stopping" || ProcessLifecycle.state == "stopped") return await onShutdown(); + ProcessLifecycle.eventEmitter.on("stopping", onShutdown); const forwardedFor = Config.get().security.forwardedFor; const ipAddress = forwardedFor ? (request.headers[forwardedFor.toLowerCase()] as string) : request.socket.remoteAddress; diff --git a/src/util/util/Database.ts b/src/util/util/Database.ts index 004fd3f5e1..ecbb7110c1 100644 --- a/src/util/util/Database.ts +++ b/src/util/util/Database.ts @@ -23,6 +23,7 @@ import { DataSource } from "typeorm"; // noinspection ES6PreferShortImport import { ConfigEntity } from "../entities/Config"; import fs from "node:fs"; +import { ProcessLifecycle } from "./ProcessLifecycle"; // UUID extension option is only supported with postgres // We want to generate all id's with Snowflakes that's why we have our own BaseEntity class @@ -127,11 +128,13 @@ export async function initDatabase(): Promise { } } - console.log(`[Database] ${green("Connected")}`); + ProcessLifecycle.eventEmitter.on("stopped", async () => await closeDatabase()); + console.log(`[Database] ${green("Connected")}`); return dbConnection; } export async function closeDatabase() { - await dbConnection?.destroy(); + if (DataSourceOptions.isInitialized) await DataSourceOptions.destroy(); + if (dbConnection?.isInitialized) await dbConnection?.destroy(); } diff --git a/src/util/util/ProcessLifecycle.ts b/src/util/util/ProcessLifecycle.ts new file mode 100644 index 0000000000..6b4a6d7783 --- /dev/null +++ b/src/util/util/ProcessLifecycle.ts @@ -0,0 +1,62 @@ +/* + Spacebar: A FOSS re-implementation and extension of the Discord.com backend. + Copyright (C) 2026 Spacebar and Spacebar Contributors + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +import EventEmitter from "node:events"; +import whyIsNodeRunning from "why-is-node-running"; + +interface ProcessLifecycleEvents { + starting: unknown[]; + running: unknown[]; + stopping: unknown[]; + stopped: unknown[]; +} + +export class ProcessLifecycle { + static state: keyof ProcessLifecycleEvents = "starting"; + static eventEmitter: EventEmitter = new EventEmitter(); + + // to be ran after startup is finished + static async Ready() { + await this.emitAsync((this.state = "running")); + } + + // to be ran at the start of shutdown + static async Shutdown() { + await this.emitAsync((this.state = "stopping")); + } + + // to be ran at the end of shutdown (clean up sockets, ...) + static async Finalize() { + await this.emitAsync((this.state = "stopped")); + } + + // emit, except it awaits promises + private static async emitAsync(eventName: keyof ProcessLifecycleEvents) { + for (const evt of this.eventEmitter.listeners(eventName)) { + // noinspection JSVoidFunctionReturnValueUsed - we want to handle async functions blocking aswell + const res = evt() as void | Promise; + if (res) await res; + } + } +} + +process.on("SIGUSR1", () => { + console.log("Handling SIGUSR1:"); + whyIsNodeRunning(); + console.log("\nProcess state:", ProcessLifecycle.state); +}); diff --git a/src/util/util/ipc/listener/RabbitMqSingleListener.ts b/src/util/util/ipc/listener/RabbitMqSingleListener.ts index d120e70ac5..cd89cac6bc 100644 --- a/src/util/util/ipc/listener/RabbitMqSingleListener.ts +++ b/src/util/util/ipc/listener/RabbitMqSingleListener.ts @@ -17,10 +17,11 @@ */ import EventEmitter from "node:events"; +import { randomUUID } from "node:crypto"; import { BaseEventListener } from "./BaseEventListener"; import { EVENT, Event, EventOpts, sleep } from "@spacebar/util"; import amqp, { Channel, ChannelModel } from "amqplib"; -import { randomUUID } from "node:crypto"; +import { ProcessLifecycle } from "../../ProcessLifecycle"; export class RabbitMqSingleListener extends BaseEventListener { private readonly host: string; @@ -50,9 +51,7 @@ export class RabbitMqSingleListener extends BaseEventListener { } this.channel = await this.connection.createChannel(); - for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { - process.on(sig, () => this.close()); - } + ProcessLifecycle.eventEmitter.on("stopped", async () => await this.close()); this.connection.on("error", (err) => { console.error("[RabbitMQSingleListener] Connection error:", err); diff --git a/src/util/util/ipc/listener/UnixSocketListener.ts b/src/util/util/ipc/listener/UnixSocketListener.ts index 9465e9cadc..be1b455fec 100644 --- a/src/util/util/ipc/listener/UnixSocketListener.ts +++ b/src/util/util/ipc/listener/UnixSocketListener.ts @@ -21,6 +21,7 @@ import fs from "node:fs"; import net, { Server } from "node:net"; import { BaseEventListener } from "./BaseEventListener"; import { EVENT, Event, EventOpts } from "@spacebar/util"; +import { ProcessLifecycle } from "../../ProcessLifecycle"; export class UnixSocketListener extends BaseEventListener { eventEmitter: EventEmitter; @@ -78,9 +79,7 @@ export class UnixSocketListener extends BaseEventListener { console.log(`[UnixSocketListener] Listening on ${this.socketPath}`); }); - for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { - process.on(sig, () => this.close()); - } + ProcessLifecycle.eventEmitter.on("stopped", async () => await this.close()); this.isInitialized = true; } @@ -96,6 +95,7 @@ export class UnixSocketListener extends BaseEventListener { try { fs.unlinkSync(this.socketPath); } catch (e) { + if (e instanceof Error && "errno" in e && e.errno == -2) return; console.error("[UnixSocketListener] Failed to unlink socket file:", e); } } diff --git a/src/util/util/ipc/writer/RabbitMqSingleWriter.ts b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts index 72a0d08ce7..ad00aa2b51 100644 --- a/src/util/util/ipc/writer/RabbitMqSingleWriter.ts +++ b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts @@ -19,6 +19,7 @@ import { BaseEventWriter } from "./BaseEventWriter"; import amqp, { Channel, ChannelModel } from "amqplib"; import { Event, sleep } from "@spacebar/util"; +import { ProcessLifecycle } from "../../ProcessLifecycle"; export class RabbitMqSingleWriter extends BaseEventWriter { private readonly host: string; @@ -46,10 +47,7 @@ export class RabbitMqSingleWriter extends BaseEventWriter { } this.channel = await this.connection.createChannel(); - for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { - process.on(sig, () => this.close()); - } - + ProcessLifecycle.eventEmitter.on("stopped", async () => await this.close()); this.connection.on("error", (err) => { console.error("[RabbitMQSingleWriter] Connection error:", err); }); diff --git a/src/util/util/ipc/writer/UnixSocketWriter.ts b/src/util/util/ipc/writer/UnixSocketWriter.ts index e3b91de8fd..357034d795 100644 --- a/src/util/util/ipc/writer/UnixSocketWriter.ts +++ b/src/util/util/ipc/writer/UnixSocketWriter.ts @@ -22,6 +22,7 @@ import path from "node:path"; import { red } from "picocolors"; import { BaseEventWriter } from "./BaseEventWriter"; import { Event, Stopwatch } from "@spacebar/util"; +import { ProcessLifecycle } from "../../ProcessLifecycle"; export class UnixSocketWriter extends BaseEventWriter { socketPath: string; @@ -142,10 +143,7 @@ export class UnixSocketWriter extends BaseEventWriter { console.error("[UnixSocketWriter] Unix socket writer failed to read directory:", err); } - for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { - process.on(sig, () => this.close()); - } - + ProcessLifecycle.eventEmitter.on("stopped", async () => await this.close()); this.isInitializing = false; } diff --git a/src/webrtc/Server.ts b/src/webrtc/Server.ts index 0f8ead2ce6..8bc34024c1 100644 --- a/src/webrtc/Server.ts +++ b/src/webrtc/Server.ts @@ -15,14 +15,14 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ -import dotenv from "dotenv"; -dotenv.config({ quiet: true }); -import { closeDatabase, Config, initDatabase, initEvent, Session, TimeSpan } from "@spacebar/util"; + import http from "node:http"; import ws from "ws"; +import { green, yellow } from "picocolors"; +import { Config, initDatabase, initEvent } from "@spacebar/util"; import { Connection } from "./events/Connection"; import { loadWebRtcLibrary, mediaServer, WRTC_PORT_MAX, WRTC_PORT_MIN, WRTC_PUBLIC_IP } from "./util"; -import { green, yellow } from "picocolors"; +import { ProcessLifecycle } from "../util/util/ProcessLifecycle"; export class Server { public ws: ws.Server; @@ -76,11 +76,14 @@ export class Server { this.server.listen(this.port); console.log(`[WebRTC] ${green(`online on 0.0.0.0:${this.port}`)}`); } + + await ProcessLifecycle.Ready(); } async stop() { - await closeDatabase(); + await ProcessLifecycle.Shutdown(); this.server.close(); - mediaServer?.stop(); + await mediaServer?.stop(); + await ProcessLifecycle.Finalize(); } } From c76c9f69e3be9a638d6d8d06b278b98504ae8f91 Mon Sep 17 00:00:00 2001 From: Rory& Date: Thu, 28 May 2026 13:53:30 +0200 Subject: [PATCH 5/6] Move why-is-node-running to dependencies --- package-lock.json | 5 ++--- package.json | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/package-lock.json b/package-lock.json index 05ebdfacd4..2b96765dcc 100644 --- a/package-lock.json +++ b/package-lock.json @@ -50,6 +50,7 @@ "reflect-metadata": "^0.2.2", "tslib": "^2.8.1", "typeorm": "^0.3.30", + "why-is-node-running": "^3.2.2", "wretch": "^3.0.8", "ws": "^8.21.0" }, @@ -86,8 +87,7 @@ "pretty-quick": "^4.2.2", "ts-node": "^10.9.2", "typescript": "^6.0.3", - "typescript-json-schema": "^0.67.4", - "why-is-node-running": "^3.2.2" + "typescript-json-schema": "^0.67.4" }, "optionalDependencies": { "@sendgrid/mail": "^8.1.6", @@ -7392,7 +7392,6 @@ "version": "3.2.2", "resolved": "https://registry.npmjs.org/why-is-node-running/-/why-is-node-running-3.2.2.tgz", "integrity": "sha512-NKUzAelcoCXhXL4dJzKIwXeR8iEVqsA0Lq6Vnd0UXvgaKbzVo4ZTHROF2Jidrv+SgxOQ03fMinnNhzZATxOD3A==", - "dev": true, "license": "MIT", "bin": { "why-is-node-running": "cli.js" diff --git a/package.json b/package.json index 82d159c04f..dc3fe06a81 100644 --- a/package.json +++ b/package.json @@ -75,8 +75,7 @@ "pretty-quick": "^4.2.2", "ts-node": "^10.9.2", "typescript": "^6.0.3", - "typescript-json-schema": "^0.67.4", - "why-is-node-running": "^3.2.2" + "typescript-json-schema": "^0.67.4" }, "dependencies": { "@spacebarchat/pion-webrtc": "^0.0.4", @@ -119,6 +118,7 @@ "reflect-metadata": "^0.2.2", "tslib": "^2.8.1", "typeorm": "^0.3.30", + "why-is-node-running": "^3.2.2", "wretch": "^3.0.8", "ws": "^8.21.0" }, From 232b53bc2dcacae8df432cdd20f277cc8102cc4c Mon Sep 17 00:00:00 2001 From: Rory& Date: Thu, 28 May 2026 16:18:30 +0200 Subject: [PATCH 6/6] CodeOwners: claim /nix dir --- .github/CODEOWNERS | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index b55d76cd50..b956be7ebd 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,3 +1,4 @@ # Nix stuff is owned by Rory& - we& want to be notified if these are changed. /flake.nix root@rory.gay -/nix-update.sh root@rory.gay \ No newline at end of file +/nix-update.sh root@rory.gay +/nix root@rory.gay \ No newline at end of file