diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index b55d76cd50..b956be7ebd 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -1,3 +1,4 @@
# Nix stuff is owned by Rory& - we& want to be notified if these are changed.
/flake.nix root@rory.gay
-/nix-update.sh root@rory.gay
\ No newline at end of file
+/nix-update.sh root@rory.gay
+/nix root@rory.gay
\ No newline at end of file
diff --git a/package-lock.json b/package-lock.json
index 0cd867fdbe..2b96765dcc 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -50,6 +50,7 @@
"reflect-metadata": "^0.2.2",
"tslib": "^2.8.1",
"typeorm": "^0.3.30",
+ "why-is-node-running": "^3.2.2",
"wretch": "^3.0.8",
"ws": "^8.21.0"
},
@@ -7387,6 +7388,18 @@
"url": "https://github.com/sponsors/ljharb"
}
},
+ "node_modules/why-is-node-running": {
+ "version": "3.2.2",
+ "resolved": "https://registry.npmjs.org/why-is-node-running/-/why-is-node-running-3.2.2.tgz",
+ "integrity": "sha512-NKUzAelcoCXhXL4dJzKIwXeR8iEVqsA0Lq6Vnd0UXvgaKbzVo4ZTHROF2Jidrv+SgxOQ03fMinnNhzZATxOD3A==",
+ "license": "MIT",
+ "bin": {
+ "why-is-node-running": "cli.js"
+ },
+ "engines": {
+ "node": ">=20.11"
+ }
+ },
"node_modules/word-wrap": {
"version": "1.2.5",
"resolved": "https://registry.npmjs.org/word-wrap/-/word-wrap-1.2.5.tgz",
diff --git a/package.json b/package.json
index 35108b7160..dc3fe06a81 100644
--- a/package.json
+++ b/package.json
@@ -118,6 +118,7 @@
"reflect-metadata": "^0.2.2",
"tslib": "^2.8.1",
"typeorm": "^0.3.30",
+ "why-is-node-running": "^3.2.2",
"wretch": "^3.0.8",
"ws": "^8.21.0"
},
diff --git a/scripts/stress/identify.js b/scripts/stress/identify.js
index 08d8d5db82..7aaca529c8 100644
--- a/scripts/stress/identify.js
+++ b/scripts/stress/identify.js
@@ -3,9 +3,10 @@
require("dotenv").config({ quiet: true });
const { OPCODES } = require("../../dist/gateway/util/Constants.js");
const WebSocket = require("ws");
-const ENDPOINT = `ws://localhost:3001?v=9&encoding=json`;
const TOKEN = process.env.TOKEN;
const TOTAL_ITERATIONS = process.env.ITER ? parseInt(process.env.ITER) : 500;
+const PORT = process.env.PORT ?? 3002;
+const ENDPOINT = `ws://localhost:${PORT}?v=9&encoding=json`;
const doTimedIdentify = () =>
new Promise((resolve) => {
@@ -44,7 +45,8 @@ const doTimedIdentify = () =>
while (perfs.length < TOTAL_ITERATIONS) {
const ret = await doTimedIdentify();
perfs.push(ret);
- // console.log(`${perfs.length}/${TOTAL_ITERATIONS} - this: ${Math.floor(ret)}ms`)
+ const avg = perfs.reduce((prev, curr) => prev + curr) / (perfs.length - 1);
+ console.log(`${perfs.length}/${TOTAL_ITERATIONS} - this: ${Math.floor(ret)}ms - avg: ${Math.floor(avg * 100) / 100}ms`);
}
const avg = perfs.reduce((prev, curr) => prev + curr) / (perfs.length - 1);
diff --git a/scripts/stress/identifyConcurrent.js b/scripts/stress/identifyConcurrent.js
new file mode 100644
index 0000000000..41d5d63f9e
--- /dev/null
+++ b/scripts/stress/identifyConcurrent.js
@@ -0,0 +1,86 @@
+/* eslint-env node */
+
+require("dotenv").config({ quiet: true });
+const { OPCODES } = require("../../dist/gateway/util/Constants.js");
+const WebSocket = require("ws");
+const { sleep } = require("../../dist/util/util/extensions/index.js");
+const TOKEN = process.env.TOKEN;
+const TOTAL_ITERATIONS = process.env.ITER ? parseInt(process.env.ITER) : 500;
+const PORT = process.env.PORT ?? 3002;
+const ENDPOINT = `ws://localhost:${PORT}?v=9&encoding=json`;
+const KEEPALIVE = !!process.env.KEEPALIVE;
+const SLEEP_EVERY = process.env.SLEEP_EVERY ? parseInt(process.env.SLEEP_EVERY) : 100;
+
+const doTimedIdentify = () =>
+ new Promise((resolve, reject) => {
+ let start;
+ const ws = new WebSocket(ENDPOINT);
+ ws.setMaxListeners(TOTAL_ITERATIONS);
+ ws.on("message", (data) => {
+ const parsed = JSON.parse(data);
+ let heartbeat;
+
+ switch (parsed.op) {
+ case OPCODES.Hello:
+ // send identify
+ start = performance.now();
+ ws.send(
+ JSON.stringify({
+ op: OPCODES.Identify,
+ d: {
+ token: TOKEN,
+ properties: {},
+ },
+ }),
+ );
+
+ if (KEEPALIVE)
+ heartbeat = setInterval(() => {
+ ws.send(
+ JSON.stringify({
+ op: OPCODES.Heartbeat,
+ d: null,
+ }),
+ );
+ process.stdout.write(".");
+ }, parsed.d.heartbeat_interval);
+ break;
+ case OPCODES.Dispatch:
+ if (parsed.t == "READY") {
+ if (!KEEPALIVE) ws.close();
+ else process.stdout.write("R");
+ return resolve(performance.now() - start);
+ }
+
+ break;
+ }
+ ws.on("error", reject);
+ if (KEEPALIVE)
+ ws.on("close", () => {
+ process.stdout.write("C");
+ clearTimeout(heartbeat);
+ });
+ });
+ });
+
+(async () => {
+ const promises = [];
+ for (let i = 0; i < TOTAL_ITERATIONS; i++) {
+ promises.push(doTimedIdentify());
+ process.stdout.write("+");
+ // await sleep(Math.random() * 250);
+ if (promises.length % SLEEP_EVERY === 0) await Promise.all(promises);
+ }
+
+ const perfs = [];
+ console.log("Identifies started");
+ for (const prom of promises) {
+ const ret = await prom;
+ perfs.push(ret);
+ const avg = perfs.reduce((prev, curr) => prev + curr) / (perfs.length - 1);
+ console.log(`${perfs.length}/${promises.length}: Identified in ${Math.floor(ret)}ms - avg: ${Math.floor(avg * 100) / 100}ms`);
+ }
+
+ const avg = perfs.reduce((prev, curr) => prev + curr) / (perfs.length - 1);
+ console.log(`Average identify time: ${Math.floor(avg * 100) / 100}ms`);
+})().catch((e) => console.error("Fail:", e));
diff --git a/src/api/Server.ts b/src/api/Server.ts
index 1d9fe2ab22..a1eb1716f3 100644
--- a/src/api/Server.ts
+++ b/src/api/Server.ts
@@ -16,15 +16,16 @@
along with this program. If not, see .
*/
-import { Config, ConnectionConfig, ConnectionLoader, Email, JSONReplacer, WebAuthn, initDatabase, initEvent, registerRoutes, getDatabase, getRevInfoOrFail } from "@spacebar/util";
-import { Authentication, CORS, ImageProxy, BodyParser, ErrorHandler, initRateLimits, initTranslation } from "./middlewares";
+import path from "node:path";
import { Request, Response, Router } from "express";
-import { Server, ServerOptions } from "lambert-server";
import morgan from "morgan";
-import path from "node:path";
+import { Server, ServerOptions } from "lambert-server";
import { red } from "picocolors";
+import { Config, ConnectionConfig, ConnectionLoader, Email, JSONReplacer, WebAuthn, initDatabase, initEvent, registerRoutes, getDatabase, getRevInfoOrFail } from "@spacebar/util";
+import { Authentication, CORS, ImageProxy, BodyParser, ErrorHandler, initRateLimits, initTranslation } from "./middlewares";
import { initInstance } from "./util/handlers/Instance";
import { route } from "./util";
+import { ProcessLifecycle } from "../util/util/ProcessLifecycle";
const ASSETS_FOLDER = path.join(__dirname, "..", "..", "assets");
const PUBLIC_ASSETS_FOLDER = path.join(ASSETS_FOLDER, "public");
@@ -196,6 +197,7 @@ export class SpacebarServer extends Server {
if (logRequests) console.log(red(`Warning: Request logging is enabled! This will spam your console!\nTo disable this, unset the 'LOG_REQUESTS' environment variable!`));
+ await ProcessLifecycle.Ready();
return super.start();
}
}
diff --git a/src/api/routes/stop.ts b/src/api/routes/stop.ts
index 4f86973508..e04b46ba6c 100644
--- a/src/api/routes/stop.ts
+++ b/src/api/routes/stop.ts
@@ -16,8 +16,9 @@
along with this program. If not, see .
*/
-import { route } from "@spacebar/api";
+import { route, SpacebarServer } from "@spacebar/api";
import { Request, Response, Router } from "express";
+import { ProcessLifecycle } from "../../util/util/ProcessLifecycle";
const router: Router = Router({ mergeParams: true });
@@ -35,7 +36,9 @@ router.post(
(req: Request, res: Response) => {
console.log(`/stop was called by ${req.user_id} at ${new Date()}`);
res.sendStatus(200);
- process.kill(process.pid, "SIGTERM");
+ ProcessLifecycle.Shutdown().catch((e) => {
+ console.error("Failed to shut down:", e);
+ });
},
);
diff --git a/src/bundle/Server.ts b/src/bundle/Server.ts
index 91c3dcf423..f743497a12 100644
--- a/src/bundle/Server.ts
+++ b/src/bundle/Server.ts
@@ -16,21 +16,18 @@
along with this program. If not, see .
*/
-import morgan from "morgan";
-
-process.on("unhandledRejection", console.error);
-process.on("uncaughtException", console.error);
-
import http from "node:http";
+import fs from "node:fs";
+import cluster from "node:cluster";
+import morgan from "morgan";
+import express from "express";
+import { green, bold } from "picocolors";
import * as Api from "@spacebar/api";
import * as Gateway from "@spacebar/gateway";
import * as Webrtc from "@spacebar/webrtc";
import { CDNServer } from "@spacebar/cdn";
-import express from "express";
-import { green, bold } from "picocolors";
import { Config, initDatabase } from "@spacebar/util";
-import fs from "node:fs";
-import cluster from "node:cluster";
+import { ProcessLifecycle } from "../util/util/ProcessLifecycle";
const app = express();
const server = http.createServer();
@@ -48,8 +45,7 @@ const webrtc = new Webrtc.Server({
production,
});
-process.on("SIGTERM", async () => {
- console.log("Shutting down due to SIGTERM");
+ProcessLifecycle.eventEmitter.on("stopping", async () => {
await gateway.stop();
await cdn.stop();
await api.stop();
diff --git a/src/cdn/Server.ts b/src/cdn/Server.ts
index 8260407ec3..5d87a86ce2 100644
--- a/src/cdn/Server.ts
+++ b/src/cdn/Server.ts
@@ -16,13 +16,14 @@
along with this program. If not, see .
*/
+import path from "node:path";
+import morgan from "morgan";
import { Server, ServerOptions } from "lambert-server";
import { Attachment, Config, initDatabase, registerRoutes } from "@spacebar/util";
import { CORS, BodyParser } from "@spacebar/api";
-import path from "node:path";
import guildProfilesRoute from "./routes/guild-profiles";
-import morgan from "morgan";
import { storage } from "./util";
+import { ProcessLifecycle } from "../util/util/ProcessLifecycle";
export type CDNServerOptions = ServerOptions;
@@ -71,6 +72,7 @@ export class CDNServer extends Server {
this.app.use("/guilds/:guild_id/users/:user_id/banners", guildProfilesRoute);
if (process.env.LOG_ROUTES !== "false") console.log("[Server] Route /guilds/:guild_id/users/:user_id/banners registered");
+ await ProcessLifecycle.Ready();
return super.start();
}
@@ -89,6 +91,8 @@ export class CDNServer extends Server {
}
async stop() {
+ await ProcessLifecycle.Shutdown();
+ await ProcessLifecycle.Finalize();
return super.stop();
}
}
diff --git a/src/gateway/Server.ts b/src/gateway/Server.ts
index 632fbc5295..5ac94a8857 100644
--- a/src/gateway/Server.ts
+++ b/src/gateway/Server.ts
@@ -16,21 +16,21 @@
along with this program. If not, see .
*/
-import dotenv from "dotenv";
-dotenv.config({ quiet: true });
-import { checkToken, closeDatabase, Config, initDatabase, initEvent, Rights } from "@spacebar/util";
-import ws from "ws";
-import { Connection, openConnections } from "./events/Connection";
import http from "node:http";
-import { cleanupOnStartup } from "./util";
-import { randomString } from "@spacebar/api";
import { setInterval } from "node:timers";
+import ws from "ws";
+import { checkToken, Config, initDatabase, initEvent, Rights } from "@spacebar/util";
+import { randomString } from "@spacebar/api"; // TODO: move to util
+import { Connection, openConnections } from "./events/Connection";
+import { cleanupOnStartup, OPCODES, Send } from "./util";
+import { ProcessLifecycle } from "../util/util/ProcessLifecycle";
export class Server {
public ws: ws.Server;
public port: number;
public server: http.Server;
public production: boolean;
+ private monitoringLoop: NodeJS.Timeout;
constructor({ port, server, production }: { port: number; server?: http.Server; production?: boolean }) {
this.port = port;
@@ -42,7 +42,7 @@ export class Server {
const eluP = [1, 5, 15].map(() => performance.eventLoopUtilization());
const cpu = [1, 5, 15].map(() => process.cpuUsage());
let sec = 0;
- setInterval(() => {
+ const monitoringLoop = setInterval(() => {
sec += 1;
// for some reason this behaves differently from cpuUsage, so we need an absolute reference as "previous"
const eluC = performance.eventLoopUtilization();
@@ -150,6 +150,8 @@ export class Server {
res.writeHead(200).end("Online");
});
+
+ ProcessLifecycle.eventEmitter.on("stopping", () => clearTimeout(monitoringLoop));
}
this.server.on("upgrade", (request, socket, head) => {
@@ -177,14 +179,16 @@ export class Server {
this.server.listen(this.port);
console.log(`[Gateway] online on 0.0.0.0:${this.port}`);
}
+
+ await ProcessLifecycle.Ready();
}
async stop() {
+ await ProcessLifecycle.Shutdown();
+ clearInterval(this.monitoringLoop);
this.ws.clients.forEach((x) => x.close());
- this.ws.close(() => {
- this.server.close(() => {
- closeDatabase();
- });
- });
+ this.ws.close();
+ this.server.close();
+ await ProcessLifecycle.Finalize();
}
}
diff --git a/src/gateway/events/Close.ts b/src/gateway/events/Close.ts
index 594149057c..6a3c6cb324 100644
--- a/src/gateway/events/Close.ts
+++ b/src/gateway/events/Close.ts
@@ -19,6 +19,7 @@
import { WebSocket } from "@spacebar/gateway";
import { emitEvent, Member, PresenceUpdateEvent, Session, SessionsReplace, User, VoiceState, VoiceStateUpdateEvent, distributePresenceUpdate } from "@spacebar/util";
import { randomString } from "@spacebar/api";
+import { ProcessLifecycle } from "../../util/util/ProcessLifecycle";
export async function Close(this: WebSocket, code: number, reason: Buffer) {
console.log("[WebSocket] closed", code, reason.toString());
@@ -32,36 +33,37 @@ export async function Close(this: WebSocket, code: number, reason: Buffer) {
const authSessionId = this.session?.session_id;
const closedAt = Date.now();
- setTimeout(async () => {
- console.log("Handling presence update after disconnect");
- try {
- if (authSessionId && this.user_id) {
- const s = await Session.findOne({
- where: { user_id: this.user_id, session_id: authSessionId },
- });
- if (s && (s.last_seen?.getTime() ?? 0) <= closedAt) {
- console.log("... updating session");
- await Session.update({ user_id: this.user_id, session_id: authSessionId }, { status: "offline", activities: [], client_status: {} });
- this.session = await Session.findOneOrFail({ where: { session_id: this.session_id } });
- console.log("... distributing PRESENCE_UPDATE");
- await distributePresenceUpdate(this.user_id, {
- event: "PRESENCE_UPDATE",
- data: {
- user: (await User.findOneOrFail({ where: { id: this.user_id } })).toPublicUser(),
- status: this.session!.getPublicStatus(),
- client_status: this.session!.client_status,
- activities: this.session!.activities,
- },
- origin: "GATEWAY_CLOSE",
- transaction_id: `IDENT_${this.user_id}_${randomString()}`,
- } satisfies PresenceUpdateEvent);
- console.log("... done!");
- } else console.log("... Discarding presence update as the session reactivated");
+ if (!(ProcessLifecycle.state === "stopping" || ProcessLifecycle.state === "stopped"))
+ setTimeout(async () => {
+ console.log("Handling presence update after disconnect");
+ try {
+ if (authSessionId && this.user_id) {
+ const s = await Session.findOne({
+ where: { user_id: this.user_id, session_id: authSessionId },
+ });
+ if (s && (s.last_seen?.getTime() ?? 0) <= closedAt) {
+ console.log("... updating session");
+ await Session.update({ user_id: this.user_id, session_id: authSessionId }, { status: "offline", activities: [], client_status: {} });
+ this.session = await Session.findOneOrFail({ where: { session_id: this.session_id } });
+ console.log("... distributing PRESENCE_UPDATE");
+ await distributePresenceUpdate(this.user_id, {
+ event: "PRESENCE_UPDATE",
+ data: {
+ user: (await User.findOneOrFail({ where: { id: this.user_id } })).toPublicUser(),
+ status: this.session!.getPublicStatus(),
+ client_status: this.session!.client_status,
+ activities: this.session!.activities,
+ },
+ origin: "GATEWAY_CLOSE",
+ transaction_id: `IDENT_${this.user_id}_${randomString()}`,
+ } satisfies PresenceUpdateEvent);
+ console.log("... done!");
+ } else console.log("... Discarding presence update as the session reactivated");
+ }
+ } catch (e) {
+ console.error("[WebSocket] Close session cleanup failed", code, e);
}
- } catch (e) {
- console.error("[WebSocket] Close session cleanup failed", code, e);
- }
- }, 10_000);
+ }, 10_000);
const voiceState = await VoiceState.findOne({
where: { user_id: this.user_id },
diff --git a/src/gateway/events/Connection.ts b/src/gateway/events/Connection.ts
index 5876c3e15e..d795d23627 100644
--- a/src/gateway/events/Connection.ts
+++ b/src/gateway/events/Connection.ts
@@ -29,6 +29,7 @@ import { Deflate, Inflate } from "fast-zlib";
import { URL } from "node:url";
import { Config } from "@spacebar/util";
import { Decoder, Encoder } from "@toondepauw/node-zstd";
+import { ProcessLifecycle } from "../../util/util/ProcessLifecycle";
// TODO: check rate limit
// TODO: specify rate limit in config
@@ -43,6 +44,27 @@ export async function Connection(this: WS.Server, socket: WebSocket, request: In
if (index !== -1) openConnections.splice(index, 1);
});
+ const onShutdown = async () => {
+ await Send(socket, {
+ op: OPCODES.Reconnect,
+ s: socket.sequence++,
+ d: Math.round(Math.random() * 5000),
+ });
+
+ const closeListeners = socket.listeners("close");
+ for (const listener of closeListeners) {
+ socket.off("close", listener);
+ // noinspection JSVoidFunctionReturnValueUsed - awaiting results
+ const res = listener.call(socket, 1000, 0) as void | Promise;
+ if (res) await res;
+ }
+
+ socket.close(1000);
+ };
+
+ if (ProcessLifecycle.state == "stopping" || ProcessLifecycle.state == "stopped") return await onShutdown();
+ ProcessLifecycle.eventEmitter.on("stopping", onShutdown);
+
const forwardedFor = Config.get().security.forwardedFor;
const ipAddress = forwardedFor ? (request.headers[forwardedFor.toLowerCase()] as string) : request.socket.remoteAddress;
diff --git a/src/util/util/Database.ts b/src/util/util/Database.ts
index 004fd3f5e1..ecbb7110c1 100644
--- a/src/util/util/Database.ts
+++ b/src/util/util/Database.ts
@@ -23,6 +23,7 @@ import { DataSource } from "typeorm";
// noinspection ES6PreferShortImport
import { ConfigEntity } from "../entities/Config";
import fs from "node:fs";
+import { ProcessLifecycle } from "./ProcessLifecycle";
// UUID extension option is only supported with postgres
// We want to generate all id's with Snowflakes that's why we have our own BaseEntity class
@@ -127,11 +128,13 @@ export async function initDatabase(): Promise {
}
}
- console.log(`[Database] ${green("Connected")}`);
+ ProcessLifecycle.eventEmitter.on("stopped", async () => await closeDatabase());
+ console.log(`[Database] ${green("Connected")}`);
return dbConnection;
}
export async function closeDatabase() {
- await dbConnection?.destroy();
+ if (DataSourceOptions.isInitialized) await DataSourceOptions.destroy();
+ if (dbConnection?.isInitialized) await dbConnection?.destroy();
}
diff --git a/src/util/util/ProcessLifecycle.ts b/src/util/util/ProcessLifecycle.ts
new file mode 100644
index 0000000000..6b4a6d7783
--- /dev/null
+++ b/src/util/util/ProcessLifecycle.ts
@@ -0,0 +1,62 @@
+/*
+ Spacebar: A FOSS re-implementation and extension of the Discord.com backend.
+ Copyright (C) 2026 Spacebar and Spacebar Contributors
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see .
+*/
+
+import EventEmitter from "node:events";
+import whyIsNodeRunning from "why-is-node-running";
+
+interface ProcessLifecycleEvents {
+ starting: unknown[];
+ running: unknown[];
+ stopping: unknown[];
+ stopped: unknown[];
+}
+
+export class ProcessLifecycle {
+ static state: keyof ProcessLifecycleEvents = "starting";
+ static eventEmitter: EventEmitter = new EventEmitter();
+
+ // to be ran after startup is finished
+ static async Ready() {
+ await this.emitAsync((this.state = "running"));
+ }
+
+ // to be ran at the start of shutdown
+ static async Shutdown() {
+ await this.emitAsync((this.state = "stopping"));
+ }
+
+ // to be ran at the end of shutdown (clean up sockets, ...)
+ static async Finalize() {
+ await this.emitAsync((this.state = "stopped"));
+ }
+
+ // emit, except it awaits promises
+ private static async emitAsync(eventName: keyof ProcessLifecycleEvents) {
+ for (const evt of this.eventEmitter.listeners(eventName)) {
+ // noinspection JSVoidFunctionReturnValueUsed - we want to handle async functions blocking aswell
+ const res = evt() as void | Promise;
+ if (res) await res;
+ }
+ }
+}
+
+process.on("SIGUSR1", () => {
+ console.log("Handling SIGUSR1:");
+ whyIsNodeRunning();
+ console.log("\nProcess state:", ProcessLifecycle.state);
+});
diff --git a/src/util/util/ipc/listener/RabbitMqSingleListener.ts b/src/util/util/ipc/listener/RabbitMqSingleListener.ts
index b5fd5efd50..cd89cac6bc 100644
--- a/src/util/util/ipc/listener/RabbitMqSingleListener.ts
+++ b/src/util/util/ipc/listener/RabbitMqSingleListener.ts
@@ -17,10 +17,11 @@
*/
import EventEmitter from "node:events";
+import { randomUUID } from "node:crypto";
import { BaseEventListener } from "./BaseEventListener";
import { EVENT, Event, EventOpts, sleep } from "@spacebar/util";
import amqp, { Channel, ChannelModel } from "amqplib";
-import { randomUUID } from "node:crypto";
+import { ProcessLifecycle } from "../../ProcessLifecycle";
export class RabbitMqSingleListener extends BaseEventListener {
private readonly host: string;
@@ -50,9 +51,7 @@ export class RabbitMqSingleListener extends BaseEventListener {
}
this.channel = await this.connection.createChannel();
- for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) {
- process.on(sig, this.close);
- }
+ ProcessLifecycle.eventEmitter.on("stopped", async () => await this.close());
this.connection.on("error", (err) => {
console.error("[RabbitMQSingleListener] Connection error:", err);
diff --git a/src/util/util/ipc/listener/UnixSocketListener.ts b/src/util/util/ipc/listener/UnixSocketListener.ts
index 8ead85bda5..be1b455fec 100644
--- a/src/util/util/ipc/listener/UnixSocketListener.ts
+++ b/src/util/util/ipc/listener/UnixSocketListener.ts
@@ -21,11 +21,13 @@ import fs from "node:fs";
import net, { Server } from "node:net";
import { BaseEventListener } from "./BaseEventListener";
import { EVENT, Event, EventOpts } from "@spacebar/util";
+import { ProcessLifecycle } from "../../ProcessLifecycle";
export class UnixSocketListener extends BaseEventListener {
eventEmitter: EventEmitter;
socketPath: string;
server: Server;
+ isInitialized = false;
constructor(socketPath: string) {
super();
@@ -47,7 +49,7 @@ export class UnixSocketListener extends BaseEventListener {
this.server = net.createServer((socket) => {
socket.on("connect", () => {
- console.log("[UnixSocketListener] Unix socket client connected");
+ console.log("[UnixSocketListener] Unix socket client connected, now at", this.server.connections, "connections...");
});
let buffer = Buffer.alloc(0);
socket.on("data", (data: Buffer) => {
@@ -74,15 +76,18 @@ export class UnixSocketListener extends BaseEventListener {
});
this.server.listen(this.socketPath, () => {
- console.log(`Unix socket server listening on ${this.socketPath}`);
+ console.log(`[UnixSocketListener] Listening on ${this.socketPath}`);
});
- for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) {
- process.on(sig, this.close);
- }
+ ProcessLifecycle.eventEmitter.on("stopped", async () => await this.close());
+ this.isInitialized = true;
}
async close(): Promise {
+ if (!this.isInitialized) {
+ console.log("[UnixSocketListener] close() called before init! - Path:", this.socketPath, " - server:", this.server, " - this:", this);
+ }
+
console.log("[UnixSocketListener] Closing unix socket server");
this.server.close();
@@ -90,10 +95,9 @@ export class UnixSocketListener extends BaseEventListener {
try {
fs.unlinkSync(this.socketPath);
} catch (e) {
+ if (e instanceof Error && "errno" in e && e.errno == -2) return;
console.error("[UnixSocketListener] Failed to unlink socket file:", e);
}
-
- process.exit(0);
}
async listen(event: string, callback: (event: EventOpts) => unknown): Promise<() => Promise> {
diff --git a/src/util/util/ipc/writer/RabbitMqSingleWriter.ts b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts
index 28781c6dbd..ad00aa2b51 100644
--- a/src/util/util/ipc/writer/RabbitMqSingleWriter.ts
+++ b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts
@@ -19,6 +19,7 @@
import { BaseEventWriter } from "./BaseEventWriter";
import amqp, { Channel, ChannelModel } from "amqplib";
import { Event, sleep } from "@spacebar/util";
+import { ProcessLifecycle } from "../../ProcessLifecycle";
export class RabbitMqSingleWriter extends BaseEventWriter {
private readonly host: string;
@@ -46,10 +47,7 @@ export class RabbitMqSingleWriter extends BaseEventWriter {
}
this.channel = await this.connection.createChannel();
- for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) {
- process.on(sig, this.close);
- }
-
+ ProcessLifecycle.eventEmitter.on("stopped", async () => await this.close());
this.connection.on("error", (err) => {
console.error("[RabbitMQSingleWriter] Connection error:", err);
});
diff --git a/src/util/util/ipc/writer/UnixSocketWriter.ts b/src/util/util/ipc/writer/UnixSocketWriter.ts
index 53b9564ce2..357034d795 100644
--- a/src/util/util/ipc/writer/UnixSocketWriter.ts
+++ b/src/util/util/ipc/writer/UnixSocketWriter.ts
@@ -22,6 +22,7 @@ import path from "node:path";
import { red } from "picocolors";
import { BaseEventWriter } from "./BaseEventWriter";
import { Event, Stopwatch } from "@spacebar/util";
+import { ProcessLifecycle } from "../../ProcessLifecycle";
export class UnixSocketWriter extends BaseEventWriter {
socketPath: string;
@@ -142,6 +143,7 @@ export class UnixSocketWriter extends BaseEventWriter {
console.error("[UnixSocketWriter] Unix socket writer failed to read directory:", err);
}
+ ProcessLifecycle.eventEmitter.on("stopped", async () => await this.close());
this.isInitializing = false;
}
diff --git a/src/webrtc/Server.ts b/src/webrtc/Server.ts
index 0f8ead2ce6..8bc34024c1 100644
--- a/src/webrtc/Server.ts
+++ b/src/webrtc/Server.ts
@@ -15,14 +15,14 @@
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see .
*/
-import dotenv from "dotenv";
-dotenv.config({ quiet: true });
-import { closeDatabase, Config, initDatabase, initEvent, Session, TimeSpan } from "@spacebar/util";
+
import http from "node:http";
import ws from "ws";
+import { green, yellow } from "picocolors";
+import { Config, initDatabase, initEvent } from "@spacebar/util";
import { Connection } from "./events/Connection";
import { loadWebRtcLibrary, mediaServer, WRTC_PORT_MAX, WRTC_PORT_MIN, WRTC_PUBLIC_IP } from "./util";
-import { green, yellow } from "picocolors";
+import { ProcessLifecycle } from "../util/util/ProcessLifecycle";
export class Server {
public ws: ws.Server;
@@ -76,11 +76,14 @@ export class Server {
this.server.listen(this.port);
console.log(`[WebRTC] ${green(`online on 0.0.0.0:${this.port}`)}`);
}
+
+ await ProcessLifecycle.Ready();
}
async stop() {
- await closeDatabase();
+ await ProcessLifecycle.Shutdown();
this.server.close();
- mediaServer?.stop();
+ await mediaServer?.stop();
+ await ProcessLifecycle.Finalize();
}
}