Skip to content

Commit 0ba2bdb

Browse files
philcunliffeclaude
andcommitted
Add SqlExecutionBudget propagation through executor
Plumbs an optional SqlExecutionBudget through the executor context so materializing operators can detect runaway resource use and abort with a structured error indicating which limit was hit. Budget fields: maxRowsToMaterialize (global row counter across operators), maxHeapBytes (global byte counter), maxIntermediateBytes (per-operator byte counter), timeoutMs (wall-clock deadline), allowDerivedColumnScan (opts out of the scalar aggregate scanColumn fast path). Operators that materialize now call context.budget?.operator(name).addRow() before pinning each row: Sort, HashAggregate, ScalarAggregate slow path, HashJoin and NestedLoopJoin (right side), PositionalJoin (both sides), Distinct, UNION/INTERSECT/EXCEPT, and Window's non-streaming path. Streaming loops (filterRows, scanColumnAggregate chunks) check the timeout deadline so long-running pure-stream queries also abort. Errors throw a new SqlBudgetError with limit/value/max/operator fields. Budget passes through Subquery, LATERAL, and correlated subquery contexts. 28 new tests cover each limit reaching threshold, the structured error shape, fast-path opt-out, and confirm budget-free queries are unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent f28772e commit 0ba2bdb

10 files changed

Lines changed: 615 additions & 7 deletions

File tree

src/execute/aggregates.js

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,13 @@ export function executeHashAggregate(plan, context) {
8080
columns: selectColumnNames(plan.columns, child.columns),
8181
maxRows: child.maxRows,
8282
async *rows() {
83+
const op = context.budget?.operator('HashAggregate')
8384
// Collect all rows
8485
/** @type {AsyncRow[]} */
8586
const allRows = []
8687
for await (const row of child.rows()) {
8788
if (context.signal?.aborted) return
89+
op?.addRow()
8890
allRows.push(row)
8991
}
9092

@@ -147,7 +149,8 @@ export function executeHashAggregate(plan, context) {
147149
*/
148150
export function executeScalarAggregate(plan, context) {
149151
// Fast path: use scanColumn when available
150-
const fast = tryColumnScanAggregate(plan, context)
152+
const allowFast = context.budget ? context.budget.allowDerivedColumnScan : true
153+
const fast = allowFast ? tryColumnScanAggregate(plan, context) : undefined
151154
if (fast) {
152155
return {
153156
columns: selectColumnNames(plan.columns, []),
@@ -163,11 +166,13 @@ export function executeScalarAggregate(plan, context) {
163166
numRows: plan.having ? undefined : 1,
164167
maxRows: 1,
165168
async *rows() {
169+
const op = context.budget?.operator('ScalarAggregate')
166170
// Collect all rows into single group
167171
/** @type {AsyncRow[]} */
168172
const group = []
169173
for await (const row of child.rows()) {
170174
if (context.signal?.aborted) return
175+
op?.addRow()
171176
group.push(row)
172177
}
173178

@@ -212,7 +217,7 @@ export function executeScalarAggregate(plan, context) {
212217
* @param {ExecuteContext} context
213218
* @returns {(() => AsyncGenerator<AsyncRow>) | undefined}
214219
*/
215-
function tryColumnScanAggregate(plan, { tables, signal }) {
220+
function tryColumnScanAggregate(plan, { tables, signal, budget }) {
216221
// No HAVING support in fast path
217222
if (plan.having) return
218223
// Child must be a direct table scan
@@ -243,7 +248,7 @@ function tryColumnScanAggregate(plan, { tables, signal }) {
243248

244249
for (const spec of specs) {
245250
columns.push(spec.alias)
246-
cells[spec.alias] = () => scanColumnAggregate({ table, spec, limit, offset, signal })
251+
cells[spec.alias] = () => scanColumnAggregate({ table, spec, limit, offset, signal, budget })
247252
}
248253

249254
yield { columns, cells }
@@ -283,15 +288,17 @@ function extractColumnAggSpec({ expr, alias }) {
283288
* @param {number} [options.limit]
284289
* @param {number} [options.offset]
285290
* @param {AbortSignal} [options.signal]
291+
* @param {import('../types.js').BudgetTracker} [options.budget]
286292
* @returns {Promise<SqlPrimitive>}
287293
*/
288-
async function scanColumnAggregate({ table, spec, limit, offset, signal }) {
294+
async function scanColumnAggregate({ table, spec, limit, offset, signal, budget }) {
289295
const values = table.scanColumn({ column: spec.column, limit, offset, signal })
290296

291297
if (spec.funcName === 'COUNT' && spec.distinct) {
292298
const seen = new Set()
293299
for await (const chunk of values) {
294300
if (signal?.aborted) return
301+
budget?.checkTimeout()
295302
for (let i = 0; i < chunk.length; i++) {
296303
const v = chunk[i]
297304
if (v == null) continue
@@ -305,6 +312,7 @@ async function scanColumnAggregate({ table, spec, limit, offset, signal }) {
305312
let count = 0
306313
for await (const chunk of values) {
307314
if (signal?.aborted) return
315+
budget?.checkTimeout()
308316
for (let i = 0; i < chunk.length; i++) {
309317
if (chunk[i] != null) count++
310318
}
@@ -322,6 +330,7 @@ async function scanColumnAggregate({ table, spec, limit, offset, signal }) {
322330

323331
for await (const chunk of values) {
324332
if (signal?.aborted) return
333+
budget?.checkTimeout()
325334
for (let i = 0; i < chunk.length; i++) {
326335
const v = chunk[i]
327336
if (v == null) continue

src/execute/budget.js

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/**
2+
* @import { BudgetOperator, BudgetTracker, SqlExecutionBudget } from '../types.js'
3+
*/
4+
5+
const DEFAULT_ROW_BYTES = 64
6+
7+
/**
8+
* Structured error thrown when a SQL execution budget is exceeded. The
9+
* `limit` field identifies which budget field was breached so callers can
10+
* decide whether to abort, fall back to a different strategy, or surface a
11+
* user-facing message.
12+
*/
13+
export class SqlBudgetError extends Error {
14+
/**
15+
* @param {Object} options
16+
* @param {'maxRowsToMaterialize' | 'maxHeapBytes' | 'maxIntermediateBytes' | 'timeoutMs'} options.limit
17+
* @param {number} options.value - actual measured value at the time of abort
18+
* @param {number} options.max - configured budget limit
19+
* @param {string} [options.operator] - operator name where the limit was hit
20+
*/
21+
constructor({ limit, value, max, operator }) {
22+
const where = operator ? ` (operator=${operator})` : ''
23+
const message = `SQL execution budget exceeded: ${limit}=${value} > ${max}${where}`
24+
super(message)
25+
this.name = 'SqlBudgetError'
26+
this.limit = limit
27+
this.value = value
28+
this.max = max
29+
if (operator !== undefined) this.operator = operator
30+
}
31+
}
32+
33+
/**
34+
* Creates a budget tracker for a single query execution. Returns undefined
35+
* when no budget is provided so callers can use `tracker?.operator(...)`
36+
* without conditional plumbing through every operator.
37+
*
38+
* @param {SqlExecutionBudget} [budget]
39+
* @returns {BudgetTracker | undefined}
40+
*/
41+
export function createBudget(budget) {
42+
if (!budget) return undefined
43+
const startTime = Date.now()
44+
const { timeoutMs } = budget
45+
const deadline = timeoutMs !== undefined ? startTime + timeoutMs : undefined
46+
47+
let totalRows = 0
48+
let totalHeapBytes = 0
49+
50+
/**
51+
* @param {string} [operator]
52+
*/
53+
function timeoutCheck(operator) {
54+
if (deadline === undefined || timeoutMs === undefined) return
55+
if (Date.now() > deadline) {
56+
throw new SqlBudgetError({
57+
limit: 'timeoutMs',
58+
value: Date.now() - startTime,
59+
max: timeoutMs,
60+
operator,
61+
})
62+
}
63+
}
64+
65+
return {
66+
budget,
67+
allowDerivedColumnScan: budget.allowDerivedColumnScan !== false,
68+
checkTimeout() { timeoutCheck() },
69+
operator(name) {
70+
let opBytes = 0
71+
/** @type {BudgetOperator} */
72+
const handle = {
73+
addRow(approxBytes) {
74+
const bytes = approxBytes ?? DEFAULT_ROW_BYTES
75+
totalRows++
76+
if (budget.maxRowsToMaterialize !== undefined && totalRows > budget.maxRowsToMaterialize) {
77+
throw new SqlBudgetError({
78+
limit: 'maxRowsToMaterialize',
79+
value: totalRows,
80+
max: budget.maxRowsToMaterialize,
81+
operator: name,
82+
})
83+
}
84+
opBytes += bytes
85+
if (budget.maxIntermediateBytes !== undefined && opBytes > budget.maxIntermediateBytes) {
86+
throw new SqlBudgetError({
87+
limit: 'maxIntermediateBytes',
88+
value: opBytes,
89+
max: budget.maxIntermediateBytes,
90+
operator: name,
91+
})
92+
}
93+
totalHeapBytes += bytes
94+
if (budget.maxHeapBytes !== undefined && totalHeapBytes > budget.maxHeapBytes) {
95+
throw new SqlBudgetError({
96+
limit: 'maxHeapBytes',
97+
value: totalHeapBytes,
98+
max: budget.maxHeapBytes,
99+
operator: name,
100+
})
101+
}
102+
timeoutCheck(name)
103+
},
104+
checkTimeout() { timeoutCheck(name) },
105+
}
106+
return handle
107+
},
108+
}
109+
}

src/execute/execute.js

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { statementScope } from '../plan/columns.js'
88
import { validateScan, validateTable } from '../validation/tables.js'
99
import { executeHashAggregate, executeScalarAggregate } from './aggregates.js'
1010
import { filterBatches, limitBatches, projectBatchesSimple } from './batchOps.js'
11+
import { createBudget } from './budget.js'
1112
import { executeHashJoin, executeNestedLoopJoin, executePositionalJoin } from './join.js'
1213
import { executeSort } from './sort.js'
1314
import { addBounds, minBounds, stableRowKey } from './utils.js'
@@ -24,7 +25,7 @@ import { executeWindow } from './window.js'
2425
* @param {ExecuteSqlOptions} options
2526
* @returns {QueryResults}
2627
*/
27-
export function executeSql({ tables, query, functions, signal }) {
28+
export function executeSql({ tables, query, functions, signal, budget }) {
2829
const parsed = typeof query === 'string' ? parseSql({ query, functions }) : query
2930

3031
// Normalize tables: convert arrays to AsyncDataSource
@@ -47,7 +48,7 @@ export function executeSql({ tables, query, functions, signal }) {
4748
const ctePlans = new Map()
4849
/** @type {Map<string, string[]>} */
4950
const cteColumns = new Map()
50-
const context = { tables: normalizedTables, functions, signal, scope, ctePlans, cteColumns }
51+
const context = { tables: normalizedTables, functions, signal, scope, ctePlans, cteColumns, budget: createBudget(budget) }
5152
const plan = planSql({ query: parsed, functions, tables: normalizedTables, ctePlans, cteColumns })
5253
return executePlan({ plan, context })
5354
}
@@ -425,6 +426,7 @@ async function* filterRows(rows, condition, context, limit) {
425426
buffer.push({ row, rowIndex })
426427

427428
if (buffer.length >= chunkSize) {
429+
context.budget?.checkTimeout()
428430
const results = await Promise.all(buffer.map(b =>
429431
evaluateExpr({ node: condition, row: b.row, rowIndex: b.rowIndex, context })
430432
))
@@ -626,6 +628,7 @@ function executeDistinct(plan, context) {
626628
maxRows: child.maxRows,
627629
async *rows() {
628630
const { signal } = context
631+
const op = context.budget?.operator('Distinct')
629632
const MAX_CHUNK = 256
630633

631634
const seen = new Set()
@@ -642,6 +645,7 @@ function executeDistinct(plan, context) {
642645
for (let i = 0; i < buffer.length; i++) {
643646
const key = await keys[i]
644647
if (!seen.has(key)) {
648+
op?.addRow()
645649
seen.add(key)
646650
yield buffer[i]
647651
}
@@ -656,6 +660,7 @@ function executeDistinct(plan, context) {
656660
for (let i = 0; i < buffer.length; i++) {
657661
const key = await keys[i]
658662
if (!seen.has(key)) {
663+
op?.addRow()
659664
seen.add(key)
660665
yield buffer[i]
661666
}
@@ -714,11 +719,13 @@ function executeSetOperation(plan, context) {
714719
maxRows: addBounds(left.maxRows, right.maxRows),
715720
async *rows() {
716721
// UNION: yield deduplicated rows from both sides
722+
const op = context.budget?.operator('Union')
717723
const seen = new Set()
718724
for await (const row of left.rows()) {
719725
if (signal?.aborted) return
720726
const key = await stableRowKey(row)
721727
if (!seen.has(key)) {
728+
op?.addRow()
722729
seen.add(key)
723730
yield row
724731
}
@@ -727,6 +734,7 @@ function executeSetOperation(plan, context) {
727734
if (signal?.aborted) return
728735
const key = await stableRowKey(row)
729736
if (!seen.has(key)) {
737+
op?.addRow()
730738
seen.add(key)
731739
yield row
732740
}
@@ -741,12 +749,14 @@ function executeSetOperation(plan, context) {
741749
columns: left.columns,
742750
maxRows: minBounds(left.maxRows, right.maxRows),
743751
async *rows() {
752+
const op = context.budget?.operator('Intersect')
744753
// Materialize right side keys
745754
/** @type {Map<any, number>} */
746755
const rightKeys = new Map()
747756
for await (const row of right.rows()) {
748757
if (signal?.aborted) return
749758
const key = await stableRowKey(row)
759+
if (!rightKeys.has(key)) op?.addRow()
750760
rightKeys.set(key, (rightKeys.get(key) ?? 0) + 1)
751761
}
752762

@@ -768,6 +778,7 @@ function executeSetOperation(plan, context) {
768778
if (signal?.aborted) return
769779
const key = await stableRowKey(row)
770780
if (rightKeys.has(key) && !seen.has(key)) {
781+
op?.addRow()
771782
seen.add(key)
772783
yield row
773784
}
@@ -783,12 +794,14 @@ function executeSetOperation(plan, context) {
783794
columns: left.columns,
784795
maxRows: left.maxRows,
785796
async *rows() {
797+
const op = context.budget?.operator('Except')
786798
// Materialize right side keys
787799
/** @type {Map<any, number>} */
788800
const rightKeys = new Map()
789801
for await (const row of right.rows()) {
790802
if (signal?.aborted) return
791803
const key = await stableRowKey(row)
804+
if (!rightKeys.has(key)) op?.addRow()
792805
rightKeys.set(key, (rightKeys.get(key) ?? 0) + 1)
793806
}
794807

@@ -811,6 +824,7 @@ function executeSetOperation(plan, context) {
811824
if (signal?.aborted) return
812825
const key = await stableRowKey(row)
813826
if (!rightKeys.has(key) && !seen.has(key)) {
827+
op?.addRow()
814828
seen.add(key)
815829
yield row
816830
}

src/execute/join.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export function executeNestedLoopJoin(plan, context) {
2323
return {
2424
columns: mergeColumnNames(left.columns, right.columns, plan.leftAlias, plan.rightAlias),
2525
async *rows() {
26+
const op = context.budget?.operator('NestedLoopJoin')
2627
const leftTable = plan.leftAlias
2728
const rightTable = plan.rightAlias
2829

@@ -31,6 +32,7 @@ export function executeNestedLoopJoin(plan, context) {
3132
const rightRows = []
3233
for await (const row of right.rows()) {
3334
if (context.signal?.aborted) return
35+
op?.addRow()
3436
rightRows.push(row)
3537
}
3638

@@ -153,6 +155,7 @@ export function executePositionalJoin(plan, context) {
153155
maxRows: maxBounds(left.maxRows, right.maxRows),
154156
async *rows() {
155157
const { signal } = context
158+
const op = context.budget?.operator('PositionalJoin')
156159
const leftTable = plan.leftAlias
157160
const rightTable = plan.rightAlias
158161

@@ -161,13 +164,15 @@ export function executePositionalJoin(plan, context) {
161164
const leftRows = []
162165
for await (const row of left.rows()) {
163166
if (signal?.aborted) return
167+
op?.addRow()
164168
leftRows.push(row)
165169
}
166170

167171
/** @type {AsyncRow[]} */
168172
const rightRows = []
169173
for await (const row of right.rows()) {
170174
if (signal?.aborted) return
175+
op?.addRow()
171176
rightRows.push(row)
172177
}
173178

@@ -198,6 +203,7 @@ export function executeHashJoin(plan, context) {
198203
return {
199204
columns: mergeColumnNames(left.columns, right.columns, plan.leftAlias, plan.rightAlias),
200205
async *rows() {
206+
const op = context.budget?.operator('HashJoin')
201207
const leftTable = plan.leftAlias
202208
const rightTable = plan.rightAlias
203209
const { leftKeys, rightKeys, residual } = plan
@@ -207,6 +213,7 @@ export function executeHashJoin(plan, context) {
207213
const rightRows = []
208214
for await (const row of right.rows()) {
209215
if (context.signal?.aborted) return
216+
op?.addRow()
210217
rightRows.push(row)
211218
}
212219

src/execute/sort.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,10 +355,12 @@ export function executeSort(plan, context) {
355355
}
356356

357357
// Full sort path: buffer all rows, then sort.
358+
const op = context.budget?.operator('Sort')
358359
/** @type {AsyncRow[]} */
359360
const rows = []
360361
for await (const row of child.rows()) {
361362
if (context.signal?.aborted) return
363+
op?.addRow()
362364
rows.push(row)
363365
}
364366

0 commit comments

Comments
 (0)