Skip to content

feat: WebSocket support via functional events#85

Open
brainkim wants to merge 27 commits into
mainfrom
feat/websocket-functional-events
Open

feat: WebSocket support via functional events#85
brainkim wants to merge 27 commits into
mainfrom
feat/websocket-functional-events

Conversation

@brainkim

@brainkim brainkim commented Apr 22, 2026

Copy link
Copy Markdown
Member

Summary

WebSocket support for Shovel via functional events, across all three platforms (Node, Bun, Cloudflare). Supersedes #82.

Built on one strict rule: no modifications to standards classes. WebSocket, Client, Clients, BroadcastChannel, ClientQueryOptions are untouched. Everything new lives on new event classes we own, plus a single FetchEvent.upgradeWebSocket() method (shape-analogous to respondWith/waitUntil). See the #82 closing rationale for why the earlier WebSocketClient extends Client approach was abandoned.

User-facing API

interface WebSocketConnection {
  readonly id: string;
  send(data: string | ArrayBuffer): void;
  close(code?: number, reason?: string): void;
  subscribe(channel: string): void;
  unsubscribe(channel: string): void;
}

interface FetchEvent {
  upgradeWebSocket(): WebSocketConnection;
}

interface WebSocketMessageEvent extends ExtendableEvent {
  readonly source: WebSocketConnection;
  readonly data: string | ArrayBuffer;
}

interface WebSocketCloseEvent extends ExtendableEvent {
  readonly id: string;
  readonly code: number;
  readonly reason: string;
  readonly wasClean: boolean;
}
self.addEventListener("fetch", async (event) => {
  if (event.request.headers.get("upgrade") === "websocket") {
    const ws = event.upgradeWebSocket();
    ws.subscribe("room:lobby");
    ws.send(JSON.stringify({type: "welcome"}));
    return;
  }
  event.respondWith(new Response("..."));
});

self.addEventListener("websocketmessage", (event) => {
  new BroadcastChannel("room:lobby").postMessage(event.data);
});

self.addEventListener("websocketclose", async (event) => {
  await storage.hdel("ws:users", event.id);
});

Cross-isolate fan-out piggybacks on BroadcastChannel via one semantic rule: a message published on a channel name is delivered as send() to every WebSocket that subscribe()'d to that channel — publishable from a cron handler, another isolate, anywhere.

Scope

  • Core runtime: ShovelWebSocketConnection, upgradeWebSocket(), websocketmessage / websocketclose dispatch, per-connection ordered delivery (globals.d.ts, runtime.ts, index.ts).
  • Node adapter (ws package, lazy-loaded) — direct + pool modes.
  • Bun adapter (native Bun.serve) — direct + pool modes.
  • Cloudflare adapter — Durable Object with the Hibernation API (websocket-do.ts), and a pub/sub DO (pubsub.ts) that holds a persisted per-channel registry of subscriber DO IDs and wakes hibernated subscriber DOs on publish via fetch("/_shovel_publish"). This is what makes channel fan-out correct on Cloudflare even when a subscriber's DO is evicted.
  • Pool IPC: WorkerUpgradeMessage / WebSocketPoolHandlers thread subscriber identity across the worker-thread boundary.
  • Example: examples/chat — single-room chat + e2e test.

Reviewer guidance — where to focus

This is a large PR; the highest-leverage areas for review:

  1. Cloudflare cross-isolate delivery semantics (pubsub.ts) — the registry/wake model is the load-bearing, newest piece. See "Known limitations" below; the central question is whether at-most-once, fire-and-forget delivery is the right v1 contract.
  2. Hibernation lifecycle (websocket-do.ts) — serializeAttachment/rehydration on wake, the webSocketErrorwebsocketclose dedup, per-connection ordered dispatch.
  3. Per-adapter duplication — the ordered-dispatch chain, the PendingFrame buffer-then-flush relay, and toHttpErrorResponse are each reimplemented per adapter. Intentional for now, but a candidate for a shared runtime helper (noted below).

Known limitations (intentional v1 tradeoffs — flagged for discussion)

Surfaced by review and deliberately not patched here, because they're design decisions rather than bugs:

  • Cross-isolate delivery is fire-and-forget / at-most-once. A failed /subscribe or /_shovel_publish is logged, not retried; there's no ack or reconciliation. A transiently-failed subscribe can leave an isolate locally subscribed but absent from the registry.
  • No registry reaping. A subscriber DO evicted/crashed without a clean /unsubscribe leaves its (channel, doId) entry in storage, so future publishes keep waking a dead DO. No TTL/liveness pruning yet.
  • Non-DO receive socket doesn't reconnect if the pub/sub DO is evicted mid-session.
  • websocketmessage waitUntil is awaited inside the per-connection ordered chain, so a slow background task head-of-line-blocks later frames on that connection. Fixing correctly means plumbing ctx.waitUntil through the message event — a deliberate change.

If reviewers consider any of these blockers rather than follow-ups, I'll address them in this PR.

Review fixes already applied

From an initial review pass (latest commit):

  • webSocketError now dispatches websocketclose (workerd may fire error without a follow-up close), with a #finalized guard so the handler runs exactly once.
  • Fixed a pendingFrames leak on failed pool-upgrade handshakes (Node + Bun).
  • Hot-path efficiency: serialize the publish payload once per publish (not per subscriber); re-serialize the hibernation attachment only when subscriptions change; capture _dispatchPubSubMessage to drop a per-delivery dynamic import.

Testing

  • Platform package suites (Node/Bun/Cloudflare/core): 257 pass.
  • Cloudflare e2e (Miniflare): cloudflare-websocket, cloudflare-pubsub-hibernation (wake + publish ordering), pool-websocket-close.
  • build / lint / typecheck / test green in CI.

Note: the Miniflare-based cloudflare-*.test.* files are run in separate bun test invocations (see bunfig.toml) to avoid a workerd "Broken pipe on fd=3" flake when run concurrently.

🤖 Generated with Claude Code

brainkim and others added 27 commits April 22, 2026 14:27
Defines the type surface for WebSocket support in Shovel ServiceWorker:

- FetchEvent.upgradeWebSocket() returns a WebSocketConnection
- WebSocketConnection: id, send, close, subscribe, unsubscribe
- websocketmessage event with source: WebSocketConnection (follows
  ExtendableMessageEvent.source pattern)
- websocketclose event with id + close details
- Runtime routes BroadcastChannel messages to subscribed connections
  via send() — BC class untouched, coupling is at the delivery layer

Design constraints:
- No modifications to WebSocket, Client, Clients, or BroadcastChannel
- No nested closures — all user state is module-scope or runtime-mediated
- Extension to FetchEvent limited to upgradeWebSocket(), shape-analogous
  to respondWith/waitUntil

Runtime implementation TBD — upgradeWebSocket() currently throws.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds the platform-agnostic building blocks for WebSocket support:

- ShovelWebSocketConnection implements WebSocketConnection from globals.d.ts
  with id/send/close/subscribe/unsubscribe. Subscriptions are realized as
  ShovelBroadcastChannel listeners internal to the connection — BC messages
  on the subscribed channel are forwarded via send(). The set of channel
  names is persistable (survives hibernation); BC instances are ephemeral
  and rebuilt on each rehydration.

- ShovelWebSocketMessageEvent/CloseEvent extend ShovelExtendableEvent.
  Message event carries { source, data }; close event carries
  { id, code, reason, wasClean }.

- ShovelFetchEvent gains upgradeWebSocket(): validates Upgrade header,
  enforces sync call during dispatch, creates the connection, fires
  onUpgrade synchronously so the adapter can register before the handler
  continues (lets in-handler close find the connection, and lets the
  adapter clean up on handler throw — phantom-client lesson from prior PR).

- dispatchFetchEvent returns {event, response|null} so adapters can check
  kGetUpgradeResult after dispatch. dispatchWebSocketMessage/Close dispatch
  events and release subscriptions only after handlers have observed the
  close (deferred-removal lesson from prior PR).

Exports: WebSocketRelay interface, kGetUpgradeResult, kGetConnectionState,
kBindRelay symbols, WebSocketConnectionState interface.

Runtime implementation is platform-agnostic — Node/Bun/Cloudflare adapters
will wire up their own WebSocketRelay implementations in follow-up commits.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Nine tests covering the platform-agnostic core:

- upgradeWebSocket creates a connection with a UUID id
- upgradeWebSocket throws without Upgrade: websocket header
- onUpgrade fires synchronously during the fetch handler (so adapters
  can register before handler continues, enabling phantom-client cleanup)
- websocketmessage dispatches with source + data; event.source.send works
- websocketclose dispatches with id + code + reason + wasClean, and
  subscriptions are released after handlers run
- subscribe() routes BroadcastChannel messages to the connection
- unsubscribe() stops forwarding
- Connection state round-trips for hibernation: rehydrating from state
  re-registers BC listeners
- send/close are no-ops after close

Also routes websocketmessage/websocketclose through the SERVICE_WORKER_EVENTS
dispatch path so self.addEventListener() reaches registration handlers, and
moves WebSocket symbols above ShovelFetchEvent to fix TDZ on module load.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds attachNodeWebSocketHandler() which installs a WebSocket upgrade
listener on an existing http.Server. On upgrade, dispatches a fetch
event with a buffering WebSocketRelay; if the handler calls
event.upgradeWebSocket(), completes the handshake via the `ws` package
and wires message/close frames to dispatchWebSocketMessage/Close.

Baked-in lessons from prior PR:
- Connection registered synchronously via onUpgrade; phantom cleanup in
  the dispatch catch path drops state if the handler throws.
- Buffering relay collects conn.send()/conn.close() calls made during
  the fetch handler; flushed only AFTER ws.on("message")/ws.on("close")
  listeners are attached (prevents missing close-during-upgrade frames).
- Per-connection dispatch chain serializes messages in arrival order.
- Connection removal from local registry deferred until websocketclose
  handlers finish.

7 end-to-end tests (platform-node/test/websocket-node.test.ts):
- echo
- greeting (send during upgrade)
- close-during-upgrade (client sees custom close code + reason)
- websocketclose fires with the arrival close code
- subscribe routes BroadcastChannel messages to the connection
- multiple clients get distinct ids
- binary frames round-trip as ArrayBuffer

Package additions:
- ws ^8.18.0 runtime dep, @types/ws devDep
- ./websocket subpath export from @b9g/platform-node

Pool-mode IPC, Bun, and Cloudflare DO adapters still TODO.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
createBunWebSocketServer() returns a {fetch, websocket, cleanup} config
you spread into Bun.serve(). Same conceptual flow as the Node adapter
but uses Bun.serve's native WebSocket support.

Mirrors the Node adapter's hardening:
- Connection registered via onUpgrade, phantom cleanup on throw
- Buffering relay flushed in websocket.open() AFTER binding the live socket
- Per-connection dispatch chain for ordered delivery
- Binary frame byteOffset/byteLength preserved via buffer.slice
- Subscription state survives via the core runtime's connection

5 end-to-end tests (echo, greeting, subscribe, binary, close code).

Adds ./websocket subpath export from @b9g/platform-bun.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
packages/platform-cloudflare/src/websocket-do.ts — ShovelWebSocketDO
extends DurableObject, using the Hibernation API:

- ctx.acceptWebSocket(server) registers for hibernation-capable dispatch
- ws.serializeAttachment({id, url, subscribedChannels}) persists enough
  state to reconstruct the ShovelWebSocketConnection after wake
- #ensureRuntime() runs after every wake, rehydrates one connection per
  ctx.getWebSockets() entry; ShovelWebSocketConnection's constructor
  re-registers BC listeners for each subscribed channel
- Per-connection ordered dispatch queues (webSocketMessage/Close)
- Deferred removal — connection + queue dropped only after close dispatch
- Phantom cleanup if the fetch handler throws after onUpgrade fires

Worker routing (runtime.ts):
- If SHOVEL_WS binding present, Upgrade: websocket requests forward to a
  single shared DO via ns.idFromName("shovel-ws") (all connections in one
  isolate keeps subscribe() fan-out in-process for the common case)
- If absent, dispatches locally and returns 426 with a clear message if
  user code calls upgradeWebSocket()

Generated worker entry (platform.ts):
- Re-exports ShovelWebSocketDO so wrangler can bind the class. Dead-code
  eliminated if the user doesn't configure SHOVEL_WS.

End-to-end test (test/cloudflare-websocket.test.js):
- Builds the cloudflare-websocket fixture, runs it through Miniflare with
  a SHOVEL_WS binding, connects via a real WebSocket client (avoids the
  known Miniflare.dispatchFetch WS hang under Bun test)
- Verifies upgrade, welcome frame during upgrade, echo round-trip, and
  clean close with user-supplied code

Package additions:
- ./websocket-do subpath export
- cloudflare-websocket fixture
- Root test script runs the new Miniflare test with
  --path-ignore-patterns override

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The Node and Bun production worker templates (emitted by shovel build)
now attach the WebSocket upgrade handler so end users get WS support
turnkey in direct mode (config.workers <= 1).

