Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datadog-ipc/src/shm_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use libdd_trace_stats::span_concentrator::{FixedAggregationKey, FlushableConcent

use crate::platform::{FileBackedHandle, MappedMem, NamedShmHandle};

const SHM_VERSION: u32 = 1;
const SHM_VERSION: u32 = 2;

/// Maximum peer-tag (key, value) pairs per aggregation slot.
pub const MAX_PEER_TAGS: usize = 16;
Expand Down
19 changes: 2 additions & 17 deletions libdd-data-pipeline/src/otlp/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub fn map_stats_to_otlp_metrics(
continue;
};
data_points.push(json!({
"attributes": build_attributes(group, &exact.grpc_method, is_error, resource_info, otel_trace_semantics_enabled),
"attributes": build_attributes(group, is_error, resource_info, otel_trace_semantics_enabled),
"startTimeUnixNano": b.bucket.start.to_string(),
"timeUnixNano": end.to_string(),
"count": cell.count.to_string(),
Expand Down Expand Up @@ -120,7 +120,6 @@ fn sketch_bucket_counts(sketch: &DDSketch) -> Vec<String> {

fn build_attributes(
group: &pb::ClientGroupedStats,
grpc_method: &str,
is_error: bool,
resource_info: &OtlpResourceInfo,
otel_trace_semantics_enabled: bool,
Expand All @@ -146,7 +145,6 @@ fn build_attributes(
push("span.kind", &group.span_kind);
push("http.request.method", &group.http_method);
push("http.route", &group.http_endpoint);
push("rpc.method", grpc_method);
push("rpc.response.status_code", &group.grpc_status_code);
for tag in &group.peer_tags {
if let Some((k, v)) = tag.split_once(':') {
Expand Down Expand Up @@ -346,22 +344,10 @@ mod tests {
OtlpExactGroup {
ok: cell(ok_ns),
error: cell(err_ns),
grpc_method: String::new(),
},
)
}

fn group_pair_with_grpc(
ok_ns: &[u64],
err_ns: &[u64],
grpc_method: &str,
customize: impl FnOnce(&mut pb::ClientGroupedStats),
) -> (pb::ClientGroupedStats, OtlpExactGroup) {
let (g, mut e) = group_with_exact(ok_ns, err_ns, customize);
e.grpc_method = grpc_method.into();
(g, e)
}

fn buckets(groups: Vec<(pb::ClientGroupedStats, OtlpExactGroup)>) -> Vec<OtlpStatsBucket> {
let (stats, exact): (Vec<_>, Vec<_>) = groups.into_iter().unzip();
vec![OtlpStatsBucket {
Expand Down Expand Up @@ -443,7 +429,7 @@ mod tests {

#[test]
fn data_point_attributes_and_otel_strip() {
let g_pair = group_pair_with_grpc(&[1_000_000_000], &[], "/pkg.Svc/Method", |g| {
let g_pair = group_with_exact(&[1_000_000_000], &[], |g| {
g.http_status_code = 404;
g.http_method = "POST".into();
g.http_endpoint = "/users/:id".into();
Expand All @@ -466,7 +452,6 @@ mod tests {
assert_eq!(str_at(a, "http.request.method"), Some("POST"));
assert_eq!(str_at(a, "http.route"), Some("/users/:id"));
assert!(a.iter().any(|kv| kv["key"] == "http.response.status_code"));
assert_eq!(str_at(a, "rpc.method"), Some("/pkg.Svc/Method"));
assert_eq!(str_at(a, "datadog.operation.name"), Some("test.op"));
assert_eq!(str_at(a, "datadog.span.type"), Some("web"));
assert_eq!(str_at(a, "datadog.origin"), Some("synthetics"));
Expand Down
43 changes: 3 additions & 40 deletions libdd-trace-stats/src/span_concentrator/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ const GRPC_STATUS_CODE_FIELD: &[&str] = &[
"rpc.grpc.status.code",
"grpc.status.code",
];
const GRPC_METHOD_FIELD: &[&str] = &["grpc.method.name", "rpc.method"];

/// Aggregation key fields shared across all concentrator implementations — everything
/// **except** peer tags.
Expand Down Expand Up @@ -153,17 +152,6 @@ fn get_grpc_status_code<'a>(span: &'a impl StatSpan<'a>) -> Option<u8> {
None
}

pub(super) fn get_grpc_method<'a>(span: &'a impl StatSpan<'a>) -> &'a str {
for key in GRPC_METHOD_FIELD {
if let Some(val) = span.get_meta(key) {
if !val.is_empty() {
return val;
}
}
}
""
}

fn grpc_status_str_to_int_value(v: &str) -> Option<u8> {
if let Ok(status) = v.parse() {
return Some(status);
Expand Down Expand Up @@ -262,7 +250,6 @@ impl<'a> BorrowedAggregationKey<'a> {
};

let grpc_status_code = get_grpc_status_code(span);

let service_source = span.get_meta(TAG_SVC_SRC).unwrap_or_default();

Self {
Expand Down Expand Up @@ -344,9 +331,6 @@ pub(super) struct GroupedStats {
error_duration: u64,
error_min: u64,
error_max: u64,
// gRPC method for OTLP export only; not part of the aggregation key so agent stats are
// unaffected.
pub(super) grpc_method: String,
Comment thread
mabdinur marked this conversation as resolved.
}

impl GroupedStats {
Expand Down Expand Up @@ -390,14 +374,11 @@ pub struct OtlpExactCell {
}

/// Exact OK/ERROR cells for one aggregation group, in the same order as the `stats` vector
/// of the accompanying [`pb::ClientStatsBucket`]. `grpc_method` is the group's gRPC method (DD
/// schema `grpc.method.name`) carried out-of-band so it does not appear in the agent stats
/// protobuf wire format.
/// of the accompanying [`pb::ClientStatsBucket`].
#[derive(Debug, Clone, Default)]
pub struct OtlpExactGroup {
pub ok: OtlpExactCell,
pub error: OtlpExactCell,
pub grpc_method: String,
}

/// A bucket flushed for the OTLP trace-metrics path. `exact[i]` is the exact-scalar sidecar
Expand Down Expand Up @@ -433,14 +414,10 @@ impl StatsBucket {
duration: i64,
is_error: bool,
is_top_level: bool,
grpc_method: &str,
) {
self.data
.entry_ref(&key)
.or_insert_with(|| GroupedStats {
grpc_method: grpc_method.to_owned(),
..Default::default()
})
.or_default()
.insert(duration, is_error, is_top_level);
}

Expand All @@ -455,8 +432,7 @@ impl StatsBucket {
pub(super) fn flush_with_otlp_exact(self, bucket_duration: u64) -> OtlpStatsBucket {
let mut stats = Vec::with_capacity(self.data.len());
let mut exact = Vec::with_capacity(self.data.len());
for (k, mut g) in self.data {
let grpc_method = std::mem::take(&mut g.grpc_method);
for (k, g) in self.data {
exact.push(OtlpExactGroup {
ok: OtlpExactCell {
count: g.hits.saturating_sub(g.errors),
Expand All @@ -470,7 +446,6 @@ impl StatsBucket {
min_ns: g.error_min,
max_ns: g.error_max,
},
grpc_method,
});
stats.push(encode_grouped_stats(k, g));
}
Expand Down Expand Up @@ -834,18 +809,6 @@ mod tests {
}
.into_key(),
),
// grpc.method.name is carried in GroupedStats (for OTLP), not in the aggregation key.
(
SpanBytes {
meta: vec![("grpc.method.name".into(), "/pkg.Svc/Method".into())].into(),
..Default::default()
},
FixedAggregationKey {
is_trace_root: true,
..Default::default()
}
.into_key(),
),
// Span with grpc status from meta as numeric string
(
SpanBytes {
Expand Down
3 changes: 1 addition & 2 deletions libdd-trace-stats/src/span_concentrator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use libdd_trace_protobuf::pb;
use aggregation::StatsBucket;

mod aggregation;
use aggregation::{get_grpc_method, BorrowedAggregationKey};
use aggregation::BorrowedAggregationKey;
pub use aggregation::{FixedAggregationKey, OtlpExactCell, OtlpExactGroup, OtlpStatsBucket};

pub mod stat_span;
Expand Down Expand Up @@ -184,7 +184,6 @@ impl SpanConcentrator {
span.duration(),
span.is_error(),
span.has_top_level(),
get_grpc_method(span),
);
}

Expand Down
Loading