Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/perf-frame-decoder-index-pointer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@tanstack/start-client-core': patch
---

perf: drop consumed chunks from the client frame decoder buffer with an O(1) head pointer instead of `Array.prototype.shift()` (O(n)). The previous approach degraded to O(n²) when a single large frame (e.g. a big `RawStream` payload) was assembled from many small network reads.
32 changes: 25 additions & 7 deletions packages/start-client-core/src/client-rpc/frame-decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ export function createFrameDecoder(
inputReader = reader

const bufferList: Array<Uint8Array> = []
// Index of the first un-consumed chunk in bufferList. Advancing this
// pointer is O(1); using bufferList.shift() to drop a consumed chunk is
// O(n) and degrades to O(n^2) when a single large frame is assembled from
// many small chunks (e.g. a big RawStream payload split across reads).
let bufferHead = 0
let totalLength = 0

/**
Expand All @@ -146,7 +151,7 @@ export function createFrameDecoder(
} | null {
if (totalLength < FRAME_HEADER_SIZE) return null

const first = bufferList[0]!
const first = bufferList[bufferHead]!

// Fast path: header fits entirely in first chunk (common case)
if (first.length >= FRAME_HEADER_SIZE) {
Expand All @@ -170,7 +175,7 @@ export function createFrameDecoder(
const headerBytes = new Uint8Array(FRAME_HEADER_SIZE)
let offset = 0
let remaining = FRAME_HEADER_SIZE
for (let i = 0; i < bufferList.length && remaining > 0; i++) {
for (let i = bufferHead; i < bufferList.length && remaining > 0; i++) {
const chunk = bufferList[i]!
const toCopy = Math.min(chunk.length, remaining)
headerBytes.set(chunk.subarray(0, toCopy), offset)
Expand Down Expand Up @@ -205,22 +210,35 @@ export function createFrameDecoder(
let offset = 0
let remaining = count

while (remaining > 0 && bufferList.length > 0) {
const chunk = bufferList[0]
if (!chunk) break
while (remaining > 0 && bufferHead < bufferList.length) {
const chunk = bufferList[bufferHead]!
const toCopy = Math.min(chunk.length, remaining)
result.set(chunk.subarray(0, toCopy), offset)

offset += toCopy
remaining -= toCopy

if (toCopy === chunk.length) {
bufferList.shift()
// Whole chunk consumed: release it and advance the head pointer
// (O(1)) instead of bufferList.shift() (O(n)).
bufferList[bufferHead++] = EMPTY_BUFFER
} else {
bufferList[0] = chunk.subarray(toCopy)
bufferList[bufferHead] = chunk.subarray(toCopy)
}
}

// Drop consumed chunks so bufferList doesn't grow unbounded over a
// long-lived stream. Fully drained is the common terminal state and
// resets in O(1); otherwise splice off the consumed prefix once it grows
// past a small threshold (amortized O(1) per consumed chunk).
if (bufferHead === bufferList.length) {
bufferList.length = 0
bufferHead = 0
} else if (bufferHead >= 32) {
bufferList.splice(0, bufferHead)
bufferHead = 0
}

totalLength -= count
return result
}
Expand Down
85 changes: 85 additions & 0 deletions packages/start-client-core/tests/frame-decoder.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -558,5 +558,90 @@ describe('frame-decoder', () => {
const { done: finalDone } = await reader.read()
expect(finalDone).toBe(true)
})

it('reassembles a large chunk payload delivered one byte at a time', async () => {
// Forces the header slow path AND many whole-chunk consumptions within a
// single extract, exercising the head-pointer advance + fully-drained
// reset. With the previous bufferList.shift() this path was O(n^2).
const payload = new Uint8Array(200)
for (let i = 0; i < payload.length; i++) payload[i] = (i * 7) % 256

const jsonFrame = encodeJSONFrame('{"ref":21}')
const chunkFrame = encodeChunkFrame(21, payload)
const endFrame = encodeEndFrame(21)

const combined = new Uint8Array(
jsonFrame.length + chunkFrame.length + endFrame.length,
)
combined.set(jsonFrame, 0)
combined.set(chunkFrame, jsonFrame.length)
combined.set(endFrame, jsonFrame.length + chunkFrame.length)

const input = new ReadableStream<Uint8Array>({
start(controller) {
for (let i = 0; i < combined.length; i++) {
controller.enqueue(combined.subarray(i, i + 1))
}
controller.close()
},
})

const { getOrCreateStream, jsonChunks } = createFrameDecoder(input)
const stream21 = getOrCreateStream(21)

const jsonReader = jsonChunks.getReader()
const { value: jsonValue } = await jsonReader.read()
expect(jsonValue).toBe('{"ref":21}')

const rawReader = stream21.getReader()
const received: Array<number> = []
while (true) {
const { done, value } = await rawReader.read()
if (done) break
if (value) received.push(...value)
}
expect(received).toEqual(Array.from(payload))
})

it('decodes many frames when reads never align with frame boundaries', async () => {
// 100-byte frames fed in 7-byte reads never align until the very end, so
// consumed chunks accumulate and the head pointer climbs past the
// compaction threshold repeatedly — exercising the splice() prefix drop.
const FRAME_COUNT = 7
const expected: Array<string> = []
const frames: Array<Uint8Array> = []
for (let i = 0; i < FRAME_COUNT; i++) {
const payload = `frame-${i}`.padEnd(91, '.') // 91 bytes => 100-byte frame
expected.push(payload)
frames.push(encodeJSONFrame(payload))
}

const totalLen = frames.reduce((acc, f) => acc + f.length, 0)
const combined = new Uint8Array(totalLen)
let offset = 0
for (const f of frames) {
combined.set(f, offset)
offset += f.length
}

const input = new ReadableStream<Uint8Array>({
start(controller) {
for (let i = 0; i < combined.length; i += 7) {
controller.enqueue(combined.subarray(i, i + 7))
}
controller.close()
},
})

const { jsonChunks } = createFrameDecoder(input)
const reader = jsonChunks.getReader()
const received: Array<string> = []
while (true) {
const { done, value } = await reader.read()
if (done) break
received.push(value)
}
expect(received).toEqual(expected)
})
})
})