Skip to content

[vpj] Snapshot-at-T: new VPJ mode (rewind knobs + offline RT merge)#2851

Open
sixpluszero wants to merge 7 commits into
linkedin:mainfrom
sixpluszero:snapshot-at-t-rewind-config-knobs
Open

[vpj] Snapshot-at-T: new VPJ mode (rewind knobs + offline RT merge)#2851
sixpluszero wants to merge 7 commits into
linkedin:mainfrom
sixpluszero:snapshot-at-t-rewind-config-knobs

Conversation

@sixpluszero

@sixpluszero sixpluszero commented Jun 10, 2026

Copy link
Copy Markdown
Contributor

Problem Statement

A full (BATCH) push to a hybrid store is not servable when the batch data finishes loading. After End-Of-Push, the new version replays the real-time topic from (SOP or EOP) - rewindTimeInSeconds up to the live tail before it can go online. For stores with a multi-day rewind, this replay dominates the push wall-clock and sits on the version-readiness critical path. On a representative A/A + write-compute store with a ~3-day rewind, the rewind measured at ~7 hours, ~84% of each push's wall-clock (two pushes × three fabrics).

A push that fixes a cutoff timestamp T and ships a batch dataset already current as of T would only need to rewind the short [T, now] tail. This PR adds the per-VPJ control knobs for that rewind-shortening. The offline merge that produces the combined dataset is a follow-up (see Solution).

Solution

Adds per-push-job knobs that, for a hybrid full push, set the existing rewindTimeInSecondsOverride to (now - T) + buffer (reusing the controller's handleRewindTimeOverride path and REWIND_FROM_SOP semantics), gated by a threshold so it only triggers when the store's effective rewind is large enough to be worth it.

The gate (maybeApplySnapshotAtTRewindOverride) is skipped for incremental/KIF-repush jobs, non-hybrid stores, when an explicit rewind override is already set, or when the store's effective rewind is below the threshold.

Shortening the rewind is correct only when the batch dataset already incorporates real-time data up to T. The master flag defaults off and must remain off in production until the merge below is wired end to end, otherwise the shortened rewind would skip nearline writes between the offline cutoff and T (a data gap).

Data plane: offline RT-to-batch merge core

SnapshotAtTRecordMerger is the correctness core that produces the merged dataset the shortened rewind relies on. It folds RT PUT/UPDATE/DELETE records onto the batch base by reusing the server's own MergeConflictResolver (da-vinci-client is already a venice-push-job dependency), and emits the merged value and its RMD, so the offline result equals what a server produces by replaying the same RT during rewind, including Active-Active field-level timestamps and write-compute partial updates. seedFromBatch establishes the batch-sentinel RMD (timestamp 0) exactly as a server stores batch data, so any real RT write folded in afterwards wins. Values are held as byte[] so a ByteBuffer position the resolver advances while reading inputs cannot corrupt a later operation.

The data plane is built around this engine:

  • SnapshotAtTRtReader reads a region's RT topic from start to the current tail and normalizes each PUT/UPDATE/DELETE, tagging it with the region's colo id (so the merger does cross-region conflict resolution). Bytes are copied out of the consumed message immediately, since pooled consumer buffers are reused across polls.
  • SnapshotAtTPushExecutor seeds the merger from the batch value per key, folds in all regions' RT records, and writes the merged value+RMD (or an RMD-carrying delete) via a VeniceWriter, which partitions each key itself.

This is wired into VenicePushJob.run() as a first-class mode: when the gate triggers, the job verifies the store is hybrid + Active-Active, reads the offline batch input in-process, reads each region's RT (per snapshot.at.t.rt.region.brokers), merges, compresses to the version's compression strategy, and produces the merged version through a single writer that owns SOP+data+EOP (so Data-Integrity-Validation stays consistent; the controller skips SOP for this mode). SnapshotAtTSchemaRepository supplies the store's real value/RMD/derived schemas to the merge. So a single VenicePushJob run performs the whole new-mode push, for NO_OP and ZSTD_WITH_DICT stores alike.

The remaining productionization is auto-discovering the per-region RT brokers from the controller's Active-Active source-fabric config (today they're passed via snapshot.at.t.rt.region.brokers).

