From 961874853093ec98f1f0268a6200016e1f231e3a Mon Sep 17 00:00:00 2001 From: vianney Date: Mon, 22 Jun 2026 13:03:40 +0200 Subject: [PATCH 1/3] chore(protobuf): add serde to trilean --- libdd-trace-protobuf/build.rs | 2 ++ libdd-trace-protobuf/src/pb.rs | 1 + 2 files changed, 3 insertions(+) diff --git a/libdd-trace-protobuf/build.rs b/libdd-trace-protobuf/build.rs index ee60555381..8a564cf58f 100644 --- a/libdd-trace-protobuf/build.rs +++ b/libdd-trace-protobuf/build.rs @@ -251,6 +251,8 @@ fn generate_protobuf() { "#[serde(rename = \"srv_src\")]", ); + config.type_attribute("Trilean", "#[derive(Deserialize, Serialize)]"); + // idx module type attributes config.type_attribute("pb.idx.AnyValue", "#[derive(Deserialize, Serialize)]"); config.type_attribute( diff --git a/libdd-trace-protobuf/src/pb.rs b/libdd-trace-protobuf/src/pb.rs index feae884fb7..b3085ac0b6 100644 --- a/libdd-trace-protobuf/src/pb.rs +++ b/libdd-trace-protobuf/src/pb.rs @@ -668,6 +668,7 @@ pub struct ClientGroupedStats { >, } /// Trilean is an expanded boolean type that is meant to differentiate between being unset and false. +#[derive(Deserialize, Serialize)] #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum Trilean { From 88afc4bc4322a14bb3df2862086c91405a5af0ce Mon Sep 17 00:00:00 2001 From: vianney Date: Mon, 22 Jun 2026 13:52:47 +0200 Subject: [PATCH 2/3] chore(stats): use trilean for is_trace_root --- datadog-ipc/src/shm_stats.rs | 8 +-- .../src/span_concentrator/aggregation.rs | 67 ++++++++++--------- 2 files changed, 38 insertions(+), 37 deletions(-) diff --git a/datadog-ipc/src/shm_stats.rs b/datadog-ipc/src/shm_stats.rs index 44941ec667..96497d1169 100644 --- a/datadog-ipc/src/shm_stats.rs +++ b/datadog-ipc/src/shm_stats.rs @@ -805,11 +805,7 @@ impl ShmSpanConcentrator { top_level_hits, span_kind: read_str!(f.span_kind), peer_tags, - is_trace_root: if f.is_trace_root { - pb::Trilean::True.into() - } else { - pb::Trilean::False.into() - }, + is_trace_root: f.is_trace_root.into(), http_method: read_str!(f.http_method), http_endpoint: read_str!(f.http_endpoint), grpc_status_code: f @@ -856,7 +852,7 @@ mod tests { service_source: "", http_status_code: 200, is_synthetics_request: false, - is_trace_root: true, + is_trace_root: pb::Trilean::True, grpc_status_code: None, }, peer_tags: &[], diff --git a/libdd-trace-stats/src/span_concentrator/aggregation.rs b/libdd-trace-stats/src/span_concentrator/aggregation.rs index cc5f82bdab..4ded936410 100644 --- a/libdd-trace-stats/src/span_concentrator/aggregation.rs +++ b/libdd-trace-stats/src/span_concentrator/aggregation.rs @@ -13,6 +13,9 @@ use std::borrow::{Borrow, Cow}; use crate::span_concentrator::StatSpan; +/// Sentinel value used for cardinality limiting. +pub const TRACER_BLOCKED_VALUE: &str = "tracer_blocked_value"; + const TAG_STATUS_CODE: &str = "http.status_code"; const TAG_SYNTHETICS: &str = "synthetics"; const TAG_SPANKIND: &str = "span.kind"; @@ -48,7 +51,7 @@ pub struct FixedAggregationKey { pub http_status_code: u32, pub grpc_status_code: Option, pub is_synthetics_request: bool, - pub is_trace_root: bool, + pub is_trace_root: pb::Trilean, } impl FixedAggregationKey { @@ -280,7 +283,11 @@ impl<'a> BorrowedAggregationKey<'a> { is_synthetics_request: span .get_meta(TAG_ORIGIN) .is_some_and(|origin| origin.starts_with(TAG_SYNTHETICS)), - is_trace_root: span.is_trace_root(), + is_trace_root: if span.is_trace_root() { + pb::Trilean::True + } else { + pb::Trilean::False + }, }, peer_tags, } @@ -302,7 +309,8 @@ impl From for OwnedAggregationKey { http_status_code: value.http_status_code, grpc_status_code: value.grpc_status_code.parse().ok(), is_synthetics_request: value.synthetics, - is_trace_root: value.is_trace_root == 1, + is_trace_root: pb::Trilean::try_from(value.is_trace_root) + .unwrap_or(pb::Trilean::NotSet), }, peer_tags: value .peer_tags @@ -512,11 +520,7 @@ fn encode_grouped_stats(key: OwnedAggregationKey, group: GroupedStats) -> pb::Cl .into_iter() .map(|(k, v)| format!("{k}:{v}")) .collect(), - is_trace_root: if f.is_trace_root { - pb::Trilean::True.into() - } else { - pb::Trilean::False.into() - }, + is_trace_root: f.is_trace_root.into(), http_method: f.http_method, http_endpoint: f.http_endpoint, grpc_status_code: f @@ -533,6 +537,7 @@ mod tests { use libdd_trace_utils::span::v04::{SpanBytes, SpanSlice}; use super::*; + use libdd_trace_protobuf::pb; use std::hash::Hash; fn get_hash(v: &impl Hash) -> u64 { @@ -574,7 +579,7 @@ mod tests { service_name: "service".into(), operation_name: "op".into(), resource_name: "res".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -595,7 +600,7 @@ mod tests { operation_name: "op".into(), resource_name: "res".into(), span_kind: "client".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -620,7 +625,7 @@ mod tests { operation_name: "op".into(), resource_name: "res".into(), span_kind: "client".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -647,7 +652,7 @@ mod tests { operation_name: "op".into(), resource_name: "res".into(), span_kind: "producer".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -675,7 +680,7 @@ mod tests { operation_name: "op".into(), resource_name: "res".into(), span_kind: "server".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -696,7 +701,7 @@ mod tests { operation_name: "op".into(), resource_name: "res".into(), is_synthetics_request: true, - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -717,7 +722,7 @@ mod tests { operation_name: "op".into(), resource_name: "res".into(), is_synthetics_request: false, - is_trace_root: true, + is_trace_root: pb::Trilean::True, http_status_code: 418, ..Default::default() } @@ -739,7 +744,7 @@ mod tests { operation_name: "op".into(), resource_name: "res".into(), is_synthetics_request: false, - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -760,7 +765,7 @@ mod tests { operation_name: "op".into(), resource_name: "res".into(), is_synthetics_request: false, - is_trace_root: true, + is_trace_root: pb::Trilean::True, http_status_code: 418, ..Default::default() } @@ -788,7 +793,7 @@ mod tests { http_method: "GET".into(), http_endpoint: "/api/v1/users".into(), is_synthetics_request: false, - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -816,7 +821,7 @@ mod tests { http_method: "POST".into(), http_endpoint: "/users/create2".into(), is_synthetics_request: false, - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -829,7 +834,7 @@ mod tests { }, FixedAggregationKey { grpc_status_code: Some(0), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -854,7 +859,7 @@ mod tests { }, FixedAggregationKey { grpc_status_code: Some(14), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -867,7 +872,7 @@ mod tests { }, FixedAggregationKey { grpc_status_code: Some(14), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -881,7 +886,7 @@ mod tests { }, FixedAggregationKey { grpc_status_code: Some(7), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -894,7 +899,7 @@ mod tests { }, FixedAggregationKey { grpc_status_code: Some(3), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -906,7 +911,7 @@ mod tests { ..Default::default() }, FixedAggregationKey { - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), @@ -926,7 +931,7 @@ mod tests { service_name: "my-service".into(), operation_name: "op".into(), resource_name: "res".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, service_source: "redis".into(), ..Default::default() } @@ -947,7 +952,7 @@ mod tests { service_name: "my-service".into(), operation_name: "op".into(), resource_name: "res".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, service_source: "opt.split_by_tag".into(), ..Default::default() } @@ -967,7 +972,7 @@ mod tests { service_name: "my-service".into(), operation_name: "op".into(), resource_name: "res".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, service_source: "".into(), ..Default::default() } @@ -998,7 +1003,7 @@ mod tests { operation_name: "op".into(), resource_name: "res".into(), span_kind: "client".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key_with_peers(vec![("aws.s3.bucket".into(), "bucket-a".into())]), @@ -1025,7 +1030,7 @@ mod tests { operation_name: "op".into(), resource_name: "res".into(), span_kind: "producer".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key_with_peers(vec![ @@ -1057,7 +1062,7 @@ mod tests { operation_name: "op".into(), resource_name: "res".into(), span_kind: "server".into(), - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), From ebd7e4ef002de62aa5b8d9ab79effbd6e2000291 Mon Sep 17 00:00:00 2001 From: vianney Date: Mon, 22 Jun 2026 15:06:13 +0200 Subject: [PATCH 3/3] feat(stats): add whole-key cardinality limit --- .../src/trace_exporter/builder.rs | 1 + .../src/trace_exporter/stats.rs | 1 + .../benches/span_concentrator_bench.rs | 1 + .../src/span_concentrator/aggregation.rs | 57 +++- .../src/span_concentrator/mod.rs | 24 +- .../src/span_concentrator/tests.rs | 276 +++++++++++++++++- libdd-trace-stats/src/stats_exporter.rs | 1 + 7 files changed, 354 insertions(+), 7 deletions(-) diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index b0286df2ea..7f5f871d34 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -617,6 +617,7 @@ impl TraceExporterBuilder { std::time::SystemTime::now(), span_kinds, self.peer_tags.clone(), + None, #[cfg(feature = "stats-obfuscation")] None, ))); diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 11e66cae3c..88960a28c2 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -135,6 +135,7 @@ pub(crate) fn start_stats_computation< std::time::SystemTime::now(), span_kinds, peer_tags, + None, #[cfg(feature = "stats-obfuscation")] Some(client_side_stats.obfuscation_config.clone()), ))); diff --git a/libdd-trace-stats/benches/span_concentrator_bench.rs b/libdd-trace-stats/benches/span_concentrator_bench.rs index 03526acb3e..1e162cb4a2 100644 --- a/libdd-trace-stats/benches/span_concentrator_bench.rs +++ b/libdd-trace-stats/benches/span_concentrator_bench.rs @@ -44,6 +44,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { now, vec![], vec!["db_name".into(), "bucket_s3".into()], + None, #[cfg(feature = "stats-obfuscation")] None, ); diff --git a/libdd-trace-stats/src/span_concentrator/aggregation.rs b/libdd-trace-stats/src/span_concentrator/aggregation.rs index 4ded936410..9f1d24490e 100644 --- a/libdd-trace-stats/src/span_concentrator/aggregation.rs +++ b/libdd-trace-stats/src/span_concentrator/aggregation.rs @@ -294,6 +294,29 @@ impl<'a> BorrowedAggregationKey<'a> { } } +impl OwnedAggregationKey { + /// Return the overflow sentinel key. + pub(super) fn overflow_key() -> Self { + OwnedAggregationKey { + fixed: FixedAggregationKey { + resource_name: TRACER_BLOCKED_VALUE.to_owned(), + service_name: TRACER_BLOCKED_VALUE.to_owned(), + operation_name: TRACER_BLOCKED_VALUE.to_owned(), + span_type: TRACER_BLOCKED_VALUE.to_owned(), + span_kind: TRACER_BLOCKED_VALUE.to_owned(), + http_method: TRACER_BLOCKED_VALUE.to_owned(), + http_endpoint: TRACER_BLOCKED_VALUE.to_owned(), + service_source: TRACER_BLOCKED_VALUE.to_owned(), + http_status_code: 0, + grpc_status_code: None, + is_synthetics_request: false, + is_trace_root: pb::Trilean::NotSet, + }, + peer_tags: vec![], + } + } +} + impl From for OwnedAggregationKey { fn from(value: pb::ClientGroupedStats) -> Self { Self { @@ -422,19 +445,35 @@ pub struct OtlpStatsBucket { pub(super) struct StatsBucket { data: HashMap, start: u64, + /// Maximum number of distinct aggregation keys this bucket will hold before collapsing new + /// ones into the overflow sentinel key. + max_entries: usize, + /// Number of spans collapsed into the overflow bucket due to cardinality limiting. + collapsed_count: u64, } impl StatsBucket { - /// Return a new StatsBucket starting at the given timestamp - pub(super) fn new(start_timestamp: u64) -> Self { + /// Return a new StatsBucket starting at `start_timestamp`. + /// + /// `max_entries` is the maximum number of distinct aggregation keys the bucket will hold. + /// Once the limit is reached, new distinct keys are collapsed into the overflow sentinel key. + pub(super) fn new(start_timestamp: u64, max_entries: usize) -> Self { Self { data: HashMap::new(), start: start_timestamp, + max_entries, + collapsed_count: 0, } } - /// Insert a value as stats in the group corresponding to the aggregation key, if it does - /// not exist it creates it. + /// Return the number of spans collapsed into the overflow bucket. + pub(super) fn collapsed_count(&self) -> u64 { + self.collapsed_count + } + + /// Insert a value as stats in the group corresponding to the aggregation key. If the key is new + /// and the `max_entries` limit has not been reached, a new entry is created, else the span is + /// instead merged into the overflow sentinel key. pub(super) fn insert( &mut self, key: BorrowedAggregationKey<'_>, @@ -443,6 +482,14 @@ impl StatsBucket { is_top_level: bool, grpc_method: &str, ) { + if self.data.len() >= self.max_entries && !self.data.contains_key(&key) { + self.collapsed_count += 1; + self.data + .entry(OwnedAggregationKey::overflow_key()) + .or_default() + .insert(duration, is_error, is_top_level); + return; + } self.data .entry_ref(&key) .or_insert_with(|| GroupedStats { @@ -846,7 +893,7 @@ mod tests { ..Default::default() }, FixedAggregationKey { - is_trace_root: true, + is_trace_root: pb::Trilean::True, ..Default::default() } .into_key(), diff --git a/libdd-trace-stats/src/span_concentrator/mod.rs b/libdd-trace-stats/src/span_concentrator/mod.rs index 636e87744d..f1fda89d02 100644 --- a/libdd-trace-stats/src/span_concentrator/mod.rs +++ b/libdd-trace-stats/src/span_concentrator/mod.rs @@ -65,6 +65,14 @@ pub struct StatsComputationObfuscationConfig { pub type SharedStatsComputationObfuscationConfig = std::sync::Arc>; +/// Default maximum number of distinct aggregation keys per time bucket. +/// +/// 7 168 is the limit to exactly saturate hashbrown's internal table at its maximum load factor of +/// 7/8. Any higher limit would immediately force a doubling of the table capacity, wasting +/// half the allocated slots for a modest increase in cardinality. To avoid future changes going +/// over this limit (e.g. adding extra overflow buckets) we set a slightly lower limit. +pub const DEFAULT_MAX_ENTRIES_PER_BUCKET: usize = 7_000; + /// SpanConcentrator compute stats on span aggregated by time and span attributes /// /// # Aggregation @@ -80,6 +88,11 @@ pub type SharedStatsComputationObfuscationConfig = /// When the SpanConcentrator is flushed it keeps the `buffer_len` most recent buckets and remove /// all older buckets returning their content. When using force flush all buckets are flushed /// regardless of their age. +/// +/// # Cardinality limiting +/// Each time bucket holds at most `max_entries_per_bucket` distinct aggregation keys. Once that +/// limit is reached, spans whose key is not already present are merged into a single overflow +/// bucket keyed by [`aggregation::TRACER_BLOCKED_VALUE`]. #[derive(Debug, Clone)] pub struct SpanConcentrator { /// Size of the time buckets used for aggregation in nanos @@ -90,6 +103,8 @@ pub struct SpanConcentrator { oldest_timestamp: u64, /// bufferLen is the number stats bucket we keep when flushing. buffer_len: usize, + /// Maximum number of distinct aggregation keys per bucket. + max_entries_per_bucket: usize, /// span.kind fields eligible for stats computation span_kinds_stats_computed: Vec, /// keys for supplementary tags that describe peer.service entities @@ -104,12 +119,15 @@ impl SpanConcentrator { /// - `now` the current system time, used to define the oldest bucket /// - `span_kinds_stats_computed` list of span kinds eligible for stats computation /// - `peer_tags_keys` list of keys considered as peer tags for aggregation + /// - `override_max_entries_per_bucket` maximum distinct aggregation keys per time bucket before + /// cardinality limiting applies. Pass `None` to use [`DEFAULT_MAX_ENTRIES_PER_BUCKET`]. /// - `obfuscation_config` optional and updatable config for resource key obfuscation pub fn new( bucket_size: Duration, now: SystemTime, span_kinds_stats_computed: Vec, peer_tag_keys: Vec, + override_max_entries_per_bucket: Option, #[cfg(feature = "stats-obfuscation")] obfuscation_config: Option< SharedStatsComputationObfuscationConfig, >, @@ -122,6 +140,8 @@ impl SpanConcentrator { bucket_size.as_nanos() as u64, ), buffer_len: 2, + max_entries_per_bucket: override_max_entries_per_bucket + .unwrap_or(DEFAULT_MAX_ENTRIES_PER_BUCKET), span_kinds_stats_computed, peer_tag_keys, #[cfg(feature = "stats-obfuscation")] @@ -178,7 +198,7 @@ impl SpanConcentrator { }; self.buckets .entry(bucket_timestamp) - .or_insert(StatsBucket::new(bucket_timestamp)) + .or_insert_with(|| StatsBucket::new(bucket_timestamp, self.max_entries_per_bucket)) .insert( agg_key, span.duration(), @@ -233,6 +253,7 @@ impl SpanConcentrator { align_timestamp(now_timestamp, self.bucket_size) - (self.buffer_len as u64 - 1) * self.bucket_size }; + let mut total_collapsed = 0; buckets .into_iter() .filter_map(|(timestamp, bucket)| { @@ -248,6 +269,7 @@ impl SpanConcentrator { self.buckets.insert(timestamp, bucket); return None; } + total_collapsed += bucket.collapsed_count(); Some(encode(bucket, self.bucket_size)) }) .collect() diff --git a/libdd-trace-stats/src/span_concentrator/tests.rs b/libdd-trace-stats/src/span_concentrator/tests.rs index 2dd064d93a..81be4fda00 100644 --- a/libdd-trace-stats/src/span_concentrator/tests.rs +++ b/libdd-trace-stats/src/span_concentrator/tests.rs @@ -1,7 +1,7 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::span_concentrator::aggregation::OwnedAggregationKey; +use crate::span_concentrator::aggregation::{OwnedAggregationKey, TRACER_BLOCKED_VALUE}; use super::*; use libdd_trace_utils::span::v04::VecMap; @@ -105,6 +105,7 @@ fn test_concentrator_oldest_timestamp_cold() { now, vec![], vec![], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -161,6 +162,7 @@ fn test_concentrator_oldest_timestamp_hot() { now, vec![], vec![], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -240,6 +242,7 @@ fn test_concentrator_stats_totals() { now, vec![], vec![], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -306,6 +309,7 @@ fn test_concentrator_stats_counts() { now, vec![], vec![], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -603,6 +607,7 @@ fn test_span_should_be_included_in_stats() { now, get_span_kinds(), vec![], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -683,6 +688,7 @@ fn test_ignore_partial_spans() { now, get_span_kinds(), vec![], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -708,6 +714,7 @@ fn test_force_flush() { now, get_span_kinds(), vec![], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -791,6 +798,7 @@ fn test_peer_tags_aggregation() { now, get_span_kinds(), vec![], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -799,6 +807,7 @@ fn test_peer_tags_aggregation() { now, get_span_kinds(), vec!["db.instance".to_string(), "db.system".to_string()], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -982,6 +991,7 @@ fn test_peer_tags_quantization_aggregation() { "db.system".to_string(), "peer.hostname".to_string(), ], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -1108,6 +1118,7 @@ fn test_base_service_peer_tag() { now, get_span_kinds(), vec!["db.instance".to_string(), "db.system".to_string()], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -1330,6 +1341,7 @@ fn test_pb_span() { now, get_span_kinds(), vec!["db.instance".to_string(), "db.system".to_string()], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -1581,6 +1593,7 @@ fn test_flush_with_otlp_exact_per_cell_scalars() { now, get_span_kinds(), vec![], + None, #[cfg(feature = "stats-obfuscation")] None, ); @@ -1622,3 +1635,264 @@ fn test_flush_with_otlp_exact_per_cell_scalars() { assert_eq!(group.hits, 5); assert_eq!(group.errors, 2); } + +/// Build a minimal concentrator with a tiny `max_entries_per_bucket` for cardinality tests. +fn make_cardinality_concentrator(max_entries: usize) -> SpanConcentrator { + let now = SystemTime::now(); + SpanConcentrator::new( + Duration::from_nanos(BUCKET_SIZE), + now, + get_span_kinds(), + vec![], + Some(max_entries), + #[cfg(feature = "stats-obfuscation")] + None, + ) +} + +/// When the limit is 3 and we insert 5 distinct-resource spans, only 3 normal keys plus one +/// overflow key must appear in the flushed stats. Total hits must equal 5. +#[test] +fn test_cardinality_limit_collapse() { + let now = SystemTime::now(); + let limit: usize = 3; + let mut concentrator = make_cardinality_concentrator(limit); + + // Insert limit + 2 distinct-resource root spans all in the same time bucket. + let resources: Vec = (0..limit + 2).map(|i| format!("resource-{i}")).collect(); + for (i, resource) in resources.iter().enumerate() { + let span = get_test_span_with_meta( + now, + i as u64 + 1, + 0, + 100, + 2, + "svc", + resource, + 0, + &[], + &[("_dd.measured", 1.0)], + ); + concentrator.add_span(&span); + } + + let buckets = concentrator.flush(SystemTime::now(), true); + assert!(!buckets.is_empty(), "should get at least one time bucket"); + + let stats = &buckets[0].stats; + + // Exactly limit normal keys + 1 overflow key. + assert_eq!( + stats.len(), + limit + 1, + "expected {limit} normal groups + 1 overflow group, got {}", + stats.len() + ); + + // Total hits must be preserved. + let total_hits: u64 = stats.iter().map(|g| g.hits).sum(); + assert_eq!( + total_hits, + (limit + 2) as u64, + "total hits must equal the number of inserted spans" + ); + + // Exactly one overflow group, identified by the sentinel resource. + let overflow_groups: Vec<_> = stats + .iter() + .filter(|g| g.resource == TRACER_BLOCKED_VALUE) + .collect(); + assert_eq!( + overflow_groups.len(), + 1, + "expected exactly one overflow group" + ); +} + +/// The overflow bucket must correctly aggregate the hits from overflow spans. +#[test] +fn test_overflow_bucket_counts() { + let now = SystemTime::now(); + let limit: usize = 1; + let mut concentrator = make_cardinality_concentrator(limit); + + // First span fills the sole slot; the next 4 spans all have distinct keys → all overflow. + for i in 0..5usize { + let resource = format!("resource-{i}"); + let span = get_test_span_with_meta( + now, + i as u64 + 1, + 0, + 10 * (i as i64 + 1), + 2, + "svc", + &resource, + 0, + &[], + &[("_dd.measured", 1.0)], + ); + concentrator.add_span(&span); + } + + let buckets = concentrator.flush(SystemTime::now(), true); + assert!(!buckets.is_empty()); + let stats = &buckets[0].stats; + + // There must be exactly 2 groups: 1 normal + 1 overflow. + assert_eq!( + stats.len(), + 2, + "expected exactly 1 normal + 1 overflow group" + ); + + let overflow = stats + .iter() + .find(|g| g.resource == TRACER_BLOCKED_VALUE) + .expect("overflow group must exist"); + + // 4 spans overflowed, total duration = 20 + 30 + 40 + 50 = 140. + assert_eq!(overflow.hits, 4, "all 4 overflow spans must be merged"); + assert_eq!( + overflow.duration, 140, + "overflow durations must sum correctly" + ); +} + +/// When the number of distinct spans is within the limit, no overflow bucket should appear. +#[test] +fn test_no_collapse_within_limit() { + let now = SystemTime::now(); + let limit: usize = 10; + let mut concentrator = make_cardinality_concentrator(limit); + + // Insert exactly `limit` distinct-resource spans — no overflow expected. + for i in 0..limit { + let resource = format!("resource-{i}"); + let span = get_test_span_with_meta( + now, + i as u64 + 1, + 0, + 50, + 2, + "svc", + &resource, + 0, + &[], + &[("_dd.measured", 1.0)], + ); + concentrator.add_span(&span); + } + + let buckets = concentrator.flush(SystemTime::now(), true); + assert!(!buckets.is_empty()); + let stats = &buckets[0].stats; + + assert_eq!( + stats.len(), + limit, + "expected exactly {limit} groups with no overflow" + ); + assert!( + stats.iter().all(|g| g.resource != TRACER_BLOCKED_VALUE), + "no overflow group should be present within the limit" + ); +} + +/// The overflow `ClientGroupedStats` row must carry `tracer_blocked_value` on all sentinel +/// string fields as specified by the RFC. +#[test] +fn test_overflow_bucket_key_sentinel_values() { + let now = SystemTime::now(); + let limit: usize = 1; + let mut concentrator = make_cardinality_concentrator(limit); + + // First span occupies the only slot; second one overflows. + let first = get_test_span_with_meta( + now, + 1, + 0, + 50, + 2, + "my-service", + "my-resource", + 0, + &[], + &[("_dd.measured", 1.0)], + ); + let second = get_test_span_with_meta( + now, + 2, + 0, + 75, + 2, + "other-service", + "other-resource", + 0, + &[], + &[("_dd.measured", 1.0)], + ); + + concentrator.add_span(&first); + concentrator.add_span(&second); + + let buckets = concentrator.flush(SystemTime::now(), true); + assert!(!buckets.is_empty()); + let stats = &buckets[0].stats; + + let overflow = stats + .iter() + .find(|g| g.resource == TRACER_BLOCKED_VALUE) + .expect("overflow group must exist"); + + // Every string dimension must be the sentinel. + assert_eq!( + overflow.service, TRACER_BLOCKED_VALUE, + "service must be sentinel" + ); + assert_eq!(overflow.name, TRACER_BLOCKED_VALUE, "name must be sentinel"); + assert_eq!( + overflow.resource, TRACER_BLOCKED_VALUE, + "resource must be sentinel" + ); + assert_eq!( + overflow.r#type, TRACER_BLOCKED_VALUE, + "type must be sentinel" + ); + assert_eq!( + overflow.span_kind, TRACER_BLOCKED_VALUE, + "span_kind must be sentinel" + ); + assert_eq!( + overflow.http_method, TRACER_BLOCKED_VALUE, + "http_method must be sentinel" + ); + assert_eq!( + overflow.http_endpoint, TRACER_BLOCKED_VALUE, + "http_endpoint must be sentinel" + ); + assert_eq!( + overflow.service_source, TRACER_BLOCKED_VALUE, + "service_source must be sentinel" + ); + // Numeric and boolean fields must be zero/false (NOT_SET per RFC). + assert_eq!(overflow.http_status_code, 0, "http_status_code must be 0"); + assert_eq!( + overflow.grpc_status_code, "", + "grpc_status_code must be empty" + ); + assert!(!overflow.synthetics, "synthetics must be false"); + // is_trace_root uses Trilean; NOT_SET maps to 0. + assert_eq!( + overflow.is_trace_root, 0, + "is_trace_root must be NOT_SET (0)" + ); + assert!(overflow.peer_tags.is_empty(), "peer_tags must be empty"); + + // The normal group must be unaffected. + let normal = stats + .iter() + .find(|g| g.resource != TRACER_BLOCKED_VALUE) + .expect("normal group must exist"); + assert_eq!(normal.service, "my-service"); + assert_eq!(normal.resource, "my-resource"); +} diff --git a/libdd-trace-stats/src/stats_exporter.rs b/libdd-trace-stats/src/stats_exporter.rs index 9406aef888..f92c54e8d7 100644 --- a/libdd-trace-stats/src/stats_exporter.rs +++ b/libdd-trace-stats/src/stats_exporter.rs @@ -309,6 +309,7 @@ mod tests { SystemTime::now() - BUCKETS_DURATION * 3, vec![], vec![], + None, #[cfg(feature = "stats-obfuscation")] None, );