Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 24 additions & 19 deletions src/orb/relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ export async function registerOrbRelay(env: Env, secret: string, relayUrl: strin
}

const RELAY_RETRY_MAX_ATTEMPTS = 5;
const RELAY_RETRY_BATCH_SIZE = 25;
const RELAY_RETRY_CONCURRENCY = 5;

/** Record a failed relay forward in the retry queue. Idempotent on delivery_id — a duplicate insert (e.g. from a
* GitHub redelivery reaching the same event before the retry fires) is silently ignored. */
Expand All @@ -109,28 +111,31 @@ export async function retryFailedRelays(env: Env, opts?: { fetchImpl?: typeof fe
.run();
const { results } = await env.DB
.prepare(
"SELECT delivery_id, event_name, installation_id, raw_body FROM orb_relay_failures WHERE expires_at >= datetime('now') AND attempts < ?",
"SELECT delivery_id, event_name, installation_id, raw_body FROM orb_relay_failures WHERE expires_at >= datetime('now') AND attempts < ? ORDER BY created_at, delivery_id LIMIT ?",
)
.bind(RELAY_RETRY_MAX_ATTEMPTS)
.bind(RELAY_RETRY_MAX_ATTEMPTS, RELAY_RETRY_BATCH_SIZE)
.all<{ delivery_id: string; event_name: string; installation_id: number; raw_body: string }>();
if (!results.length) return;
await Promise.all(
results.map(async (row) => {
const result = await forwardOrbEvent(
env,
{ eventName: row.event_name, installationId: row.installation_id, deliveryId: row.delivery_id, rawBody: row.raw_body },
opts?.fetchImpl,
);
if (result === "forwarded" || result === "skipped") {
await env.DB.prepare("DELETE FROM orb_relay_failures WHERE delivery_id = ?").bind(row.delivery_id).run();
} else {
await env.DB
.prepare("UPDATE orb_relay_failures SET attempts = attempts + 1, last_attempt_at = datetime('now') WHERE delivery_id = ?")
.bind(row.delivery_id)
.run();
}
}),
);

const retryRow = async (row: { delivery_id: string; event_name: string; installation_id: number; raw_body: string }) => {
const result = await forwardOrbEvent(
env,
{ eventName: row.event_name, installationId: row.installation_id, deliveryId: row.delivery_id, rawBody: row.raw_body },
opts?.fetchImpl,
);
if (result === "forwarded" || result === "skipped") {
await env.DB.prepare("DELETE FROM orb_relay_failures WHERE delivery_id = ?").bind(row.delivery_id).run();
} else {
await env.DB
.prepare("UPDATE orb_relay_failures SET attempts = attempts + 1, last_attempt_at = datetime('now') WHERE delivery_id = ?")
.bind(row.delivery_id)
.run();
}
};

for (let i = 0; i < results.length; i += RELAY_RETRY_CONCURRENCY) {
await Promise.all(results.slice(i, i + RELAY_RETRY_CONCURRENCY).map(retryRow));
}
}

/** Forward a webhook event to the brokered self-host registered for this installation. BEST-EFFORT + fail-safe:
Expand Down
30 changes: 30 additions & 0 deletions test/integration/orb-relay.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,36 @@ describe("retryFailedRelays", () => {
expect(row ?? null).toBeNull(); // skipped = no longer applicable → cleaned up
});

it("PAGES retry work and bounds concurrent forwards", async () => {
const e = brokeredEnv();
const secret = await enroll(e, 9500);
await registerOrbRelay(e, secret, "https://c.example/v1/orb/relay");
for (let i = 0; i < 30; i += 1) {
await storeRelayFailure(e, { deliveryId: `retry-bulk-${String(i).padStart(2, "0")}`, eventName: "pull_request", installationId: 9500, rawBody: JSON.stringify({ n: i }) });
}

let active = 0;
let maxActive = 0;
let calls = 0;
const fetchFail = (async () => {
calls += 1;
active += 1;
maxActive = Math.max(maxActive, active);
await Promise.resolve();
active -= 1;
return new Response("bad", { status: 503 });
}) as typeof fetch;

await retryFailedRelays(e, { fetchImpl: fetchFail });

expect(calls).toBe(25);
expect(maxActive).toBeLessThanOrEqual(5);
const attempted = await db(e).prepare("SELECT COUNT(*) AS n FROM orb_relay_failures WHERE attempts=1").first<{ n: number }>();
const untouched = await db(e).prepare("SELECT COUNT(*) AS n FROM orb_relay_failures WHERE attempts=0").first<{ n: number }>();
expect(attempted?.n).toBe(25);
expect(untouched?.n).toBe(5);
});

it("PRUNES rows that have exhausted their attempt budget (attempts >= 5)", async () => {
const e = brokeredEnv();
// Manually insert a row at the attempt ceiling.
Expand Down
Loading