diff --git a/.changeset/ocapn-op-flush.md b/.changeset/ocapn-op-flush.md new file mode 100644 index 0000000000..052c3da10a --- /dev/null +++ b/.changeset/ocapn-op-flush.md @@ -0,0 +1,8 @@ +--- +'@endo/ocapn': minor +--- + +- Add the OCapN `op:flush` and `op:flush-done` operations and wire them into the message dispatcher. +- On receiving `op:flush` for an exported promise position, the receiver mints a fresh local promise, swaps it in at the same export position (preserving the slot's refcount) and replies with `op:flush-done`. Subsequent deliveries that target the position queue on the new promise so per-reference FIFO order is preserved during promise shortening. +- Expose `_debug.flushExport(remoteValue)` to send `op:flush` and obtain a promise that resolves when `op:flush-done` is received. +- Add `replaceExportValue` on the pairwise table to support in-place value replacement under flush. diff --git a/.github/workflows/ocapn-flush-proof.yml b/.github/workflows/ocapn-flush-proof.yml new file mode 100644 index 0000000000..665274de7e --- /dev/null +++ b/.github/workflows/ocapn-flush-proof.yml @@ -0,0 +1,167 @@ +name: OCapN flush proof + +# Builds and verifies the Lean 4 + Veil proof of the op:flush per-reference +# FIFO claim under packages/ocapn/proofs/veil. The proof's verification of +# record is Veil's `#check_invariants` step, which calls Z3 and CVC5 to +# discharge every VC. We assert that the build succeeds and parse the build +# log to confirm there are no failed clauses (`❌`) and that exactly the +# expected set of `sorry`-admitted declarations are present. 
+ +on: + pull_request: + paths: + - 'packages/ocapn/proofs/**' + - '.github/workflows/ocapn-flush-proof.yml' + push: + branches: [master] + paths: + - 'packages/ocapn/proofs/**' + - '.github/workflows/ocapn-flush-proof.yml' + workflow_dispatch: + +permissions: + contents: read + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + verify: + name: verify Lean 4 + Veil proof + runs-on: ubuntu-latest + timeout-minutes: 60 + + steps: + - name: Checkout + uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4 + + # Veil ships a `downloadDependencies` step in its lakefile that fetches + # `uv` from astral.sh in addition to z3/cvc5 from GitHub releases. The + # GitHub-hosted runners can reach astral.sh, but it has historically + # been flaky; pre-installing `uv` here and dropping it into the + # build directory means the lakefile target succeeds even when astral + # 503s. We also use `uv` only for SMT-model decoration, so a slightly + # newer pip-resolved version is a fine substitute for the pinned + # 0.9.13. + - name: Install uv + run: | + python3 -m pip install --user uv + echo "UV_BIN=$(python3 -m uv --version >/dev/null && command -v uv || echo "$HOME/.local/bin/uv")" >> "$GITHUB_ENV" + + - name: Cache Lake build artifacts + uses: actions/cache@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0 + with: + path: | + packages/ocapn/proofs/veil/.lake + # The third component of the key hashes the toolchain, lakefile, manifest + # and proof sources, so changing any of them produces a new cache key. + key: lake-${{ runner.os }}-${{ hashFiles('packages/ocapn/proofs/veil/lean-toolchain', 'packages/ocapn/proofs/veil/lakefile.lean', 'packages/ocapn/proofs/veil/lake-manifest.json', 'packages/ocapn/proofs/veil/OcapnFlush/**') }} + restore-keys: | + lake-${{ runner.os }}- + + # Install Lean directly from the GitHub release tarball matching + # `packages/ocapn/proofs/veil/lean-toolchain`. 
We deliberately do NOT + # use leanprover/lean-action here: the org-level GitHub Actions + # policy forbids non-pinned actions, and lean-action's composite + # internally references `actions/cache/restore@v5` which is rejected + # at runtime. Driving the toolchain manually keeps every executed + # action SHA-pinned. + - name: Install Lean toolchain + working-directory: packages/ocapn/proofs/veil + run: | + set -euo pipefail + spec=$(cat lean-toolchain) + # Expected format: leanprover/lean4:v4.X.Y + version="${spec##*:v}" + if [ -z "$version" ] || [ "$version" = "$spec" ]; then + echo "FAIL: could not parse lean-toolchain '$spec'" + exit 1 + fi + tarball="lean-${version}-linux.tar.zst" + url="https://github.com/leanprover/lean4/releases/download/v${version}/${tarball}" + echo "Fetching $url" + curl -fsSL --retry 4 --retry-delay 5 -o "/tmp/${tarball}" "$url" + sudo apt-get update + sudo apt-get install -y zstd + mkdir -p "$RUNNER_TEMP/lean" + tar --use-compress-program='zstd -d' \ + -xf "/tmp/${tarball}" \ + -C "$RUNNER_TEMP/lean" --strip-components=1 + echo "$RUNNER_TEMP/lean/bin" >> "$GITHUB_PATH" + "$RUNNER_TEMP/lean/bin/lean" --version + "$RUNNER_TEMP/lean/bin/lake" --version + + - name: Resolve dependencies + working-directory: packages/ocapn/proofs/veil + run: lake update + + - name: Pre-stage uv binary for Veil's downloadDependencies + working-directory: packages/ocapn/proofs/veil + run: | + mkdir -p .lake/packages/veil/.lake/build + cp "$UV_BIN" .lake/packages/veil/.lake/build/uv + + - name: Build proof and capture log + working-directory: packages/ocapn/proofs/veil + run: | + set -o pipefail + lake build 2>&1 | tee build.log + + - name: Assert all invariants discharged by SMT + working-directory: packages/ocapn/proofs/veil + run: | + # No clause should fail. + if grep -E '❌' build.log; then + echo "FAIL: at least one invariant clause failed verification." 
+ exit 1 + fi + # Sanity: at least one ✅ — guards against a silent change that + # disables `#check_invariants`. + if ! grep -qE '✅' build.log; then + echo "FAIL: build log contains no ✅ markers; #check_invariants did not run." + exit 1 + fi + # Veil reports how many VCs the SMT solver was trusted with. We + # don't pin the exact number (it scales with the model) but we + # assert it ran. + if ! grep -qE 'Trusting the SMT solver for [0-9]+ theorems' build.log; then + echo "FAIL: no 'Trusting the SMT solver' line; SMT verification did not occur." + exit 1 + fi + + - name: Assert only documented sorry usages + working-directory: packages/ocapn/proofs/veil + run: | + # Each module under OcapnFlush/ admits two sorrys — one for + # `prove_inv_inductive` (Lean proof-term reconstruction + # admitted to SMT) and one for the `sat trace` BMC. Anything + # else is a regression. We expect 2 sorrys per module file + # (currently Flush.lean + FourParty.lean = 4). + extra=$(grep -E "warning: OcapnFlush/.*: declaration uses 'sorry'" build.log || true) + count=$(printf '%s\n' "$extra" | grep -c "declaration uses 'sorry'" || true) + # Count distinct module files that contributed. + modules=$(printf '%s\n' "$extra" | grep -oE 'OcapnFlush/[^:]+' | sort -u | wc -l) + expected=$(( modules * 2 )) + if [ "$count" -ne "$expected" ]; then + echo "FAIL: expected exactly $expected admitted declarations (2 per module across $modules modules), found $count:" + printf '%s\n' "$extra" + exit 1 + fi + + - name: Assert build succeeded + working-directory: packages/ocapn/proofs/veil + run: | + if ! grep -qE 'Build completed successfully' build.log; then + echo "FAIL: 'Build completed successfully' not found in log." 
+ exit 1 + fi + + - name: Upload build log + if: always() + uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5 + with: + name: ocapn-flush-proof-build-log + path: packages/ocapn/proofs/veil/build.log + if-no-files-found: warn + retention-days: 14 diff --git a/packages/ocapn/proofs/veil/.gitignore b/packages/ocapn/proofs/veil/.gitignore new file mode 100644 index 0000000000..509a5c3211 --- /dev/null +++ b/packages/ocapn/proofs/veil/.gitignore @@ -0,0 +1,4 @@ +# Lake build artifacts. We commit lake-manifest.json so dependency versions +# are pinned for reproducible builds. +.lake/ +build/ diff --git a/packages/ocapn/proofs/veil/OcapnFlush.lean b/packages/ocapn/proofs/veil/OcapnFlush.lean new file mode 100644 index 0000000000..2ae7f569fc --- /dev/null +++ b/packages/ocapn/proofs/veil/OcapnFlush.lean @@ -0,0 +1,3 @@ +-- Top-level module aggregating the per-scenario flush proofs. +import OcapnFlush.Flush +import OcapnFlush.FourParty diff --git a/packages/ocapn/proofs/veil/OcapnFlush/Flush.lean b/packages/ocapn/proofs/veil/OcapnFlush/Flush.lean new file mode 100644 index 0000000000..dd1b4e756a --- /dev/null +++ b/packages/ocapn/proofs/veil/OcapnFlush/Flush.lean @@ -0,0 +1,294 @@ +import Veil + +set_option linter.dupNamespace false + +/-! # End-to-end per-reference FIFO under op:flush + +This module models a deliberately **minimal** abstraction of the OCapN +flush-based promise-shortening protocol just rich enough to mechanically prove +end-to-end per-reference FIFO across a single shortening event. + +## Modeled scenario (3-party Alice / Bob / Carol) + +* Alice (A) hosts an entity `e`, exported as a local promise `p1` at slot `N`. +* Bob (B) holds a remote reference at A's slot `N` and is the sole sender of + reference messages targeting `p1`/`e`. +* Carol (C) will eventually receive the reference via a third-party handoff + initiated by Bob, shortening B → A into the path C → A. 
+ +Reference messages are partitioned into two phases: + +* `phase1 m`: Bob sent `m` over the B→A channel **before** initiating the + flush (so it travels the unshortened path). +* `phase2 m`: Bob sent `m` such that, after the handoff is complete, Carol + forwards `m` to Alice over the C→A channel (the shortened path). + +## Axioms relied on + +* **A1 — Pairwise FIFO**: collapsed into action preconditions on Alice's + processing events ("can only process X after the channel-prior Y"). +* **A3 — Handoff value-availability**: `withdraw_gift` cannot be processed at + Alice until the matching `deposit_gift` has been processed at Alice. +* **A4 — Flush discipline**: Bob does not send `deposit_gift` (B→A) or + `handoff_give` (B→C) until after receiving `flush_done`. + +## Safety claim + +> for every phase-2 message `m₂` that Alice has processed and every phase-1 +> message `m₁`, Alice has also already processed `m₁`. + +This is the "two-fence" cross-phase FIFO claim. Within-phase FIFO is +established by the underlying transport and is not in scope here. -/ + +veil module Flush + +/- An uninterpreted type of reference messages. -/ +type msg + +/-! ## State -/ + +relation phase1 (m : msg) +relation phase2 (m : msg) +relation processed (m : msg) + +relation flushSentByB +relation flushProcessedAtA +relation flushDoneSent +relation flushDoneReceivedAtB +relation depositGiftSent +relation depositGiftProcessedAtA +relation handoffGiveSent +relation handoffGiveReceivedAtC +relation withdrawGiftSent +relation withdrawGiftProcessedAtA +relation cHoldsRef + +#gen_state + +/-! ## Initial state — every flag false. 
-/ + +after_init { + phase1 M := False + phase2 M := False + processed M := False + flushSentByB := False + flushProcessedAtA := False + flushDoneSent := False + flushDoneReceivedAtB := False + depositGiftSent := False + depositGiftProcessedAtA := False + handoffGiveSent := False + handoffGiveReceivedAtC := False + withdrawGiftSent := False + withdrawGiftProcessedAtA := False + cHoldsRef := False +} + +/-! ## Actions -/ + +/- Bob sends a phase-1 reference message `m` on B→A. -/ +action sendPhase1Msg (m : msg) = { + require ¬ flushSentByB + require ¬ phase1 m + require ¬ phase2 m + phase1 m := True +} + +/- Alice processes a phase-1 reference message `m`. By B→A FIFO, she does +this before processing `op:flush`. -/ +action processPhase1 (m : msg) = { + require phase1 m + require ¬ processed m + require ¬ flushProcessedAtA + processed m := True +} + +/- Bob sends `op:flush` on B→A. -/ +action bobSendFlush = { + require ¬ flushSentByB + flushSentByB := True +} + +/- Alice processes `op:flush`. By B→A FIFO Alice has already processed every +phase-1 message Bob sent (since `sendPhase1Msg` requires `¬ flushSentByB`, +all phase-1 messages were sent strictly before flush on the B→A channel). -/ +action aliceProcessFlush = { + require flushSentByB + require ¬ flushProcessedAtA + require ∀ M, phase1 M → processed M + flushProcessedAtA := True +} + +/- Alice queues `op:flush-done` after the swap. -/ +action aliceSendFlushDone = { + require flushProcessedAtA + require ¬ flushDoneSent + flushDoneSent := True +} + +/- Bob receives `op:flush-done`. -/ +action bobReceiveFlushDone = { + require flushDoneSent + require ¬ flushDoneReceivedAtB + flushDoneReceivedAtB := True +} + +/- Bob sends the deposit-gift on B→A. (A4.) -/ +action bobSendDepositGift = { + require flushDoneReceivedAtB + require ¬ depositGiftSent + depositGiftSent := True +} + +/- Alice processes the deposit-gift. By B→A FIFO Alice has already processed +the earlier `op:flush` on the same channel. 
-/ +action aliceProcessDepositGift = { + require depositGiftSent + require ¬ depositGiftProcessedAtA + require flushProcessedAtA + depositGiftProcessedAtA := True +} + +/- Bob sends the handoff-give to Carol on B→C. (A4.) -/ +action bobSendHandoffGive = { + require flushDoneReceivedAtB + require ¬ handoffGiveSent + handoffGiveSent := True +} + +/- Carol receives the handoff-give. -/ +action carolReceiveHandoffGive = { + require handoffGiveSent + require ¬ handoffGiveReceivedAtC + handoffGiveReceivedAtC := True +} + +/- Carol sends withdraw-gift on C→A. -/ +action carolSendWithdrawGift = { + require handoffGiveReceivedAtC + require ¬ withdrawGiftSent + withdrawGiftSent := True +} + +/- Alice processes withdraw-gift. By A3 (handoff value-availability), this +requires that Alice has already processed the matching deposit-gift. -/ +action aliceProcessWithdrawGift = { + require withdrawGiftSent + require ¬ withdrawGiftProcessedAtA + require depositGiftProcessedAtA + withdrawGiftProcessedAtA := True +} + +/- Carol receives the resolved gift back. -/ +action carolHoldsRef = { + require withdrawGiftProcessedAtA + require ¬ cHoldsRef + cHoldsRef := True +} + +/- Bob authors a phase-2 reference message; Carol forwards it on C→A. -/ +action sendPhase2Msg (m : msg) = { + require cHoldsRef + require ¬ phase1 m + require ¬ phase2 m + phase2 m := True +} + +/- Alice processes a phase-2 reference message. By C→A FIFO, Alice has +already processed every C→A message Carol sent earlier — most importantly +the withdraw-gift. -/ +action processPhase2 (m : msg) = { + require phase2 m + require ¬ processed m + require withdrawGiftProcessedAtA + processed m := True +} + +/-! ## Safety: end-to-end per-reference FIFO + +For any phase-1 message `M1` and any phase-2 message `M2`, if Alice has +processed `M2` then she has also already processed `M1`. -/ + +safety [refFifo] (phase1 M1 ∧ phase2 M2 ∧ processed M2) → processed M1 + +/-! 
## Inductive invariants + +The two-fence chain: + +* `flushProcessedAtA` ⇒ every phase-1 message is processed. +* `depositGiftProcessedAtA` ⇒ the flush is done. +* `withdrawGiftProcessedAtA` ⇒ deposit is done. +* `processed M ∧ phase2 M` ⇒ withdraw-gift is done. + +Together they chain end-to-end. -/ + +invariant [phasesDisjoint] ¬ (phase1 M ∧ phase2 M) +invariant [processedHasPhase] processed M → (phase1 M ∨ phase2 M) +invariant [flushSentMonotone] flushProcessedAtA → flushSentByB +invariant [flushFence] flushProcessedAtA → (phase1 M → processed M) +invariant [depositAfterFlush] depositGiftProcessedAtA → flushProcessedAtA +invariant [withdrawAfterDeposit] withdrawGiftProcessedAtA → depositGiftProcessedAtA +invariant [cHoldsRefMeansWithdraw] cHoldsRef → withdrawGiftProcessedAtA +invariant [phase2NeedsWithdraw] (phase2 M ∧ processed M) → withdrawGiftProcessedAtA + +#gen_spec + +set_option veil.printCounterexamples true +set_option veil.smt.model.minimize true +set_option veil.vc_gen "transition" +set_option maxHeartbeats 4000000 + +/-! `#check_invariants` discharges every VC by calling Z3 / CVC5 on the +generated SMT queries. With our 9 invariant clauses and 15 actions, Veil +emits 135 (= 9 × 15) SMT obligations, every one of which is reported as ✅, plus a +single `safety` obligation that the chosen invariant clauses imply +`refFifo`. Crucially, this is the verification step of record in Veil: +Veil's "the invariant holds" claim is established by the SMT solver, not +by the (optional) Lean proof-term reconstruction below. -/ +#check_invariants + +prove_inv_init by { simp_all [initSimp, actSimp, invSimp] } + +prove_inv_safe by { + sdestruct st + simp_all [invSimp] +} + +/-! Best-effort Lean proof-term reconstruction from the SMT proofs. With +`veil.smt.reconstructProofs := false` (the default in this file) Veil emits +admitted lemmas that are trusted to follow from the already-validated SMT +results. 
Setting it to `true` attempts full Lean proof reconstruction; that +path can be brittle for protocols of this size and is not required for the +soundness story. -/ +set_option veil.smt.reconstructProofs false + +prove_inv_inductive by { + constructor + · apply inv_init + intro st st' _ hinv hnext + sts_induction <;> sdestruct_goal <;> solve_clause +} + +/-! ## Sanity (BMC): the protocol admits a non-trivial trace where both a +phase-1 and a phase-2 message reach the entity. -/ + +sat trace [happyPath] { + sendPhase1Msg + bobSendFlush + processPhase1 + aliceProcessFlush + aliceSendFlushDone + bobReceiveFlushDone + bobSendDepositGift + bobSendHandoffGive + aliceProcessDepositGift + carolReceiveHandoffGive + carolSendWithdrawGift + aliceProcessWithdrawGift + carolHoldsRef + sendPhase2Msg + processPhase2 + assert (∃ M1 M2, phase1 M1 ∧ phase2 M2 ∧ processed M1 ∧ processed M2) +} by { bmc_sat } + +end Flush diff --git a/packages/ocapn/proofs/veil/OcapnFlush/FourParty.lean b/packages/ocapn/proofs/veil/OcapnFlush/FourParty.lean new file mode 100644 index 0000000000..f88dd7b4cb --- /dev/null +++ b/packages/ocapn/proofs/veil/OcapnFlush/FourParty.lean @@ -0,0 +1,434 @@ +import Veil + +set_option linter.dupNamespace false + +/-! # End-to-end per-reference FIFO across two sequential shortenings (4-party) + +This module extends `OcapnFlush.Flush` to a 4-party scenario in which two +shortenings happen back-to-back, exercising both *non-trivial interleavings* +(the second shortening may begin while the first is still draining) and the +*chain propagation* the alternate proof report calls out in §3 (n-party +generalization). + +## Modeled scenario + +* Alice (A) hosts the entity at slot N. +* Bob (B) holds the initial remote reference and is the first sender. +* Carol (C) receives the reference via a third-party handoff initiated by + Bob (after Bob's `op:flush` round-trip with Alice). 
+* Dave (D) receives the reference via a *second* third-party handoff, + initiated by Carol after she in turn flushes Alice. + +Bob's reference messages are partitioned into three phases according to the +sender at the time Alice processes them: + +* `phase1 m` — sent by Bob before Bob's `op:flush`, traveling B→A. +* `phase2 m` — sent by Carol after Bob's handoff completed and before + Carol's own `op:flush`, traveling C→A. +* `phase3 m` — sent by Dave after Carol's handoff completed, + traveling D→A. + +Each handoff uses the same fence chain as in `Flush.lean`: a flush +handshake, a deposit-gift, a withdraw-gift gated on the deposit-gift, and +a "the new sender now holds the ref" step. + +## Non-trivial interleavings exercised + +* Phase-2 messages may be sent (by Carol) and processed at Alice while + Carol is *also* preparing her own flush — no precondition on her flush + initiation requires phase-2 traffic to be finished. +* Carol's `op:flush` handshake may run concurrently (in any interleaving) + with phase-2 messages still in flight. +* Phase-3 messages may begin as soon as Dave acquires the ref, without + waiting for any phase-2 message Alice hasn't yet processed. + +Despite all of these interleavings, the safety claim is the +generalization of the `Flush.lean` claim: + +> for every later-phase message Alice has processed, every message in any +> earlier phase has likewise been processed. + +i.e. the per-reference FIFO order survives the chain of shortenings +regardless of how the steps interleave on the wire. + +## Why this isn't fully concurrent flushes (alt §4) + +The two flushes here are *causally* sequential — Carol cannot issue her +flush until she has received Bob's handoff (otherwise she has no +reference to flush). What's "concurrent" is everything *else*: phase-2 +sends, phase-2 processings, phase-3 sends, etc. all interleave with the +flush handshakes in arbitrary order. 
The fully-symmetric concurrent case +from alt §4 (two adjacent intermediaries flushing with no causal +dependency) requires a different chain topology and is left as future +work; this is the natural intermediate step. -/ + +veil module FourParty + +type msg + +/-! ## State -/ + +relation phase1 (m : msg) +relation phase2 (m : msg) +relation phase3 (m : msg) +relation processed (m : msg) + +/- B↔A flush + handoff (transition from phase 1 to phase 2). -/ +relation bcFlushSentByB +relation bcFlushProcessedAtA +relation bcFlushDoneSent +relation bcFlushDoneReceivedAtB +relation bcDepositGiftSent +relation bcDepositGiftProcessedAtA +relation bcHandoffGiveSent +relation bcHandoffGiveReceivedAtC +relation bcWithdrawGiftSent +relation bcWithdrawGiftProcessedAtA +relation cHoldsRef + +/- C↔A flush + handoff (transition from phase 2 to phase 3). -/ +relation cdFlushSentByC +relation cdFlushProcessedAtA +relation cdFlushDoneSent +relation cdFlushDoneReceivedAtC +relation cdDepositGiftSent +relation cdDepositGiftProcessedAtA +relation cdHandoffGiveSent +relation cdHandoffGiveReceivedAtD +relation cdWithdrawGiftSent +relation cdWithdrawGiftProcessedAtA +relation dHoldsRef + +#gen_state + +after_init { + phase1 M := False + phase2 M := False + phase3 M := False + processed M := False + bcFlushSentByB := False + bcFlushProcessedAtA := False + bcFlushDoneSent := False + bcFlushDoneReceivedAtB := False + bcDepositGiftSent := False + bcDepositGiftProcessedAtA := False + bcHandoffGiveSent := False + bcHandoffGiveReceivedAtC := False + bcWithdrawGiftSent := False + bcWithdrawGiftProcessedAtA := False + cHoldsRef := False + cdFlushSentByC := False + cdFlushProcessedAtA := False + cdFlushDoneSent := False + cdFlushDoneReceivedAtC := False + cdDepositGiftSent := False + cdDepositGiftProcessedAtA := False + cdHandoffGiveSent := False + cdHandoffGiveReceivedAtD := False + cdWithdrawGiftSent := False + cdWithdrawGiftProcessedAtA := False + dHoldsRef := False +} + +/-! 
## Phase 1 — Bob -/ + +action sendPhase1Msg (m : msg) = { + require ¬ bcFlushSentByB + require ¬ phase1 m + require ¬ phase2 m + require ¬ phase3 m + phase1 m := True +} + +action processPhase1 (m : msg) = { + require phase1 m + require ¬ processed m + require ¬ bcFlushProcessedAtA + processed m := True +} + +action bobSendBcFlush = { + require ¬ bcFlushSentByB + bcFlushSentByB := True +} + +action aliceProcessBcFlush = { + require bcFlushSentByB + require ¬ bcFlushProcessedAtA + require ∀ M, phase1 M → processed M + bcFlushProcessedAtA := True +} + +action aliceSendBcFlushDone = { + require bcFlushProcessedAtA + require ¬ bcFlushDoneSent + bcFlushDoneSent := True +} + +action bobReceiveBcFlushDone = { + require bcFlushDoneSent + require ¬ bcFlushDoneReceivedAtB + bcFlushDoneReceivedAtB := True +} + +action bobSendBcDepositGift = { + require bcFlushDoneReceivedAtB + require ¬ bcDepositGiftSent + bcDepositGiftSent := True +} + +action aliceProcessBcDepositGift = { + require bcDepositGiftSent + require ¬ bcDepositGiftProcessedAtA + require bcFlushProcessedAtA + bcDepositGiftProcessedAtA := True +} + +action bobSendBcHandoffGive = { + require bcFlushDoneReceivedAtB + require ¬ bcHandoffGiveSent + bcHandoffGiveSent := True +} + +action carolReceiveBcHandoffGive = { + require bcHandoffGiveSent + require ¬ bcHandoffGiveReceivedAtC + bcHandoffGiveReceivedAtC := True +} + +action carolSendBcWithdrawGift = { + require bcHandoffGiveReceivedAtC + require ¬ bcWithdrawGiftSent + bcWithdrawGiftSent := True +} + +action aliceProcessBcWithdrawGift = { + require bcWithdrawGiftSent + require ¬ bcWithdrawGiftProcessedAtA + require bcDepositGiftProcessedAtA + bcWithdrawGiftProcessedAtA := True +} + +action carolHoldsRef = { + require bcWithdrawGiftProcessedAtA + require ¬ cHoldsRef + cHoldsRef := True +} + +/-! 
## Phase 2 — Carol -/ + +action sendPhase2Msg (m : msg) = { + require cHoldsRef + require ¬ cdFlushSentByC + require ¬ phase1 m + require ¬ phase2 m + require ¬ phase3 m + phase2 m := True +} + +action processPhase2 (m : msg) = { + require phase2 m + require ¬ processed m + require bcWithdrawGiftProcessedAtA + require ¬ cdFlushProcessedAtA + processed m := True +} + +action carolSendCdFlush = { + require cHoldsRef + require ¬ cdFlushSentByC + cdFlushSentByC := True +} + +action aliceProcessCdFlush = { + require cdFlushSentByC + require ¬ cdFlushProcessedAtA + require ∀ M, phase2 M → processed M + cdFlushProcessedAtA := True +} + +action aliceSendCdFlushDone = { + require cdFlushProcessedAtA + require ¬ cdFlushDoneSent + cdFlushDoneSent := True +} + +action carolReceiveCdFlushDone = { + require cdFlushDoneSent + require ¬ cdFlushDoneReceivedAtC + cdFlushDoneReceivedAtC := True +} + +action carolSendCdDepositGift = { + require cdFlushDoneReceivedAtC + require ¬ cdDepositGiftSent + cdDepositGiftSent := True +} + +action aliceProcessCdDepositGift = { + require cdDepositGiftSent + require ¬ cdDepositGiftProcessedAtA + require cdFlushProcessedAtA + cdDepositGiftProcessedAtA := True +} + +action carolSendCdHandoffGive = { + require cdFlushDoneReceivedAtC + require ¬ cdHandoffGiveSent + cdHandoffGiveSent := True +} + +action daveReceiveCdHandoffGive = { + require cdHandoffGiveSent + require ¬ cdHandoffGiveReceivedAtD + cdHandoffGiveReceivedAtD := True +} + +action daveSendCdWithdrawGift = { + require cdHandoffGiveReceivedAtD + require ¬ cdWithdrawGiftSent + cdWithdrawGiftSent := True +} + +action aliceProcessCdWithdrawGift = { + require cdWithdrawGiftSent + require ¬ cdWithdrawGiftProcessedAtA + require cdDepositGiftProcessedAtA + cdWithdrawGiftProcessedAtA := True +} + +action daveHoldsRef = { + require cdWithdrawGiftProcessedAtA + require ¬ dHoldsRef + dHoldsRef := True +} + +/-! 
## Phase 3 — Dave -/ + +action sendPhase3Msg (m : msg) = { + require dHoldsRef + require ¬ phase1 m + require ¬ phase2 m + require ¬ phase3 m + phase3 m := True +} + +action processPhase3 (m : msg) = { + require phase3 m + require ¬ processed m + require cdWithdrawGiftProcessedAtA + processed m := True +} + +/-! ## Safety: cross-phase end-to-end FIFO + +For any pair of phases (i < j) and any messages `M_i` (phase i), `M_j` +(phase j), if Alice has processed `M_j` then she has also already +processed `M_i`. -/ + +safety [refFifo12] (phase1 M1 ∧ phase2 M2 ∧ processed M2) → processed M1 +safety [refFifo23] (phase2 M2 ∧ phase3 M3 ∧ processed M3) → processed M2 +safety [refFifo13] (phase1 M1 ∧ phase3 M3 ∧ processed M3) → processed M1 + +/-! ## Inductive invariants + +The fence chain doubles up: one chain for B→C, one for C→D. Each chain has +the same shape as in `Flush.lean`, and the two are linked by the fact +that `cdFlushProcessedAtA` only fires after every phase-2 message has been +processed (which by `bcFlushFence` means every phase-1 message has been +processed too). 
-/ + +invariant [phasesDisjoint12] ¬ (phase1 M ∧ phase2 M) +invariant [phasesDisjoint13] ¬ (phase1 M ∧ phase3 M) +invariant [phasesDisjoint23] ¬ (phase2 M ∧ phase3 M) +invariant [processedHasPhase] processed M → (phase1 M ∨ phase2 M ∨ phase3 M) + +invariant [bcFlushSentMonotone] bcFlushProcessedAtA → bcFlushSentByB +invariant [bcFlushFence] bcFlushProcessedAtA → (phase1 M → processed M) +invariant [bcDepositAfterFlush] bcDepositGiftProcessedAtA → bcFlushProcessedAtA +invariant [bcWithdrawAfterDeposit] bcWithdrawGiftProcessedAtA → bcDepositGiftProcessedAtA +invariant [cHoldsRefMeansBcWithdraw] cHoldsRef → bcWithdrawGiftProcessedAtA +invariant [phase2NeedsBcWithdraw] (phase2 M ∧ processed M) → bcWithdrawGiftProcessedAtA + +invariant [cdFlushSentMonotone] cdFlushProcessedAtA → cdFlushSentByC +invariant [cdFlushFence] cdFlushProcessedAtA → (phase2 M → processed M) +invariant [cdDepositAfterFlush] cdDepositGiftProcessedAtA → cdFlushProcessedAtA +invariant [cdWithdrawAfterDeposit] cdWithdrawGiftProcessedAtA → cdDepositGiftProcessedAtA +invariant [dHoldsRefMeansCdWithdraw] dHoldsRef → cdWithdrawGiftProcessedAtA +invariant [phase3NeedsCdWithdraw] (phase3 M ∧ processed M) → cdWithdrawGiftProcessedAtA + +/- Linkage between the two chains: Carol can only initiate the C→D flush +once she holds the ref, which transitively requires Bob's handoff to have +completed. 
-/ +invariant [cdFlushNeedsCHoldsRef] cdFlushSentByC → cHoldsRef + +#gen_spec + +set_option veil.printCounterexamples true +set_option veil.smt.model.minimize true +set_option veil.vc_gen "transition" +set_option maxHeartbeats 4000000 + +#check_invariants + +prove_inv_init by { simp_all [initSimp, actSimp, invSimp] } + +set_option maxRecDepth 8192 in +set_option maxHeartbeats 4000000 in +prove_inv_safe by { + sdestruct st + simp (config := { maxSteps := 4000000 }) [invSimp] +} + +set_option veil.smt.reconstructProofs false + +prove_inv_inductive by { + constructor + · apply inv_init + intro st st' _ hinv hnext + sts_induction <;> sdestruct_goal <;> solve_clause +} + +/-! ## Sanity: BMC trace exercising both shortenings, with phase-2 traffic +flowing while Carol is preparing her own flush. -/ + +sat trace [twoShortenings] { + -- Phase 1: Bob sends and Alice processes some, then Bob flushes. + sendPhase1Msg + bobSendBcFlush + processPhase1 + aliceProcessBcFlush + aliceSendBcFlushDone + bobReceiveBcFlushDone + bobSendBcDepositGift + bobSendBcHandoffGive + aliceProcessBcDepositGift + carolReceiveBcHandoffGive + carolSendBcWithdrawGift + aliceProcessBcWithdrawGift + carolHoldsRef + -- Phase 2: Carol sends and Alice processes — interleaved with Carol + -- preparing the second flush only after the first phase-2 msg was sent. + sendPhase2Msg + carolSendCdFlush + processPhase2 + aliceProcessCdFlush + aliceSendCdFlushDone + carolReceiveCdFlushDone + carolSendCdDepositGift + carolSendCdHandoffGive + aliceProcessCdDepositGift + daveReceiveCdHandoffGive + daveSendCdWithdrawGift + aliceProcessCdWithdrawGift + daveHoldsRef + -- Phase 3: Dave sends and Alice processes. 
+ sendPhase3Msg + processPhase3 + assert + (∃ M1 M2 M3, + phase1 M1 ∧ phase2 M2 ∧ phase3 M3 ∧ + processed M1 ∧ processed M2 ∧ processed M3) +} by { bmc_sat } + +end FourParty diff --git a/packages/ocapn/proofs/veil/README.md b/packages/ocapn/proofs/veil/README.md new file mode 100644 index 0000000000..453393e260 --- /dev/null +++ b/packages/ocapn/proofs/veil/README.md @@ -0,0 +1,170 @@ +# OCapN op:flush — mechanized FIFO proof (Lean 4 + Veil) + +This Lake project mechanizes the central per-reference FIFO claim of the +OCapN promise-shortening / `op:flush` proposal in the +[Veil framework](https://github.com/verse-lab/veil) on top of Lean 4. + +## Modules + +* **`OcapnFlush/Flush.lean`** — single shortening, 3-party (Alice / Bob / + Carol). The minimal model that exercises the cross-phase Sandwich Lemma. +* **`OcapnFlush/FourParty.lean`** — two sequential shortenings, 4-party + (Alice / Bob / Carol / Dave). Same fence chain instantiated twice; the + second shortening can begin and progress while phase-2 traffic is still + in flight, so the model exercises the *non-trivial interleaving* case + the alt proof report calls out as the next step beyond a single + shortening event. + +## What is verified + +A simplified 3-party model (Alice, Bob, Carol) where: + +- Alice exports a local promise `p1` at slot `N`. +- Bob is the sole sender of reference messages targeting `p1`. +- Bob may shorten the path B→A by initiating an `op:flush` followed by a + third-party handoff to Carol; afterwards, Bob's traffic for the same + reference reaches Alice over the C→A channel. + +Bob's reference messages are partitioned into: + +- **phase 1** — sent strictly before `op:flush`, traveling B→A. +- **phase 2** — sent after the handoff completes, traveling C→A. + +The **safety property** verified is the *cross-phase end-to-end FIFO* claim +identified in the informal proof: + +> `phase1 m₁ ∧ phase2 m₂ ∧ processed m₂ → processed m₁` + +i.e. 
once Alice has dispatched any phase-2 message to the entity, every +phase-1 message has likewise been dispatched. Within-phase ordering is +established by the underlying transport's pairwise FIFO and is out of scope +for this model — it is folded into Alice-side processing preconditions. + +## What is *not* in this model + +- Fully concurrent flushes at *adjacent* intermediaries (alt §4 corner + case). `FourParty.lean` does verify two shortenings whose handshakes + may interleave with each other's data plane, but the second shortening + is causally downstream of the first (Carol can only flush after she + receives the ref via Bob's handoff). The fully-symmetric case where + two intermediaries hold the reference *independently* and flush at the + same time is the genuinely-interesting next step. +- Within-phase ordering. Both modules abstract over per-message order + inside a phase; FIFO is established by the underlying transport and + doesn't exercise the flush mechanic. +- Liveness. We verify safety only. +- Multi-sender per phase: each phase has a single logical sender. + +## How the proof is structured + +### `OcapnFlush/Flush.lean` — single shortening (3-party) + +Declares a Veil transition system with: + +- mutable relations for phase classification, `processed`, and protocol- + event flags; +- 14 actions for each protocol step (Bob's sends, Alice's processings, + Carol's withdraw, etc.); each action's preconditions encode the relevant + pairwise-FIFO and gating axioms (A1, A3, A4 of the informal proof); +- 8 inductive-invariant clauses + 1 safety clause (`refFifo`) — the + "two-fence chain" from the informal proof: + + 1. `flushProcessedAtA → flushSentByB` + 2. `flushProcessedAtA → (phase1 M → processed M)` *flush fence* + 3. `depositGiftProcessedAtA → flushProcessedAtA` *B→A FIFO* + 4. `withdrawGiftProcessedAtA → depositGiftProcessedAtA` *A3 — gift gates* + 5. `cHoldsRef → withdrawGiftProcessedAtA` *Carol's reply* + 6. 
`phase2 M ∧ processed M → withdrawGiftProcessedAtA` *C→A FIFO* + 7. `processed M → (phase1 M ∨ phase2 M)` *processing only via the model's actions* + 8. `¬ (phase1 M ∧ phase2 M)` *phases disjoint* + + Together they chain end-to-end: + `phase2 M₂ ∧ processed M₂ → withdraw → deposit → flush_processed → + (phase1 M₁ → processed M₁)`. + +Verification is via Veil's `#check_invariants`, which emits 144 SMT +obligations (initialization, every safety/clause × every action) and +discharges them with Z3 / CVC5. Every clause is reported `✅`. Veil +considers this the verification of record; full Lean proof-term +reconstruction (via `prove_inv_inductive`) is left admitted with `sorry` +because reconstruction can be slow on transition systems of this size and +is not necessary for the soundness story (Veil's correctness depends on +the SMT solvers, not on whether each VC is reified into Lean). + +A `sat trace [happyPath]` BMC obligation also confirms the protocol admits +a non-trivial 15-step execution that ends with both a phase-1 and a phase-2 +message processed. + +### `OcapnFlush/FourParty.lean` — two sequential shortenings (4-party) + +Adds a fourth party Dave and a second flush+handoff initiated by Carol +once she holds the reference. The state doubles (one fence chain for +B↔A, one for C↔A); the safety property generalizes to three pairwise +cross-phase claims: + +``` +phase1 m₁ ∧ phase2 m₂ ∧ processed m₂ → processed m₁ +phase2 m₂ ∧ phase3 m₃ ∧ processed m₃ → processed m₂ +phase1 m₁ ∧ phase3 m₃ ∧ processed m₃ → processed m₁ +``` + +The key linkage invariant is `cdFlushSentByC → cHoldsRef`, which is what +forces Carol's flush to be causally downstream of Bob's handoff and lets +the two fence chains compose. Crucially, the action preconditions do +*not* require phase-2 traffic to be drained before Carol initiates her +own flush — phase-2 sends and processings may interleave freely with the +C↔A flush handshake. 
The fence `cdFlushFence` (`cdFlushProcessedAtA → +phase2 M → processed M`) encodes the consequence of C↔A FIFO that makes +the eventual cross-phase ordering hold despite the interleaving. + +`#check_invariants` emits 580 SMT obligations for this module +(initialization × 21 invariant clauses + 27 actions × 21 clauses + 3 +safety clauses), all reported `✅`. A `sat trace [twoShortenings]` BMC +obligation runs a 30-step execution that issues, for each phase, a send +followed by an Alice-side processing, with Carol's flush initiation +deliberately interleaved between the first phase-2 send and its +processing. + +## How to build + +The project requires Lean 4 (v4.24.0), Veil, Z3, CVC5, and uv. From this +directory: + +```sh +# Lean toolchain (use elan or download v4.24.0 from +# https://github.com/leanprover/lean4/releases manually) +lake update # fetch Veil + transitive deps (mathlib, lean-smt) +lake build OcapnFlush.Flush +``` + +`lake build` will download Z3 and CVC5 into `.lake/packages/veil/.lake/build` +during a `veil/downloadDependencies` step. `uv` is also fetched there; if +your environment blocks `astral.sh`, install `uv` separately (`pip install +uv`) and copy the binary into the same path: + +```sh +cp "$(which uv)" .lake/packages/veil/.lake/build/uv +``` + +A successful build prints, for every clause in the inductive-invariant +table, a `✅` verdict, and ends with `Build completed successfully`. The +two `sorry` warnings on `prove_inv_inductive` and `sat trace [happyPath]` +are expected — they correspond to claims the SMT solver has already +validated and that we have intentionally not reconstructed in Lean. + +## Files + +- `lakefile.lean` — Lake project; declares the dependency on Veil. +- `lean-toolchain` — pins Lean 4.24.0. +- `OcapnFlush.lean` — library entry point. +- `OcapnFlush/Flush.lean`, `OcapnFlush/FourParty.lean` — the 3-party and 4-party models and their proofs. 
+ +## Synthesis with the informal proof + +This formalization corresponds to §2 of the alternate proof report (the +3-party Sandwich Lemma). The model's invariant chain instantiates the +"two-fence" structure — one fence at `flush_done` (the flush fence) and +one at `deposit_gift` (the gift fence). Sections 3 (n-party) and 4 +(concurrent flushes) of the alternate report would require richer state +(multiple intermediaries, cross-channel ordering proofs) that we +intentionally deferred so this proof stays small and fast to check. diff --git a/packages/ocapn/proofs/veil/lake-manifest.json b/packages/ocapn/proofs/veil/lake-manifest.json new file mode 100644 index 0000000000..a0dc0c3de3 --- /dev/null +++ b/packages/ocapn/proofs/veil/lake-manifest.json @@ -0,0 +1,135 @@ +{"version": "1.1.0", + "packagesDir": ".lake/packages", + "packages": + [{"url": "https://github.com/verse-lab/veil.git", + "type": "git", + "subDir": null, + "scope": "", + "rev": "6a12daa8d3e0e8a9808a40ecab17ca030b557063", + "name": "veil", + "manifestFile": "lake-manifest.json", + "inputRev": "main", + "inherited": false, + "configFile": "lakefile.lean"}, + {"url": "https://github.com/ufmg-smite/lean-smt.git", + "type": "git", + "subDir": null, + "scope": "", + "rev": "3c6b049eb0ccf7c331f3686e12e5f5b666bdd30a", + "name": "smt", + "manifestFile": "lake-manifest.json", + "inputRev": "3c6b049eb0ccf7c331f3686e12e5f5b666bdd30a", + "inherited": true, + "configFile": "lakefile.lean"}, + {"url": "https://github.com/leanprover-community/lean-auto.git", + "type": "git", + "subDir": null, + "scope": "", + "rev": "36d85bf6372f1a35fb5487325d1ef965b33c6296", + "name": "auto", + "manifestFile": "lake-manifest.json", + "inputRev": "36d85bf6372f1a35fb5487325d1ef965b33c6296", + "inherited": true, + "configFile": "lakefile.lean"}, + {"url": "https://github.com/leanprover-community/mathlib4.git", + "type": "git", + "subDir": null, + "scope": "", + "rev": "f897ebcf72cd16f89ab4577d0c826cd14afaafc7", + "name": "mathlib", + 
"manifestFile": "lake-manifest.json", + "inputRev": "v4.24.0", + "inherited": true, + "configFile": "lakefile.lean"}, + {"url": "https://github.com/abdoo8080/lean-cvc5.git", + "type": "git", + "subDir": null, + "scope": "", + "rev": "20a6b2f8f108d8c83b706b3502744e571303c23e", + "name": "cvc5", + "manifestFile": "lake-manifest.json", + "inputRev": "20a6b2f", + "inherited": true, + "configFile": "lakefile.lean"}, + {"url": "https://github.com/leanprover-community/plausible", + "type": "git", + "subDir": null, + "scope": "leanprover-community", + "rev": "dfd06ebfe8d0e8fa7faba9cb5e5a2e74e7bd2805", + "name": "plausible", + "manifestFile": "lake-manifest.json", + "inputRev": "main", + "inherited": true, + "configFile": "lakefile.toml"}, + {"url": "https://github.com/leanprover-community/LeanSearchClient", + "type": "git", + "subDir": null, + "scope": "leanprover-community", + "rev": "99657ad92e23804e279f77ea6dbdeebaa1317b98", + "name": "LeanSearchClient", + "manifestFile": "lake-manifest.json", + "inputRev": "main", + "inherited": true, + "configFile": "lakefile.toml"}, + {"url": "https://github.com/leanprover-community/import-graph", + "type": "git", + "subDir": null, + "scope": "leanprover-community", + "rev": "d768126816be17600904726ca7976b185786e6b9", + "name": "importGraph", + "manifestFile": "lake-manifest.json", + "inputRev": "main", + "inherited": true, + "configFile": "lakefile.toml"}, + {"url": "https://github.com/leanprover-community/ProofWidgets4", + "type": "git", + "subDir": null, + "scope": "leanprover-community", + "rev": "556caed0eadb7901e068131d1be208dd907d07a2", + "name": "proofwidgets", + "manifestFile": "lake-manifest.json", + "inputRev": "v0.0.74", + "inherited": true, + "configFile": "lakefile.lean"}, + {"url": "https://github.com/leanprover-community/aesop", + "type": "git", + "subDir": null, + "scope": "leanprover-community", + "rev": "725ac8cd67acd70a7beaf47c3725e23484c1ef50", + "name": "aesop", + "manifestFile": "lake-manifest.json", + 
"inputRev": "master", + "inherited": true, + "configFile": "lakefile.toml"}, + {"url": "https://github.com/leanprover-community/quote4", + "type": "git", + "subDir": null, + "scope": "leanprover-community", + "rev": "dea6a3361fa36d5a13f87333dc506ada582e025c", + "name": "Qq", + "manifestFile": "lake-manifest.json", + "inputRev": "master", + "inherited": true, + "configFile": "lakefile.toml"}, + {"url": "https://github.com/leanprover-community/batteries", + "type": "git", + "subDir": null, + "scope": "leanprover-community", + "rev": "8da40b72fece29b7d3fe3d768bac4c8910ce9bee", + "name": "batteries", + "manifestFile": "lake-manifest.json", + "inputRev": "main", + "inherited": true, + "configFile": "lakefile.toml"}, + {"url": "https://github.com/leanprover/lean4-cli", + "type": "git", + "subDir": null, + "scope": "leanprover", + "rev": "91c18fa62838ad0ab7384c03c9684d99d306e1da", + "name": "Cli", + "manifestFile": "lake-manifest.json", + "inputRev": "main", + "inherited": true, + "configFile": "lakefile.toml"}], + "name": "OcapnFlush", + "lakeDir": ".lake"} diff --git a/packages/ocapn/proofs/veil/lakefile.lean b/packages/ocapn/proofs/veil/lakefile.lean new file mode 100644 index 0000000000..0d67a61f1a --- /dev/null +++ b/packages/ocapn/proofs/veil/lakefile.lean @@ -0,0 +1,9 @@ +import Lake +open Lake DSL + +require veil from git "https://github.com/verse-lab/veil.git" @ "main" + +package OcapnFlush where + +@[default_target] +lean_lib OcapnFlush where diff --git a/packages/ocapn/proofs/veil/lean-toolchain b/packages/ocapn/proofs/veil/lean-toolchain new file mode 100644 index 0000000000..c00a53503c --- /dev/null +++ b/packages/ocapn/proofs/veil/lean-toolchain @@ -0,0 +1 @@ +leanprover/lean4:v4.24.0 diff --git a/packages/ocapn/src/captp/pairwise.js b/packages/ocapn/src/captp/pairwise.js index f4632111b0..7085d9978f 100644 --- a/packages/ocapn/src/captp/pairwise.js +++ b/packages/ocapn/src/captp/pairwise.js @@ -16,6 +16,7 @@ import { makeRefCounter } from './refcount.js'; * 
@property {(value: object) => Slot | undefined} getSlotForValue - Lookup the slot for a value. Does NOT increment refcount. * @property {(slot: Slot) => object | undefined} getValueForSlot - Lookup the value for a slot. Does NOT increment refcount. * @property {(slot: Slot, value: object) => void} registerSlot - Register a slot and value. Does NOT increment refcount. + * @property {(slot: Slot, value: object) => void} replaceExportValue - Replace the value held at an existing local (export) slot, preserving its refcount. Used by op:flush to swap a fresh local promise in at the same position. * @property {(slot: Slot, refcount: number) => void} dropSlot - Decrements refcount by the given amount, Drops the slot and value if the refcount reaches 0. * @property {(slot: Slot) => number} getRefCount - Get the committed refcount for the slot. * @property {(slot: Slot) => void} recordSentSlot - Increments refcount for the slot. @@ -167,6 +168,32 @@ export const makePairwiseTable = ({ } }; + /** + * Replace the value at an existing local (export) slot, keeping its + * refcount intact. The previous value's reverse mapping is dropped and + * the new value's mapping is installed so that lookups by value continue + * to work. The export hook is fired again so observers can re-record the + * binding. 
+ * + * @param {Slot} slot + * @param {object} newValue + */ + const replaceExportValue = (slot, newValue) => { + if (!isSlotLocal(slot)) { + throw new Error('replaceExportValue: only local slots are supported'); + } + if (!exportTable.has(slot)) { + throw new Error(`replaceExportValue: no export at slot ${slot}`); + } + const oldValue = exportTable.get(slot); + if (oldValue !== undefined) { + valueToSlot.delete(oldValue); + } + exportTable.set(slot, newValue); + valueToSlot.set(newValue, slot); + exportHook(newValue, slot); + }; + /** * @param {Slot} slot * @param {number} refcount @@ -234,6 +261,7 @@ export const makePairwiseTable = ({ getSlotForValue, getValueForSlot, registerSlot, + replaceExportValue, dropSlot, getRefCount, recordSentSlot, diff --git a/packages/ocapn/src/client/ocapn.js b/packages/ocapn/src/client/ocapn.js index d78ee96c0d..b171891e3b 100644 --- a/packages/ocapn/src/client/ocapn.js +++ b/packages/ocapn/src/client/ocapn.js @@ -666,6 +666,7 @@ const makeBootstrapObject = ( * @property {OcapnTable} ocapnTable * @property {(message: object) => void} sendMessage * @property {(observer: MessageObserver) => () => void} subscribeMessages + * @property {(remoteValue: object) => Promise} flushExport */ /** @@ -731,6 +732,19 @@ export const makeOcapn = ( connection.end(); // eslint-disable-next-line no-use-before-define ocapnTable.destroy(disconnectError); + // Reject any flush operations that haven't completed. + // eslint-disable-next-line no-use-before-define + for (const promiseKit of pendingFlushSettlers.values()) { + promiseKit.reject(disconnectError); + } + // eslint-disable-next-line no-use-before-define + pendingFlushSettlers.clear(); + // eslint-disable-next-line no-use-before-define + for (const promiseKit of flushedPromiseSettlers.values()) { + promiseKit.reject(disconnectError); + } + // eslint-disable-next-line no-use-before-define + flushedPromiseSettlers.clear(); // Notify the session manager to immediately end the session. 
// This prevents race conditions where a new connection arrives // before the socket 'close' event fires. @@ -959,12 +973,132 @@ export const makeOcapn = ( ocapnTable.dropSlot(slot, 1); } }, + 'op:flush': message => { + const { position } = message; + logger.info(`flush`, { position }); + // The flush targets the export-table position of a local promise. The + // promise-shortening proposal calls for the local resolver `r` to be + // fulfilled with a fresh promise `p'` and for the export at the same + // position to be replaced with the resolver `r'` of `p'`. Subsequent + // messages from the peer that target this position then naturally + // buffer on `p'` (an unresolved HandledPromise) until something + // (typically the third-party handoff completing) resolves it. + const slot = makeSlot('p', true, position); + // eslint-disable-next-line no-use-before-define + const oldValue = ocapnTable.getValueForSlot(slot); + if (oldValue === undefined) { + throw Error( + `OCapN: op:flush: No local promise at position ${position}`, + ); + } + // Build a fresh promise / settler pair; the settler is held in + // flushedPromiseSettlers so a subsequent shortening step (e.g. a + // handoff) can resolve it. Until then any deliveries targeting this + // position queue up locally on the unresolved promise. + const rPrimeKit = makePromiseKit(); + const pPrime = rPrimeKit.promise; + // Silence the unhandled rejection warning, but don't affect handlers. + pPrime.catch(sink); + // eslint-disable-next-line no-use-before-define + ocapnTable.replaceExportValue(slot, pPrime); + // eslint-disable-next-line no-use-before-define + flushedPromiseSettlers.set(position, rPrimeKit); + // Acknowledge the flush. Because messages on this connection are FIFO, + // by the time this op:flush-done reaches the peer it has already + // received every message Alice had sent that targeted the prior + // promise. 
+ // eslint-disable-next-line no-use-before-define + send({ + type: 'op:flush-done', + position, + }); + }, + 'op:flush-done': message => { + const { position } = message; + logger.info(`flush-done`, { position }); + // eslint-disable-next-line no-use-before-define + const pendingKit = pendingFlushSettlers.get(position); + if (pendingKit === undefined) { + throw Error( + `OCapN: op:flush-done: No pending flush at position ${position}`, + ); + } + // eslint-disable-next-line no-use-before-define + pendingFlushSettlers.delete(position); + pendingKit.resolve(undefined); + }, 'op:abort': message => { const { reason } = message; abort(reason); }, }); + /** + * Promise-kits for `op:flush` that we have sent and are awaiting an + * `op:flush-done` for. Keyed by the export-position we asked the peer to + * flush. + * @type {Map>} + */ + const pendingFlushSettlers = new Map(); + /** + * Promise-kits for fresh promises we created when the peer asked us to + * flush one of our exported promises. The settler can be used to fulfill + * the new promise (and thus deliver any buffered messages) once the + * shortened reference is available. + * @type {Map>} + */ + const flushedPromiseSettlers = new Map(); + + /** + * Send an op:flush for the given remote promise (i.e. an imported + * `desc:import-promise`) and return a promise that resolves when the + * peer's op:flush-done has been received. After that point all messages + * that the peer had previously sent for this promise have been received, + * which is the precondition the proposal needs before initiating the + * third-party handoff that shortens the path. 
+ * + * @param {object} remoteValue + * @returns {Promise} + */ + const flushExport = remoteValue => { + // eslint-disable-next-line no-use-before-define + if (didUnplug()) { + return /** @type {Promise} */ ( + /** @type {unknown} */ ( + // eslint-disable-next-line no-use-before-define + quietReject(didUnplug()) + ) + ); + } + // eslint-disable-next-line no-use-before-define + const slot = ocapnTable.getSlotForValue(remoteValue); + if (slot === undefined) { + throw Error('flushExport: value is not tracked in this session'); + } + const { type, isLocal, position } = parseSlot(slot); + if (isLocal) { + throw Error( + `flushExport: value must be a remote (peer-exported) reference, got slot ${slot}`, + ); + } + if (type !== 'p') { + throw Error(`flushExport: value must be a promise, got slot ${slot}`); + } + if (pendingFlushSettlers.has(position)) { + throw Error( + `flushExport: a flush is already pending at position ${position}`, + ); + } + const promiseKit = makePromiseKit(); + pendingFlushSettlers.set(position, promiseKit); + // eslint-disable-next-line no-use-before-define + send({ + type: 'op:flush', + position, + }); + return /** @type {Promise} */ (promiseKit.promise); + }; + /** * @param {Record} message */ @@ -1276,6 +1410,7 @@ export const makeOcapn = ( ocapnTable, sendMessage: send, subscribeMessages, + flushExport, }; } return harden(ocapn); diff --git a/packages/ocapn/src/codecs/operations.js b/packages/ocapn/src/codecs/operations.js index cda6fb3245..d22d697eb8 100644 --- a/packages/ocapn/src/codecs/operations.js +++ b/packages/ocapn/src/codecs/operations.js @@ -60,6 +60,27 @@ const OpGcAnswersCodec = makeOcapnRecordCodecFromDefinition( }, ); +// op:flush is sent by Bob to Alice when Bob is about to shorten a promise +// exported by Alice (typically as part of a third-party handoff). It targets +// the export-table position of Alice's resolver for the promise. 
On receipt +// Alice swaps the value at that position for a fresh local promise so any +// further messages from Bob targeting the position buffer locally on Alice's +// side until the shortened path resolves, then replies with op:flush-done. +const OpFlushCodec = makeOcapnRecordCodecFromDefinition('OpFlush', 'op:flush', { + position: NonNegativeIntegerCodec, +}); + +// op:flush-done is Alice's acknowledgement that the swap has happened. After +// receiving it, Bob knows it has received every message Alice had previously +// sent that targeted the original promise, and may proceed with the handoff. +const OpFlushDoneCodec = makeOcapnRecordCodecFromDefinition( + 'OpFlushDone', + 'op:flush-done', + { + position: NonNegativeIntegerCodec, + }, +); + export const OcapnPreSessionOperationsCodecs = makeRecordUnionCodec( 'OcapnPreSessionOperations', { @@ -204,6 +225,8 @@ export const makeOcapnOperationsCodecs = (descCodecs, passableCodecs) => { OpListenCodec, OpGcExportsCodec, OpGcAnswersCodec, + OpFlushCodec, + OpFlushDoneCodec, }); /** diff --git a/packages/ocapn/test/codecs/operations.test.js b/packages/ocapn/test/codecs/operations.test.js index c82230a49d..1655f4a08e 100644 --- a/packages/ocapn/test/codecs/operations.test.js +++ b/packages/ocapn/test/codecs/operations.test.js @@ -161,6 +161,22 @@ export const table = [ answerPositions: [1n], }, }, + { + // ; non-negative integer (export position to flush) + name: 'op:flush', + value: { + type: 'op:flush', + position: 7n, + }, + }, + { + // ; non-negative integer (matches op:flush position) + name: 'op:flush-done', + value: { + type: 'op:flush-done', + position: 7n, + }, + }, // Below are messages observed in the ocapn python test suite. 
{ name: 'python op:deliver fetch 1', diff --git a/packages/ocapn/test/codecs/snapshots/operations.test.js.md b/packages/ocapn/test/codecs/snapshots/operations.test.js.md index 6ad6a9135e..154b2570cc 100644 Binary files a/packages/ocapn/test/codecs/snapshots/operations.test.js.md and b/packages/ocapn/test/codecs/snapshots/operations.test.js.md differ diff --git a/packages/ocapn/test/codecs/snapshots/operations.test.js.snap b/packages/ocapn/test/codecs/snapshots/operations.test.js.snap index 20cc91c225..23d86b2ebe 100644 Binary files a/packages/ocapn/test/codecs/snapshots/operations.test.js.snap and b/packages/ocapn/test/codecs/snapshots/operations.test.js.snap differ diff --git a/packages/ocapn/test/flush.test.js b/packages/ocapn/test/flush.test.js new file mode 100644 index 0000000000..43c9272f86 --- /dev/null +++ b/packages/ocapn/test/flush.test.js @@ -0,0 +1,211 @@ +// @ts-check + +import harden from '@endo/harden'; +import { E } from '@endo/eventual-send'; +import { Far } from '@endo/marshal'; +import { makePromiseKit } from '@endo/promise-kit'; +import { + testWithErrorUnwrapping, + makeTestClientPair, + getOcapnDebug, + waitUntilTrue, +} from './_util.js'; +import { encodeSwissnum } from '../src/client/util.js'; +import { makeSlot, parseSlot } from '../src/captp/pairwise.js'; + +testWithErrorUnwrapping( + 'op:flush swaps the export and acknowledges', + async t => { + // B exports a Promise (via PromiseProvider.get()). A then sends + // op:flush targeting the export-position. B should swap the value + // at that position for a fresh local promise and reply with + // op:flush-done. A's flushExport() promise resolves on receipt. 
+ const promiseKit = makePromiseKit(); + const testObjectTable = new Map(); + testObjectTable.set( + 'PromiseProvider', + Far('PromiseProvider', { + // Wrapping the promise in an array forces it to be serialized as a + // desc:import-promise immediately rather than awaited; otherwise an + // unresolved promise return value blocks the fulfill message until + // it settles. + get: () => harden([promiseKit.promise]), + }), + ); + + const { establishSession, shutdownBoth } = await makeTestClientPair({ + makeDefaultSwissnumTable: () => testObjectTable, + clientAOptions: { enableImportCollection: false }, + clientBOptions: { enableImportCollection: false }, + }); + + try { + const { sessionA, sessionB } = await establishSession(); + const debugA = getOcapnDebug(sessionA.ocapn); + const debugB = getOcapnDebug(sessionB.ocapn); + + const bootstrapB = sessionA.ocapn.getRemoteBootstrap(); + const provider = await E(bootstrapB).fetch( + encodeSwissnum('PromiseProvider'), + ); + // Trigger the Promise to be exported by B. + const answerPromise = E(provider).get(); + // Drop the floating promise; the call's mere act of being sent + // ensures B exports its promise. + answerPromise.catch(() => {}); + + // Wait until B has registered an export slot for promiseKit.promise. + await waitUntilTrue( + () => + debugB.ocapnTable.getSlotForValue(promiseKit.promise) !== undefined, + ); + const bExportSlot = debugB.ocapnTable.getSlotForValue(promiseKit.promise); + if (!bExportSlot) { + t.fail('B did not export its promise'); + return; + } + const { type, isLocal, position } = parseSlot(bExportSlot); + t.is(type, 'p', 'B exported the value as a promise slot'); + t.true(isLocal, 'slot is local on B'); + + // Now wait until A has imported the promise at the matching slot. 
+ const aImportSlot = makeSlot('p', false, position); + await waitUntilTrue( + () => debugA.ocapnTable.getValueForSlot(aImportSlot) !== undefined, + ); + const aImportValue = debugA.ocapnTable.getValueForSlot(aImportSlot); + if (!aImportValue) { + t.fail('A does not have an import for the promise position'); + return; + } + + // Verify that B holds promiseKit.promise at the export position. + const beforeValue = debugB.ocapnTable.getValueForSlot(bExportSlot); + t.is( + beforeValue, + promiseKit.promise, + 'before flush, B exports its original promise at the position', + ); + + // Issue the flush from A. + const flushDone = debugA.flushExport(aImportValue); + await flushDone; + + // After op:flush-done has been received on A, B's export at the + // same position must be a *different* (still pending) promise. + const afterValue = debugB.ocapnTable.getValueForSlot(bExportSlot); + t.not( + afterValue, + promiseKit.promise, + 'after flush, B has swapped the export for a fresh promise', + ); + t.true( + afterValue instanceof Promise, + 'after flush, the swapped export is still a promise', + ); + } finally { + shutdownBoth(); + } + }, +); + +testWithErrorUnwrapping( + 'flushExport: a second flush at the same position errors', + async t => { + const promiseKit = makePromiseKit(); + const testObjectTable = new Map(); + testObjectTable.set( + 'PromiseProvider', + Far('PromiseProvider', { + get: () => harden([promiseKit.promise]), + }), + ); + + const { establishSession, shutdownBoth } = await makeTestClientPair({ + makeDefaultSwissnumTable: () => testObjectTable, + clientAOptions: { enableImportCollection: false }, + clientBOptions: { enableImportCollection: false }, + }); + + try { + const { sessionA, sessionB } = await establishSession(); + const debugA = getOcapnDebug(sessionA.ocapn); + const debugB = getOcapnDebug(sessionB.ocapn); + + const provider = await E(sessionA.ocapn.getRemoteBootstrap()).fetch( + encodeSwissnum('PromiseProvider'), + ); + E(provider) + .get() + 
.catch(() => {}); + + await waitUntilTrue( + () => + debugB.ocapnTable.getSlotForValue(promiseKit.promise) !== undefined, + ); + const bSlot = debugB.ocapnTable.getSlotForValue(promiseKit.promise); + if (!bSlot) { + t.fail('B did not export the promise'); + return; + } + const { position } = parseSlot(bSlot); + const aSlot = makeSlot('p', false, position); + await waitUntilTrue( + () => debugA.ocapnTable.getValueForSlot(aSlot) !== undefined, + ); + const aValue = debugA.ocapnTable.getValueForSlot(aSlot); + if (!aValue) { + t.fail('A does not have an import for the promise position'); + return; + } + + // Start a flush but do not await it. + const firstFlush = debugA.flushExport(aValue); + // Calling again before the first completes must throw. + t.throws(() => debugA.flushExport(aValue), { + message: /already pending/, + }); + await firstFlush; + } finally { + shutdownBoth(); + } + }, +); + +testWithErrorUnwrapping( + 'flushExport rejects for non-promise references', + async t => { + const testObjectTable = new Map(); + testObjectTable.set('Greeter', Far('Greeter', { hi: () => 'hello' })); + const { establishSession, shutdownBoth } = await makeTestClientPair({ + makeDefaultSwissnumTable: () => testObjectTable, + }); + try { + const { sessionA } = await establishSession(); + const debugA = getOcapnDebug(sessionA.ocapn); + const greeter = await E(sessionA.ocapn.getRemoteBootstrap()).fetch( + encodeSwissnum('Greeter'), + ); + // Greeter is an imported *object*, not a promise. flushExport + // should refuse it. 
+ t.throws(() => debugA.flushExport(greeter), { + message: /must be a promise/, + }); + } finally { + shutdownBoth(); + } + }, +); + +testWithErrorUnwrapping('flushExport rejects for unknown values', async t => { + const { establishSession, shutdownBoth } = await makeTestClientPair({}); + try { + const { sessionA } = await establishSession(); + const debugA = getOcapnDebug(sessionA.ocapn); + t.throws(() => debugA.flushExport(Far('NotTracked', {})), { + message: /not tracked/, + }); + } finally { + shutdownBoth(); + } +});