Skip to content

StreamWAL#11

Draft
jtanza wants to merge 14 commits into
masterfrom
jt/streamWAL-v1
Draft

StreamWAL#11
jtanza wants to merge 14 commits into
masterfrom
jt/streamWAL-v1

Conversation

@jtanza

@jtanza jtanza commented Jun 16, 2026

Copy link
Copy Markdown

Overall goal

This PR adds StreamWAL, a new per-tablet, leader-only tserver RPC that delivers fully-decoded, committed change events as a stream of CDCSDKProtoRecordPB — the same wire format the existing CDCSDK gRPC connector already consumes.

The client owns all stream state (a per-tablet (term, index) cursor); the server registers nothing. There is no cdc_state table, no stream IDs, no master-driven control plane, and no per-stream aggregation.

The data plane reuses the existing CDCSDK decoder family unchanged: transactional WRITE_OPs are skipped on the wire and the corresponding rows are emitted at APPLYING time by reading intents from IntentsDB, sandwiched between BEGIN/COMMIT envelopes stamped with commit_hybrid_time.

By default StreamWAL delivers records in WAL (apply) order. It also adds an optional, per-request consistent-commit-order mode that instead delivers committed records in commit-time order, watermark-gated, via a composite (term, index, commit_ht) cursor.

StreamWAL lives alongside GetChanges; no existing CDC code path is modified. The one cross-cutting server change is a new wall-clock intent-retention mechanism (gated behind a flag, default-off) that lets a checkpoint-less consumer keep just-applied intents readable.

These changes are broken into a number of sections:

Proto changes

  • Adds the StreamWAL RPC to the existing CDCService, three new top-level messages (StreamWalRequestPB, StreamWalResponsePB, StreamWalCursorPB
  • Two new optional fields on the existing CDCSDKProtoRecordPB (aborted_subtxn_set, split_tablet_request)
  • One new CDCErrorPB::Code value (INTENTS_GC_ERROR = 15).
  • Every change is a strictly-additive, backward-compatible proto edit; no existing field numbers or messages change.
  • Consistent-commit-order mode adds three optional, additive fields: StreamWalRequestPB.consistent_commit_order (bool, default false), StreamWalCursorPB.commit_ht (the commit-time frontier), and StreamWalResponsePB.resolution_safe_time (the per-tablet resolution watermark the batch gated on).

Files changed:

src/yb/cdc/cdc_service.proto

StreamWAL handler and control flow

  • Adds the CDCServiceImpl::StreamWAL handler.
  • The driver loop that reads ReplicateMsgs and dispatches each through the decoder, leader/safe-time resolution, INTENTS_GC_ERROR detection, and the partial-APPLYING batch/spill logic.
  • The supporting structs (StreamWalDecodeContext, StreamWalIntentResumeState, StreamWalDispatchResult) live in cdc_producer.h.
  • All net-new symbols; no existing handler is touched.
  • When consistent_commit_order=true, the handler forks early into CDCServiceImpl::HandleStreamWALConsistentCommitOrder and returns; the default (WAL-order) path is left entirely untouched.

Files changed:

src/yb/cdc/cdc_service.cc, src/yb/cdc/cdc_service.h, src/yb/cdc/cdc_producer.h

Decoder helpers reusing the CDCSDK pipeline

  • Adds the per-op StreamWAL dispatchers (DispatchWalOpForStreamWAL, DispatchApplyingForStreamWALImpl) and envelope builders (PopulateSyntheticBootstrapDDLs, PopulateStreamWalApplyingRecord, PopulateStreamWalSplitRecord, StampOpIdOnLeadingBeginForStreamWAL).
  • These are new callsites layered on top of the existing Populate* decoder family (PopulateCDCSDKWriteRecord, PopulateCDCSDKIntentRecord, DDL/truncate fillers), which are reused verbatim.
  • Purely additive the underlying decoders are unchanged, so subtxn-rollback filtering and schema resolution come for free.

Files changed:

src/yb/cdc/cdcsdk_producer.cc

Wall-clock intent retention

  • Introduces the runtime flag --intents_min_seconds_to_retain (default 0), a time-based parallel to --log_min_seconds_to_retain for IntentsDB, so a checkpoint-less consumer can read committed-but-just-applied intents without a per-stream lease barrier.
  • This is the one area that modifies existing logic rather than only adding: TransactionParticipant::Cleanup and the IntentsDB compaction filter now consult the commit hybrid time when deciding whether to GC a transaction's intents, and TransactionIdApplyOpIdMap in transaction.h changes from mapping to a bare OpId to a TransactionApplyOpIdInfo struct that also carries commit_ht.
  • All new retention behavior is gated behind the flag — with the default 0, every path falls through to existing behavior, so this is a no-op for non-CDC and lease-based CDC clusters.
  • The remaining files are additive helpers: RunningTransaction::SetApplyHybridTimes (stamps commit/log HT for single-batch applies), Tablet/docdb::CountTxnReverseIndexEntriesForCDC (distinguishes a real intent-GC from a zero-intent txn to avoid spurious INTENTS_GC_ERROR).

Files changed:

src/yb/tablet/transaction_participant.cc, src/yb/docdb/docdb_compaction_filter_intents.cc, src/yb/common/transaction.h, src/yb/tablet/running_transaction.cc, src/yb/tablet/running_transaction.h, src/yb/tablet/tablet.cc, src/yb/tablet/tablet.h, src/yb/docdb/docdb.cc, src/yb/docdb/docdb.h

Stream-less metadata bootstrap

  • Adds an idempotent initializer so StreamMetadata can be constructed for stream-id-less callers, letting StreamWAL reuse the existing decoder context without registering a stream.
  • Additive; existing stream-backed construction paths are unchanged.

Files changed:

src/yb/cdc/xrepl_stream_metadata.cc, src/yb/cdc/xrepl_stream_metadata.h

Consistent commit-order mode (optional, per-request)

  • Default StreamWAL ships records in WAL (apply) order. A multi-shard transaction's rows are emitted at its APPLYING WAL entry, which can materialize arbitrarily later than the commit, so commit_time can move backwards across consecutive records on a tablet.
  • With consistent_commit_order=true, the server delivers committed records in non-decreasing commit_time
    order, gated behind the per-tablet resolution watermark, using a composite cursor: (term, index) is the WAL
    re-read floor (and retention point) and commit_ht is the commit-time frontier (the server skips records with
    commit_time <= commit_ht on resume). Floor + frontier together discharge ordering and dedup with zero client
    state.
  • Adds GetConsistentChangesForStreamWAL (in cdcsdk_producer.cc) plus the StreamWalConsistentInput /
    StreamWalConsistentOutput structs (in cdc_producer.h). It mirrors the consistent branch of GetChangesForCDCSDK but emits via the existing StreamWAL decoder dispatch and packages the composite cursor.
  • Reuse without coupling. The mode is genuinely per-request and independent of the global
    FLAGS_cdc_enable_consistent_records gflag that governs the legacy GetChanges path.

Files changed:

src/yb/cdc/cdcsdk_producer.cc, src/yb/cdc/cdc_producer.h, src/yb/cdc/cdc_service.cc,

Java client bindings & leader-hint failover

  • Adds the streamWAL(...) client methods and the request/response wrappers.
  • TabletClient.decode gains a CDC-scoped branch that, on LEADER_NOT_READY, applies the tablet_consensus_info hint from the response to refresh the meta-cache leader pointer before retrying (RemoteTablet.applyLeaderHint), avoiding a master round-trip on leader failover mid-stream.
  • The hint path only fires for StreamWalRequest/StreamWalResponse pairs, so existing RPC dispatch is unaffected.

Files changed:

java/yb-client/src/main/java/org/yb/client/AsyncYBClient.java, YBClient.java, StreamWalRequest.java (new), StreamWalResponse.java (new), TabletClient.java

Server-side observability metrics

  • Adds six per-tablet StreamWAL metrics via a new StreamWALTabletMetrics container. Consistent with the
    stream-id-less design, these attach directly to the tablet's existing metric entity (not a separate
    per-stream entity like CDCSDKTabletMetrics/XClusterTabletMetrics), so they aggregate at the table level
  • Lazily instantiated on the first StreamWAL call for a tablet, so tablets that are never streamed cost nothing.
  • RPC-level latency / throughput / call-count for StreamWAL come for free from the auto-generated
    handler_latency_yb_cdc_CDCService_StreamWAL family (plus service_request_bytes /
    service_response_bytes), so no custom RPC metrics are added.
  • Additive: the only cdc_service.cc touchpoints are net-new — a lazy GetStreamWALTabletMetrics helper,
    streamwal_intents_gc_errors increments on the two INTENTS_GC_ERROR return paths, and the success-path
    gauge/counter updates before RespondSuccess(). No existing metric or handler is modified.

Metrics added:

  • streamwal_records_sent — counter (units), aggregated sum. Total decoded change records sent over
    StreamWAL for this tablet.
  • streamwal_traffic_sent — counter (bytes), aggregated sum. Total decoded record payload bytes sent over
    StreamWAL.
  • streamwal_sent_lag_micros — gauge (µs), aggregated max. Lag between the leader's safe time and the
    commit time of the last record sent.
  • streamwal_wal_lag_index — gauge (ops), aggregated max. WAL ops between the leader tip and the read
    cursor (leader_tip.index − next_op_id.index).
  • streamwal_intent_retention_window_secs — gauge (s), aggregated max. Current
    --intents_min_seconds_to_retain; lets dashboards derive intent-retention headroom as window − sent_lag.
    A true kMin "headroom" gauge is not expressible (only kSum/kMax aggregation functions exist), so the
    window is surfaced and headroom is computed against the kMax lag downstream.
  • streamwal_intents_gc_errors — counter (units), aggregated sum. Count of INTENTS_GC_ERROR responses
    (intents GC'd before StreamWAL could read them). Must always be zero; non-zero indicates data loss.

Files changed:

src/yb/cdc/xrepl_metrics.h, src/yb/cdc/xrepl_metrics.cc, src/yb/cdc/cdc_service.cc,
src/yb/cdc/stream_wal-test.cc

egladysh pushed a commit that referenced this pull request Jun 18, 2026
…tion

Summary:
## Problem

A READ COMMITTED retry triggered by a deadlock / abort could SIGSEGV at
`AfterTriggerEndSubXact` during the subsequent ROLLBACK.

Stack trace:

```
Signal: SIGSEGV
  #0  GetMemoryChunkContext(pointer=0x0)            memutils.h:141:12  (inlined)
  #1  pfree(pointer=0x0)                            mcxt.c:1500:26     (inlined)
  #2  AfterTriggerEndSubXact                        trigger.c:5657:4
  #3  AbortSubTransaction                           xact.c:5726:3
  #4  CommitTransactionCommand                      xact.c:3650:4
  #5  CommitTransactionCommand                      xact.c:0
  #6  yb_exec_simple_query_impl                     postgres.c:3023:3  (inlined finish_xact_command)
  #7  yb_exec_simple_query_impl                     postgres.c:1494:4
  #8  yb_exec_simple_query_impl                     postgres.c:5804:2
  #9  yb_exec_query_wrapper_one_attempt             postgres.c:5764:3
  #10 PostgresMain                                  postgres.c:5796:3
  #11 PostgresMain                                  postgres.c:5821:2  (inlined yb_exec_simple_query)
  yugabyte#12 PostgresMain                                  postgres.c:6623:8
```

### Reading the stack

- **#2 `AfterTriggerEndSubXact` (trigger.c:5657)** -- the call site is
  `pfree(afterTriggers.state)` in the abort branch, gated on
  `trans_stack[my_level].state != NULL`.

- **#3-#5 `CommitTransactionCommand` -> `AbortSubTransaction`** -- called
  because of ROLLBACK.

### Root cause

The retry path before this change ran, in order:

```
yb_restart_transaction
  -> YBCRestartWriteTransaction
       -> AfterTriggerEndXact(false)   // wipes trans_stack to NULL, maxtransdepth to 0
                                       //   and afterTriggers.state to NULL
       -> AfterTriggerBeginXact()
  -> RollbackAndReleaseCurrentSubTransaction
  -> YbBeginInternalSubTransactionForReadCommittedStatement
       -> AfterTriggerBeginSubXact
            -> MemoryContextAlloc(8 * sizeof(AfterTriggersTransData))
              // NOT AllocZero: leaves N-1 slots uninitialized
            -> initializes trans_stack[my_level] only
```

Any live subxact with level < `my_level` -- a user `SAVEPOINT` and the
per-statement RC internal subxact above it -- now points at an
uninitialized slot. ROLLBACK reads garbage as `state`, the non-NULL
gate passes, and `pfree(afterTriggers.state)` faults because `EndXact`
already cleared that field.

## Fix

`YBCRestartWriteTransaction` now rolls back every savepoint /
subtransaction before recreating the top-level write state, so the
PG-side `trans_stack` is empty by the time the surgical reset wipes
the after-trigger state.

Removed the now-redundant `RollbackAndReleaseCurrentSubTransaction()` from
the else branch of `yb_restart_transaction`.

Test Plan: Jenkins

Reviewers: pjain, smishra

Reviewed By: pjain

Subscribers: ybase, yql

Differential Revision: https://phorge.dev.yugabyte.com/D54387
@jtanza jtanza force-pushed the jt/streamWAL-v1 branch from 1d007c3 to 86d7edb Compare June 24, 2026 18:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant