diff --git a/src/yb/cdc/cdc_producer.h b/src/yb/cdc/cdc_producer.h index eadd61fc862d..8439f7aad89f 100644 --- a/src/yb/cdc/cdc_producer.h +++ b/src/yb/cdc/cdc_producer.h @@ -34,6 +34,10 @@ namespace yb { class MemTracker; +namespace xrepl { +class CDCSDKTabletMetrics; +} // namespace xrepl + namespace cdc { struct SchemaDetails { @@ -49,6 +53,14 @@ using consensus::HaveMoreMessages; struct CDCThroughputMetrics { uint64_t records_sent = 0; uint64_t bytes_sent = 0; + // Per-tablet metrics for the CDCSDK stream/tablet pair. Non-owning. Null on the xCluster + // path or when metrics could not be acquired; observation sites must null-check before use + // (ScopedLatencyMetric handles this automatically). + xrepl::CDCSDKTabletMetrics* tablet_metrics = nullptr; + // Per-call accumulators populated inside the producer and emitted as histograms by the + // service layer at the end of the RPC. + uint64_t wal_records_read = 0; + uint64_t wal_bytes_read = 0; }; using UpdateOnSplitOpFunc = std::function; diff --git a/src/yb/cdc/cdc_service.cc b/src/yb/cdc/cdc_service.cc index 994e795c377c..4ef3bed8ad98 100644 --- a/src/yb/cdc/cdc_service.cc +++ b/src/yb/cdc/cdc_service.cc @@ -1698,6 +1698,19 @@ void CDCServiceImpl::GetChanges( CDCErrorPB::INTERNAL_ERROR, context); StreamMetadata& record = *stream_meta_ptr; + // Acquire per-tablet CDCSDK metrics so the rest of GetChanges can record per-phase + // latencies and per-call batch sizes. Skipped for xCluster (different metric class) and for + // the sys catalog tablet (same guard used by UpdateTabletMetrics below). Total RPC latency + // is already covered by the server-wide handler_latency_yb_cdc_CDCService_GetChanges. 
+ std::shared_ptr cdc_sdk_metrics; + if (record.GetSourceType() == CDCSDK && + producer_tablet.tablet_id != master::kSysCatalogTabletId) { + auto metrics_result = GetCDCSDKTabletMetrics(*tablet_peer.get(), stream_id); + if (metrics_result.ok()) { + cdc_sdk_metrics = std::move(metrics_result.get()); + } + } + // Polling sys catalog tablet is only supported for CDC. RPC_CHECK_AND_RETURN_ERROR( !tablet_peer->tablet_metadata()->IsSysCatalog() || record.GetSourceType() == CDCSDK, @@ -1770,6 +1783,9 @@ void CDCServiceImpl::GetChanges( // Get opId from request. if (!GetFromOpId(req, &from_op_id, &cdc_sdk_from_op_id)) { + ScopedLatencyMetric scoped_last_checkpoint( + cdc_sdk_metrics ? cdc_sdk_metrics->cdcsdk_get_last_checkpoint_latency + : scoped_refptr{}); auto last_checkpoint = RPC_VERIFY_RESULT( GetLastCheckpoint(producer_tablet, stream_meta_ptr.get()->GetSourceType(), false), resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context); @@ -1903,6 +1919,15 @@ void CDCServiceImpl::GetChanges( getchanges_resp_max_size_bytes = req->getchanges_resp_max_size_bytes(); } + // Plumb the per-tablet metrics through to the producer for per-phase instrumentation. + throughput_metrics.tablet_metrics = cdc_sdk_metrics.get(); + + // Record pre-flight latency: everything from RPC entry up to the GetChangesForCDCSDK call. 
+ if (cdc_sdk_metrics) { + cdc_sdk_metrics->cdcsdk_get_changes_preflight_latency->Increment( + MonoTime::Now().GetDeltaSince(start_time).ToMicroseconds()); + } + status = GetChangesForCDCSDK( stream_id, req->tablet_id(), cdc_sdk_from_op_id, record, tablet_peer, mem_tracker, enum_map, composite_atts_map, req->cdcsdk_request_source(), client(), &msgs_holder, resp, @@ -2089,12 +2114,17 @@ void CDCServiceImpl::GetChanges( } bool force_update = req->force_update_cdc_state_checkpoint() || snapshot_bootstrap; - RPC_STATUS_RETURN_ERROR( - UpdateCheckpointAndActiveTime( - producer_tablet, OpId::FromPB(resp->checkpoint().op_id()), commit_op_id, - last_record_hybrid_time, cdc_sdk_safe_time, record.GetSourceType(), force_update, - is_snapshot, snapshot_key, (is_snapshot && is_colocated) ? req->table_id() : ""), - resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context); + { + ScopedLatencyMetric scoped_update_checkpoint( + cdc_sdk_metrics ? cdc_sdk_metrics->cdcsdk_update_checkpoint_latency + : scoped_refptr{}); + RPC_STATUS_RETURN_ERROR( + UpdateCheckpointAndActiveTime( + producer_tablet, OpId::FromPB(resp->checkpoint().op_id()), commit_op_id, + last_record_hybrid_time, cdc_sdk_safe_time, record.GetSourceType(), force_update, + is_snapshot, snapshot_key, (is_snapshot && is_colocated) ? req->table_id() : ""), + resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context); + } } // TODO(#25632): Due to incorrect memory tracking in log cache, we accumulate a lot of untracked @@ -2116,6 +2146,14 @@ void CDCServiceImpl::GetChanges( UpdateTabletMetrics( *resp, producer_tablet, tablet_peer, from_op_id, record, last_readable_index, have_more_messages, throughput_metrics); + + // Per-call CDCSDK batch-size histograms. 
+ if (cdc_sdk_metrics) { + cdc_sdk_metrics->cdcsdk_wal_records_read->Increment(throughput_metrics.wal_records_read); + cdc_sdk_metrics->cdcsdk_wal_bytes_read->Increment(throughput_metrics.wal_bytes_read); + cdc_sdk_metrics->cdcsdk_response_records->Increment(resp->cdc_sdk_proto_records_size()); + cdc_sdk_metrics->cdcsdk_response_bytes->Increment(resp->ByteSizeLong()); + } } if (report_tablet_split) { diff --git a/src/yb/cdc/cdcsdk_producer.cc b/src/yb/cdc/cdcsdk_producer.cc index 72f63ac5ccbd..5a1cef592f48 100644 --- a/src/yb/cdc/cdcsdk_producer.cc +++ b/src/yb/cdc/cdcsdk_producer.cc @@ -12,6 +12,7 @@ #include "yb/cdc/cdc_producer.h" +#include "yb/cdc/xrepl_metrics.h" #include "yb/cdc/xrepl_stream_metadata.h" #include "yb/client/client.h" @@ -50,6 +51,7 @@ #include "yb/tablet/transaction_participant.h" #include "yb/util/logging.h" +#include "yb/util/metrics.h" #include "yb/util/scope_exit.h" #include "yb/util/status.h" #include "yb/util/status_format.h" @@ -122,6 +124,27 @@ using dockv::PrimitiveValue; using dockv::SchemaPackingStorage; namespace { + +// Returns the latency event-stats pointer if both throughput_metrics and its tablet_metrics +// are populated, else an empty scoped_refptr (ScopedLatencyMetric handles null as a no-op). 
+inline scoped_refptr GetCDCSDKLatency( + CDCThroughputMetrics* tm, + scoped_refptr xrepl::CDCSDKTabletMetrics::*member) { + if (!tm || !tm->tablet_metrics) { + return {}; + } + return tm->tablet_metrics->*member; +} + +inline void IncrementCDCSDKCounter( + CDCThroughputMetrics* tm, + scoped_refptr xrepl::CDCSDKTabletMetrics::*member) { + if (!tm || !tm->tablet_metrics) { + return; + } + IncrementCounter(tm->tablet_metrics->*member); +} + YB_DEFINE_ENUM(OpType, (INSERT)(UPDATE)(DELETE)); Result IsPackedRowUpdate(const dockv::ValueEntryType value_type, Slice value_slice) { @@ -805,7 +828,11 @@ Result GetOrPopulateRequiredSchemaDetails( continue; } - auto result = client->GetTableSchemaFromSysCatalog(cur_table_id, read_hybrid_time); + auto result = ([&]() { + ScopedLatencyMetric sl(GetCDCSDKLatency( + throughput_metrics, &xrepl::CDCSDKTabletMetrics::cdcsdk_schema_lookup_latency)); + return client->GetTableSchemaFromSysCatalog(cur_table_id, read_hybrid_time); + })(); // Failed to get specific schema version from the system catalog, use the latest // schema version for the key-value decoding. 
if (!result.ok()) { @@ -1840,10 +1867,17 @@ Status ProcessIntents( TEST_SYNC_POINT("AddBeginRecord::End"); } - RETURN_NOT_OK(tablet->GetIntentsForCDC(transaction_id, aborted, keyValueIntents, - stream_state)); + { + ScopedLatencyMetric sl(GetCDCSDKLatency( + throughput_metrics, &xrepl::CDCSDKTabletMetrics::cdcsdk_get_intents_latency)); + RETURN_NOT_OK(tablet->GetIntentsForCDC(transaction_id, aborted, keyValueIntents, + stream_state)); + } VLOG(1) << "The size of intentKeyValues for transaction id: " << transaction_id << ", with apply record op_id : " << op_id << ", is: " << (*keyValueIntents).size(); + IncrementStats(GetCDCSDKLatency( + throughput_metrics, &xrepl::CDCSDKTabletMetrics::cdcsdk_intents_per_txn), + static_cast(keyValueIntents->size())); const OpId& checkpoint_op_id = tablet_peer->GetLatestCheckPoint(); if ((*keyValueIntents).size() == 0 && op_id <= checkpoint_op_id) { @@ -2114,7 +2148,8 @@ Status GetConsistentWALRecords( const int64_t& safe_hybrid_time_req, const CoarseTimePoint& deadline, std::vector>* consistent_wal_records, std::vector>* all_checkpoints, - HybridTime* last_read_wal_op_record_time, bool* is_entire_wal_read) { + HybridTime* last_read_wal_op_record_time, bool* is_entire_wal_read, + CDCThroughputMetrics* throughput_metrics) { VLOG(2) << "Getting consistent WAL records. 
safe_hybrid_time_req: " << safe_hybrid_time_req << ", consistent_safe_time: " << *consistent_safe_time << ", last_seen_op_id: " << last_seen_op_id->ToString() @@ -2150,6 +2185,10 @@ Status GetConsistentWALRecords( if (read_ops.read_from_disk_size && mem_tracker) { (*consumption) = ScopedTrackedConsumption(mem_tracker, read_ops.read_from_disk_size); } + if (throughput_metrics) { + throughput_metrics->wal_bytes_read += read_ops.read_from_disk_size; + throughput_metrics->wal_records_read += read_ops.messages.size(); + } for (const auto& msg : read_ops.messages) { last_seen_op_id->term = msg->id().term(); @@ -2250,7 +2289,8 @@ Status GetWALRecords( uint64_t consistent_safe_time, OpId* last_seen_op_id, int64_t** last_readable_opid_index, const int64_t& safe_hybrid_time, const CoarseTimePoint& deadline, bool skip_intents, std::vector>* wal_records, - std::vector>* all_checkpoints) { + std::vector>* all_checkpoints, + CDCThroughputMetrics* throughput_metrics) { auto consensus = VERIFY_RESULT(tablet_peer->GetConsensus()); auto read_ops = VERIFY_RESULT(consensus->ReadReplicatedMessagesForCDC( *last_seen_op_id, *last_readable_opid_index, deadline)); @@ -2265,6 +2305,10 @@ Status GetWALRecords( if (read_ops.read_from_disk_size && mem_tracker) { (*consumption) = ScopedTrackedConsumption(mem_tracker, read_ops.read_from_disk_size); } + if (throughput_metrics) { + throughput_metrics->wal_bytes_read += read_ops.read_from_disk_size; + throughput_metrics->wal_records_read += read_ops.messages.size(); + } for (const auto& msg : read_ops.messages) { last_seen_op_id->term = msg->id().term(); @@ -2680,6 +2724,9 @@ Status GetChangesForCDCSDK( auto scope_exit = ScopeExit([&] { docdb::DeleteMemoryContextForCDCWrapper(); }); DCHECK(throughput_metrics); + // Per-tablet metric pointer used by the ScopedLatencyMetric wrappers below. May be null on + // the xCluster path or when metric acquisition failed; ScopedLatencyMetric handles null. 
+ xrepl::CDCSDKTabletMetrics* tm = throughput_metrics->tablet_metrics; auto op_id = OpId::FromPB(from_op_id); VLOG(1) << "GetChanges request has from_op_id: " << AsString(from_op_id) << ", safe_hybrid_time: " << safe_hybrid_time_req @@ -2703,9 +2750,14 @@ Status GetChangesForCDCSDK( } else { leader_safe_time = *leader_safe_time_result; } - uint64_t consistent_stream_safe_time = VERIFY_RESULT(GetConsistentStreamSafeTime( - tablet_peer, tablet_ptr, leader_safe_time, safe_hybrid_time_req, deadline, - &txn_load_in_progress)); + uint64_t consistent_stream_safe_time; + { + ScopedLatencyMetric scoped_safe_time( + tm ? tm->cdcsdk_safe_time_wait_latency : scoped_refptr{}); + consistent_stream_safe_time = VERIFY_RESULT(GetConsistentStreamSafeTime( + tablet_peer, tablet_ptr, leader_safe_time, safe_hybrid_time_req, deadline, + &txn_load_in_progress)); + } OpId historical_max_op_id = tablet_ptr->transaction_participant() ? tablet_ptr->transaction_participant()->GetHistoricalMaxOpId() : OpId::Invalid(); @@ -2739,19 +2791,23 @@ Status GetChangesForCDCSDK( std::vector> wal_records, all_checkpoints; DCHECK(last_readable_opid_index); - if (FLAGS_cdc_enable_consistent_records) - RETURN_NOT_OK(GetConsistentWALRecords( - tablet_peer, mem_tracker, msgs_holder, &consumption, &consistent_stream_safe_time, - historical_max_op_id, &wait_for_wal_update, &last_seen_op_id, *last_readable_opid_index, - safe_hybrid_time_req, deadline, &wal_records, &all_checkpoints, - &last_read_wal_op_record_time, &is_entire_wal_read)); - else - // 'skip_intents' is true here because we want the first transaction to be the partially - // streamed transaction. - RETURN_NOT_OK(GetWALRecords( - tablet_peer, mem_tracker, msgs_holder, &consumption, consistent_stream_safe_time, - &last_seen_op_id, &last_readable_opid_index, safe_hybrid_time_req, deadline, true, - &wal_records, &all_checkpoints)); + { + ScopedLatencyMetric scoped_wal_read( + tm ? 
tm->cdcsdk_wal_read_latency : scoped_refptr{}); + if (FLAGS_cdc_enable_consistent_records) + RETURN_NOT_OK(GetConsistentWALRecords( + tablet_peer, mem_tracker, msgs_holder, &consumption, &consistent_stream_safe_time, + historical_max_op_id, &wait_for_wal_update, &last_seen_op_id, *last_readable_opid_index, + safe_hybrid_time_req, deadline, &wal_records, &all_checkpoints, + &last_read_wal_op_record_time, &is_entire_wal_read, throughput_metrics)); + else + // 'skip_intents' is true here because we want the first transaction to be the partially + // streamed transaction. + RETURN_NOT_OK(GetWALRecords( + tablet_peer, mem_tracker, msgs_holder, &consumption, consistent_stream_safe_time, + &last_seen_op_id, &last_readable_opid_index, safe_hybrid_time_req, deadline, true, + &wal_records, &all_checkpoints, throughput_metrics)); + } have_more_messages = HaveMoreMessages(true); @@ -2797,11 +2853,15 @@ Status GetChangesForCDCSDK( wal_records[wal_segment_index]->transaction_state().aborted().set())) : SubtxnSet(); - RETURN_NOT_OK(ProcessIntentsWithInvalidSchemaRetry( - op_id, transaction_id, xrepl_origin_id, aborted_subtxns, stream_metadata, - enum_oid_label_map, composite_atts_map, request_source, resp, &consumption, &checkpoint, - tablet_peer, &keyValueIntents, &stream_state, client, cached_schema_details, - schema_packing_storages, commit_timestamp, throughput_metrics)); + { + ScopedLatencyMetric sl(GetCDCSDKLatency( + throughput_metrics, &xrepl::CDCSDKTabletMetrics::cdcsdk_process_intents_latency)); + RETURN_NOT_OK(ProcessIntentsWithInvalidSchemaRetry( + op_id, transaction_id, xrepl_origin_id, aborted_subtxns, stream_metadata, + enum_oid_label_map, composite_atts_map, request_source, resp, &consumption, + &checkpoint, tablet_peer, &keyValueIntents, &stream_state, client, + cached_schema_details, schema_packing_storages, commit_timestamp, throughput_metrics)); + } if (checkpoint.write_id() == 0 && checkpoint.key().empty() && wal_records.size()) { 
AcknowledgeStreamedMultiShardTxn( @@ -2848,9 +2908,13 @@ Status GetChangesForCDCSDK( do { size_t next_checkpoint_index = 0; - consistent_stream_safe_time = VERIFY_RESULT(GetConsistentStreamSafeTime( - tablet_peer, tablet_ptr, leader_safe_time, safe_hybrid_time_req, deadline, - &txn_load_in_progress)); + { + ScopedLatencyMetric scoped_safe_time( + tm ? tm->cdcsdk_safe_time_wait_latency : scoped_refptr{}); + consistent_stream_safe_time = VERIFY_RESULT(GetConsistentStreamSafeTime( + tablet_peer, tablet_ptr, leader_safe_time, safe_hybrid_time_req, deadline, + &txn_load_in_progress)); + } if (txn_load_in_progress) { LOG(INFO) << "Loading of transactions is in progress for tablet: " << tablet_id @@ -2859,19 +2923,24 @@ Status GetChangesForCDCSDK( } DCHECK(last_readable_opid_index); - if (FLAGS_cdc_enable_consistent_records) - RETURN_NOT_OK(GetConsistentWALRecords( - tablet_peer, mem_tracker, msgs_holder, &consumption, &consistent_stream_safe_time, - historical_max_op_id, &wait_for_wal_update, &last_seen_op_id, *last_readable_opid_index, - safe_hybrid_time_req, deadline, &wal_records, &all_checkpoints, - &last_read_wal_op_record_time, &is_entire_wal_read)); - else - // 'skip_intents' is false otherwise in case the complete wal segment is filled with - // intents we will break the loop thinking that WAL has no more records. - RETURN_NOT_OK(GetWALRecords( - tablet_peer, mem_tracker, msgs_holder, &consumption, consistent_stream_safe_time, - &last_seen_op_id, &last_readable_opid_index, safe_hybrid_time_req, deadline, false, - &wal_records, &all_checkpoints)); + { + ScopedLatencyMetric scoped_wal_read( + tm ? 
tm->cdcsdk_wal_read_latency : scoped_refptr{}); + if (FLAGS_cdc_enable_consistent_records) + RETURN_NOT_OK(GetConsistentWALRecords( + tablet_peer, mem_tracker, msgs_holder, &consumption, &consistent_stream_safe_time, + historical_max_op_id, &wait_for_wal_update, &last_seen_op_id, + *last_readable_opid_index, safe_hybrid_time_req, deadline, &wal_records, + &all_checkpoints, &last_read_wal_op_record_time, &is_entire_wal_read, + throughput_metrics)); + else + // 'skip_intents' is false otherwise in case the complete wal segment is filled with + // intents we will break the loop thinking that WAL has no more records. + RETURN_NOT_OK(GetWALRecords( + tablet_peer, mem_tracker, msgs_holder, &consumption, consistent_stream_safe_time, + &last_seen_op_id, &last_readable_opid_index, safe_hybrid_time_req, deadline, false, + &wal_records, &all_checkpoints, throughput_metrics)); + } if (wait_for_wal_update) { VLOG_WITH_FUNC(1) @@ -2957,6 +3026,8 @@ Status GetChangesForCDCSDK( switch (msg->op_type()) { case consensus::OperationType::UPDATE_TRANSACTION_OP: + IncrementCDCSDKCounter( + throughput_metrics, &xrepl::CDCSDKTabletMetrics::cdcsdk_update_txn_ops_seen); // Ignore intents. // Read from IntentDB after they have been applied. 
if (msg->transaction_state().status() == TransactionStatus::APPLYING) { @@ -2995,12 +3066,17 @@ Status GetChangesForCDCSDK( << msg->id().ShortDebugString() << ", tablet_id: " << tablet_id << ", transaction_id: " << txn_id << ", commit_time: " << *commit_timestamp; - RETURN_NOT_OK(ProcessIntentsWithInvalidSchemaRetry( - op_id, txn_id, xrepl_origin_id, aborted_subtxns, stream_metadata, - enum_oid_label_map, composite_atts_map, request_source, resp, &consumption, - &checkpoint, tablet_peer, &intents, &new_stream_state, client, - cached_schema_details, schema_packing_storages, *commit_timestamp, - throughput_metrics)); + { + ScopedLatencyMetric sl(GetCDCSDKLatency( + throughput_metrics, + &xrepl::CDCSDKTabletMetrics::cdcsdk_process_intents_latency)); + RETURN_NOT_OK(ProcessIntentsWithInvalidSchemaRetry( + op_id, txn_id, xrepl_origin_id, aborted_subtxns, stream_metadata, + enum_oid_label_map, composite_atts_map, request_source, resp, &consumption, + &checkpoint, tablet_peer, &intents, &new_stream_state, client, + cached_schema_details, schema_packing_storages, *commit_timestamp, + throughput_metrics)); + } streamed_txns.insert(txn_id.ToString()); if (new_stream_state.write_id != 0 && !new_stream_state.key.empty()) { @@ -3022,6 +3098,8 @@ Status GetChangesForCDCSDK( break; case consensus::OperationType::WRITE_OP: { + IncrementCDCSDKCounter( + throughput_metrics, &xrepl::CDCSDKTabletMetrics::cdcsdk_write_ops_seen); const auto& batch = msg->write().write_batch(); *commit_timestamp = HybridTime::FromPB(msg->hybrid_time()); @@ -3030,6 +3108,9 @@ Status GetChangesForCDCSDK( << ", hybrid_time: " << *commit_timestamp; if (!batch.has_transaction()) { + ScopedLatencyMetric sl(GetCDCSDKLatency( + throughput_metrics, + &xrepl::CDCSDKTabletMetrics::cdcsdk_populate_write_record_latency)); RETURN_NOT_OK(PopulateCDCSDKWriteRecordWithInvalidSchemaRetry( msg, stream_metadata, tablet_peer, enum_oid_label_map, composite_atts_map, request_source, cached_schema_details, 
schema_packing_storages, resp, client, @@ -3044,6 +3125,8 @@ Status GetChangesForCDCSDK( } break; case consensus::OperationType::CHANGE_METADATA_OP: { + IncrementCDCSDKCounter( + throughput_metrics, &xrepl::CDCSDKTabletMetrics::cdcsdk_change_metadata_ops_seen); VLOG(3) << "Will stream a DDL record. " << msg->ShortDebugString(); RETURN_NOT_OK(SchemaFromPB( msg->change_metadata_request().schema().ToGoogleProtobuf(), &current_schema)); @@ -3070,7 +3153,12 @@ Status GetChangesForCDCSDK( .schema_version = msg->change_metadata_request().schema_version(), .schema = std::make_shared(current_schema)}; changed_schema_version = msg->change_metadata_request().schema_version(); - auto result = client->GetTableSchemaFromSysCatalog(table_id, msg->hybrid_time()); + auto result = ([&]() { + ScopedLatencyMetric sl(GetCDCSDKLatency( + throughput_metrics, + &xrepl::CDCSDKTabletMetrics::cdcsdk_schema_lookup_latency)); + return client->GetTableSchemaFromSysCatalog(table_id, msg->hybrid_time()); + })(); if (!result.ok()) { LOG(WARNING) << "Failed to get the specific schema version from system catalog for table: " @@ -3119,6 +3207,8 @@ Status GetChangesForCDCSDK( } break; case consensus::OperationType::TRUNCATE_OP: { + IncrementCDCSDKCounter( + throughput_metrics, &xrepl::CDCSDKTabletMetrics::cdcsdk_truncate_ops_seen); if (FLAGS_stream_truncate_record) { RETURN_NOT_OK(PopulateCDCSDKTruncateRecord( msg, resp->add_cdc_sdk_proto_records(), current_schema, throughput_metrics)); @@ -3134,6 +3224,8 @@ Status GetChangesForCDCSDK( } break; case yb::consensus::OperationType::SPLIT_OP: { + IncrementCDCSDKCounter( + throughput_metrics, &xrepl::CDCSDKTabletMetrics::cdcsdk_split_ops_seen); const TableId& table_id = tablet_ptr->metadata()->table_id(); auto op_id = OpId::FromPB(msg->id()); diff --git a/src/yb/cdc/xrepl_metrics.cc b/src/yb/cdc/xrepl_metrics.cc index d2cc4dbfee16..d468b402a21c 100644 --- a/src/yb/cdc/xrepl_metrics.cc +++ b/src/yb/cdc/xrepl_metrics.cc @@ -148,6 +148,92 @@ 
METRIC_DEFINE_gauge_uint64(cdcsdk, cdcsdk_flush_lag, "CDCSDK flush Lag", "Lag between last committed record in the WAL and the replication slot's restart time.", {0 /* zero means we don't expose it as counter */, yb::AggregationFunction::kMax}); +// CDCSDK per-phase latency histograms for the GetChanges RPC. +// (Total RPC latency is covered by the server-wide handler_latency_yb_cdc_CDCService_GetChanges.) +METRIC_DEFINE_event_stats(cdcsdk, cdcsdk_get_changes_preflight_latency, + "CDCSDK GetChanges preflight latency", yb::MetricUnit::kMicroseconds, + "Time (microseconds) spent before entering GetChangesForCDCSDK: semaphore, validation, " + "stream/tablet/leader lookup, schema/enum/composite caches."); + +METRIC_DEFINE_event_stats(cdcsdk, cdcsdk_get_last_checkpoint_latency, + "CDCSDK GetLastCheckpoint latency", yb::MetricUnit::kMicroseconds, + "Time (microseconds) spent in GetLastCheckpoint (cdc_state read) when the client sends no " + "from_op_id."); + +METRIC_DEFINE_event_stats(cdcsdk, cdcsdk_update_checkpoint_latency, + "CDCSDK UpdateCheckpoint latency", yb::MetricUnit::kMicroseconds, + "Time (microseconds) spent in UpdateCheckpointAndActiveTime (cdc_state write) on " + "EXPLICIT ack."); + +METRIC_DEFINE_event_stats(cdcsdk, cdcsdk_wal_read_latency, + "CDCSDK WAL read latency", yb::MetricUnit::kMicroseconds, + "Time (microseconds) spent reading WAL records during a single GetChanges call."); + +METRIC_DEFINE_event_stats(cdcsdk, cdcsdk_process_intents_latency, + "CDCSDK ProcessIntents latency", yb::MetricUnit::kMicroseconds, + "Time (microseconds) spent in ProcessIntents (intent fetch + record population) per " + "multi-shard transaction."); + +METRIC_DEFINE_event_stats(cdcsdk, cdcsdk_get_intents_latency, + "CDCSDK GetIntentsForCDC latency", yb::MetricUnit::kMicroseconds, + "Time (microseconds) spent in tablet->GetIntentsForCDC alone (IntentsDB read inside " + "ProcessIntents)."); + +METRIC_DEFINE_event_stats(cdcsdk, cdcsdk_populate_write_record_latency, + "CDCSDK 
PopulateWriteRecord latency", yb::MetricUnit::kMicroseconds, + "Time (microseconds) spent in PopulateCDCSDKWriteRecord for single-shard writes."); + +METRIC_DEFINE_event_stats(cdcsdk, cdcsdk_schema_lookup_latency, + "CDCSDK schema lookup latency", yb::MetricUnit::kMicroseconds, + "Time (microseconds) spent in client->GetTableSchemaFromSysCatalog (master round-trips)."); + +METRIC_DEFINE_event_stats(cdcsdk, cdcsdk_safe_time_wait_latency, + "CDCSDK safe time wait latency", yb::MetricUnit::kMicroseconds, + "Time (microseconds) spent in GetConsistentStreamSafeTime (waiting for safe time / txn " + "load)."); + +// CDCSDK per-call batch-size histograms (one observation per successful GetChanges). +METRIC_DEFINE_event_stats(cdcsdk, cdcsdk_wal_records_read, + "CDCSDK WAL records read per call", yb::MetricUnit::kEntries, + "Number of WAL records read during a single GetChanges call."); + +METRIC_DEFINE_event_stats(cdcsdk, cdcsdk_wal_bytes_read, + "CDCSDK WAL bytes read per call", yb::MetricUnit::kBytes, + "Total bytes read from disk for WAL records during a single GetChanges call."); + +METRIC_DEFINE_event_stats(cdcsdk, cdcsdk_intents_per_txn, + "CDCSDK intents per transaction", yb::MetricUnit::kEntries, + "Number of intent key/value pairs read for a single multi-shard transaction."); + +METRIC_DEFINE_event_stats(cdcsdk, cdcsdk_response_records, + "CDCSDK response records per call", yb::MetricUnit::kEntries, + "Number of CDC SDK proto records emitted in a single GetChanges response."); + +METRIC_DEFINE_event_stats(cdcsdk, cdcsdk_response_bytes, + "CDCSDK response bytes per call", yb::MetricUnit::kBytes, + "Serialized size in bytes of a single GetChanges response."); + +// CDCSDK per-optype counters incremented in the GetChangesForCDCSDK main loop. 
+METRIC_DEFINE_counter(cdcsdk, cdcsdk_write_ops_seen, + "CDCSDK WRITE_OP records seen", yb::MetricUnit::kEntries, + "Number of WRITE_OP WAL records processed by GetChangesForCDCSDK."); + +METRIC_DEFINE_counter(cdcsdk, cdcsdk_update_txn_ops_seen, + "CDCSDK UPDATE_TRANSACTION_OP records seen", yb::MetricUnit::kEntries, + "Number of UPDATE_TRANSACTION_OP WAL records processed by GetChangesForCDCSDK."); + +METRIC_DEFINE_counter(cdcsdk, cdcsdk_change_metadata_ops_seen, + "CDCSDK CHANGE_METADATA_OP records seen", yb::MetricUnit::kEntries, + "Number of CHANGE_METADATA_OP WAL records processed by GetChangesForCDCSDK."); + +METRIC_DEFINE_counter(cdcsdk, cdcsdk_truncate_ops_seen, + "CDCSDK TRUNCATE_OP records seen", yb::MetricUnit::kEntries, + "Number of TRUNCATE_OP WAL records processed by GetChangesForCDCSDK."); + +METRIC_DEFINE_counter(cdcsdk, cdcsdk_split_ops_seen, + "CDCSDK SPLIT_OP records seen", yb::MetricUnit::kEntries, + "Number of SPLIT_OP WAL records processed by GetChangesForCDCSDK."); + // CDC Server Metrics METRIC_DEFINE_counter(server, cdc_rpc_proxy_count, "CDC Rpc Proxy Count", yb::MetricUnit::kRequests, "Number of CDC GetChanges requests that required proxy forwarding"); @@ -198,6 +284,25 @@ CDCSDKTabletMetrics::CDCSDKTabletMetrics(const scoped_refptr& enti GINIT(cdcsdk_expiry_time_ms), GINIT(cdcsdk_last_sent_physicaltime), GINIT(cdcsdk_flush_lag), + MINIT(cdcsdk_get_changes_preflight_latency), + MINIT(cdcsdk_get_last_checkpoint_latency), + MINIT(cdcsdk_update_checkpoint_latency), + MINIT(cdcsdk_wal_read_latency), + MINIT(cdcsdk_process_intents_latency), + MINIT(cdcsdk_get_intents_latency), + MINIT(cdcsdk_populate_write_record_latency), + MINIT(cdcsdk_schema_lookup_latency), + MINIT(cdcsdk_safe_time_wait_latency), + MINIT(cdcsdk_wal_records_read), + MINIT(cdcsdk_wal_bytes_read), + MINIT(cdcsdk_intents_per_txn), + MINIT(cdcsdk_response_records), + MINIT(cdcsdk_response_bytes), + MINIT(cdcsdk_write_ops_seen), + MINIT(cdcsdk_update_txn_ops_seen), + 
MINIT(cdcsdk_change_metadata_ops_seen), + MINIT(cdcsdk_truncate_ops_seen), + MINIT(cdcsdk_split_ops_seen), entity_(entity) {} void CDCSDKTabletMetrics::ClearMetrics() { @@ -207,6 +312,9 @@ void CDCSDKTabletMetrics::ClearMetrics() { cdcsdk_expiry_time_ms->set_value(0); cdcsdk_last_sent_physicaltime->set_value(0); cdcsdk_flush_lag->set_value(0); + // Deliberately leave the per-optype counters and EventStats untouched: reset() on a + // scoped_refptr releases the metric reference instead of zeroing its value, which would + // leave these members null for every later use of this metrics object. } Result CDCSDKTabletMetrics::TEST_GetAttribute(const std::string& key) const { diff --git a/src/yb/cdc/xrepl_metrics.h b/src/yb/cdc/xrepl_metrics.h index e5a3dd54d73d..b3b3909241d3 100644 --- a/src/yb/cdc/xrepl_metrics.h +++ b/src/yb/cdc/xrepl_metrics.h @@ -110,6 +110,42 @@ class CDCSDKTabletMetrics { // Lag between last committed record in the WAL and replication slot's restart time. scoped_refptr> cdcsdk_flush_lag; + // Per-phase latency histograms for the GetChanges RPC on the CDCSDK path. + // (Total RPC latency is covered by the server-wide handler_latency_yb_cdc_CDCService_GetChanges.) + // Time spent before entering GetChangesForCDCSDK: semaphore, validation, stream/tablet/leader + // lookup, schema/enum/composite cache. + scoped_refptr cdcsdk_get_changes_preflight_latency; + // Time spent in GetLastCheckpoint (cdc_state read) when the client sends no from_op_id. + scoped_refptr cdcsdk_get_last_checkpoint_latency; + // Time spent in UpdateCheckpointAndActiveTime (cdc_state write) on EXPLICIT ack. + scoped_refptr cdcsdk_update_checkpoint_latency; + // Time spent reading WAL records (GetConsistentWALRecords / GetWALRecords). + scoped_refptr cdcsdk_wal_read_latency; + // Time in ProcessIntentsWithInvalidSchemaRetry (intent fetch + record population). 
+ scoped_refptr cdcsdk_process_intents_latency; + // Time in tablet->GetIntentsForCDC alone (the IntentsDB read inside ProcessIntents). + scoped_refptr cdcsdk_get_intents_latency; + // Time in PopulateCDCSDKWriteRecordWithInvalidSchemaRetry for single-shard writes. + scoped_refptr cdcsdk_populate_write_record_latency; + // Time in client->GetTableSchemaFromSysCatalog (master round-trips). + scoped_refptr cdcsdk_schema_lookup_latency; + // Time in GetConsistentStreamSafeTime (waiting for safe time / txn load). + scoped_refptr cdcsdk_safe_time_wait_latency; + + // Per-call batch-size histograms (observed once per successful GetChanges). + scoped_refptr cdcsdk_wal_records_read; + scoped_refptr cdcsdk_wal_bytes_read; + scoped_refptr cdcsdk_intents_per_txn; + scoped_refptr cdcsdk_response_records; + scoped_refptr cdcsdk_response_bytes; + + // Per-optype counters incremented in the GetChangesForCDCSDK main loop. + scoped_refptr cdcsdk_write_ops_seen; + scoped_refptr cdcsdk_update_txn_ops_seen; + scoped_refptr cdcsdk_change_metadata_ops_seen; + scoped_refptr cdcsdk_truncate_ops_seen; + scoped_refptr cdcsdk_split_ops_seen; + Result TEST_GetAttribute(const std::string& key) const; private: