Add withSignal() to ReactiveStreamStore for per-connection cancellation#1663
Add withSignal() to ReactiveStreamStore for per-connection cancellation#1663mcintyre94 wants to merge 1 commit into
withSignal() to ReactiveStreamStore for per-connection cancellation#1663Conversation
BundleMonFiles updated (22)
Unchanged files (125)
Total files change +7.57KB +1.43% Final result: ✅ View report in BundleMon website ➡️ |
|
Documentation Preview: https://kit-docs-ihyp5wbuh-anza-tech.vercel.app |
trevor-cortex
left a comment
There was a problem hiding this comment.
Summary
Replaces the construction-time abortSignal option on createReactiveStoreFromDataPublisherFactory, createReactiveStoreWithInitialValueAndSlotTracking, and PendingRpcSubscriptionsRequest.reactiveStore() with a per-connection store.withSignal(signal).connect() pattern, mirroring the action store. The createDataPublisher factory is widened from () => Promise<DataPublisher> to (signal: AbortSignal) => Promise<DataPublisher> so the underlying transport can stop on per-connection abort, not just listener cleanup.
The design is sound and the docs/changeset are thorough. Semantics are well thought out:
- Caller-signal abort → transitions to
errorwith the abort reason (visible). - Inner-controller abort (supersession via a fresh
connect()orreset()) → silent so the newer call owns state. - Already-aborted caller signal at
connect()time short-circuits toerrorwithout invoking the factory. - The caller-abort listener is scoped to
{ signal: innerSignal }so it auto-removes on supersede/reset — no listener accumulation across reconnects. - The
error-channel handler still early-returns ifcurrentState.status === 'error', so a publisher error arriving after a caller abort can't overwrite the abort reason.
This looks good to me. A few notes below — nothing blocking.
Implementation divergence between the two stores
The two performConnect implementations have diverged in how they propagate the caller signal:
packages/subscribable/src/reactive-stream-store.tscomposesAbortSignal.any([innerController.signal, callerSignal])and passes that composedsignalto bothcreateDataPublisher(signal)and the{ signal }option on everypublisher.on(...).packages/kit/src/create-reactive-store-with-initial-value-and-slot-tracking.tspasses onlyinnerSignaltorpcRequest.send/rpcSubscriptionRequest.subscribe, and relies on the caller-abort listener callinginnerController.abort(callerSignal.reason)to propagate cancellation.
Functionally equivalent today — in both, the RPC/transport eventually sees an aborted signal carrying the caller's reason. But two slightly different shapes for the same pattern in sibling files is the kind of drift that bites later (e.g. if AbortSignal.any is ever load-bearing for one but not the other). Worth a follow-up to converge on one pattern, probably the AbortSignal.any one in the subscribable store, since it surfaces the caller reason to the transport without depending on the listener firing first.
Small coverage regression in the kit test
In packages/kit/src/__tests__/create-reactive-store-with-initial-value-and-slot-tracking-test.ts, the previous versions of the first two tests in the new withSignal() describe block asserted expect(rpcSignal.reason).toBe('test reason') / expect(subscriptionSignal.reason).toBe('test reason') — pinning that the caller's abort reason propagates all the way through to the RPC and subscription abortSignal. The new versions only check .aborted === true and drop the .reason assertions. Given the design explicitly forwards the reason (innerController.abort(callerSignal.reason) and setState({ error: callerSignal.reason, ... })), it'd be worth keeping those reason assertions — the 'does not overwrite the abort-reason error with a late RPC rejection' test only verifies the store-level error, not the signal-level reason that downstream transports actually observe.
For subsequent reviewers
retry()and bareconnect()after a caller-abort error drop the bound signal. This is intentional and called out in thewithSignalJSDoc ("Barestore.connect()calls bypass the bound signal — discipline required"). Worth confirming this is the desired UX, especially for the React<ErrorMessage onRetry={store.connect} />example shown in the docs — if a caller attached a kill switch viawithSignal, that retry button bypasses it. May want a follow-up that returns aretry()on thewithSignalwrapper too, for symmetry.- The
createDataPublisherwidening is source-compatible (TS allows fewer parameters than declared) but the PR is still correctly markedmajorbecause the construction-timeabortSignaloption is removed. - Changeset is present, well-scoped to the three affected packages, and the bumps + prose match the repo's conventions. Skill instructions look respected (auto-generated filename, no manual edits).
3bccb76 to
562ec8f
Compare
28fb867 to
bed0452
Compare
🦋 Changeset detectedLatest commit: 51febd9 The changes in this PR will be included in the next version bump. This PR includes changesets to release 47 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
bed0452 to
b631bce
Compare
Agreed, aligned on the
Updated these tests to check reason again
Intentional, but tweaked the docstring a bit |
562ec8f to
e4b9966
Compare
b631bce to
85182fc
Compare
lorisleiva
left a comment
There was a problem hiding this comment.
store.withMyApproval().connect() 🫡
e4b9966 to
fb3ce78
Compare
85182fc to
6169ab5
Compare
fb3ce78 to
9cad348
Compare
6169ab5 to
1b2b3f3
Compare
9cad348 to
176febd
Compare
1b2b3f3 to
0c67325
Compare
0c67325 to
ade3616
Compare
345a3d2 to
63ce806
Compare
ade3616 to
380b4f8
Compare
63ce806 to
b620102
Compare
1ec234a to
7615ebb
Compare
645eff6 to
54df544
Compare
7615ebb to
34b65f1
Compare
54df544 to
700d89a
Compare
34b65f1 to
4197ebe
Compare
…lation
Adds `store.withSignal(signal).connect()` to `ReactiveStreamStore`, mirroring the action store's per-dispatch `withSignal()` pattern — callers attach a per-connection signal at the call site instead of baking one into the store's construction. Drops the construction-time `abortSignal` option on `createReactiveStoreFromDataPublisherFactory`, `createReactiveStoreWithInitialValueAndSlotTracking`, and `PendingRpcSubscriptionsRequest.reactiveStore()`. The duck-type `ReactiveStreamSource<T>.reactiveStore()` is now parameter-less, mirroring `ReactiveActionSource<T>.reactiveStore()`.
`store.withSignal(signal)` returns a thin wrapper exposing `connect()`. Each call composes the caller-provided signal with the per-connection inner controller via `AbortSignal.any` — aborting either tears down the active connection. Aborting the caller-provided signal surfaces the abort reason on state as `{ status: 'error' }`; supersession via the internal controller (a newer `connect()` or `reset()`) stays silent so the newer call owns state. Use cases:
- Per-connection timeout: `store.withSignal(AbortSignal.timeout(30_000)).connect()` mints a fresh clock per attempt.
- Permanent kill switch: hold one `AbortController`, bind the wrapper once (\`const killable = store.withSignal(killCtrl.signal)\`), and use \`killable.connect()\` everywhere. After \`killCtrl.abort()\`, every future call short-circuits to error. Bare \`store.connect()\` calls bypass the bound signal — same discipline contract as the action store's bind-once pattern.
\`createDataPublisher\` is widened from \`() => Promise<DataPublisher>\` to \`(signal: AbortSignal) => Promise<DataPublisher>\`. The store passes the composed per-connection signal to the factory so the underlying transport can stop on per-connection abort, not just the stream-store's listeners. Existing no-arg factories still satisfy the new shape — TypeScript allows fewer parameters than the declared type.
Internal: the per-connection inner controller plus the optional caller signal are composed with \`AbortSignal.any\`, replacing the manually-scoped abort forwarder and the prior \`outerController\` that bridged the construction-time \`abortSignal\` to inner connections. Same observable behaviour for the supersede / reset paths; new behaviour for the caller-signal path (surfaces as error rather than silently disconnecting).
Tests in \`@solana/subscribable\`, \`@solana/rpc-subscriptions-spec\`, and \`@solana/kit\` updated to exercise the new \`withSignal\` API. The "abort signal" describe blocks (which tested the deprecated construction-time signal) are replaced with focused \`withSignal()\` blocks covering the per-connection timeout, kill-switch, and supersede-doesn't-touch-caller-signal cases.
700d89a to
c060ad7
Compare
4197ebe to
51febd9
Compare

Summary of Changes
This PR adds a per-connect abort signal to the reactive stream store. It uses the same pattern as the action store:
Note that the previous PR removes auto-connect from the stream store, so this same pattern applies to all connects, including the first.
The function to create a data publisher for the store is widened from
() => Promise<DataPublisher>to(signal: AbortSignal) => Promise<DataPublisher>, ie the per-connect abort signal can be used as part of that factory.The
abortSignalinput to the stream store (and tocreateReactiveStoreWithInitialValueAndSlotTracking, andPendingRpcSubscriptionsRequest.reactiveStore()) is removed in favour of this per-connect pattern.