Node template:
- Imports attachNodeWebSocketHandler from @b9g/platform-node/websocket
- After createServer() + listen(), attaches the handler to
  server.httpServer
- Stores the returned detach function and calls it on shutdown

Bun template:
- Replaces platform.createServer() with direct Bun.serve using the config
  returned by createBunWebSocketServer(registration) (fetch+websocket in
  one object)
- Calls adapter.cleanup() on shutdown

Plumbing:
- NodeServer interface extends Server with readonly httpServer
- NodePlatform.createServer returns NodeServer

Dev-mode templates still use startWorkerMessageLoop (develop command owns
HTTP), so they'll need pool-mode WS forwarding to gain WebSocket support.

Test: new "WebSocket echo via generated worker template" in
test/e2e-direct-mode.test.js builds a WS app, starts the production
server via the supervisor, and verifies greeting + echo round-trip
through a real ws client.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Multi-worker WebSocket support: supervisor owns the http.Server + real
socket; worker threads own the runtime Connection and user handlers;
frames are forwarded across the worker boundary via postMessage.

New in ServiceWorkerPool:
- WebSocketUpgradeResult discriminated union
- handleUpgradeRequest(request) — variant of handleRequest that may
  return an upgrade signal instead of a Response
- setWebSocketHandlers(h) — platform adapter registers callbacks for
  "send this frame on the real socket" / "close the real socket"
- sendWebSocketMessage(id, data) / sendWebSocketClose(...) — inbound
  frame/close forwarded from the real socket to the owning worker
- Internal connectionID → worker mapping for routing

Wire protocol additions (worker ↔ supervisor):
- ws:upgrade — worker tells supervisor "accept the handshake for
  request X, connection id Y"
- ws:send — worker asks supervisor to send a frame on connection Y
- ws:close — worker asks supervisor to close connection Y, OR
  supervisor tells worker the real socket closed (wasClean flag)
- ws:message — supervisor delivers inbound frame to owning worker

Worker-side (startWorkerMessageLoop):
- Maintains wsConnections map keyed by connectionID
- Per-connection ordered dispatch chain preserves arrival order
- handleFetchRequest now dispatches via dispatchFetchEvent with a
  supervisor-bound relay; if handler calls upgradeWebSocket(), posts
  ws:upgrade back instead of an HTTP response
- onUpgrade registers the Connection synchronously so phantom cleanup
  is possible if the handler throws after upgrade

Supervisor-side Node adapter (attachNodePoolWebSocketHandler):
- Listens on http.Server "upgrade" event
- Routes upgrade requests through pool.handleUpgradeRequest()
- Completes the handshake via ws.WebSocketServer
- Buffers outbound frames produced by the worker's fetch handler
  until the physical socket is live, then flushes AFTER the message
  listener is attached (prevents dropping close-during-upgrade frames)
- Forwards inbound frames/close back to the pool

NodePlatform.listen() wires the pool WS handler automatically when the
pool exposes setWebSocketHandlers. Existing HTTP-only callers are
unaffected since handleRequest still returns Response.

Tests:
- New e2e test: multi-worker production server handles WebSocket
  upgrade via pool IPC. 21 tests pass in the Node direct-mode suite
  (15 build output + 6 runtime + WS direct + WS pool).
- No regressions: 215/215 package tests + 21/21 direct-mode e2e.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
12 new tests in packages/platform/test/websocket.test.ts covering
subtle behaviors that caused bugs in the previous PR's Codex cycles:

- subscribe() idempotent — double subscribe only forwards once
- unsubscribe on non-subscribed channel is a no-op (never throws)
- multi-channel delivery: messages route to correct channels, cross-talk
  does not leak
- partial unsubscribe: one channel can be removed while others keep working
- wire safety: BC messages that aren't string/ArrayBuffer are silently
  dropped instead of forwarded as bogus WS frames
- binary frames: ArrayBuffer BC payloads forward as binary WS frames
- hibernation round-trip: subscriptions survive serialization +
  reconstruction, all channels still deliver after rehydration
- close idempotency: close event fires exactly once even if close() is
  called multiple times
- isolation: concurrent connections maintain independent subscription
  state, shared channels fan out, private channels stay private
- onUpgrade timing: synchronously fires DURING upgradeWebSocket(), not
  after the handler returns — this is the contract that makes phantom
  cleanup possible for platform adapters
