From df3428154449d1ebfb1b160bc6344f442e07548e Mon Sep 17 00:00:00 2001 From: ghost <49853598+JSONbored@users.noreply.github.com> Date: Thu, 25 Jun 2026 16:07:56 -0700 Subject: [PATCH] fix(orb): bound relay retry fan-out --- src/orb/relay.ts | 43 +++++++++++++++++------------- test/integration/orb-relay.test.ts | 30 +++++++++++++++++++++ 2 files changed, 54 insertions(+), 19 deletions(-) diff --git a/src/orb/relay.ts b/src/orb/relay.ts index ab7c0ae87..4f6e88eef 100644 --- a/src/orb/relay.ts +++ b/src/orb/relay.ts @@ -83,6 +83,8 @@ export async function registerOrbRelay(env: Env, secret: string, relayUrl: strin } const RELAY_RETRY_MAX_ATTEMPTS = 5; +const RELAY_RETRY_BATCH_SIZE = 25; +const RELAY_RETRY_CONCURRENCY = 5; /** Record a failed relay forward in the retry queue. Idempotent on delivery_id — a duplicate insert (e.g. from a * GitHub redelivery reaching the same event before the retry fires) is silently ignored. */ @@ -109,28 +111,31 @@ export async function retryFailedRelays(env: Env, opts?: { fetchImpl?: typeof fe .run(); const { results } = await env.DB .prepare( - "SELECT delivery_id, event_name, installation_id, raw_body FROM orb_relay_failures WHERE expires_at >= datetime('now') AND attempts < ?", + "SELECT delivery_id, event_name, installation_id, raw_body FROM orb_relay_failures WHERE expires_at >= datetime('now') AND attempts < ? ORDER BY created_at, delivery_id LIMIT ?", ) - .bind(RELAY_RETRY_MAX_ATTEMPTS) + .bind(RELAY_RETRY_MAX_ATTEMPTS, RELAY_RETRY_BATCH_SIZE) .all<{ delivery_id: string; event_name: string; installation_id: number; raw_body: string }>(); if (!results.length) return; - await Promise.all( - results.map(async (row) => { - const result = await forwardOrbEvent( - env, - { eventName: row.event_name, installationId: row.installation_id, deliveryId: row.delivery_id, rawBody: row.raw_body }, - opts?.fetchImpl, - ); - if (result === "forwarded" || result === "skipped") { - await env.DB.prepare("DELETE FROM orb_relay_failures WHERE delivery_id = ?").bind(row.delivery_id).run(); - } else { - await env.DB - .prepare("UPDATE orb_relay_failures SET attempts = attempts + 1, last_attempt_at = datetime('now') WHERE delivery_id = ?") - .bind(row.delivery_id) - .run(); - } - }), - ); + + const retryRow = async (row: { delivery_id: string; event_name: string; installation_id: number; raw_body: string }) => { + const result = await forwardOrbEvent( + env, + { eventName: row.event_name, installationId: row.installation_id, deliveryId: row.delivery_id, rawBody: row.raw_body }, + opts?.fetchImpl, + ); + if (result === "forwarded" || result === "skipped") { + await env.DB.prepare("DELETE FROM orb_relay_failures WHERE delivery_id = ?").bind(row.delivery_id).run(); + } else { + await env.DB + .prepare("UPDATE orb_relay_failures SET attempts = attempts + 1, last_attempt_at = datetime('now') WHERE delivery_id = ?") + .bind(row.delivery_id) + .run(); + } + }; + + for (let i = 0; i < results.length; i += RELAY_RETRY_CONCURRENCY) { + await Promise.all(results.slice(i, i + RELAY_RETRY_CONCURRENCY).map(retryRow)); + } } /** Forward a webhook event to the brokered self-host registered for this installation. BEST-EFFORT + fail-safe: diff --git a/test/integration/orb-relay.test.ts b/test/integration/orb-relay.test.ts index 4cf7788c0..2ab477a5b 100644 --- a/test/integration/orb-relay.test.ts +++ b/test/integration/orb-relay.test.ts @@ -251,6 +251,36 @@ describe("retryFailedRelays", () => { expect(row ?? null).toBeNull(); // skipped = no longer applicable → cleaned up }); + it("PAGES retry work and bounds concurrent forwards", async () => { + const e = brokeredEnv(); + const secret = await enroll(e, 9500); + await registerOrbRelay(e, secret, "https://c.example/v1/orb/relay"); + for (let i = 0; i < 30; i += 1) { + await storeRelayFailure(e, { deliveryId: `retry-bulk-${String(i).padStart(2, "0")}`, eventName: "pull_request", installationId: 9500, rawBody: JSON.stringify({ n: i }) }); + } + + let active = 0; + let maxActive = 0; + let calls = 0; + const fetchFail = (async () => { + calls += 1; + active += 1; + maxActive = Math.max(maxActive, active); + await Promise.resolve(); + active -= 1; + return new Response("bad", { status: 503 }); + }) as typeof fetch; + + await retryFailedRelays(e, { fetchImpl: fetchFail }); + + expect(calls).toBe(25); + expect(maxActive).toBeLessThanOrEqual(5); + const attempted = await db(e).prepare("SELECT COUNT(*) AS n FROM orb_relay_failures WHERE attempts=1").first<{ n: number }>(); + const untouched = await db(e).prepare("SELECT COUNT(*) AS n FROM orb_relay_failures WHERE attempts=0").first<{ n: number }>(); + expect(attempted?.n).toBe(25); + expect(untouched?.n).toBe(5); + }); + it("PRUNES rows that have exhausted their attempt budget (attempts >= 5)", async () => { const e = brokeredEnv(); // Manually insert a row at the attempt ceiling.