Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/valkey-cache-backend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@browserbasehq/stagehand": minor
---

Add Valkey as an optional cache backend via iovalkey. Configure with `valkeyHost` (and optional `valkeyPort`, `valkeyTls`, `valkeyPassword`, `valkeyUsername`, `cacheTtl`, `valkeyKeyPrefix`, `valkeyRequestTimeout`, `valkeyMaxCacheValueBytes`) to store act/agent cache entries in Valkey instead of the local filesystem. Gracefully falls back to disabled caching if the connection fails.
7 changes: 6 additions & 1 deletion packages/core/lib/v3/cache/ActCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ export class ActCache {
value: entry,
error,
path,
} = await this.storage.readJson<CachedActEntry>(`${context.cacheKey}.json`);
} = await this.storage.readJson<CachedActEntry>(
`${context.cacheKey}.json`,
"act",
);
if (error && path) {
this.logger({
category: "cache",
Expand Down Expand Up @@ -161,6 +164,7 @@ export class ActCache {
const { error, path } = await this.storage.writeJson(
`${context.cacheKey}.json`,
entry,
"act",
);
if (error && path) {
this.logger({
Expand Down Expand Up @@ -334,6 +338,7 @@ export class ActCache {
...entry,
variableKeys: context.variableKeys,
},
"act",
);

if (error && path) {
Expand Down
9 changes: 9 additions & 0 deletions packages/core/lib/v3/cache/AgentCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ export class AgentCache {
path,
} = await this.storage.readJson<CachedAgentEntry>(
`agent-${context.cacheKey}.json`,
// NOTE: The `category` parameter provides act/agent namespace isolation only for
// the Valkey backend (via key prefix). File and in-memory backends ignore it and
// store entries at `agent-${cacheKey}.json` — collisions don't occur in practice
// because act and agent hash different payloads, producing distinct SHA-256 keys,
// but the `agent-` prefix provides an additional layer of separation on disk.
"agent",
);
if (error && path) {
this.logger({
Expand Down Expand Up @@ -365,6 +371,7 @@ export class AgentCache {
const { error, path } = await this.storage.writeJson(
`agent-${context.cacheKey}.json`,
entry,
"agent",
);
if (error && path) {
this.logger({
Expand Down Expand Up @@ -415,6 +422,7 @@ export class AgentCache {
const { error, path } = await this.storage.writeJson(
`agent-${payload.cacheKey}.json`,
entry,
"agent",
);
if (error && path) {
this.logger({
Expand Down Expand Up @@ -880,6 +888,7 @@ export class AgentCache {
const { error, path } = await this.storage.writeJson(
`agent-${context.cacheKey}.json`,
updatedEntry,
"agent",
);
if (error && path) {
this.logger({
Expand Down
283 changes: 279 additions & 4 deletions packages/core/lib/v3/cache/CacheStorage.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import fs from "fs";
import path from "path";
import type { Logger } from "../types/public/index.js";
import { ReadJsonResult, WriteJsonResult } from "../types/private/index.js";
import {
CacheCategory,
ReadJsonResult,
WriteJsonResult,
} from "../types/private/index.js";

const jsonClone = <T>(value: T): T => {
const serialized = JSON.stringify(value);
Expand All @@ -11,11 +15,76 @@ const jsonClone = <T>(value: T): T => {
return JSON.parse(serialized) as T;
};

/**
* Configuration for the Valkey cache backend.
*/
export interface ValkeyCacheOptions {
/** Valkey host address. */
host: string;
/** Valkey port (default: 6379). */
port?: number;
/** Enable TLS for the connection. */
useTls?: boolean;
/** Authentication password (IAM token or static auth token). */
password?: string;
/** Authentication username (for ACL-enabled instances). */
username?: string;
/** Default TTL in seconds for cache entries. Omit for no expiry. */
cacheTtl?: number;
/** Key prefix namespace (default: "stagehand"). */
keyPrefix?: string;
/** Request timeout in ms (default: 5000). */
requestTimeout?: number;
/** Max allowed cache value size in bytes (default: 5MB). Writes exceeding this are skipped. */
maxCacheValueBytes?: number;
}

/**
* Options shape for ValkeyClientLike.set(), matching GLIDE's expiry API.
*/
interface ValkeySetOptions {
expiry?: { type: "EX" | "PX" | "EXAT" | "PXAT"; count: number };
}

/**
* Minimal interface matching the subset of Valkey client methods used by
* CacheStorage. This avoids a hard compile-time dependency on iovalkey
* for users who don't need the Valkey backend.
*/
interface ValkeyClientLike {
get(key: string): Promise<string | null>;
set(
key: string,
value: string,
options?: ValkeySetOptions,
): Promise<string | null>;
del(keys: string[]): Promise<number>;
close(): Promise<void>;
}

/**
* Minimal type shape for the dynamically imported iovalkey module.
*/
interface IovalkeyModule {
default: new (options: Record<string, unknown>) => IovalkeyClient;
}

interface IovalkeyClient {
connect(): Promise<void>;
get(key: string): Promise<string | null>;
set(key: string, value: string, ...args: unknown[]): Promise<string | null>;
del(...keys: string[]): Promise<number>;
quit(): Promise<string>;
disconnect(): void;
}

export class CacheStorage {
private constructor(
private readonly logger: Logger,
private readonly dir?: string,
private readonly memoryStore?: Map<string, unknown>,
private readonly valkeyClient?: ValkeyClientLike,
private readonly valkeyOptions?: ValkeyCacheOptions,
) {}

static create(
Expand Down Expand Up @@ -49,20 +118,184 @@ export class CacheStorage {
return new CacheStorage(logger, undefined, new Map());
}

/**
* Create a CacheStorage backed by Valkey via iovalkey.
* Requires `iovalkey` to be installed as an optional dependency.
* Returns a disabled CacheStorage if the connection fails.
*/
static async createValkey(
options: ValkeyCacheOptions,
logger: Logger,
): Promise<CacheStorage> {
try {
const mod = (await import(
/* webpackIgnore: true */ /* @vite-ignore */ "iovalkey"
)) as unknown as IovalkeyModule;
const Valkey = mod.default;

if (options.username && !options.password) {
throw new Error(
"Valkey cache: username was provided without a password. " +
"Supply both username and password, or omit both.",
);
}

// Default TLS on when credentials are present to avoid plaintext transit.
const useTLS = options.useTls ?? !!options.password;
const port = options.port ?? 6379;

const iovalkeyOpts: Record<string, unknown> = {
host: options.host,
port,
...(options.password ? { password: options.password } : {}),
...(options.username ? { username: options.username } : {}),
...(useTLS ? { tls: {} } : {}),
commandTimeout: options.requestTimeout ?? 5000,
maxRetriesPerRequest: 3,
retryStrategy: (times: number): number | null =>
times > 5 ? null : Math.min(times * 500, 5000),
connectionName: "stagehand-cache",
lazyConnect: true,
};

const rawClient = new Valkey(iovalkeyOpts);
await rawClient.connect();

// Adapt iovalkey's API to ValkeyClientLike
const client: ValkeyClientLike = {
get: (key) => rawClient.get(key),
set: (key, value, setOpts?) => {
if (setOpts?.expiry) {
return rawClient.set(
key,
value,
setOpts.expiry.type,
setOpts.expiry.count,
);
}
return rawClient.set(key, value);
},
del: (keys) =>
keys.length > 0 ? rawClient.del(...keys) : Promise.resolve(0),
close: (): Promise<void> => rawClient.quit().then((): void => {}),
};

logger({
category: "cache",
message: `valkey cache connected to ${options.host}:${port}`,
level: 1,
});

return new CacheStorage(logger, undefined, undefined, client, options);
} catch (err) {
const safeMessage = err instanceof Error ? err.message : "unknown error";
logger({
category: "cache",
message: `unable to initialize valkey cache: ${safeMessage}`,
level: 1,
auxiliary: {
error: { value: safeMessage, type: "string" },
},
});
return new CacheStorage(logger);
}
}

get directory(): string | undefined {
return this.dir;
}

get enabled(): boolean {
return !!this.dir || !!this.memoryStore;
return !!this.dir || !!this.memoryStore || !!this.valkeyClient;
}

/** True if this storage is backed by a Valkey client. */
get isValkey(): boolean {
return !!this.valkeyClient;
}

/**
* Close the underlying Valkey client connection, if any.
* Safe to call multiple times or when no Valkey client is attached.
*/
async close(): Promise<void> {
if (this.valkeyClient) {
try {
await this.valkeyClient.close();
} catch (err) {
this.logger({
category: "cache",
message: `valkey close error (best-effort): ${err instanceof Error ? err.message : "unknown"}`,
level: 2,
});
}
}
}

private resolvePath(fileName: string): string | null {
if (!this.dir) return null;
return path.join(this.dir, fileName);
}

async readJson<T>(fileName: string): Promise<ReadJsonResult<T>> {
/**
* Derive the Valkey key from a cache fileName and explicit category.
* Strips any redundant category prefix from the fileName (e.g. "agent-")
* since the category is already encoded in the key namespace.
*/
private toValkeyKey(fileName: string, category: CacheCategory): string {
const prefix = this.valkeyOptions?.keyPrefix ?? "stagehand";
const base = fileName.replace(/\.json$/, "").replace(/^agent-/, "");
return `${prefix}:${category}:${base}`;
}

async readJson<T>(
fileName: string,
category: CacheCategory = "act",
): Promise<ReadJsonResult<T>> {
if (this.valkeyClient) {
const key = this.toValkeyKey(fileName, category);
try {
const raw = await this.valkeyClient.get(key);
if (raw === null) {
return { value: null };
}
try {
return { value: JSON.parse(raw) as T };
} catch (parseErr) {
// Corrupt data — delete the poisoned key so subsequent reads don't
// keep failing until TTL expiry.
this.logger({
category: "cache",
message: `valkey key ${key} contains corrupt JSON; deleting`,
level: 1,
auxiliary: {
error: { value: String(parseErr), type: "string" },
},
});
try {
await this.valkeyClient.del([key]);
} catch (delErr) {
this.logger({
category: "cache",
message: `valkey del error for corrupt key ${key} (best-effort): ${delErr instanceof Error ? delErr.message : "unknown"}`,
level: 2,
});
}
return { value: null, error: parseErr, path: key };
}
} catch (err) {
this.logger({
category: "cache",
message: `valkey read error for key ${key}`,
level: 1,
auxiliary: {
error: { value: String(err), type: "string" },
},
});
return { value: null, error: err, path: key };
}
}

if (this.memoryStore) {
if (!this.memoryStore.has(fileName)) {
return { value: null };
Expand All @@ -88,7 +321,49 @@ export class CacheStorage {
}
}

async writeJson(fileName: string, data: unknown): Promise<WriteJsonResult> {
async writeJson(
fileName: string,
data: unknown,
category: CacheCategory = "act",
): Promise<WriteJsonResult> {
if (this.valkeyClient) {
const key = this.toValkeyKey(fileName, category);
try {
const serialized = JSON.stringify(data);
const maxBytes = this.valkeyOptions?.maxCacheValueBytes ?? 5_242_880;
if (Buffer.byteLength(serialized, "utf8") > maxBytes) {
this.logger({
category: "cache",
message: `valkey write skipped: payload exceeds ${maxBytes} byte limit`,
level: 1,
});
return {
error: new Error("cache value exceeds size limit"),
path: key,
};
}
const ttl = this.valkeyOptions?.cacheTtl;
if (ttl !== undefined && ttl > 0) {
await this.valkeyClient.set(key, serialized, {
expiry: { type: "EX", count: ttl },
});
} else {
await this.valkeyClient.set(key, serialized);
}
return {};
} catch (err) {
this.logger({
category: "cache",
message: `valkey write error for key ${key}`,
level: 1,
auxiliary: {
error: { value: String(err), type: "string" },
},
});
return { error: err, path: key };
}
}

if (this.memoryStore) {
this.memoryStore.set(fileName, jsonClone(data));
return {};
Expand Down
Loading
Loading