Skip to content

Replace sync cache build with recorder operator#78

Draft
Andrewtangtang wants to merge 2 commits into
mainfrom
yuntang/add-buildcache-operator
Draft

Replace sync cache build with recorder operator#78
Andrewtangtang wants to merge 2 commits into
mainfrom
yuntang/add-buildcache-operator

Conversation

@Andrewtangtang

@Andrewtangtang Andrewtangtang commented Apr 19, 2026

Copy link
Copy Markdown
Collaborator

Closes: #75

Summary

This PR replaces the synchronous full-table build that the optimizer used to
run on cache miss with a PhysicalCacheRecorder operator that observes the
user's own query and builds the cache as a side-effect. The first query on
a new predicate no longer pays an extra full-table scan.

An observed_vectors watermark is added to RowGroupFilter so the cache
can distinguish "observed empty" (safe to prune) from "not yet observed"
(must scan). Recorder injection is narrow: it only runs on a full cache
miss. Partial refill and scan-skipped row groups are tracked as TODOs.

Operator Hierarchy

Cache miss — optimizer injects the recorder above LogicalGet:

LogicalFilter (user's predicate)
  └─ LogicalCacheRecorder                ← new; evaluates predicate, records
       └─ LogicalGet
            ├─ column_ids: [..., ROW_ID]
            ├─ projection_ids: [..., ROW_ID]   ← surfaced for recorder
            └─ table_filters[ROW_ID] = CacheExpressionFilter

Cache hit — no recorder, filter only:

LogicalFilter (user's predicate)
  └─ LogicalGet
       ├─ column_ids: [..., ROW_ID]
       ├─ projection_ids: [...]           ← ROW_ID read but not projected
       └─ table_filters[ROW_ID] = CacheExpressionFilter

Runtime flow for the cache-miss case:

PhysicalTableScan
  │ emits chunks (post zone-map / table filter)
  ▼
PhysicalCacheRecorder::Execute
  ├─ compute (rg_idx, vec_idx) from first row_id
  ├─ expr_executor.SelectExpression(chunk)
  └─ RecordChunkObservation -> task-local bitvector + watermark
  │ chunk.Reference(input)   (pass-through)
  ▼
PhysicalFilter / downstream ops

... at query end ...

PhysicalCacheRecorder::OperatorFinalize
  ├─ merge all task-local entries -> destination
  └─ store->Upsert(key, destination)

The optimizer used to call BuildCacheForPredicate on cache miss,
making the first query pay for an extra full table scan. It now
injects a PhysicalCacheRecorder above LogicalGet; the recorder
evaluates the predicate per chunk and merges task-local
bitvectors into the store at Finalize.

Add an observed_vectors watermark to RowGroupFilter to tell
"observed empty" apart from "not yet observed".
// at or above this index is unknown and must be scanned.
struct RowGroupFilter {
array<uint64_t, BITVECTOR_ARRAY_SIZE> matching_vectors = {};
idx_t observed_vectors = 0;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the watermark management is not quite good for cache invalidation.
how about we store another bitmask to record if the vector is observed?

struct RowGroupFilter {
	array<uint64_t, BITVECTOR_ARRAY_SIZE> matching_vectors = {};
    array<uint64_t, BITVECTOR_ARRAY_SIZE> observed = {};

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree.

Switch matching_vectors and observed from raw array<uint64_t, N>
to std::bitset<VECTORS_PER_ROW_GROUP>, and update related tests.
@peterxcli peterxcli mentioned this pull request Apr 21, 2026
entry = make_shared_ptr<ConditionCacheEntry>();
store->Upsert(context, key, entry);

// TODO: Also inject the recorder on a partial cache hit so the watermark can

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we already removed the watermark?

make_uniq<CacheExpressionFilter>(std::move(filter_expr), entry));
}

void QueryConditionCacheOptimizer::InjectCacheRecorder(ClientContext &context, unique_ptr<LogicalOperator> &plan,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you use plain words to explain the logic of this function, espically the rowid_chunk_idx and rowid_column_ids_pos


auto query_state = input.context.registered_state->Get<CacheOptimizerQueryState>(CacheOptimizerQueryState::NAME);
if (!query_state || query_state->cache_apply_pending.empty()) {
if (!query_state || (query_state->cache_apply_pending.empty() && query_state->cache_recorder_pending.empty())) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

our disscusion is to keep keep the recorder anyway? and thus I think we can remove the two phase optimizer(query context)? because we aren't going to do any cache lookup thing when plan rewriting, we do all lookup and upsert things inside physical recorder operator and the condition cache filter expression when they are processing the incoming data chunk?

Comment on lines +68 to +71
auto first_idx = rowid_data.sel->get_index(0);
if (!rowid_data.validity.RowIsValid(first_idx)) {
return OperatorResultType::NEED_MORE_INPUT;
}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this case possible? if this happened means no rows in the vector is valid, and I though all invalid vector wont flow into here?

@peterxcli peterxcli left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should think this through:

  1. When the recorder sees a vector that hasn’t been observed yet, it should upsert both the observed and matching_vectors bitmasks.
  2. Do we actually only need observed per row group? The recorder could upsert all cache entries for every vector it sees, then mark that row group as observed.
  3. The query cache expression filter should only skip a vector if that vector’s row group is marked as observed in the cache entry.

@Andrewtangtang Andrewtangtang marked this pull request as draft April 21, 2026 10:02
@peterxcli peterxcli mentioned this pull request Apr 21, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Incremental Cache Build

2 participants