Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 63 additions & 2 deletions libdd-data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, R:
None,
);

let res = self.send_trace_chunks_inner(traces).await?;
let res = self.send_trace_chunks_inner(traces, false).await?;
if matches!(&res, AgentResponse::Changed { body } if body.is_empty()) {
return Err(TraceExporterError::Agent(
error::AgentErrorKind::EmptyResponse,
Expand Down Expand Up @@ -512,6 +512,10 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, R:

/// Send a list of trace chunks to the agent (or OTLP endpoint when configured).
///
/// Over-long span string fields are truncated before processing when `T::Text` is
/// [`libdd_tinybytes::BytesString`] (the standard owned-span type); custom `SpanText`
/// implementations that do not override `maybe_truncate` will silently skip truncation.
///
/// Sync facade over [`Self::send_trace_chunks_async`]; panics inside an existing
/// tokio context.
///
Expand Down Expand Up @@ -553,6 +557,10 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, R:

/// Send a list of trace chunks to the agent, asynchronously (or OTLP when configured).
///
/// Over-long span string fields are truncated before processing when `T::Text` is
/// [`libdd_tinybytes::BytesString`] (the standard owned-span type); custom `SpanText`
/// implementations that do not override `maybe_truncate` will silently skip truncation.
///
/// # Arguments
/// * trace_chunks: A list of trace chunks. Each trace chunk is a list of spans.
///
Expand All @@ -564,7 +572,7 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, R:
trace_chunks: Vec<Vec<Span<T>>>,
) -> Result<AgentResponse, TraceExporterError> {
self.check_agent_info().await;
self.send_trace_chunks_inner(trace_chunks).await
self.send_trace_chunks_inner(trace_chunks, true).await
}

/// Sends trace chunks via OTLP HTTP (JSON or protobuf) when OTLP config is enabled.
Expand Down Expand Up @@ -658,9 +666,18 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, R:
async fn send_trace_chunks_inner<T: TraceData>(
&self,
mut traces: Vec<Vec<Span<T>>>,
truncate: bool,
) -> Result<AgentResponse, TraceExporterError> {
let mut header_tags: TracerHeaderTags = self.metadata.borrow().into();

// Truncate over-long string fields before any downstream processing so that stats,
// serialisation, and the OTLP path all operate on the same normalised payload.
// Skipped on the msgpack path (`send`/`send_async`) where the tracer is responsible
// for enforcing field-length limits before encoding.
if truncate {
libdd_trace_utils::span::trace_utils::truncate_span_strings(&mut traces);
}

// Process stats computation and drop non-sampled (p0) chunks.
// This must run before the OTLP path so that unsampled spans are not exported.
stats::process_traces_for_stats(
Expand Down Expand Up @@ -2126,6 +2143,50 @@ mod tests {
);
mock_otlp.assert();
}

// Documents the `truncate=false` path: spans decoded from msgpack via from_slice
// have T::Text = &str, for which truncate_span_strings is a no-op. This shows
// that send/send_async leave over-long fields unchanged.
//
// Note: there is no integration test that exercises send_trace_chunks_inner with
// truncate=true through the full send_trace_chunks_async call chain. The unit
// tests in trace_utils.rs prove that truncate_span_strings works on BytesData
// spans, and the trace_serializer round-trip tests verify that truncated data
// survives encoding, but neither goes through send_trace_chunks_inner itself.

use libdd_trace_utils::span::trace_utils::MAX_SPAN_STRING_LEN;

/// Spans decoded by `from_slice` borrow their text as `&str`, and `truncate_span_strings`
/// leaves `&str` text alone, so an over-long field comes out exactly as it went in
/// (enforcing the limit on this path is the tracer's responsibility).
#[test]
fn test_send_async_does_not_truncate_over_long_fields() {
    let expected_chars = MAX_SPAN_STRING_LEN + 1;

    // Encode one owned span whose resource is a single character over the limit.
    let oversized_span = SpanBytes {
        name: BytesString::from_slice(b"op").unwrap(),
        service: BytesString::from_slice(b"svc").unwrap(),
        resource: BytesString::from_string("b".repeat(expected_chars)),
        trace_id: 1,
        span_id: 1,
        start: 1_000_000,
        duration: 1_000,
        ..Default::default()
    };
    let encoded = libdd_trace_utils::msgpack_encoder::v04::to_vec(&[vec![oversized_span]]);

    // Decoding with from_slice yields SpanSlice<'_>, i.e. T::Text = &str.
    let (mut decoded, _) =
        libdd_trace_utils::msgpack_decoder::v04::from_slice(&encoded).unwrap();

    // Invoke the truncation pass directly: on &str spans it must change nothing.
    libdd_trace_utils::span::trace_utils::truncate_span_strings(&mut decoded);

    let resource_chars = decoded[0][0].resource.chars().count();
    assert_eq!(
        resource_chars, expected_chars,
        "send_async must not truncate — tracer is responsible for field-length limits"
    );
}
}

#[cfg(test)]
Expand Down
170 changes: 170 additions & 0 deletions libdd-data-pipeline/src/trace_exporter/trace_serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,4 +460,174 @@ mod tests {
assert!(!headers.contains_key("datadog-client-computed-stats"));
assert!(headers.contains_key("datadog-client-computed-top-level"));
}

// -----------------------------------------------------------------------
// Truncation end-to-end regression tests
//
// These tests verify that over-long string fields survive the full
// truncate → encode → decode round-trip correctly. They mirror the
// dd-trace-py snapshot tests for
// `test_encode_span_with_large_string_attributes` (ASCII) and
// `test_encode_span_with_large_unicode_string_attributes` (multi-byte).
// -----------------------------------------------------------------------

use libdd_trace_utils::span::trace_utils::{
truncate_span_strings, MAX_SPAN_STRING_LEN, TRUNCATED_SPAN_STRING_LEN,
};

const TRUNCATION_SUFFIX: &str = "<truncated>...";

/// Builds a `BytesString` consisting of the character `c` repeated `n` times.
fn long_bytes_string(c: char, n: usize) -> BytesString {
    let repeated = c.to_string().repeat(n);
    BytesString::from_string(repeated)
}

/// Fixture span whose string fields sit on either side of the truncation limit,
/// matching the dd-trace-py snapshot test fixture:
/// - name: `MAX_SPAN_STRING_LEN` 'a' chars → exactly at the limit, must NOT be truncated
/// - resource: `MAX_SPAN_STRING_LEN + 1` 'b' chars → one over the limit, must be truncated
/// - meta key: `MAX_SPAN_STRING_LEN + 1` 'c' chars → one over the limit, must be truncated
/// - meta value: 2 000 'd' chars → well under the limit, must stay unchanged
fn create_large_string_span() -> SpanBytes {
    let at_limit = MAX_SPAN_STRING_LEN;
    let over_limit = MAX_SPAN_STRING_LEN + 1;

    // The single meta entry pairs an over-long key with a short value.
    let oversized_key = long_bytes_string('c', over_limit);
    let short_value = long_bytes_string('d', 2_000);

    SpanBytes {
        service: BytesString::from_slice(b"svc").unwrap(),
        name: long_bytes_string('a', at_limit),
        resource: long_bytes_string('b', over_limit),
        meta: vec![(oversized_key, short_value)].into(),
        trace_id: 1,
        span_id: 1,
        start: 1_000_000,
        duration: 1_000,
        ..Default::default()
    }
}

/// Asserts that a span built by `create_large_string_span` has been truncated as expected:
/// the at-limit name is intact, the over-limit resource and meta key are cut to
/// `TRUNCATED_SPAN_STRING_LEN` code points and end with the suffix, and the short meta
/// value is untouched.
fn assert_truncation_invariants(span: &libdd_trace_utils::span::v04::SpanBytes) {
    // Lengths are compared in Unicode code points, not bytes.
    let char_len = |text: &str| text.chars().count();

    // Exactly at the limit: left alone.
    let name = span.name.as_str();
    assert_eq!(
        char_len(name),
        MAX_SPAN_STRING_LEN,
        "name should not be truncated"
    );

    // One over the limit: shortened and tagged with the suffix.
    let resource = span.resource.as_str();
    assert_eq!(
        char_len(resource),
        TRUNCATED_SPAN_STRING_LEN,
        "resource should be truncated to {TRUNCATED_SPAN_STRING_LEN}"
    );
    assert!(
        resource.ends_with(TRUNCATION_SUFFIX),
        "truncated resource must end with the suffix"
    );

    // The only meta entry: over-long key, short value.
    let (meta_key, meta_value) = span.meta.iter().next().expect("meta should be non-empty");
    let key = meta_key.as_str();
    assert_eq!(
        char_len(key),
        TRUNCATED_SPAN_STRING_LEN,
        "meta key should be truncated"
    );
    assert!(key.ends_with(TRUNCATION_SUFFIX));
    assert_eq!(
        char_len(meta_value.as_str()),
        2_000,
        "meta value under limit must be unchanged"
    );
}

/// Truncated fields must survive a v0.4 encode → decode round trip unchanged.
///
/// The fixture span is truncated in memory, checked, serialized as v0.4 msgpack, decoded
/// again, and the decoded string lengths and suffixes are compared with the expected
/// post-truncation values.
#[test]
fn test_truncation_survives_v04_encode_decode_round_trip() {
    let serializer = TraceSerializer::new();
    let mut traces = vec![vec![create_large_string_span()]];

    truncate_span_strings(&mut traces);

    // Verify truncation happened in memory before we encode, so a failure further down
    // can be attributed to the encode/decode step rather than to truncation itself.
    assert_truncation_invariants(&traces[0][0]);

    let payload = serializer
        .collect_and_process_traces(traces, TraceExporterOutputFormat::V04)
        .unwrap();
    let serialized = serializer
        .serialize_payload(&payload, &TracerMetadata::default())
        .unwrap();

    let (decoded, _) =
        libdd_trace_utils::msgpack_decoder::v04::from_slice(&serialized).unwrap();
    assert_eq!(decoded.len(), 1);
    assert_eq!(decoded[0].len(), 1);

    // Decoded spans use &str (SliceData); re-check lengths via char count.
    let span = &decoded[0][0];
    assert_eq!(span.name.chars().count(), MAX_SPAN_STRING_LEN);
    assert_eq!(span.resource.chars().count(), TRUNCATED_SPAN_STRING_LEN);
    assert!(span.resource.ends_with(TRUNCATION_SUFFIX));
    let (k, v) = span.meta.iter().next().unwrap();
    assert_eq!(k.chars().count(), TRUNCATED_SPAN_STRING_LEN);
    // The key was truncated too, so its suffix must also survive the round trip.
    assert!(k.ends_with(TRUNCATION_SUFFIX));
    assert_eq!(v.chars().count(), 2_000);
}

/// Truncated fields must survive a v0.5 encode → decode round trip unchanged.
///
/// The fixture span is truncated in memory, checked, serialized as v0.5 msgpack, decoded
/// again, and the decoded string lengths and suffixes are compared with the expected
/// post-truncation values.
#[test]
fn test_truncation_survives_v05_encode_decode_round_trip() {
    let serializer = TraceSerializer::new();
    let mut traces = vec![vec![create_large_string_span()]];

    truncate_span_strings(&mut traces);

    // Verify truncation happened in memory before we encode.
    assert_truncation_invariants(&traces[0][0]);

    let payload = serializer
        .collect_and_process_traces(traces, TraceExporterOutputFormat::V05)
        .unwrap();
    let serialized = serializer
        .serialize_payload(&payload, &TracerMetadata::default())
        .unwrap();

    let (decoded, _) =
        libdd_trace_utils::msgpack_decoder::v05::from_slice(&serialized).unwrap();
    assert_eq!(decoded.len(), 1);
    assert_eq!(decoded[0].len(), 1);

    // Re-check lengths via char count on the decoded span.
    let span = &decoded[0][0];
    assert_eq!(span.name.chars().count(), MAX_SPAN_STRING_LEN);
    assert_eq!(span.resource.chars().count(), TRUNCATED_SPAN_STRING_LEN);
    assert!(span.resource.ends_with(TRUNCATION_SUFFIX));
    let (k, v) = span.meta.iter().next().unwrap();
    assert_eq!(k.chars().count(), TRUNCATED_SPAN_STRING_LEN);
    // The key was truncated too, so its suffix must also survive the round trip.
    assert!(k.ends_with(TRUNCATION_SUFFIX));
    assert_eq!(v.chars().count(), 2_000);
}

/// Multi-byte text: a name of `MAX_SPAN_STRING_LEN + 1` '€' characters must come back from
/// a v0.4 round trip cut to `TRUNCATED_SPAN_STRING_LEN` code points and ending with the
/// truncation suffix.
#[test]
fn test_truncation_unicode_survives_v04_encode_decode_round_trip() {
    // '€' occupies 3 bytes in UTF-8, so byte length and code-point count differ here.
    let euro_span = SpanBytes {
        service: BytesString::from_slice(b"svc").unwrap(),
        resource: BytesString::from_slice(b"r").unwrap(),
        name: long_bytes_string('€', MAX_SPAN_STRING_LEN + 1),
        trace_id: 1,
        span_id: 1,
        start: 1_000_000,
        duration: 1_000,
        ..Default::default()
    };
    let mut traces = vec![vec![euro_span]];
    truncate_span_strings(&mut traces);

    let serializer = TraceSerializer::new();
    let payload = serializer
        .collect_and_process_traces(traces, TraceExporterOutputFormat::V04)
        .unwrap();
    let serialized = serializer
        .serialize_payload(&payload, &TracerMetadata::default())
        .unwrap();

    let (decoded, _) =
        libdd_trace_utils::msgpack_decoder::v04::from_slice(&serialized).unwrap();
    let name = decoded[0][0].name;
    let name_chars = name.chars().count();
    assert_eq!(name_chars, TRUNCATED_SPAN_STRING_LEN);
    assert!(name.ends_with(TRUNCATION_SUFFIX));
}
}
51 changes: 51 additions & 0 deletions libdd-trace-utils/src/span/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,69 @@ use std::{fmt, ptr};
/// from a static str and check if the string is empty.
pub trait SpanText: Debug + Eq + Hash + Borrow<str> + Serialize + Default {
/// Builds a value of this text type from a `'static` string slice.
fn from_static_str(value: &'static str) -> Self;

/// Caps `self` at a bounded number of Unicode code points.
///
/// Contract for implementations: when `self` holds more than `max_chars` code points, the
/// result is its first `result_chars - suffix.chars().count()` code points followed by
/// `suffix`; otherwise `self` comes back unchanged.
///
/// This provided implementation never truncates and always returns `self`. Types that cannot
/// allocate (e.g. `&str`) rely on it; types that own their storage (e.g. `BytesString`)
/// should override it.
fn maybe_truncate(self, max_chars: usize, result_chars: usize, suffix: &str) -> Self {
    // Nothing is computed here; the limits are bound to `_` only so that they do not
    // trigger unused-variable warnings.
    let _ = max_chars;
    let _ = result_chars;
    let _ = suffix;
    self
}
Comment thread
brettlangdon marked this conversation as resolved.
}

impl SpanText for &str {
/// Returns `value` itself: the `&'static str` is handed back as-is, with no copy or
/// allocation.
fn from_static_str(value: &'static str) -> Self {
value
}
// maybe_truncate uses the default (no-op): &str is borrowed and cannot allocate.
// The only path that produces &str spans is the zero-copy msgpack decoder
// (SpanSlice / SliceData), whose callers enforce length limits upstream.
}

impl SpanText for BytesString {
    /// Wraps a `'static` string slice via `BytesString::from_static`.
    fn from_static_str(value: &'static str) -> Self {
        BytesString::from_static(value)
    }

    /// Replaces text longer than `max_chars` code points with its first
    /// `result_chars - suffix.chars().count()` code points followed by `suffix`.
    /// Text within the limit is returned as-is, without allocating.
    fn maybe_truncate(self, max_chars: usize, result_chars: usize, suffix: &str) -> Self {
        let text = self.as_str();

        // A UTF-8 string never has more code points than bytes, so a byte length within the
        // limit settles the question without decoding anything.
        if text.len() <= max_chars {
            return self;
        }

        let suffix_chars = suffix.chars().count();
        debug_assert!(
            result_chars >= suffix_chars,
            "result_chars ({result_chars}) must be >= suffix length ({suffix_chars})"
        );
        let keep_chars = result_chars.saturating_sub(suffix_chars);

        // One walk over the code points does both jobs: it records the byte offset at which
        // the kept prefix ends, and it stops as soon as code point number `max_chars + 1`
        // shows up, which is all that is needed to know the text is over the limit.
        let mut cut_at = None;
        let mut over_limit = false;
        for (char_idx, (byte_idx, _)) in text.char_indices().enumerate() {
            if char_idx == keep_chars {
                cut_at = Some(byte_idx);
            }
            if char_idx >= max_chars {
                over_limit = true;
                break;
            }
        }

        // Many bytes but few enough code points (multi-byte text): nothing to do.
        if !over_limit {
            return self;
        }

        // If the cut point was never reached during the walk, the whole text is kept.
        let prefix = match cut_at {
            Some(end) => &text[..end],
            None => text,
        };
        let mut truncated = String::with_capacity(prefix.len() + suffix.len());
        truncated.push_str(prefix);
        truncated.push_str(suffix);
        BytesString::from_string(truncated)
    }
}

pub trait SpanBytes: Debug + Eq + Hash + Borrow<[u8]> + Serialize + Default {
Expand Down
Loading
Loading