Add transports, connection layer, and cleanup chaining#9
Conversation
Three composable layers with the same { listener, sender } interface:
- Transports: iframe (with origin validation), worker, BroadcastChannel
- Connection: SYN/ACK handshake, message queue, per-message ACKs, auto-retry
- Cleanup chaining: invoke/listen call transport cleanup automatically
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
📝 WalkthroughWalkthroughThis PR introduces modular transport implementations (broadcast channels, iframes, web workers) and a new Changes
Sequence Diagram(s)sequenceDiagram
participant App as Application
participant Conn as createConnection
participant Transport as Transport Layer
participant Remote as Remote Peer
App->>Conn: createConnection(transport)
activate Conn
Conn->>Conn: Start SYN handshake loop
Conn->>Transport: Send {type: 'syn'}
Transport->>Remote: Deliver SYN
Remote->>Transport: Send {type: 'ack'}
Transport->>Conn: Receive ACK
Conn->>Conn: Mark connected, flush queue
deactivate Conn
App->>Conn: invoke(message, id:1)
activate Conn
Conn->>Conn: Queue until connected
Conn->>Transport: Send message + id:1
Transport->>Remote: Deliver message
Remote->>Transport: Send {type: 'msg-ack', id:1}
Transport->>Conn: Receive ACK
Conn->>Conn: Clear retry timer, resolve
deactivate Conn
App->>Conn: invoke(message, id:2)<br/>(with dropped response)
activate Conn
Conn->>Transport: Send message + id:2
Transport->>Remote: Deliver message
Note over Remote: No ACK returned
Conn->>Conn: Retry with backoff
Conn->>Transport: Resend message + id:2
Note over Conn: After max retries exhausted
Conn->>App: Reject with error
deactivate Conn
Estimated code review effort🎯 4 (Complex) | ⏱️ ~55 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 6
🧹 Nitpick comments (1)
src/invoke/invoke.ts (1)
74-82: Harden cleanup chaining against partial teardown.If
listenerCleanup()throws,sentMessagesStore.clear()andtransportCleanup?.()are skipped. Wrapping cleanup steps intry/finallypreserves teardown guarantees.♻️ Proposed defensive cleanup chaining
const cleanup = (): void => { if (cleanedUp) { throw new Error('cleanup() has already been called.') } cleanedUp = true - listenerCleanup() - sentMessagesStore.clear() - transportCleanup?.() + try { + listenerCleanup() + } finally { + try { + sentMessagesStore.clear() + } finally { + transportCleanup?.() + } + } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/invoke/invoke.ts` around lines 74 - 82, The cleanup() function must run all teardown steps even if one throws; change the body (after setting cleanedUp = true) to execute listenerCleanup(), sentMessagesStore.clear(), and transportCleanup?.() in a defensive chain using nested try/finally (or try/catch + rethrow) so that each step is always attempted; if a step throws, capture the error, continue to run the remaining teardowns, and rethrow the original error after all teardown attempts complete. Target the cleanup function and the symbols listenerCleanup, sentMessagesStore.clear, and transportCleanup to implement this robust chaining.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/connection/createConnection.test.ts`:
- Around line 106-131: The test never triggers the retry logic because connB
currently always acknowledges and responds; modify the test to simulate a
dropped acknowledgement/response on the first delivery and then assert that the
request is retried (and not double-invoked). Concretely: using
createTransportPair()/createConnection() and the listen({ model, ...connB })
handler, instrument the message handler for the "greet" RPC so that the first
incoming request from connA is ignored (do not call back or send a
msg-ack/response), then allow subsequent deliveries to be processed normally;
assert that callCount is 1 after retry (or assert resend by observing that connA
sends the message again and the handler receives it twice but only processes
once if dedupe is expected). Ensure the test still calls connA.cleanup() and
connB.cleanup().
In `@src/connection/createConnection.ts`:
- Around line 130-148: The retry logic in scheduleRetry (and similar blocks
around lines 182-188, 205-213) can cause replay of non-idempotent RPCs when ACKs
are lost; add duplicate-suppression and cached-response replay keyed by message
id before performing a resend. Specifically, when scheduling or performing a
retry in scheduleRetry, check a new dedupe cache (map keyed by pending.id) that
records recently-seen incoming messages and stored responses; if the receiver
already handled the id, reuse/replay the cached response instead of calling
transport.sender/incomingHandler again, and ensure msg-ack still clears
pending.retryTimer and removes entries appropriately; update pendingMessages and
rejectMessage flows to populate and expire the dedupe cache based on
config.retry windows so duplicates are suppressed and cached responses are
replayed rather than re-executing handlers.
- Around line 80-87: The handshake timeout leaves the transport in a dead state
because you clear synInterval and reject the current queue but do not mark the
connection as terminal, so subsequent sender() calls keep pushing into queue;
fix by making the timeout terminal: after rejecting and clearing queue in the
handshake timeout handler (handshakeTimeout) set cleanedUp = true (or set a new
terminal flag) and ensure sender() checks that flag and immediately rejects new
sends via rejectMessage; apply the same terminal-flag change to the later
timeout block referenced (around the 227-233 logic) so both code paths prevent
further enqueuing after timeout.
- Around line 174-181: The 'syn' branch in handleControlMessage currently
replies with an 'ack' but does not mark the connection as connected; update
handleControlMessage so that when msg.type === 'syn' it both sends the ack via
transport.sender(superjson.stringify({ type: 'ack' })) and calls onConnected()
immediately (same as the 'ack' branch) to treat an incoming syn as proof the
peer is up, preventing premature timeouts with asymmetric synInterval/timeout
values.
- Around line 242-245: In cleanup(), before clearing timers and pendingMessages,
call rejectMessage (or the existing rejection path) for each pending message id
so any outstanding invoke() promises waiting for msg-ack are rejected; iterate
pendingMessages.values(), if pending.retryTimer clear it, then call
rejectMessage(pending.id, new Error("connection closed")) (or existing reject
helper) for each pending entry, and only after that pendingMessages.clear() to
ensure callers receive rejection instead of hanging; update the cleanup function
that currently touches pendingMessages and retryTimer to include this rejection
step.
In `@src/transports/iframe.ts`:
- Around line 18-21: The message handler cb currently only checks event.origin
and event.data; also ensure the message comes from the intended window by gating
on event.source === targetWindow before invoking handler. Update the cb
(MessageEvent) checks to validate event.source against the targetWindow variable
(in addition to existing origin and typeof data checks) and only call
handler(event.data) when all three conditions pass.
---
Nitpick comments:
In `@src/invoke/invoke.ts`:
- Around line 74-82: The cleanup() function must run all teardown steps even if
one throws; change the body (after setting cleanedUp = true) to execute
listenerCleanup(), sentMessagesStore.clear(), and transportCleanup?.() in a
defensive chain using nested try/finally (or try/catch + rethrow) so that each
step is always attempted; if a step throws, capture the error, continue to run
the remaining teardowns, and rethrow the original error after all teardown
attempts complete. Target the cleanup function and the symbols listenerCleanup,
sentMessagesStore.clear, and transportCleanup to implement this robust chaining.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 434b87f9-da31-4fa6-b662-0216a36ae72f
📒 Files selected for processing (14)
package.jsonsrc/bime.tssrc/broadcast-channel.tssrc/connection.tssrc/connection/createConnection.test.tssrc/connection/createConnection.tssrc/iframe.tssrc/invoke/invoke.tssrc/listen/listen.tssrc/transports/broadcastChannel.tssrc/transports/iframe.tssrc/transports/worker.tssrc/types.tssrc/worker.ts
| test('retries unacknowledged messages', async () => { | ||
| const { a, b } = createTransportPair() | ||
|
|
||
| const connA = createConnection(a, { | ||
| synInterval: 10, | ||
| retry: { timeout: 20, tries: 3, backoff: 1 }, | ||
| }) | ||
| const connB = createConnection(b, { synInterval: 10 }) | ||
|
|
||
| const callCount = jest.fn() | ||
| const model = { | ||
| greet: (name: string) => { | ||
| callCount() | ||
| return `Hello ${name}` | ||
| }, | ||
| } | ||
|
|
||
| listen({ model, ...connB }) | ||
| const api = invoke<typeof model>(connA) | ||
|
|
||
| const result = await api.greet('World') | ||
| expect(result).toEqual('Hello World') | ||
|
|
||
| connA.cleanup() | ||
| connB.cleanup() | ||
| }) |
There was a problem hiding this comment.
This "retry" test never enters the retry path.
Nothing here suppresses a msg-ack or response, so the first delivery succeeds and callCount is never used. As written, this passes even if scheduleRetry() is dead code. Please drop a msg-ack/response in this case and assert the expected resend or dedupe behavior.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/connection/createConnection.test.ts` around lines 106 - 131, The test
never triggers the retry logic because connB currently always acknowledges and
responds; modify the test to simulate a dropped acknowledgement/response on the
first delivery and then assert that the request is retried (and not
double-invoked). Concretely: using createTransportPair()/createConnection() and
the listen({ model, ...connB }) handler, instrument the message handler for the
"greet" RPC so that the first incoming request from connA is ignored (do not
call back or send a msg-ack/response), then allow subsequent deliveries to be
processed normally; assert that callCount is 1 after retry (or assert resend by
observing that connA sends the message again and the handler receives it twice
but only processes once if dedupe is expected). Ensure the test still calls
connA.cleanup() and connB.cleanup().
| const handshakeTimeout = setTimeout(() => { | ||
| if (connected) return | ||
| clearInterval(synInterval) | ||
| for (const raw of queue) { | ||
| rejectMessage(raw, 'Connection timed out') | ||
| } | ||
| queue.length = 0 | ||
| }, config.timeout) |
There was a problem hiding this comment.
Handshake timeout leaves the transport in a dead queueing state.
After timeout you stop the SYN loop and reject the current queue, but later sender() calls still push onto queue because neither connected nor cleanedUp changes. Unless a late ack happens, every post-timeout send can hang forever. Either make timeout terminal and reject future sends immediately, or keep the handshake loop alive so those queued messages still have a path to connect.
Also applies to: 227-233
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/connection/createConnection.ts` around lines 80 - 87, The handshake
timeout leaves the transport in a dead state because you clear synInterval and
reject the current queue but do not mark the connection as terminal, so
subsequent sender() calls keep pushing into queue; fix by making the timeout
terminal: after rejecting and clearing queue in the handshake timeout handler
(handshakeTimeout) set cleanedUp = true (or set a new terminal flag) and ensure
sender() checks that flag and immediately rejects new sends via rejectMessage;
apply the same terminal-flag change to the later timeout block referenced
(around the 227-233 logic) so both code paths prevent further enqueuing after
timeout.
| function scheduleRetry( | ||
| pending: PendingMessage, | ||
| delay: number, | ||
| triesLeft: number, | ||
| ) { | ||
| pending.retryTimer = setTimeout(() => { | ||
| if (pending.acknowledged || cleanedUp) return | ||
|
|
||
| if (triesLeft > 0) { | ||
| transport.sender(pending.raw) | ||
| scheduleRetry(pending, delay * config.retry.backoff, triesLeft - 1) | ||
| } else { | ||
| pendingMessages.delete(pending.id) | ||
| rejectMessage( | ||
| pending.raw, | ||
| `Message was not acknowledged after ${config.retry.tries} retries`, | ||
| ) | ||
| } | ||
| }, delay) |
There was a problem hiding this comment.
Retries can replay non-idempotent RPCs.
Only msg-ack clears the retry timer, so if that ACK is lost the original message is resent verbatim. The receiver then forwards every copy with the same id to incomingHandler, which can execute the same procedure multiple times and duplicate side effects. This layer needs duplicate suppression keyed by message id, ideally with cached-response replay, before auto-retrying arbitrary RPC traffic.
Also applies to: 182-188, 205-213
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/connection/createConnection.ts` around lines 130 - 148, The retry logic
in scheduleRetry (and similar blocks around lines 182-188, 205-213) can cause
replay of non-idempotent RPCs when ACKs are lost; add duplicate-suppression and
cached-response replay keyed by message id before performing a resend.
Specifically, when scheduling or performing a retry in scheduleRetry, check a
new dedupe cache (map keyed by pending.id) that records recently-seen incoming
messages and stored responses; if the receiver already handled the id,
reuse/replay the cached response instead of calling
transport.sender/incomingHandler again, and ensure msg-ack still clears
pending.retryTimer and removes entries appropriately; update pendingMessages and
rejectMessage flows to populate and expire the dedupe cache based on
config.retry windows so duplicates are suppressed and cached responses are
replayed rather than re-executing handlers.
| function handleControlMessage(msg: ControlMessage) { | ||
| switch (msg.type) { | ||
| case 'syn': | ||
| transport.sender(superjson.stringify({ type: 'ack' })) | ||
| break | ||
| case 'ack': | ||
| onConnected() | ||
| break |
There was a problem hiding this comment.
Treat an incoming syn as proof that the peer is up.
The syn branch replies with ack but keeps this side disconnected until it later receives an ack to its own future syn. With asymmetric synInterval/timeout values, this side can time out even though it already saw a live peer. Call onConnected() when handling syn.
🔧 Proposed fix
case 'syn':
transport.sender(superjson.stringify({ type: 'ack' }))
+ onConnected()
break📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| function handleControlMessage(msg: ControlMessage) { | |
| switch (msg.type) { | |
| case 'syn': | |
| transport.sender(superjson.stringify({ type: 'ack' })) | |
| break | |
| case 'ack': | |
| onConnected() | |
| break | |
| function handleControlMessage(msg: ControlMessage) { | |
| switch (msg.type) { | |
| case 'syn': | |
| transport.sender(superjson.stringify({ type: 'ack' })) | |
| onConnected() | |
| break | |
| case 'ack': | |
| onConnected() | |
| break |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/connection/createConnection.ts` around lines 174 - 181, The 'syn' branch
in handleControlMessage currently replies with an 'ack' but does not mark the
connection as connected; update handleControlMessage so that when msg.type ===
'syn' it both sends the ack via transport.sender(superjson.stringify({ type:
'ack' })) and calls onConnected() immediately (same as the 'ack' branch) to
treat an incoming syn as proof the peer is up, preventing premature timeouts
with asymmetric synInterval/timeout values.
| for (const pending of pendingMessages.values()) { | ||
| if (pending.retryTimer) clearTimeout(pending.retryTimer) | ||
| } | ||
| pendingMessages.clear() |
There was a problem hiding this comment.
Cleanup should reject unacknowledged sends before dropping them.
This loop clears timers and pendingMessages, but it never calls rejectMessage() for those ids. Any composed invoke() call whose request was sent and is still waiting for msg-ack will hang when conn.cleanup() runs.
🧹 Proposed fix
for (const pending of pendingMessages.values()) {
if (pending.retryTimer) clearTimeout(pending.retryTimer)
+ rejectMessage(pending.raw, 'Connection was cleaned up')
}
pendingMessages.clear()📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| for (const pending of pendingMessages.values()) { | |
| if (pending.retryTimer) clearTimeout(pending.retryTimer) | |
| } | |
| pendingMessages.clear() | |
| for (const pending of pendingMessages.values()) { | |
| if (pending.retryTimer) clearTimeout(pending.retryTimer) | |
| rejectMessage(pending.raw, 'Connection was cleaned up') | |
| } | |
| pendingMessages.clear() |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/connection/createConnection.ts` around lines 242 - 245, In cleanup(),
before clearing timers and pendingMessages, call rejectMessage (or the existing
rejection path) for each pending message id so any outstanding invoke() promises
waiting for msg-ack are rejected; iterate pendingMessages.values(), if
pending.retryTimer clear it, then call rejectMessage(pending.id, new
Error("connection closed")) (or existing reject helper) for each pending entry,
and only after that pendingMessages.clear() to ensure callers receive rejection
instead of hanging; update the cleanup function that currently touches
pendingMessages and retryTimer to include this rejection step.
| const cb = (event: MessageEvent) => { | ||
| if (event.origin !== origin) return | ||
| if (typeof event.data !== 'string') return | ||
| handler(event.data) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
cat -n src/transports/iframe.tsRepository: mike-pete/bime
Length of output: 1103
🏁 Script executed:
# Check how iframeTransport is used in the codebase
rg "iframeTransport" --type ts --type tsx -B 2 -A 5Repository: mike-pete/bime
Length of output: 85
🏁 Script executed:
# Check how iframeTransport is used in the codebase
rg "iframeTransport" --type ts -B 2 -A 5Repository: mike-pete/bime
Length of output: 484
🏁 Script executed:
# Check the Transport type definition
cat -n src/types.tsRepository: mike-pete/bime
Length of output: 401
🏁 Script executed:
# Search for test files or examples showing how iframeTransport is used
fd "test\|spec\|example" --type f | head -20Repository: mike-pete/bime
Length of output: 40
Also validate event.source against the target window.
Checking event.origin alone accepts messages from any same-origin frame/popup, allowing same-origin peers to inject messages into this transport or cross-talk between instances. Gate dispatch on event.source === targetWindow before calling handler.
🛡️ Proposed fix
const cb = (event: MessageEvent) => {
if (event.origin !== origin) return
+ if (event.source !== targetWindow) return
if (typeof event.data !== 'string') return
handler(event.data)
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const cb = (event: MessageEvent) => { | |
| if (event.origin !== origin) return | |
| if (typeof event.data !== 'string') return | |
| handler(event.data) | |
| const cb = (event: MessageEvent) => { | |
| if (event.origin !== origin) return | |
| if (event.source !== targetWindow) return | |
| if (typeof event.data !== 'string') return | |
| handler(event.data) |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/transports/iframe.ts` around lines 18 - 21, The message handler cb
currently only checks event.origin and event.data; also ensure the message comes
from the intended window by gating on event.source === targetWindow before
invoking handler. Update the cb (MessageEvent) checks to validate event.source
against the targetWindow variable (in addition to existing origin and typeof
data checks) and only call handler(event.data) when all three conditions pass.
Context
Adds a three-layer composable architecture for reliable message passing:
api.cleanup()tears down the whole stack automaticallyEach layer produces the same
{ listener, sender }interface (decorator pattern), so they compose naturally:Summary by CodeRabbit
Release Notes
New Features
Tests
Chores