diff --git a/src/l2ps/channel/index.ts b/src/l2ps/channel/index.ts index b3e7baf..495d223 100644 --- a/src/l2ps/channel/index.ts +++ b/src/l2ps/channel/index.ts @@ -50,3 +50,12 @@ export { verifyTranscript, type TranscriptVerificationResult, } from "./transcript" + +export { + L2PSChannelTransport, + type L2PSChannelTransportOpts, + type ChannelSessionLike, + type L2PSMessagingPeerLike, + type SerializedEncryptedMessage, + type IncomingMessagePayload, +} from "./transport" diff --git a/src/l2ps/channel/transport.ts b/src/l2ps/channel/transport.ts new file mode 100644 index 0000000..45c207a --- /dev/null +++ b/src/l2ps/channel/transport.ts @@ -0,0 +1,246 @@ +/** + * SR-4 WI-A — Channel-over-L2PS transport adapter. + * + * The DACS channel layer (`ChannelSession`) is transport-agnostic: it + * produces signed `ChannelMessage` envelopes and verifies incoming ones, + * but does not move bytes. The L2PS instant-messaging layer + * (`L2PSMessagingPeer`) moves encrypted bytes between subnet members but + * knows nothing about channel envelopes. This adapter bridges the two: + * it serialises + encrypts an outgoing envelope onto the messaging + * transport, and on the receive side decrypts, deserialises, and feeds + * envelopes into the session **in strict sequence order**. + * + * Why a reorder buffer is mandatory: + * `ChannelSession.receiveIncoming` accepts any sequence strictly greater + * than the highest it has seen and then advances its counter to that + * value — so a *gap* (seq N+2 arriving before N+1) is not rejected, it + * permanently skips the missing messages. The L2PS transport delivers by + * timestamp and replays an offline queue, so out-of-order arrival is + * normal. This adapter therefore only hands the session the next + * contiguous sequence and buffers anything ahead of the gap until it + * fills. + * + * Scope / assumptions: + * - The shared per-channel sequence counter assumes turn-based messaging + * (DACS negotiation is offer → counter → accept; a party waits for the + * counter before replying). Two parties emitting the same sequence + * concurrently is out of scope for SR-4 v1; such a colliding inbound + * message (seq <= already-applied) is dropped as a duplicate. + * - Membership is the channel's `members`; the transport sends to every + * member except self. Confidentiality is the subnet AES key (only + * members hold it) — CH-2. + */ + +import type { ChannelMessage, ChannelMessageType } from "./types" + +/** + * Minimal surface of `ChannelSession` the adapter depends on. Declared + * structurally so tests can inject a fake without real signing keys. + */ +export interface ChannelSessionLike { + readonly channelId: string + sendOutgoing(opts: { + type: ChannelMessageType + body: unknown + sentAt?: number + repliesTo?: number + }): Promise + receiveIncoming(msg: ChannelMessage): Promise +} + +/** Wire shape the L2PS messaging transport carries (ciphertext + nonce, base64). */ +export interface SerializedEncryptedMessage { + ciphertext: string + nonce: string + ephemeralKey?: string +} + +/** Incoming payload the messaging peer hands to an onMessage handler. */ +export interface IncomingMessagePayload { + from: string + encrypted: SerializedEncryptedMessage + messageHash: string + offline?: boolean +} + +/** + * Minimal surface of `L2PSMessagingPeer` the adapter depends on. + * Declared structurally for the same reason as `ChannelSessionLike`. + */ +export interface L2PSMessagingPeerLike { + send( + to: string, + encrypted: SerializedEncryptedMessage, + messageHash: string, + ): Promise + onMessage(handler: (payload: IncomingMessagePayload) => void): void +} + +export interface L2PSChannelTransportOpts { + session: ChannelSessionLike + peer: L2PSMessagingPeerLike + /** Subnet AES-256 key (raw 32 bytes) — only members hold it (CH-2). */ + sharedKey: Uint8Array + /** + * Recipient public keys to deliver to (every channel member except + * self), in the format the messaging peer routes on. + */ + recipients: string[] + /** Called with each envelope once it has been applied in order. */ + onMessage?: (msg: ChannelMessage) => void + /** Called on a decrypt / parse / verification failure. */ + onError?: (err: Error) => void +} + +export class L2PSChannelTransport { + private readonly session: ChannelSessionLike + private readonly peer: L2PSMessagingPeerLike + private readonly sharedKey: Uint8Array + private readonly recipients: string[] + private readonly onMessage?: (msg: ChannelMessage) => void + private readonly onError?: (err: Error) => void + + /** Sequences seen ahead of the gap, awaiting contiguous application. */ + private readonly buffer = new Map() + /** Highest contiguous sequence applied locally (sent or received). */ + private appliedSeq = 0 + /** Serialises drain() so concurrent inbound frames can't interleave. */ + private draining: Promise = Promise.resolve() + private started = false + + constructor(opts: L2PSChannelTransportOpts) { + this.session = opts.session + this.peer = opts.peer + this.sharedKey = opts.sharedKey + this.recipients = opts.recipients + this.onMessage = opts.onMessage + this.onError = opts.onError + } + + /** Wire the peer's inbound handler. Call once. */ + start(): void { + if (this.started) throw new Error("L2PSChannelTransport: already started") + this.started = true + this.peer.onMessage(payload => { + // Chain onto `draining` so frames are processed one at a time + // in arrival order; each appends to the buffer then drains. + this.draining = this.draining + .then(() => this.ingest(payload)) + .catch(err => this.onError?.(err as Error)) + }) + } + + /** + * Build + sign the next outgoing envelope via the session, encrypt + * it under the subnet key, and deliver to every recipient. + */ + async send(opts: { + type: ChannelMessageType + body: unknown + repliesTo?: number + }): Promise { + const signed = await this.session.sendOutgoing(opts) + // Our own send advances the shared per-channel counter; record it + // so we expect the peer's reply at appliedSeq + 1. + if (signed.sequence > this.appliedSeq) this.appliedSeq = signed.sequence + + const encrypted = await this.encrypt(JSON.stringify(signed)) + const messageHash = signed.signature.signature // unique per signed envelope + for (const to of this.recipients) { + await this.peer.send(to, encrypted, messageHash) + } + return signed + } + + private async ingest(payload: IncomingMessagePayload): Promise { + let msg: ChannelMessage + try { + const json = await this.decrypt(payload.encrypted) + msg = JSON.parse(json) as ChannelMessage + } catch (e) { + throw new Error( + `L2PSChannelTransport: failed to decrypt/parse inbound frame from ${payload.from}: ${ + (e as Error).message + }`, + ) + } + + // Ignore traffic for other channels sharing the same subnet. + if (msg.channelId !== this.session.channelId) return + // Duplicate / already-applied (or a colliding concurrent send): drop. + if (msg.sequence <= this.appliedSeq) return + // Buffer; a later sequence with the same value would be a protocol + // error — keep the first seen. + if (!this.buffer.has(msg.sequence)) this.buffer.set(msg.sequence, msg) + + await this.drain() + } + + /** Apply every buffered message that is now contiguous with appliedSeq. */ + private async drain(): Promise { + // Sequence counter is shared across both directions, so the next + // expected inbound is appliedSeq + 1. + let next = this.buffer.get(this.appliedSeq + 1) + while (next) { + this.buffer.delete(next.sequence) + // Throws on tamper (bad signature, wrong sender, channelId + // mismatch) — propagate as channel-fatal per §8.12. + await this.session.receiveIncoming(next) + this.appliedSeq = next.sequence + this.onMessage?.(next) + next = this.buffer.get(this.appliedSeq + 1) + } + } + + /** Count of out-of-order messages currently held awaiting the gap. */ + get bufferedCount(): number { + return this.buffer.size + } + + // ── AES-256-GCM (Web Crypto) — matches the messaging wire format: + // ciphertext (with auth tag) + 12-byte nonce, both base64. ── + + private async encrypt(plaintext: string): Promise { + const nonce = crypto.getRandomValues(new Uint8Array(12)) + const key = await crypto.subtle.importKey( + "raw", + this.sharedKey.buffer as ArrayBuffer, + "AES-GCM", + false, + ["encrypt"], + ) + const cipher = await crypto.subtle.encrypt( + { name: "AES-GCM", iv: nonce }, + key, + new TextEncoder().encode(plaintext), + ) + return { + ciphertext: bytesToBase64(new Uint8Array(cipher)), + nonce: bytesToBase64(nonce), + } + } + + private async decrypt(enc: SerializedEncryptedMessage): Promise { + const key = await crypto.subtle.importKey( + "raw", + this.sharedKey.buffer as ArrayBuffer, + "AES-GCM", + false, + ["decrypt"], + ) + const plain = await crypto.subtle.decrypt( + { name: "AES-GCM", iv: base64ToBytes(enc.nonce) as BufferSource }, + key, + base64ToBytes(enc.ciphertext) as BufferSource, + ) + return new TextDecoder().decode(plain) + } +} + +function bytesToBase64(bytes: Uint8Array): string { + return Buffer.from(bytes).toString("base64") +} + +function base64ToBytes(b64: string): Uint8Array { + return new Uint8Array(Buffer.from(b64, "base64")) +} diff --git a/src/tests/l2ps/transport.e2e.test.ts b/src/tests/l2ps/transport.e2e.test.ts new file mode 100644 index 0000000..3cff7c9 --- /dev/null +++ b/src/tests/l2ps/transport.e2e.test.ts @@ -0,0 +1,214 @@ +/** + * WI-A end-to-end: two REAL Demos identities, two REAL ChannelSessions, + * two L2PSChannelTransports, talking over an in-process relay that + * stands in for the L2PS messaging server. Unlike transport.test.ts + * (which mocks the session to isolate the reorder buffer), this exercises + * the full stack: real CCI-key signing, real cross-party verification + * (signature + sender∈members + channelId), real AES-256-GCM under a + * shared subnet key, and the reorder buffer under realistic out-of-order + * delivery. + */ + +import { Demos, DemosWebAuth } from "@/websdk" +import { demosClaimRefForAddress, type ClaimReference } from "@/identity/cci" +import { ChannelSession } from "@/l2ps/channel" +import { + L2PSChannelTransport, + type IncomingMessagePayload, + type L2PSMessagingPeerLike, + type SerializedEncryptedMessage, +} from "@/l2ps/channel/transport" + +const CHANNEL = "ch-e2e-1" +const SUBNET_KEY = new Uint8Array(32).fill(11) + +async function newConnectedDemos(): Promise<{ + demos: Demos + claim: ClaimReference +}> { + const auth = new DemosWebAuth() + await auth.create() + const demos = new Demos() + await demos.connectWallet(auth.keypair.privateKey as Uint8Array) + return { + demos, + claim: demosClaimRefForAddress(await demos.getEd25519Address()), + } +} + +/** + * In-process two-endpoint relay simulating the messaging transport. + * `deliver` routes a send to the *other* endpoint's handler. When + * `hold` is true, frames are queued instead of delivered so a test can + * flush them in an arbitrary order (offline-queue replay / reorder). + */ +class Relay { + private handlers = new Map void>() + private held: { to: string; payload: IncomingMessagePayload }[] = [] + hold = false + + endpoint(selfId: string): L2PSMessagingPeerLike { + return { + send: async ( + to: string, + encrypted: SerializedEncryptedMessage, + messageHash: string, + ) => { + const payload: IncomingMessagePayload = { + from: selfId, + encrypted, + messageHash, + } + if (this.hold) this.held.push({ to, payload }) + else this.dispatch(to, payload) + return { messageHash, l2psStatus: "submitted" as const } + }, + onMessage: handler => this.handlers.set(selfId, handler), + } + } + + private dispatch(to: string, payload: IncomingMessagePayload): void { + const h = this.handlers.get(to) + if (h) h(payload) + } + + /** Flush held frames in the given index order (to simulate reordering). */ + flush(order: number[]): void { + const frames = this.held + this.held = [] + for (const i of order) this.dispatch(frames[i].to, frames[i].payload) + } + + /** Tamper the most recently held frame's signature, then flush in order. */ + flushTampered(): void { + for (const f of this.held) { + // Corrupt the ciphertext so decrypt fails (cheapest tamper that + // still drives the transport's error path). + f.payload.encrypted = { + ...f.payload.encrypted, + ciphertext: f.payload.encrypted.ciphertext.slice(0, -4) + "AAAA", + } + } + const frames = this.held + this.held = [] + for (const f of frames) this.dispatch(f.to, f.payload) + } +} + +async function flush(): Promise { + for (let i = 0; i < 4; i++) await new Promise(r => setTimeout(r, 0)) +} + +describe("L2PSChannelTransport — E2E with real identities", () => { + let alice: { demos: Demos; claim: ClaimReference } + let bob: { demos: Demos; claim: ClaimReference } + + beforeEach(async () => { + alice = await newConnectedDemos() + bob = await newConnectedDemos() + }) + + function wire(relay: Relay) { + const members = [alice.claim, bob.claim] + const aSes = new ChannelSession({ + channelId: CHANNEL, + members, + me: alice.claim, + demos: alice.demos, + }) + const bSes = new ChannelSession({ + channelId: CHANNEL, + members, + me: bob.claim, + demos: bob.demos, + }) + const aRecv: number[] = [] + const bRecv: number[] = [] + const aErr: Error[] = [] + const bErr: Error[] = [] + const aTx = new L2PSChannelTransport({ + session: aSes, + peer: relay.endpoint("alice"), + sharedKey: SUBNET_KEY, + recipients: ["bob"], + onMessage: m => aRecv.push(m.sequence), + onError: e => aErr.push(e), + }) + const bTx = new L2PSChannelTransport({ + session: bSes, + peer: relay.endpoint("bob"), + sharedKey: SUBNET_KEY, + recipients: ["alice"], + onMessage: m => bRecv.push(m.sequence), + onError: e => bErr.push(e), + }) + return { aSes, bSes, aTx, bTx, aRecv, bRecv, aErr, bErr } + } + + it("full offer → counter → accept round verifies on both ends", async () => { + const relay = new Relay() + const { aSes, bSes, aTx, bTx, aRecv, bRecv, aErr, bErr } = wire(relay) + await aSes.open() + await bSes.open() + aTx.start() + bTx.start() + + await aTx.send({ type: "offer", body: { price: 100 } }) // seq 1 + await flush() + await bTx.send({ type: "counter", body: { price: 90 }, repliesTo: 1 }) // seq 2 + await flush() + await aTx.send({ type: "accept", body: { price: 90 }, repliesTo: 2 }) // seq 3 + await flush() + + // bob received alice's offer(1) + accept(3); alice received bob's counter(2) + expect(bRecv).toEqual([1, 3]) + expect(aRecv).toEqual([2]) + // Real signature verification passed throughout — no errors. + expect(aErr).toEqual([]) + expect(bErr).toEqual([]) + // Both transcripts agree on the three messages in sequence. + expect(aSes.messages().map(m => m.sequence)).toEqual([1, 2, 3]) + expect(bSes.messages().map(m => m.sequence)).toEqual([1, 2, 3]) + }) + + it("reorders a burst delivered out of order (offline-queue replay)", async () => { + const relay = new Relay() + const { aSes, bSes, aTx, bTx, bRecv, bErr } = wire(relay) + await aSes.open() + await bSes.open() + aTx.start() + bTx.start() + + // Alice bursts two messages without waiting; the relay holds them. + relay.hold = true + await aTx.send({ type: "offer", body: { n: 1 } }) // seq 1 + await aTx.send({ type: "counter", body: { n: 2 } }) // seq 2 + // Deliver REVERSED: seq2 arrives before seq1. + relay.flush([1, 0]) + await flush() + + // Despite reversed arrival, bob applied them in sequence order. + expect(bRecv).toEqual([1, 2]) + expect(bErr).toEqual([]) + expect(bSes.messages().map(m => m.sequence)).toEqual([1, 2]) + }) + + it("surfaces a tampered frame via onError without advancing", async () => { + const relay = new Relay() + const { aSes, bSes, aTx, bTx, bRecv, bErr } = wire(relay) + await aSes.open() + await bSes.open() + aTx.start() + bTx.start() + + relay.hold = true + await aTx.send({ type: "offer", body: { n: 1 } }) // seq 1 + relay.flushTampered() + await flush() + + // Corrupted ciphertext → decrypt fails → onError, nothing applied. + expect(bRecv).toEqual([]) + expect(bErr.length).toBe(1) + expect(bSes.messages()).toEqual([]) + }) +}) diff --git a/src/tests/l2ps/transport.test.ts b/src/tests/l2ps/transport.test.ts new file mode 100644 index 0000000..a6d8cd4 --- /dev/null +++ b/src/tests/l2ps/transport.test.ts @@ -0,0 +1,201 @@ +import { + L2PSChannelTransport, + type ChannelSessionLike, + type IncomingMessagePayload, + type L2PSMessagingPeerLike, + type SerializedEncryptedMessage, +} from "@/l2ps/channel/transport" +import type { ChannelMessage, ChannelMessageType } from "@/l2ps/channel" + +const CHANNEL = "ch-test-1" +const KEY = new Uint8Array(32).fill(7) + +/** + * Fake session: enforces the same monotonic rule as the real + * ChannelSession (`sequence` must be strictly greater than the highest + * seen) and records the order it accepted messages, but without real + * signatures so the reorder logic can be tested in isolation. + */ +class FakeSession implements ChannelSessionLike { + readonly channelId = CHANNEL + highestSeen = 0 + applied: number[] = [] + private outSeq = 0 + + async sendOutgoing(opts: { + type: ChannelMessageType + body: unknown + }): Promise { + const sequence = ++this.highestSeen + this.outSeq = sequence + return makeMsg(sequence, opts.type, opts.body) + } + + async receiveIncoming(msg: ChannelMessage): Promise { + if (msg.sequence <= this.highestSeen) { + throw new Error( + `non-monotonic ${msg.sequence} <= ${this.highestSeen}`, + ) + } + this.highestSeen = msg.sequence + this.applied.push(msg.sequence) + } +} + +/** Fake peer: lets the test capture sends and inject inbound frames. */ +class FakePeer implements L2PSMessagingPeerLike { + sent: { to: string; messageHash: string }[] = [] + private handler: ((p: IncomingMessagePayload) => void) | null = null + + async send(to: string, _e: SerializedEncryptedMessage, messageHash: string) { + this.sent.push({ to, messageHash }) + return { messageHash, l2psStatus: "submitted" as const } + } + onMessage(handler: (p: IncomingMessagePayload) => void): void { + this.handler = handler + } + inject(p: IncomingMessagePayload): void { + if (!this.handler) throw new Error("no handler registered") + this.handler(p) + } +} + +function makeMsg( + sequence: number, + type: ChannelMessageType = "offer", + body: unknown = {}, +): ChannelMessage { + return { + channelId: CHANNEL, + sequence, + sender: "demos:0x" + "a".repeat(64), + sentAt: 1000 + sequence, + type, + body, + signature: { sigVersion: "1", signature: "0xsig" + sequence }, + } as ChannelMessage +} + +/** + * AES-256-GCM encrypt to the SerializedEncryptedMessage wire shape using + * the same scheme the adapter decrypts with, so an injected frame + * round-trips. (The adapter's own encrypt() is private; this mirrors it.) + */ +async function encryptEnvelope( + msg: ChannelMessage, +): Promise { + const nonce = crypto.getRandomValues(new Uint8Array(12)) + const key = await crypto.subtle.importKey( + "raw", + KEY.buffer as ArrayBuffer, + "AES-GCM", + false, + ["encrypt"], + ) + const cipher = await crypto.subtle.encrypt( + { name: "AES-GCM", iv: nonce }, + key, + new TextEncoder().encode(JSON.stringify(msg)), + ) + return { + ciphertext: Buffer.from(new Uint8Array(cipher)).toString("base64"), + nonce: Buffer.from(nonce).toString("base64"), + } +} + +async function inject( + peer: FakePeer, + msg: ChannelMessage, + from = "0xpeer", +): Promise { + peer.inject({ + from, + encrypted: await encryptEnvelope(msg), + messageHash: msg.signature.signature, + }) +} + +/** Wait for the adapter's internal drain chain to settle. */ +async function flush(): Promise { + await new Promise(r => setTimeout(r, 0)) + await new Promise(r => setTimeout(r, 0)) +} + +describe("L2PSChannelTransport — reorder buffer", () => { + function setup() { + const session = new FakeSession() + const peer = new FakePeer() + const applied: number[] = [] + const transport = new L2PSChannelTransport({ + session, + peer, + sharedKey: KEY, + recipients: ["0xpeer"], + onMessage: m => applied.push(m.sequence), + }) + transport.start() + return { session, peer, transport, applied } + } + + it("applies in-order arrivals immediately", async () => { + const { peer, transport, applied } = setup() + await inject(peer, makeMsg(1)) + await flush() + await inject(peer, makeMsg(2)) + await flush() + expect(applied).toEqual([1, 2]) + expect(transport.bufferedCount).toBe(0) + }) + + it("buffers a gap and applies in order once it fills", async () => { + const { peer, transport, applied } = setup() + // seq 3 and 2 arrive before 1 (offline-queue replay / reorder) + await inject(peer, makeMsg(3)) + await flush() + expect(applied).toEqual([]) // held — gap at 1,2 + expect(transport.bufferedCount).toBe(1) + + await inject(peer, makeMsg(2)) + await flush() + expect(applied).toEqual([]) // still held — gap at 1 + expect(transport.bufferedCount).toBe(2) + + await inject(peer, makeMsg(1)) + await flush() + // 1 fills the gap → 1,2,3 drain contiguously, in order + expect(applied).toEqual([1, 2, 3]) + expect(transport.bufferedCount).toBe(0) + }) + + it("drops duplicates / already-applied sequences", async () => { + const { peer, applied } = setup() + await inject(peer, makeMsg(1)) + await flush() + await inject(peer, makeMsg(1)) // replay + await flush() + expect(applied).toEqual([1]) + }) + + it("ignores envelopes for a different channel on the same subnet", async () => { + const { peer, applied } = setup() + const other = makeMsg(1) + ;(other as { channelId: string }).channelId = "ch-other" + await inject(peer, other) + await flush() + expect(applied).toEqual([]) + }) + + it("interleaves our sends with peer replies on the shared counter", async () => { + const { peer, transport, applied } = setup() + // We send seq 1; peer replies seq 2; we send seq 3; peer seq 4. + await transport.send({ type: "offer", body: {} }) // seq 1 (outgoing) + await inject(peer, makeMsg(2, "counter")) + await flush() + await transport.send({ type: "counter", body: {} }) // seq 3 (outgoing) + await inject(peer, makeMsg(4, "accept")) + await flush() + // Only inbound (2,4) go through onMessage; sends are returned directly. + expect(applied).toEqual([2, 4]) + expect(peer.sent.length).toBe(2) // one delivery per send to the single recipient + }) +}) diff --git a/src/websdk/BroadcastFailedError.ts b/src/websdk/BroadcastFailedError.ts index 5ff5f16..490e897 100644 --- a/src/websdk/BroadcastFailedError.ts +++ b/src/websdk/BroadcastFailedError.ts @@ -13,7 +13,9 @@ */ export class BroadcastFailedError extends Error { public readonly txHash: string - public override readonly cause: unknown + // No `override`: the configured lib (es2021) predates ES2022's + // `Error.cause`, so the base class has no `cause` to override (TS4113). + public readonly cause: unknown constructor(opts: { txHash: string; cause: unknown; message?: string }) { const causeMessage = diff --git a/src/websdk/TransportError.ts b/src/websdk/TransportError.ts index fe4e07a..2b12ddf 100644 --- a/src/websdk/TransportError.ts +++ b/src/websdk/TransportError.ts @@ -9,7 +9,9 @@ */ export class TransportError extends Error { public readonly attempts: number - public override readonly cause: unknown + // No `override`: the configured lib (es2021) predates ES2022's + // `Error.cause`, so the base class has no `cause` to override (TS4113). + public readonly cause: unknown constructor(message: string, opts: { cause: unknown; attempts: number }) { super(message)