From 25f6fbb6ee24f2a041a568b933ff97cc60a47414 Mon Sep 17 00:00:00 2001 From: Callum Date: Tue, 19 May 2026 10:13:04 +0000 Subject: [PATCH] Add `useSubscription` for rendering based on a subscription MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `useSubscription(source)` opens a stream-store subscription on mount, re-opens whenever the source identity changes, and tears it down on unmount. The source is any `ReactiveStreamSource` — `PendingRpcSubscriptionsRequest` is the canonical implementation. Pass `null` to disable. The hook mirrors `useRequest`'s structure exactly: construct the lazy store via `useMemo`, fire `store.connect()` in a `useEffect`, tear down via `store.reset()` in cleanup. Same StrictMode-safe lifecycle pattern (mount → cleanup aborts → mount re-fires), same vocabulary, same per-call signal API. The hook returns `{ data, error, reconnect, slot, status }`. Status is one of `loading | loaded | error | disabled`. After a notification arrives, an error-channel publish transitions to `error` while preserving the stale `data`; `reconnect()` returns to `loading` (preserving stale `data` and `error` for stale-while-revalidate) before settling on `loaded` or a fresh `error`. The bridge maps the store's `idle` state to `loading` when enabled (matching the about-to-commit-effect render) and to `disabled` when the source is null. Notifications shaped as `SolanaRpcResponse` (account/program/signature) are unwrapped via `isSolanaRpcResponse` from `@solana/rpc-types`: `data` is the inner value `U` and `slot` is lifted from `context.slot`. Raw notifications (slot/logs/root) pass through with `slot: undefined`. The `UnwrapRpcResponse` conditional type (also from `@solana/rpc-types`) tracks the runtime unwrap at the type level. Optional `getAbortSignal: () => AbortSignal` is invoked on every connection (initial subscribe + every `reconnect()`). The returned signal is composed with the store's per-connection controller via `AbortSignal.any` through `withSignal(signal).connect()`. The natural use is per-connection timeouts (`() => AbortSignal.timeout(30_000)`), which reset on reconnect. Factory is ref-synced — inline closures are fine. `reconnect()` accepts an optional `{ abortSignal }` override for one specific attempt; presence-based semantics distinguish "use factory" (omit the key), "explicit signal" (`{ abortSignal: signal }`), and "no signal" (`{ abortSignal: undefined }`). SSR-safe: on the server the connect effect doesn't run, so the store stays `idle` and the hook reports `status: 'loading'`. The first client render hydrates from the same paint and commits the connect. Adds `disabledStreamStore()` to `staticStores.ts` — the stream-store analogue of `disabledActionStore` for the null-source case. Pinned by new tests alongside the existing `disabledActionStore` invariants. Exports `SubscriptionResult` and `UseSubscriptionOptions` for plugin hooks to build on. --- .changeset/icy-sites-clap.md | 23 ++ packages/react/README.md | 45 +++ .../react/src/__tests__/staticStores-test.ts | 59 +++- .../useSubscription-test.browser.tsx | 329 ++++++++++++++++++ .../__typetests__/useSubscription-typetest.ts | 56 +++ packages/react/src/index.ts | 1 + packages/react/src/staticStores.ts | 30 +- packages/react/src/useSubscription.ts | 136 ++++++++ packages/react/src/useSubscriptionResult.ts | 37 ++ 9 files changed, 714 insertions(+), 2 deletions(-) create mode 100644 .changeset/icy-sites-clap.md create mode 100644 packages/react/src/__tests__/useSubscription-test.browser.tsx create mode 100644 packages/react/src/__typetests__/useSubscription-typetest.ts create mode 100644 packages/react/src/useSubscription.ts create mode 100644 packages/react/src/useSubscriptionResult.ts diff --git a/.changeset/icy-sites-clap.md b/.changeset/icy-sites-clap.md new file mode 100644 index 000000000..492b14132 --- /dev/null +++ b/.changeset/icy-sites-clap.md @@ -0,0 +1,23 @@ +--- +'@solana/react': minor +--- + +Add `useSubscription` — a React hook for subscription-based live data. Pass a `ReactiveStreamSource` (satisfied by `PendingRpcSubscriptionsRequest`) and the hook opens the subscription on mount, re-opens whenever the source identity changes, and tears it down on unmount. + +```tsx +function AccountBalance({ address }: { address: Address }) { + const client = useClient>(); + const source = useMemo(() => client.rpcSubscriptions.accountNotifications(address), [client, address]); + const { data, error, reconnect } = useSubscription(source); + if (error) return ; + return

{data ? `${data.value.lamports} lamports at slot ${data.context.slot}` : 'Connecting…'}

; +} +``` + +The result reports `status` as one of `loading | loaded | error | disabled`. `data` is the notification exactly as the source emits it — no unwrapping or reshaping. For RPC subscriptions that emit `SolanaRpcResponse` (account/program/signature), read the inner value at `data.value` and the slot at `data.context.slot`; for raw notifications (slot/logs/root) `data` is the raw shape. Pass `null` for the source to gate the subscription off — useful while inputs aren't yet known. The result then reports `status: 'disabled'`. After a notification arrives, an error transitions to `status: 'error'` while preserving the stale `data`; `reconnect()` returns to `loading` (preserving stale `data` and `error` for stale-while-revalidate) before settling on `loaded` or a fresh `error`. + +Optional `getAbortSignal: () => AbortSignal` is a factory invoked on every connection (initial subscribe + every `reconnect()`). Each connection gets a fresh signal that the underlying store composes with its per-connection controller via `AbortSignal.any`. The natural use is per-connection timeouts: `getAbortSignal: () => AbortSignal.timeout(30_000)` gives every connection its own 30-second clock that resets on reconnect. The factory is held in a ref synced to the latest render, so inline closures are fine — no `useCallback` needed. `reconnect()` also accepts an optional `{ abortSignal }` override to replace the factory for one specific attempt (presence-based: omit to use the factory, `{ abortSignal: signal }` to override, `{ abortSignal: undefined }` to opt out). + +The hook mirrors `useRequest`'s structure exactly: construct the lazy store via `useMemo`, fire `store.connect()` in a `useEffect`, tear down via `store.reset()` in cleanup. Same StrictMode-safe lifecycle, same vocabulary, same per-call signal API. SSR-safe — on the server the connect effect doesn't run, so the store stays `idle` and the hook reports `status: 'loading'`; first client render hydrates from the same paint and commits the connect. + +`SubscriptionResult` and `UseSubscriptionOptions` are exported alongside the hook so plugin hooks built on top can declare their return shape against them. diff --git a/packages/react/README.md b/packages/react/README.md index 9b35a9c88..e23b444cb 100644 --- a/packages/react/README.md +++ b/packages/react/README.md @@ -199,6 +199,51 @@ refresh({ abortSignal: undefined }); // no abort signal for this attempt refresh(); // omit the key to use the factory (default) ``` +### `useSubscription(source, options?)` + +Subscribe to a stream-store source and surface the latest notification as reactive state. Returns `{ data, error, reconnect, status }` where `status` is one of `'loading' | 'loaded' | 'error' | 'disabled'`. Use it for any RPC subscription (`accountNotifications`, `slotNotifications`, `logsNotifications`, etc.) or any plugin-authored stream that satisfies `ReactiveStreamSource`. + +`source` is any `ReactiveStreamSource` — the `{ reactiveStore() }` duck-type satisfied by `PendingRpcSubscriptionsRequest`. Pass `null` to disable. Memoize the source with `useMemo` keyed on whatever inputs it depends on; stable identity is how the hook knows when to tear down and re-open. + +`data` is the notification as the source emits it. For RPC subscriptions that emit `SolanaRpcResponse`, read the inner value at `data.value` and the slot at `data.context.slot`. For raw notifications, `data` is the raw shape. + +```tsx +import { useClient, useSubscription } from '@solana/react'; +import type { Address, AccountNotificationsApi, ClientWithRpcSubscriptions } from '@solana/kit'; + +function AccountBalance({ address }: { address: Address }) { + const client = useClient>(); + const source = useMemo(() => client.rpcSubscriptions.accountNotifications(address), [client, address]); + const { data, error, reconnect } = useSubscription(source); + if (error) return ; + return