- post-close API: subscribe/unsubscribe after close are ignored
- URL exposure: connection.url is accessible and persisted to state

Total: 21 core WS tests, all passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds createBunPoolWebSocketAdapter() — the Bun analogue of
attachNodePoolWebSocketHandler. BunPlatform.listen() now detects a
WS-capable pool and replaces its Bun.serve config with one that
routes upgrades through pool.handleUpgradeRequest and forwards frames
via pool IPC.

This closes two gaps:

- Bun supervisor mode (prod with workers > 1 via BunPlatform.listen):
  now handles WebSocket upgrades at the supervisor level in addition
  to the existing per-worker reusePort servers.
- Bun dev mode (shovel dev on Bun): createDevServer calls
  BunPlatform.listen(), so dev workflows now have WebSocket support
  for the first time.

Same hardening as the Node pool adapter:
- Frame buffering per connectionID flushed on websocket.open
- Binary frame slice preserves byteOffset/byteLength
- Phantom cleanup on handler throw, defensive upgrade-failed close
- wasClean flag based on 1006 vs explicit close code

Node dev mode was already covered transitively — NodePlatform.listen()
was wired in commit 176479e and is used by both dev and prod paths;
the existing "multi-worker production server handles WebSocket upgrade
via pool IPC" test validates the full chain.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Spawns `shovel develop` with a WS-handling fixture and verifies:
- greeting frame delivered during upgrade
- echo round-trip
- clean close with user code

Covers the full dev pipeline (bundler → NodePlatform/BunPlatform →
createDevServer → platform.listen → pool IPC → worker handlers), which
exercises the exact path end users hit with `shovel dev`.