Code changes

  • Added new code behind a config. Configs and defaults:
    • snapshot.at.t.rewind.enabled (boolean, default false) — master switch
    • snapshot.at.t.min.rewind.threshold.seconds (long, default 1 day = 86400) — skip when the store's effective rewind is below this
    • snapshot.at.t.cutoff.epoch.seconds (long, default = start of push) — the cutoff T
    • snapshot.at.t.rewind.buffer.seconds (long, default 60) — gap-safety buffer
  • Introduced new log lines. Info-level, one per push decision (job setup, not a hot path), so no rate limiting needed.

Concurrency-Specific Checks

  • No race conditions: the gate is a pure function over a single push job's PushJobSetting, run once during job setup; no shared mutable state.
  • No new synchronization, blocking calls, shared collections, or multi-threaded code is introduced.

How was this PR tested?

  • New unit tests (control plane): gate applied (default cutoff and explicit past cutoff), threshold boundary, below-threshold skip, ineligible jobs (disabled / non-hybrid / incremental / repush / explicit-override), future-cutoff rejection.
  • New unit tests (merge core): value-level PUT (batch sentinel, RT write wins, stale write ignored), DELETE tombstone carrying RMD, and write-compute field-level partial update asserting the per-field RMD timestamp.
  • New integration test (control plane): testSnapshotAtTRewindOverrideReachesControllerRequest exercises config → gate → createNewStoreVersion and asserts the shortened rewind reaches requestTopicForWrites.
  • New integration tests (2-region A/A cluster), both passing (~48s each): SnapshotAtTPushIntegrationTest produces RT to two regions with logical timestamps (including a cross-region conflict key) and asserts every key's served value in both regions equals the correct merged result —
    • testSnapshotAtTMergeAcrossTwoRegions drives the data plane directly (request short-rewind version → read both regions' RT → merge → produce value+RMD → SOP/EOP);
    • testSnapshotAtTViaVenicePushJob drives it through a single VenicePushJob run with snapshot.at.t.enabled=true and a 0s threshold (so the mode triggers on the store's 3600s rewind) plus a batch Avro input.
  • Ran locally — all pass: ./gradlew :clients:venice-push-job:test --tests "*VenicePushJobTest" --tests "*SnapshotAtTRecordMergerTest" and ./gradlew :internal:venice-test-common:integrationTest --tests "*SnapshotAtTPushIntegrationTest".

Does this PR introduce any user-facing or breaking changes?

  • No. The new behavior is entirely behind snapshot.at.t.rewind.enabled, which defaults to false; existing pushes are unchanged.

sixpluszero and others added 2 commits June 10, 2026 01:01
Adds per-push-job knobs that let a hybrid full push ship a shortened rewind
window ([T, now]) instead of the store's full configured rewind, gated by a
threshold so it only triggers when the store's effective rewind is large
enough
to be worth it.

New configs (VenicePushJobConstants):
  - snapshot.at.t.rewind.enabled (master switch, default false)
  - snapshot.at.t.min.rewind.threshold.seconds (skip when the store's
effective
    rewind is below this; default 1 day)
  - snapshot.at.t.cutoff.epoch.seconds (the cutoff T; default = start of push)
  - snapshot.at.t.rewind.buffer.seconds (gap-safety buffer; default 60)

The gate (maybeApplySnapshotAtTRewindOverride) sets the existing
rewindTimeInSecondsOverride to (now - T) + buffer and requires
REWIND_FROM_SOP,
reusing the controller's handleRewindTimeOverride path. It is skipped for
incremental/KIF-repush jobs, non-hybrid stores, when an explicit rewind
override
is already set, or when the effective rewind is below the threshold.

Control plane only: shortening the rewind is correct only when the batch
dataset
already incorporates real-time data up to T (the offline merge data plane,
follow-up). The master flag defaults off and must remain off in production
until
that data plane lands, otherwise the shortened rewind would skip nearline
writes
between the offline cutoff and T (a data gap).

Tests: unit coverage for the gate (applied / below-threshold / boundary /
ineligible jobs / future-cutoff) plus an integration test asserting the
shortened
rewind reaches the controller's requestTopicForWrites.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…th RMD

The snapshot-at-T data plane's correctness core. Folds real-time
PUT/UPDATE/DELETE
records onto a batch base value by reusing the server's own
MergeConflictResolver
(da-vinci-client), and emits the merged value AND its Replication Metadata
(RMD) so the
merged result is identical to what a server produces by replaying the same RT
during a
hybrid rewind.

- seedFromBatch establishes the batch-sentinel RMD (timestamp 0) exactly as a
server
  stores batch data, so any real RT write folded in afterwards wins.
- applyPut/applyUpdate/applyDelete fold RT records via the resolver;
finalizeRecord emits
  (value, serialized RMD, valueSchemaId, rmdProtocolVersion, isDelete).
- Handles value-level and field-level (write-compute / partial-update) RMD;
the
  rmdUseFieldLevelTimestamp flag must match the store config.
- Value is stored as byte[] (position-safe) so the resolver advancing a
ByteBuffer's
  position while reading inputs cannot corrupt a later operation.

This is the merge engine only; wiring it into the multi-region RT reader +
reducer +
writer pipeline is the remaining data-plane integration.

Tests: value-level PUT (sentinel -> RT-wins -> stale-ignored), DELETE
tombstone with RMD,
and write-compute field-level partial update (per-field RMD timestamp).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@sixpluszero sixpluszero changed the title [vpj] Add per-VPJ snapshot-at-T rewind-shortening config knobs [vpj] Snapshot-at-T: rewind-shortening knobs + offline RT-merge core Jun 10, 2026
merge-and-produce executor, 2-region A/A integration test

Adds the data-plane components that produce the snapshot-at-T merged version,
plus an
end-to-end multi-region integration test proving correctness.

- SnapshotAtTRtReader: reads a region's RT topic from start to the current
tail (bounded by
  the start->end record count, with optional event-time cutoff), normalizing
each PUT/UPDATE/
  DELETE for the merge and tagging it with the region's colo id. Bytes are
copied out of the
  consumed message immediately so a pooled consumer buffer cannot corrupt a
record processed
  later.
- SnapshotAtTRtRecord: normalized RT record (op, key, payload, value/update
schema ids, write
  timestamp, colo id).
- SnapshotAtTPushExecutor: per key, seeds the merger from the batch value,
folds all regions'
  RT records, and writes the merged value+RMD (or an RMD-carrying delete) via
a VeniceWriter,
  which partitions each key itself.
- SnapshotAtTPushIntegrationTest (2-region A/A cluster): produces RT to BOTH
regions with
  logical timestamps (including a cross-region conflict key), runs the
new-mode push (request a
  version with a short rewind override -> read both regions' RT -> merge ->
produce value+RMD ->
  SOP/EOP), and asserts every key's served value in both regions equals the
correct merged
  result (region 1's newer write wins the conflict key).

Wiring this data path into VenicePushJob.run() as a first-class mode
(in-process batch read +
region-broker discovery) is the remaining productionization; the merge
correctness and
multi-region read are fully validated here.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@sixpluszero sixpluszero changed the title [vpj] Snapshot-at-T: rewind-shortening knobs + offline RT-merge core [vpj] Snapshot-at-T: rewind knobs + offline RT-merge data plane Jun 10, 2026
…command,

end to end)

When the snapshot-at-T gate triggers, VenicePushJob.run() now produces the
merged version
itself instead of the data-writer job: it reads the offline batch input
in-process, reads each
region's RT (per snapshot.at.t.rt.region.brokers) up to the cutoff, merges
per key (value+RMD)
via SnapshotAtTRecordMerger, and produces the result through a single writer
that owns
SOP+data+EOP (so Data-Integrity-Validation stays consistent; the controller
skips SOP for this
mode).

- New config snapshot.at.t.rt.region.brokers ("coloId=broker;..." per region).
- SnapshotAtTSchemaRepository: a minimal ReadOnlySchemaRepository built from
the controller's
  value / RMD / derived schemas, so the merge runs with the store's real
schemas.
- Batch input read via VeniceAvroFileIterator; version data written via a
byte[] VeniceWriter
  with the store partitioner + chunking.

Integration test testSnapshotAtTViaVenicePushJob: a single VenicePushJob run
with
snapshot.at.t.enabled=true and a 0s threshold (so the mode triggers on the
store's 3600s
rewind) produces RT to two regions, merges with the batch Avro input, and
serves the correct
merged result in both regions. Passes.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@sixpluszero sixpluszero changed the title [vpj] Snapshot-at-T: rewind knobs + offline RT-merge data plane [vpj] Snapshot-at-T: new VPJ mode (rewind knobs + offline RT merge) Jun 10, 2026
sixpluszero and others added 3 commits June 10, 2026 11:35
…, fix CI

(spotbugs + coverage)

- Fail fast in the snapshot-at-T mode unless the store is hybrid AND
Active-Active enabled, rather
  than relying on auto-discovery of the RT topology.
- Compress merged values to the version's compression strategy before
producing, and carry the
  dictionary in the Start-Of-Push, so ZSTD_WITH_DICT stores work. The
single-command VPJ integration
  test now runs for both NO_OP and ZSTD_WITH_DICT.
- Remove an unused field flagged by SpotBugs (StaticAnalysis check).
- Add unit tests for SnapshotAtTPushExecutor, SnapshotAtTSchemaRepository,
and the region-broker
  config parser to satisfy diff-coverage.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…isolate

the merge

The test set a 0s threshold so the mode triggers, but it did not prove it
did: a normal push to this
hybrid A/A store would, via the server's 3600s rewind, merge the same RT and
serve the same values,
and the short rewind could even re-apply the RT and mask a broken merge. Now:

- Assert v2's rewind was overridden to the short snapshot window (well below
the store's 3600s),
  which only happens when the snapshot-at-T gate fires and the snapshot data
path runs.
- Age the RT past that short rewind window before the push, so the served
data reflects only the
  offline merge and is not silently fixed up by the server's post-EOP rewind.
A broken merge (RT not
  folded in) now fails the test instead of passing.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…test

shard

New integration tests land in the catch-all IntegrationTests_99 bucket until
the next automated
rebalance. That bucket was already near its 15-minute job timeout from other
unassigned tests, so
adding this ~3-minute class tipped it over: the class itself passed (suite
SUCCESS in ~200s), but the
bucket was then cancelled at the timeout. Move it to a dedicated shard (86)
so bucket 99 stays under
the timeout and the test runs with headroom. A later rebalance_test_shards.py
run (with CI timing)
will fold it in optimally.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@sixpluszero sixpluszero marked this pull request as ready for review June 10, 2026 22:20
Copilot AI review requested due to automatic review settings June 10, 2026 22:20

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a new “snapshot-at-T” Venice Push Job (VPJ) mode intended to shorten hybrid store rewind time by (1) applying per-push rewind override knobs and (2) optionally performing an offline RT→batch merge that produces value + RMD so the version can come online after replaying only a short tail.

Changes:

  • Added per-VPJ config knobs and gating logic to apply a rewind-time override for hybrid full pushes (“snapshot-at-T” rewind shortening).
  • Added snapshot-at-T data-plane implementation: read batch input + read RT from multiple regions + merge via server MergeConflictResolver + write merged value+RMD with SOP/EOP.
  • Added unit/integration tests and CI sharding to cover the new mode.

Reviewed changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
internal/venice-test-common/test-shard-assignments.json Adds new integration test shard assignment for snapshot-at-T integration tests.
internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/snapshot/SnapshotAtTPushIntegrationTest.java New multi-region end-to-end integration tests for snapshot-at-T merge + shortened rewind.
clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java Adds unit/integration-style tests for the rewind override gate and parsing region brokers.
clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/snapshot/SnapshotAtTSchemaRepositoryTest.java New unit tests for the controller-backed schema repository used by the merge path.
clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/snapshot/SnapshotAtTRecordMergerTest.java New unit tests for merge correctness (PUT/DELETE/update + RMD assertions).
clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/snapshot/SnapshotAtTPushExecutorTest.java New unit tests for executing the merge and producing puts/deletes.
clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java Adds snapshot-at-T config keys + defaults and documents behavior.
clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java Implements gate + controller request wiring + snapshot-at-T merge push execution path.
clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/snapshot/SnapshotAtTSchemaRepository.java New minimal schema repository loaded from controller responses for merge resolver usage.
clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/snapshot/SnapshotAtTRtRecord.java New RT record normalization container used by reader/merger/executor.
clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/snapshot/SnapshotAtTRtReader.java New RT reader that deterministically reads partition ranges and normalizes RT messages.
clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/snapshot/SnapshotAtTRecordMerger.java New offline merge core using MergeConflictResolver to produce merged value + RMD.
clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/snapshot/SnapshotAtTPushExecutor.java New executor that merges per key and writes merged records (put/delete with RMD).
clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/PushJobSetting.java Adds snapshot-at-T settings fields to PushJobSetting.
.github/workflows/VeniceCI-E2ETests.yml Adds a new integration test shard job (86) and wires it into the failure aggregation job.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +2810 to +2814
try (VeniceAvroFileIterator iterator = new VeniceAvroFileIterator(fs, status.getPath(), recordReader)) {
while (iterator.next()) {
batchValues.put(ByteBuffer.wrap(iterator.getCurrentKey()), ByteBuffer.wrap(iterator.getCurrentValue()));
}
}
Comment on lines +2699 to +2706
int separator = trimmed.indexOf('=');
if (separator <= 0) {
throw new VeniceException(
"Invalid " + SNAPSHOT_AT_T_RT_REGION_BROKERS + " entry '" + trimmed + "'; expected 'coloId=broker'.");
}
regionBrokers
.put(Integer.parseInt(trimmed.substring(0, separator).trim()), trimmed.substring(separator + 1).trim());
}
Comment on lines +49 to +57
MultiSchemaResponse valueResponse = controllerClient.getAllValueSchema(storeName);
if (valueResponse.isError()) {
throw new VeniceException(
"Failed to fetch value schemas for store " + storeName + ": " + valueResponse.getError());
}
for (MultiSchemaResponse.Schema schema: valueResponse.getSchemas()) {
valueSchemas.put(schema.getId(), new SchemaEntry(schema.getId(), schema.getSchemaStr()));
latestValueSchemaId = Math.max(latestValueSchemaId, schema.getId());
}
Comment on lines +59 to +81
Map<Long, RmdSchemaEntry> rmdSchemas = new HashMap<>();
int rmdVersionId = -1;
MultiSchemaResponse rmdResponse = controllerClient.getAllReplicationMetadataSchemas(storeName);
if (!rmdResponse.isError()) {
for (MultiSchemaResponse.Schema schema: rmdResponse.getSchemas()) {
rmdSchemas.put(
key(schema.getRmdValueSchemaId(), schema.getId()),
new RmdSchemaEntry(schema.getRmdValueSchemaId(), schema.getId(), schema.getSchemaStr()));
rmdVersionId = Math.max(rmdVersionId, schema.getId());
}
}

Map<Long, DerivedSchemaEntry> derivedSchemas = new HashMap<>();
MultiSchemaResponse derivedResponse = controllerClient.getAllValueAndDerivedSchema(storeName);
if (!derivedResponse.isError()) {
for (MultiSchemaResponse.Schema schema: derivedResponse.getSchemas()) {
if (schema.getDerivedSchemaId() > 0) {
derivedSchemas.put(
key(schema.getId(), schema.getDerivedSchemaId()),
new DerivedSchemaEntry(schema.getId(), schema.getDerivedSchemaId(), schema.getSchemaStr()));
}
}
}
Comment on lines +2660 to +2672
long nowSeconds = setting.jobStartTimeMs / 1000;
long cutoffEpochSeconds =
setting.snapshotAtTCutoffEpochSeconds == NOT_SET ? nowSeconds : setting.snapshotAtTCutoffEpochSeconds;
if (cutoffEpochSeconds > nowSeconds) {
throw new VeniceException(
String.format(
"Provided '%d' for %s; the snapshot-at-T cutoff cannot be a timestamp in the future (now=%ds).",
cutoffEpochSeconds,
SNAPSHOT_AT_T_CUTOFF_EPOCH_SECONDS,
nowSeconds));
}
long finalRewindSeconds = (nowSeconds - cutoffEpochSeconds) + setting.snapshotAtTRewindBufferSeconds;
setting.rewindTimeInSecondsOverride = finalRewindSeconds;
Comment on lines +2731 to +2739
Map<ByteBuffer, ByteBuffer> batchValues = readBatchInputForSnapshot();
String realTimeTopicName = Utils.getRealTimeTopicName(storeInfo);
long cutoffMs = setting.snapshotAtTCutoffEpochSeconds > 0 ? setting.snapshotAtTCutoffEpochSeconds * 1000 : 0L;
SnapshotAtTRtReader rtReader = new SnapshotAtTRtReader();
List<SnapshotAtTRtRecord> rtRecords = new ArrayList<>();
for (Map.Entry<Integer, String> region: setting.snapshotAtTRtRegionBrokers.entrySet()) {
rtRecords.addAll(
rtReader.readRegion(
snapshotConsumerProps(region.getValue()),
Comment on lines +129 to +137
consumer.unSubscribe(topicPartition);
if (consumed < target) {
LOGGER.warn(
"RT read of {} partition {} stopped early: consumed {} of {} expected records.",
rtTopicName,
partition,
consumed,
target);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants