Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
87864c0
build(data-pipeline): add tonic 0.14 for gRPC OTLP transport
bm1549 Jun 26, 2026
de5097a
feat(data-pipeline): add OtlpProtocol::Grpc variant and OtlpGrpcTrace…
bm1549 Jun 26, 2026
646e19b
feat(data-pipeline): implement OtlpGrpcTransport and send_otlp_traces…
bm1549 Jun 26, 2026
27e2e5e
fix(data-pipeline): re-export gRPC transport types from otlp module
bm1549 Jun 26, 2026
9de5c10
feat(data-pipeline): add OtlpExportMode enum; dispatch to gRPC send path
bm1549 Jun 26, 2026
986f842
feat(data-pipeline): build OtlpGrpcTransport when Grpc protocol + end…
bm1549 Jun 26, 2026
335a023
feat(data-pipeline-ffi): accept 'grpc' as valid OTLP protocol in FFI
bm1549 Jun 26, 2026
6a9b1d9
fix(data-pipeline): call Grpc::ready() before unary() to satisfy towe…
bm1549 Jun 26, 2026
daf3bf1
test(data-pipeline): end-to-end gRPC OTLP trace export test with in-p…
bm1549 Jun 26, 2026
99b6290
chore(data-pipeline): remove stale allow attrs and duplicate parse tests
bm1549 Jun 26, 2026
ca48cce
fix(data-pipeline): gate gRPC OTLP transport off wasm32; stabilize mi…
bm1549 Jun 26, 2026
08cd15a
perf(data-pipeline): avoid per-export gRPC transport clone; add tests…
bm1549 Jun 26, 2026
8485e77
fix(data-pipeline): reject gRPC on wasm32; fix deadline status kind; …
bm1549 Jun 26, 2026
51af411
test(data-pipeline): make OTLP gRPC e2e test robust under CI contention
bm1549 Jun 27, 2026
83b28d2
test(data-pipeline): bound the OTLP gRPC e2e server so it cannot hang CI
bm1549 Jun 27, 2026
69aab62
test(data-pipeline): ignore the timing-fragile OTLP gRPC e2e test in CI
bm1549 Jun 27, 2026
9dcddce
fix(data-pipeline): drive the h2 connection in the OTLP gRPC e2e test…
bm1549 Jun 27, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 46 additions & 19 deletions libdd-data-pipeline-ffi/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,15 +503,17 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_endpoint(
)
}

/// Sets the OTLP export protocol. Accepts the OTel-standard values `http/json` (default) or
/// `http/protobuf`; `grpc` is rejected as not yet supported. The host language resolves the value
/// (e.g. from `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL`).
/// Sets the OTLP export protocol. Accepts all three OTel-standard values:
/// - `http/json` (default) — OTLP over HTTP with JSON body
/// - `http/protobuf` — OTLP over HTTP with protobuf body
/// - `grpc` — OTLP over HTTP/2 (plaintext only; `https://` is not yet supported)
///
/// Has no effect unless an OTLP endpoint is also configured via
/// `ddog_trace_exporter_config_set_otlp_endpoint`; without one, traces are sent to the
/// Datadog agent and this protocol selection is ignored.
/// The host language resolves the value (e.g. from `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL`).
///
/// Returns `None` on success, `ErrorCode::InvalidArgument` for a null config or an unaccepted
/// Has no effect unless an OTLP endpoint is configured via
/// `ddog_trace_exporter_config_set_otlp_endpoint`.
///
/// Returns `None` on success, `ErrorCode::InvalidArgument` for a null config or an unrecognized
/// value, and `ErrorCode::InvalidInput` for a non-UTF-8 string.
#[no_mangle]
pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_protocol(
Expand All @@ -524,9 +526,9 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_protocol(
Ok(s) => s,
Err(e) => return Some(e),
};
// `FromStr` is the single source of truth for string -> OtlpProtocol. It accepts only
// the supported HTTP encodings (`http/json`, `http/protobuf`); `grpc` and any unknown
// value are rejected with an error, so an unsupported protocol can never be stored.
// `FromStr` is the single source of truth for string -> OtlpProtocol. It accepts all
// three OTel-standard encodings (`http/json`, `http/protobuf`, `grpc`); any unknown
// value is rejected with an error, so an unsupported protocol can never be stored.
match value.parse::<OtlpProtocol>() {
Ok(p) => {
handle.otlp_protocol = Some(p);
Expand Down Expand Up @@ -1362,14 +1364,17 @@ mod tests {
Some(OtlpProtocol::HttpProtobuf)
);

// "grpc" → InvalidArgument
// "grpc" → success (gRPC is now supported)
let mut config = Some(TraceExporterConfig::default());
let error = ddog_trace_exporter_config_set_otlp_protocol(
config.as_mut(),
CharSlice::from("grpc"),
);
assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument);
ddog_trace_exporter_error_free(error);
assert_eq!(error, None);
assert_eq!(
config.as_ref().unwrap().otlp_protocol,
Some(OtlpProtocol::Grpc)
);

// Garbage value → InvalidArgument
let mut config = Some(TraceExporterConfig::default());
Expand Down Expand Up @@ -1407,17 +1412,39 @@ mod tests {
}

#[test]
fn set_otlp_protocol_rejects_grpc_and_unknown() {
let mut cfg = TraceExporterConfig::default();
for bad in ["grpc", "nonsense"] {
fn set_otlp_protocol_accepts_all_three_protocols() {
use libdd_data_pipeline::OtlpProtocol;
for (input, expected) in [
("http/json", OtlpProtocol::HttpJson),
("http/protobuf", OtlpProtocol::HttpProtobuf),
("grpc", OtlpProtocol::Grpc),
] {
let mut cfg = TraceExporterConfig::default();
let err = unsafe {
ddog_trace_exporter_config_set_otlp_protocol(Some(&mut cfg), CharSlice::from(bad))
ddog_trace_exporter_config_set_otlp_protocol(Some(&mut cfg), CharSlice::from(input))
};
assert!(err.is_some(), "expected error for {bad}");
assert_eq!(cfg.otlp_protocol, None, "{bad} must not be stored");
assert!(err.is_none(), "expected success for {input}: {err:?}");
assert_eq!(
cfg.otlp_protocol,
Some(expected),
"wrong protocol for {input}"
);
}
}

#[test]
fn set_otlp_protocol_rejects_unknown() {
let mut cfg = TraceExporterConfig::default();
let err = unsafe {
ddog_trace_exporter_config_set_otlp_protocol(
Some(&mut cfg),
CharSlice::from("nonsense"),
)
};
assert!(err.is_some(), "expected error for unknown protocol");
assert_eq!(cfg.otlp_protocol, None, "must not be stored on error");
}

#[cfg(all(feature = "catch_panic", panic = "unwind"))]
#[test]
fn catch_panic_test() {
Expand Down
17 changes: 16 additions & 1 deletion libdd-data-pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,17 @@ libdd-tinybytes = { version = "1.1.1", path = "../libdd-tinybytes", features = [
"bytes_string",
"serialization",
] }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { version = "1.23", features = ["time", "test-util"], default-features = false }
libdd-capabilities-impl = { version = "2.0.0", path = "../libdd-capabilities-impl", default-features = false }
# tonic's transport (hyper/tokio/socket2) does not build for wasm32, so the gRPC
# OTLP transport — and prost, which only the gRPC exporter uses — are gated off
# wasm targets along with the code that uses them.
tonic = { version = "0.14", default-features = false, features = [
"transport", # Channel, Endpoint, Server
"codegen", # Grpc<T> client, tonic::client::Grpc
] }
prost = "0.14.1"

[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2", features = ["js"] }
Expand All @@ -63,6 +70,11 @@ name = "trace_buffer"
harness = false
path = "benches/trace_buffer.rs"

[[bench]]
name = "otlp_grpc_export"
harness = false
path = "benches/otlp_grpc_export.rs"

[dev-dependencies]
libdd-capabilities-impl = { version = "2.0.0", path = "../libdd-capabilities-impl" }
libdd-log = { path = "../libdd-log" }
Expand All @@ -81,8 +93,11 @@ tokio = { version = "1.23", features = [
"rt",
"time",
"test-util",
"macros",
], default-features = false }
duplicate = "2.0.1"
h2 = "0.4"
tokio-stream = { version = "0.1", features = ["net"] }

[features]
default = ["https", "telemetry"]
Expand Down
115 changes: 115 additions & 0 deletions libdd-data-pipeline/benches/otlp_grpc_export.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Benchmarks for the OTLP gRPC export hot path.
//!
//! The gRPC exporter turns native trace chunks into the length-prefixed gRPC
//! wire frame that tonic's codec puts on the socket once per export. The prost
//! protobuf encoding is shared with the HTTP/protobuf path (already covered by
//! `libdd-trace-utils/benches/otlp_encoding.rs`); these benches measure the
//! gRPC-specific framing on top of it, plus the full native-spans -> wire-frame
//! preparation across trace sizes so the per-span cost is visible.

use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion};
use libdd_trace_utils::msgpack_decoder;
use libdd_trace_utils::otlp_encoder::{map_traces_to_otlp, OtlpResourceInfo};
use prost::Message;
use serde_json::{json, Value};

/// A realistic OTLP-bound span: a handful of string `meta` tags and a couple of
/// numeric `metrics`, so the per-span attribute work (the dominant cost) is
/// exercised. Mirrors the fixture in `libdd-trace-utils/benches/otlp_encoding.rs`.
fn generate_spans(num_spans: usize, trace_id: u64) -> Vec<Value> {
let root_span_id = 100_000_000_000 + (trace_id % 1_000_000);
(0..num_spans)
.map(|i| {
let span_id = root_span_id + i as u64;
let is_root = i == 0;
let parent_id = if is_root { 0 } else { root_span_id };
let mut meta = json!({
"http.method": "GET",
"http.url": "https://example.com/api/v1/users/12345",
"http.status_code": "200",
"env": "production",
"version": "1.2.3",
"component": "net/http",
});
if is_root {
meta["_dd.p.tid"] = json!("5b8efff798038103");
}
json!({
"service": "bench-service",
"name": "http.request",
"resource": "GET /api/v1/users",
"trace_id": trace_id,
"span_id": span_id,
"parent_id": parent_id,
"start": 1_544_712_660_000_000_000_i64 + i as i64,
"duration": 1_000_000,
"error": 0,
"meta": meta,
"metrics": { "_sampling_priority_v1": 1, "_dd.top_level": 1 },
"type": "web",
})
})
.collect()
}

fn resource_info() -> OtlpResourceInfo {
// `OtlpResourceInfo` is `#[non_exhaustive]`, so build via Default + field assignment.
let mut info = OtlpResourceInfo::default();
info.service = "bench-service".to_string();
info.env = "production".to_string();
info.app_version = "1.2.3".to_string();
info.language = "rust".to_string();
info.tracer_version = "9.9.9".to_string();
info.runtime_id = "11111111-2222-3333-4444-555555555555".to_string();
info
}

/// Frame protobuf bytes exactly as gRPC does: a 1-byte compression flag and a
/// 4-byte big-endian length prefix, then the message body.
fn grpc_frame(body: &[u8]) -> Vec<u8> {
let mut framed = Vec::with_capacity(5 + body.len());
framed.push(0u8); // compression flag: 0 = uncompressed
framed.extend_from_slice(&(body.len() as u32).to_be_bytes());
framed.extend_from_slice(body);
framed
}

pub fn grpc_export_benches(c: &mut Criterion) {
let info = resource_info();

for &num_spans in &[1usize, 1000usize] {
let id = format!("1x{num_spans}");
let bytes =
rmp_serde::to_vec(&vec![generate_spans(num_spans, 100_000_000_000)]).expect("fixture");
let (spans, _) =
msgpack_decoder::v04::from_slice(bytes.as_slice()).expect("decode fixture");
let req = map_traces_to_otlp(spans.clone(), &info, false);

// Encode-only: prost OTLP IR -> gRPC wire frame (what the codec emits per export).
c.bench_function(&format!("grpc/encode_framed/{id}"), |b| {
b.iter(|| {
let body = black_box(&req).encode_to_vec();
black_box(grpc_frame(&body))
})
});

// End-to-end: native spans -> mapped OTLP IR -> gRPC wire frame.
c.bench_function(&format!("grpc/e2e_framed/{id}"), |b| {
b.iter_batched(
|| spans.clone(),
|s| {
let req = map_traces_to_otlp(s, &info, false);
let body = req.encode_to_vec();
black_box(grpc_frame(&body))
},
BatchSize::SmallInput,
)
});
}
}

criterion_group!(benches, grpc_export_benches);
criterion_main!(benches);
57 changes: 43 additions & 14 deletions libdd-data-pipeline/src/otlp/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,26 @@
use http::HeaderMap;
use std::time::Duration;

/// OTLP trace export protocol — selects the HTTP body encoding and `Content-Type`.
/// OTLP trace export protocol — selects the wire transport and body encoding.
///
/// Only the HTTP encodings libdatadog actually supports are representable. A `grpc` value (e.g.
/// resolved from the OTel-default `OTEL_EXPORTER_OTLP_PROTOCOL`) is rejected by
/// [`FromStr`](std::str::FromStr) rather than represented here, so an unsupported protocol can
/// never be constructed and silently mishandled downstream.
/// All three OTel-standard protocol strings parse successfully; the selection
/// controls which send path the exporter uses:
/// - `http/json` and `http/protobuf` → OTLP over HTTP/1.1 via
/// [`HttpClientCapability`](libdd_capabilities::HttpClientCapability).
/// - `grpc` → OTLP over HTTP/2 via a tonic [`Channel`](tonic::transport::Channel).
///
/// Plaintext gRPC (`http://` scheme, port 4317) is supported. TLS gRPC
/// (`https://` scheme) is not yet implemented — use a TLS-terminating sidecar.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum OtlpProtocol {
/// HTTP with a JSON body (`Content-Type: application/json`). The default.
#[default]
HttpJson,
/// HTTP with a protobuf body (`Content-Type: application/x-protobuf`).
HttpProtobuf,
/// gRPC over HTTP/2. Protobuf-encoded body with 5-byte gRPC framing.
/// Default port is 4317. Only plaintext (`http://`) is supported.
Grpc,
}

impl std::str::FromStr for OtlpProtocol {
Expand All @@ -27,11 +34,7 @@ impl std::str::FromStr for OtlpProtocol {
match s {
"http/json" => Ok(OtlpProtocol::HttpJson),
"http/protobuf" => Ok(OtlpProtocol::HttpProtobuf),
// gRPC is a valid OTLP protocol in the OTel spec but is not implemented in
// libdatadog. Reject it explicitly so callers get a clean error at the parse
// boundary, rather than constructing an unsupported value that has to be guarded
// against everywhere downstream.
"grpc" => Err("OTLP gRPC export is not supported".to_string()),
"grpc" => Ok(OtlpProtocol::Grpc),
other => Err(format!("unknown OTLP protocol: {other}")),
}
}
Expand All @@ -40,24 +43,30 @@ impl std::str::FromStr for OtlpProtocol {
impl OtlpProtocol {
/// The HTTP `Content-Type` for this protocol's body encoding. Crate-internal: the public type
/// is only constructed/selected by callers; encoding is the exporter's job.
/// Only called on the HTTP path; the gRPC path uses tonic's ProstCodec.
pub(crate) fn content_type(&self) -> http::HeaderValue {
#[allow(clippy::unreachable)]
match self {
OtlpProtocol::HttpJson => libdd_common::header::APPLICATION_JSON,
OtlpProtocol::HttpProtobuf => libdd_common::header::APPLICATION_PROTOBUF,
OtlpProtocol::Grpc => unreachable!("gRPC path does not call content_type()"),
}
}

/// Encode the prost OTLP request to this protocol's wire format. Crate-internal so the
/// third-party `serde_json::Error` does not leak into the public API.
/// Only called on the HTTP path; the gRPC path uses tonic's ProstCodec.
pub(crate) fn encode(
&self,
req: &libdd_trace_utils::otlp_encoder::ProtoExportTraceServiceRequest,
) -> Result<Vec<u8>, serde_json::Error> {
#[allow(clippy::unreachable)]
match self {
OtlpProtocol::HttpJson => libdd_trace_utils::otlp_encoder::encode_otlp_json(req),
OtlpProtocol::HttpProtobuf => {
Ok(libdd_trace_utils::otlp_encoder::encode_otlp_protobuf(req))
}
OtlpProtocol::Grpc => unreachable!("gRPC path does not call encode()"),
}
}
}
Expand Down Expand Up @@ -99,10 +108,15 @@ mod tests {
}

#[test]
fn grpc_is_rejected_at_parse() {
// gRPC is unsupported, so it must not parse into a protocol: an unsupported value can
// never be constructed.
assert!(OtlpProtocol::from_str("grpc").is_err());
fn grpc_parses_successfully() {
// gRPC is now a supported protocol — it must parse without error.
assert_eq!(OtlpProtocol::from_str("grpc").unwrap(), OtlpProtocol::Grpc);
}

#[test]
fn grpc_config_is_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<OtlpGrpcTraceConfig>();
}

#[test]
Expand All @@ -118,6 +132,21 @@ mod tests {
}
}

/// Parsed OTLP gRPC trace exporter configuration.
///
/// The endpoint URL is consumed at build time to construct the tonic
/// [`Channel`](tonic::transport::Channel); only the per-request settings below
/// are retained here.
#[derive(Clone, Debug)]
pub struct OtlpGrpcTraceConfig {
/// Custom key-value pairs forwarded as gRPC request metadata.
pub headers: Vec<(String, String)>,
/// Per-request timeout (applied via [`tokio::time::timeout`]).
pub timeout: Duration,
/// When `true`, omit DD-specific per-span attributes from the payload.
pub otel_trace_semantics_enabled: bool,
}

/// Parsed OTLP trace-metrics exporter configuration.
#[derive(Clone, Debug)]
pub struct OtlpMetricsConfig {
Expand Down
Loading
Loading