Both tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Fixes CI lint failures on PR #85:
- prettier auto-format across new WebSocket code
- catch {} → catch (_err) {} for no-restricted-syntax rule

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds 200ms settle delay after waitForPort and logs captured server
output when the WebSocket open handshake errors out. CI fails this
test consistently but passes locally; server-side stderr is needed
to see which path the generated worker takes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Removes the hanging upgrade-via-fetch probe (fetch can't handle 101
cleanly) and adds a direct HTTP fetch plus server output dump before
the WebSocket connect, so we can see whether the server is serving
at all and what it's logging when CI fails this test.
Bun's \`builtinModules\` from \`node:module\` incorrectly includes
non-Node packages (\`ws\`, \`undici\`, \`bun:*\`). Spreading it into
esbuild's \`external\` list marks those packages external, leaving
runtime \`import("ws")\` / \`import("undici")\` calls in the bundled
worker. On CI those don't resolve, producing
\`ERR_MODULE_NOT_FOUND: Cannot find package 'ws'\` during WebSocket
upgrade.

esbuild's \`platform: "node"\` already auto-externalizes every real
Node builtin, so the spread was redundant even before it was harmful.

- packages/platform-node/src/platform.ts: external reduced to ["node:*"]
- packages/platform-bun/src/platform.ts: external reduced to
  ["node:*", "bun", "bun:*"]

With this change, \`ws\` is bundled into the generated worker (where
it belongs — it's a runtime dependency, not a Node builtin), and the
direct-mode + pool-mode WebSocket tests pass on CI.

Also removes diagnostic instrumentation added to surface the error.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Running multiple cloudflare-*.test.* files concurrently in the same
\`bun test\` invocation triggers intermittent workerd failures:
  \`error: disconnected: miniposix::write(fd, pos, size): Broken pipe; fd = 3\`
  (workerd control socket write failing during bun+miniflare setup)

Workaround carried over from PR #82 (commit 1f154fd):
- bunfig.toml: exclude \`**/cloudflare-*.test.*\` from the default walk
- package.json: run each cloudflare-*.test.js in its own \`bun test\`
  invocation, sequentially, via the root \`test\` script

No code changes — purely test orchestration.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previously the 'all providers bundled together' test asserted the
supervisor bundle contained no `require(...)` calls. That regex was
a proxy for 'everything is statically bundled' — overly strict because
esbuild's CJS interop emits `__require(...)` wrappers for vendored
CommonJS deps (now including `ws` after dropping it from the external
list). Those calls resolve to Node builtins at runtime and are not a
failure mode.

The actual correctness checks (no 'Cannot find module', no
'ERR_MODULE_NOT_FOUND', no 'Could not resolve') remain on the runtime
stderr assertions below.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The pool's \`#gracefulShutdown\` sent \`{type: "shutdown"}\` to each
worker and waited for a \`shutdown-complete\` reply with a 5s timeout
fallback. When the worker has no shutdown handler (e.g., in tests
using raw Worker scripts) or has already errored, every shutdown paid
a full 5s. In worker-errors.test.ts, two reloads + one terminate
stacked up to ~15s, tripping the test's 10s timeout on slow runners.

Changes:
- Default timeout reduced from 5000ms → 2000ms. Still generous for
  legitimate cleanup (databases.closeAll, etc.) but doesn't dominate
  test runtime when handlers are absent.
- Listen for the worker's \`error\` event — if fired, the worker is
  dying and won't ack a shutdown; finish immediately.
- Wrap \`worker.postMessage\` in try/catch so a throw from an already-
  terminated worker completes the shutdown instead of leaving the
  promise to time out.

Also drops the empty \`trigger CI rerun\` commit left over from
diagnosing the flake.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…nate

Three issues flagged by Codex review on #85:

1. [P1] Bun direct-mode HTTP error wrapper bypassed
   createBunWebSocketServer replaces Bun.serve's fetch outright, so
   HTTPError/NotFoundError thrown by user handlers no longer pass
   through BunPlatform.createServer's try/catch/toResponse wrapper.
   They surfaced as Bun's default 500 pages instead of framework-
   formatted responses.

2. [P2] Same regression in Bun pool-mode adapter
   createBunPoolWebSocketAdapter forwards non-upgrade requests via
   pool.handleRequest() directly, bypassing the same wrapper.

3. [P2] Pool-owned WebSockets leaked across reload/terminate
   ServiceWorkerPool.reloadWorkers() and terminate() tear down
   workers but never close physical sockets tracked in
   #wsConnectionOwners. Active pool-mode WebSockets ended up wired
   to terminated workers with no close frame sent to clients.

Fixes:
- Added toHttpErrorResponse() helper in platform-bun/src/websocket.ts
  that mirrors BunPlatform.createServer's error-handling block. Both
  adapters wrap non-upgrade fetch paths and the upgrade-dispatch call
  with it so HTTPErrors land as proper responses again.
- ServiceWorkerPool gained #closePooledWebSockets(code, reason) that
  calls the platform adapter's closeConnection for every live
  connectionID and clears the ownership map. reloadWorkers uses 1012
  ("Server reloading"); terminate uses 1001 ("Server shutting down").

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Codex re-review flagged one remaining gap: the direct Bun adapter's
upgrade branch still converted thrown HTTPError/NotFoundError into a
hardcoded 500 response, bypassing toHttpErrorResponse() and the
framework's normal status/body formatting.

Routed through toHttpErrorResponse() so if user code rejects an
upgrade with \`throw new UnauthorizedError()\` (or similar) before
calling upgradeWebSocket(), the client gets a proper 401 response
instead of an opaque 500.

All four addressed cases (non-upgrade direct, non-upgrade pool,
upgrade-dispatch pool, upgrade-dispatch direct) now share the same
wrapper.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…rade

Codex re-review found two more leak/phantom-state cases:

1. [P2] Bun handshake failure leaves runtime state alive
   In createBunWebSocketServer, if the fetch handler called
   upgradeWebSocket() and then either threw, or Bun rejected the
   handshake (\`server.upgrade()\` returned false — typically client
   disconnect mid-handshake), the connection map was cleared but the
   ShovelWebSocketConnection itself kept its BC subscriptions. Channel
   listeners stayed attached to a socket that would never exist,
   causing memory growth and broadcast traffic delivery to a phantom.
   Same hole was open in the Node adapter (catch path + ws-load
   failure). Both adapters now call \`conn._releaseSubscriptions()\` in
   every failure branch after onUpgrade fired.

2. [P3] Pool ws:upgrade after request timeout left phantom owner
   ServiceWorkerPool.#handleWorkerMessage's \`ws:upgrade\` case stored
   \`connectionID -> worker\` unconditionally, then checked whether
   the original request was still pending. If the supervisor had
   already timed out and rejected handleUpgradeRequest, the ownership
   map still got the entry. Subsequent ws:send / ws:close from that
   worker routed to a phantom connection until the next reload.

   Now: only set the owner when the pending request is still live.
   When it isn't, post a ws:close back to the worker so it can drop
   its in-worker registry entry too (1011, "Upgrade arrived after
   request timeout"). No phantom on either side.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ed refs

Codex re-review on c140c2b confirmed the prior fixes and surfaced one
more leak path: dispatchWebSocketClose calls _releaseSubscriptions
which tears down BC subscriptions, but never sets #closed = true.
send/close/subscribe/unsubscribe are gated solely on #closed, so a
caller that retained the WebSocketConnection from upgradeWebSocket()
could still call ws.send() / ws.subscribe(...) AFTER the close had
been dispatched. Subscribe in particular re-creates a BC listener
holding the (closed) connection, recreating the phantom we just
tore down.

Fix is one line — set #closed = true at the top of
_releaseSubscriptions. After this, every operation on a retired
connection becomes a no-op (matches the post-close() behavior).

New regression test: "retained connection is inert after
dispatchWebSocketClose" — captures the connection ref from inside
upgradeWebSocket, dispatches close, then verifies subsequent send +
subscribe + BC publish all produce zero side effects.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds examples/chat/ — single-page chat that demonstrates every piece
of the WebSocket functional-events API in ~150 lines of source. Same
src/server.ts runs on Node, Bun, and Cloudflare Workers without any
platform-specific code:

- FetchEvent.upgradeWebSocket() to accept the connection
- WebSocketConnection.subscribe("room:lobby") for runtime-mediated,
  hibernation-safe room membership
- websocketmessage handler republishes the payload to the room channel
- BroadcastChannel as the only fanout primitive — runtime forwards to
  every subscribed connection
- websocketclose for "user left" notifications

Includes a minimal HTML/JS chat UI so opening two tabs is enough to
see broadcast working end-to-end.

Cloudflare deploy is one wrangler.toml binding away (SHOVEL_WS DO,
already declared in the bundled wrangler.toml).

Also adds a runtime e2e test in test/e2e-direct-mode.test.js that
spawns the same scenario as a real production server and verifies the
multi-client BC fan-out path: two clients join, both see the welcome
+ joined broadcasts, A's message reaches both A and B, and A sees B's
"left" event when B disconnects. This is the only test driving real
multi-client BroadcastChannel fan-out — the existing suites cover the
single-client paths and the runtime mechanics in isolation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Complete cross-isolate WebSocket fan-out so that a BroadcastChannel
publish reaches subscribers even when their Durable Object is hibernated.

ShovelPubSubDO now holds a per-channel registry of subscriber DO IDs,
persisted to ctx.storage so eviction doesn't drop it and rehydrated on
wake. Publishes fan out via fetch("/_shovel_publish") to each subscriber
DO ID — the leg Cloudflare uses to wake a hibernated DO — with per-DO
ordered delivery (promise chain + waitUntil) and sender filtering to
avoid echoing a publish back to its origin. Non-DO subscribers (regular
workers, cron handlers) keep a held-open, hibernation-accepted receive
socket and filter by sender client-side.

Supporting plumbing threads subscriber identity through the core runtime
WebSocketConnection, the pool IPC (WorkerUpgradeMessage /
WebSocketPoolHandlers), and the Node/Bun adapters.

Closes the last hibernation correctness gap in the functional-events
WebSocket API: the committed held-open-socket model either pinned the
subscriber DO awake or silently dropped broadcasts on eviction.

Tests: cloudflare-pubsub-hibernation (wake + publish ordering),
pool-websocket-close, plus platform regression suites. typecheck + lint
clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…path waste

Correctness:
- ShovelWebSocketDO now dispatches `websocketclose` from the
  `webSocketError` path (workerd may fire error without a follow-up
  close), so user close handlers and app-level cleanup always run. A
  #finalized guard dedups error+close so the handler fires exactly once.
- Drop buffered `pendingFrames` on a failed pool upgrade handshake in
  both the Node and Bun adapters (the flush-and-clear callback never
  runs on failure), fixing a per-aborted-upgrade memory leak.

Efficiency (Cloudflare fan-out hot path):
- Serialize the downstream publish payload once per publish instead of
  once per subscriber DO.
- Re-serialize the hibernation attachment only when subscribedChannels
  actually changed, not on every inbound data frame.
- Capture _dispatchPubSubMessage during #ensureRuntime so the
  /_shovel_publish route avoids a dynamic import per delivery.

Deferred (documented as reviewer guidance, not bugs): at-most-once
cross-isolate delivery (no retry/reconnect/registry reaping) and
websocketmessage waitUntil head-of-line blocking — both warrant
deliberate design decisions rather than quick patches.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant