Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/yb/cdc/cdc_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ namespace yb {

class MemTracker;

// Forward declaration: CDCThroughputMetrics below refers to CDCSDKTabletMetrics only through a
// pointer, so a declaration is sufficient here.
namespace xrepl {
class CDCSDKTabletMetrics;
} // namespace xrepl

namespace cdc {

struct SchemaDetails {
Expand All @@ -49,6 +53,14 @@ using consensus::HaveMoreMessages;
// Per-call throughput counters and metric handles shared between the CDC service layer and the
// producer.
struct CDCThroughputMetrics {
  // Number of records and number of bytes sent.
  uint64_t records_sent{0};
  uint64_t bytes_sent{0};

  // Borrowed (non-owning) handle to the per-tablet metrics of the CDCSDK stream/tablet pair.
  // Stays null on the xCluster path and whenever the metrics could not be acquired, so every
  // observation site has to null-check it first (ScopedLatencyMetric does so automatically).
  xrepl::CDCSDKTabletMetrics* tablet_metrics{nullptr};

  // Accumulated per call inside the producer; the service layer publishes them as histograms
  // once the RPC finishes.
  uint64_t wal_records_read{0};
  uint64_t wal_bytes_read{0};
};

// Callback type that takes a consensus::ReplicateMsg and returns a Status.
// NOTE(review): the name suggests it is invoked when a split op is encountered — confirm
// against the call sites.
using UpdateOnSplitOpFunc = std::function<Status(const consensus::ReplicateMsg&)>;
Expand Down
50 changes: 44 additions & 6 deletions src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1698,6 +1698,19 @@ void CDCServiceImpl::GetChanges(
CDCErrorPB::INTERNAL_ERROR, context);
StreamMetadata& record = *stream_meta_ptr;

// Acquire per-tablet CDCSDK metrics so the rest of GetChanges can record per-phase
// latencies and per-call batch sizes. Skipped for xCluster (different metric class) and for
// the sys catalog tablet (same guard used by UpdateTabletMetrics below). Total RPC latency
// is already covered by the server-wide handler_latency_yb_cdc_CDCService_GetChanges.
std::shared_ptr<xrepl::CDCSDKTabletMetrics> cdc_sdk_metrics;
if (record.GetSourceType() == CDCSDK &&
producer_tablet.tablet_id != master::kSysCatalogTabletId) {
auto metrics_result = GetCDCSDKTabletMetrics(*tablet_peer.get(), stream_id);
if (metrics_result.ok()) {
cdc_sdk_metrics = std::move(metrics_result.get());
}
}

// Polling the sys catalog tablet is only supported for CDCSDK streams.
RPC_CHECK_AND_RETURN_ERROR(
!tablet_peer->tablet_metadata()->IsSysCatalog() || record.GetSourceType() == CDCSDK,
Expand Down Expand Up @@ -1770,6 +1783,9 @@ void CDCServiceImpl::GetChanges(

// Get opId from request.
if (!GetFromOpId(req, &from_op_id, &cdc_sdk_from_op_id)) {
ScopedLatencyMetric<EventStats> scoped_last_checkpoint(
cdc_sdk_metrics ? cdc_sdk_metrics->cdcsdk_get_last_checkpoint_latency
: scoped_refptr<EventStats>{});
auto last_checkpoint = RPC_VERIFY_RESULT(
GetLastCheckpoint(producer_tablet, stream_meta_ptr.get()->GetSourceType(), false),
resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
Expand Down Expand Up @@ -1903,6 +1919,15 @@ void CDCServiceImpl::GetChanges(
getchanges_resp_max_size_bytes = req->getchanges_resp_max_size_bytes();
}

// Plumb the per-tablet metrics through to the producer for per-phase instrumentation.
throughput_metrics.tablet_metrics = cdc_sdk_metrics.get();

// Record pre-flight latency: everything from RPC entry up to the GetChangesForCDCSDK call.
if (cdc_sdk_metrics) {
cdc_sdk_metrics->cdcsdk_get_changes_preflight_latency->Increment(
MonoTime::Now().GetDeltaSince(start_time).ToMicroseconds());
}

status = GetChangesForCDCSDK(
stream_id, req->tablet_id(), cdc_sdk_from_op_id, record, tablet_peer, mem_tracker, enum_map,
composite_atts_map, req->cdcsdk_request_source(), client(), &msgs_holder, resp,
Expand Down Expand Up @@ -2089,12 +2114,17 @@ void CDCServiceImpl::GetChanges(
}

bool force_update = req->force_update_cdc_state_checkpoint() || snapshot_bootstrap;
RPC_STATUS_RETURN_ERROR(
UpdateCheckpointAndActiveTime(
producer_tablet, OpId::FromPB(resp->checkpoint().op_id()), commit_op_id,
last_record_hybrid_time, cdc_sdk_safe_time, record.GetSourceType(), force_update,
is_snapshot, snapshot_key, (is_snapshot && is_colocated) ? req->table_id() : ""),
resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
{
ScopedLatencyMetric<EventStats> scoped_update_checkpoint(
cdc_sdk_metrics ? cdc_sdk_metrics->cdcsdk_update_checkpoint_latency
: scoped_refptr<EventStats>{});
RPC_STATUS_RETURN_ERROR(
UpdateCheckpointAndActiveTime(
producer_tablet, OpId::FromPB(resp->checkpoint().op_id()), commit_op_id,
last_record_hybrid_time, cdc_sdk_safe_time, record.GetSourceType(), force_update,
is_snapshot, snapshot_key, (is_snapshot && is_colocated) ? req->table_id() : ""),
resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);
}
}

// TODO(#25632): Due to incorrect memory tracking in log cache, we accumulate a lot of untracked
Expand All @@ -2116,6 +2146,14 @@ void CDCServiceImpl::GetChanges(
UpdateTabletMetrics(
*resp, producer_tablet, tablet_peer, from_op_id, record, last_readable_index,
have_more_messages, throughput_metrics);

// Per-call CDCSDK batch-size histograms.
if (cdc_sdk_metrics) {
cdc_sdk_metrics->cdcsdk_wal_records_read->Increment(throughput_metrics.wal_records_read);
cdc_sdk_metrics->cdcsdk_wal_bytes_read->Increment(throughput_metrics.wal_bytes_read);
cdc_sdk_metrics->cdcsdk_response_records->Increment(resp->cdc_sdk_proto_records_size());
cdc_sdk_metrics->cdcsdk_response_bytes->Increment(resp->ByteSizeLong());
}
}

if (report_tablet_split) {
Expand Down
Loading
Loading