diff --git a/instrumentation/Cargo.lock b/instrumentation/Cargo.lock index d281bea4..08304f7f 100644 --- a/instrumentation/Cargo.lock +++ b/instrumentation/Cargo.lock @@ -17,6 +17,15 @@ version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anes" version = "0.1.6" @@ -165,6 +174,20 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "chrono" +version = "0.4.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" +dependencies = [ + "iana-time-zone", + "js-sys", + "num-traits", + "serde", + "wasm-bindgen", + "windows-link", +] + [[package]] name = "ciborium" version = "0.2.2" @@ -355,7 +378,9 @@ version = "0.1.0" dependencies = [ "datadog-opentelemetry", "lambda_runtime", + "libdd-trace-inferrer", "opentelemetry", + "opentelemetry-semantic-conventions", "opentelemetry_sdk", "serde", "serde_json", @@ -809,6 +834,30 @@ dependencies = [ "tracing", ] +[[package]] +name = "iana-time-zone" +version = "0.1.65" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e31bc9ad994ba00e440a8aa5c9ef0ec67d5cb5e5cb0cc7f8b744a35b389cc470" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "icu_collections" version = "2.2.0" @@ -1248,6 +1297,21 @@ dependencies = [ "serde", ] +[[package]] +name = "libdd-trace-inferrer" +version = "1.0.0" +source = "git+https://github.com/DataDog/libdatadog?branch=david.ogbureke%2Flibdd-trace-inferrer#3d602313b31724e8d287e0c6a22d29919374998e" +dependencies = [ + "base64 0.22.1", + "chrono", + "hex", + "regex", + "serde", + "serde_json", + "sha2", + "tracing", +] + [[package]] name = "libdd-trace-normalization" version = "2.0.0" @@ -2687,12 +2751,65 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "windows-core" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-implement" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "windows-link" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-result" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" +dependencies = [ + "windows-link", +] + [[package]] name = "windows-sys" version = "0.52.0" diff --git a/instrumentation/Cargo.toml b/instrumentation/Cargo.toml index 55c9c8d7..b251868b 100644 --- a/instrumentation/Cargo.toml +++ b/instrumentation/Cargo.toml @@ -15,6 +15,7 @@ authors = ["Datadog Inc. "] publish = false [workspace.dependencies] +libdd-trace-inferrer = { git = "https://github.com/DataDog/libdatadog", branch = "david.ogbureke/libdd-trace-inferrer" } serde = "1.0.194" serde_json = "1.0.140" opentelemetry = { version = "0.31.0", features = ["trace", "metrics", "logs"], default-features = false } diff --git a/instrumentation/datadog-aws-lambda/Cargo.toml b/instrumentation/datadog-aws-lambda/Cargo.toml index e720e2d6..f14c8d41 100644 --- a/instrumentation/datadog-aws-lambda/Cargo.toml +++ b/instrumentation/datadog-aws-lambda/Cargo.toml @@ -3,7 +3,7 @@ [package] name = "datadog-aws-lambda" -description = "Datadog distributed tracing for AWS Lambda." +description = "Datadog distributed tracing for AWS Lambda. Provides inferred spans and trace context extraction for SQS, SNS, EventBridge, API Gateway (REST and HTTP APIs), and Lambda Function URLs." version.workspace = true edition.workspace = true rust-version = "1.85.0" @@ -14,10 +14,12 @@ publish.workspace = true [dependencies] datadog-opentelemetry = { version = "0.3", path = "../../datadog-opentelemetry" } +libdd-trace-inferrer = { workspace = true } lambda_runtime = "0.13" serde = { workspace = true } serde_json = { workspace = true, features = ["raw_value"] } opentelemetry = { workspace = true } +opentelemetry-semantic-conventions = { workspace = true } opentelemetry_sdk = { workspace = true, features = ["trace", "metrics", "logs"] } tracing = { workspace = true } diff --git a/instrumentation/datadog-aws-lambda/src/attribute_keys.rs b/instrumentation/datadog-aws-lambda/src/attribute_keys.rs index aa1834bd..b9406803 100644 --- a/instrumentation/datadog-aws-lambda/src/attribute_keys.rs +++ b/instrumentation/datadog-aws-lambda/src/attribute_keys.rs @@ -2,6 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 pub(crate) const OPERATION_NAME: &str = "operation.name"; +// `operation_name` duplicates `operation.name` as a custom attribute +pub(crate) const OPERATION_NAME_CUSTOM: &str = "operation_name"; pub(crate) const RESOURCE_NAME: &str = "resource.name"; pub(crate) const SPAN_TYPE: &str = "span.type"; pub(crate) const ERROR: &str = "error"; @@ -11,8 +13,14 @@ pub(crate) const ERROR_MESSAGE: &str = "error.message"; pub(crate) const LANGUAGE: &str = "language"; pub(crate) const REQUEST_ID: &str = "request_id"; pub(crate) const COLD_START: &str = "cold_start"; +pub(crate) const ASYNC_INVOCATION: &str = "async_invocation"; pub(crate) const FUNCTION_ARN: &str = "function_arn"; pub(crate) const FUNCTION_VERSION: &str = "function_version"; pub(crate) const FUNCTION_NAME: &str = "functionname"; pub(crate) const RESOURCE_NAMES: &str = "resource_names"; pub(crate) const DD_ORIGIN: &str = "_dd.origin"; +pub(crate) const FUNCTION_TRIGGER_EVENT_SOURCE: &str = "function_trigger.event_source"; +pub(crate) const FUNCTION_TRIGGER_EVENT_SOURCE_ARN: &str = "function_trigger.event_source_arn"; + +// OpenTelemetry semantic convention keys +pub(crate) use opentelemetry_semantic_conventions::attribute::{PEER_SERVICE, SERVICE_NAME}; diff --git a/instrumentation/datadog-aws-lambda/src/invocation.rs b/instrumentation/datadog-aws-lambda/src/invocation.rs index e159bf0f..ee03031c 100644 --- a/instrumentation/datadog-aws-lambda/src/invocation.rs +++ b/instrumentation/datadog-aws-lambda/src/invocation.rs @@ -3,19 +3,24 @@ //! Lifecycle management for a single Lambda invocation. //! -//! Each invocation produces one *root span* (`aws.lambda`) that wraps the handler call. +//! Each invocation produces: +//! - Zero or more *inferred spans* derived from the trigger payload (e.g., an SQS span) +//! - One *root span* (`aws.lambda`) that wraps the handler call //! //! Typical usage: -//! 1. [`Invocation::start`] — create the root span before the handler runs. +//! 1. [`Invocation::start`] — create all spans before the handler runs. //! 2. [`Invocation::handler_context`] — pass the returned context to the handler so its OTel spans //! are correctly parented. -//! 3. [`Invocation::finish`] — record errors and end the span after the handler returns. +//! 3. [`Invocation::finish`] — record errors and end all spans after the handler returns. use crate::attribute_keys as attr; +use crate::span_inferrer::{extract_trigger, InferredSpanScope, TriggerContext}; +use libdd_trace_inferrer::SpanInferrer; use opentelemetry::trace::{SpanKind, Status, TraceContextExt, Tracer}; use opentelemetry::{Context, KeyValue}; use opentelemetry_sdk::trace::SdkTracer; +use std::time::SystemTime; pub(crate) static TRACER_NAME: &str = "datadog-lambda-rs"; pub(crate) const ROOT_SPAN_NAME: &str = "aws.lambda"; @@ -34,9 +39,12 @@ pub(crate) struct LambdaSpan { impl LambdaSpan { /// Creates and starts the `aws.lambda` root span. /// - /// The span is parented to the ambient OTel context, which is typically a new root trace. + /// The span is parented to `trigger.parent_cx` when that context contains a valid + /// span (i.e., a propagated or inferred upstream span exists). Otherwise it falls + /// back to the ambient OTel context, which is typically a new root trace. pub(crate) fn start( tracer: &SdkTracer, + trigger: &TriggerContext, lambda_cx: &lambda_runtime::Context, cold_start: bool, ) -> Self { @@ -45,27 +53,44 @@ impl LambdaSpan { tracing::debug!(request_id, "creating invocation root span"); - let parent_cx = Context::current(); + let effective_cx = if trigger.parent_cx.span().span_context().is_valid() { + trigger.parent_cx.clone() + } else { + Context::current() + }; let mut builder = tracer.span_builder(ROOT_SPAN_NAME); builder.span_kind = Some(SpanKind::Server); - let attrs = vec![ + let mut attrs = vec![ KeyValue::new(attr::OPERATION_NAME, ROOT_SPAN_NAME), KeyValue::new(attr::LANGUAGE, "rust"), KeyValue::new(attr::RESOURCE_NAME, function_name.clone()), KeyValue::new(attr::SPAN_TYPE, "serverless"), KeyValue::new(attr::REQUEST_ID, request_id.clone()), KeyValue::new(attr::COLD_START, cold_start), + KeyValue::new(attr::ASYNC_INVOCATION, trigger.is_async), KeyValue::new(attr::FUNCTION_ARN, lambda_cx.invoked_function_arn.clone()), KeyValue::new(attr::FUNCTION_VERSION, lambda_cx.env_config.version.clone()), KeyValue::new(attr::FUNCTION_NAME, function_name.to_lowercase()), KeyValue::new(attr::RESOURCE_NAMES, function_name.clone()), KeyValue::new(attr::DD_ORIGIN, "lambda"), ]; + if let Some(ref source) = trigger.event_source { + attrs.push(KeyValue::new( + attr::FUNCTION_TRIGGER_EVENT_SOURCE, + source.clone(), + )); + } + if let Some(ref arn) = trigger.event_source_arn { + attrs.push(KeyValue::new( + attr::FUNCTION_TRIGGER_EVENT_SOURCE_ARN, + arn.clone(), + )); + } builder.attributes = Some(attrs); - let span = tracer.build_with_context(builder, &parent_cx); - let cx = parent_cx.with_span(span); + let span = tracer.build_with_context(builder, &effective_cx); + let cx = effective_cx.with_span(span); Self { cx, request_id } } @@ -90,21 +115,45 @@ impl LambdaSpan { /// Owns the full lifecycle of a single Lambda invocation's tracing state. /// -/// Holds the root span created for the handler call. +/// Holds the root span and all inferred spans created from the trigger payload. /// Call [`start`](Self::start) before the handler, then [`finish`](Self::finish) after. pub(crate) struct Invocation { /// The `aws.lambda` root span for this invocation. lambda_span: LambdaSpan, + /// Inferred spans derived from the trigger payload (e.g., SQS, SNS). + /// May be empty when the payload has no recognisable trigger. + inferred_spans: InferredSpanScope, + /// Wall-clock time at invocation start, used to compute inferred span durations. + started_at: SystemTime, } impl Invocation { pub(crate) fn start( tracer: &SdkTracer, + inferrer: &SpanInferrer, + payload: &str, lambda_cx: &lambda_runtime::Context, cold_start: bool, ) -> Self { - let lambda_span = LambdaSpan::start(tracer, lambda_cx, cold_start); - Self { lambda_span } + let extraction = extract_trigger(inferrer, payload); + let inferred_spans = InferredSpanScope::start( + tracer, + &extraction.upstream_cx, + &extraction.inference_result, + ); + let trigger = TriggerContext { + parent_cx: inferred_spans.innermost_context(&extraction.upstream_cx), + is_async: extraction.is_async, + event_source: extraction.event_source, + event_source_arn: extraction.event_source_arn, + }; + let lambda_span = LambdaSpan::start(tracer, &trigger, lambda_cx, cold_start); + + Self { + lambda_span, + inferred_spans, + started_at: SystemTime::now(), + } } /// Returns the OTel context to use as the active context during handler execution. @@ -117,7 +166,7 @@ impl Invocation { self.lambda_span.cx.clone() } - /// Records any handler error and ends the root span. + /// Records any handler error and ends all spans. /// /// The result is returned unchanged; this method only has side effects /// (error attributes, span end times). @@ -125,17 +174,29 @@ impl Invocation { where Err: std::fmt::Display, { + let invocation_end = SystemTime::now(); + if let Err(ref err) = result { self.lambda_span.set_error(err); } - self.lambda_span.finish(); + self.finish_spans(invocation_end); result } + + /// Ends all spans in the correct order: inferred spans first, then the root span. + /// + /// Inferred spans must be ended before the root span so that timing relationships + /// are correct in the Datadog backend. + fn finish_spans(self, invocation_end: SystemTime) { + self.inferred_spans.end(self.started_at, invocation_end); + self.lambda_span.finish(); + } } #[cfg(test)] mod tests { use super::*; + use crate::span_inferrer::TriggerContext; use opentelemetry::trace::{ SpanContext, SpanId, TraceFlags, TraceId, TraceState, TracerProvider as _, }; @@ -159,6 +220,15 @@ mod tests { cx } + fn test_trigger() -> TriggerContext { + TriggerContext { + parent_cx: Context::current(), + is_async: false, + event_source: None, + event_source_arn: None, + } + } + fn find_attr<'a>(attrs: &'a [KeyValue], key: &str) -> Option<&'a Value> { attrs .iter() @@ -176,8 +246,12 @@ mod tests { let tracer = provider.tracer("test"); let lambda_cx = test_lambda_cx(); + let trigger = TriggerContext { + is_async: true, + ..test_trigger() + }; - let span = LambdaSpan::start(&tracer, &lambda_cx, true); + let span = LambdaSpan::start(&tracer, &trigger, &lambda_cx, true); span.finish(); provider.force_flush().unwrap(); @@ -207,6 +281,10 @@ mod tests { Some(&Value::String("my-function".into())) ); assert_eq!(find_attr(attrs, "cold_start"), Some(&Value::Bool(true))); + assert_eq!( + find_attr(attrs, "async_invocation"), + Some(&Value::Bool(true)) + ); } #[test] @@ -222,11 +300,13 @@ mod tests { true, TraceState::default(), ); - let _guard = Context::current() - .with_remote_span_context(parent_sc) - .attach(); + let parent_cx = Context::current().with_remote_span_context(parent_sc); + let trigger = TriggerContext { + parent_cx, + ..test_trigger() + }; - let span = LambdaSpan::start(&tracer, &test_lambda_cx(), false); + let span = LambdaSpan::start(&tracer, &trigger, &test_lambda_cx(), false); assert_eq!(span.cx.span().span_context().trace_id(), trace_id); } @@ -234,7 +314,14 @@ mod tests { async fn error_handler_sets_error_attributes() { let (provider, exporter) = test_provider(); let invocation = Invocation { - lambda_span: LambdaSpan::start(&provider.tracer("test"), &test_lambda_cx(), false), + lambda_span: LambdaSpan::start( + &provider.tracer("test"), + &test_trigger(), + &test_lambda_cx(), + false, + ), + inferred_spans: InferredSpanScope::empty(), + started_at: SystemTime::now(), }; let _: Result<(), String> = invocation.finish(Err::<(), String>("boom".to_string())); @@ -253,7 +340,14 @@ mod tests { async fn successful_handler_sets_no_error_attributes() { let (provider, exporter) = test_provider(); let invocation = Invocation { - lambda_span: LambdaSpan::start(&provider.tracer("test"), &test_lambda_cx(), false), + lambda_span: LambdaSpan::start( + &provider.tracer("test"), + &test_trigger(), + &test_lambda_cx(), + false, + ), + inferred_spans: InferredSpanScope::empty(), + started_at: SystemTime::now(), }; let _: Result<(), String> = invocation.finish(Ok(())); @@ -264,4 +358,48 @@ mod tests { assert!(find_attr(attrs, "error").is_none()); assert!(find_attr(attrs, "error.message").is_none()); } + + #[test] + fn start_invocation_materializes_inferred_spans_for_sqs_events() { + let (provider, _) = test_provider(); + let payload = serde_json::json!({ + "Records": [{ + "messageId": "msg-001", + "receiptHandle": "receipt-001", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-1:123456789:test-queue", + "awsRegion": "us-east-1", + "body": "hello", + "md5OfBody": "d8e8fca2dc0f896fd7cb4cb0031ba249", + "attributes": { + "SentTimestamp": "1718444400000", + "ApproximateFirstReceiveTimestamp": "1718444400100", + "ApproximateReceiveCount": "1", + "SenderId": "AIDAIENQZJOLO23YVJ4VO" + }, + "messageAttributes": { + "_datadog": { + "stringValue": serde_json::to_string(&serde_json::json!({ + "x-datadog-trace-id": "12345", + "x-datadog-parent-id": "67890", + "x-datadog-sampling-priority": "1" + })) + .unwrap(), + "dataType": "String" + } + } + }] + }) + .to_string(); + + let inferrer = crate::span_inferrer::build_inferrer(); + let tracer = opentelemetry::trace::TracerProvider::tracer(&provider, TRACER_NAME); + let invocation = Invocation::start(&tracer, &inferrer, &payload, &test_lambda_cx(), false); + assert!(invocation + .handler_context() + .span() + .span_context() + .is_valid()); + assert!(!invocation.inferred_spans.is_empty()); + } } diff --git a/instrumentation/datadog-aws-lambda/src/lib.rs b/instrumentation/datadog-aws-lambda/src/lib.rs index eef6b976..5ced2088 100644 --- a/instrumentation/datadog-aws-lambda/src/lib.rs +++ b/instrumentation/datadog-aws-lambda/src/lib.rs @@ -9,11 +9,13 @@ mod attribute_keys; mod invocation; +mod span_inferrer; use datadog_opentelemetry::configuration::{Config, ConfigBuilder}; use invocation::{Invocation, TRACER_NAME}; use lambda_runtime::tower::Service; use lambda_runtime::LambdaEvent; +use libdd_trace_inferrer::SpanInferrer; use opentelemetry::trace::{FutureExt, TracerProvider}; use opentelemetry_sdk::trace::SdkTracer; use serde::de::DeserializeOwned; @@ -100,6 +102,7 @@ impl From for TracedServiceError { pub struct TracedService { inner: S, tracer: SdkTracer, + inferrer: SpanInferrer, cold_start: bool, _phantom: PhantomData R>, } @@ -145,6 +148,7 @@ impl TracedService { Self { inner, tracer, + inferrer: span_inferrer::build_inferrer(), cold_start: true, _phantom: PhantomData, } @@ -175,7 +179,13 @@ where fn call(&mut self, event: LambdaEvent>) -> Self::Future { let cold_start = self.take_cold_start(); - let invocation = Invocation::start(&self.tracer, &event.context, cold_start); + let invocation = Invocation::start( + &self.tracer, + &self.inferrer, + event.payload.get(), + &event.context, + cold_start, + ); let typed_payload = match serde_json::from_str::(event.payload.get()) { Ok(payload) => payload, Err(err) => { @@ -221,6 +231,7 @@ mod tests { TracedService { inner: ReadyService, tracer, + inferrer: span_inferrer::build_inferrer(), cold_start: true, _phantom: PhantomData, } diff --git a/instrumentation/datadog-aws-lambda/src/span_inferrer/mod.rs b/instrumentation/datadog-aws-lambda/src/span_inferrer/mod.rs new file mode 100644 index 00000000..9a96050c --- /dev/null +++ b/instrumentation/datadog-aws-lambda/src/span_inferrer/mod.rs @@ -0,0 +1,451 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Bridge between [`libdd_trace_inferrer`] and the OpenTelemetry SDK. +//! +//! Responsibilities: +//! - Parse the raw Lambda event payload with `libdd_trace_inferrer` to identify the trigger type +//! and extract span metadata and carrier headers. +//! - Convert the inferred [`SpanData`](libdd_trace_inferrer::SpanData) into live OTel spans using +//! the Datadog tracer. +//! - Expose [`TriggerContext`] so [`crate::invocation`] can parent the root span correctly. + +use crate::attribute_keys as attr; +use libdd_trace_inferrer::{InferConfig, InferenceResult, SpanInferrer}; + +pub(crate) fn build_inferrer() -> SpanInferrer { + #[allow(clippy::disallowed_methods)] + let region = std::env::var("AWS_REGION").unwrap_or_default(); + SpanInferrer::new(InferConfig { + region, + ..InferConfig::default() + }) +} + +use opentelemetry::trace::{SpanKind, TraceContextExt, Tracer}; +use opentelemetry::{global, Context, KeyValue}; +use opentelemetry_sdk::trace::SdkTracer; +use std::time::{Duration, SystemTime}; + +/// Metadata extracted from the trigger event, used to parent the root span. +pub(crate) struct TriggerContext { + /// OTel context whose active span is the innermost inferred span (or the extracted + /// propagation context when there are no inferred spans). + pub parent_cx: Context, + /// Whether this trigger uses asynchronous invocation semantics. + /// + /// When `true`, inferred span duration = `invocation_start - event_time`. + /// When `false`, inferred span duration = `invocation_end - event_time`. + pub is_async: bool, + /// Short name of the outermost trigger, e.g. `"sqs"`. Added as a tag on the root span. + pub event_source: Option, + /// ARN of the outermost trigger resource. Added as a tag on the root span when present. + pub event_source_arn: Option, +} + +/// An inferred span that is currently open, held as an OTel context. +struct ActiveInferredSpan { + /// OTel context whose active span is this inferred span. + cx: Context, + /// Mirrors [`TriggerContext::is_async`] — determines the end-time passed to + /// [`InferredSpanScope::end`]. + is_async: bool, + /// Source event timestamp for this inferred span, when known. + start_time: Option, +} + +/// Handle for the set of inferred spans created for a trigger. +pub(crate) struct InferredSpanScope { + outer: Option, + inner: Option, +} + +impl InferredSpanScope { + pub(crate) fn empty() -> Self { + Self { + outer: None, + inner: None, + } + } + + #[cfg(test)] + pub(crate) fn is_empty(&self) -> bool { + self.inner.is_none() + } + + /// Creates inferred spans from an [`InferenceResult`]. + /// + /// Call [`innermost_context`](Self::innermost_context) after construction to get the + /// OTel context whose active span is the innermost inferred span (e.g., SQS inside + /// SNS). Callers should use that context as the parent for the `aws.lambda` root span. + pub(crate) fn start(tracer: &SdkTracer, parent_cx: &Context, result: &InferenceResult) -> Self { + if !result.should_create_inferred_span() { + return Self::empty(); + } + + let mut current_cx = parent_cx.clone(); + + let outer = result + .wrapped_span + .as_ref() + .filter(|w| w.should_create_inferred_span()) + .map(|w| { + current_cx = build_inferred_span(tracer, &w.span_data, ¤t_cx); + ActiveInferredSpan { + cx: current_cx.clone(), + is_async: w.is_async, + start_time: inferred_span_start_time(&w.span_data), + } + }); + + current_cx = build_inferred_span(tracer, &result.span_data, ¤t_cx); + let inner = Some(ActiveInferredSpan { + cx: current_cx, + is_async: result.is_async, + start_time: inferred_span_start_time(&result.span_data), + }); + + Self { outer, inner } + } + + /// Returns the OTel context of the innermost inferred span. + /// + /// Falls back to `fallback` (typically the upstream propagation context) when no + /// inferred spans were created. Callers should use this as the parent for the root span. + pub(crate) fn innermost_context(&self, fallback: &Context) -> Context { + self.inner + .as_ref() + .map(|s| s.cx.clone()) + .unwrap_or_else(|| fallback.clone()) + } + + /// End all inferred spans with correct timing. + /// + /// Wrapped outer spans end when the inner event begins. + /// Inner async spans end at invocation start (propagation delay). + /// Inner sync spans end at invocation end (full request duration). + pub(crate) fn end(&self, invocation_start: SystemTime, invocation_end: SystemTime) { + if let Some(outer) = self.outer.as_ref() { + let outer_end_time = self + .inner + .as_ref() + .and_then(|inner| inner.start_time) + .unwrap_or(invocation_start); + outer.cx.span().end_with_timestamp(outer_end_time); + } + + if let Some(inner) = self.inner.as_ref() { + let end_time = if inner.is_async { + invocation_start + } else { + invocation_end + }; + inner.cx.span().end_with_timestamp(end_time); + } + } +} + +fn inferred_span_start_time(span_data: &libdd_trace_inferrer::SpanData) -> Option { + let start_ns = u64::try_from(span_data.start).ok()?; + (start_ns > 0).then(|| SystemTime::UNIX_EPOCH + Duration::from_nanos(start_ns)) +} + +/// Converts a [`libdd_trace_inferrer::SpanData`] into a live OTel span. +/// +/// Returns a new OTel context with the new span as the active span. All metadata +/// from `span_data.meta` is added as span attributes. +/// +/// If `span_data.start` is zero or negative (unknown event time), the OTel SDK +/// assigns the current wall-clock time as the start time. +fn build_inferred_span( + tracer: &SdkTracer, + span_data: &libdd_trace_inferrer::SpanData, + parent_cx: &Context, +) -> Context { + let mut builder = tracer.span_builder(span_data.name.clone()); + builder.span_kind = Some(SpanKind::Server); + + if let Some(start_time) = inferred_span_start_time(span_data) { + builder.start_time = Some(start_time); + } + + let mut attrs = vec![ + KeyValue::new(attr::SERVICE_NAME, span_data.service.clone()), + KeyValue::new(attr::RESOURCE_NAME, span_data.resource.clone()), + KeyValue::new(attr::SPAN_TYPE, span_data.r#type.clone()), + KeyValue::new(attr::OPERATION_NAME, span_data.name.clone()), + KeyValue::new(attr::OPERATION_NAME_CUSTOM, span_data.name.clone()), + KeyValue::new(attr::PEER_SERVICE, span_data.service.clone()), + ]; + for (k, v) in &span_data.meta { + attrs.push(KeyValue::new(k.clone(), v.clone())); + } + builder.attributes = Some(attrs); + + let span = tracer.build_with_context(builder, parent_cx); + parent_cx.with_span(span) +} + +/// Output of [`extract_trigger`]. +pub(crate) struct TriggerExtraction { + /// OTel context extracted from the trigger's carrier headers. + /// + /// Contains the upstream trace/span IDs when the trigger carries Datadog propagation + /// headers. Falls back to the ambient context when no valid carrier is found. + pub upstream_cx: Context, + /// Full inference result from `libdd_trace_inferrer`, including span data, trigger + /// tags, and async/sync classification. + pub inference_result: InferenceResult, + /// Whether this trigger uses asynchronous invocation semantics. + pub is_async: bool, + /// Short name of the outermost trigger, e.g. `"sqs"`. Added as a tag on the root span. + pub event_source: Option, + /// ARN of the outermost trigger resource. Added as a tag on the root span when present. + pub event_source_arn: Option, +} + +/// Infers trigger metadata from `payload` and extracts the upstream OTel context. +/// +/// Carrier extraction uses `x-datadog-trace-id` as a sentinel: a missing or zero +/// trace ID means there are no upstream headers to propagate, so we fall back to +/// the ambient context rather than accidentally creating a span parented to trace ID 0. +pub(crate) fn extract_trigger(inferrer: &SpanInferrer, payload: &str) -> TriggerExtraction { + let result = inferrer.infer_span(payload).unwrap_or_default(); + let trace_id = result.carrier.get("x-datadog-trace-id").map(String::as_str); + let has_upstream_trace = trace_id + .and_then(|id| id.parse::().ok()) + .is_some_and(|id| id != 0); + + let upstream_cx = global::get_text_map_propagator(|p| { + if has_upstream_trace { + tracing::debug!( + trace_id = trace_id.unwrap_or("?"), + "extracted trace context from trigger" + ); + p.extract(&result.carrier) + } else { + tracing::debug!("no trace context found in event"); + Context::current() + } + }); + + TriggerExtraction { + upstream_cx, + is_async: result.is_async, + event_source: result + .trigger_tags + .get("function_trigger.event_source") + .cloned(), + event_source_arn: result + .trigger_tags + .get("function_trigger.event_source_arn") + .cloned(), + inference_result: result, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use libdd_trace_inferrer::{InferenceResult, SpanData}; + use opentelemetry::trace::{TraceContextExt, TracerProvider as _}; + use opentelemetry::Value as OtelValue; + use opentelemetry_sdk::trace::{ + InMemorySpanExporter, SdkTracerProvider, SpanData as OtelSpanData, + }; + use serde_json::json; + use std::collections::HashMap; + use std::time::SystemTime; + + fn test_provider() -> (SdkTracerProvider, InMemorySpanExporter) { + let exporter = InMemorySpanExporter::default(); + let provider = SdkTracerProvider::builder() + .with_simple_exporter(exporter.clone()) + .build(); + (provider, exporter) + } + + fn find_attr<'a>(attrs: &'a [KeyValue], key: &str) -> Option<&'a OtelValue> { + attrs + .iter() + .find(|kv| kv.key.as_str() == key) + .map(|kv| &kv.value) + } + + fn finished_spans(exporter: &InMemorySpanExporter) -> Vec { + exporter.get_finished_spans().unwrap() + } + + fn make_result(name: &str, service: &str) -> InferenceResult { + InferenceResult { + span_data: SpanData { + name: name.to_string(), + service: service.to_string(), + resource: service.to_string(), + r#type: "web".to_string(), + start: 0, + meta: HashMap::new(), + }, + is_async: false, + ..InferenceResult::default() + } + } + + fn nanos_to_system_time(ns: u64) -> SystemTime { + SystemTime::UNIX_EPOCH + Duration::from_nanos(ns) + } + + #[test] + fn sets_expected_attributes_on_inferred_spans() { + let (provider, exporter) = test_provider(); + let tracer = provider.tracer("test"); + + let result = make_result("aws.sqs", "my-queue"); + let scope = InferredSpanScope::start(&tracer, &Context::current(), &result); + let now = SystemTime::now(); + scope.end(now, now); + provider.force_flush().ok(); + + let spans = finished_spans(&exporter); + assert_eq!(spans.len(), 1); + let attrs = &spans[0].attributes; + + assert_eq!( + find_attr(attrs, "service.name"), + Some(&OtelValue::String("my-queue".into())) + ); + assert_eq!( + find_attr(attrs, "resource.name"), + Some(&OtelValue::String("my-queue".into())) + ); + assert_eq!( + find_attr(attrs, "operation.name"), + Some(&OtelValue::String("aws.sqs".into())) + ); + assert_eq!( + find_attr(attrs, "operation_name"), + Some(&OtelValue::String("aws.sqs".into())) + ); + } + + #[test] + fn chains_inferred_spans() { + let (provider, exporter) = test_provider(); + let tracer = provider.tracer("test"); + + let mut result = make_result("aws.sqs", "my-queue"); + result.wrapped_span = Some(Box::new(make_result("aws.sns", "my-topic"))); + + let parent_cx = Context::current(); + let scope = InferredSpanScope::start(&tracer, &parent_cx, &result); + let cx = scope.innermost_context(&parent_cx); + let now = SystemTime::now(); + scope.end(now, now); + provider.force_flush().ok(); + + let spans = finished_spans(&exporter); + assert_eq!(spans.len(), 2); + // Innermost span (sqs) should be the active span in the returned context + assert_eq!( + cx.span().span_context().span_id(), + spans + .iter() + .find(|s| s.name == "aws.sqs") + .unwrap() + .span_context + .span_id() + ); + } + + #[test] + fn wrapped_span_ends_when_inner_span_starts() { + let (provider, exporter) = test_provider(); + let tracer = provider.tracer("test"); + + let outer_start_ns = 1_000_000_000; + let inner_start_ns = 2_000_000_000; + let invocation_start = nanos_to_system_time(10_000_000_000); + let invocation_end = nanos_to_system_time(20_000_000_000); + + let mut result = make_result("aws.sqs", "my-queue"); + result.span_data.start = inner_start_ns as i64; + result.is_async = true; + + let mut wrapped = make_result("aws.sns", "my-topic"); + wrapped.span_data.start = outer_start_ns as i64; + result.wrapped_span = Some(Box::new(wrapped)); + + let scope = InferredSpanScope::start(&tracer, &Context::current(), &result); + scope.end(invocation_start, invocation_end); + provider.force_flush().ok(); + + let spans = finished_spans(&exporter); + let outer = spans.iter().find(|s| s.name == "aws.sns").unwrap(); + let inner = spans.iter().find(|s| s.name == "aws.sqs").unwrap(); + + assert_eq!(outer.end_time, inner.start_time); + } + + #[test] + fn preserves_parent_context_when_no_inferred_spans_are_created() { + let (provider, _) = test_provider(); + let tracer = provider.tracer("test"); + let parent_cx = Context::current(); + let empty_result = InferenceResult::default(); + let scope = InferredSpanScope::start(&tracer, &parent_cx, &empty_result); + let result_cx = scope.innermost_context(&parent_cx); + + assert_eq!( + result_cx.span().span_context().trace_id(), + parent_cx.span().span_context().trace_id() + ); + assert!(scope.is_empty()); + } + + #[test] + fn extracts_trigger_context_from_sqs_event() { + let carrier_json = json!({ + "x-datadog-trace-id": "12345", + "x-datadog-parent-id": "67890", + "x-datadog-sampling-priority": "1" + }); + let event = json!({ + "Records": [{ + "messageId": "msg-001", + "receiptHandle": "receipt-001", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-1:123456789:test-queue", + "awsRegion": "us-east-1", + "body": "hello", + "md5OfBody": "d8e8fca2dc0f896fd7cb4cb0031ba249", + "attributes": { + "SentTimestamp": "1718444400000", + "ApproximateFirstReceiveTimestamp": "1718444400100", + "ApproximateReceiveCount": "1", + "SenderId": "AIDAIENQZJOLO23YVJ4VO" + }, + "messageAttributes": { + "_datadog": { + "stringValue": serde_json::to_string(&carrier_json).unwrap(), + "dataType": "String" + } + } + }] + }); + + let inferrer = build_inferrer(); + let extraction = extract_trigger(&inferrer, &event.to_string()); + + assert!(extraction.inference_result.is_async); + assert_eq!( + extraction + .inference_result + .trigger_tags + .get("function_trigger.event_source") + .map(String::as_str), + Some("sqs") + ); + assert!(extraction.inference_result.should_create_inferred_span()); + assert_eq!(extraction.inference_result.span_data.name, "aws.sqs"); + } +}