From 7f743ccfe701087bfbd79d0244b1cc6733e928bf Mon Sep 17 00:00:00 2001 From: Jason Schrader Date: Wed, 15 Apr 2026 12:25:55 -0700 Subject: [PATCH 1/3] feat(core)!: two-phase broadcast status + reconcile grace window (closes #25) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the two residual state-confidence gaps that blocked the relay sponsor-ledger refactor from claiming mainnet-grade state confidence. **Gap 1 — two-phase broadcast lifecycle** - `LedgerEntryStatusSchema`: `pending_broadcast` | `broadcast_sent` | `broadcast_failed` - `SponsorLedgerEntry.status` is now required; schema rejects entries without it - `beginPendingBroadcast(ledger, input)` writes the ledger before the network call; refuses to overwrite an unresolved `pending_broadcast` - `resolveBroadcast(ledger, nonce, "sent" | "failed")` transitions on return; throws `LedgerTransitionError` on invalid transitions (no double-resolve) - `decideBroadcast` returns `{ kind: "await_pending_broadcast", nonce, txId }` when the current entry is unresolved — forces the consumer to resolve the prior call before another broadcast can fire **Gap 2 — reconcile grace window** - `reconcile()` accepts `justBroadcastGraceSeconds` (default 30) - Ledger entries absent from the mempool but broadcast within the window are surfaced as `inFlightPendingIndex` instead of `dropped` — covers node → Hiro indexer propagation lag - `reconcile()` promotes `pending_broadcast` → `broadcast_sent` automatically when the mempool confirms the txId (crash recovery) **Classification stays closed** `OccupantClassification` is NOT extended. Grace-window ambiguity requires ledger + clock context that `classifyOccupant` has neither of by design; leaking it there would make every caller grace-aware for a concern only `reconcile` can answer. The pending state is a ledger lifecycle concern, not an occupant kind — the existing `sponsor_owned_in_ledger` match handles mempool-confirmed pending entries correctly. **Breaking change** Existing ledger entries persisted under 0.8.0 lack `status` and will fail schema parse. Consumers must backfill `status: "broadcast_sent"` on migration — targets `0.9.0` minor. Co-Authored-By: Claude Opus 4.6 (1M context) --- README.md | 65 ++++++ src/core/sponsor-ledger.ts | 16 ++ src/core/sponsor-wallet-machine.ts | 196 +++++++++++++++- tests/sponsor-wallet-machine.test.ts | 325 +++++++++++++++++++++++---- 4 files changed, 551 insertions(+), 51 deletions(-) diff --git a/README.md b/README.md index ee63c8e..98167cf 100644 --- a/README.md +++ b/README.md @@ -153,3 +153,68 @@ const { wallet: next, ledger: nextLedger, adopted, dropped } = reconcile( All helpers are pure: inputs → new state. No I/O, and time-sensitive helpers accept an injectable `now` option for deterministic tests; otherwise they use the current time by default. + +### Canonical write pattern: two-phase broadcast + +`LedgerEntry.status` captures whether a broadcast has round-tripped with the +node. The two-phase contract closes the edge-terminator / crash window where a +single-phase ledger write could claim `broadcast_sent` for a tx the node never +saw (or lose track of a tx the node did accept). + +``` +(no entry) → pending_broadcast [beginPendingBroadcast] +pending_broadcast → broadcast_sent [resolveBroadcast("sent") | reconcile] +pending_broadcast → broadcast_failed [resolveBroadcast("failed")] +broadcast_sent → pending_broadcast [new RBF attempt, new txId] +broadcast_failed → pending_broadcast [retry, new txId] +``` + +Write the ledger **before** the network call, resolve on return: + +```ts +import { + beginPendingBroadcast, + resolveBroadcast, + reconcile, +} from "@aibtc/tx-schemas/core"; + +let ledger = beginPendingBroadcast(ledger, { + nonce, + txId, + fee, +}); + +try { + const outcome = await broadcastTransaction(signedTx); + ledger = resolveBroadcast(ledger, nonce, "sent", { lastOutcome: outcome }); +} catch (err) { + ledger = resolveBroadcast(ledger, nonce, "failed"); + throw err; +} +``` + +`decideBroadcast` refuses to issue a new decision while the entry is +`pending_broadcast` — it returns `{ kind: "await_pending_broadcast", nonce, +txId }` so the consumer resolves the prior call before a second broadcast +can fire. + +`reconcile()` sweeps survivors of crashes that dropped the resolve step: + +- A `pending_broadcast` entry whose txId appears in the mempool is promoted + to `broadcast_sent` automatically. +- A `pending_broadcast` entry absent from the mempool within + `justBroadcastGraceSeconds` (default 30) is classified as + `inFlightPendingIndex` — node may have accepted it; indexer just hasn't + caught up. +- Past the grace window with no mempool hit, the entry is reported as + `dropped` for caller inspection. + +```ts +const { ledger: next, adopted, dropped, inFlightPendingIndex } = reconcile( + wallet, + ledger, + mempoolReadByNonce, + sponsorAddress, + { justBroadcastGraceSeconds: 30 } +); +``` diff --git a/src/core/sponsor-ledger.ts b/src/core/sponsor-ledger.ts index 989ddbe..37bf4dc 100644 --- a/src/core/sponsor-ledger.ts +++ b/src/core/sponsor-ledger.ts @@ -14,12 +14,28 @@ import { // One entry per (sponsorAddress, nonce). Relays persist this so they can // distinguish their own prior broadcast (RBF path) from a foreign occupant // (quarantine path) when stacks-core reports a nonce conflict. +// +// Lifecycle (two-phase broadcast): +// (no entry) → pending_broadcast [beginPendingBroadcast] +// pending_broadcast → broadcast_sent [resolveBroadcast / reconcile] +// pending_broadcast → broadcast_failed [resolveBroadcast] +// broadcast_sent → pending_broadcast [new RBF attempt, new txId] +// broadcast_failed → pending_broadcast [retry, new txId] // --------------------------------------------------------------------------- +export const LedgerEntryStatusSchema = z.enum([ + "pending_broadcast", + "broadcast_sent", + "broadcast_failed", +]); + +export type LedgerEntryStatus = z.infer; + export const SponsorLedgerEntrySchema = z.object({ nonce: NonNegativeIntegerSchema, txId: TransactionIdSchema, fee: AmountStringSchema, + status: LedgerEntryStatusSchema, broadcastAt: IsoDateTimeSchema, rbfAttempts: NonNegativeIntegerSchema, lastOutcome: NodeBroadcastOutcomeSchema.optional(), diff --git a/src/core/sponsor-wallet-machine.ts b/src/core/sponsor-wallet-machine.ts index 088d513..d60aea3 100644 --- a/src/core/sponsor-wallet-machine.ts +++ b/src/core/sponsor-wallet-machine.ts @@ -7,8 +7,13 @@ import { StacksAddressSchema, TransactionIdSchema, } from "./primitives.js"; -import { getLedgerEntry, SponsorLedgerEntrySchema } from "./sponsor-ledger.js"; +import { + getLedgerEntry, + LedgerEntryStatusSchema, + SponsorLedgerEntrySchema, +} from "./sponsor-ledger.js"; import type { + LedgerEntryStatus, SponsorLedger, SponsorLedgerEntry, } from "./sponsor-ledger.js"; @@ -93,12 +98,19 @@ const TerminalDecisionSchema = z.object({ reason: FailedTerminalReasonSchema, }); +const AwaitPendingBroadcastDecisionSchema = z.object({ + kind: z.literal("await_pending_broadcast"), + nonce: NonNegativeIntegerSchema, + txId: TransactionIdSchema, +}); + export const BroadcastDecisionSchema = z.discriminatedUnion("kind", [ FirstBroadcastDecisionSchema, RbfWithFeeDecisionSchema, AdoptThenRbfDecisionSchema, QuarantineDecisionSchema, TerminalDecisionSchema, + AwaitPendingBroadcastDecisionSchema, ]); export type BroadcastDecision = z.infer; @@ -133,6 +145,7 @@ export function classifyOccupant( nonce: entry.nonce, txId: entry.txId, fee: entry.fee, + status: entry.status, broadcastAt: entry.broadcastAt, rbfAttempts: entry.rbfAttempts, }, @@ -159,7 +172,8 @@ export function classifyOccupant( // // Maps a (walletCapacity, nodeBroadcastOutcome, ledger-for-nonce, occupant) // tuple to the next action: first broadcast, RBF, adopt-then-RBF, quarantine, -// or terminal. Pure — no I/O, no time source. +// terminal, or await-pending (the prior broadcast hasn't been resolved yet). +// Pure — no I/O, no time source. // --------------------------------------------------------------------------- export interface BroadcastContext { @@ -180,6 +194,18 @@ export function decideBroadcast( const { nonce, ledger, occupant } = context; const maxAttempts = context.maxRbfAttempts ?? MAX_RBF_ATTEMPTS; const ledgerEntry = getLedgerEntry(ledger, nonce); + + // Hard invariant: a pending_broadcast ledger entry means a prior broadcast + // call hasn't been resolved. Issuing a new decision on top would risk + // double-broadcast. Force the consumer to resolveBroadcast() first. + if (ledgerEntry?.status === "pending_broadcast") { + return { + kind: "await_pending_broadcast", + nonce, + txId: ledgerEntry.txId, + }; + } + const occupied = wallet.occupiedNonces.find((o) => o.nonce === nonce); const attempts = occupied?.rbfAttempts ?? ledgerEntry?.rbfAttempts ?? 0; @@ -286,6 +312,111 @@ function decideSponsorConflict(args: { const bump = (fee: string): string => (BigInt(fee) + 1n).toString(); +// --------------------------------------------------------------------------- +// Two-phase broadcast lifecycle +// +// beginPendingBroadcast → (network call) → resolveBroadcast +// +// The ledger is written BEFORE the network call so a crash between write +// and call return never produces a ledger entry claiming sent-to-node when +// the node never saw the tx. On the return path, resolveBroadcast promotes +// pending → broadcast_sent or → broadcast_failed. If the process dies in +// between, reconcile() sweeps stragglers using the grace window. +// --------------------------------------------------------------------------- + +export class LedgerTransitionError extends Error { + constructor(message: string) { + super(message); + this.name = "LedgerTransitionError"; + } +} + +export interface BeginPendingBroadcastInput { + nonce: number; + txId: string; + fee: string; + broadcastAt?: Date; + rbfAttempts?: number; +} + +// Valid predecessors for entering pending_broadcast at a nonce. A fresh nonce +// (no entry) is allowed; prior terminal states (sent/failed) are allowed +// because they represent a completed lifecycle that the caller is extending +// with a new txId (RBF or retry). pending_broadcast is NOT a valid predecessor +// — the prior call must be resolved first. +const PENDING_PREDECESSORS = new Set([ + "none", + "broadcast_sent", + "broadcast_failed", +]); + +export function beginPendingBroadcast( + ledger: SponsorLedger, + input: BeginPendingBroadcastInput +): SponsorLedger { + const { nonce, txId, fee } = input; + const existing = getLedgerEntry(ledger, nonce); + const predecessor: LedgerEntryStatus | "none" = existing?.status ?? "none"; + + if (!PENDING_PREDECESSORS.has(predecessor)) { + throw new LedgerTransitionError( + `cannot begin pending_broadcast for nonce ${nonce}: prior status is ${predecessor}. resolveBroadcast() must be called first.` + ); + } + + const broadcastAt = (input.broadcastAt ?? new Date()).toISOString(); + const rbfAttempts = input.rbfAttempts ?? existing?.rbfAttempts ?? 0; + + const entry: SponsorLedgerEntry = { + nonce, + txId, + fee, + status: "pending_broadcast", + broadcastAt, + rbfAttempts, + }; + + return { + ...ledger, + entries: { ...ledger.entries, [String(nonce)]: entry }, + }; +} + +export type ResolveBroadcastOutcome = "sent" | "failed"; + +export function resolveBroadcast( + ledger: SponsorLedger, + nonce: number, + outcome: ResolveBroadcastOutcome, + options: { lastOutcome?: NodeBroadcastOutcome } = {} +): SponsorLedger { + const entry = getLedgerEntry(ledger, nonce); + if (!entry) { + throw new LedgerTransitionError( + `cannot resolve nonce ${nonce}: no ledger entry exists.` + ); + } + if (entry.status !== "pending_broadcast") { + throw new LedgerTransitionError( + `cannot resolve nonce ${nonce}: current status is ${entry.status}, expected pending_broadcast.` + ); + } + + const status: LedgerEntryStatus = + outcome === "sent" ? "broadcast_sent" : "broadcast_failed"; + + const resolved: SponsorLedgerEntry = { + ...entry, + status, + ...(options.lastOutcome !== undefined && { lastOutcome: options.lastOutcome }), + }; + + return { + ...ledger, + entries: { ...ledger.entries, [String(nonce)]: resolved }, + }; +} + // --------------------------------------------------------------------------- // adoptOrphan // @@ -377,17 +508,35 @@ export function quarantine( // // Folds one address-filtered mempool read into wallet + ledger: // - adopts sponsor-owned tx_ids missing from the ledger -// - flags ledger entries that are no longer visible in the mempool +// - promotes pending_broadcast entries to broadcast_sent on mempool hit +// - classifies ledger entries not (yet) seen in the mempool: +// * within the just-broadcast grace window → inFlightPendingIndex +// * past the grace window → dropped // -// Returns the updated wallet + ledger plus the adopted/dropped nonce lists so -// the caller can log/alert. Pure — `now` is injectable. +// Returns the updated wallet + ledger plus classification lists so the caller +// can log/alert. Pure — `now` is injectable. // --------------------------------------------------------------------------- +export const DEFAULT_JUST_BROADCAST_GRACE_SECONDS = 30; + +export interface ReconcileOptions { + now?: Date; + // Ledger entries broadcast within this many seconds are classified as + // `inFlightPendingIndex` rather than `dropped` when absent from the + // mempool read. Covers node→indexer propagation lag (~6-10 Nakamoto + // blocks + Hiro indexing). Default 30s. + justBroadcastGraceSeconds?: number; +} + export interface ReconcileResult { wallet: WalletCapacity; ledger: SponsorLedger; adopted: number[]; dropped: number[]; + // Ledger entries absent from the mempool but still within the grace + // window. Callers should NOT treat these as missing — the node accepted + // the broadcast; the indexer just hasn't caught up. + inFlightPendingIndex: number[]; // Orphans we saw but couldn't price (no fee_rate on the Hiro view). // Callers should quarantine/alert rather than assume a safe RBF baseline. unpriceableOrphans: number[]; @@ -398,16 +547,24 @@ export function reconcile( ledger: SponsorLedger, mempoolReadByNonce: Record, ourSponsorAddress: string, - options: { now?: Date } = {} + options: ReconcileOptions = {} ): ReconcileResult { const now = options.now ?? new Date(); const nowIso = now.toISOString(); + const nowMs = now.getTime(); + const graceMs = + (options.justBroadcastGraceSeconds ?? DEFAULT_JUST_BROADCAST_GRACE_SECONDS) * + 1000; + let nextWallet = wallet; let nextLedger = ledger; const adopted: number[] = []; const dropped: number[] = []; + const inFlightPendingIndex: number[] = []; const unpriceableOrphans: number[] = []; + // Pass 1: adopt orphans, promote pending_broadcast → broadcast_sent when + // the mempool confirms our txId. for (const [nonceStr, hiroTx] of Object.entries(mempoolReadByNonce)) { const nonce = Number(nonceStr); if (!Number.isInteger(nonce) || nonce < 0) continue; @@ -419,6 +576,21 @@ export function reconcile( nextLedger, nonce ); + + if (classification.kind === "sponsor_owned_in_ledger") { + const entry = getLedgerEntry(nextLedger, nonce); + if (entry?.status === "pending_broadcast") { + nextLedger = { + ...nextLedger, + entries: { + ...nextLedger.entries, + [String(nonce)]: { ...entry, status: "broadcast_sent" }, + }, + }; + } + continue; + } + if (classification.kind !== "sponsor_owned_orphan") continue; if (hiroTx.fee_rate === undefined) { @@ -435,6 +607,7 @@ export function reconcile( nonce, txId: hiroTx.tx_id, fee: hiroTx.fee_rate, + status: "broadcast_sent", broadcastAt: nowIso, rbfAttempts: 0, }, @@ -443,11 +616,19 @@ export function reconcile( adopted.push(nonce); } + // Pass 2: classify absences. An entry whose nonce was included in the + // mempool read but whose txId is missing (null or mismatched) is either + // still-propagating (within grace) or truly dropped. for (const [nonceStr, entry] of Object.entries(ledger.entries)) { const nonce = Number(nonceStr); if (!(nonce in mempoolReadByNonce)) continue; const observed = mempoolReadByNonce[nonce]; - if (!observed || observed.tx_id !== entry.txId) { + if (observed && observed.tx_id === entry.txId) continue; + + const ageMs = nowMs - Date.parse(entry.broadcastAt); + if (ageMs >= 0 && ageMs < graceMs) { + inFlightPendingIndex.push(entry.nonce); + } else { dropped.push(entry.nonce); } } @@ -457,6 +638,7 @@ export function reconcile( ledger: nextLedger, adopted, dropped, + inFlightPendingIndex, unpriceableOrphans, }; } diff --git a/tests/sponsor-wallet-machine.test.ts b/tests/sponsor-wallet-machine.test.ts index 33fd5e7..82f18f4 100644 --- a/tests/sponsor-wallet-machine.test.ts +++ b/tests/sponsor-wallet-machine.test.ts @@ -1,19 +1,23 @@ import { describe, expect, it } from "vitest"; import { adoptOrphan, + beginPendingBroadcast, BroadcastDecisionSchema, classifyOccupant, decideBroadcast, HiroSponsorTxViewSchema, + LedgerTransitionError, MAX_RBF_ATTEMPTS, OccupantClassificationSchema, quarantine, RELAY_CHAINING_LIMIT, reconcile, + resolveBroadcast, SponsorLedgerSchema, WalletCapacitySchema, type HiroSponsorTxView, type SponsorLedger, + type SponsorLedgerEntry, type WalletCapacity, } from "../src/index.js"; @@ -25,6 +29,8 @@ const SPONSOR = "SP1KGHF33Y4M7Q87WNRXQBCAP2Y6DBSQSHJHQH4T"; const STRANGER = "SP2STRANGERXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"; const TX_OURS = "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; +const TX_OURS_2 = + "0xdddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd"; const TX_FOREIGN = "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"; const TX_ORPHAN = @@ -51,6 +57,18 @@ function makeLedger(entries: SponsorLedger["entries"] = {}): SponsorLedger { return { sponsorAddress: SPONSOR, entries }; } +function makeEntry(overrides: Partial = {}): SponsorLedgerEntry { + return { + nonce: 105, + txId: TX_OURS, + fee: "5000", + status: "broadcast_sent", + broadcastAt: NOW.toISOString(), + rbfAttempts: 0, + ...overrides, + }; +} + function hiroTx(overrides: Partial = {}): HiroSponsorTxView { return HiroSponsorTxViewSchema.parse({ tx_id: TX_OURS, @@ -65,7 +83,7 @@ function hiroTx(overrides: Partial = {}): HiroSponsorTxView { } // --------------------------------------------------------------------------- -// SponsorLedgerSchema — key/nonce drift guard +// SponsorLedgerSchema — key/nonce drift guard + required status // --------------------------------------------------------------------------- describe("SponsorLedgerSchema", () => { @@ -73,17 +91,20 @@ describe("SponsorLedgerSchema", () => { const result = SponsorLedgerSchema.safeParse({ sponsorAddress: SPONSOR, entries: { - "104": { - nonce: 105, - txId: TX_OURS, - fee: "5000", - broadcastAt: NOW.toISOString(), - rbfAttempts: 0, - }, + "104": makeEntry({ nonce: 105 }), }, }); expect(result.success).toBe(false); }); + + it("rejects ledger entries that omit status", () => { + const { status: _status, ...rest } = makeEntry(); + const result = SponsorLedgerSchema.safeParse({ + sponsorAddress: SPONSOR, + entries: { "105": rest }, + }); + expect(result.success).toBe(false); + }); }); // --------------------------------------------------------------------------- @@ -99,18 +120,13 @@ describe("classifyOccupant", () => { it("classifies our sponsor-owned tx as in-ledger when it matches", () => { const ledger = makeLedger({ - "105": { - nonce: 105, - txId: TX_OURS, - fee: "5000", - broadcastAt: NOW.toISOString(), - rbfAttempts: 1, - }, + "105": makeEntry({ rbfAttempts: 1 }), }); const result = classifyOccupant(hiroTx(), SPONSOR, ledger, 105); expect(result.kind).toBe("sponsor_owned_in_ledger"); if (result.kind === "sponsor_owned_in_ledger") { expect(result.ledgerEntry.txId).toBe(TX_OURS); + expect(result.ledgerEntry.status).toBe("broadcast_sent"); } }); @@ -152,20 +168,12 @@ describe("classifyOccupant", () => { }); // --------------------------------------------------------------------------- -// decideBroadcast workflow cases (from issue #22 acceptance criteria) +// decideBroadcast workflow cases // --------------------------------------------------------------------------- describe("decideBroadcast", () => { it("sponsor-owned in-ledger conflict → RBF with occupant_fee + 1", () => { - const ledger = makeLedger({ - "105": { - nonce: 105, - txId: TX_OURS, - fee: "5000", - broadcastAt: NOW.toISOString(), - rbfAttempts: 1, - }, - }); + const ledger = makeLedger({ "105": makeEntry({ rbfAttempts: 1 }) }); const occupant = classifyOccupant(hiroTx(), SPONSOR, ledger, 105); const decision = decideBroadcast( makeWallet({ occupiedNonces: [] }), @@ -348,6 +356,143 @@ describe("decideBroadcast", () => { ); expect(decision).toEqual({ kind: "terminal", reason: "sponsor_exhausted" }); }); + + it("returns await_pending_broadcast when the ledger entry is unresolved", () => { + const ledger = makeLedger({ + "105": makeEntry({ status: "pending_broadcast" }), + }); + const decision = decideBroadcast( + makeWallet(), + { outcome: "accepted" }, + { nonce: 105, ledger } + ); + expect(decision).toEqual({ + kind: "await_pending_broadcast", + nonce: 105, + txId: TX_OURS, + }); + expect(BroadcastDecisionSchema.parse(decision)).toEqual(decision); + }); + + it("await_pending_broadcast takes precedence over RBF for a conflict outcome", () => { + const ledger = makeLedger({ + "105": makeEntry({ status: "pending_broadcast" }), + }); + const occupant = classifyOccupant(hiroTx(), SPONSOR, ledger, 105); + const decision = decideBroadcast( + makeWallet(), + { outcome: "nonce_conflict", isOrigin: false }, + { nonce: 105, ledger, occupant } + ); + expect(decision.kind).toBe("await_pending_broadcast"); + }); +}); + +// --------------------------------------------------------------------------- +// Two-phase broadcast lifecycle +// --------------------------------------------------------------------------- + +describe("beginPendingBroadcast", () => { + it("writes a new pending_broadcast entry for a fresh nonce", () => { + const ledger = beginPendingBroadcast(makeLedger(), { + nonce: 105, + txId: TX_OURS, + fee: "5000", + broadcastAt: NOW, + }); + expect(ledger.entries["105"]).toMatchObject({ + nonce: 105, + txId: TX_OURS, + fee: "5000", + status: "pending_broadcast", + broadcastAt: NOW.toISOString(), + rbfAttempts: 0, + }); + expect(SponsorLedgerSchema.parse(ledger)).toEqual(ledger); + }); + + it("allows transition from broadcast_sent (RBF replaces prior entry)", () => { + const prior = makeLedger({ "105": makeEntry({ rbfAttempts: 2 }) }); + const next = beginPendingBroadcast(prior, { + nonce: 105, + txId: TX_OURS_2, + fee: "6000", + rbfAttempts: 3, + broadcastAt: NOW, + }); + expect(next.entries["105"]?.status).toBe("pending_broadcast"); + expect(next.entries["105"]?.txId).toBe(TX_OURS_2); + expect(next.entries["105"]?.rbfAttempts).toBe(3); + }); + + it("allows transition from broadcast_failed (retry)", () => { + const prior = makeLedger({ + "105": makeEntry({ status: "broadcast_failed" }), + }); + const next = beginPendingBroadcast(prior, { + nonce: 105, + txId: TX_OURS_2, + fee: "6000", + broadcastAt: NOW, + }); + expect(next.entries["105"]?.status).toBe("pending_broadcast"); + }); + + it("refuses to overwrite an unresolved pending_broadcast entry", () => { + const prior = makeLedger({ + "105": makeEntry({ status: "pending_broadcast" }), + }); + expect(() => + beginPendingBroadcast(prior, { + nonce: 105, + txId: TX_OURS_2, + fee: "6000", + }) + ).toThrow(LedgerTransitionError); + }); +}); + +describe("resolveBroadcast", () => { + it("transitions pending_broadcast → broadcast_sent", () => { + const ledger = beginPendingBroadcast(makeLedger(), { + nonce: 105, + txId: TX_OURS, + fee: "5000", + broadcastAt: NOW, + }); + const resolved = resolveBroadcast(ledger, 105, "sent"); + expect(resolved.entries["105"]?.status).toBe("broadcast_sent"); + }); + + it("transitions pending_broadcast → broadcast_failed", () => { + const ledger = beginPendingBroadcast(makeLedger(), { + nonce: 105, + txId: TX_OURS, + fee: "5000", + broadcastAt: NOW, + }); + const resolved = resolveBroadcast(ledger, 105, "failed", { + lastOutcome: { outcome: "server_error", reason: "upstream 502" }, + }); + expect(resolved.entries["105"]?.status).toBe("broadcast_failed"); + expect(resolved.entries["105"]?.lastOutcome).toEqual({ + outcome: "server_error", + reason: "upstream 502", + }); + }); + + it("throws when the entry does not exist", () => { + expect(() => resolveBroadcast(makeLedger(), 105, "sent")).toThrow( + LedgerTransitionError + ); + }); + + it("throws when the entry is not pending (no double-resolve)", () => { + const ledger = makeLedger({ "105": makeEntry() }); + expect(() => resolveBroadcast(ledger, 105, "sent")).toThrow( + LedgerTransitionError + ); + }); }); // --------------------------------------------------------------------------- @@ -459,19 +604,13 @@ describe("quarantine", () => { }); // --------------------------------------------------------------------------- -// reconcile — orphans adopted, missing entries flagged +// reconcile — orphans, grace window, pending promotion // --------------------------------------------------------------------------- describe("reconcile", () => { it("adopts orphans seen in the mempool and flags ledger entries that disappeared", () => { const ledger = makeLedger({ - "104": { - nonce: 104, - txId: TX_OURS, - fee: "5000", - broadcastAt: NOW.toISOString(), - rbfAttempts: 0, - }, + "104": makeEntry({ nonce: 104 }), }); const wallet = makeWallet(); @@ -480,27 +619,25 @@ describe("reconcile", () => { 105: hiroTx({ tx_id: TX_ORPHAN, fee_rate: "7000" }), }; - const result = reconcile(wallet, ledger, mempool, SPONSOR, { now: NOW }); + const result = reconcile(wallet, ledger, mempool, SPONSOR, { + now: new Date(NOW.getTime() + 120_000), + }); expect(result.adopted).toEqual([105]); expect(result.dropped).toEqual([104]); + expect(result.inFlightPendingIndex).toEqual([]); expect(result.ledger.entries["105"]?.txId).toBe(TX_ORPHAN); + expect(result.ledger.entries["105"]?.status).toBe("broadcast_sent"); expect(result.wallet.occupiedNonces.map((n) => n.nonce)).toContain(105); }); it("flags ledger drift when mempool tx_id differs from ledger", () => { - const ledger = makeLedger({ - "104": { - nonce: 104, - txId: TX_OURS, - fee: "5000", - broadcastAt: NOW.toISOString(), - rbfAttempts: 0, - }, - }); + const ledger = makeLedger({ "104": makeEntry({ nonce: 104 }) }); const mempool: Record = { 104: hiroTx({ tx_id: TX_ORPHAN, sponsor_nonce: 104 }), }; - const result = reconcile(makeWallet(), ledger, mempool, SPONSOR, { now: NOW }); + const result = reconcile(makeWallet(), ledger, mempool, SPONSOR, { + now: new Date(NOW.getTime() + 120_000), + }); expect(result.dropped).toContain(104); }); @@ -530,4 +667,104 @@ describe("reconcile", () => { expect(result.unpriceableOrphans).toEqual([105]); expect(result.ledger.entries["105"]).toBeUndefined(); }); + + it("promotes pending_broadcast → broadcast_sent when the mempool confirms the txId", () => { + const ledger = makeLedger({ + "105": makeEntry({ status: "pending_broadcast" }), + }); + const mempool: Record = { + 105: hiroTx(), + }; + const result = reconcile(makeWallet(), ledger, mempool, SPONSOR, { now: NOW }); + expect(result.ledger.entries["105"]?.status).toBe("broadcast_sent"); + }); + + it("classifies inside-grace absences as inFlightPendingIndex, not dropped", () => { + const broadcastAt = new Date(NOW.getTime() - 10_000); // 10s ago + const ledger = makeLedger({ + "105": makeEntry({ + status: "pending_broadcast", + broadcastAt: broadcastAt.toISOString(), + }), + }); + const mempool: Record = { 105: null }; + const result = reconcile(makeWallet(), ledger, mempool, SPONSOR, { now: NOW }); + expect(result.inFlightPendingIndex).toEqual([105]); + expect(result.dropped).toEqual([]); + }); + + it("classifies past-grace absences as dropped", () => { + const broadcastAt = new Date(NOW.getTime() - 60_000); // 60s ago + const ledger = makeLedger({ + "105": makeEntry({ + status: "pending_broadcast", + broadcastAt: broadcastAt.toISOString(), + }), + }); + const mempool: Record = { 105: null }; + const result = reconcile(makeWallet(), ledger, mempool, SPONSOR, { now: NOW }); + expect(result.dropped).toEqual([105]); + expect(result.inFlightPendingIndex).toEqual([]); + }); + + it("honors a custom justBroadcastGraceSeconds override", () => { + const broadcastAt = new Date(NOW.getTime() - 45_000); // 45s ago + const ledger = makeLedger({ + "105": makeEntry({ + status: "pending_broadcast", + broadcastAt: broadcastAt.toISOString(), + }), + }); + const mempool: Record = { 105: null }; + + const tight = reconcile(makeWallet(), ledger, mempool, SPONSOR, { + now: NOW, + justBroadcastGraceSeconds: 30, + }); + expect(tight.dropped).toEqual([105]); + + const lenient = reconcile(makeWallet(), ledger, mempool, SPONSOR, { + now: NOW, + justBroadcastGraceSeconds: 60, + }); + expect(lenient.inFlightPendingIndex).toEqual([105]); + }); + + it("two-phase happy path end-to-end: begin → reconcile promotes → resolve is a no-op edge case", () => { + let ledger = beginPendingBroadcast(makeLedger(), { + nonce: 105, + txId: TX_OURS, + fee: "5000", + broadcastAt: NOW, + }); + expect(ledger.entries["105"]?.status).toBe("pending_broadcast"); + + // Network call returned success — caller resolves explicitly. + ledger = resolveBroadcast(ledger, 105, "sent"); + expect(ledger.entries["105"]?.status).toBe("broadcast_sent"); + + // Mempool later confirms the same txId; no drift, no adopt. + const mempool: Record = { + 105: hiroTx(), + }; + const result = reconcile(makeWallet(), ledger, mempool, SPONSOR, { now: NOW }); + expect(result.adopted).toEqual([]); + expect(result.dropped).toEqual([]); + expect(result.inFlightPendingIndex).toEqual([]); + }); + + it("two-phase crash recovery: pending entry older than grace is dropped for caller to inspect", () => { + // Simulate: beginPendingBroadcast succeeded, process died, entry was + // never resolved and the tx never reached the node. + const broadcastAt = new Date(NOW.getTime() - 120_000); + const ledger = makeLedger({ + "105": makeEntry({ + status: "pending_broadcast", + broadcastAt: broadcastAt.toISOString(), + }), + }); + const mempool: Record = { 105: null }; + const result = reconcile(makeWallet(), ledger, mempool, SPONSOR, { now: NOW }); + expect(result.dropped).toEqual([105]); + }); }); From 3b1ec6fcbec8f9cdabfebaa506f14c9e3d2311ec Mon Sep 17 00:00:00 2001 From: Jason Schrader Date: Wed, 15 Apr 2026 12:36:22 -0700 Subject: [PATCH 2/3] fix: address PR review feedback - Remove unused LedgerEntryStatusSchema import from sponsor-wallet-machine. - reconcile: txId mismatch at a nonce is drift, not propagation lag. Classify as dropped regardless of broadcastAt age. Grace window now applies only when observed is null/undefined. Adds test to pin the behavior. - README: reformat inline object literal as fenced block so markdown renderers don't split the single-line code span across a newline. Co-Authored-By: Claude Opus 4.6 (1M context) --- README.md | 10 +++++++--- src/core/sponsor-wallet-machine.ts | 16 ++++++++++++---- tests/sponsor-wallet-machine.test.ts | 18 ++++++++++++++++++ 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 98167cf..44f8b13 100644 --- a/README.md +++ b/README.md @@ -194,9 +194,13 @@ try { ``` `decideBroadcast` refuses to issue a new decision while the entry is -`pending_broadcast` — it returns `{ kind: "await_pending_broadcast", nonce, -txId }` so the consumer resolves the prior call before a second broadcast -can fire. +`pending_broadcast`. It returns: + +```ts +{ kind: "await_pending_broadcast", nonce, txId } +``` + +so the consumer resolves the prior call before a second broadcast can fire. `reconcile()` sweeps survivors of crashes that dropped the resolve step: diff --git a/src/core/sponsor-wallet-machine.ts b/src/core/sponsor-wallet-machine.ts index d60aea3..338a607 100644 --- a/src/core/sponsor-wallet-machine.ts +++ b/src/core/sponsor-wallet-machine.ts @@ -9,7 +9,6 @@ import { } from "./primitives.js"; import { getLedgerEntry, - LedgerEntryStatusSchema, SponsorLedgerEntrySchema, } from "./sponsor-ledger.js"; import type { @@ -616,15 +615,24 @@ export function reconcile( adopted.push(nonce); } - // Pass 2: classify absences. An entry whose nonce was included in the - // mempool read but whose txId is missing (null or mismatched) is either - // still-propagating (within grace) or truly dropped. + // Pass 2: classify absences. Two distinct cases: + // - observed is null/undefined → not seen yet; could be propagation lag + // (grace-window → inFlightPendingIndex) or a true drop (past grace). + // - observed.tx_id !== entry.txId → a different tx holds the slot. That's + // drift, not lag — the indexer is already reporting a concrete outcome. + // Classify as dropped regardless of age so callers re-decide immediately. for (const [nonceStr, entry] of Object.entries(ledger.entries)) { const nonce = Number(nonceStr); if (!(nonce in mempoolReadByNonce)) continue; const observed = mempoolReadByNonce[nonce]; + if (observed && observed.tx_id === entry.txId) continue; + if (observed) { + dropped.push(entry.nonce); + continue; + } + const ageMs = nowMs - Date.parse(entry.broadcastAt); if (ageMs >= 0 && ageMs < graceMs) { inFlightPendingIndex.push(entry.nonce); diff --git a/tests/sponsor-wallet-machine.test.ts b/tests/sponsor-wallet-machine.test.ts index 82f18f4..846c855 100644 --- a/tests/sponsor-wallet-machine.test.ts +++ b/tests/sponsor-wallet-machine.test.ts @@ -707,6 +707,24 @@ describe("reconcile", () => { expect(result.inFlightPendingIndex).toEqual([]); }); + it("treats txId drift as dropped even within the grace window (drift is not lag)", () => { + const broadcastAt = new Date(NOW.getTime() - 5_000); // 5s ago, well inside grace + const ledger = makeLedger({ + "105": makeEntry({ + nonce: 105, + txId: TX_OURS, + broadcastAt: broadcastAt.toISOString(), + }), + }); + // Mempool reports a *different* sponsor-owned tx at the same nonce. + const mempool: Record = { + 105: hiroTx({ tx_id: TX_ORPHAN, fee_rate: "7000" }), + }; + const result = reconcile(makeWallet(), ledger, mempool, SPONSOR, { now: NOW }); + expect(result.dropped).toContain(105); + expect(result.inFlightPendingIndex).toEqual([]); + }); + it("honors a custom justBroadcastGraceSeconds override", () => { const broadcastAt = new Date(NOW.getTime() - 45_000); // 45s ago const ledger = makeLedger({ From a909bbc0d1d02c4279dee3fa01e72d8adedf99e6 Mon Sep 17 00:00:00 2001 From: Jason Schrader Date: Wed, 15 Apr 2026 15:15:13 -0700 Subject: [PATCH 3/3] fix: address arc0btc review feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - reconcile: grace window now applies ONLY to pending_broadcast entries. A broadcast_sent entry that vanishes from the mempool is mined or evicted (the node already confirmed receipt), not propagation lag. Falls through to dropped so the caller disambiguates via tx status lookup. - ReconcileOptions: clarify default-30s calibration (Nakamoto block timing + Hiro indexer latency) and document grace-window scope. - README: drop shadowed `let ledger = ...` in favor of straight reassignment. - README: add RBF flow example showing explicit rbfAttempts increment so consumers don't miss the no-auto-increment behavior. - Test: pin broadcast_sent absence behavior (5s old, mempool null → dropped, not inFlightPendingIndex). Co-Authored-By: Claude Opus 4.6 (1M context) --- README.md | 16 ++++++++++- src/core/sponsor-wallet-machine.ts | 41 +++++++++++++++++++++------- tests/sponsor-wallet-machine.test.ts | 18 ++++++++++++ 3 files changed, 64 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 44f8b13..dabc7eb 100644 --- a/README.md +++ b/README.md @@ -178,7 +178,7 @@ import { reconcile, } from "@aibtc/tx-schemas/core"; -let ledger = beginPendingBroadcast(ledger, { +ledger = beginPendingBroadcast(ledger, { nonce, txId, fee, @@ -193,6 +193,20 @@ try { } ``` +For RBF (e.g., a `fee_too_low` outcome), pass the incremented attempt count +explicitly — `beginPendingBroadcast` does not auto-increment, since the same +helper is also used for first broadcasts and retries: + +```ts +const existing = ledger.entries[String(nonce)]!; +ledger = beginPendingBroadcast(ledger, { + nonce, + txId: rbfTxId, + fee: bumpedFee, + rbfAttempts: existing.rbfAttempts + 1, +}); +``` + `decideBroadcast` refuses to issue a new decision while the entry is `pending_broadcast`. It returns: diff --git a/src/core/sponsor-wallet-machine.ts b/src/core/sponsor-wallet-machine.ts index 338a607..e0f7b76 100644 --- a/src/core/sponsor-wallet-machine.ts +++ b/src/core/sponsor-wallet-machine.ts @@ -520,10 +520,22 @@ export const DEFAULT_JUST_BROADCAST_GRACE_SECONDS = 30; export interface ReconcileOptions { now?: Date; - // Ledger entries broadcast within this many seconds are classified as - // `inFlightPendingIndex` rather than `dropped` when absent from the - // mempool read. Covers node→indexer propagation lag (~6-10 Nakamoto - // blocks + Hiro indexing). Default 30s. + // Grace window for `pending_broadcast` ledger entries that are absent from + // the mempool read. Within this many seconds of `broadcastAt`, absence is + // classified as `inFlightPendingIndex` (node may have accepted; indexer + // hasn't reported yet) instead of `dropped`. Default 30s. + // + // Note: this default is calibrated to typical Hiro indexer latency and is + // intentionally on the tight end — 6-10 Nakamoto blocks at ~3-5s/block plus + // indexing puts the natural worst case near 30-50s. Operators who observe + // routine Hiro slowness should override upward; the grace only delays the + // `dropped` classification, it never changes a true outcome. + // + // The grace window applies ONLY to `pending_broadcast` entries. A + // `broadcast_sent` entry that vanishes is a different operational signal + // (the node confirmed receipt; absence usually means mined or evicted), so + // it falls through to `dropped` immediately for the caller to disambiguate + // via tx status lookup. justBroadcastGraceSeconds?: number; } @@ -615,12 +627,16 @@ export function reconcile( adopted.push(nonce); } - // Pass 2: classify absences. Two distinct cases: - // - observed is null/undefined → not seen yet; could be propagation lag - // (grace-window → inFlightPendingIndex) or a true drop (past grace). - // - observed.tx_id !== entry.txId → a different tx holds the slot. That's - // drift, not lag — the indexer is already reporting a concrete outcome. - // Classify as dropped regardless of age so callers re-decide immediately. + // Pass 2: classify absences. Three distinct cases: + // - observed.tx_id !== entry.txId → drift, not lag. The indexer is + // already reporting a concrete outcome at this nonce. Classify as + // dropped regardless of age so the caller re-decides immediately. + // - observed is null/undefined AND entry.status === "pending_broadcast" + // → genuine propagation ambiguity. Apply the grace window. + // - observed is null/undefined AND entry.status !== "pending_broadcast" + // → the node already confirmed receipt of this tx; absence now means + // mined or evicted. Classify as dropped immediately; the caller looks + // up tx status to disambiguate. for (const [nonceStr, entry] of Object.entries(ledger.entries)) { const nonce = Number(nonceStr); if (!(nonce in mempoolReadByNonce)) continue; @@ -633,6 +649,11 @@ export function reconcile( continue; } + if (entry.status !== "pending_broadcast") { + dropped.push(entry.nonce); + continue; + } + const ageMs = nowMs - Date.parse(entry.broadcastAt); if (ageMs >= 0 && ageMs < graceMs) { inFlightPendingIndex.push(entry.nonce); diff --git a/tests/sponsor-wallet-machine.test.ts b/tests/sponsor-wallet-machine.test.ts index 846c855..9910caa 100644 --- a/tests/sponsor-wallet-machine.test.ts +++ b/tests/sponsor-wallet-machine.test.ts @@ -707,6 +707,24 @@ describe("reconcile", () => { expect(result.inFlightPendingIndex).toEqual([]); }); + it("does NOT grace-window broadcast_sent entries — node already confirmed receipt", () => { + // Entry was sent 5s ago (well inside the default 30s grace) but its + // status is broadcast_sent, meaning the node accepted it. A subsequent + // mempool absence is not propagation lag — it's mined or evicted, and + // the caller must look up tx status to disambiguate. Grace doesn't apply. + const broadcastAt = new Date(NOW.getTime() - 5_000); + const ledger = makeLedger({ + "105": makeEntry({ + status: "broadcast_sent", + broadcastAt: broadcastAt.toISOString(), + }), + }); + const mempool: Record = { 105: null }; + const result = reconcile(makeWallet(), ledger, mempool, SPONSOR, { now: NOW }); + expect(result.dropped).toEqual([105]); + expect(result.inFlightPendingIndex).toEqual([]); + }); + it("treats txId drift as dropped even within the grace window (drift is not lag)", () => { const broadcastAt = new Date(NOW.getTime() - 5_000); // 5s ago, well inside grace const ledger = makeLedger({