diff --git a/README.md b/README.md index ee63c8e..dabc7eb 100644 --- a/README.md +++ b/README.md @@ -153,3 +153,86 @@ const { wallet: next, ledger: nextLedger, adopted, dropped } = reconcile( All helpers are pure: inputs → new state. No I/O, and time-sensitive helpers accept an injectable `now` option for deterministic tests; otherwise they use the current time by default. + +### Canonical write pattern: two-phase broadcast + +`LedgerEntry.status` captures whether a broadcast has round-tripped with the +node. The two-phase contract closes the edge-terminator / crash window where a +single-phase ledger write could claim `broadcast_sent` for a tx the node never +saw (or lose track of a tx the node did accept). + +``` +(no entry) → pending_broadcast [beginPendingBroadcast] +pending_broadcast → broadcast_sent [resolveBroadcast("sent") | reconcile] +pending_broadcast → broadcast_failed [resolveBroadcast("failed")] +broadcast_sent → pending_broadcast [new RBF attempt, new txId] +broadcast_failed → pending_broadcast [retry, new txId] +``` + +Write the ledger **before** the network call, resolve on return: + +```ts +import { + beginPendingBroadcast, + resolveBroadcast, + reconcile, +} from "@aibtc/tx-schemas/core"; + +ledger = beginPendingBroadcast(ledger, { + nonce, + txId, + fee, +}); + +try { + const outcome = await broadcastTransaction(signedTx); + ledger = resolveBroadcast(ledger, nonce, "sent", { lastOutcome: outcome }); +} catch (err) { + ledger = resolveBroadcast(ledger, nonce, "failed"); + throw err; +} +``` + +For RBF (e.g., a `fee_too_low` outcome), pass the incremented attempt count +explicitly — `beginPendingBroadcast` does not auto-increment, since the same +helper is also used for first broadcasts and retries: + +```ts +const existing = ledger.entries[String(nonce)]!; +ledger = beginPendingBroadcast(ledger, { + nonce, + txId: rbfTxId, + fee: bumpedFee, + rbfAttempts: existing.rbfAttempts + 1, +}); +``` + +`decideBroadcast` refuses to issue a new decision while the entry is +`pending_broadcast`. It returns: + +```ts +{ kind: "await_pending_broadcast", nonce, txId } +``` + +so the consumer resolves the prior call before a second broadcast can fire. + +`reconcile()` sweeps survivors of crashes that dropped the resolve step: + +- A `pending_broadcast` entry whose txId appears in the mempool is promoted + to `broadcast_sent` automatically. +- A `pending_broadcast` entry absent from the mempool within + `justBroadcastGraceSeconds` (default 30) is classified as + `inFlightPendingIndex` — node may have accepted it; indexer just hasn't + caught up. +- Past the grace window with no mempool hit, the entry is reported as + `dropped` for caller inspection. + +```ts +const { ledger: next, adopted, dropped, inFlightPendingIndex } = reconcile( + wallet, + ledger, + mempoolReadByNonce, + sponsorAddress, + { justBroadcastGraceSeconds: 30 } +); +``` diff --git a/src/core/sponsor-ledger.ts b/src/core/sponsor-ledger.ts index 989ddbe..37bf4dc 100644 --- a/src/core/sponsor-ledger.ts +++ b/src/core/sponsor-ledger.ts @@ -14,12 +14,28 @@ import { // One entry per (sponsorAddress, nonce). Relays persist this so they can // distinguish their own prior broadcast (RBF path) from a foreign occupant // (quarantine path) when stacks-core reports a nonce conflict. +// +// Lifecycle (two-phase broadcast): +// (no entry) → pending_broadcast [beginPendingBroadcast] +// pending_broadcast → broadcast_sent [resolveBroadcast / reconcile] +// pending_broadcast → broadcast_failed [resolveBroadcast] +// broadcast_sent → pending_broadcast [new RBF attempt, new txId] +// broadcast_failed → pending_broadcast [retry, new txId] // --------------------------------------------------------------------------- +export const LedgerEntryStatusSchema = z.enum([ + "pending_broadcast", + "broadcast_sent", + "broadcast_failed", +]); + +export type LedgerEntryStatus = z.infer; + export const SponsorLedgerEntrySchema = z.object({ nonce: NonNegativeIntegerSchema, txId: TransactionIdSchema, fee: AmountStringSchema, + status: LedgerEntryStatusSchema, broadcastAt: IsoDateTimeSchema, rbfAttempts: NonNegativeIntegerSchema, lastOutcome: NodeBroadcastOutcomeSchema.optional(), diff --git a/src/core/sponsor-wallet-machine.ts b/src/core/sponsor-wallet-machine.ts index 088d513..e0f7b76 100644 --- a/src/core/sponsor-wallet-machine.ts +++ b/src/core/sponsor-wallet-machine.ts @@ -7,8 +7,12 @@ import { StacksAddressSchema, TransactionIdSchema, } from "./primitives.js"; -import { getLedgerEntry, SponsorLedgerEntrySchema } from "./sponsor-ledger.js"; +import { + getLedgerEntry, + SponsorLedgerEntrySchema, +} from "./sponsor-ledger.js"; import type { + LedgerEntryStatus, SponsorLedger, SponsorLedgerEntry, } from "./sponsor-ledger.js"; @@ -93,12 +97,19 @@ const TerminalDecisionSchema = z.object({ reason: FailedTerminalReasonSchema, }); +const AwaitPendingBroadcastDecisionSchema = z.object({ + kind: z.literal("await_pending_broadcast"), + nonce: NonNegativeIntegerSchema, + txId: TransactionIdSchema, +}); + export const BroadcastDecisionSchema = z.discriminatedUnion("kind", [ FirstBroadcastDecisionSchema, RbfWithFeeDecisionSchema, AdoptThenRbfDecisionSchema, QuarantineDecisionSchema, TerminalDecisionSchema, + AwaitPendingBroadcastDecisionSchema, ]); export type BroadcastDecision = z.infer; @@ -133,6 +144,7 @@ export function classifyOccupant( nonce: entry.nonce, txId: entry.txId, fee: entry.fee, + status: entry.status, broadcastAt: entry.broadcastAt, rbfAttempts: entry.rbfAttempts, }, @@ -159,7 +171,8 @@ export function classifyOccupant( // // Maps a (walletCapacity, nodeBroadcastOutcome, ledger-for-nonce, occupant) // tuple to the next action: first broadcast, RBF, adopt-then-RBF, quarantine, -// or terminal. Pure — no I/O, no time source. +// terminal, or await-pending (the prior broadcast hasn't been resolved yet). +// Pure — no I/O, no time source. // --------------------------------------------------------------------------- export interface BroadcastContext { @@ -180,6 +193,18 @@ export function decideBroadcast( const { nonce, ledger, occupant } = context; const maxAttempts = context.maxRbfAttempts ?? MAX_RBF_ATTEMPTS; const ledgerEntry = getLedgerEntry(ledger, nonce); + + // Hard invariant: a pending_broadcast ledger entry means a prior broadcast + // call hasn't been resolved. Issuing a new decision on top would risk + // double-broadcast. Force the consumer to resolveBroadcast() first. + if (ledgerEntry?.status === "pending_broadcast") { + return { + kind: "await_pending_broadcast", + nonce, + txId: ledgerEntry.txId, + }; + } + const occupied = wallet.occupiedNonces.find((o) => o.nonce === nonce); const attempts = occupied?.rbfAttempts ?? ledgerEntry?.rbfAttempts ?? 0; @@ -286,6 +311,111 @@ function decideSponsorConflict(args: { const bump = (fee: string): string => (BigInt(fee) + 1n).toString(); +// --------------------------------------------------------------------------- +// Two-phase broadcast lifecycle +// +// beginPendingBroadcast → (network call) → resolveBroadcast +// +// The ledger is written BEFORE the network call so a crash between write +// and call return never produces a ledger entry claiming sent-to-node when +// the node never saw the tx. On the return path, resolveBroadcast promotes +// pending → broadcast_sent or → broadcast_failed. If the process dies in +// between, reconcile() sweeps stragglers using the grace window. +// --------------------------------------------------------------------------- + +export class LedgerTransitionError extends Error { + constructor(message: string) { + super(message); + this.name = "LedgerTransitionError"; + } +} + +export interface BeginPendingBroadcastInput { + nonce: number; + txId: string; + fee: string; + broadcastAt?: Date; + rbfAttempts?: number; +} + +// Valid predecessors for entering pending_broadcast at a nonce. A fresh nonce +// (no entry) is allowed; prior terminal states (sent/failed) are allowed +// because they represent a completed lifecycle that the caller is extending +// with a new txId (RBF or retry). pending_broadcast is NOT a valid predecessor +// — the prior call must be resolved first. +const PENDING_PREDECESSORS = new Set([ + "none", + "broadcast_sent", + "broadcast_failed", +]); + +export function beginPendingBroadcast( + ledger: SponsorLedger, + input: BeginPendingBroadcastInput +): SponsorLedger { + const { nonce, txId, fee } = input; + const existing = getLedgerEntry(ledger, nonce); + const predecessor: LedgerEntryStatus | "none" = existing?.status ?? "none"; + + if (!PENDING_PREDECESSORS.has(predecessor)) { + throw new LedgerTransitionError( + `cannot begin pending_broadcast for nonce ${nonce}: prior status is ${predecessor}. resolveBroadcast() must be called first.` + ); + } + + const broadcastAt = (input.broadcastAt ?? new Date()).toISOString(); + const rbfAttempts = input.rbfAttempts ?? existing?.rbfAttempts ?? 0; + + const entry: SponsorLedgerEntry = { + nonce, + txId, + fee, + status: "pending_broadcast", + broadcastAt, + rbfAttempts, + }; + + return { + ...ledger, + entries: { ...ledger.entries, [String(nonce)]: entry }, + }; +} + +export type ResolveBroadcastOutcome = "sent" | "failed"; + +export function resolveBroadcast( + ledger: SponsorLedger, + nonce: number, + outcome: ResolveBroadcastOutcome, + options: { lastOutcome?: NodeBroadcastOutcome } = {} +): SponsorLedger { + const entry = getLedgerEntry(ledger, nonce); + if (!entry) { + throw new LedgerTransitionError( + `cannot resolve nonce ${nonce}: no ledger entry exists.` + ); + } + if (entry.status !== "pending_broadcast") { + throw new LedgerTransitionError( + `cannot resolve nonce ${nonce}: current status is ${entry.status}, expected pending_broadcast.` + ); + } + + const status: LedgerEntryStatus = + outcome === "sent" ? "broadcast_sent" : "broadcast_failed"; + + const resolved: SponsorLedgerEntry = { + ...entry, + status, + ...(options.lastOutcome !== undefined && { lastOutcome: options.lastOutcome }), + }; + + return { + ...ledger, + entries: { ...ledger.entries, [String(nonce)]: resolved }, + }; +} + // --------------------------------------------------------------------------- // adoptOrphan // @@ -377,17 +507,47 @@ export function quarantine( // // Folds one address-filtered mempool read into wallet + ledger: // - adopts sponsor-owned tx_ids missing from the ledger -// - flags ledger entries that are no longer visible in the mempool +// - promotes pending_broadcast entries to broadcast_sent on mempool hit +// - classifies ledger entries not (yet) seen in the mempool: +// * within the just-broadcast grace window → inFlightPendingIndex +// * past the grace window → dropped // -// Returns the updated wallet + ledger plus the adopted/dropped nonce lists so -// the caller can log/alert. Pure — `now` is injectable. +// Returns the updated wallet + ledger plus classification lists so the caller +// can log/alert. Pure — `now` is injectable. // --------------------------------------------------------------------------- +export const DEFAULT_JUST_BROADCAST_GRACE_SECONDS = 30; + +export interface ReconcileOptions { + now?: Date; + // Grace window for `pending_broadcast` ledger entries that are absent from + // the mempool read. Within this many seconds of `broadcastAt`, absence is + // classified as `inFlightPendingIndex` (node may have accepted; indexer + // hasn't reported yet) instead of `dropped`. Default 30s. + // + // Note: this default is calibrated to typical Hiro indexer latency and is + // intentionally on the tight end — 6-10 Nakamoto blocks at ~3-5s/block plus + // indexing puts the natural worst case near 30-50s. Operators who observe + // routine Hiro slowness should override upward; the grace only delays the + // `dropped` classification, it never changes a true outcome. + // + // The grace window applies ONLY to `pending_broadcast` entries. A + // `broadcast_sent` entry that vanishes is a different operational signal + // (the node confirmed receipt; absence usually means mined or evicted), so + // it falls through to `dropped` immediately for the caller to disambiguate + // via tx status lookup. + justBroadcastGraceSeconds?: number; +} + export interface ReconcileResult { wallet: WalletCapacity; ledger: SponsorLedger; adopted: number[]; dropped: number[]; + // Ledger entries absent from the mempool but still within the grace + // window. Callers should NOT treat these as missing — the node accepted + // the broadcast; the indexer just hasn't caught up. + inFlightPendingIndex: number[]; // Orphans we saw but couldn't price (no fee_rate on the Hiro view). // Callers should quarantine/alert rather than assume a safe RBF baseline. unpriceableOrphans: number[]; @@ -398,16 +558,24 @@ export function reconcile( ledger: SponsorLedger, mempoolReadByNonce: Record, ourSponsorAddress: string, - options: { now?: Date } = {} + options: ReconcileOptions = {} ): ReconcileResult { const now = options.now ?? new Date(); const nowIso = now.toISOString(); + const nowMs = now.getTime(); + const graceMs = + (options.justBroadcastGraceSeconds ?? DEFAULT_JUST_BROADCAST_GRACE_SECONDS) * + 1000; + let nextWallet = wallet; let nextLedger = ledger; const adopted: number[] = []; const dropped: number[] = []; + const inFlightPendingIndex: number[] = []; const unpriceableOrphans: number[] = []; + // Pass 1: adopt orphans, promote pending_broadcast → broadcast_sent when + // the mempool confirms our txId. for (const [nonceStr, hiroTx] of Object.entries(mempoolReadByNonce)) { const nonce = Number(nonceStr); if (!Number.isInteger(nonce) || nonce < 0) continue; @@ -419,6 +587,21 @@ export function reconcile( nextLedger, nonce ); + + if (classification.kind === "sponsor_owned_in_ledger") { + const entry = getLedgerEntry(nextLedger, nonce); + if (entry?.status === "pending_broadcast") { + nextLedger = { + ...nextLedger, + entries: { + ...nextLedger.entries, + [String(nonce)]: { ...entry, status: "broadcast_sent" }, + }, + }; + } + continue; + } + if (classification.kind !== "sponsor_owned_orphan") continue; if (hiroTx.fee_rate === undefined) { @@ -435,6 +618,7 @@ export function reconcile( nonce, txId: hiroTx.tx_id, fee: hiroTx.fee_rate, + status: "broadcast_sent", broadcastAt: nowIso, rbfAttempts: 0, }, @@ -443,11 +627,37 @@ export function reconcile( adopted.push(nonce); } + // Pass 2: classify absences. Three distinct cases: + // - observed.tx_id !== entry.txId → drift, not lag. The indexer is + // already reporting a concrete outcome at this nonce. Classify as + // dropped regardless of age so the caller re-decides immediately. + // - observed is null/undefined AND entry.status === "pending_broadcast" + // → genuine propagation ambiguity. Apply the grace window. + // - observed is null/undefined AND entry.status !== "pending_broadcast" + // → the node already confirmed receipt of this tx; absence now means + // mined or evicted. Classify as dropped immediately; the caller looks + // up tx status to disambiguate. for (const [nonceStr, entry] of Object.entries(ledger.entries)) { const nonce = Number(nonceStr); if (!(nonce in mempoolReadByNonce)) continue; const observed = mempoolReadByNonce[nonce]; - if (!observed || observed.tx_id !== entry.txId) { + + if (observed && observed.tx_id === entry.txId) continue; + + if (observed) { + dropped.push(entry.nonce); + continue; + } + + if (entry.status !== "pending_broadcast") { + dropped.push(entry.nonce); + continue; + } + + const ageMs = nowMs - Date.parse(entry.broadcastAt); + if (ageMs >= 0 && ageMs < graceMs) { + inFlightPendingIndex.push(entry.nonce); + } else { dropped.push(entry.nonce); } } @@ -457,6 +667,7 @@ export function reconcile( ledger: nextLedger, adopted, dropped, + inFlightPendingIndex, unpriceableOrphans, }; } diff --git a/tests/sponsor-wallet-machine.test.ts b/tests/sponsor-wallet-machine.test.ts index 33fd5e7..9910caa 100644 --- a/tests/sponsor-wallet-machine.test.ts +++ b/tests/sponsor-wallet-machine.test.ts @@ -1,19 +1,23 @@ import { describe, expect, it } from "vitest"; import { adoptOrphan, + beginPendingBroadcast, BroadcastDecisionSchema, classifyOccupant, decideBroadcast, HiroSponsorTxViewSchema, + LedgerTransitionError, MAX_RBF_ATTEMPTS, OccupantClassificationSchema, quarantine, RELAY_CHAINING_LIMIT, reconcile, + resolveBroadcast, SponsorLedgerSchema, WalletCapacitySchema, type HiroSponsorTxView, type SponsorLedger, + type SponsorLedgerEntry, type WalletCapacity, } from "../src/index.js"; @@ -25,6 +29,8 @@ const SPONSOR = "SP1KGHF33Y4M7Q87WNRXQBCAP2Y6DBSQSHJHQH4T"; const STRANGER = "SP2STRANGERXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"; const TX_OURS = "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; +const TX_OURS_2 = + "0xdddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd"; const TX_FOREIGN = "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"; const TX_ORPHAN = @@ -51,6 +57,18 @@ function makeLedger(entries: SponsorLedger["entries"] = {}): SponsorLedger { return { sponsorAddress: SPONSOR, entries }; } +function makeEntry(overrides: Partial = {}): SponsorLedgerEntry { + return { + nonce: 105, + txId: TX_OURS, + fee: "5000", + status: "broadcast_sent", + broadcastAt: NOW.toISOString(), + rbfAttempts: 0, + ...overrides, + }; +} + function hiroTx(overrides: Partial = {}): HiroSponsorTxView { return HiroSponsorTxViewSchema.parse({ tx_id: TX_OURS, @@ -65,7 +83,7 @@ function hiroTx(overrides: Partial = {}): HiroSponsorTxView { } // --------------------------------------------------------------------------- -// SponsorLedgerSchema — key/nonce drift guard +// SponsorLedgerSchema — key/nonce drift guard + required status // --------------------------------------------------------------------------- describe("SponsorLedgerSchema", () => { @@ -73,17 +91,20 @@ describe("SponsorLedgerSchema", () => { const result = SponsorLedgerSchema.safeParse({ sponsorAddress: SPONSOR, entries: { - "104": { - nonce: 105, - txId: TX_OURS, - fee: "5000", - broadcastAt: NOW.toISOString(), - rbfAttempts: 0, - }, + "104": makeEntry({ nonce: 105 }), }, }); expect(result.success).toBe(false); }); + + it("rejects ledger entries that omit status", () => { + const { status: _status, ...rest } = makeEntry(); + const result = SponsorLedgerSchema.safeParse({ + sponsorAddress: SPONSOR, + entries: { "105": rest }, + }); + expect(result.success).toBe(false); + }); }); // --------------------------------------------------------------------------- @@ -99,18 +120,13 @@ describe("classifyOccupant", () => { it("classifies our sponsor-owned tx as in-ledger when it matches", () => { const ledger = makeLedger({ - "105": { - nonce: 105, - txId: TX_OURS, - fee: "5000", - broadcastAt: NOW.toISOString(), - rbfAttempts: 1, - }, + "105": makeEntry({ rbfAttempts: 1 }), }); const result = classifyOccupant(hiroTx(), SPONSOR, ledger, 105); expect(result.kind).toBe("sponsor_owned_in_ledger"); if (result.kind === "sponsor_owned_in_ledger") { expect(result.ledgerEntry.txId).toBe(TX_OURS); + expect(result.ledgerEntry.status).toBe("broadcast_sent"); } }); @@ -152,20 +168,12 @@ describe("classifyOccupant", () => { }); // --------------------------------------------------------------------------- -// decideBroadcast workflow cases (from issue #22 acceptance criteria) +// decideBroadcast workflow cases // --------------------------------------------------------------------------- describe("decideBroadcast", () => { it("sponsor-owned in-ledger conflict → RBF with occupant_fee + 1", () => { - const ledger = makeLedger({ - "105": { - nonce: 105, - txId: TX_OURS, - fee: "5000", - broadcastAt: NOW.toISOString(), - rbfAttempts: 1, - }, - }); + const ledger = makeLedger({ "105": makeEntry({ rbfAttempts: 1 }) }); const occupant = classifyOccupant(hiroTx(), SPONSOR, ledger, 105); const decision = decideBroadcast( makeWallet({ occupiedNonces: [] }), @@ -348,6 +356,143 @@ describe("decideBroadcast", () => { ); expect(decision).toEqual({ kind: "terminal", reason: "sponsor_exhausted" }); }); + + it("returns await_pending_broadcast when the ledger entry is unresolved", () => { + const ledger = makeLedger({ + "105": makeEntry({ status: "pending_broadcast" }), + }); + const decision = decideBroadcast( + makeWallet(), + { outcome: "accepted" }, + { nonce: 105, ledger } + ); + expect(decision).toEqual({ + kind: "await_pending_broadcast", + nonce: 105, + txId: TX_OURS, + }); + expect(BroadcastDecisionSchema.parse(decision)).toEqual(decision); + }); + + it("await_pending_broadcast takes precedence over RBF for a conflict outcome", () => { + const ledger = makeLedger({ + "105": makeEntry({ status: "pending_broadcast" }), + }); + const occupant = classifyOccupant(hiroTx(), SPONSOR, ledger, 105); + const decision = decideBroadcast( + makeWallet(), + { outcome: "nonce_conflict", isOrigin: false }, + { nonce: 105, ledger, occupant } + ); + expect(decision.kind).toBe("await_pending_broadcast"); + }); +}); + +// --------------------------------------------------------------------------- +// Two-phase broadcast lifecycle +// --------------------------------------------------------------------------- + +describe("beginPendingBroadcast", () => { + it("writes a new pending_broadcast entry for a fresh nonce", () => { + const ledger = beginPendingBroadcast(makeLedger(), { + nonce: 105, + txId: TX_OURS, + fee: "5000", + broadcastAt: NOW, + }); + expect(ledger.entries["105"]).toMatchObject({ + nonce: 105, + txId: TX_OURS, + fee: "5000", + status: "pending_broadcast", + broadcastAt: NOW.toISOString(), + rbfAttempts: 0, + }); + expect(SponsorLedgerSchema.parse(ledger)).toEqual(ledger); + }); + + it("allows transition from broadcast_sent (RBF replaces prior entry)", () => { + const prior = makeLedger({ "105": makeEntry({ rbfAttempts: 2 }) }); + const next = beginPendingBroadcast(prior, { + nonce: 105, + txId: TX_OURS_2, + fee: "6000", + rbfAttempts: 3, + broadcastAt: NOW, + }); + expect(next.entries["105"]?.status).toBe("pending_broadcast"); + expect(next.entries["105"]?.txId).toBe(TX_OURS_2); + expect(next.entries["105"]?.rbfAttempts).toBe(3); + }); + + it("allows transition from broadcast_failed (retry)", () => { + const prior = makeLedger({ + "105": makeEntry({ status: "broadcast_failed" }), + }); + const next = beginPendingBroadcast(prior, { + nonce: 105, + txId: TX_OURS_2, + fee: "6000", + broadcastAt: NOW, + }); + expect(next.entries["105"]?.status).toBe("pending_broadcast"); + }); + + it("refuses to overwrite an unresolved pending_broadcast entry", () => { + const prior = makeLedger({ + "105": makeEntry({ status: "pending_broadcast" }), + }); + expect(() => + beginPendingBroadcast(prior, { + nonce: 105, + txId: TX_OURS_2, + fee: "6000", + }) + ).toThrow(LedgerTransitionError); + }); +}); + +describe("resolveBroadcast", () => { + it("transitions pending_broadcast → broadcast_sent", () => { + const ledger = beginPendingBroadcast(makeLedger(), { + nonce: 105, + txId: TX_OURS, + fee: "5000", + broadcastAt: NOW, + }); + const resolved = resolveBroadcast(ledger, 105, "sent"); + expect(resolved.entries["105"]?.status).toBe("broadcast_sent"); + }); + + it("transitions pending_broadcast → broadcast_failed", () => { + const ledger = beginPendingBroadcast(makeLedger(), { + nonce: 105, + txId: TX_OURS, + fee: "5000", + broadcastAt: NOW, + }); + const resolved = resolveBroadcast(ledger, 105, "failed", { + lastOutcome: { outcome: "server_error", reason: "upstream 502" }, + }); + expect(resolved.entries["105"]?.status).toBe("broadcast_failed"); + expect(resolved.entries["105"]?.lastOutcome).toEqual({ + outcome: "server_error", + reason: "upstream 502", + }); + }); + + it("throws when the entry does not exist", () => { + expect(() => resolveBroadcast(makeLedger(), 105, "sent")).toThrow( + LedgerTransitionError + ); + }); + + it("throws when the entry is not pending (no double-resolve)", () => { + const ledger = makeLedger({ "105": makeEntry() }); + expect(() => resolveBroadcast(ledger, 105, "sent")).toThrow( + LedgerTransitionError + ); + }); }); // --------------------------------------------------------------------------- @@ -459,19 +604,13 @@ describe("quarantine", () => { }); // --------------------------------------------------------------------------- -// reconcile — orphans adopted, missing entries flagged +// reconcile — orphans, grace window, pending promotion // --------------------------------------------------------------------------- describe("reconcile", () => { it("adopts orphans seen in the mempool and flags ledger entries that disappeared", () => { const ledger = makeLedger({ - "104": { - nonce: 104, - txId: TX_OURS, - fee: "5000", - broadcastAt: NOW.toISOString(), - rbfAttempts: 0, - }, + "104": makeEntry({ nonce: 104 }), }); const wallet = makeWallet(); @@ -480,27 +619,25 @@ describe("reconcile", () => { 105: hiroTx({ tx_id: TX_ORPHAN, fee_rate: "7000" }), }; - const result = reconcile(wallet, ledger, mempool, SPONSOR, { now: NOW }); + const result = reconcile(wallet, ledger, mempool, SPONSOR, { + now: new Date(NOW.getTime() + 120_000), + }); expect(result.adopted).toEqual([105]); expect(result.dropped).toEqual([104]); + expect(result.inFlightPendingIndex).toEqual([]); expect(result.ledger.entries["105"]?.txId).toBe(TX_ORPHAN); + expect(result.ledger.entries["105"]?.status).toBe("broadcast_sent"); expect(result.wallet.occupiedNonces.map((n) => n.nonce)).toContain(105); }); it("flags ledger drift when mempool tx_id differs from ledger", () => { - const ledger = makeLedger({ - "104": { - nonce: 104, - txId: TX_OURS, - fee: "5000", - broadcastAt: NOW.toISOString(), - rbfAttempts: 0, - }, - }); + const ledger = makeLedger({ "104": makeEntry({ nonce: 104 }) }); const mempool: Record = { 104: hiroTx({ tx_id: TX_ORPHAN, sponsor_nonce: 104 }), }; - const result = reconcile(makeWallet(), ledger, mempool, SPONSOR, { now: NOW }); + const result = reconcile(makeWallet(), ledger, mempool, SPONSOR, { + now: new Date(NOW.getTime() + 120_000), + }); expect(result.dropped).toContain(104); }); @@ -530,4 +667,140 @@ describe("reconcile", () => { expect(result.unpriceableOrphans).toEqual([105]); expect(result.ledger.entries["105"]).toBeUndefined(); }); + + it("promotes pending_broadcast → broadcast_sent when the mempool confirms the txId", () => { + const ledger = makeLedger({ + "105": makeEntry({ status: "pending_broadcast" }), + }); + const mempool: Record = { + 105: hiroTx(), + }; + const result = reconcile(makeWallet(), ledger, mempool, SPONSOR, { now: NOW }); + expect(result.ledger.entries["105"]?.status).toBe("broadcast_sent"); + }); + + it("classifies inside-grace absences as inFlightPendingIndex, not dropped", () => { + const broadcastAt = new Date(NOW.getTime() - 10_000); // 10s ago + const ledger = makeLedger({ + "105": makeEntry({ + status: "pending_broadcast", + broadcastAt: broadcastAt.toISOString(), + }), + }); + const mempool: Record = { 105: null }; + const result = reconcile(makeWallet(), ledger, mempool, SPONSOR, { now: NOW }); + expect(result.inFlightPendingIndex).toEqual([105]); + expect(result.dropped).toEqual([]); + }); + + it("classifies past-grace absences as dropped", () => { + const broadcastAt = new Date(NOW.getTime() - 60_000); // 60s ago + const ledger = makeLedger({ + "105": makeEntry({ + status: "pending_broadcast", + broadcastAt: broadcastAt.toISOString(), + }), + }); + const mempool: Record = { 105: null }; + const result = reconcile(makeWallet(), ledger, mempool, SPONSOR, { now: NOW }); + expect(result.dropped).toEqual([105]); + expect(result.inFlightPendingIndex).toEqual([]); + }); + + it("does NOT grace-window broadcast_sent entries — node already confirmed receipt", () => { + // Entry was sent 5s ago (well inside the default 30s grace) but its + // status is broadcast_sent, meaning the node accepted it. A subsequent + // mempool absence is not propagation lag — it's mined or evicted, and + // the caller must look up tx status to disambiguate. Grace doesn't apply. + const broadcastAt = new Date(NOW.getTime() - 5_000); + const ledger = makeLedger({ + "105": makeEntry({ + status: "broadcast_sent", + broadcastAt: broadcastAt.toISOString(), + }), + }); + const mempool: Record = { 105: null }; + const result = reconcile(makeWallet(), ledger, mempool, SPONSOR, { now: NOW }); + expect(result.dropped).toEqual([105]); + expect(result.inFlightPendingIndex).toEqual([]); + }); + + it("treats txId drift as dropped even within the grace window (drift is not lag)", () => { + const broadcastAt = new Date(NOW.getTime() - 5_000); // 5s ago, well inside grace + const ledger = makeLedger({ + "105": makeEntry({ + nonce: 105, + txId: TX_OURS, + broadcastAt: broadcastAt.toISOString(), + }), + }); + // Mempool reports a *different* sponsor-owned tx at the same nonce. + const mempool: Record = { + 105: hiroTx({ tx_id: TX_ORPHAN, fee_rate: "7000" }), + }; + const result = reconcile(makeWallet(), ledger, mempool, SPONSOR, { now: NOW }); + expect(result.dropped).toContain(105); + expect(result.inFlightPendingIndex).toEqual([]); + }); + + it("honors a custom justBroadcastGraceSeconds override", () => { + const broadcastAt = new Date(NOW.getTime() - 45_000); // 45s ago + const ledger = makeLedger({ + "105": makeEntry({ + status: "pending_broadcast", + broadcastAt: broadcastAt.toISOString(), + }), + }); + const mempool: Record = { 105: null }; + + const tight = reconcile(makeWallet(), ledger, mempool, SPONSOR, { + now: NOW, + justBroadcastGraceSeconds: 30, + }); + expect(tight.dropped).toEqual([105]); + + const lenient = reconcile(makeWallet(), ledger, mempool, SPONSOR, { + now: NOW, + justBroadcastGraceSeconds: 60, + }); + expect(lenient.inFlightPendingIndex).toEqual([105]); + }); + + it("two-phase happy path end-to-end: begin → reconcile promotes → resolve is a no-op edge case", () => { + let ledger = beginPendingBroadcast(makeLedger(), { + nonce: 105, + txId: TX_OURS, + fee: "5000", + broadcastAt: NOW, + }); + expect(ledger.entries["105"]?.status).toBe("pending_broadcast"); + + // Network call returned success — caller resolves explicitly. + ledger = resolveBroadcast(ledger, 105, "sent"); + expect(ledger.entries["105"]?.status).toBe("broadcast_sent"); + + // Mempool later confirms the same txId; no drift, no adopt. + const mempool: Record = { + 105: hiroTx(), + }; + const result = reconcile(makeWallet(), ledger, mempool, SPONSOR, { now: NOW }); + expect(result.adopted).toEqual([]); + expect(result.dropped).toEqual([]); + expect(result.inFlightPendingIndex).toEqual([]); + }); + + it("two-phase crash recovery: pending entry older than grace is dropped for caller to inspect", () => { + // Simulate: beginPendingBroadcast succeeded, process died, entry was + // never resolved and the tx never reached the node. + const broadcastAt = new Date(NOW.getTime() - 120_000); + const ledger = makeLedger({ + "105": makeEntry({ + status: "pending_broadcast", + broadcastAt: broadcastAt.toISOString(), + }), + }); + const mempool: Record = { 105: null }; + const result = reconcile(makeWallet(), ledger, mempool, SPONSOR, { now: NOW }); + expect(result.dropped).toEqual([105]); + }); });