From b5886398c00b229948247c308d9db8e2d1e23625 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=86=A0=E8=BE=B0?= Date: Sat, 20 Jun 2026 14:36:21 +0800 Subject: [PATCH] fix(gateway): isolate Hermes memory by user MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 李冠辰 --- .../memory/memory_tencentdb/__init__.py | 2 + .../memory/memory_tencentdb/client.py | 24 ++- src/gateway/server.test.ts | 145 ++++++++++++++++++ src/gateway/server.ts | 125 +++++++++++---- src/gateway/types.ts | 4 + 5 files changed, 269 insertions(+), 31 deletions(-) create mode 100644 src/gateway/server.test.ts diff --git a/hermes-plugin/memory/memory_tencentdb/__init__.py b/hermes-plugin/memory/memory_tencentdb/__init__.py index 86350fad..e1775ec7 100644 --- a/hermes-plugin/memory/memory_tencentdb/__init__.py +++ b/hermes-plugin/memory/memory_tencentdb/__init__.py @@ -1020,6 +1020,7 @@ def handle_tool_call(self, tool_name: str, args: Dict[str, Any], **kwargs) -> st query=query, limit=_coerce_limit(args.get("limit")), type_filter=args.get("type", ""), + user_id=self._user_id, ) self._record_success() return json.dumps(result) @@ -1031,6 +1032,7 @@ def handle_tool_call(self, tool_name: str, args: Dict[str, Any], **kwargs) -> st result = self._client.search_conversations( query=query, limit=_coerce_limit(args.get("limit")), + user_id=self._user_id, ) self._record_success() return json.dumps(result) diff --git a/hermes-plugin/memory/memory_tencentdb/client.py b/hermes-plugin/memory/memory_tencentdb/client.py index 9e9338cd..dbe68d64 100644 --- a/hermes-plugin/memory/memory_tencentdb/client.py +++ b/hermes-plugin/memory/memory_tencentdb/client.py @@ -139,18 +139,35 @@ def capture( body["user_id"] = user_id return self._post("/capture", body) - def search_memories(self, query: str, limit: int = 5, type_filter: str = "", scene: str = "") -> Dict[str, Any]: + def search_memories( + self, + query: str, + limit: int = 5, + type_filter: str = "", + scene: str = "", + user_id: str = "", + ) -> Dict[str, Any]: """Search L1 structured memories.""" body: Dict[str, Any] = {"query": query, "limit": limit} + if user_id: + body["user_id"] = user_id if type_filter: body["type"] = type_filter if scene: body["scene"] = scene return self._post("/search/memories", body) - def search_conversations(self, query: str, limit: int = 5, session_key: str = "") -> Dict[str, Any]: + def search_conversations( + self, + query: str, + limit: int = 5, + session_key: str = "", + user_id: str = "", + ) -> Dict[str, Any]: """Search L0 raw conversations.""" body: Dict[str, Any] = {"query": query, "limit": limit} + if user_id: + body["user_id"] = user_id if session_key: body["session_key"] = session_key return self._post("/search/conversations", body) @@ -166,6 +183,7 @@ def seed( self, data: Any, session_key: str = "", + user_id: str = "", strict_round_role: bool = False, auto_fill_timestamps: bool = True, config_override: Optional[Dict[str, Any]] = None, @@ -187,6 +205,8 @@ def seed( body: Dict[str, Any] = {"data": data} if session_key: body["session_key"] = session_key + if user_id: + body["user_id"] = user_id if strict_round_role: body["strict_round_role"] = True if not auto_fill_timestamps: diff --git a/src/gateway/server.test.ts b/src/gateway/server.test.ts new file mode 100644 index 00000000..971454b1 --- /dev/null +++ b/src/gateway/server.test.ts @@ -0,0 +1,145 @@ +import net from "node:net"; +import os from "node:os"; +import path from "node:path"; +import { mkdtemp, readdir, readFile, rm, writeFile } from "node:fs/promises"; +import { afterEach, describe, expect, it } from "vitest"; +import { parseConfig } from "../config.js"; +import { TdaiGateway } from "./server.js"; + +async function pickFreePort(): Promise { + return new Promise((resolve, reject) => { + const server = net.createServer(); + server.once("error", reject); + server.listen(0, "127.0.0.1", () => { + const address = server.address(); + if (!address || typeof address === "string") { + server.close(() => reject(new Error("failed to allocate port"))); + return; + } + const port = address.port; + server.close(() => resolve(port)); + }); + }); +} + +async function postJson(port: number, pathName: string, body: unknown): Promise { + const response = await fetch(`http://127.0.0.1:${port}${pathName}`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(body), + }); + const text = await response.text(); + if (!response.ok) { + throw new Error(`HTTP ${response.status}: ${text}`); + } + return JSON.parse(text) as T; +} + +async function listJsonlFiles(root: string): Promise { + try { + const entries = await readdir(root, { recursive: true, withFileTypes: true }); + return entries + .filter((entry) => entry.isFile() && entry.name.endsWith(".jsonl")) + .map((entry) => path.join(entry.parentPath, entry.name)) + .sort(); + } catch { + return []; + } +} + +describe("TdaiGateway user scoping", () => { + let gateway: TdaiGateway | undefined; + let dataDir: string | undefined; + + afterEach(async () => { + if (gateway) { + await gateway.stop(); + gateway = undefined; + } + if (dataDir) { + await rm(dataDir, { recursive: true, force: true }); + dataDir = undefined; + } + }); + + it("does not expose legacy persona data to a non-default user_id", async () => { + dataDir = await mkdtemp(path.join(os.tmpdir(), "tdai-gateway-user-scope-")); + await writeFile( + path.join(dataDir, "persona.md"), + "Alice legacy private persona marker", + "utf-8", + ); + const port = await pickFreePort(); + const memory = parseConfig({ + extraction: { enabled: false }, + embedding: { provider: "none" }, + recall: { strategy: "keyword" }, + }); + + gateway = new TdaiGateway({ + server: { host: "127.0.0.1", port, corsOrigins: [] }, + data: { baseDir: dataDir }, + memory, + }); + await gateway.start(); + + const bobRecall = await postJson<{ context: string; memory_count: number }>(port, "/recall", { + user_id: "bob", + session_key: "shared-session", + query: "private persona marker", + }); + expect(bobRecall.context).toBe(""); + expect(bobRecall.memory_count).toBe(0); + + const defaultRecall = await postJson<{ context: string; memory_count: number }>(port, "/recall", { + session_key: "shared-session", + query: "private persona marker", + }); + expect(defaultRecall.context).toContain("Alice legacy private persona marker"); + + const defaultAliasRecall = await postJson<{ context: string; memory_count: number }>(port, "/recall", { + user_id: "default", + session_key: "shared-session", + query: "private persona marker", + }); + expect(defaultAliasRecall.context).toContain("Alice legacy private persona marker"); + }); + + it("captures non-default user_id data outside the legacy base directory", async () => { + dataDir = await mkdtemp(path.join(os.tmpdir(), "tdai-gateway-user-scope-")); + const port = await pickFreePort(); + const memory = parseConfig({ + extraction: { enabled: false }, + embedding: { provider: "none" }, + recall: { strategy: "keyword" }, + }); + + gateway = new TdaiGateway({ + server: { host: "127.0.0.1", port, corsOrigins: [] }, + data: { baseDir: dataDir }, + memory, + }); + await gateway.start(); + + const now = Date.now() + 10_000; + const capture = await postJson<{ l0_recorded: number }>(port, "/capture", { + user_id: "alice", + session_key: "shared-session", + user_content: "alice private sentinel project alpha", + assistant_content: "acknowledged alice private sentinel project alpha", + messages: [ + { role: "user", content: "alice private sentinel project alpha", timestamp: now }, + { role: "assistant", content: "acknowledged alice private sentinel project alpha", timestamp: now + 1 }, + ], + }); + expect(capture.l0_recorded).toBeGreaterThan(0); + + const legacyFiles = await listJsonlFiles(path.join(dataDir, "conversations")); + expect(legacyFiles).toEqual([]); + + const scopedFiles = await listJsonlFiles(path.join(dataDir, "users")); + expect(scopedFiles.length).toBeGreaterThan(0); + const scopedText = (await Promise.all(scopedFiles.map((file) => readFile(file, "utf-8")))).join("\n"); + expect(scopedText).toContain("alice private sentinel project alpha"); + }); +}); diff --git a/src/gateway/server.ts b/src/gateway/server.ts index b4c66ec6..b1770704 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -15,8 +15,9 @@ */ import http from "node:http"; +import path from "node:path"; import { URL } from "node:url"; -import { timingSafeEqual } from "node:crypto"; +import { createHash, timingSafeEqual } from "node:crypto"; import { TdaiCore } from "../core/tdai-core.js"; import { StandaloneHostAdapter } from "../adapters/standalone/host-adapter.js"; import { loadGatewayConfig } from "./config.js"; @@ -46,6 +47,12 @@ import type { SeedProgress } from "../core/seed/types.js"; const TAG = "[tdai-gateway]"; const VERSION = "0.1.0"; +const DEFAULT_USER_SCOPE = "__default__"; + +interface ScopedCoreEntry { + core: TdaiCore; + ready: Promise; +} // ============================ // Console logger (for standalone gateway — no OpenClaw logger available) @@ -107,6 +114,22 @@ function safeEqual(a: string, b: string): boolean { return timingSafeEqual(ab, bb); } +function isDefaultUserId(userId: string | undefined): boolean { + const normalized = userId?.trim().toLowerCase(); + return !normalized || normalized === "default" || normalized === "default_user"; +} + +function userScopeKey(userId: string | undefined): string { + if (isDefaultUserId(userId)) return DEFAULT_USER_SCOPE; + return createHash("sha256").update(userId!.trim()).digest("hex").slice(0, 32); +} + +function scopedDataDir(baseDir: string, userId: string | undefined): string { + const scope = userScopeKey(userId); + if (scope === DEFAULT_USER_SCOPE) return baseDir; + return path.join(baseDir, "users", `u_${scope}`); +} + // ============================ // Gateway Server // ============================ @@ -114,28 +137,13 @@ function safeEqual(a: string, b: string): boolean { export class TdaiGateway { private config: GatewayConfig; private logger: Logger; - private core: TdaiCore; + private cores = new Map(); private server: http.Server | null = null; private startTime = Date.now(); constructor(configOverrides?: Partial) { this.config = loadGatewayConfig(configOverrides); this.logger = createConsoleLogger(); - - // Create host adapter - const adapter = new StandaloneHostAdapter({ - dataDir: this.config.data.baseDir, - llmConfig: this.config.llm, - logger: this.logger, - platform: "gateway", - }); - - // Create core - this.core = new TdaiCore({ - hostAdapter: adapter, - config: this.config.memory, - sessionFilter: new SessionFilter(this.config.memory.capture.excludeAgents), - }); } /** @@ -145,8 +153,9 @@ export class TdaiGateway { // Initialize data directories initDataDirectories(this.config.data.baseDir); - // Initialize core - await this.core.initialize(); + // Initialize the legacy/default user scope eagerly so /health reflects + // startup readiness while non-default users are created lazily per request. + await this.getCoreForUser(undefined); // Create HTTP server this.server = http.createServer((req, res) => this.handleRequest(req, res)); @@ -224,10 +233,62 @@ export class TdaiGateway { }); } - await this.core.destroy(); + const entries = [...this.cores.values()]; + this.cores.clear(); + for (const entry of entries) { + await entry.ready.catch(() => {}); + await entry.core.destroy(); + } this.logger.info("Gateway stopped"); } + private createCore(dataDir: string, userId: string | undefined): TdaiCore { + const adapter = new StandaloneHostAdapter({ + dataDir, + llmConfig: this.config.llm, + logger: this.logger, + defaultUserId: isDefaultUserId(userId) ? "default_user" : userId!.trim(), + platform: "gateway", + }); + + return new TdaiCore({ + hostAdapter: adapter, + config: this.config.memory, + sessionFilter: new SessionFilter(this.config.memory.capture.excludeAgents), + }); + } + + private getOrCreateCoreEntry(userId: string | undefined): ScopedCoreEntry { + const userScope = userScopeKey(userId); + const existing = this.cores.get(userScope); + if (existing) return existing; + + const dataDir = scopedDataDir(this.config.data.baseDir, userId); + const core = this.createCore(dataDir, userId); + const entry: ScopedCoreEntry = { + core, + ready: (async () => { + initDataDirectories(dataDir); + await core.initialize(); + })(), + }; + entry.ready.catch(() => { + this.cores.delete(userScope); + }); + this.cores.set(userScope, entry); + return entry; + } + + private async getCoreForUser(userId: string | undefined): Promise { + const entry = this.getOrCreateCoreEntry(userId); + await entry.ready; + return entry.core; + } + + private getDataDirForUser(userId: string | undefined): string { + return scopedDataDir(this.config.data.baseDir, userId); + } + // ============================ // Request router // ============================ @@ -356,13 +417,14 @@ export class TdaiGateway { // ============================ private handleHealth(res: http.ServerResponse): void { + const defaultCore = this.cores.get(DEFAULT_USER_SCOPE)?.core; const response: HealthResponse = { - status: this.core.getVectorStore() ? "ok" : "degraded", + status: defaultCore?.getVectorStore() ? "ok" : "degraded", version: VERSION, uptime: Math.floor((Date.now() - this.startTime) / 1000), stores: { - vectorStore: !!this.core.getVectorStore(), - embeddingService: !!this.core.getEmbeddingService(), + vectorStore: !!defaultCore?.getVectorStore(), + embeddingService: !!defaultCore?.getEmbeddingService(), }, }; sendJson(res, 200, response); @@ -377,7 +439,8 @@ export class TdaiGateway { } const startMs = Date.now(); - const result = await this.core.handleBeforeRecall(body.query, body.session_key); + const core = await this.getCoreForUser(body.user_id); + const result = await core.handleBeforeRecall(body.query, body.session_key); const elapsed = Date.now() - startMs; this.logger.info(`Recall completed in ${elapsed}ms: context=${(result.appendSystemContext?.length ?? 0)} chars`); @@ -399,7 +462,8 @@ export class TdaiGateway { } const startMs = Date.now(); - const result = await this.core.handleTurnCommitted({ + const core = await this.getCoreForUser(body.user_id); + const result = await core.handleTurnCommitted({ userText: body.user_content, assistantText: body.assistant_content, messages: body.messages ?? [ @@ -428,7 +492,8 @@ export class TdaiGateway { return; } - const result = await this.core.searchMemories({ + const core = await this.getCoreForUser(body.user_id); + const result = await core.searchMemories({ query: body.query, limit: body.limit, type: body.type, @@ -451,7 +516,8 @@ export class TdaiGateway { return; } - const result = await this.core.searchConversations({ + const core = await this.getCoreForUser(body.user_id); + const result = await core.searchConversations({ query: body.query, limit: body.limit, sessionKey: body.session_key, @@ -472,7 +538,8 @@ export class TdaiGateway { return; } - await this.core.handleSessionEnd(body.session_key); + const core = await this.getCoreForUser(body.user_id); + await core.handleSessionEnd(body.session_key); const response: SessionEndResponse = { flushed: true }; sendJson(res, 200, response); @@ -516,7 +583,7 @@ export class TdaiGateway { const ts = `${now.getFullYear()}${pad(now.getMonth() + 1)}${pad(now.getDate())}-` + `${pad(now.getHours())}${pad(now.getMinutes())}${pad(now.getSeconds())}`; - const outputDir = `${this.config.data.baseDir}/seed-${ts}`; + const outputDir = `${this.getDataDirForUser(body.user_id)}/seed-${ts}`; // Merge config overrides if provided // Start with the base memory config + inject llm config from gateway settings diff --git a/src/gateway/types.ts b/src/gateway/types.ts index 50b2ff4c..aae638f3 100644 --- a/src/gateway/types.ts +++ b/src/gateway/types.ts @@ -65,6 +65,7 @@ export interface CaptureResponse { export interface MemorySearchRequest { query: string; + user_id?: string; limit?: number; type?: string; scene?: string; @@ -82,6 +83,7 @@ export interface MemorySearchResponse { export interface ConversationSearchRequest { query: string; + user_id?: string; limit?: number; session_key?: string; } @@ -125,6 +127,8 @@ export interface SeedRequest { data: unknown; /** Fallback session key when input sessions lack one. */ session_key?: string; + /** User scope for imported memories. */ + user_id?: string; /** Require each round to have both user and assistant messages. */ strict_round_role?: boolean; /** Auto-fill missing timestamps (default: true). */