Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/great-symbols-kiss.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@solana/kit': patch
---

Fix `createReactiveStoreWithInitialValueAndSlotTracking` stranding the store in `loading` when a fresh `connect()` window is answered only by a stale-slot value

`lastUpdateSlot` persists across `connect()` windows so the surfaced value never regresses. Previously, a successful response at a slot older than the high-water mark was dropped entirely — including the status transition — so a reconnect (e.g. a `useTrackedData` `refresh()`) answered by a lagging RPC node while a quiet account's subscription emitted nothing would sit in `loading` forever. A stale-slot response now settles the store back to `loaded`, retaining the newer data it already holds rather than regressing to the older value.
Original file line number Diff line number Diff line change
Expand Up @@ -637,4 +637,73 @@ describe('createReactiveStoreWithInitialValueAndSlotTracking', () => {
expect(subscriber).toHaveBeenCalledTimes(1);
});
});

// `lastUpdateSlot` persists across `connect()` windows so the surfaced value never regresses.
// But a fresh connect window must still be able to *settle* `loading` when a source answers
// successfully — even at an older slot — otherwise a refresh answered by a lagging node leaves
// a quiet account (whose subscription emits nothing) stuck in `loading` forever.
describe('reconnecting after a value was already loaded', () => {
it('settles to `loaded` with the retained newer data when a stale-slot initial value arrives on the new window', async () => {
expect.assertions(1);
const { instances, source: initialValueSource } = createMockInitialValueSource();
const { source: streamSource } = createMockStreamSource();
const store = createStore(initialValueSource, streamSource);
// Window 1: settle at slot 100.
store.connect();
instances[0].resolve(rpcResponse(100, { count: 42 }));
await jest.runAllTimersAsync();
// Window 2 (refresh): a lagging node answers the re-fetch at an older slot, and the
// quiet account's subscription emits nothing — so this stale value is the only proof
// of liveness.
store.connect();
instances[1].resolve(rpcResponse(99, { count: 7 }));
await jest.runAllTimersAsync();
// The store must leave `loading`, retaining the newer slot-100 data rather than
// regressing to the stale slot-99 value.
expect(store.getUnifiedState()).toStrictEqual({
data: { context: { slot: 100n }, value: 42 },
error: undefined,
status: 'loaded',
});
});
it('settles to `loaded` with the retained newer data when a stale-slot stream notification arrives on the new window', async () => {
expect.assertions(1);
const { source: initialValueSource } = createMockInitialValueSource();
const { publishers, source: streamSource } = createMockStreamSource();
const store = createStore(initialValueSource, streamSource);
// Window 1: settle at slot 100 via a stream notification.
store.connect();
await jest.runAllTimersAsync();
publishers[0].publish('data', rpcResponse(100, { count: 42 }));
await jest.runAllTimersAsync();
// Window 2 (refresh): the new subscription replays an older slot before catching up.
store.connect();
await jest.runAllTimersAsync();
publishers[1].publish('data', rpcResponse(99, { count: 7 }));
await jest.runAllTimersAsync();
expect(store.getUnifiedState()).toStrictEqual({
data: { context: { slot: 100n }, value: 42 },
error: undefined,
status: 'loaded',
});
});
it('notifies subscribers when a stale-slot value settles the new window back to `loaded`', async () => {
expect.assertions(1);
const { instances, source: initialValueSource } = createMockInitialValueSource();
const { source: streamSource } = createMockStreamSource();
const store = createStore(initialValueSource, streamSource);
store.connect();
instances[0].resolve(rpcResponse(100, { count: 42 }));
await jest.runAllTimersAsync();
store.connect();
await jest.runAllTimersAsync();
// Subscribe after the `loaded → loading` reconnect transition so we only observe the
// settle caused by the stale value.
const subscriber = jest.fn();
store.subscribe(subscriber);
instances[1].resolve(rpcResponse(99, { count: 7 }));
await jest.runAllTimersAsync();
expect(subscriber).toHaveBeenCalledTimes(1);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,16 @@ export function createReactiveStoreWithInitialValueAndSlotTracking<TInitialValue
innerController.abort(err);
}

// `lastUpdateSlot` persists across reconnects so the store never regresses. If a source
// delivers a value at a slot older than one we've already seen, we keep waiting for
// something newer before leaving `loading`.
// `lastUpdateSlot` persists across reconnects so the surfaced value never regresses.
function handleSlottedValue({ context: { slot }, value }: SolanaRpcResponse<TItem>) {
if (signal.aborted) return;
if (slot < lastUpdateSlot) return;
if (slot < lastUpdateSlot) {
// Stale slot: keep the newer `data` we already hold, but still set `loaded`.
if (currentState.data !== undefined) {
setState({ data: currentState.data, error: undefined, status: 'loaded' });
}
return;
}
lastUpdateSlot = slot;
setState({ data: { context: { slot }, value }, error: undefined, status: 'loaded' });
}
Expand Down
Loading