{data ? `${data.value.lamports} lamports at slot ${data.context.slot}` : 'Connecting…'}

; +} +``` + +`reconnect()` re-opens the connection. After a `loaded` outcome that transitions to `error`, calling `reconnect()` returns `status` to `'loading'` while preserving the stale `data` and `error` (stale-while-revalidate) → `'loaded'` (or `'error'` again). The hook tears the connection down on unmount via the store's `reset()`; StrictMode's mount → unmount → mount cycle re-opens cleanly. + +#### Per-connection cancellation + +Pass `getAbortSignal` to attach a cancellation signal to each individual connection — initial subscribe plus every `reconnect()`. The natural use is per-connection timeouts: + +```tsx +const { data, error, reconnect } = useSubscription(source, { + // Each connection gets a fresh 30-second clock. `reconnect()` resets it. + getAbortSignal: () => AbortSignal.timeout(30_000), +}); +``` + +The factory is held in a ref synced to the latest render, so inline closures are fine — no `useCallback` needed. To kill the subscription entirely (e.g. on a route change), set the memoized source to `null` (the result reports `disabled`), or let the component unmount. + +`reconnect()` accepts an optional `{ abortSignal }` override that replaces the configured factory for just that attempt — useful when one specific reconnect needs different cancellation semantics: + +```tsx +const userInitiatedCtrl = new AbortController(); +reconnect({ abortSignal: userInitiatedCtrl.signal }); // override: use this signal, ignore the factory +reconnect({ abortSignal: undefined }); // no abort signal for this attempt +reconnect(); // omit the key to use the factory (default) +``` + ## Hooks ### `useSignIn(uiWalletAccount, chain)` diff --git a/packages/react/src/__tests__/staticStores-test.ts b/packages/react/src/__tests__/staticStores-test.ts index f7c437824..6f8b039ee 100644 --- a/packages/react/src/__tests__/staticStores-test.ts +++ b/packages/react/src/__tests__/staticStores-test.ts @@ -1,4 +1,4 @@ -import { disabledActionStore } from '../staticStores'; +import { disabledActionStore, disabledStreamStore } from '../staticStores'; describe('disabledActionStore', () => { it('reports a frozen `idle` state with no data or error', () => { @@ -53,3 +53,60 @@ describe('disabledActionStore', () => { await expect(store.dispatchAsync()).rejects.toMatchObject({ name: 'AbortError' }); }); }); + +describe('disabledStreamStore', () => { + it('reports a frozen `idle` state with no data or error', () => { + const store = disabledStreamStore(); + const state = store.getUnifiedState(); + expect(state).toEqual({ data: undefined, error: undefined, status: 'idle' }); + expect(Object.isFrozen(state)).toBe(true); + }); + + it('returns the same getUnifiedState() reference across calls', () => { + const store = disabledStreamStore(); + expect(store.getUnifiedState()).toBe(store.getUnifiedState()); + }); + + it('`connect()` is a no-op — state does not change', () => { + const store = disabledStreamStore(); + const before = store.getUnifiedState(); + store.connect(); + store.connect(); + expect(store.getUnifiedState()).toBe(before); + }); + + it('`reset()` is a no-op — state does not change', () => { + const store = disabledStreamStore(); + const before = store.getUnifiedState(); + store.reset(); + expect(store.getUnifiedState()).toBe(before); + }); + + it('`retry()` is a no-op — state does not change', () => { + const store = disabledStreamStore(); + const before = store.getUnifiedState(); + store.retry(); + expect(store.getUnifiedState()).toBe(before); + }); + + it('`withSignal(signal).connect()` is a no-op — state does not change, signal is not observed', () => { + const store = disabledStreamStore(); + const ctrl = new AbortController(); + const before = store.getUnifiedState(); + store.withSignal(ctrl.signal).connect(); + ctrl.abort(new Error('would-be-cancellation')); + expect(store.getUnifiedState()).toBe(before); + }); + + it('`subscribe()` never notifies (connect + reset produce no state change to observe)', () => { + const store = disabledStreamStore(); + const listener = jest.fn(); + const unsubscribe = store.subscribe(listener); + store.connect(); + store.reset(); + store.retry(); + store.withSignal(new AbortController().signal).connect(); + expect(listener).not.toHaveBeenCalled(); + unsubscribe(); + }); +}); diff --git a/packages/react/src/__tests__/useSubscription-test.browser.tsx b/packages/react/src/__tests__/useSubscription-test.browser.tsx new file mode 100644 index 000000000..de76fdacc --- /dev/null +++ b/packages/react/src/__tests__/useSubscription-test.browser.tsx @@ -0,0 +1,329 @@ +import { SolanaRpcResponse } from '@solana/rpc-types'; +import { createReactiveStoreFromDataPublisherFactory, DataPublisher, ReactiveStreamSource } from '@solana/subscribable'; +import { act } from '@testing-library/react'; +import React from 'react'; +import { renderToString } from 'react-dom/server'; + +import { renderHook } from '../__test-utils__/render'; +import { useSubscription } from '../useSubscription'; + +type Notification = T | SolanaRpcResponse; + +function makeFakeSubscription(): { + publish: (notification: Notification) => Promise; + publishError: (err: unknown) => Promise; + publishersCreated: () => number; + source: ReactiveStreamSource; +} { + type Listener = (payload: unknown) => void; + let dataListeners: Listener[] = []; + let errorListeners: Listener[] = []; + let createdCount = 0; + let publisherReady = Promise.withResolvers(); + return { + async publish(notification) { + // Wait for the most recent connection's listeners to be wired up before firing + await publisherReady.promise; + dataListeners.forEach(fn => fn(notification)); + }, + async publishError(err) { + await publisherReady.promise; + errorListeners.forEach(fn => fn(err)); + }, + publishersCreated: () => createdCount, + source: { + reactiveStore() { + return createReactiveStoreFromDataPublisherFactory({ + createDataPublisher() { + createdCount++; + // Each connection gets a fresh publisher. Reset the listener arrays so + // any late callbacks from a torn-down prior connection can't reach the + // new one's listeners, and reset the ready handle so tests awaiting the + // *new* connection only proceed once it has wired up. + dataListeners = []; + errorListeners = []; + publisherReady = Promise.withResolvers(); + let onCallCount = 0; + const publisher: DataPublisher = { + on(channel, listener, options) { + const list = channel === 'data' ? dataListeners : errorListeners; + list.push(listener); + options?.signal.addEventListener( + 'abort', + () => { + const idx = list.indexOf(listener); + if (idx !== -1) list.splice(idx, 1); + }, + { once: true }, + ); + // The store binds both `data` and `error` channels on commit; + // after the second `.on` call the listeners are fully wired up + // and any awaiting publish/publishError calls can proceed. + if (++onCallCount === 2) publisherReady.resolve(); + return () => { + const idx = list.indexOf(listener); + if (idx !== -1) list.splice(idx, 1); + }; + }, + }; + return Promise.resolve(publisher); + }, + dataChannelName: 'data', + errorChannelName: 'error', + }); + }, + }, + }; +} + +describe('useSubscription', () => { + it('starts in loading, transitions to loaded on first notification', async () => { + const sub = makeFakeSubscription<{ value: number }>(); + const { result } = renderHook(() => useSubscription(sub.source)); + + expect(result.current.status).toBe('loading'); + expect(result.current.data).toBeUndefined(); + + await act(async () => sub.publish({ value: 42 })); + expect(result.current.status).toBe('loaded'); + expect(result.current.data).toStrictEqual({ value: 42 }); + }); + + it('surfaces `SolanaRpcResponse` envelopes as-is — callers read `.value` and `.context.slot`', async () => { + const sub = makeFakeSubscription>(); + const { result } = renderHook(() => useSubscription(sub.source)); + + await act(async () => sub.publish({ context: { slot: 99n }, value: { lamports: 5n } })); + expect(result.current.data).toStrictEqual({ context: { slot: 99n }, value: { lamports: 5n } }); + }); + + it('passes raw notifications through unchanged', async () => { + const sub = makeFakeSubscription<{ slot: bigint; parent: bigint; root: bigint }>(); + const { result } = renderHook(() => useSubscription(sub.source)); + + await act(async () => sub.publish({ parent: 9n, root: 8n, slot: 10n })); + expect(result.current.data).toStrictEqual({ parent: 9n, root: 8n, slot: 10n }); + }); + + it('transitions to error on error-channel publish, preserving stale data', async () => { + const sub = makeFakeSubscription<{ value: number }>(); + const { result } = renderHook(() => useSubscription(sub.source)); + await act(async () => sub.publish({ value: 1 })); + expect(result.current.data).toStrictEqual({ value: 1 }); + + const boom = new Error('boom'); + await act(async () => sub.publishError(boom)); + expect(result.current.status).toBe('error'); + expect(result.current.error).toBe(boom); + expect(result.current.data).toStrictEqual({ value: 1 }); // stale preserved + }); + + it('reconnect() re-opens the connection and transitions loading → loaded (SWR for data + error)', async () => { + const sub = makeFakeSubscription<{ value: number }>(); + const { result } = renderHook(() => useSubscription(sub.source)); + await act(async () => sub.publish({ value: 1 })); + const boom = new Error('fail'); + await act(async () => sub.publishError(boom)); + expect(result.current.status).toBe('error'); + + act(() => result.current.reconnect()); + expect(result.current.status).toBe('loading'); + expect(result.current.data).toStrictEqual({ value: 1 }); // stale data preserved + expect(result.current.error).toBe(boom); // stale error preserved (SWR) + + await act(async () => sub.publish({ value: 2 })); + expect(result.current.status).toBe('loaded'); + expect(result.current.data).toStrictEqual({ value: 2 }); + expect(result.current.error).toBeUndefined(); // cleared on successful reconnect + expect(sub.publishersCreated()).toBe(2); + }); + + it('reports status: disabled when the source is null', () => { + const { result } = renderHook(() => useSubscription<{ value: number }>(null)); + expect(result.current.status).toBe('disabled'); + expect(result.current.data).toBeUndefined(); + }); + + it('starts connecting when the source transitions from null to a real source', async () => { + const sub = makeFakeSubscription<{ value: number }>(); + const initialProps: { source: ReactiveStreamSource<{ value: number }> | null } = { source: null }; + const { result, rerender } = renderHook(({ source }) => useSubscription(source), { initialProps }); + expect(result.current.status).toBe('disabled'); + expect(sub.publishersCreated()).toBe(0); + + rerender({ source: sub.source }); + expect(result.current.status).toBe('loading'); + await act(async () => sub.publish({ value: 1 })); + expect(result.current.status).toBe('loaded'); + expect(sub.publishersCreated()).toBe(1); + }); + + it('returns to disabled when the source transitions from a real source to null', async () => { + const sub = makeFakeSubscription<{ value: number }>(); + const initialProps: { source: ReactiveStreamSource<{ value: number }> | null } = { source: sub.source }; + const { result, rerender } = renderHook(({ source }) => useSubscription(source), { initialProps }); + await act(async () => sub.publish({ value: 1 })); + expect(result.current.status).toBe('loaded'); + + rerender({ source: null }); + expect(result.current.status).toBe('disabled'); + expect(result.current.data).toBeUndefined(); + }); + + it('opens a fresh subscription when the source identity changes', async () => { + const subA = makeFakeSubscription<{ value: number }>(); + const subB = makeFakeSubscription<{ value: number }>(); + const { result, rerender } = renderHook( + ({ which }: { which: 'a' | 'b' }) => useSubscription(which === 'a' ? subA.source : subB.source), + { initialProps: { which: 'a' } }, + ); + await act(async () => subA.publish({ value: 1 })); + expect(result.current.data).toStrictEqual({ value: 1 }); + + rerender({ which: 'b' }); + await act(async () => subB.publish({ value: 2 })); + expect(result.current.data).toStrictEqual({ value: 2 }); + }); + + it("aborts the prior connection's listeners when the source identity changes", async () => { + const subA = makeFakeSubscription<{ value: number }>(); + const subB = makeFakeSubscription<{ value: number }>(); + const { result, rerender } = renderHook( + ({ which }: { which: 'a' | 'b' }) => useSubscription(which === 'a' ? subA.source : subB.source), + { initialProps: { which: 'a' } }, + ); + await act(async () => subA.publish({ value: 1 })); + + rerender({ which: 'b' }); + // Late publishes from the now-torn-down prior connection must not reach the hook. + await act(async () => subA.publish({ value: 99 })); + expect(result.current.data).not.toStrictEqual({ value: 99 }); + + await act(async () => subB.publish({ value: 2 })); + expect(result.current.data).toStrictEqual({ value: 2 }); + }); + + it('keeps a stable reconnect reference across re-renders', () => { + const sub = makeFakeSubscription<{ value: number }>(); + const { result, rerender } = renderHook(() => useSubscription(sub.source)); + const { reconnect } = result.current; + rerender(); + expect(result.current.reconnect).toBe(reconnect); + }); + + it('invokes `getAbortSignal` on every connection with a fresh signal', async () => { + const sub = makeFakeSubscription<{ value: number }>(); + const signals: AbortSignal[] = []; + const getAbortSignal = jest.fn(() => { + const ctrl = new AbortController(); + signals.push(ctrl.signal); + return ctrl.signal; + }); + const { result } = renderHook(() => useSubscription(sub.source, { getAbortSignal })); + await act(async () => sub.publish({ value: 1 })); + expect(getAbortSignal).toHaveBeenCalledTimes(1); + + await act(async () => sub.publishError(new Error('fail'))); + act(() => result.current.reconnect()); + await act(async () => sub.publish({ value: 2 })); + expect(getAbortSignal).toHaveBeenCalledTimes(2); + expect(signals[1]).not.toBe(signals[0]); + }); + + it('reconnect({ abortSignal }) overrides the getAbortSignal factory for that attempt', async () => { + const sub = makeFakeSubscription<{ value: number }>(); + const getAbortSignal = jest.fn(() => new AbortController().signal); + const { result } = renderHook(() => useSubscription(sub.source, { getAbortSignal })); + await act(async () => sub.publish({ value: 1 })); + expect(getAbortSignal).toHaveBeenCalledTimes(1); + + const overrideCtrl = new AbortController(); + act(() => result.current.reconnect({ abortSignal: overrideCtrl.signal })); + expect(getAbortSignal).toHaveBeenCalledTimes(1); // factory NOT called + expect(sub.publishersCreated()).toBe(2); + + await act(async () => overrideCtrl.abort(new Error('overridden'))); + expect(result.current.status).toBe('error'); + }); + + it('reconnect({ abortSignal: undefined }) opts out of the factory for that attempt', async () => { + const sub = makeFakeSubscription<{ value: number }>(); + const getAbortSignal = jest.fn(() => new AbortController().signal); + const { result } = renderHook(() => useSubscription(sub.source, { getAbortSignal })); + await act(async () => sub.publish({ value: 1 })); + expect(getAbortSignal).toHaveBeenCalledTimes(1); + + act(() => result.current.reconnect({ abortSignal: undefined })); + expect(getAbortSignal).toHaveBeenCalledTimes(1); // factory NOT called + expect(sub.publishersCreated()).toBe(2); + }); + + it('reconnect() without an arg falls back to the getAbortSignal factory', async () => { + const sub = makeFakeSubscription<{ value: number }>(); + const getAbortSignal = jest.fn(() => new AbortController().signal); + const { result } = renderHook(() => useSubscription(sub.source, { getAbortSignal })); + await act(async () => sub.publish({ value: 1 })); + expect(getAbortSignal).toHaveBeenCalledTimes(1); + + act(() => result.current.reconnect()); + expect(getAbortSignal).toHaveBeenCalledTimes(2); + }); + + it('aborting the getAbortSignal transitions the current connection to error; reconnect recovers', async () => { + const sub = makeFakeSubscription<{ value: number }>(); + let currentCtrl: AbortController | undefined; + const getAbortSignal = () => { + currentCtrl = new AbortController(); + return currentCtrl.signal; + }; + const { result } = renderHook(() => useSubscription(sub.source, { getAbortSignal })); + + const timeoutReason = new Error('timeout'); + await act(async () => currentCtrl!.abort(timeoutReason)); + expect(result.current.status).toBe('error'); + expect(result.current.error).toBe(timeoutReason); + + act(() => result.current.reconnect()); + expect(currentCtrl!.signal.aborted).toBe(false); // brand-new controller for the new connection + + await act(async () => sub.publish({ value: 1 })); + expect(result.current.status).toBe('loaded'); + expect(result.current.data).toStrictEqual({ value: 1 }); + }); + + it('aborts the in-flight subscription when the component unmounts', async () => { + const sub = makeFakeSubscription<{ value: number }>(); + const { unmount } = renderHook(() => useSubscription(sub.source)); + await act(async () => sub.publish({ value: 1 })); + expect(sub.publishersCreated()).toBe(1); + unmount(); + // After unmount, late publishes don't drive listeners — they've been removed via reset(). + await sub.publish({ value: 2 }); + // No assertion needed beyond "no throw"; we're verifying reset() ran without error. + }); + + describe('SSR', () => { + it('renders `loading` on the server without opening a subscription', () => { + const sub = makeFakeSubscription<{ value: number }>(); + function Component() { + const { status } = useSubscription(sub.source); + return

{status}

; + } + // `renderToString` drives `useSyncExternalStore` through its server-snapshot path + // (the third arg to `useSyncExternalStore`), and effects don't run during server + // rendering — so the store stays `idle` and the bridge maps that to `loading`. + const html = renderToString(); + expect(html).toBe('

loading

'); + expect(sub.publishersCreated()).toBe(0); + }); + + it('renders `disabled` on the server when the source is null', () => { + function Component() { + const { status } = useSubscription<{ value: number }>(null); + return

{status}

; + } + const html = renderToString(); + expect(html).toBe('

disabled

'); + }); + }); +}); diff --git a/packages/react/src/__typetests__/useSubscription-typetest.ts b/packages/react/src/__typetests__/useSubscription-typetest.ts new file mode 100644 index 000000000..e0e67fbe7 --- /dev/null +++ b/packages/react/src/__typetests__/useSubscription-typetest.ts @@ -0,0 +1,56 @@ +/* eslint-disable react-hooks/rules-of-hooks */ + +import { SolanaRpcResponse } from '@solana/rpc-types'; +import { ReactiveStreamSource } from '@solana/subscribable'; + +import { SubscriptionResult, useSubscription } from '../useSubscription'; + +const accountSource = null as unknown as ReactiveStreamSource>; +const slotSource = null as unknown as ReactiveStreamSource<{ slot: bigint }>; + +// [DESCRIBE] useSubscription +{ + // Envelope sources surface the envelope as-is — callers read `data.value` and + // `data.context.slot` directly. + { + const account = useSubscription(accountSource); + account satisfies SubscriptionResult>; + account.data satisfies SolanaRpcResponse<{ lamports: bigint }> | undefined; + account.data?.value satisfies { lamports: bigint } | undefined; + account.data?.context.slot satisfies bigint | undefined; + } + + // Raw notifications pass through unchanged. + { + useSubscription(slotSource) satisfies SubscriptionResult<{ slot: bigint }>; + } + + // Source argument accepts null + { + useSubscription<{ slot: bigint }>(null) satisfies SubscriptionResult<{ slot: bigint }>; + } + + // Options accept a `getAbortSignal` factory + { + useSubscription(slotSource, { getAbortSignal: () => AbortSignal.timeout(30_000) }); + } + + // `reconnect` accepts no args (uses the factory), an `{ abortSignal }` override, or + // `{ abortSignal: undefined }` to opt out of the factory entirely. + { + const { reconnect } = useSubscription(slotSource); + reconnect(); + reconnect({ abortSignal: AbortSignal.timeout(1_000) }); + reconnect({ abortSignal: undefined }); + // @ts-expect-error - abortSignal must be an AbortSignal (or undefined) + reconnect({ abortSignal: 'nope' }); + } + + // Status is a discriminated string, not a generic string + { + const { status } = useSubscription(slotSource); + status satisfies 'disabled' | 'error' | 'loaded' | 'loading'; + // @ts-expect-error - 'success' is the action-store vocabulary, not stream + status satisfies 'success'; + } +} diff --git a/packages/react/src/index.ts b/packages/react/src/index.ts index ce8705a6e..3921341fc 100644 --- a/packages/react/src/index.ts +++ b/packages/react/src/index.ts @@ -9,6 +9,7 @@ export * from './useClient'; export * from './useClientCapability'; export * from './useRequest'; export * from './useSignAndSendTransaction'; +export * from './useSubscription'; export * from './useSignIn'; export * from './useSignMessage'; export * from './useSignTransaction'; diff --git a/packages/react/src/staticStores.ts b/packages/react/src/staticStores.ts index e3f7b0432..23a4ae208 100644 --- a/packages/react/src/staticStores.ts +++ b/packages/react/src/staticStores.ts @@ -1,4 +1,4 @@ -import { ReactiveActionStore } from '@solana/subscribable'; +import { ReactiveActionStore, ReactiveState, ReactiveStreamStore } from '@solana/subscribable'; const DISABLED_ACTION_STATE = Object.freeze({ data: undefined, @@ -6,6 +6,12 @@ const DISABLED_ACTION_STATE = Object.freeze({ status: 'idle' as const, }); +const IDLE_STREAM_STATE: ReactiveState = Object.freeze({ + data: undefined, + error: undefined, + status: 'idle', +}); + const noopUnsubscribe = () => {}; const noopSubscribe = () => noopUnsubscribe; const rejectedAbortError = (): Promise => Promise.reject(new DOMException('Aborted', 'AbortError')); @@ -32,3 +38,25 @@ export function disabledActionStore(): ReactiveActionStore<[], T> { }), }; } + +/** + * A {@link ReactiveStreamStore} that never transitions out of `idle` and ignores every + * `connect` / `retry` / `reset` / `subscribe` call. Returned by `useSubscription` when its + * source is `null`, signalling that the subscription should be gated off — for example because + * a required input (an address) is not yet known. + * + * The hook's result bridge maps this store's `idle` state to a `disabled` status so call sites + * can distinguish "not enabled" from "loading" without an extra flag. + */ +export function disabledStreamStore(): ReactiveStreamStore { + return { + connect: noopUnsubscribe, + getError: () => undefined, + getState: () => undefined, + getUnifiedState: () => IDLE_STREAM_STATE, + reset: noopUnsubscribe, + retry: noopUnsubscribe, + subscribe: noopSubscribe, + withSignal: () => ({ connect: noopUnsubscribe }), + }; +} diff --git a/packages/react/src/useSubscription.ts b/packages/react/src/useSubscription.ts new file mode 100644 index 000000000..2d9cdc166 --- /dev/null +++ b/packages/react/src/useSubscription.ts @@ -0,0 +1,136 @@ +import { ReactiveStreamSource, ReactiveStreamStore } from '@solana/subscribable'; +import { useMemo } from 'react'; + +import { disabledStreamStore } from './staticStores'; +import { useReactiveStoreLifecycle } from './useReactiveStoreLifecycle'; +import { useSubscriptionResult } from './useSubscriptionResult'; + +// Module-level so it's a stable reference — keeps the `reconnect` callback from +// `useReactiveStoreLifecycle` referentially stable across renders. +function fireConnect(store: ReactiveStreamStore, signal: AbortSignal | undefined): void { + if (signal) store.withSignal(signal).connect(); + else store.connect(); +} + +/** + * Reactive state for a subscription managed by {@link useSubscription} (and other stream-store + * hooks built on top of it). + * + * Lifecycle: starts at `loading` (or `disabled` when the source is `null`) and opens the + * underlying stream on mount; transitions to `loaded` on the first notification or `error` on + * failure. `reconnect()` re-opens the stream — while a reconnect is in flight, `status` returns + * to `loading` and the stale `data` and/or `error` from the prior connection remain populated + * (stale-while-revalidate). + * + * @typeParam T - The notification type emitted by the underlying source. + */ +export type SubscriptionResult = { + /** + * The latest notification. `undefined` on the first load and while disabled. On `loading` + * after a prior outcome, on `error`, and on a subsequent reconnect, holds the last + * received notification. + */ + data: T | undefined; + /** + * Error from the subscription, or `undefined`. On `loading` after a prior `error`, holds the + * stale error so UIs can keep showing the failure context (e.g. a banner) while the + * reconnect is in flight. A subsequent `loaded` clears it. + */ + error: unknown; + /** + * Re-open the stream. By default each call mints a fresh signal from `getAbortSignal` (if + * configured) and threads it through the underlying store's `withSignal(signal).connect()`. + * Pass `{ abortSignal }` to override the configured factory for just this attempt. Pass + * `{ abortSignal: undefined }` to opt out of the factory entirely for this attempt and open + * with no caller-provided signal. + * + * Stable reference. Safe to put in `onClick` handlers or effect deps — typically wired up + * to a "Reconnect" button when `status === 'error'`. Calls `store.connect()` under the + * hood, so it always (re)opens the stream regardless of current status; the bridge + * transitions back through `loading` while preserving stale data and error. + */ + reconnect: (options?: { abortSignal?: AbortSignal | undefined }) => void; + /** + * Lifecycle status as a discriminated string: + * - `loading`: a connection is in progress. On the first connection, `data` and `error` are + * `undefined`. After a reconnect, `data` and `error` hold the last known values from the + * previous connection (stale-while-revalidate). + * - `loaded`: at least one notification has arrived. + * - `error`: the subscription failed; `data` holds the last known value (if any). + * - `disabled`: source was `null` — no subscription was opened. + */ + status: 'disabled' | 'error' | 'loaded' | 'loading'; +}; + +/** Options accepted by {@link useSubscription}. */ +export type UseSubscriptionOptions = { + /** + * Factory invoked on every connection (initial subscribe + every `reconnect()`). The returned + * signal is attached to that connection via the underlying store's + * `withSignal(signal).connect()`, so aborting it tears down that connection. + * + * The most common use is per-connection timeouts: + * `getAbortSignal: () => AbortSignal.timeout(30_000)` gives every connection its own + * 30-second clock that resets on `reconnect()`. + * + * Held in a ref synced to the latest render's closure — there is no need to memoize an inline + * factory. + */ + getAbortSignal?: () => AbortSignal; +}; + +/** + * Subscribe to a stream-store source and surface the latest notification as reactive state. The + * subscription opens on mount, re-opens whenever `source` changes identity, and tears down on + * unmount. + * + * Accepts any {@link ReactiveStreamSource} — the `{ reactiveStore() }` duck-type satisfied by + * `PendingRpcSubscriptionsRequest` (e.g. `client.rpcSubscriptions.accountNotifications(addr)`) + * and any plugin-authored stream object that follows the same convention. Pass `null` to + * disable; the result reports `status: 'disabled'`. + * + * Memoize the source with `useMemo` keyed on whatever inputs it depends on; stable identity is + * how the hook knows when to tear down and re-open. + * + * SSR-safe — on the server the connect effect doesn't run, so the store stays `idle` and the + * hook reports `status: 'loading'`. The first client render hydrates from that same `loading` + * paint, then commits the connect effect. + * + * @typeParam T - The notification type emitted by the source. + * + * @example + * ```tsx + * function AccountBalance({ address }: { address: Address }) { + * const client = useClient>(); + * const source = useMemo(() => client.rpcSubscriptions.accountNotifications(address), [client, address]); + * const { data, error, reconnect } = useSubscription(source); + * if (error) return ; + * return

{data ? `${data.value.lamports} lamports at slot ${data.context.slot}` : 'Connecting…'}

; + * } + * ``` + * + * @see {@link SubscriptionResult} + * @see {@link UseSubscriptionOptions} + */ +export function useSubscription( + source: ReactiveStreamSource | null, + options?: UseSubscriptionOptions, +): SubscriptionResult { + // One store per `source`. Both creation paths return an `idle` store; the initial connect + // lives in the lifecycle effect below so the memo body stays pure (StrictMode's dev + // double-render, and any future render-discard, won't open a subscription from a discarded + // render). + const store = useMemo(() => { + if (source == null) return disabledStreamStore(); + return source.reactiveStore(); + }, [source]); + + // Connect on commit, reset on store change / unmount, and expose the stable `reconnect` + // callback. `store.reset()` aborts the active connection via the stream store's internal + // controller, so under StrictMode's mount → cleanup → mount sequence the first connect is + // properly aborted before the second one fires. `disabledStreamStore` returns a store whose + // `connect`/`reset` are no-ops, so the null-source case needs no explicit gate. + const reconnect = useReactiveStoreLifecycle(store, fireConnect, options?.getAbortSignal); + + return useSubscriptionResult(store, reconnect, source == null); +} diff --git a/packages/react/src/useSubscriptionResult.ts b/packages/react/src/useSubscriptionResult.ts new file mode 100644 index 000000000..15180e52f --- /dev/null +++ b/packages/react/src/useSubscriptionResult.ts @@ -0,0 +1,37 @@ +import { ReactiveStreamStore } from '@solana/subscribable'; +import { useMemo, useSyncExternalStore } from 'react'; + +import { SubscriptionResult } from './useSubscription'; + +/** + * Subscribes to a {@link ReactiveStreamStore} and maps its + * `idle | loading | loaded | error` lifecycle onto the {@link SubscriptionResult} shape + * consumed by `useSubscription`. The notification passes through unchanged. + * + * For status we map the `idle` state to either `loading` or `disabled`, depending on the + * `disabled` param. This is because `useSubscription` automatically connects, so we treat + * `idle` as `loading` when a source is present and `disabled` when it's not. + * + * Stale-while-revalidate flows naturally through `state.data` / `state.error`, which the store + * preserves across `loading` transitions, so the bridge doesn't need to mirror them. + * + * @param store - The store to subscribe to. + * @param reconnect - A stable callback that re-opens the stream. Forwarded to the result so call + * sites have a single, hook-owned recovery affordance. + * @param disabled - When `true`, the result reports `status: 'disabled'`. Used by + * `useSubscription` to signal the null-source case. + * + * @internal + */ +export function useSubscriptionResult( + store: ReactiveStreamStore, + reconnect: (options?: { abortSignal?: AbortSignal | undefined }) => void, + disabled: boolean, +): SubscriptionResult { + const state = useSyncExternalStore(store.subscribe, store.getUnifiedState, store.getUnifiedState); + return useMemo(() => { + const status: SubscriptionResult['status'] = + state.status === 'idle' ? (disabled ? 'disabled' : 'loading') : state.status; + return { data: state.data, error: state.error, reconnect, status }; + }, [state, reconnect, disabled]); +}