Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 134 additions & 8 deletions packages/keiko-server/src/gitDelivery/agentOperationsRoutes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@ import { join } from "node:path";
import { Readable } from "node:stream";
import type { IncomingMessage, ServerResponse } from "node:http";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { GitRepositoryAgentOperationRequest } from "@oscharko-dev/keiko-contracts";
import type {
GitRepositoryAgentOperationRequest,
GitRepositoryAgentOperationResponse,
} from "@oscharko-dev/keiko-contracts";
import { buildRedactor, createRunRegistry, type UiHandlerDeps } from "../index.js";
import type { GitProcessRunner } from "../gitRoutes.js";
import { matchRoute, type RouteContext } from "../routes.js";
import { createInMemoryUiStore, type UiStore } from "../store/index.js";
import {
handleGitAgentOperation,
handleGitAgentOperationWithDelegate,
IdempotencyCache,
} from "./agentOperationsRoutes.js";

let store: UiStore;
Expand Down Expand Up @@ -114,17 +118,12 @@ describe("POST /api/git/agent/operations", () => {
});

it("rejects extra top-level fields and credential-shaped strings", async () => {
expect(
await handleGitAgentOperation(ctx(request({ extra: true })), deps()),
).toMatchObject({
expect(await handleGitAgentOperation(ctx(request({ extra: true })), deps())).toMatchObject({
status: 400,
body: { status: "denied", denialReason: "bad-request" },
});
expect(
await handleGitAgentOperation(
ctx(request({ payload: { remote: "api_keyleak" } })),
deps(),
),
await handleGitAgentOperation(ctx(request({ payload: { remote: "api_keyleak" } })), deps()),
).toMatchObject({
status: 400,
body: { error: { code: "GIT_AGENT_OPERATION_FORBIDDEN_PAYLOAD" } },
Expand Down Expand Up @@ -231,3 +230,130 @@ describe("POST /api/git/agent/operations", () => {
});
});
});

type CacheEntry = Parameters<IdempotencyCache["set"]>[1];

describe("IdempotencyCache eviction", () => {
const response = { schemaVersion: "1" } as unknown as GitRepositoryAgentOperationResponse;
const settledEntry = (fingerprint: string): CacheEntry => ({ fingerprint, result: response });
const pendingEntry = (fingerprint: string): CacheEntry => ({
fingerprint,
pending: Promise.resolve(response),
});

it("evicts the least-recently-used settled entry once over the size cap", () => {
const cache = new IdempotencyCache({ maxEntries: 2, ttlMs: 1_000_000, now: (): number => 0 });
cache.set("a", settledEntry("fa"));
cache.set("b", settledEntry("fb"));
// Touch "a" so "b" becomes the least-recently-used entry.
expect(cache.get("a")?.fingerprint).toBe("fa");

cache.set("c", settledEntry("fc"));

expect(cache.size).toBe(2);
expect(cache.get("b")).toBeUndefined();
expect(cache.get("a")?.fingerprint).toBe("fa");
expect(cache.get("c")?.fingerprint).toBe("fc");
});

it("expires a settled entry once its TTL has elapsed", () => {
let clockMs = 0;
const cache = new IdempotencyCache({ maxEntries: 16, ttlMs: 1000, now: (): number => clockMs });
cache.set("k", settledEntry("fk"));

clockMs = 999;
expect(cache.get("k")?.fingerprint).toBe("fk");

clockMs = 1000;
expect(cache.get("k")).toBeUndefined();
expect(cache.size).toBe(0);
});

it("prunes expired settled entries when a new key is inserted", () => {
let clockMs = 0;
const cache = new IdempotencyCache({ maxEntries: 16, ttlMs: 1000, now: (): number => clockMs });
cache.set("old", settledEntry("fold"));

clockMs = 5000; // well past the TTL
cache.set("new", settledEntry("fnew")); // insertion triggers a prune sweep

expect(cache.size).toBe(1);
expect(cache.get("new")?.fingerprint).toBe("fnew");
});

it("never evicts an in-flight reservation to make room for a settled entry", () => {
const cache = new IdempotencyCache({ maxEntries: 1, ttlMs: 1_000_000, now: (): number => 0 });
cache.set("pending", pendingEntry("fp"));
cache.set("settled", settledEntry("fs"));

expect(cache.size).toBe(1);
expect(cache.get("pending")?.fingerprint).toBe("fp");
expect(cache.get("settled")).toBeUndefined();
});

it("retains concurrent in-flight reservations even beyond the cap", () => {
const cache = new IdempotencyCache({ maxEntries: 1, ttlMs: 1_000_000, now: (): number => 0 });
cache.set("p1", pendingEntry("f1"));
cache.set("p2", pendingEntry("f2"));

expect(cache.size).toBe(2);
expect(cache.get("p1")?.fingerprint).toBe("f1");
expect(cache.get("p2")?.fingerprint).toBe("f2");
});
});

describe("agent idempotency cache lifecycle through the handler", () => {
const execute = (idempotencyKey: string): GitRepositoryAgentOperationRequest =>
request({
operation: "branch-switch",
mode: "execute",
idempotencyKey,
payload: { branchName: "main" },
}) as unknown as GitRepositoryAgentOperationRequest;

it("replays a completed operation for a repeated key without re-delegating", async () => {
const cache = new IdempotencyCache({ maxEntries: 8, ttlMs: 1_000_000 });
const delegated = vi.fn(() => Promise.resolve({ status: 200, body: { ok: true } }));
const body = execute("replay-1");

const first = await handleGitAgentOperationWithDelegate(body, "fp", delegated, cache);
const second = await handleGitAgentOperationWithDelegate(body, "fp", delegated, cache);

expect(delegated).toHaveBeenCalledTimes(1);
expect(first.body).toMatchObject({ status: "delegated" });
expect(first.body).not.toMatchObject({ replay: true });
expect(second.body).toMatchObject({ status: "delegated", replay: true });
});

it("bounds the cache when many distinct keys are delegated", async () => {
const cache = new IdempotencyCache({ maxEntries: 3, ttlMs: 1_000_000 });
const delegated = vi.fn(() => Promise.resolve({ status: 200, body: { ok: true } }));

for (let i = 0; i < 10; i += 1) {
await handleGitAgentOperationWithDelegate(
execute(`bound-${String(i)}`),
`fp-${String(i)}`,
delegated,
cache,
);
}

expect(delegated).toHaveBeenCalledTimes(10);
expect(cache.size).toBe(3);
});

it("re-delegates once the replay entry has expired from the cache", async () => {
let clockMs = 0;
const cache = new IdempotencyCache({ maxEntries: 8, ttlMs: 1000, now: (): number => clockMs });
const delegated = vi.fn(() => Promise.resolve({ status: 200, body: { ok: true } }));
const body = execute("ttl-1");

await handleGitAgentOperationWithDelegate(body, "fp", delegated, cache);
clockMs = 5000; // past the replay window
const after = await handleGitAgentOperationWithDelegate(body, "fp", delegated, cache);

expect(delegated).toHaveBeenCalledTimes(2);
expect(after.body).toMatchObject({ status: "delegated" });
expect(after.body).not.toMatchObject({ replay: true });
});
});
120 changes: 105 additions & 15 deletions packages/keiko-server/src/gitDelivery/agentOperationsRoutes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,97 @@ interface IdempotencyEntry {
readonly result?: GitRepositoryAgentOperationResponse;
}

const idempotencyCache = new Map<string, IdempotencyEntry>();
// Defaults for the process-memory idempotency cache. The cap bounds worst-case memory against a client
// that streams many distinct idempotency keys; the TTL lets settled replay entries self-evict so the
// map self-cleans even for keys that are never queried again.
export const DEFAULT_IDEMPOTENCY_MAX_ENTRIES = 1024;
export const DEFAULT_IDEMPOTENCY_TTL_MS = 10 * 60 * 1000;

export interface IdempotencyCacheOptions {
readonly maxEntries?: number;
readonly ttlMs?: number;
readonly now?: () => number;
}

interface StoredEntry {
readonly entry: IdempotencyEntry;
// Wall-clock expiry, enforced only once the entry holds a settled result. A pending reservation is a
// short-lived guard for an in-flight delegation; it is never TTL-pruned or LRU-evicted so a duplicate
// request can never re-trigger an operation that is still running (idempotency is preserved exactly).
readonly expiresAt: number;
}

// Bounded LRU + TTL store for the agent-facade idempotency replay window. Exposes the Map subset the
// handler relies on (get / set / delete) plus `size` for tests. Overflow eviction targets the
// least-recently-used *settled* entry only; in-flight reservations are exempt.
export class IdempotencyCache {
private readonly entries = new Map<string, StoredEntry>();
private readonly maxEntries: number;
private readonly ttlMs: number;
private readonly now: () => number;

public constructor(options: IdempotencyCacheOptions = {}) {
this.maxEntries = Math.max(
1,
Math.floor(options.maxEntries ?? DEFAULT_IDEMPOTENCY_MAX_ENTRIES),
);
this.ttlMs = Math.max(1, Math.floor(options.ttlMs ?? DEFAULT_IDEMPOTENCY_TTL_MS));
this.now = options.now ?? Date.now;
}

public get size(): number {
return this.entries.size;
}

public get(key: string): IdempotencyEntry | undefined {
const stored = this.entries.get(key);
if (stored === undefined) return undefined;
if (stored.entry.result !== undefined && this.now() >= stored.expiresAt) {
this.entries.delete(key);
return undefined;
}
// Refresh LRU recency on a hit by reinserting at the tail; the TTL window is unchanged.
this.entries.delete(key);
this.entries.set(key, stored);
return stored.entry;
}

public set(key: string, entry: IdempotencyEntry): void {
this.entries.delete(key);
this.pruneExpired();
this.entries.set(key, { entry, expiresAt: this.now() + this.ttlMs });
this.evictOverflow();
}

public delete(key: string): void {
this.entries.delete(key);
}

private pruneExpired(): void {
const now = this.now();
for (const [key, stored] of this.entries) {
if (stored.entry.result !== undefined && now >= stored.expiresAt) {
this.entries.delete(key);
}
}
}

private evictOverflow(): void {
while (this.entries.size > this.maxEntries) {
let victim: string | undefined;
for (const [key, stored] of this.entries) {
if (stored.entry.pending === undefined) {
victim = key;
break;
}
}
if (victim === undefined) break; // every entry is an in-flight reservation — cannot safely evict
this.entries.delete(victim);
}
}
}

const idempotencyCache = new IdempotencyCache();

function denied(
request: Partial<GitRepositoryAgentOperationRequest>,
Expand All @@ -88,7 +178,9 @@ function denied(
};
}

async function readParsed(req: IncomingMessage): Promise<
async function readParsed(
req: IncomingMessage,
): Promise<
| { readonly ok: true; readonly value: unknown }
| { readonly ok: false; readonly result: RouteResult }
> {
Expand Down Expand Up @@ -341,16 +433,16 @@ function normalizedForDigest(value: unknown): unknown {
}

function fingerprintRequest(request: GitRepositoryAgentOperationRequest): string {
return createHash("sha256").update(JSON.stringify(normalizedForDigest(request))).digest("hex");
return createHash("sha256")
.update(JSON.stringify(normalizedForDigest(request)))
.digest("hex");
}

function deniedStatus(reason: GitRepositoryAgentDenialReason): number {
return reason === "unsupported-direct-shell" ? 200 : 400;
}

async function parseAgentRequest(
req: IncomingMessage,
): Promise<
async function parseAgentRequest(req: IncomingMessage): Promise<
| {
readonly ok: true;
readonly request: GitRepositoryAgentOperationRequest;
Expand Down Expand Up @@ -390,10 +482,7 @@ function idempotencyConflict(request: GitRepositoryAgentOperationRequest): Route
};
}

function responseResult(
body: GitRepositoryAgentOperationResponse,
replay = false,
): RouteResult {
function responseResult(body: GitRepositoryAgentOperationResponse, replay = false): RouteResult {
const response = body.status === "delegated" && replay ? { ...body, replay: true } : body;
return {
status: body.status === "delegated" ? body.routeStatus : 200,
Expand All @@ -415,27 +504,28 @@ export async function handleGitAgentOperationWithDelegate(
request: GitRepositoryAgentOperationRequest,
fingerprint: string,
delegate: () => Promise<RouteResult>,
cache: IdempotencyCache = idempotencyCache,
): Promise<RouteResult> {
const key = cacheKey(request);
if (key === undefined) {
const result = await delegate();
return { status: result.status, body: wrapDelegated(request, result) };
}
const cached = idempotencyCache.get(key);
const cached = cache.get(key);
if (cached !== undefined) {
if (cached.fingerprint !== fingerprint) return idempotencyConflict(request);
if (cached.result !== undefined) return responseResult(cached.result, true);
if (cached.pending !== undefined) return responseResult(await cached.pending, true);
}
const pending = delegate().then((result) => wrapDelegated(request, result));
idempotencyCache.set(key, { fingerprint, pending });
cache.set(key, { fingerprint, pending });
try {
const body = await pending;
idempotencyCache.set(key, { fingerprint, result: body });
cache.set(key, { fingerprint, result: body });
return responseResult(body);
} catch (error) {
const current = idempotencyCache.get(key);
if (current?.pending === pending) idempotencyCache.delete(key);
const current = cache.get(key);
if (current?.pending === pending) cache.delete(key);
throw error;
}
}
Expand Down
41 changes: 41 additions & 0 deletions packages/keiko-server/src/gitDelivery/requestGuards.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { describe, expect, it } from "vitest";
import { GIT_DELIVERY_PATHSPEC_CONTROL_CHAR, isContainedPathspec } from "./requestGuards.js";

describe("isContainedPathspec", () => {
it("accepts contained relative pathspecs", () => {
expect(isContainedPathspec("src/index.ts")).toBe(true);
expect(isContainedPathspec("a/b/c.txt")).toBe(true);
expect(isContainedPathspec("file.with.dots")).toBe(true);
expect(isContainedPathspec("dir/my file.txt")).toBe(true); // a literal space is a valid filename char
});

it("rejects C0 control characters (TAB, LF, CR)", () => {
expect(isContainedPathspec("src/\tindex.ts")).toBe(false);
expect(isContainedPathspec("src/index.ts\n")).toBe(false);
expect(isContainedPathspec("src\r/index.ts")).toBe(false);
});

it("rejects the NUL byte and DEL alongside other C0 controls", () => {
expect(isContainedPathspec("src/\0index.ts")).toBe(false);
expect(isContainedPathspec("src/\x7findex.ts")).toBe(false);
expect(isContainedPathspec("\x01\x02\x1f")).toBe(false);
});

it("keeps rejecting the pre-existing unsafe shapes", () => {
expect(isContainedPathspec("")).toBe(false);
expect(isContainedPathspec("-flag")).toBe(false);
expect(isContainedPathspec("/absolute")).toBe(false);
expect(isContainedPathspec("C:\\windows")).toBe(false);
expect(isContainedPathspec("../escape")).toBe(false);
expect(isContainedPathspec(42)).toBe(false);
});

it("matches the C0 control range symmetrically with the network-ref guard", () => {
expect(GIT_DELIVERY_PATHSPEC_CONTROL_CHAR.test("\t")).toBe(true);
expect(GIT_DELIVERY_PATHSPEC_CONTROL_CHAR.test("\n")).toBe(true);
expect(GIT_DELIVERY_PATHSPEC_CONTROL_CHAR.test("\r")).toBe(true);
expect(GIT_DELIVERY_PATHSPEC_CONTROL_CHAR.test("\x00")).toBe(true);
expect(GIT_DELIVERY_PATHSPEC_CONTROL_CHAR.test("\x7f")).toBe(true);
expect(GIT_DELIVERY_PATHSPEC_CONTROL_CHAR.test("ok")).toBe(false);
});
});
Binary file modified packages/keiko-server/src/gitDelivery/requestGuards.ts
Binary file not shown.
Loading