From 9ba2c3f5a5d16be0251d871413424c165e5684cf Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Thu, 6 Nov 2025 17:22:58 +0100 Subject: [PATCH 1/5] feat(span_processor): add debug mode for abandoned spans # Motivation This feature is inspired from the go tracer https://github.com/DataDog/dd-trace-go/pull/2188 It's hard to understand and locate spans that are never finished. This PR adds a debug mode to the tracer that will track the age and root span name of traces, and * log warnings from time to time if the traces are older than some amount of time. * log warning if some traces are still open during tracer shutdown # Implementation This feature should not add any cost to the tracer if it is not enabled, but requires storing some extra data associated with each trace. In order to not use any extra memory if the feature is not enabled, I track it in an additional registry which is only used in debug mode. This registry tracks the root span name, and the age of the trace. The debug mode is controlled by 2 new configurations: * DD_TRACE_DEBUG_OPEN_SPANS * DD_TRACE_OPEN_SPAN_TIMEOUT In order to test this feature correctly, I added extra code to intercept and store logs during integration tests. This is done through a thread local Logger, which is overridden and propagated locally during tests. Everything is hidden behind the test-utils feature and should thus be zero cost --- datadog-opentelemetry/src/abandoned_traces.rs | 237 ++++++++++++++++++ datadog-opentelemetry/src/lib.rs | 1 + datadog-opentelemetry/src/sampler.rs | 1 + datadog-opentelemetry/src/span_exporter.rs | 36 ++- datadog-opentelemetry/src/span_processor.rs | 174 +++++++++---- datadog-opentelemetry/src/spans_metrics.rs | 56 ++++- .../tests/integration_tests/tracing_api.rs | 48 +++- .../benches/inject_benchmark.rs | 2 +- dd-trace/src/configuration/configuration.rs | 98 +++++++- dd-trace/src/configuration/remote_config.rs | 3 +- .../configuration/supported_configurations.rs | 4 + dd-trace/src/log.rs | 129 +++++++++- supported-configurations.json | 16 ++ 13 files changed, 702 insertions(+), 103 deletions(-) create mode 100644 datadog-opentelemetry/src/abandoned_traces.rs diff --git a/datadog-opentelemetry/src/abandoned_traces.rs b/datadog-opentelemetry/src/abandoned_traces.rs new file mode 100644 index 00000000..2c0d66c4 --- /dev/null +++ b/datadog-opentelemetry/src/abandoned_traces.rs @@ -0,0 +1,237 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::span_processor::ShardedTraces; +use hashbrown::{hash_map::Entry, HashMap}; +use std::time::{Duration, Instant}; + +#[derive(Debug)] +struct TraceInfo { + name: String, + start_ts: Instant, + open_spans: usize, +} + +pub struct OldTrace { + pub tid: u128, + pub root_span_name: String, + pub age: Duration, + pub open_spans: usize, +} + +#[derive(Clone, Debug)] +/// This registry tracks the age and name of currently open traces +pub struct AbandonedTracesRegistry { + shards: ShardedTraces, +} + +impl AbandonedTracesRegistry { + pub fn new() -> Self { + Self { + shards: ShardedTraces::new(|_| InnerAbandonedTracesRegistry { + traces: HashMap::new(), + }), + } + } + pub fn register_root_span_sampling(&self, trace_id: [u8; 16], name: String) { + self.shards + .write_shard(trace_id) + .register_root_span_sampling(trace_id, name); + } + + pub fn register_local_root_span(&self, trace_id: [u8; 16]) { + self.shards + .write_shard(trace_id) + .register_root_span(trace_id); + } + + pub fn register_span(&self, trace_id: [u8; 16]) { + self.shards.write_shard(trace_id).register_span(trace_id); + } + + pub fn finish_span(&self, trace_id: [u8; 16]) { + self.shards.write_shard(trace_id).finish_span(trace_id); + } + + pub fn iter_open_traces(&self) -> impl Iterator + use<'_> { + let now = Instant::now(); + self.shards + .iter() + .map(move |shard| { + let shard = shard + .read() + .expect("failed to lock the abandoned spans registry"); + let now = now.clone(); + shard + .traces + .iter() + .filter_map(|(tid, trace)| { + let age: Duration = now.checked_duration_since(trace.start_ts)?; + Some(OldTrace { + tid: u128::from_be_bytes(*tid), + root_span_name: trace.name.clone(), + age, + open_spans: trace.open_spans, + }) + }) + .collect::>() + }) + .flatten() + } + + pub fn iter_old_traces(&self, min_age: Duration) -> impl Iterator + use<'_> { + let now = Instant::now(); + self.shards + .iter() + .map(move |shard| { + let shard = shard + .read() + .expect("failed to lock the abandoned spans registry"); + let now = now.clone(); + shard + .traces + .iter() + .filter_map(|(tid, trace)| { + let age = now.checked_duration_since(trace.start_ts)?; + if age < min_age { + return None; + } + Some(OldTrace { + tid: u128::from_be_bytes(*tid), + root_span_name: trace.name.clone(), + age, + open_spans: trace.open_spans, + }) + }) + .collect::>() + }) + .flatten() + } +} + +#[derive(Debug)] +struct InnerAbandonedTracesRegistry { + traces: HashMap<[u8; 16], TraceInfo>, +} + +impl InnerAbandonedTracesRegistry { + fn register_root_span_sampling(&mut self, trace_id: [u8; 16], name: String) { + self.traces + .entry(trace_id) + .or_insert(TraceInfo { + open_spans: 0, + name, + start_ts: Instant::now(), + }) + .open_spans += 1; + } + + fn register_root_span(&mut self, trace_id: [u8; 16]) { + let Entry::Vacant(e) = self.traces.entry(trace_id) else { + return; + }; + e.insert(TraceInfo { + open_spans: 1, + name: "unknown_name".to_string(), + start_ts: Instant::now(), + }); + } + + fn register_span(&mut self, trace_id: [u8; 16]) { + self.traces + .entry(trace_id) + .or_insert(TraceInfo { + open_spans: 0, + name: "".to_string(), + start_ts: Instant::now(), + }) + .open_spans += 1; + } + + fn finish_span(&mut self, trace_id: [u8; 16]) { + let Entry::Occupied(mut e) = self.traces.entry(trace_id) else { + return; + }; + let trace = e.get_mut(); + trace.open_spans -= 1; + if trace.open_spans == 0 { + e.remove(); + } + } +} + +#[cfg(test)] +mod tests { + use std::{thread, time::Duration}; + + use hashbrown::HashSet; + + use crate::abandoned_traces::AbandonedTracesRegistry; + + fn active_traces(r: &AbandonedTracesRegistry) -> usize { + r.shards + .iter() + .map(|s| s.read().unwrap().traces.len()) + .sum::() + } + + #[test] + fn test_span_registration() { + let registry = AbandonedTracesRegistry::new(); + let trace_id = [1; 16]; + registry.register_root_span_sampling(trace_id, "root_span".to_owned()); + registry.register_local_root_span(trace_id); + for _ in 0..16 { + registry.register_span(trace_id); + registry.finish_span(trace_id); + } + assert_eq!(active_traces(®istry), 1); + + registry.finish_span(trace_id); + + assert_eq!(active_traces(®istry), 0); + } + + #[test] + fn test_abandoned_spans() { + let registry = AbandonedTracesRegistry::new(); + for i in 1..=2 { + let trace_id = (i as u128).to_be_bytes(); + registry.register_root_span_sampling(trace_id, format!("root_span_{i}")); + registry.register_local_root_span(trace_id); + } + thread::sleep(Duration::from_millis(50)); + + let trace_id = (3 as u128).to_be_bytes(); + registry.register_root_span_sampling(trace_id, format!("root_span_{}", 3)); + registry.register_local_root_span(trace_id); + + let old_traces = registry + .iter_old_traces(Duration::from_millis(10)) + .map(|t| (t.tid, t.root_span_name, t.open_spans)) + .collect::>(); + assert_eq!(active_traces(®istry), 3); + assert_eq!( + old_traces, + HashSet::from_iter([ + (1, "root_span_1".to_string(), 1), + (2, "root_span_2".to_string(), 1), + ]) + ); + + for i in 1..=2 { + let trace_id = (i as u128).to_be_bytes(); + registry.finish_span(trace_id); + } + thread::sleep(Duration::from_millis(50)); + let old_traces = registry + .iter_old_traces(Duration::from_millis(10)) + .map(|t| (t.tid, t.root_span_name, t.open_spans)) + .collect::>(); + assert_eq!(active_traces(®istry), 1); + assert_eq!( + old_traces, + HashSet::from_iter([(3, "root_span_3".to_string(), 1),]) + ); + } +} diff --git a/datadog-opentelemetry/src/lib.rs b/datadog-opentelemetry/src/lib.rs index f4776960..68947c8a 100644 --- a/datadog-opentelemetry/src/lib.rs +++ b/datadog-opentelemetry/src/lib.rs @@ -70,6 +70,7 @@ mod span_processor; mod spans_metrics; mod text_map_propagator; mod trace_id; +mod abandoned_traces; use std::sync::{Arc, RwLock}; diff --git a/datadog-opentelemetry/src/sampler.rs b/datadog-opentelemetry/src/sampler.rs index e724c40d..ec6621e7 100644 --- a/datadog-opentelemetry/src/sampler.rs +++ b/datadog-opentelemetry/src/sampler.rs @@ -144,6 +144,7 @@ impl ShouldSample for Sampler { .register_local_root_trace_propagation_data( trace_id.to_bytes(), trace_propagation_data, + self.cfg.trace_debug_open_spans().then(|| name.to_string()) ) { RegisterTracePropagationResult::Existing(sampling_decision) => { return opentelemetry::trace::SamplingResult { diff --git a/datadog-opentelemetry/src/span_exporter.rs b/datadog-opentelemetry/src/span_exporter.rs index e73cdbcf..26ab1468 100644 --- a/datadog-opentelemetry/src/span_exporter.rs +++ b/datadog-opentelemetry/src/span_exporter.rs @@ -589,25 +589,23 @@ impl TraceExporterWorker { agent_response_handler: Option Fn(&'a str) + Send + Sync>>, ) -> TraceExporterHandle { - let handle = thread::spawn({ - move || { - let trace_exporter = match builder.build() { - Ok(exporter) => exporter, - Err(e) => { - return Err(e); - } - }; - let cached_config = CachedConfig::new(&cfg); - let task = Self { - trace_exporter, - cached_config, - rx, - otel_resource, - agent_response_handler, - }; - task.run() - } - }); + let handle = thread::spawn(dd_trace::log::with_local_logger(move || { + let trace_exporter = match builder.build() { + Ok(exporter) => exporter, + Err(e) => { + return Err(e); + } + }; + let cached_config = CachedConfig::new(&cfg); + let task = Self { + trace_exporter, + cached_config, + rx, + otel_resource, + agent_response_handler, + }; + task.run() + })); TraceExporterHandle { handle: Mutex::new(Some(handle)), } diff --git a/datadog-opentelemetry/src/span_processor.rs b/datadog-opentelemetry/src/span_processor.rs index f9a33a1a..fe5d6f21 100644 --- a/datadog-opentelemetry/src/span_processor.rs +++ b/datadog-opentelemetry/src/span_processor.rs @@ -5,8 +5,10 @@ use hashbrown::{hash_map, HashMap as BHashMap}; use std::{ collections::HashMap, fmt::Debug, + ops::DerefMut, str::FromStr, sync::{Arc, RwLock}, + time::Duration, }; use dd_trace::{ @@ -29,6 +31,7 @@ use opentelemetry_sdk::Resource; use opentelemetry_semantic_conventions::resource::SERVICE_NAME; use crate::{ + abandoned_traces::{self, AbandonedTracesRegistry}, create_dd_resource, span_exporter::DatadogExporter, spans_metrics::{TelemetryMetricsCollector, TelemetryMetricsCollectorHandle}, @@ -41,7 +44,6 @@ struct Trace { /// Root span will always be the first span in this vector if it is present finished_spans: Vec, open_span_count: usize, - propagation_data: TracePropagationData, } @@ -247,6 +249,47 @@ impl InnerTraceRegistry { } } +#[derive(Debug)] +pub struct ShardedTraces { + shards: Arc<[CachePadded>; TRACE_REGISTRY_SHARDS]>, + hasher: foldhash::fast::RandomState, +} + +impl ShardedTraces { + pub fn new T>(mut f: F) -> Self { + Self { + shards: Arc::new(std::array::from_fn(|i| CachePadded(RwLock::new(f(i))))), + hasher: foldhash::fast::RandomState::default(), + } + } + + pub fn write_shard(&self, trace_id: [u8; 16]) -> impl DerefMut + use<'_, T> { + self.get_shard(trace_id) + .write() + .expect("Failed to acquire lock on trace registry") + } + + pub fn iter(&self) -> impl Iterator> { + self.shards.iter().map(|i| &i.0) + } + + fn get_shard(&self, trace_id: [u8; 16]) -> &RwLock { + use std::hash::BuildHasher; + let hash = self.hasher.hash_one(u128::from_ne_bytes(trace_id)); + let shard: usize = hash as usize % TRACE_REGISTRY_SHARDS; + &self.shards[shard].0 + } +} + +impl Clone for ShardedTraces { + fn clone(&self) -> Self { + Self { + shards: self.shards.clone(), + hasher: self.hasher.clone(), + } + } +} + const TRACE_REGISTRY_SHARDS: usize = 64; #[repr(align(128))] @@ -262,34 +305,24 @@ struct CachePadded(T); /// - The number of open spans in the trace /// - The sampling decision of the trace pub(crate) struct TraceRegistry { - // Example: - // inner: Arc<[CacheAligned>; N]>; - // to access a trace we do inner[hash(trace_id) % N].read() - inner: Arc<[CachePadded>; TRACE_REGISTRY_SHARDS]>, - hasher: foldhash::fast::RandomState, + shards: ShardedTraces, + abandoned_spans: Option, } impl TraceRegistry { pub fn new(config: Arc) -> Self { Self { - inner: Arc::new(std::array::from_fn(|_| { - CachePadded(RwLock::new(InnerTraceRegistry { - registry: BHashMap::new(), - metrics: TraceRegistryMetrics::default(), - config: config.clone(), - })) - })), - hasher: foldhash::fast::RandomState::default(), + shards: ShardedTraces::new(|_| InnerTraceRegistry { + registry: BHashMap::new(), + metrics: TraceRegistryMetrics::default(), + config: config.clone(), + }), + abandoned_spans: config + .trace_debug_open_spans() + .then(AbandonedTracesRegistry::new), } } - fn get_shard(&self, trace_id: [u8; 16]) -> &RwLock { - use std::hash::BuildHasher; - let hash = self.hasher.hash_one(u128::from_ne_bytes(trace_id)); - let shard = hash as usize % TRACE_REGISTRY_SHARDS; - &self.inner[shard].0 - } - /// Register the trace propagation data for a given trace ID /// This increases the open span count for the trace by 1, but does not set the root span ID. /// You will then need to call `register_local_root_span` to set the root span ID @@ -300,11 +333,13 @@ impl TraceRegistry { &self, trace_id: [u8; 16], propagation_data: TracePropagationData, + span_name: Option, ) -> RegisterTracePropagationResult { - let mut inner = self - .get_shard(trace_id) - .write() - .expect("Failed to acquire lock on trace registry"); + self.abandoned_spans + .as_ref() + .and_then(|a| Some(a.register_root_span_sampling(trace_id, span_name?))); + + let mut inner = self.shards.write_shard(trace_id); inner.register_local_root_trace_propagation_data(trace_id, propagation_data) } @@ -312,10 +347,11 @@ impl TraceRegistry { /// This will also increment the open span count for the trace. /// If the trace is already registered, it will ignore the new root span ID and log a warning. pub fn register_local_root_span(&self, trace_id: [u8; 16], root_span_id: [u8; 8]) { - let mut inner = self - .get_shard(trace_id) - .write() - .expect("Failed to acquire lock on trace registry"); + self.abandoned_spans + .as_ref() + .map(|a| a.register_local_root_span(trace_id)); + + let mut inner = self.shards.write_shard(trace_id); inner.register_local_root_span(trace_id, root_span_id); } @@ -326,10 +362,11 @@ impl TraceRegistry { span_id: [u8; 8], propagation_data: TracePropagationData, ) { - let mut inner = self - .get_shard(trace_id) - .write() - .expect("Failed to acquire lock on trace registry"); + self.abandoned_spans + .as_ref() + .map(|a| a.register_span(trace_id)); + + let mut inner = self.shards.write_shard(trace_id); inner.register_span(trace_id, span_id, propagation_data); } @@ -337,26 +374,23 @@ impl TraceRegistry { /// If the trace is finished (i.e., all spans are finished), return the full trace chunk to /// flush fn finish_span(&self, trace_id: [u8; 16], span_data: SpanData) -> Option { - let mut inner = self - .get_shard(trace_id) - .write() - .expect("Failed to acquire lock on trace registry"); + self.abandoned_spans + .as_ref() + .map(|a| a.finish_span(trace_id)); + + let mut inner = self.shards.write_shard(trace_id); inner.finish_span(trace_id, span_data) } pub fn get_trace_propagation_data(&self, trace_id: [u8; 16]) -> TracePropagationData { - let inner = self - .get_shard(trace_id) - .read() - .expect("Failed to acquire lock on trace registry"); - + let inner = self.shards.write_shard(trace_id); inner.get_trace_propagation_data(trace_id).clone() } pub fn get_metrics(&self) -> TraceRegistryMetrics { let mut stats = TraceRegistryMetrics::default(); - for shard_idx in 0..TRACE_REGISTRY_SHARDS { - let mut shard = self.inner[shard_idx].0.write().unwrap(); + for shard in self.shards.iter() { + let mut shard = shard.write().unwrap(); let shard_stats = shard.get_metrics(); stats.spans_created += shard_stats.spans_created; stats.spans_finished += shard_stats.spans_finished; @@ -366,6 +400,21 @@ impl TraceRegistry { } stats } + + pub fn iter_old_traces( + &self, + min_age: Duration, + ) -> impl Iterator + use<'_> { + self.abandoned_spans + .iter() + .flat_map(move |a| a.iter_old_traces(min_age)) + } + + pub fn iter_lost_traces(&self) -> impl Iterator + use<'_> { + self.abandoned_spans + .iter() + .flat_map(move |a| a.iter_open_traces()) + } } #[derive(Default, Debug)] @@ -412,9 +461,15 @@ impl DatadogSpanProcessor { } else { None }; + let span_exporter = DatadogExporter::new(config.clone(), agent_response_handler); + let telemetry_metrics_handle = config.telemetry_enabled().then(|| { - TelemetryMetricsCollector::start(registry.clone(), span_exporter.queue_metrics()) + TelemetryMetricsCollector::start( + config.clone(), + registry.clone(), + span_exporter.queue_metrics(), + ) }); Self { @@ -868,6 +923,7 @@ mod tests { "foobar".to_string(), )])), }, + None, ); } tr @@ -955,7 +1011,7 @@ mod tests { let span_id = [1u8; 8]; // Register and finish a single span - registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA); + registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA, None); registry.register_local_root_span(trace_id, span_id); let span_data = create_test_span_data(trace_id, span_id); @@ -977,7 +1033,7 @@ mod tests { let child2_span_id = [3u8; 8]; // Register root span - registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA); + registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA, None); registry.register_local_root_span(trace_id, root_span_id); // Register child spans @@ -1018,7 +1074,11 @@ mod tests { let trace_id = [i; 16]; let span_id = [i; 8]; - registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA); + registry.register_local_root_trace_propagation_data( + trace_id, + EMPTY_PROPAGATION_DATA, + None, + ); registry.register_local_root_span(trace_id, span_id); let span_data = create_test_span_data(trace_id, span_id); @@ -1046,7 +1106,7 @@ mod tests { let child_span_id = [2u8; 8]; // Register root and child spans - registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA); + registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA, None); registry.register_local_root_span(trace_id, root_span_id); registry.register_span(trace_id, child_span_id, EMPTY_PROPAGATION_DATA); @@ -1091,7 +1151,7 @@ mod tests { let span_id = [1u8; 8]; // Create and finish a trace - registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA); + registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA, None); registry.register_local_root_span(trace_id, span_id); let span_data = create_test_span_data(trace_id, span_id); registry.finish_span(trace_id, span_data); @@ -1118,7 +1178,11 @@ mod tests { let trace_id = (i as u128).to_be_bytes(); let span_id = [i as u8; 8]; - registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA); + registry.register_local_root_trace_propagation_data( + trace_id, + EMPTY_PROPAGATION_DATA, + None, + ); registry.register_local_root_span(trace_id, span_id); let span_data = create_test_span_data(trace_id, span_id); @@ -1139,7 +1203,7 @@ mod tests { let root_span_id = [1u8; 8]; // Register root - registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA); + registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA, None); registry.register_local_root_span(trace_id, root_span_id); // Register 5 child spans @@ -1177,7 +1241,7 @@ mod tests { let root_span_id = [1u8; 8]; // Register root span - registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA); + registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA, None); registry.register_local_root_span(trace_id, root_span_id); // Register and finish more than default min_spans @@ -1216,7 +1280,7 @@ mod tests { let root_span_id = [1u8; 8]; // Register root span - registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA); + registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA, None); registry.register_local_root_span(trace_id, root_span_id); // Register 15 child spans @@ -1265,7 +1329,7 @@ mod tests { let root_span_id = [1u8; 8]; // Register root span - registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA); + registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA, None); registry.register_local_root_span(trace_id, root_span_id); // Register 20 child spans diff --git a/datadog-opentelemetry/src/spans_metrics.rs b/datadog-opentelemetry/src/spans_metrics.rs index c5c7a50a..77523c60 100644 --- a/datadog-opentelemetry/src/spans_metrics.rs +++ b/datadog-opentelemetry/src/spans_metrics.rs @@ -3,11 +3,15 @@ use std::{sync::Arc, time::Duration}; -use dd_trace::utils::{ShutdownSignaler, WorkerError, WorkerHandle}; +use dd_trace::{ + utils::{ShutdownSignaler, WorkerError, WorkerHandle}, + Config, +}; use crate::{span_exporter::QueueMetricsFetcher, TraceRegistry}; pub struct TelemetryMetricsCollector { + config: Arc, registry: TraceRegistry, exporter_queue_metrics: QueueMetricsFetcher, shutdown_rx: std::sync::mpsc::Receiver<()>, @@ -37,18 +41,20 @@ impl Drop for TelemetryMetricsCollector { impl TelemetryMetricsCollector { pub fn start( + config: Arc, registry: TraceRegistry, exporter_queue_metrics: QueueMetricsFetcher, ) -> TelemetryMetricsCollectorHandle { let (shutdown_tx, shutdown_rx) = std::sync::mpsc::sync_channel(1); let shutdown_finished = ShutdownSignaler::new(); let worker = Self { + config, registry, shutdown_rx, shutdown_finished: shutdown_finished.clone(), exporter_queue_metrics, }; - let handle = std::thread::spawn(|| worker.run()); + let handle = std::thread::spawn(dd_trace::log::with_local_logger(|| worker.run())); TelemetryMetricsCollectorHandle { shutdown_tx, worker_handle: WorkerHandle::new(shutdown_finished, handle), @@ -56,12 +62,54 @@ impl TelemetryMetricsCollector { } fn run(mut self) { + let interval; + #[cfg(feature = "test-utils")] + { + interval = self.config.__internal_span_metrics_interval(); + } + #[cfg(not(feature = "test-utils"))] + { + interval = Duration::from_secs(10); + } loop { - match self.shutdown_rx.recv_timeout(Duration::from_secs(10)) { + match self.shutdown_rx.recv_timeout(interval) { Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {} - Err(std::sync::mpsc::RecvTimeoutError::Disconnected) | Ok(()) => return, + Err(std::sync::mpsc::RecvTimeoutError::Disconnected) | Ok(()) => break, }; self.emit_metrics(); + if self.config.trace_debug_open_spans() { + self.warn_maybe_abandoned_traces(); + } + } + if self.config.trace_debug_open_spans() { + self.warn_shutdown_abandoned_traces(); + } + } + + fn warn_shutdown_abandoned_traces(&self) { + for t in self.registry.iter_lost_traces().take(100) { + // Log at most 100 traces + dd_trace::dd_warn!( + "lost trace not finished during shutdown trace_id={} root_name={} age={}ms open_spans={}", + t.tid, + t.root_span_name.as_str(), + t.age.as_millis(), + t.open_spans + ) + } + } + + fn warn_maybe_abandoned_traces(&self) { + let min_age = self.config.trace_open_span_timeout(); + // Log at most 100 traces + for t in self.registry.iter_old_traces(min_age).take(100) { + dd_trace::dd_warn!( + "possibly abandoned trace trace_id={} root_name={} age={}ms open_spans={}", + t.tid, + t.root_span_name.as_str(), + t.age.as_millis(), + t.open_spans + ) } } diff --git a/datadog-opentelemetry/tests/integration_tests/tracing_api.rs b/datadog-opentelemetry/tests/integration_tests/tracing_api.rs index a0afbbc9..b64ba197 100644 --- a/datadog-opentelemetry/tests/integration_tests/tracing_api.rs +++ b/datadog-opentelemetry/tests/integration_tests/tracing_api.rs @@ -1,7 +1,7 @@ // Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use std::collections::HashMap; +use std::{collections::HashMap, thread, time::Duration}; use opentelemetry::{ trace::{TraceContextExt, TracerProvider}, @@ -85,3 +85,49 @@ async fn test_remote_span_extraction_propagation() { }) .await; } + +#[tokio::test] +async fn test_debug_open_spans() { + const SESSION_NAME: &str = "tracing_api/test_debug_open_spans"; + let mut cfg = dd_trace::Config::builder(); + cfg.set_log_level_filter(dd_trace::log::LevelFilter::Off) + .set_trace_debug_open_spans(true) + .set_trace_open_span_timeout(Duration::from_millis(1)) + .__internal_set_span_metrics_interval(Duration::from_millis(100)); + let _logger_guard = dd_trace::log::test_logger::activate_test_logger(); + with_test_agent_session(SESSION_NAME, cfg, |_, tracer_provider, _, _| { + let subscriber = tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer()) + .with(tracing_opentelemetry::layer().with_tracer(tracer_provider.tracer("test"))); + let _guard = subscriber.set_default(); + + // leak a span + let _root_span = tracing::trace_span!("root_span").entered(); + std::mem::forget(tracing::trace_span!("child_span")); + thread::sleep(Duration::from_millis(200)); + + + let test_logs = dd_trace::log::test_logger::take_test_logs().unwrap(); + let abandoned_logs = test_logs + .iter() + .filter(|(lvl, msg)| { + *lvl == dd_trace::log::Level::Warn + && msg.contains("possibly abandoned trace") + && msg.contains("root_name=root_span") + }) + .collect::>(); + assert!(abandoned_logs.len() >= 1) + }) + .await; + + let test_logs = dd_trace::log::test_logger::take_test_logs().unwrap(); + let abandoned_logs = test_logs + .iter() + .filter(|(lvl, msg)| { + *lvl == dd_trace::log::Level::Warn + && msg.contains("lost trace not finished during shutdown") + && msg.contains("root_name=root_span") + }) + .collect::>(); + assert!(abandoned_logs.len() >= 1) +} diff --git a/dd-trace-propagation/benches/inject_benchmark.rs b/dd-trace-propagation/benches/inject_benchmark.rs index 28bf17d0..f964fda3 100644 --- a/dd-trace-propagation/benches/inject_benchmark.rs +++ b/dd-trace-propagation/benches/inject_benchmark.rs @@ -101,7 +101,7 @@ fn bench_datadog_only_inject> { } } -impl_config_value_provider!(simple: Cow<'static, str>, bool, u32, usize, i32, f64, ServiceName, LevelFilter, ParsedSamplingRules); +impl_config_value_provider!(simple: Cow<'static, str>, bool, u32, u64, usize, i32, f64, ServiceName, LevelFilter, ParsedSamplingRules); impl_config_value_provider!(option: String); +impl ConfigurationValueProvider for Duration { + fn get_configuration_value(&self) -> String { + self.as_secs_f64().to_string() + } +} + #[derive(Clone)] #[non_exhaustive] /// Configuration for the Datadog Tracer @@ -811,6 +818,7 @@ pub struct Config { /// Configurations for testing. Not exposed to customer #[cfg(feature = "test-utils")] wait_agent_info_ready: bool, + span_metrics_interval: Duration, // # Telemetry configuration /// Disables telemetry if false @@ -824,6 +832,10 @@ pub struct Config { trace_partial_flush_enabled: ConfigItem, trace_partial_flush_min_spans: ConfigItem, + /// Debug potentially abandoned spans + trace_debug_open_spans: ConfigItem, + trace_open_span_timeout: ConfigItem, + /// Trace propagation configuration trace_propagation_style: ConfigItem>>, trace_propagation_style_extract: ConfigItem>>, @@ -986,6 +998,15 @@ impl Config { default.telemetry_heartbeat_interval, |interval: f64| interval.abs(), ), + trace_debug_open_spans: cisu.update_parsed( + SupportedConfigurations::DD_TRACE_DEBUG_OPEN_SPANS, + default.trace_debug_open_spans, + ), + trace_open_span_timeout: cisu.update_parsed_with_transform( + SupportedConfigurations::DD_TRACE_OPEN_SPAN_TIMEOUT, + default.trace_open_span_timeout, + |val: u64| Duration::from_secs(val.max(1)), + ), trace_propagation_style: cisu.update_parsed_with_transform( SupportedConfigurations::DD_TRACE_PROPAGATION_STYLE, default.trace_propagation_style, @@ -1005,8 +1026,6 @@ impl Config { SupportedConfigurations::DD_TRACE_PROPAGATION_EXTRACT_FIRST, default.trace_propagation_extract_first, ), - #[cfg(feature = "test-utils")] - wait_agent_info_ready: default.wait_agent_info_ready, extra_services_tracker: ExtraServicesTracker::new(), remote_config_enabled: cisu.update_parsed( SupportedConfigurations::DD_REMOTE_CONFIGURATION_ENABLED, @@ -1023,6 +1042,11 @@ impl Config { default.datadog_tags_max_length, |max: usize| max.min(DATADOG_TAGS_MAX_LENGTH), ), + + // Test only configs + #[cfg(feature = "test-utils")] + wait_agent_info_ready: default.wait_agent_info_ready, + span_metrics_interval: default.span_metrics_interval, } } @@ -1127,6 +1151,14 @@ impl Config { self.dogstatsd_agent_url.value() } + pub fn trace_debug_open_spans(&self) -> bool { + *self.trace_debug_open_spans.value() + } + + pub fn trace_open_span_timeout(&self) -> Duration { + *self.trace_open_span_timeout.value() + } + pub fn trace_sampling_rules(&self) -> impl Deref + use<'_> { self.trace_sampling_rules.value() } @@ -1147,11 +1179,6 @@ impl Config { *self.trace_stats_computation_enabled.value() } - #[cfg(feature = "test-utils")] - pub fn __internal_wait_agent_info_ready(&self) -> bool { - self.wait_agent_info_ready - } - /// Static runtime id if the process fn process_runtime_id() -> &'static str { // TODO(paullgdc): Regenerate on fork? Would we even support forks? @@ -1308,6 +1335,17 @@ impl Config { pub fn datadog_tags_max_length(&self) -> usize { *self.datadog_tags_max_length.value() } + + // Test only configs + #[cfg(feature = "test-utils")] + pub fn __internal_wait_agent_info_ready(&self) -> bool { + self.wait_agent_info_ready + } + + #[cfg(feature = "test-utils")] + pub fn __internal_span_metrics_interval(&self) -> Duration { + self.span_metrics_interval + } } impl std::fmt::Debug for Config { @@ -1326,6 +1364,11 @@ impl std::fmt::Debug for Config { .field("trace_rate_limit", &self.trace_rate_limit) .field("enabled", &self.enabled) .field("log_level_filter", &self.log_level_filter) + .field("trace_debug_open_spans", &self.trace_debug_open_spans) + .field( + "trace_open_span_timeout_secs", + &self.trace_open_span_timeout, + ) .field( "trace_stats_computation_enabled", &self.trace_stats_computation_enabled, @@ -1384,6 +1427,14 @@ fn default_config() -> Config { SupportedConfigurations::DD_DOGSTATSD_URL, Cow::Borrowed(""), ), + trace_debug_open_spans: ConfigItem::new( + SupportedConfigurations::DD_TRACE_DEBUG_OPEN_SPANS, + false, + ), + trace_open_span_timeout: ConfigItem::new( + SupportedConfigurations::DD_TRACE_OPEN_SPAN_TIMEOUT, + Duration::from_secs(60), + ), trace_sampling_rules: ConfigItemWithOverride::new_rc( SupportedConfigurations::DD_TRACE_SAMPLING_RULES, ParsedSamplingRules::default(), // Empty rules by default @@ -1401,9 +1452,6 @@ fn default_config() -> Config { SupportedConfigurations::DD_TRACE_STATS_COMPUTATION_ENABLED, true, ), - #[cfg(feature = "test-utils")] - wait_agent_info_ready: false, - telemetry_enabled: ConfigItem::new( SupportedConfigurations::DD_INSTRUMENTATION_TELEMETRY_ENABLED, true, @@ -1457,6 +1505,11 @@ fn default_config() -> Config { SupportedConfigurations::DD_TRACE_X_DATADOG_TAGS_MAX_LENGTH, DATADOG_TAGS_MAX_LENGTH, ), + + // Test only configs + #[cfg(feature = "test-utils")] + wait_agent_info_ready: false, + span_metrics_interval: Duration::from_secs(10), } } @@ -1570,6 +1623,18 @@ impl ConfigBuilder { self } + pub fn set_trace_debug_open_spans(&mut self, enabled: bool) -> &mut Self { + self.config.trace_debug_open_spans.set_code(enabled); + self + } + + pub fn set_trace_open_span_timeout(&mut self, timeout: Duration) -> &mut Self { + self.config + .trace_open_span_timeout + .set_code(timeout.max(Duration::from_millis(1))); + self + } + pub fn set_trace_partial_flush_enabled(&mut self, enabled: bool) -> &mut Self { self.config.trace_partial_flush_enabled.set_code(enabled); self @@ -1664,7 +1729,10 @@ impl ConfigBuilder { } #[cfg(feature = "test-utils")] - pub fn set_datadog_tags_max_length_with_no_limit(&mut self, length: usize) -> &mut Self { + pub fn __internal_set_datadog_tags_max_length_with_no_limit( + &mut self, + length: usize, + ) -> &mut Self { self.config.datadog_tags_max_length.set_code(length); self } @@ -1677,6 +1745,12 @@ impl ConfigBuilder { self.config.wait_agent_info_ready = wait_agent_info_ready; self } + + #[cfg(feature = "test-utils")] + pub fn __internal_set_span_metrics_interval(&mut self, interval: Duration) -> &mut Self { + self.config.span_metrics_interval = interval; + self + } } #[cfg(test)] diff --git a/dd-trace/src/configuration/remote_config.rs b/dd-trace/src/configuration/remote_config.rs index 15d24532..0004563b 100644 --- a/dd-trace/src/configuration/remote_config.rs +++ b/dd-trace/src/configuration/remote_config.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::configuration::Config; +use crate::log; use crate::utils::{ShutdownSignaler, WorkerHandle}; use anyhow::Result; @@ -330,7 +331,7 @@ impl RemoteConfigClientWorker { client: RemoteConfigClient::new(config)?, shutdown_receiver, }; - let join_handle = thread::spawn(move || worker.run()); + let join_handle = thread::spawn(log::with_local_logger(move || worker.run())); Ok(RemoteConfigClientHandle { cancel_token, worker_handle: WorkerHandle::new(shutdown_finished, join_handle), diff --git a/dd-trace/src/configuration/supported_configurations.rs b/dd-trace/src/configuration/supported_configurations.rs index 6b638635..099781ce 100644 --- a/dd-trace/src/configuration/supported_configurations.rs +++ b/dd-trace/src/configuration/supported_configurations.rs @@ -24,6 +24,8 @@ pub(crate) enum SupportedConfigurations { DD_TELEMETRY_LOG_COLLECTION_ENABLED, DD_TRACE_AGENT_PORT, DD_TRACE_AGENT_URL, + DD_TRACE_DEBUG_OPEN_SPANS, + DD_TRACE_OPEN_SPAN_TIMEOUT, DD_TRACE_ENABLED, DD_TRACE_PARTIAL_FLUSH_ENABLED, DD_TRACE_PARTIAL_FLUSH_MIN_SPANS, @@ -71,6 +73,8 @@ impl SupportedConfigurations { } SupportedConfigurations::DD_TRACE_AGENT_PORT => "DD_TRACE_AGENT_PORT", SupportedConfigurations::DD_TRACE_AGENT_URL => "DD_TRACE_AGENT_URL", + SupportedConfigurations::DD_TRACE_DEBUG_OPEN_SPANS => "DD_TRACE_DEBUG_OPEN_SPANS", + SupportedConfigurations::DD_TRACE_OPEN_SPAN_TIMEOUT => "DD_TRACE_OPEN_SPAN_TIMEOUT", SupportedConfigurations::DD_TRACE_ENABLED => "DD_TRACE_ENABLED", SupportedConfigurations::DD_TRACE_PARTIAL_FLUSH_ENABLED => { "DD_TRACE_PARTIAL_FLUSH_ENABLED" diff --git a/dd-trace/src/log.rs b/dd-trace/src/log.rs index 3bfd62e2..989a62cd 100644 --- a/dd-trace/src/log.rs +++ b/dd-trace/src/log.rs @@ -66,7 +66,7 @@ impl Display for LevelFilter { } #[repr(usize)] -#[derive(Copy, Debug, Hash)] +#[derive(Copy, Debug, Hash, PartialEq)] pub enum Level { Error = 1, // this value must match with LogLevelFilter::Error Warn, @@ -128,6 +128,98 @@ impl PartialOrd for Level { } } +#[cfg(feature = "test-utils")] +pub mod test_logger { + //! Implements a thread local, overridable logger + //! + //! Tests can locally intercept logs by calling to `activate_test_logger` + //! + //! ```no_run + //! let _log_guard = dd_trace::log::test_logger::activate_test_logger;(); + //! // whatever is logged by the dd_(level)! macros will be stored + //! dd_trace::dd_debug!("my log"); + //! let logs = dd_trace::log::test_logger::take_test_logs().unwrap(); + //! // logs should contain (Debug, "my log") + //! + //! // to see logs in threads spawned from the test, the function passed to spawn + //! // should be wrapped by `with_local_logger` + //! std::thread::spawn(dd_trace::log::with_local_logger(|| { + //! dd_trace::dd_debug!("my log"); + //! })).join(); + //! ``` + use std::{cell::RefCell, sync::Arc}; + + #[derive(Default)] + struct TestLogger(std::sync::Mutex>); + + pub fn print_log( + lvl: crate::log::Level, + log: std::fmt::Arguments, + _file: &str, + _line: u32, + _template: Option<&str>, + ) { + let _ = LOCAL_LOGGER.try_with(|l| { + if let Some(l) = &*l.borrow() { + l.0.lock().unwrap().push((lvl, log.to_string())) + } + }); + } + + thread_local! { + static LOCAL_LOGGER: RefCell>> = RefCell::new(None); + } + + pub fn with_local_logger R, R>(f: F) -> impl FnOnce() -> R { + let logger = LOCAL_LOGGER.try_with(|l| l.borrow().clone()).ok().flatten(); + move || { + let _guard = LoggerGuard { + prev: LOCAL_LOGGER.replace(logger), + }; + f() + } + } + + pub struct LoggerGuard { + prev: Option>, + } + + impl Drop for LoggerGuard { + fn drop(&mut self) { + LOCAL_LOGGER.set(self.prev.take()); + } + } + + pub fn activate_test_logger() -> LoggerGuard { + let prev = LOCAL_LOGGER.replace(Some(Arc::new(TestLogger::default()))); + LoggerGuard { prev } + } + + pub fn take_test_logs() -> Option> { + use std::ops::DerefMut; + + LOCAL_LOGGER + .try_with(|l| { + l.borrow() + .as_deref() + .map(|l| std::mem::take(l.0.lock().unwrap().deref_mut())) + }) + .ok() + .flatten() + } +} + +pub fn with_local_logger R, R>(f: F) -> impl FnOnce() -> R { + #[cfg(feature = "test-utils")] + { + test_logger::with_local_logger(f) + } + #[cfg(not(feature = "test-utils"))] + { + f + } +} + pub fn print_log( lvl: crate::log::Level, log: fmt::Arguments, @@ -190,23 +282,21 @@ macro_rules! dd_log { let loc = std::panic::Location::caller(); $crate::log::print_log(lvl, format_args!($first, $($rest)*), loc.file(), loc.line(), Some($first)); } + #[cfg(feature = "test-utils")] + { + let loc = std::panic::Location::caller(); + $crate::log::test_logger::print_log(lvl, format_args!($first, $($rest)*), loc.file(), loc.line(), Some($first)) + } }}; ($lvl:expr, $first:expr) => { - let lvl = $lvl; - if lvl <= $crate::log::max_level() { - let loc = std::panic::Location::caller(); - $crate::log::print_log(lvl, format_args!($first), loc.file(), loc.line(), Some($first)); - } + $crate::dd_log!($lvl, $first,) }; } #[cfg(test)] mod tests { - use crate::{ - log::LevelFilter, - log::{max_level, set_max_level, Level}, - }; + use crate::log::{max_level, set_max_level, test_logger, Level, LevelFilter}; #[test] fn test_default_max_level() { @@ -245,4 +335,23 @@ mod tests { } } } + + #[test] + fn test_test_logger() { + let _g = test_logger::activate_test_logger(); + dd_debug!("debug log {}", "foo"); + std::thread::spawn(test_logger::with_local_logger(|| { + dd_warn!("debug log {}", "bar"); + })) + .join() + .unwrap(); + let test_logs = test_logger::take_test_logs().unwrap(); + assert_eq!( + &test_logs, + &[ + (Level::Debug, "debug log foo".into()), + (Level::Warn, "debug log bar".into()) + ] + ); + } } diff --git a/supported-configurations.json b/supported-configurations.json index 7c2a6a58..d886fa66 100644 --- a/supported-configurations.json +++ b/supported-configurations.json @@ -130,6 +130,22 @@ "propertyKeys": ["trace_agent_url"] } ], + "DD_TRACE_DEBUG_OPEN_SPANS": [ + { + "version": "A", + "type": "boolean", + "default_value": "false", + "propertyKeys": ["trace_debug_open_spans"] + } + ], + "DD_TRACE_OPEN_SPAN_TIMEOUT": [ + { + "version": "A", + "type": "seconds", + "default_value": "60", + "propertyKeys": ["trace_open_span_timeout"] + } + ], "DD_TRACE_ENABLED": [ { "version": "A", From 0080297fd1ad07ee41208a92cef6cdee0eb46101 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Fri, 7 Nov 2025 17:38:59 +0100 Subject: [PATCH 2/5] fix: change open spans timeout config name --- datadog-opentelemetry/src/spans_metrics.rs | 2 +- .../tests/integration_tests/tracing_api.rs | 2 +- dd-trace/src/configuration/configuration.rs | 24 +++++++++---------- .../configuration/supported_configurations.rs | 6 +++-- supported-configurations.json | 4 ++-- 5 files changed, 20 insertions(+), 18 deletions(-) diff --git a/datadog-opentelemetry/src/spans_metrics.rs b/datadog-opentelemetry/src/spans_metrics.rs index 77523c60..2667fbc8 100644 --- a/datadog-opentelemetry/src/spans_metrics.rs +++ b/datadog-opentelemetry/src/spans_metrics.rs @@ -100,7 +100,7 @@ impl TelemetryMetricsCollector { } fn warn_maybe_abandoned_traces(&self) { - let min_age = self.config.trace_open_span_timeout(); + let min_age = self.config.trace_debug_open_spans_timeout(); // Log at most 100 traces for t in self.registry.iter_old_traces(min_age).take(100) { dd_trace::dd_warn!( diff --git a/datadog-opentelemetry/tests/integration_tests/tracing_api.rs b/datadog-opentelemetry/tests/integration_tests/tracing_api.rs index b64ba197..0ec1fe74 100644 --- a/datadog-opentelemetry/tests/integration_tests/tracing_api.rs +++ b/datadog-opentelemetry/tests/integration_tests/tracing_api.rs @@ -92,7 +92,7 @@ async fn test_debug_open_spans() { let mut cfg = dd_trace::Config::builder(); cfg.set_log_level_filter(dd_trace::log::LevelFilter::Off) .set_trace_debug_open_spans(true) - .set_trace_open_span_timeout(Duration::from_millis(1)) + .set_trace_debug_open_spans_timeout(Duration::from_millis(1)) .__internal_set_span_metrics_interval(Duration::from_millis(100)); let _logger_guard = dd_trace::log::test_logger::activate_test_logger(); with_test_agent_session(SESSION_NAME, cfg, |_, tracer_provider, _, _| { diff --git a/dd-trace/src/configuration/configuration.rs b/dd-trace/src/configuration/configuration.rs index a39a4594..b5bc87a0 100644 --- a/dd-trace/src/configuration/configuration.rs +++ b/dd-trace/src/configuration/configuration.rs @@ -834,7 +834,7 @@ pub struct Config { /// Debug potentially abandoned spans trace_debug_open_spans: ConfigItem, - trace_open_span_timeout: ConfigItem, + trace_debug_open_spans_timeout: ConfigItem, /// Trace propagation configuration trace_propagation_style: ConfigItem>>, @@ -1002,9 +1002,9 @@ impl Config { SupportedConfigurations::DD_TRACE_DEBUG_OPEN_SPANS, default.trace_debug_open_spans, ), - trace_open_span_timeout: cisu.update_parsed_with_transform( - SupportedConfigurations::DD_TRACE_OPEN_SPAN_TIMEOUT, - default.trace_open_span_timeout, + trace_debug_open_spans_timeout: cisu.update_parsed_with_transform( + SupportedConfigurations::DD_TRACE_DEBUG_OPEN_SPANS_TIMEOUT, + default.trace_debug_open_spans_timeout, |val: u64| Duration::from_secs(val.max(1)), ), trace_propagation_style: cisu.update_parsed_with_transform( @@ -1155,8 +1155,8 @@ impl Config { *self.trace_debug_open_spans.value() } - pub fn trace_open_span_timeout(&self) -> Duration { - *self.trace_open_span_timeout.value() + pub fn trace_debug_open_spans_timeout(&self) -> Duration { + *self.trace_debug_open_spans_timeout.value() } pub fn trace_sampling_rules(&self) -> impl Deref + use<'_> { @@ -1366,8 +1366,8 @@ impl std::fmt::Debug for Config { .field("log_level_filter", &self.log_level_filter) .field("trace_debug_open_spans", &self.trace_debug_open_spans) .field( - "trace_open_span_timeout_secs", - &self.trace_open_span_timeout, + "trace_debug_open_spans_timeout_secs", + &self.trace_debug_open_spans_timeout, ) .field( "trace_stats_computation_enabled", @@ -1431,8 +1431,8 @@ fn default_config() -> Config { SupportedConfigurations::DD_TRACE_DEBUG_OPEN_SPANS, false, ), - trace_open_span_timeout: ConfigItem::new( - SupportedConfigurations::DD_TRACE_OPEN_SPAN_TIMEOUT, + trace_debug_open_spans_timeout: ConfigItem::new( + SupportedConfigurations::DD_TRACE_DEBUG_OPEN_SPANS_TIMEOUT, Duration::from_secs(60), ), trace_sampling_rules: ConfigItemWithOverride::new_rc( @@ -1628,9 +1628,9 @@ impl ConfigBuilder { self } - pub fn set_trace_open_span_timeout(&mut self, timeout: Duration) -> &mut Self { + pub fn set_trace_debug_open_spans_timeout(&mut self, timeout: Duration) -> &mut Self { self.config - .trace_open_span_timeout + .trace_debug_open_spans_timeout .set_code(timeout.max(Duration::from_millis(1))); self } diff --git a/dd-trace/src/configuration/supported_configurations.rs b/dd-trace/src/configuration/supported_configurations.rs index 099781ce..b0f233ba 100644 --- a/dd-trace/src/configuration/supported_configurations.rs +++ b/dd-trace/src/configuration/supported_configurations.rs @@ -25,7 +25,7 @@ pub(crate) enum SupportedConfigurations { DD_TRACE_AGENT_PORT, DD_TRACE_AGENT_URL, DD_TRACE_DEBUG_OPEN_SPANS, - DD_TRACE_OPEN_SPAN_TIMEOUT, + DD_TRACE_DEBUG_OPEN_SPANS_TIMEOUT, DD_TRACE_ENABLED, DD_TRACE_PARTIAL_FLUSH_ENABLED, DD_TRACE_PARTIAL_FLUSH_MIN_SPANS, @@ -74,7 +74,9 @@ impl SupportedConfigurations { SupportedConfigurations::DD_TRACE_AGENT_PORT => "DD_TRACE_AGENT_PORT", SupportedConfigurations::DD_TRACE_AGENT_URL => "DD_TRACE_AGENT_URL", SupportedConfigurations::DD_TRACE_DEBUG_OPEN_SPANS => "DD_TRACE_DEBUG_OPEN_SPANS", - SupportedConfigurations::DD_TRACE_OPEN_SPAN_TIMEOUT => "DD_TRACE_OPEN_SPAN_TIMEOUT", + SupportedConfigurations::DD_TRACE_DEBUG_OPEN_SPANS_TIMEOUT => { + "DD_TRACE_DEBUG_OPEN_SPANS_TIMEOUT" + } SupportedConfigurations::DD_TRACE_ENABLED => "DD_TRACE_ENABLED", SupportedConfigurations::DD_TRACE_PARTIAL_FLUSH_ENABLED => { "DD_TRACE_PARTIAL_FLUSH_ENABLED" diff --git a/supported-configurations.json b/supported-configurations.json index d886fa66..1e5aab40 100644 --- a/supported-configurations.json +++ b/supported-configurations.json @@ -138,12 +138,12 @@ "propertyKeys": ["trace_debug_open_spans"] } ], - "DD_TRACE_OPEN_SPAN_TIMEOUT": [ + "DD_TRACE_DEBUG_OPEN_SPANS_TIMEOUT": [ { "version": "A", "type": "seconds", "default_value": "60", - "propertyKeys": ["trace_open_span_timeout"] + "propertyKeys": ["trace_debug_open_spans_timeout"] } ], "DD_TRACE_ENABLED": [ From 62fd94d4bdbe97f4f10dc05f66b83f89b3846772 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Fri, 7 Nov 2025 17:45:37 +0100 Subject: [PATCH 3/5] fix(lint): clippy + fmt --- datadog-opentelemetry/src/abandoned_traces.rs | 86 +++++++++---------- datadog-opentelemetry/src/lib.rs | 2 +- datadog-opentelemetry/src/sampler.rs | 2 +- datadog-opentelemetry/src/span_processor.rs | 27 +++--- datadog-opentelemetry/src/spans_metrics.rs | 1 + .../tests/integration_tests/tracing_api.rs | 5 +- dd-trace/src/log.rs | 20 +++-- 7 files changed, 70 insertions(+), 73 deletions(-) diff --git a/datadog-opentelemetry/src/abandoned_traces.rs b/datadog-opentelemetry/src/abandoned_traces.rs index 2c0d66c4..e84c4820 100644 --- a/datadog-opentelemetry/src/abandoned_traces.rs +++ b/datadog-opentelemetry/src/abandoned_traces.rs @@ -55,57 +55,51 @@ impl AbandonedTracesRegistry { pub fn iter_open_traces(&self) -> impl Iterator + use<'_> { let now = Instant::now(); - self.shards - .iter() - .map(move |shard| { - let shard = shard - .read() - .expect("failed to lock the abandoned spans registry"); - let now = now.clone(); - shard - .traces - .iter() - .filter_map(|(tid, trace)| { - let age: Duration = now.checked_duration_since(trace.start_ts)?; - Some(OldTrace { - tid: u128::from_be_bytes(*tid), - root_span_name: trace.name.clone(), - age, - open_spans: trace.open_spans, - }) + self.shards.iter().flat_map(move |shard| { + let shard = shard + .read() + .expect("failed to lock the abandoned spans registry"); + let now = now; + shard + .traces + .iter() + .filter_map(|(tid, trace)| { + let age: Duration = now.checked_duration_since(trace.start_ts)?; + Some(OldTrace { + tid: u128::from_be_bytes(*tid), + root_span_name: trace.name.clone(), + age, + open_spans: trace.open_spans, }) - .collect::>() - }) - .flatten() + }) + .collect::>() + }) } pub fn iter_old_traces(&self, min_age: Duration) -> impl Iterator + use<'_> { let now = Instant::now(); - self.shards - .iter() - .map(move |shard| { - let shard = shard - .read() - .expect("failed to lock the abandoned spans registry"); - let now = now.clone(); - shard - .traces - .iter() - .filter_map(|(tid, trace)| { - let age = now.checked_duration_since(trace.start_ts)?; - if age < min_age { - return None; - } - Some(OldTrace { - tid: u128::from_be_bytes(*tid), - root_span_name: trace.name.clone(), - age, - open_spans: trace.open_spans, - }) + self.shards.iter().flat_map(move |shard| { + let shard = shard + .read() + .expect("failed to lock the abandoned spans registry"); + let now = now; + shard + .traces + .iter() + .filter_map(|(tid, trace)| { + let age = now.checked_duration_since(trace.start_ts)?; + if age < min_age { + return None; + } + Some(OldTrace { + tid: u128::from_be_bytes(*tid), + root_span_name: trace.name.clone(), + age, + open_spans: trace.open_spans, }) - .collect::>() - }) - .flatten() + }) + .collect::>() + }) } } @@ -202,7 +196,7 @@ mod tests { } thread::sleep(Duration::from_millis(50)); - let trace_id = (3 as u128).to_be_bytes(); + let trace_id = 3_u128.to_be_bytes(); registry.register_root_span_sampling(trace_id, format!("root_span_{}", 3)); registry.register_local_root_span(trace_id); diff --git a/datadog-opentelemetry/src/lib.rs b/datadog-opentelemetry/src/lib.rs index 68947c8a..f26c7746 100644 --- a/datadog-opentelemetry/src/lib.rs +++ b/datadog-opentelemetry/src/lib.rs @@ -63,6 +63,7 @@ //! .init(); //! ``` +mod abandoned_traces; mod ddtrace_transform; mod sampler; mod span_exporter; @@ -70,7 +71,6 @@ mod span_processor; mod spans_metrics; mod text_map_propagator; mod trace_id; -mod abandoned_traces; use std::sync::{Arc, RwLock}; diff --git a/datadog-opentelemetry/src/sampler.rs b/datadog-opentelemetry/src/sampler.rs index ec6621e7..d9363389 100644 --- a/datadog-opentelemetry/src/sampler.rs +++ b/datadog-opentelemetry/src/sampler.rs @@ -144,7 +144,7 @@ impl ShouldSample for Sampler { .register_local_root_trace_propagation_data( trace_id.to_bytes(), trace_propagation_data, - self.cfg.trace_debug_open_spans().then(|| name.to_string()) + self.cfg.trace_debug_open_spans().then(|| name.to_string()), ) { RegisterTracePropagationResult::Existing(sampling_decision) => { return opentelemetry::trace::SamplingResult { diff --git a/datadog-opentelemetry/src/span_processor.rs b/datadog-opentelemetry/src/span_processor.rs index fe5d6f21..ba856ad9 100644 --- a/datadog-opentelemetry/src/span_processor.rs +++ b/datadog-opentelemetry/src/span_processor.rs @@ -285,7 +285,7 @@ impl Clone for ShardedTraces { fn clone(&self) -> Self { Self { shards: self.shards.clone(), - hasher: self.hasher.clone(), + hasher: self.hasher, } } } @@ -335,9 +335,10 @@ impl TraceRegistry { propagation_data: TracePropagationData, span_name: Option, ) -> RegisterTracePropagationResult { - self.abandoned_spans - .as_ref() - .and_then(|a| Some(a.register_root_span_sampling(trace_id, span_name?))); + self.abandoned_spans.as_ref().and_then(|a| { + let _: () = a.register_root_span_sampling(trace_id, span_name?); + Some(()) + }); let mut inner = self.shards.write_shard(trace_id); inner.register_local_root_trace_propagation_data(trace_id, propagation_data) @@ -347,9 +348,9 @@ impl TraceRegistry { /// This will also increment the open span count for the trace. /// If the trace is already registered, it will ignore the new root span ID and log a warning. pub fn register_local_root_span(&self, trace_id: [u8; 16], root_span_id: [u8; 8]) { - self.abandoned_spans - .as_ref() - .map(|a| a.register_local_root_span(trace_id)); + if let Some(a) = self.abandoned_spans.as_ref() { + a.register_local_root_span(trace_id) + } let mut inner = self.shards.write_shard(trace_id); inner.register_local_root_span(trace_id, root_span_id); @@ -362,9 +363,9 @@ impl TraceRegistry { span_id: [u8; 8], propagation_data: TracePropagationData, ) { - self.abandoned_spans - .as_ref() - .map(|a| a.register_span(trace_id)); + if let Some(a) = self.abandoned_spans.as_ref() { + a.register_span(trace_id) + } let mut inner = self.shards.write_shard(trace_id); inner.register_span(trace_id, span_id, propagation_data); @@ -374,9 +375,9 @@ impl TraceRegistry { /// If the trace is finished (i.e., all spans are finished), return the full trace chunk to /// flush fn finish_span(&self, trace_id: [u8; 16], span_data: SpanData) -> Option { - self.abandoned_spans - .as_ref() - .map(|a| a.finish_span(trace_id)); + if let Some(a) = self.abandoned_spans.as_ref() { + a.finish_span(trace_id) + } let mut inner = self.shards.write_shard(trace_id); inner.finish_span(trace_id, span_data) diff --git a/datadog-opentelemetry/src/spans_metrics.rs b/datadog-opentelemetry/src/spans_metrics.rs index 2667fbc8..b600f2ba 100644 --- a/datadog-opentelemetry/src/spans_metrics.rs +++ b/datadog-opentelemetry/src/spans_metrics.rs @@ -71,6 +71,7 @@ impl TelemetryMetricsCollector { { interval = Duration::from_secs(10); } + #[allow(clippy::while_let_loop)] loop { match self.shutdown_rx.recv_timeout(interval) { Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {} diff --git a/datadog-opentelemetry/tests/integration_tests/tracing_api.rs b/datadog-opentelemetry/tests/integration_tests/tracing_api.rs index 0ec1fe74..9b514cfb 100644 --- a/datadog-opentelemetry/tests/integration_tests/tracing_api.rs +++ b/datadog-opentelemetry/tests/integration_tests/tracing_api.rs @@ -106,7 +106,6 @@ async fn test_debug_open_spans() { std::mem::forget(tracing::trace_span!("child_span")); thread::sleep(Duration::from_millis(200)); - let test_logs = dd_trace::log::test_logger::take_test_logs().unwrap(); let abandoned_logs = test_logs .iter() @@ -116,7 +115,7 @@ async fn test_debug_open_spans() { && msg.contains("root_name=root_span") }) .collect::>(); - assert!(abandoned_logs.len() >= 1) + assert!(!abandoned_logs.is_empty()) }) .await; @@ -129,5 +128,5 @@ async fn test_debug_open_spans() { && msg.contains("root_name=root_span") }) .collect::>(); - assert!(abandoned_logs.len() >= 1) + assert!(!abandoned_logs.is_empty()) } diff --git a/dd-trace/src/log.rs b/dd-trace/src/log.rs index 989a62cd..721bd890 100644 --- a/dd-trace/src/log.rs +++ b/dd-trace/src/log.rs @@ -130,22 +130,24 @@ impl PartialOrd for Level { #[cfg(feature = "test-utils")] pub mod test_logger { - //! Implements a thread local, overridable logger - //! + //! Implements a thread local, overridable logger + //! //! Tests can locally intercept logs by calling to `activate_test_logger` - //! + //! //! ```no_run - //! let _log_guard = dd_trace::log::test_logger::activate_test_logger;(); + //! let _log_guard = dd_trace::log::test_logger::activate_test_logger; + //! (); //! // whatever is logged by the dd_(level)! macros will be stored //! dd_trace::dd_debug!("my log"); //! let logs = dd_trace::log::test_logger::take_test_logs().unwrap(); //! // logs should contain (Debug, "my log") - //! - //! // to see logs in threads spawned from the test, the function passed to spawn + //! + //! // to see logs in threads spawned from the test, the function passed to spawn //! // should be wrapped by `with_local_logger` //! std::thread::spawn(dd_trace::log::with_local_logger(|| { - //! dd_trace::dd_debug!("my log"); - //! })).join(); + //! dd_trace::dd_debug!("my log"); + //! })) + //! .join(); //! ``` use std::{cell::RefCell, sync::Arc}; @@ -167,7 +169,7 @@ pub mod test_logger { } thread_local! { - static LOCAL_LOGGER: RefCell>> = RefCell::new(None); + static LOCAL_LOGGER: RefCell>> = const { RefCell::new(None) }; } pub fn with_local_logger R, R>(f: F) -> impl FnOnce() -> R { From ec51b46d06588eec29d8820ada2a6e5d1da1f988 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Wed, 12 Nov 2025 19:59:10 +0100 Subject: [PATCH 4/5] feat(span_processor): log abandoned span names --- datadog-opentelemetry/src/abandoned_traces.rs | 105 ++++++++++++------ datadog-opentelemetry/src/sampler.rs | 5 + datadog-opentelemetry/src/span_processor.rs | 8 +- datadog-opentelemetry/src/spans_metrics.rs | 26 +++-- .../integration_tests/opentelemetry_api.rs | 55 ++++++++- .../tests/integration_tests/tracing_api.rs | 31 ++++-- 6 files changed, 179 insertions(+), 51 deletions(-) diff --git a/datadog-opentelemetry/src/abandoned_traces.rs b/datadog-opentelemetry/src/abandoned_traces.rs index e84c4820..0b9617b3 100644 --- a/datadog-opentelemetry/src/abandoned_traces.rs +++ b/datadog-opentelemetry/src/abandoned_traces.rs @@ -7,14 +7,14 @@ use std::time::{Duration, Instant}; #[derive(Debug)] struct TraceInfo { - name: String, + open_span_names: HashMap, start_ts: Instant, open_spans: usize, } pub struct OldTrace { pub tid: u128, - pub root_span_name: String, + pub open_span_names: HashMap, pub age: Duration, pub open_spans: usize, } @@ -45,12 +45,20 @@ impl AbandonedTracesRegistry { .register_root_span(trace_id); } + pub fn register_span_sampling(&self, trace_id: [u8; 16], name: String) { + self.shards + .write_shard(trace_id) + .register_span_sampling(trace_id, name); + } + pub fn register_span(&self, trace_id: [u8; 16]) { self.shards.write_shard(trace_id).register_span(trace_id); } - pub fn finish_span(&self, trace_id: [u8; 16]) { - self.shards.write_shard(trace_id).finish_span(trace_id); + pub fn finish_span(&self, trace_id: [u8; 16], name: &str) { + self.shards + .write_shard(trace_id) + .finish_span(trace_id, name); } pub fn iter_open_traces(&self) -> impl Iterator + use<'_> { @@ -67,7 +75,7 @@ impl AbandonedTracesRegistry { let age: Duration = now.checked_duration_since(trace.start_ts)?; Some(OldTrace { tid: u128::from_be_bytes(*tid), - root_span_name: trace.name.clone(), + open_span_names: trace.open_span_names.clone(), age, open_spans: trace.open_spans, }) @@ -93,7 +101,7 @@ impl AbandonedTracesRegistry { } Some(OldTrace { tid: u128::from_be_bytes(*tid), - root_span_name: trace.name.clone(), + open_span_names: trace.open_span_names.clone(), age, open_spans: trace.open_spans, }) @@ -110,14 +118,11 @@ struct InnerAbandonedTracesRegistry { impl InnerAbandonedTracesRegistry { fn register_root_span_sampling(&mut self, trace_id: [u8; 16], name: String) { - self.traces - .entry(trace_id) - .or_insert(TraceInfo { - open_spans: 0, - name, - start_ts: Instant::now(), - }) - .open_spans += 1; + self.traces.entry(trace_id).or_insert_with(|| TraceInfo { + open_spans: 1, + open_span_names: HashMap::from_iter([(name, 1)]), + start_ts: Instant::now(), + }); } fn register_root_span(&mut self, trace_id: [u8; 16]) { @@ -126,27 +131,53 @@ impl InnerAbandonedTracesRegistry { }; e.insert(TraceInfo { open_spans: 1, - name: "unknown_name".to_string(), + open_span_names: HashMap::new(), start_ts: Instant::now(), }); } + fn register_span_sampling(&mut self, trace_id: [u8; 16], name: String) { + let c = self + .traces + .entry(trace_id) + .or_insert(TraceInfo { + open_spans: 0, + start_ts: Instant::now(), + open_span_names: HashMap::new(), + }) + .open_span_names + .entry(name) + .or_default(); + *c += 1; + } + fn register_span(&mut self, trace_id: [u8; 16]) { self.traces .entry(trace_id) .or_insert(TraceInfo { open_spans: 0, - name: "".to_string(), + open_span_names: HashMap::new(), start_ts: Instant::now(), }) .open_spans += 1; } - fn finish_span(&mut self, trace_id: [u8; 16]) { + fn finish_span(&mut self, trace_id: [u8; 16], name: &str) { + dbg!("finish span"); let Entry::Occupied(mut e) = self.traces.entry(trace_id) else { return; }; let trace = e.get_mut(); + if *trace + .open_span_names + .entry_ref(name) + .and_modify(|c| *c = c.saturating_sub(1)) + .or_default() + == 0 + { + trace.open_span_names.remove(name); + }; + dbg!(name, &trace.open_span_names); trace.open_spans -= 1; if trace.open_spans == 0 { e.remove(); @@ -175,13 +206,13 @@ mod tests { let trace_id = [1; 16]; registry.register_root_span_sampling(trace_id, "root_span".to_owned()); registry.register_local_root_span(trace_id); - for _ in 0..16 { + for i in 0..16 { registry.register_span(trace_id); - registry.finish_span(trace_id); + registry.finish_span(trace_id, &i.to_string()); } assert_eq!(active_traces(®istry), 1); - registry.finish_span(trace_id); + registry.finish_span(trace_id, "root_span"); assert_eq!(active_traces(®istry), 0); } @@ -200,32 +231,42 @@ mod tests { registry.register_root_span_sampling(trace_id, format!("root_span_{}", 3)); registry.register_local_root_span(trace_id); - let old_traces = registry - .iter_old_traces(Duration::from_millis(10)) - .map(|t| (t.tid, t.root_span_name, t.open_spans)) - .collect::>(); + let collect_old_traces = || { + registry + .iter_old_traces(Duration::from_millis(10)) + .map(|t| { + ( + t.tid, + t.open_span_names + .iter() + .map(|(k, v)| (k.to_owned(), *v)) + .collect::>(), + t.open_spans, + ) + }) + .collect::>() + }; + + let old_traces = collect_old_traces(); assert_eq!(active_traces(®istry), 3); assert_eq!( old_traces, HashSet::from_iter([ - (1, "root_span_1".to_string(), 1), - (2, "root_span_2".to_string(), 1), + (1, vec![("root_span_1".to_owned(), 1)], 1), + (2, vec![("root_span_2".to_owned(), 1)], 1), ]) ); for i in 1..=2 { let trace_id = (i as u128).to_be_bytes(); - registry.finish_span(trace_id); + registry.finish_span(trace_id, &format!("root_span_{}", 3)); } thread::sleep(Duration::from_millis(50)); - let old_traces = registry - .iter_old_traces(Duration::from_millis(10)) - .map(|t| (t.tid, t.root_span_name, t.open_spans)) - .collect::>(); + let old_traces = collect_old_traces(); assert_eq!(active_traces(®istry), 1); assert_eq!( old_traces, - HashSet::from_iter([(3, "root_span_3".to_string(), 1),]) + HashSet::from_iter([(3, vec![("root_span_3".to_owned(), 1)], 1),]) ); } } diff --git a/datadog-opentelemetry/src/sampler.rs b/datadog-opentelemetry/src/sampler.rs index d9363389..6fa0da31 100644 --- a/datadog-opentelemetry/src/sampler.rs +++ b/datadog-opentelemetry/src/sampler.rs @@ -165,6 +165,11 @@ impl ShouldSample for Sampler { } RegisterTracePropagationResult::New => {} } + } else { + self.trace_registry.register_span_sampling( + trace_id.to_bytes(), + self.cfg.trace_debug_open_spans().then(|| name.to_string()), + ) } opentelemetry::trace::SamplingResult { diff --git a/datadog-opentelemetry/src/span_processor.rs b/datadog-opentelemetry/src/span_processor.rs index 7e32359f..569c3bb8 100644 --- a/datadog-opentelemetry/src/span_processor.rs +++ b/datadog-opentelemetry/src/span_processor.rs @@ -356,6 +356,12 @@ impl TraceRegistry { inner.register_local_root_span(trace_id, root_span_id); } + pub fn register_span_sampling(&self, trace_id: [u8; 16], span_name: Option) { + self.abandoned_spans + .as_ref() + .and_then(|a| Some(a.register_span_sampling(trace_id, span_name?))); + } + /// Register a new span with the given trace ID and span ID. pub fn register_span( &self, @@ -376,7 +382,7 @@ impl TraceRegistry { /// flush fn finish_span(&self, trace_id: [u8; 16], span_data: SpanData) -> Option { if let Some(a) = self.abandoned_spans.as_ref() { - a.finish_span(trace_id) + a.finish_span(trace_id, &span_data.name) } let mut inner = self.shards.write_shard(trace_id); diff --git a/datadog-opentelemetry/src/spans_metrics.rs b/datadog-opentelemetry/src/spans_metrics.rs index b600f2ba..d2493cd6 100644 --- a/datadog-opentelemetry/src/spans_metrics.rs +++ b/datadog-opentelemetry/src/spans_metrics.rs @@ -1,7 +1,7 @@ // Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use std::{sync::Arc, time::Duration}; +use std::{fmt, sync::Arc, time::Duration}; use dd_trace::{ utils::{ShutdownSignaler, WorkerError, WorkerHandle}, @@ -91,11 +91,11 @@ impl TelemetryMetricsCollector { for t in self.registry.iter_lost_traces().take(100) { // Log at most 100 traces dd_trace::dd_warn!( - "lost trace not finished during shutdown trace_id={} root_name={} age={}ms open_spans={}", + "lost trace not finished during shutdown trace_id={} age={}ms open_spans={} open_span_names={} ", t.tid, - t.root_span_name.as_str(), t.age.as_millis(), - t.open_spans + t.open_spans, + SpanNamesDisplay(&t.open_span_names), ) } } @@ -105,11 +105,11 @@ impl TelemetryMetricsCollector { // Log at most 100 traces for t in self.registry.iter_old_traces(min_age).take(100) { dd_trace::dd_warn!( - "possibly abandoned trace trace_id={} root_name={} age={}ms open_spans={}", + "possibly abandoned trace trace_id={} age={}ms open_spans={} open_span_names={} ", t.tid, - t.root_span_name.as_str(), t.age.as_millis(), - t.open_spans + t.open_spans, + SpanNamesDisplay(&t.open_span_names), ) } } @@ -145,3 +145,15 @@ impl TelemetryMetricsCollector { ]); } } + +struct SpanNamesDisplay<'a>(&'a hashbrown::HashMap); + +impl fmt::Display for SpanNamesDisplay<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "[")?; + for (k, v) in self.0.iter() { + write!(f, "({},{}),", k, v)?; + } + write!(f, "]") + } +} diff --git a/datadog-opentelemetry/tests/integration_tests/opentelemetry_api.rs b/datadog-opentelemetry/tests/integration_tests/opentelemetry_api.rs index db294b3a..a802ff4a 100644 --- a/datadog-opentelemetry/tests/integration_tests/opentelemetry_api.rs +++ b/datadog-opentelemetry/tests/integration_tests/opentelemetry_api.rs @@ -1,14 +1,16 @@ // Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use std::thread; +use std::time::Duration; use std::{collections::HashMap, ops::Deref, sync::Arc}; use datadog_opentelemetry::make_test_tracer; use dd_trace::configuration::{SamplingRuleConfig, TracePropagationStyle}; use opentelemetry::global::ObjectSafeSpan; use opentelemetry::trace::{ - SamplingDecision, SamplingResult, SpanBuilder, TraceContextExt, TraceState, Tracer, - TracerProvider, + mark_span_as_active, SamplingDecision, SamplingResult, SpanBuilder, TraceContextExt, + TraceState, Tracer, TracerProvider, }; use opentelemetry::Context; @@ -306,3 +308,52 @@ async fn test_tracing_disabled() { }) .await } + +#[tokio::test] +async fn test_debug_open_spans() { + const SESSION_NAME: &str = "opentelemetry_api/test_debug_open_spans"; + let mut cfg = dd_trace::Config::builder(); + cfg.set_log_level_filter(dd_trace::log::LevelFilter::Debug) + .set_trace_debug_open_spans(true) + .set_trace_debug_open_spans_timeout(Duration::from_millis(1)) + .__internal_set_span_metrics_interval(Duration::from_millis(100)); + let _logger_guard = dd_trace::log::test_logger::activate_test_logger(); + with_test_agent_session(SESSION_NAME, cfg, |_, tracer_provider, _, _| { + let tracer = tracer_provider.tracer("test_debug_open_spans"); + let _child_span_1; + let child_span_2; + + // leak a span + { + let _root = mark_span_as_active(tracer.start("root_span")); + _child_span_1 = tracer.start("child_span"); + child_span_2 = tracer.start("child_span"); + } + thread::sleep(Duration::from_millis(300)); + + let test_logs = dd_trace::log::test_logger::take_test_logs().unwrap(); + let abandoned_logs = test_logs + .iter() + .filter(|(lvl, msg)| { + *lvl == dd_trace::log::Level::Warn + && msg.contains("possibly abandoned trace") + && msg.contains("open_span_names=") + && msg.contains("[(child_span,2),]") + }) + .collect::>(); + assert!(!abandoned_logs.is_empty()); + std::mem::forget(child_span_2); + }) + .await; + + let test_logs = dd_trace::log::test_logger::take_test_logs().unwrap(); + let abandoned_logs = test_logs + .iter() + .filter(|(lvl, msg)| { + *lvl == dd_trace::log::Level::Warn + && msg.contains("lost trace not finished during shutdown") + && msg.contains("[(child_span,1),]") + }) + .collect::>(); + assert!(!abandoned_logs.is_empty()) +} diff --git a/datadog-opentelemetry/tests/integration_tests/tracing_api.rs b/datadog-opentelemetry/tests/integration_tests/tracing_api.rs index dfc56617..feaf3a7c 100644 --- a/datadog-opentelemetry/tests/integration_tests/tracing_api.rs +++ b/datadog-opentelemetry/tests/integration_tests/tracing_api.rs @@ -25,7 +25,7 @@ async fn test_smoke() { let span = tracing::trace_span!("test_span", _sampling_priority_v1 = 2); span.in_scope(|| { { - tracing::trace_span!("child_span_1") + tracing::trace_span!("child_span_1"); }; { tracing::trace_span!("child_span_2") @@ -90,7 +90,7 @@ async fn test_remote_span_extraction_propagation() { async fn test_debug_open_spans() { const SESSION_NAME: &str = "tracing_api/test_debug_open_spans"; let mut cfg = dd_trace::Config::builder(); - cfg.set_log_level_filter(dd_trace::log::LevelFilter::Off) + cfg.set_log_level_filter(dd_trace::log::LevelFilter::Debug) .set_trace_debug_open_spans(true) .set_trace_debug_open_spans_timeout(Duration::from_millis(1)) .__internal_set_span_metrics_interval(Duration::from_millis(100)); @@ -98,13 +98,22 @@ async fn test_debug_open_spans() { with_test_agent_session(SESSION_NAME, cfg, |_, tracer_provider, _, _| { let subscriber = tracing_subscriber::registry() .with(tracing_subscriber::fmt::layer()) - .with(tracing_opentelemetry::layer().with_tracer(tracer_provider.tracer("test"))); + .with( + tracing_opentelemetry::layer() + .with_context_activation(true) + .with_tracer(tracer_provider.tracer("test")), + ); let _guard = subscriber.set_default(); + let _child_span_1; + let child_span_2; // leak a span - let _root_span = tracing::trace_span!("root_span").entered(); - std::mem::forget(tracing::trace_span!("child_span")); - thread::sleep(Duration::from_millis(200)); + { + let _root_span = tracing::trace_span!("root_span").entered(); + _child_span_1 = tracing::trace_span!("child_span").entered().exit(); + child_span_2 = tracing::trace_span!("child_span").entered().exit(); + } + thread::sleep(Duration::from_millis(300)); let test_logs = dd_trace::log::test_logger::take_test_logs().unwrap(); let abandoned_logs = test_logs @@ -112,10 +121,13 @@ async fn test_debug_open_spans() { .filter(|(lvl, msg)| { *lvl == dd_trace::log::Level::Warn && msg.contains("possibly abandoned trace") - && msg.contains("root_name=root_span") + && msg.contains("open_span_names=") + && msg.contains("(root_span,1)") + && msg.contains("(child_span,2)") }) .collect::>(); - assert!(!abandoned_logs.is_empty()) + assert!(!abandoned_logs.is_empty()); + std::mem::forget(child_span_2); }) .await; @@ -125,7 +137,8 @@ async fn test_debug_open_spans() { .filter(|(lvl, msg)| { *lvl == dd_trace::log::Level::Warn && msg.contains("lost trace not finished during shutdown") - && msg.contains("root_name=root_span") + && msg.contains("(root_span,1)") + && msg.contains("(child_span,1)") }) .collect::>(); assert!(!abandoned_logs.is_empty()) From 0c256897ce1c52ba1fada1e67eac1df43b65ad4d Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Fri, 14 Nov 2025 15:24:43 +0100 Subject: [PATCH 5/5] fix: remove debug prints --- datadog-opentelemetry/src/abandoned_traces.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datadog-opentelemetry/src/abandoned_traces.rs b/datadog-opentelemetry/src/abandoned_traces.rs index 0b9617b3..1b787ef3 100644 --- a/datadog-opentelemetry/src/abandoned_traces.rs +++ b/datadog-opentelemetry/src/abandoned_traces.rs @@ -163,7 +163,6 @@ impl InnerAbandonedTracesRegistry { } fn finish_span(&mut self, trace_id: [u8; 16], name: &str) { - dbg!("finish span"); let Entry::Occupied(mut e) = self.traces.entry(trace_id) else { return; }; @@ -177,7 +176,6 @@ impl InnerAbandonedTracesRegistry { { trace.open_span_names.remove(name); }; - dbg!(name, &trace.open_span_names); trace.open_spans -= 1; if trace.open_spans == 0 { e.remove();