diff --git a/Cargo.lock b/Cargo.lock index 4094c20b9c..bc219d94b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -21,6 +21,7 @@ checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" name = "agent-data-plane" version = "1.3.0" dependencies = [ + "agent-data-plane-config-system", "antithesis-instrumentation", "antithesis_sdk", "argh", @@ -84,11 +85,18 @@ name = "agent-data-plane-config-system" version = "0.1.0" dependencies = [ "agent-data-plane-config", + "bytesize", "datadog-agent-config", + "datadog-agent-config-overlay-model", + "figment", + "indexmap", "saluki-component-config", + "saluki-config-tools", "saluki-context", + "saluki-error", "serde_json", "stringtheory", + "tokio", ] [[package]] @@ -4422,6 +4430,7 @@ dependencies = [ "rustls", "saluki-api", "saluki-common", + "saluki-component-config", "saluki-config-tools", "saluki-context", "saluki-core", diff --git a/bin/agent-data-plane/Cargo.toml b/bin/agent-data-plane/Cargo.toml index 304badd064..5c240f631e 100644 --- a/bin/agent-data-plane/Cargo.toml +++ b/bin/agent-data-plane/Cargo.toml @@ -14,6 +14,7 @@ fips = ["saluki-app/tls-fips", "saluki-components/fips"] antithesis = ["dep:antithesis_sdk", "antithesis_sdk/full", "dep:antithesis-instrumentation", "saluki-components/antithesis"] [dependencies] +agent-data-plane-config-system = { workspace = true } antithesis-instrumentation = { workspace = true, optional = true } antithesis_sdk = { workspace = true, optional = true } argh = { workspace = true, features = ["help"] } diff --git a/bin/agent-data-plane/src/cli/run.rs b/bin/agent-data-plane/src/cli/run.rs index 6a7ed0e09b..2b41e86ccd 100644 --- a/bin/agent-data-plane/src/cli/run.rs +++ b/bin/agent-data-plane/src/cli/run.rs @@ -1,9 +1,11 @@ use std::{ collections::HashSet, path::PathBuf, + sync::Arc, time::{Duration, Instant}, }; +use agent_data_plane_config_system::ConfigurationSystem; use argh::FromArgs; use datadog_agent_commons::platform::PlatformSettings; use datadog_agent_config::classifier::{ConfigClassifier, Pipeline, PipelineAffinity, Severity, SupportLevel}; @@ -154,22 +156,29 @@ pub async fn handle_run_command( return Ok(()); } + // The configuration system takes ownership of the now-final configuration, establishing it as + // the single source. Components that have not yet been flipped onto typed config read through + // `raw_map()`, which stays live in remote-agent stream mode exactly as before. + let system = Arc::new(ConfigurationSystem::load(config).start().await?); + let config = system.raw_map(); + let active_pipelines = active_pipelines(&dp_config); - check_and_warn_config(&config, &active_pipelines).error_context("Incompatible configuration detected.")?; + check_and_warn_config(config, &active_pipelines).error_context("Incompatible configuration detected.")?; // Set up all of the building blocks for building our topologies and launching internal processes. let component_registry = ComponentRegistry::default(); let health_registry = HealthRegistry::new(); let (env_provider, maybe_env_supervisor) = - ADPEnvironmentProvider::from_configuration(&config, &dp_config, &component_registry, &health_registry).await?; + ADPEnvironmentProvider::from_configuration(config, &dp_config, &component_registry, &health_registry).await?; // Create the blueprint for our primary topology. let (mut blueprint, control_surfaces) = - create_topology(&config, &dp_config, &env_provider, &component_registry).await?; + create_topology(&system, &dp_config, &env_provider, &component_registry).await?; // Create the internal supervisor which drives our control plane and internal observability. let mut internal_supervisor = create_internal_supervisor( - &config, + config, + Arc::clone(&system), &dp_config, &component_registry, health_registry.clone(), @@ -181,7 +190,7 @@ pub async fn handle_run_command( .error_context("Failed to create internal supervisor.")?; // Run memory bounds validation to ensure that we can launch the topology with our configured memory limit, if any. - let bounds_config = MemoryBoundsConfiguration::try_from_config(&config)?; + let bounds_config = MemoryBoundsConfiguration::try_from_config(config)?; let memory_limiter = initialize_memory_bounds(bounds_config, component_registry.root())?; if let Ok(val) = std::env::var("DD_ADP_WRITE_SIZING_GUIDE") { @@ -358,9 +367,12 @@ fn is_a_pipeline_affected(active_pipelines: &HashSet, pipeline_affinit } async fn create_topology( - config: &GenericConfiguration, dp_config: &DataPlaneConfiguration, env_provider: &ADPEnvironmentProvider, + system: &ConfigurationSystem, dp_config: &DataPlaneConfiguration, env_provider: &ADPEnvironmentProvider, component_registry: &ComponentRegistry, ) -> Result<(TopologyBlueprint, TopologyControlSurfaces), GenericError> { + // Un-flipped components read the raw map; the DogStatsD helper takes the whole system so it can + // reach typed config for the flipped source. + let config = system.raw_map(); let mut blueprint = TopologyBlueprint::new("primary", component_registry); blueprint.with_shutdown_timeout(dp_config.stop_timeout()); @@ -417,7 +429,7 @@ async fn create_topology( } if dp_config.dogstatsd().enabled() { - let dsd_control_surface = add_dsd_pipeline_to_blueprint(&mut blueprint, config, env_provider).await?; + let dsd_control_surface = add_dsd_pipeline_to_blueprint(&mut blueprint, system, env_provider).await?; control_surfaces.attach_dogstatsd(dsd_control_surface); } @@ -655,8 +667,11 @@ async fn add_baseline_traces_pipeline_to_blueprint( } async fn add_dsd_pipeline_to_blueprint( - blueprint: &mut TopologyBlueprint, config: &GenericConfiguration, env_provider: &ADPEnvironmentProvider, + blueprint: &mut TopologyBlueprint, system: &ConfigurationSystem, env_provider: &ADPEnvironmentProvider, ) -> Result { + // The whole helper reads the raw map. The signature takes the system so each sub-component can + // move onto typed config independently without further changing the parameter list. + let config = system.raw_map(); // We're creating the "front half" of the DogStatsD pipeline, which deals solely with accepting DogStatsD payloads, // and enriching/processing them in DSD-specific ways, relevant to how the Datadog Agent is expected to behave. // @@ -692,8 +707,7 @@ async fn add_dsd_pipeline_to_blueprint( // │ (destination) │ │ (Datadog Platform) │ // └─────────────────────┘ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ - let dsd_config = DogStatsDConfiguration::from_configuration(config) - .error_context("Failed to configure DogStatsD source.")? + let dsd_config = DogStatsDConfiguration::new(system.saluki().components.dogstatsd.source) .with_workload_provider(env_provider.workload().clone()) .with_capture_entity_resolver(env_provider.workload().clone()); let dsd_prefix_filter_configuration = DogStatsDPrefixFilterConfiguration::from_configuration(config)?; diff --git a/bin/agent-data-plane/src/internal/config_internal.rs b/bin/agent-data-plane/src/internal/config_internal.rs new file mode 100644 index 0000000000..885eef712f --- /dev/null +++ b/bin/agent-data-plane/src/internal/config_internal.rs @@ -0,0 +1,109 @@ +//! Internal configuration API handler. +//! +//! Serves the ADP-native [`SalukiConfiguration`] as JSON on the privileged `/config/internal` +//! route. This is the observable surface used to verify translation end-to-end, and a secondary +//! operator-debugging aid alongside the source-shaped `/config` route. +//! +//! The handler holds the shared [`ConfigurationSystem`] and reads the translated master per request, +//! so it automatically reflects any later re-translation without changing this worker. The body is +//! served raw, without scrubbing, exactly like the existing privileged `/config` route; scrubbing +//! is a client-side display concern. +//! +//! [`SalukiConfiguration`]: agent-data-plane-config + +use std::sync::Arc; + +use agent_data_plane_config_system::ConfigurationSystem; +use async_trait::async_trait; +use http::StatusCode; +use saluki_api::{ + extract::State, + response::IntoResponse, + routing::{get, Router}, + APIHandler, DynamicRoute, EndpointType, +}; +use saluki_common::sync::shutdown::ShutdownHandle; +use saluki_core::runtime::{state::DataspaceRegistry, InitializationError, Supervisable, SupervisorFuture}; +use saluki_error::generic_error; + +/// State for the internal configuration API handler. +#[derive(Clone)] +pub struct InternalConfigState { + system: Arc, +} + +/// An API handler for returning the ADP-native configuration. +/// +/// Exposes a single route -- `/config/internal` -- that serializes the translated +/// `SalukiConfiguration` to JSON. +pub struct InternalConfigAPIHandler { + state: InternalConfigState, +} + +impl InternalConfigAPIHandler { + fn new(system: Arc) -> Self { + Self { + state: InternalConfigState { system }, + } + } + + async fn config_handler(State(state): State) -> impl IntoResponse { + match serde_json::to_string(&state.system.saluki()) { + Ok(body) => (StatusCode::OK, body).into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Failed to serialize configuration: {}", e), + ) + .into_response(), + } + } +} + +impl APIHandler for InternalConfigAPIHandler { + type State = InternalConfigState; + + fn generate_initial_state(&self) -> Self::State { + self.state.clone() + } + + fn generate_routes(&self) -> Router { + Router::new().route("/config/internal", get(Self::config_handler)) + } +} + +/// A worker for exposing the ADP-native configuration. +/// +/// Asserts the `/config/internal` route on the privileged API endpoint. As the configuration may +/// contain sensitive data, the route is only present on the privileged endpoint. +pub struct InternalConfigWorker { + handler: InternalConfigAPIHandler, +} + +impl InternalConfigWorker { + /// Creates a new [`InternalConfigWorker`] backed by the shared configuration system. + pub fn new(system: Arc) -> Self { + Self { + handler: InternalConfigAPIHandler::new(system), + } + } +} + +#[async_trait] +impl Supervisable for InternalConfigWorker { + fn name(&self) -> &str { + "config-internal-api" + } + + async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result { + let config_route = DynamicRoute::http(EndpointType::Privileged, &self.handler); + + Ok(Box::pin(async move { + DataspaceRegistry::try_current() + .ok_or_else(|| generic_error!("Dataspace not available."))? + .assert(config_route, "config-internal-api"); + + process_shutdown.await; + Ok(()) + })) + } +} diff --git a/bin/agent-data-plane/src/internal/control_plane.rs b/bin/agent-data-plane/src/internal/control_plane.rs index f1315ac5d9..1851cecc21 100644 --- a/bin/agent-data-plane/src/internal/control_plane.rs +++ b/bin/agent-data-plane/src/internal/control_plane.rs @@ -1,3 +1,6 @@ +use std::sync::Arc; + +use agent_data_plane_config_system::ConfigurationSystem; use datadog_agent_commons::ipc::{config::IpcAuthConfiguration, tls::build_ipc_server_tls_config}; use resource_accounting::ComponentRegistry; use saluki_api::EndpointType; @@ -15,8 +18,8 @@ use saluki_error::GenericError; use crate::{ config::DataPlaneConfiguration, internal::{ - logging::DynamicLogLevelWorker, remote_agent::RemoteAgentBootstrap, telemetry::InternalTelemetryAPIWorker, - TopologyControlSurfaces, + config_internal::InternalConfigWorker, logging::DynamicLogLevelWorker, remote_agent::RemoteAgentBootstrap, + telemetry::InternalTelemetryAPIWorker, TopologyControlSurfaces, }, }; @@ -31,8 +34,8 @@ use crate::{ /// /// If the supervisor can't be created, an error is returned. pub async fn create_control_plane_supervisor( - config: &GenericConfiguration, dp_config: &DataPlaneConfiguration, component_registry: &ComponentRegistry, - health_registry: HealthRegistry, control_surfaces: TopologyControlSurfaces, + config: &GenericConfiguration, system: Arc, dp_config: &DataPlaneConfiguration, + component_registry: &ComponentRegistry, health_registry: HealthRegistry, control_surfaces: TopologyControlSurfaces, ra_bootstrap: Option, logging_controller: LoggingOverrideController, ) -> Result { let mut supervisor = Supervisor::new("ctrl-pln")? @@ -44,6 +47,7 @@ pub async fn create_control_plane_supervisor( supervisor.add_worker(InternalTelemetryAPIWorker::new()); supervisor.add_worker(DynamicLogLevelWorker::new(config, logging_controller)); supervisor.add_worker(ConfigWorker::new(config.clone())); + supervisor.add_worker(InternalConfigWorker::new(system)); supervisor.add_worker(DynamicAPIBuilder::new( EndpointType::Unprivileged, diff --git a/bin/agent-data-plane/src/internal/mod.rs b/bin/agent-data-plane/src/internal/mod.rs index b4934f3b8d..43f4f0d49d 100644 --- a/bin/agent-data-plane/src/internal/mod.rs +++ b/bin/agent-data-plane/src/internal/mod.rs @@ -1,3 +1,6 @@ +use std::sync::Arc; + +use agent_data_plane_config_system::ConfigurationSystem; use resource_accounting::ComponentRegistry; use saluki_app::logging::LoggingOverrideController; use saluki_config_tools::GenericConfiguration; @@ -7,6 +10,8 @@ use saluki_error::GenericError; use crate::config::DataPlaneConfiguration; +mod config_internal; + mod control_plane; pub use self::control_plane::create_control_plane_supervisor; @@ -35,8 +40,8 @@ mod telemetry; /// /// If the supervisor can't be created, an error is returned. pub async fn create_internal_supervisor( - config: &GenericConfiguration, dp_config: &DataPlaneConfiguration, component_registry: &ComponentRegistry, - health_registry: HealthRegistry, control_surfaces: TopologyControlSurfaces, + config: &GenericConfiguration, system: Arc, dp_config: &DataPlaneConfiguration, + component_registry: &ComponentRegistry, health_registry: HealthRegistry, control_surfaces: TopologyControlSurfaces, ra_bootstrap: Option, logging_controller: LoggingOverrideController, ) -> Result { // The root supervisor runs in ambient mode (caller's runtime) since its children each have their own @@ -48,6 +53,7 @@ pub async fn create_internal_supervisor( root.add_worker( create_control_plane_supervisor( config, + system, dp_config, component_registry, health_registry.clone(), diff --git a/lib/agent-data-plane-config-system/Cargo.toml b/lib/agent-data-plane-config-system/Cargo.toml index 490a547b16..0d58ed162a 100644 --- a/lib/agent-data-plane-config-system/Cargo.toml +++ b/lib/agent-data-plane-config-system/Cargo.toml @@ -10,8 +10,22 @@ workspace = true [dependencies] agent-data-plane-config = { workspace = true } +bytesize = { workspace = true } datadog-agent-config = { workspace = true } saluki-component-config = { workspace = true } +saluki-config-tools = { workspace = true } saluki-context = { workspace = true } +saluki-error = { workspace = true } serde_json = { workspace = true } stringtheory = { workspace = true } + +[build-dependencies] +datadog-agent-config-overlay-model = { workspace = true } +indexmap = { workspace = true } + +[dev-dependencies] +agent-data-plane-config = { workspace = true } +figment = { workspace = true } +saluki-config-tools = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt"] } diff --git a/lib/agent-data-plane-config-system/build.rs b/lib/agent-data-plane-config-system/build.rs new file mode 100644 index 0000000000..e5eec9a0a9 --- /dev/null +++ b/lib/agent-data-plane-config-system/build.rs @@ -0,0 +1,111 @@ +use std::fmt::Write as _; +use std::path::PathBuf; + +use datadog_agent_config_overlay_model::schema_gen::{self, FieldInfo, FieldType}; +use datadog_agent_config_overlay_model::smoke_test_support::ConfigurationStruct; +use datadog_agent_config_overlay_model::{Files, KnownEntry, SchemaOverlay, TestSupport}; +use indexmap::IndexMap; + +fn main() { + let files = Files::default(); + + let schema_dir = files.schema.parent().expect("schema file must have a parent directory"); + println!("cargo:rerun-if-changed={}", schema_dir.display()); + println!("cargo:rerun-if-changed={}", files.overlay.display()); + println!("cargo:rerun-if-changed=build.rs"); + + let schema_path = files.schema.clone(); + let overlay = SchemaOverlay::load(files).unwrap_or_else(|e| panic!("{e}")); + let schema_map = schema_gen::load_schema(&schema_path); + + let out_dir = PathBuf::from(std::env::var("OUT_DIR").unwrap()); + generate_smoke_tests(&overlay, &schema_map, &out_dir); +} + +fn generate_smoke_tests(overlay: &SchemaOverlay, schema_map: &IndexMap, out_dir: &std::path::Path) { + let mut keys: Vec<(&str, &TestSupport, &FieldInfo)> = Vec::new(); + + for (yaml_path, entry) in &overlay.inventory { + let ts = match entry { + KnownEntry::Full(f) => &f.test_support, + KnownEntry::Partial(p) => &p.test_support, + _ => continue, + }; + if !ts.used_by.contains(&ConfigurationStruct::MigratedToConfigSystem) { + continue; + } + let field_info = schema_map + .get(yaml_path.as_str()) + .unwrap_or_else(|| panic!("MigratedToConfigSystem key '{}' not found in schema", yaml_path)); + keys.push((yaml_path.as_str(), ts, field_info)); + } + + keys.sort_by_key(|(path, _, _)| *path); + + let mut out = String::new(); + writeln!(out, "// @generated by build.rs from schema_overlay.yaml — DO NOT EDIT").unwrap(); + writeln!(out).unwrap(); + + for (yaml_path, ts, field_info) in &keys { + let fn_name = yaml_path.replace('.', "_"); + let test_value = resolve_test_value(yaml_path, ts, field_info); + + writeln!(out, "#[tokio::test]").unwrap(); + writeln!(out, "async fn smoke_{fn_name}() {{").unwrap(); + writeln!(out, " assert_key_propagates(\"{yaml_path}\", {test_value}).await;").unwrap(); + writeln!(out, "}}").unwrap(); + writeln!(out).unwrap(); + } + + let path = out_dir.join("translation_smoke_tests.rs"); + std::fs::write(&path, out).unwrap_or_else(|e| panic!("cannot write {}: {}", path.display(), e)); +} + +fn resolve_test_value(yaml_path: &str, ts: &TestSupport, field_info: &FieldInfo) -> String { + if let Some(ref test_json) = ts.test_json { + return format_json_literal(test_json); + } + + let effective_type = ts + .value_type_override + .as_ref() + .map_or(&field_info.value_type, |vt| match vt { + datadog_agent_config_overlay_model::ValueType::Boolean => &FieldType::Bool, + datadog_agent_config_overlay_model::ValueType::Integer => &FieldType::Integer, + datadog_agent_config_overlay_model::ValueType::Float => &FieldType::Float, + datadog_agent_config_overlay_model::ValueType::String => &FieldType::String, + datadog_agent_config_overlay_model::ValueType::StringList => &FieldType::StringList, + }); + + match effective_type { + FieldType::Bool => { + let default_is_true = field_info + .default + .as_deref() + .map(|d| d.trim() == "true") + .unwrap_or(false); + if default_is_true { + "serde_json::json!(false)".to_string() + } else { + "serde_json::json!(true)".to_string() + } + } + FieldType::Integer => "serde_json::json!(42i64)".to_string(), + FieldType::Float => "serde_json::json!(1.5f64)".to_string(), + FieldType::String => "serde_json::json!(\"http://smoke-test.example.com:3128\")".to_string(), + FieldType::StringList => { + "serde_json::json!([\"smoke-host-1.example.com\", \"smoke-host-2.example.com\"])".to_string() + } + FieldType::Unknown => { + panic!("MigratedToConfigSystem key '{yaml_path}' has unknown type and no test_json override"); + } + } +} + +fn format_json_literal(raw: &str) -> String { + if raw.contains('"') { + format!("serde_json::from_str::(r#\"{raw}\"#).unwrap()") + } else { + format!("serde_json::json!({raw})") + } +} diff --git a/lib/agent-data-plane-config-system/src/lib.rs b/lib/agent-data-plane-config-system/src/lib.rs index 3825b5e11a..9b370e0307 100644 --- a/lib/agent-data-plane-config-system/src/lib.rs +++ b/lib/agent-data-plane-config-system/src/lib.rs @@ -49,4 +49,10 @@ //! // TODO(visibility): add crate-boundary architectural guard when arch tests are wired up +mod system; pub(crate) mod translate; + +pub use self::system::{ConfigurationSystem, ConfigurationSystemLoader}; + +#[cfg(test)] +mod smoke_test; diff --git a/lib/agent-data-plane-config-system/src/smoke_test/mod.rs b/lib/agent-data-plane-config-system/src/smoke_test/mod.rs new file mode 100644 index 0000000000..cc23d51504 --- /dev/null +++ b/lib/agent-data-plane-config-system/src/smoke_test/mod.rs @@ -0,0 +1,50 @@ +//! Tests to verify that changes to keys in a `GenericConfiguration` reach `SalukiConfiguration`. +//! +//! Each test checks a single key by setting a non default value in a `GenericConfiguration` map +//! then running that map through `DatadogConfiguration` deserialization and witness trait +//! translation. The resulting `SalukiConfiguration` is checked against default construction to +//! ensure that it's value has been perturbed. This ensures that none of the witness trait function +//! implementations are inert. +use saluki_config_tools::ConfigurationLoader; +use serde_json::json; + +use crate::system::ConfigurationSystem; + +/// Create a `GenericConfiguration` from the root JSON value. +async fn generic_from(value: serde_json::Value) -> saluki_config_tools::GenericConfiguration { + ConfigurationLoader::default() + .add_providers([figment::providers::Serialized::defaults(value)]) + .into_generic() + .await + .expect("build generic configuration") +} + +/// Drive deserialization and translation. +async fn translate_from(value: serde_json::Value) -> agent_data_plane_config::SalukiConfiguration { + let config = generic_from(value).await; + let system = ConfigurationSystem::load(config) + .start() + .await + .expect("translation must succeed"); + system.saluki() +} + +/// Check that the `SalukiConfiguration` object is not default constructed after deserialization +/// and translation. +async fn assert_key_propagates(yaml_path: &str, test_value: serde_json::Value) { + let default = translate_from(json!({})).await; + + // Set up a JSON object holding the test value at our key-path. + let mut map = json!({}); + saluki_config_tools::upsert(&mut map, yaml_path, test_value); + let with_value = translate_from(map).await; + + assert_ne!( + default, with_value, + "key '{}' did not change SalukiConfiguration — \ + is the test value the same as the default, or is translation not wired?", + yaml_path, + ); +} + +include!(concat!(env!("OUT_DIR"), "/translation_smoke_tests.rs")); diff --git a/lib/agent-data-plane-config-system/src/system.rs b/lib/agent-data-plane-config-system/src/system.rs new file mode 100644 index 0000000000..93336e9073 --- /dev/null +++ b/lib/agent-data-plane-config-system/src/system.rs @@ -0,0 +1,202 @@ +//! The configuration system lifecycle. +//! +//! [`ConfigurationSystem`] loads and starts through two types in a consume-by-value transition: +//! +//! ```text +//! let loader = ConfigurationSystem::load(config); // synchronous +//! let system = Arc::new(loader.start().await?); // async, consumes the loader +//! ``` +//! +//! - [`ConfigurationSystem::load`] takes ownership of the already-final [`GenericConfiguration`] +//! that `run.rs` builds (after remote-agent / config-stream resolution) and returns a +//! [`ConfigurationSystemLoader`]. +//! - [`ConfigurationSystemLoader::start`] consumes the loader, runs translation once, and returns +//! the running [`ConfigurationSystem`]. +//! +//! The running system exposes two views: +//! +//! - [`ConfigurationSystem::raw_map`] -- a `&GenericConfiguration` escape hatch for components that +//! have not yet been flipped onto typed config. It stays live (the Agent config stream still feeds +//! it), so un-flipped components see updates exactly as before. +//! - [`ConfigurationSystem::saluki`] -- an owned clone of the translated master +//! [`SalukiConfiguration`]. The master is translated once at `start()` and frozen for the process +//! lifetime; only the DogStatsD source is real in this stage. + +use agent_data_plane_config::{SalukiConfiguration, SalukiOnlyConfiguration}; +use bytesize::ByteSize; +use datadog_agent_config::DatadogConfiguration; +use saluki_config_tools::GenericConfiguration; +use saluki_error::{ErrorContext as _, GenericError}; + +use crate::translate::translate; + +/// The running configuration system. +/// +/// Holds the retained source map and the translated master [`SalukiConfiguration`]. The master is a +/// plain, immutable field: translated once at [`ConfigurationSystemLoader::start`] and never +/// mutated. A later dynamic flip swaps this for a swappable container without changing +/// [`saluki`](Self::saluki) or [`raw_map`](Self::raw_map). +pub struct ConfigurationSystem { + /// The retained source map. The single source of raw config for un-flipped components. + config: GenericConfiguration, + + /// The translated master. Frozen at `start()`; only the DogStatsD source is real in this stage. + saluki: SalukiConfiguration, +} + +impl ConfigurationSystem { + /// Takes ownership of the final [`GenericConfiguration`] and returns a loader. + /// + /// This is the synchronous first half of the lifecycle. The returned + /// [`ConfigurationSystemLoader`] holds the loaded snapshot and transitions to a running + /// [`ConfigurationSystem`] in one [`start`](ConfigurationSystemLoader::start) step. + // TODO: eliminate GenericConfiguration arg; config-system should own loading + pub fn load(config: GenericConfiguration) -> ConfigurationSystemLoader { + ConfigurationSystemLoader { config } + } + + /// Returns the retained source map. + /// + /// This is the escape hatch every un-flipped component reads through during the cutover. It is + /// removed when the last `raw_map()` caller is flipped onto typed config. + pub fn raw_map(&self) -> &GenericConfiguration { + &self.config + } + + /// Returns an owned clone of the translated master [`SalukiConfiguration`]. + /// + /// The clone is cheap relative to startup cost and lets a flipped helper take a fully owned + /// slice. In this stage only `components.dogstatsd.source` carries real translated values. + pub fn saluki(&self) -> SalukiConfiguration { + self.saluki.clone() + } +} + +/// A loaded-but-not-started configuration system. +/// +/// Not a builder: it holds a loaded snapshot and transitions to a running [`ConfigurationSystem`] +/// in a single [`start`](Self::start) call that consumes it by value. +pub struct ConfigurationSystemLoader { + config: GenericConfiguration, +} + +impl ConfigurationSystemLoader { + /// Starts the runtime, consuming the loader by value. + /// + /// Parses the retained map as a [`DatadogConfiguration`], translates it once into the master + /// [`SalukiConfiguration`], and returns the running system. This is `async` to lock the + /// end-state signature (it will later await the Agent's initial config once config-system owns + /// loading); it currently awaits nothing. + /// + /// # Errors + /// + /// Returns an error if the source map cannot be parsed as a [`DatadogConfiguration`] or if + /// translation fails. At startup either is fatal: the caller bails and the process exits. + pub async fn start(self) -> Result { + let datadog: DatadogConfiguration = self + .config + .as_typed() + .error_context("Failed to parse Datadog configuration.")?; + + let mut saluki_only = SalukiOnlyConfiguration::default(); + bridge_dd_fallbacks(&mut saluki_only, &self.config); + + let saluki = translate(&saluki_only, &datadog).error_context("Failed to translate configuration.")?; + + Ok(ConfigurationSystem { + config: self.config, + saluki, + }) + } +} + +/// Bridges Saluki-schema-only DogStatsD keys from the Datadog source (`DD_*`) into the Saluki-only +/// struct. +/// +/// Saluki-schema-only keys are owned by the Saluki source (`SALUKI_*` / `saluki.yaml`) and the clean +/// end state reads them only from there. Existing deployments and tests set these keys via `DD_*`, +/// so until a `SALUKI_*` source is wired this bridges each value from the retained map. This is +/// temporary transition scaffolding that is removed when Saluki-only loading lands. +// TODO: delete once SALUKI_* loading is wired; keeps DD_* parity for DogStatsD Saluki-only knobs. +fn bridge_dd_fallbacks(saluki_only: &mut SalukiOnlyConfiguration, dd: &GenericConfiguration) { + let dsd = &mut saluki_only.dogstatsd; + + if let Ok(Some(v)) = dd.try_get_typed::("dogstatsd_allow_context_heap_allocs") { + dsd.allow_context_heap_allocs = Some(v); + } + if let Ok(Some(v)) = dd.try_get_typed::("dogstatsd_autoscale_udp_listeners") { + dsd.autoscale_udp_listeners = Some(v); + } + if let Ok(Some(v)) = dd.try_get_typed::("dogstatsd_buffer_count") { + dsd.buffer_count = Some(v); + } + if let Ok(Some(v)) = dd.try_get_typed::("dogstatsd_cached_contexts_limit") { + dsd.cached_contexts_limit = Some(v); + } + if let Ok(Some(v)) = dd.try_get_typed::("dogstatsd_cached_tagsets_limit") { + dsd.cached_tagsets_limit = Some(v); + } + if let Ok(Some(v)) = dd.try_get_typed::("dogstatsd_minimum_sample_rate") { + dsd.minimum_sample_rate = Some(v); + } + if let Ok(Some(v)) = dd.try_get_typed::("dogstatsd_permissive_decoding") { + dsd.permissive_decoding = Some(v); + } + if let Ok(Some(v)) = dd.try_get_typed::("dogstatsd_string_interner_size_bytes") { + dsd.string_interner_size_bytes = Some(v.0); + } + if let Ok(Some(v)) = dd.try_get_typed::("dogstatsd_tcp_port") { + dsd.tcp_port = Some(v); + } +} + +#[cfg(test)] +mod tests { + use saluki_config_tools::ConfigurationLoader; + use serde_json::json; + + use super::*; + + async fn generic_from(value: serde_json::Value) -> GenericConfiguration { + ConfigurationLoader::default() + .add_providers([figment::providers::Serialized::defaults(value)]) + .into_generic() + .await + .expect("build generic configuration") + } + + #[tokio::test] + async fn load_start_saluki_translates_dogstatsd_source() { + let config = generic_from(json!({ "dogstatsd_port": 7000, "dogstatsd_non_local_traffic": true })).await; + + let system = ConfigurationSystem::load(config).start().await.expect("start succeeds"); + let saluki = system.saluki(); + + assert_eq!(saluki.components.dogstatsd.source.port, 7000); + assert!(saluki.components.dogstatsd.source.non_local_traffic); + } + + #[tokio::test] + async fn raw_map_returns_retained_source_map() { + let config = generic_from(json!({ "dogstatsd_port": 9123 })).await; + + let system = ConfigurationSystem::load(config).start().await.expect("start succeeds"); + + let port: u16 = system + .raw_map() + .get_typed("dogstatsd_port") + .expect("dogstatsd_port present in raw map"); + assert_eq!(port, 9123); + } + + #[tokio::test] + async fn bridge_carries_saluki_only_knob_from_dd_keys() { + // `dogstatsd_tcp_port` is a Saluki-schema-only knob: it is not witnessed, so it reaches the + // native source only through the seed. The bridge must carry the DD_* value into the seed. + let config = generic_from(json!({ "dogstatsd_tcp_port": 8200 })).await; + + let system = ConfigurationSystem::load(config).start().await.expect("start succeeds"); + + assert_eq!(system.saluki().components.dogstatsd.source.tcp_port, 8200); + } +} diff --git a/lib/agent-data-plane-config/src/dogstatsd.rs b/lib/agent-data-plane-config/src/dogstatsd.rs index dbc1cdd57b..c19cad7c8f 100644 --- a/lib/agent-data-plane-config/src/dogstatsd.rs +++ b/lib/agent-data-plane-config/src/dogstatsd.rs @@ -1,6 +1,17 @@ //! DogStatsD-domain component configuration group. -/// DogStatsD-domain component configuration: source, mapper, aggregate, debug-log, and filter. +/// Configuration for the DogStatsD *domain*: the family of components whose Datadog config keys +/// share the `dogstatsd_*` namespace (source, mapper, aggregate, debug-log, and filter). +/// +/// This is an umbrella holding one config slice per family member, not the config of any single +/// component. It currently carries only `source`, because the source is the only member cut over to +/// the translated-config system so far; the mapper, aggregate, debug-log, and filter slices are +/// added here as those components are cut over. +/// +/// "DogStatsD" names both this domain family and the source component specifically. This `Config` is +/// the family. The source component's own config is `SourceConfig` (embedded below, defined in +/// `saluki-component-config`), which the source builder `DogStatsDConfiguration` (in +/// `saluki-components`) also embeds by value. #[derive(Clone, Debug, Default, PartialEq, serde::Serialize)] pub struct Config { /// DogStatsD source configuration (listeners, parser/decoding options). diff --git a/lib/datadog-agent/config-overlay-model/src/saluki_keys.rs b/lib/datadog-agent/config-overlay-model/src/saluki_keys.rs index 48794513f6..9ca0985878 100644 --- a/lib/datadog-agent/config-overlay-model/src/saluki_keys.rs +++ b/lib/datadog-agent/config-overlay-model/src/saluki_keys.rs @@ -55,7 +55,7 @@ pub static SALUKI_KEYS: &[SalukiKey] = &[ env_vars: &[], env_var_override: None, additional_yaml_paths: &[], - used_by: &["DOGSTATSD_CONFIGURATION"], + used_by: &["MIGRATED_TO_CONFIG_SYSTEM"], test_json: None, pipeline_affinity: "PipelineAffinity::Pipelines(&[Pipeline::DogStatsD])", filename: "dogstatsd.rs", @@ -70,7 +70,7 @@ pub static SALUKI_KEYS: &[SalukiKey] = &[ env_vars: &[], env_var_override: None, additional_yaml_paths: &[], - used_by: &["DOGSTATSD_CONFIGURATION"], + used_by: &["MIGRATED_TO_CONFIG_SYSTEM"], test_json: None, pipeline_affinity: "PipelineAffinity::Pipelines(&[Pipeline::DogStatsD])", filename: "dogstatsd.rs", @@ -100,7 +100,7 @@ pub static SALUKI_KEYS: &[SalukiKey] = &[ env_vars: &[], env_var_override: None, additional_yaml_paths: &[], - used_by: &["DOGSTATSD_CONFIGURATION"], + used_by: &["MIGRATED_TO_CONFIG_SYSTEM"], test_json: None, pipeline_affinity: "PipelineAffinity::Pipelines(&[Pipeline::DogStatsD])", filename: "dogstatsd.rs", @@ -115,7 +115,7 @@ pub static SALUKI_KEYS: &[SalukiKey] = &[ env_vars: &[], env_var_override: None, additional_yaml_paths: &[], - used_by: &["DOGSTATSD_CONFIGURATION"], + used_by: &["MIGRATED_TO_CONFIG_SYSTEM"], test_json: None, pipeline_affinity: "PipelineAffinity::Pipelines(&[Pipeline::DogStatsD])", filename: "dogstatsd.rs", @@ -130,7 +130,7 @@ pub static SALUKI_KEYS: &[SalukiKey] = &[ env_vars: &[], env_var_override: None, additional_yaml_paths: &[], - used_by: &["DOGSTATSD_CONFIGURATION"], + used_by: &["MIGRATED_TO_CONFIG_SYSTEM"], test_json: None, pipeline_affinity: "PipelineAffinity::Pipelines(&[Pipeline::DogStatsD])", filename: "dogstatsd.rs", @@ -151,7 +151,7 @@ pub static SALUKI_KEYS: &[SalukiKey] = &[ env_vars: &[], env_var_override: None, additional_yaml_paths: &[], - used_by: &["DOGSTATSD_CONFIGURATION"], + used_by: &["MIGRATED_TO_CONFIG_SYSTEM"], test_json: None, pipeline_affinity: "PipelineAffinity::Pipelines(&[Pipeline::DogStatsD])", filename: "dogstatsd.rs", @@ -171,7 +171,7 @@ pub static SALUKI_KEYS: &[SalukiKey] = &[ env_vars: &[], env_var_override: None, additional_yaml_paths: &[], - used_by: &["DOGSTATSD_CONFIGURATION"], + used_by: &["MIGRATED_TO_CONFIG_SYSTEM"], test_json: None, pipeline_affinity: "PipelineAffinity::Pipelines(&[Pipeline::DogStatsD])", filename: "dogstatsd.rs", @@ -186,7 +186,7 @@ pub static SALUKI_KEYS: &[SalukiKey] = &[ env_vars: &[], env_var_override: None, additional_yaml_paths: &[], - used_by: &["DOGSTATSD_CONFIGURATION"], + used_by: &["MIGRATED_TO_CONFIG_SYSTEM"], test_json: None, pipeline_affinity: "PipelineAffinity::Pipelines(&[Pipeline::DogStatsD])", filename: "dogstatsd.rs", @@ -201,7 +201,7 @@ pub static SALUKI_KEYS: &[SalukiKey] = &[ env_vars: &[], env_var_override: None, additional_yaml_paths: &[], - used_by: &["DOGSTATSD_CONFIGURATION"], + used_by: &["MIGRATED_TO_CONFIG_SYSTEM"], test_json: None, pipeline_affinity: "PipelineAffinity::Pipelines(&[Pipeline::DogStatsD])", filename: "dogstatsd.rs", diff --git a/lib/datadog-agent/config-overlay-model/src/smoke_test_support.rs b/lib/datadog-agent/config-overlay-model/src/smoke_test_support.rs index 4fec556d7e..a5f9a0ac5d 100644 --- a/lib/datadog-agent/config-overlay-model/src/smoke_test_support.rs +++ b/lib/datadog-agent/config-overlay-model/src/smoke_test_support.rs @@ -12,6 +12,11 @@ use serde::{Deserialize, Serialize}; /// configuration smoke test code generation. Each variant in this enum should be an exact name /// match to a struct that consumes its value directly from Agent configuration either by /// deserializing or by environment variable. +/// +/// One variant, `MigratedToConfigSystem`, is not a struct: it is a sentinel for keys that no +/// longer deserialize into any struct because their component now reads them through the config +/// translation system. It keeps those keys' `used_by` non-empty (required by overlay validation) +/// without claiming a deserializing consumer. #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)] pub enum ConfigurationStruct { AggregateConfiguration, @@ -39,6 +44,10 @@ pub enum ConfigurationStruct { /// Keys read via `get_typed` / `try_get_typed` rather than struct deserialization. #[serde(rename = "get_typed")] GetTyped, + + /// Sentinel for keys whose component now reads them through the config translation system + /// instead of deserializing them into a struct. Not a real struct; see the type docs. + MigratedToConfigSystem, } impl ConfigurationStruct { @@ -68,6 +77,7 @@ impl ConfigurationStruct { ConfigurationStruct::TagFilterlistConfiguration => "TAG_FILTERLIST_CONFIGURATION", ConfigurationStruct::TraceObfuscationConfiguration => "TRACE_OBFUSCATION_CONFIGURATION", ConfigurationStruct::GetTyped => "GET_TYPED", + ConfigurationStruct::MigratedToConfigSystem => "MIGRATED_TO_CONFIG_SYSTEM", } } } diff --git a/lib/datadog-agent/config-testing/src/config_registry/aggregate.rs b/lib/datadog-agent/config-testing/src/config_registry/aggregate.rs index 8462e88c0e..18df7668f7 100644 --- a/lib/datadog-agent/config-testing/src/config_registry/aggregate.rs +++ b/lib/datadog-agent/config-testing/src/config_registry/aggregate.rs @@ -106,7 +106,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::AGGREGATE_CONFIGURATION, structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::AGGREGATE_CONFIGURATION], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD, Pipeline::Checks]), diff --git a/lib/datadog-agent/config-testing/src/config_registry/dogstatsd.rs b/lib/datadog-agent/config-testing/src/config_registry/dogstatsd.rs index 0539ec203f..6999f684a5 100644 --- a/lib/datadog-agent/config-testing/src/config_registry/dogstatsd.rs +++ b/lib/datadog-agent/config-testing/src/config_registry/dogstatsd.rs @@ -91,7 +91,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -102,7 +102,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: Some(ValueType::Integer), test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -113,7 +113,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: Some(ValueType::Integer), test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -124,7 +124,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -135,7 +135,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: Some(ValueType::Integer), test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -146,7 +146,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: Some(&["DD_DOGSTATSD_ENTITY_ID_PRECEDENCE"]), - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -157,7 +157,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: Some(r#"["udp"]"#), pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -168,7 +168,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -179,7 +179,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: Some(&["DD_DOGSTATSD_ORIGIN_DETECTION"]), - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -190,7 +190,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: Some(&["DD_DOGSTATSD_ORIGIN_DETECTION_CLIENT"]), - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -201,7 +201,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: Some(&["DD_DOGSTATSD_ORIGIN_OPTOUT_ENABLED"]), - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -212,7 +212,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: Some(ValueType::Integer), test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -223,7 +223,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -234,7 +234,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: Some(ValueType::Integer), test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -245,7 +245,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -256,7 +256,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -267,7 +267,7 @@ crate::declare_annotations! { support_level: SupportLevel::Partial, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: Some(r#""127.0.0.1""#), pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -278,7 +278,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: Some(ValueType::Integer), test_json: Some("9125"), pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -289,7 +289,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: Some(ValueType::Integer), test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -300,7 +300,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: Some(&["DD_DOGSTATSD_TAG_CARDINALITY"]), - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: Some(r#""high""#), pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -311,7 +311,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -322,7 +322,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: Some(&["DD_PROVIDER_KIND"]), - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -333,7 +333,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: Some(&["DD_ORIGIN_DETECTION_UNIFIED"]), - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -399,7 +399,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -410,7 +410,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -432,7 +432,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -443,7 +443,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -454,7 +454,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -465,7 +465,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -476,7 +476,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -487,7 +487,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -498,7 +498,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -509,7 +509,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -520,7 +520,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -531,7 +531,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -542,7 +542,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), @@ -553,7 +553,7 @@ crate::declare_annotations! { support_level: SupportLevel::Full, additional_yaml_paths: &[], env_var_override: None, - used_by: &[structs::DOGSTATSD_CONFIGURATION], + used_by: &[structs::MIGRATED_TO_CONFIG_SYSTEM], value_type_override: None, test_json: None, pipeline_affinity: PipelineAffinity::Pipelines(&[Pipeline::DogStatsD]), diff --git a/lib/datadog-agent/config/schema/schema_overlay.yaml b/lib/datadog-agent/config/schema/schema_overlay.yaml index 0b739b0ca8..79ceb76485 100644 --- a/lib/datadog-agent/config/schema/schema_overlay.yaml +++ b/lib/datadog-agent/config/schema/schema_overlay.yaml @@ -409,7 +409,7 @@ inventory: pipelines: [dogstatsd] description: "Global listen host fallback" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] additional_attributes: config_registry_filename: dogstatsd.rs @@ -624,7 +624,7 @@ inventory: pipelines: [dogstatsd] description: "Receive buffer size (bytes)" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] value_type_override: integer additional_attributes: config_registry_filename: dogstatsd.rs @@ -635,7 +635,7 @@ inventory: description: "Traffic capture channel depth" issue: "#1381" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] value_type_override: integer additional_attributes: config_registry_filename: dogstatsd.rs @@ -646,7 +646,7 @@ inventory: description: "Traffic capture file location" issue: "#1381" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] additional_attributes: config_registry_filename: dogstatsd.rs @@ -655,7 +655,7 @@ inventory: pipelines: [dogstatsd] description: "Context cache TTL (seconds)" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] value_type_override: integer additional_attributes: config_registry_filename: dogstatsd.rs @@ -666,7 +666,7 @@ inventory: description: "Suppress noisy parse error logs" issue: "#1350" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] additional_attributes: config_registry_filename: dogstatsd.rs @@ -675,7 +675,7 @@ inventory: pipelines: [dogstatsd] description: "Entity ID over auto-detection" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] env_var_override: [DD_DOGSTATSD_ENTITY_ID_PRECEDENCE] additional_attributes: config_registry_filename: dogstatsd.rs @@ -685,7 +685,7 @@ inventory: pipelines: [dogstatsd] description: "Require newline-terminated messages" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] test_json: '["udp"]' additional_attributes: config_registry_filename: dogstatsd.rs @@ -934,7 +934,7 @@ inventory: pipelines: [dogstatsd, checks] description: "Enable no-agg timestamped path" test_support: - used_by: [AggregateConfiguration, DogStatsDConfiguration] + used_by: [AggregateConfiguration] additional_attributes: config_registry_filename: aggregate.rs @@ -951,7 +951,7 @@ inventory: pipelines: [dogstatsd] description: "Accept non-localhost UDP/TCP" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] additional_attributes: config_registry_filename: dogstatsd.rs @@ -960,7 +960,7 @@ inventory: pipelines: [dogstatsd] description: "Enable UDS origin detection" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] env_var_override: [DD_DOGSTATSD_ORIGIN_DETECTION] additional_attributes: config_registry_filename: dogstatsd.rs @@ -970,7 +970,7 @@ inventory: pipelines: [dogstatsd] description: "Honor client origin proto fields" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] env_var_override: [DD_DOGSTATSD_ORIGIN_DETECTION_CLIENT] additional_attributes: config_registry_filename: dogstatsd.rs @@ -980,7 +980,7 @@ inventory: pipelines: [dogstatsd] description: "Allow clients to opt out origin" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] env_var_override: [DD_DOGSTATSD_ORIGIN_OPTOUT_ENABLED] additional_attributes: config_registry_filename: dogstatsd.rs @@ -1031,7 +1031,7 @@ inventory: pipelines: [dogstatsd] description: "UDP listen port" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] value_type_override: integer additional_attributes: config_registry_filename: dogstatsd.rs @@ -1049,7 +1049,7 @@ inventory: pipelines: [dogstatsd] description: "Socket receive buffer size" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] value_type_override: integer additional_attributes: config_registry_filename: dogstatsd.rs @@ -1059,7 +1059,7 @@ inventory: pipelines: [dogstatsd] description: "UDS datagram socket path" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] additional_attributes: config_registry_filename: dogstatsd.rs @@ -1122,7 +1122,7 @@ inventory: pipelines: [dogstatsd] description: "Log oversized UDS stream frames" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] additional_attributes: config_registry_filename: dogstatsd.rs @@ -1131,7 +1131,7 @@ inventory: pipelines: [dogstatsd] description: "UDS stream socket path" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] additional_attributes: config_registry_filename: dogstatsd.rs @@ -1140,7 +1140,7 @@ inventory: pipelines: [dogstatsd] description: "String interner capacity" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] value_type_override: integer additional_attributes: config_registry_filename: dogstatsd.rs @@ -1150,7 +1150,7 @@ inventory: pipelines: [dogstatsd] description: "Default tag cardinality level" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] env_var_override: [DD_DOGSTATSD_TAG_CARDINALITY] test_json: '"high"' additional_attributes: @@ -1161,7 +1161,7 @@ inventory: pipelines: [dogstatsd] description: "Extra tags added to all DSD data" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] additional_attributes: config_registry_filename: dogstatsd.rs @@ -1204,7 +1204,7 @@ inventory: pipelines: [dogstatsd] description: "Allow sending event payloads" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] additional_attributes: config_registry_filename: dogstatsd.rs @@ -1213,7 +1213,7 @@ inventory: pipelines: [dogstatsd] description: "Allow sending series payloads" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] additional_attributes: config_registry_filename: dogstatsd.rs @@ -1222,7 +1222,7 @@ inventory: pipelines: [dogstatsd] description: "Allow sending service check payloads" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] additional_attributes: config_registry_filename: dogstatsd.rs @@ -1231,7 +1231,7 @@ inventory: pipelines: [dogstatsd] description: "Allow sending sketch payloads" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] additional_attributes: config_registry_filename: dogstatsd.rs @@ -1804,7 +1804,7 @@ inventory: pipelines: [dogstatsd] description: "Unified origin detection mode" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] env_var_override: [DD_ORIGIN_DETECTION_UNIFIED] additional_attributes: config_registry_filename: dogstatsd.rs @@ -1898,7 +1898,7 @@ inventory: pipelines: [dogstatsd] description: "Provider kind static tag" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] env_var_override: [DD_PROVIDER_KIND] additional_attributes: config_registry_filename: dogstatsd.rs @@ -2137,7 +2137,7 @@ inventory: be split differently while carrying the same DogStatsD messages. ADP logs setup failures and tracks send failures through telemetry. test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] test_json: '"127.0.0.1"' additional_attributes: config_registry_filename: dogstatsd.rs @@ -2147,7 +2147,7 @@ inventory: pipelines: [dogstatsd] description: "UDP packet forwarding destination port" test_support: - used_by: [DogStatsDConfiguration] + used_by: [MigratedToConfigSystem] value_type_override: integer test_json: "9125" additional_attributes: diff --git a/lib/datadog-agent/config/src/classifier/mod.rs b/lib/datadog-agent/config/src/classifier/mod.rs index 397bedbdd5..cedb22df9e 100644 --- a/lib/datadog-agent/config/src/classifier/mod.rs +++ b/lib/datadog-agent/config/src/classifier/mod.rs @@ -81,6 +81,8 @@ pub mod structs { pub const TAG_FILTERLIST_CONFIGURATION: &str = "TagFilterlistConfiguration"; /// Keys read via `get_typed` / `try_get_typed` rather than struct deserialization. pub const GET_TYPED: &str = "get_typed"; + /// Sentinel for keys migrated to the config translation system (not a real struct). + pub const MIGRATED_TO_CONFIG_SYSTEM: &str = "MigratedToConfigSystem"; } /// The ADP pipeline a config key affects. diff --git a/lib/saluki-component-config/src/dogstatsd.rs b/lib/saluki-component-config/src/dogstatsd.rs index 098fa0aa29..4a8f52425d 100644 --- a/lib/saluki-component-config/src/dogstatsd.rs +++ b/lib/saluki-component-config/src/dogstatsd.rs @@ -1,12 +1,10 @@ -//! Component-native configuration for the DogStatsD source. +//! Configuration for the DogStatsD source component. //! -//! Mirrors `DogStatsDConfiguration` in `saluki-components` with source key names, aliases, and -//! `Deserialize` implementations stripped. This is the resolved, source-agnostic data the source -//! component receives as its runtime configuration. -//! -//! Injected/non-config state is excluded: workload providers, capture-entity resolvers, capture and -//! replay control handles, and the retained `Option`. -// TODO: delete `saluki-components` duplication of these at cut-over time and delete this comment! +//! The types here hold the resolved, source-agnostic config values the DogStatsD source consumes at +//! runtime. They are plain data: no behavior, no runtime handles, and no `Deserialize`. The +//! translated-config system builds them field-by-field from Datadog keys; nothing deserializes into +//! them. See the `SourceConfig` docs for how this slice relates to `dogstatsd::Config` and +//! `DogStatsDConfiguration`. use std::path::PathBuf; @@ -14,11 +12,21 @@ use bytesize::ByteSize; use saluki_context::origin::OriginTagCardinality; use stringtheory::MetaString; -/// Configuration for the DogStatsD source component. +/// Configuration data for the DogStatsD *source* component: the resolved, source-agnostic values +/// the source consumes (listeners, parser and decoding options, interner sizing, capture settings). +/// +/// Plain data only - no behavior, no runtime handles, and no `Deserialize`. The translated-config +/// system builds this field-by-field from Datadog keys; it is never deserialized into directly. It +/// derives `Serialize` for diagnostics. +/// +/// It lives in this leaf crate so its two consumers can share it without depending on each other: +/// - `agent-data-plane-config`'s `dogstatsd::Config` embeds it as the `source` slice of the +/// DogStatsD domain family. +/// - `saluki-components`' `DogStatsDConfiguration` (the source's builder) embeds it by value and +/// adds the runtime handles plus the `build()` behavior. /// -/// Mirrors `DogStatsDConfiguration` in `saluki-components`. The injected `workload_provider`, -/// `capture_entity_resolver`, `capture_control`, and `replay_control` fields are excluded as -/// runtime-injected state. +/// It is named `SourceConfig`, not `Config`, because `dogstatsd::Config` is the larger domain group +/// that contains this slice. #[derive(Clone, Debug, PartialEq, serde::Serialize)] pub struct SourceConfig { /// The size of the receive buffer, in bytes. diff --git a/lib/saluki-components/Cargo.toml b/lib/saluki-components/Cargo.toml index ef039b73b7..b271fa773c 100644 --- a/lib/saluki-components/Cargo.toml +++ b/lib/saluki-components/Cargo.toml @@ -63,6 +63,7 @@ resource-accounting = { workspace = true } rmp-serde = { workspace = true } saluki-api = { workspace = true } saluki-common = { workspace = true } +saluki-component-config = { workspace = true } saluki-config-tools = { workspace = true } saluki-context = { workspace = true } saluki-core = { workspace = true } diff --git a/lib/saluki-components/src/sources/dogstatsd/mod.rs b/lib/saluki-components/src/sources/dogstatsd/mod.rs index caccc135f8..547f576d6f 100644 --- a/lib/saluki-components/src/sources/dogstatsd/mod.rs +++ b/lib/saluki-components/src/sources/dogstatsd/mod.rs @@ -10,7 +10,6 @@ use std::{ collections::VecDeque, future::Future, num::NonZeroUsize, - path::PathBuf, pin::Pin, sync::{Arc, LazyLock}, time::{Duration, SystemTime, UNIX_EPOCH}, @@ -24,7 +23,6 @@ use saluki_common::{ sync::shutdown::{ShutdownCoordinator, ShutdownHandle}, task::spawn_traced_named, }; -use saluki_config_tools::{deserialize_space_separated_or_seq, GenericConfiguration}; use saluki_context::{ origin::RawOrigin, tags::{RawTags, RawTagsFilter}, @@ -54,8 +52,6 @@ use saluki_io::{ ConnectionAddress, ListenAddress, ProcessCredentials, ProcessIdentity, Stream, }, }; -use serde::{Deserialize, Deserializer}; -use serde_with::{serde_as, NoneAsEmptyString}; use snafu::{ResultExt as _, Snafu}; use stringtheory::MetaString; use tokio::{ @@ -90,7 +86,7 @@ pub use self::replay::{ mod origin; use self::origin::{ mark_replay_process_id, origin_from_event_packet, origin_from_metric_packet, origin_from_service_check_packet, - DogStatsDOriginTagResolver, OriginEnrichmentConfiguration, + DogStatsDOriginTagResolver, }; mod resolver; @@ -123,470 +119,52 @@ enum Error { /// 4096 entries × 512 bytes = 2 MiB, matching ADP's previous default. const INTERNER_BASELINE_BYTES_PER_ENTRY: u64 = 512; -const fn default_buffer_size() -> usize { - 8192 -} - -const fn default_buffer_count() -> usize { - 128 -} - -const fn default_buffer_count_max() -> usize { - 256 -} - -const fn default_port() -> u16 { - 8125 -} - -const fn default_tcp_port() -> u16 { - 0 -} - -const fn default_statsd_forward_port() -> u16 { - 0 -} - -const fn default_socket_receive_buffer_size() -> usize { - 0 -} - -const fn default_allow_context_heap_allocations() -> bool { - true -} - -const fn default_no_aggregation_pipeline_support() -> bool { - true -} - -const fn default_context_string_interner_entry_count() -> u64 { - 4096 -} - -const fn default_cached_contexts_limit() -> usize { - 500_000 -} - -const fn default_cached_tagsets_limit() -> usize { - 500_000 -} - -const fn default_context_expiry_seconds() -> u64 { - 20 -} - -const fn default_dogstatsd_permissive_decoding() -> bool { - true -} - -const fn default_dogstatsd_minimum_sample_rate() -> f64 { - 0.000000003845 -} - -const fn default_true() -> bool { - true -} - -/// Controls which payload types are forwarded to the backend. -#[derive(Deserialize)] -#[cfg_attr(test, derive(PartialEq, serde::Serialize))] -pub struct EnablePayloadsConfiguration { - /// Whether or not to enable sending series (counter/gauge/rate) payloads. - /// - /// Defaults to `true`. - #[serde(default = "default_true")] - pub series: bool, - - /// Whether or not to enable sending sketch (distribution) payloads. - /// - /// Defaults to `true`. - #[serde(default = "default_true")] - pub sketches: bool, - - /// Whether or not to enable sending event payloads. - /// - /// Defaults to `true`. - #[serde(default = "default_true")] - pub events: bool, - - /// Whether or not to enable sending service check payloads. - /// - /// Defaults to `true`. - #[serde(default = "default_true")] - pub service_checks: bool, -} - -impl Default for EnablePayloadsConfiguration { - fn default() -> Self { - Self { - series: true, - sketches: true, - events: true, - service_checks: true, - } - } -} - const MIN_CAPTURE_DEPTH: usize = 1024; -const fn default_capture_depth() -> usize { - MIN_CAPTURE_DEPTH -} - -const DOGSTATSD_CAPTURE_DIR: &str = "dsd_capture"; - -fn deserialize_empty_metastring_as_none<'de, D>(deserializer: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - let value = Option::::deserialize(deserializer)?; - Ok(value.filter(|host| !host.is_empty())) -} - -/// DogStatsD source. +/// Builder for the DogStatsD *source* component. +/// +/// Accepts metrics over TCP, UDP, or Unix Domain Sockets in the StatsD/DogStatsD format. This is the +/// source only - one member of the DogStatsD component family (source, mapper, aggregate, debug-log, +/// filter). +/// +/// It is built from a `SourceConfig` (the plain, translated config data, defined in +/// `saluki-component-config`) plus runtime-injected state. The config values live in `self.source`; +/// the runtime handles (workload provider, capture-entity resolver, capture and replay control) and +/// the `build()` behavior live on this type. /// -/// Accepts metrics over TCP, UDP, or Unix Domain Sockets in the StatsD/DogStatsD format. -#[serde_as] -#[derive(Deserialize, Default)] +/// This type has no `Deserialize`. It is constructed by `new` from an already-translated +/// `SourceConfig`, never deserialized from raw config: the raw Datadog config is deserialized +/// upstream into the native model, and the translated result reaches the source as `SourceConfig`. +/// +/// It embeds `SourceConfig` (the source's slice), not `dogstatsd::Config` (the whole DogStatsD +/// domain group). It cannot name `dogstatsd::Config`: that type lives in `agent-data-plane-config`, +/// which `saluki-components` must not depend on. +#[derive(Default)] #[cfg_attr(test, derive(derive_where::DeriveWhere, serde::Serialize))] #[cfg_attr(test, derive_where(PartialEq))] pub struct DogStatsDConfiguration { - /// The size of the buffer used to receive messages into, in bytes. - /// - /// Payloads can't exceed this size, or they will be truncated, leading to discarded messages. - /// - /// Defaults to 8192 bytes. - #[serde(rename = "dogstatsd_buffer_size", default = "default_buffer_size")] - buffer_size: usize, - - /// The number of message buffers to allocate up front. - /// - /// This is the baseline pool size allocated at startup. The pool then grows on demand up to - /// `dogstatsd_buffer_count_max` as active stream connections need additional buffers. The default value should be - /// suitable for the majority of workloads. - /// - /// Defaults to 128. - #[serde(rename = "dogstatsd_buffer_count", default = "default_buffer_count")] - buffer_count: usize, - - /// The maximum number of message buffers to allocate overall. - /// - /// The pool starts at `dogstatsd_buffer_count` buffers and grows on demand up to this limit as active stream - /// connections need them, which loosely correlates with how many messages can be received per second. This caps - /// memory growth. The default value should be suitable for the majority of workloads, but high-throughput or - /// high-fan-in workloads may consider increasing this value. - /// - /// The pool never holds fewer buffers than `dogstatsd_buffer_count`, so a value below the baseline is treated as - /// equal to it. - /// - /// Defaults to 256, or `dogstatsd_buffer_count` if that is larger. - #[serde(rename = "dogstatsd_buffer_count_max", default = "default_buffer_count_max")] - buffer_count_max: usize, - - /// The port to listen on in UDP mode. - /// - /// If set to `0`, UDP isn't used. - /// - /// Defaults to 8125. - #[serde(rename = "dogstatsd_port", default = "default_port")] - port: u16, - - /// The size of the DogStatsD UDP/UDS socket receive buffer, in bytes. - /// - /// If set to `0`, the OS default is used. - /// - /// Defaults to 0. - #[serde(rename = "dogstatsd_so_rcvbuf", default = "default_socket_receive_buffer_size")] - socket_receive_buffer_size: usize, - - /// The port to listen on in TCP mode. - /// - /// If set to `0`, TCP isn't used. - /// - /// Defaults to 0. - #[serde(rename = "dogstatsd_tcp_port", default = "default_tcp_port")] - tcp_port: u16, - - /// The host to forward framed DogStatsD messages to over UDP. - /// - /// Forwarding is enabled only when this value is non-empty and `statsd_forward_port` is non-zero. Setup failures - /// are logged, and send failures are tracked through telemetry. - /// - /// Defaults to unset. - #[serde( - rename = "statsd_forward_host", - default, - deserialize_with = "deserialize_empty_metastring_as_none" - )] - statsd_forward_host: Option, - - /// The port to forward framed DogStatsD messages to over UDP. - /// - /// Forwarding is enabled only when this value is non-zero and `statsd_forward_host` is non-empty. - /// - /// Defaults to 0. - #[serde(rename = "statsd_forward_port", default = "default_statsd_forward_port")] - statsd_forward_port: u16, - - /// The Unix domain socket path to listen on, in datagram mode. - /// - /// If not set, UDS (in datagram mode) isn't used. - /// - /// Defaults to unset. - #[serde(rename = "dogstatsd_socket", default)] - #[serde_as(as = "NoneAsEmptyString")] - socket_path: Option, - - /// The Unix domain socket path to listen on, in stream mode. - /// - /// If not set, UDS (in stream mode) isn't used. - /// - /// Defaults to unset. - #[serde(rename = "dogstatsd_stream_socket", default)] - #[serde_as(as = "NoneAsEmptyString")] - socket_stream_path: Option, - - /// Controls whether ADP logs oversized DogStatsD stream frames. - /// - /// When set to `true`, ADP emits a warning when a UDS stream frame exceeds the - /// configured DogStatsD buffer size. The frame is still rejected either way. - /// - /// Enable this when diagnosing clients that send oversized UDS stream frames. - /// - /// Defaults to `false`. - #[serde(rename = "dogstatsd_stream_log_too_big", default)] - stream_log_too_big: bool, - - /// Whether ADP lowers DogStatsD parse-failure logs to debug level. - /// - /// When set to `true`, invalid metrics, events, and service checks still increment decode-failure telemetry, but - /// their parse-failure logs are emitted at debug level instead of warning level. Enable this to suppress noisy - /// parse-error logs from misbehaving clients. - /// - /// Defaults to `false`. - #[serde(rename = "dogstatsd_disable_verbose_logs", default)] - disable_verbose_logs: bool, - - /// Listener types that require DogStatsD messages to be newline-terminated. - /// - /// Valid values are `udp`, `uds`, and `named_pipe`. ADP accepts `named_pipe` for compatibility, but it has no effect - /// until named pipe listeners are supported. Invalid values are ignored. - /// - /// Enable this when DogStatsD clients must reject packets or stream frames that don't end with a newline. - /// - /// Defaults to unset, which accepts the final message without a newline. - #[serde( - rename = "dogstatsd_eol_required", - default, - deserialize_with = "deserialize_space_separated_or_seq" - )] - eol_required: Vec, - - /// The host address to bind DogStatsD UDP and TCP listeners to. - /// - /// When set, UDP and TCP listeners bind to this address. Accepts either an IP literal (for example, - /// `192.168.1.50`, `::1`) or a hostname that resolves via DNS (for example, `agent.internal`). - /// Ignored when `dogstatsd_non_local_traffic` is `true`. - /// - /// Defaults to unset, which binds to `127.0.0.1`. - #[serde(rename = "bind_host", default)] - #[serde_as(as = "NoneAsEmptyString")] - bind_host: Option, - - /// Whether or not to listen for non-local traffic in UDP mode. - /// - /// If set to `true`, the listener will accept packets from any interface/address. Otherwise, the source will only - /// listen on the address specified by `bind_host`, or `127.0.0.1` if `bind_host` isn't set. - /// - /// Defaults to `false`. - #[serde(rename = "dogstatsd_non_local_traffic", default)] - non_local_traffic: bool, - - /// Whether to autoscale UDP stream handlers using `SO_REUSEPORT`. - /// - /// When enabled on Linux, the DogStatsD source binds multiple UDP sockets to the configured port with - /// `SO_REUSEPORT`, allowing the kernel to load-balance incoming datagrams across independent stream handler - /// tasks. The number of sockets scales with available vCPUs: one stream handler base, plus one additional - /// per 8 vCPUs, capped at 4 total. - /// - /// Has no effect on non-Linux platforms because `SO_REUSEPORT` doesn't provide kernel-level load balancing - /// there; a warning is logged at startup if enabled outside of Linux. - /// - /// Enable this on multi-vCPU Linux deployments where UDP DogStatsD throughput is bottlenecked on a single - /// receive task. - /// - /// Defaults to `false`. - #[serde(rename = "dogstatsd_autoscale_udp_listeners", default)] - autoscale_udp_listeners: bool, - - /// Whether or not to allow heap allocations when resolving contexts. - /// - /// When resolving contexts during parsing, the metric name and tags are interned to reduce memory usage. The - /// interner has a fixed size, however, which means some strings can fail to be interned if the interner is full. - /// When set to `true`, we allow these strings to be allocated on the heap like normal, but this can lead to - /// increased (unbounded) memory usage. When set to `false`, if the metric name and all of its tags can't be - /// interned, the metric is skipped. - /// - /// Defaults to `true`. - #[serde( - rename = "dogstatsd_allow_context_heap_allocs", - default = "default_allow_context_heap_allocations" - )] - allow_context_heap_allocations: bool, - - /// Whether or not to enable support for no-aggregation pipelines. - /// - /// When enabled, this influences how metrics are parsed, specifically around user-provided metric timestamps. When - /// metric timestamps are present, it's used as a signal to any aggregation transforms that the metric shouldn't - /// be aggregated. - /// - /// Defaults to `true`. - #[serde( - rename = "dogstatsd_no_aggregation_pipeline", - default = "default_no_aggregation_pipeline_support" - )] - no_aggregation_pipeline_support: bool, - - /// Number of entries for the string interner, as interpreted by the Core Datadog Agent. - /// - /// When `dogstatsd_string_interner_size_bytes` isn't set, this value is multiplied by 512 bytes per entry to - /// derive the interner byte size. This provides backwards compatibility for customers migrating configurations - /// from the Core Agent, where this setting represents an entry count rather than a byte size. - /// - /// Defaults to 4096 entries, which yields 2 MiB when converted. - #[serde( - rename = "dogstatsd_string_interner_size", - default = "default_context_string_interner_entry_count" - )] - context_string_interner_entry_count: u64, - - /// Total size of the string interner used for contexts, in bytes. - /// - /// When set, this takes priority over `dogstatsd_string_interner_size`. This controls the amount of memory that - /// can be used to intern metric names and tags. If the interner is full, metrics with contexts that haven't - /// already been resolved may or may not be dropped, depending on the value of `allow_context_heap_allocations`. - #[serde(rename = "dogstatsd_string_interner_size_bytes", default)] - context_string_interner_size_bytes: Option, - - /// The maximum number of cached contexts to allow. - /// - /// This is the maximum number of resolved contexts that can be cached at any given time. This limit doesn't affect - /// the total number of contexts that can be _alive_ at any given time, which is dependent on the interner capacity - /// and whether or not heap allocations are allowed. - /// - /// Defaults to 500,000. - #[serde( - rename = "dogstatsd_cached_contexts_limit", - default = "default_cached_contexts_limit" - )] - cached_contexts_limit: usize, - - /// The maximum number of cached tagsets to allow. - /// - /// This is the maximum number of resolved tagsets that can be cached at any given time. This limit doesn't affect - /// the total number of tagsets that can be _alive_ at any given time, which is dependent on the interner capacity - /// and whether or not heap allocations are allowed. - /// - /// Defaults to 500,000. - #[serde(rename = "dogstatsd_cached_tagsets_limit", default = "default_cached_tagsets_limit")] - cached_tagsets_limit: usize, - - /// The number of seconds after which cached contexts will expire. - /// - /// Higher values allow for more effective caching for sparse metrics at the cost of increased memory usage. - /// - /// Defaults to 20 seconds. - #[serde( - rename = "dogstatsd_context_expiry_seconds", - default = "default_context_expiry_seconds" - )] - context_expiry_seconds: u64, - - /// Whether or not to enable permissive mode in the decoder. - /// - /// Permissive mode allows the decoder to relax its strictness around the allowed payloads, which lets it match the - /// decoding behavior of the Datadog Agent. - /// - /// Defaults to `true`. - #[serde( - rename = "dogstatsd_permissive_decoding", - default = "default_dogstatsd_permissive_decoding" - )] - permissive_decoding: bool, - - /// The minimum sample rate allowed for metrics. - /// - /// When metrics are sent with a sample rate _lower_ than this value then it will be clamped to this value. This is - /// done in order to ensure an upper bound on how many equivalent samples are tracked for the metric, as high sample - /// rates (very small numbers, such as `0.00000001`) can lead to large memory growth. - /// - /// A warning log will be emitted when clamping occurs, as this represents an effective loss of metric samples. - /// - /// Defaults to `0.000000003845`. (~260M samples) - #[serde( - rename = "dogstatsd_minimum_sample_rate", - default = "default_dogstatsd_minimum_sample_rate" - )] - minimum_sample_rate: f64, - - /// Which payload types to forward to the backend. - #[serde(rename = "enable_payloads", default)] - enable_payloads: EnablePayloadsConfiguration, - - /// Configuration related to origin detection and enrichment. - #[serde(flatten, default)] - origin_enrichment: OriginEnrichmentConfiguration, + /// The resolved, source-agnostic configuration this source consumes. + source: saluki_component_config::dogstatsd::SourceConfig, /// Workload provider to utilize for origin detection/enrichment. - #[serde(skip)] + #[cfg_attr(test, serde(skip))] #[cfg_attr(test, derive_where(skip))] workload_provider: Option>, /// Resolver to use for mapping live sender PIDs to container entities during traffic capture. - #[serde(skip, default)] + #[cfg_attr(test, serde(skip))] #[cfg_attr(test, derive_where(skip))] capture_entity_resolver: Option>, - /// Additional tags to add to all metrics. - #[serde(rename = "dogstatsd_tags", default)] - additional_tags: Vec, - - /// The directory where DogStatsD capture files are written by default. - /// - /// When set to an empty path, the source attempts to derive the directory from `run_path` by appending - /// `dsd_capture`. If neither value is available, callers must provide an explicit capture path when starting a - /// capture session. - /// - /// Defaults to empty. - #[serde(rename = "dogstatsd_capture_path", default)] - capture_path: PathBuf, - - /// The maximum number of captured packets that can be queued for persistence. - /// - /// This controls the depth of the in-process capture queue. Values below `1024` are raised to `1024` before the - /// capture writer starts, preventing a zero-depth rendezvous channel from serializing DogStatsD stream handlers - /// behind capture persistence. - /// - /// Defaults to `1024`. - #[serde(rename = "dogstatsd_capture_depth", default = "default_capture_depth")] - capture_depth: usize, - - #[serde(skip, default)] + /// Control handle bound to the running traffic-capture surface. + #[cfg_attr(test, serde(skip))] #[cfg_attr(test, derive_where(skip))] capture_control: DogStatsDCaptureControl, - #[serde(skip, default)] + /// Control handle bound to the running replay surface. + #[cfg_attr(test, serde(skip))] #[cfg_attr(test, derive_where(skip))] replay_control: DogStatsDReplayControl, - - /// Provider kind tag appended to all metrics as `provider_kind:`. - /// - /// Set via `DD_PROVIDER_KIND` by the Helm chart on GKE Autopilot (`gke-autopilot`) and GKE on - /// Google Distributed Cloud (`gke-gdc`). When empty or absent, no tag is added. - /// - /// Defaults to `""` (disabled). - #[serde(default)] - provider_kind: String, } #[derive(Clone, Copy, Default)] @@ -639,27 +217,37 @@ async fn resolve_bind_host(host: &str) -> Result { } impl DogStatsDConfiguration { - /// Creates a new `DogStatsDConfiguration` from the given configuration. - pub fn from_configuration(config: &GenericConfiguration) -> Result { - let mut dogstatsd_config: Self = config.as_typed()?; - dogstatsd_config.fix_empty_capture_path(config); - dogstatsd_config.fix_capture_depth(); - Ok(dogstatsd_config) + /// Creates a new `DogStatsDConfiguration` from its component-native configuration. + /// + /// The native `SourceConfig` is the resolved, source-agnostic data the source consumes. The + /// injected runtime state (workload provider, capture-entity resolver, and the capture/replay + /// control handles) starts unset and is supplied separately. The capture-queue depth floor is + /// re-applied here because the native value is the raw configured depth. + pub fn new(source: saluki_component_config::dogstatsd::SourceConfig) -> Self { + let mut config = Self { + source, + workload_provider: None, + capture_entity_resolver: None, + capture_control: Default::default(), + replay_control: Default::default(), + }; + config.fix_capture_depth(); + config } /// Gets both the `additional_tags` and any others specified by other configuration fields, such as `provider_kind`. fn additional_tags(&self) -> Vec { - if self.provider_kind.is_empty() { - return self.additional_tags.clone(); + if self.source.provider_kind.is_empty() { + return self.source.additional_tags.clone(); } - let mut tags = self.additional_tags.clone(); - tags.push(format!("provider_kind:{}", self.provider_kind.clone())); + let mut tags = self.source.additional_tags.clone(); + tags.push(format!("provider_kind:{}", self.source.provider_kind.clone())); tags } fn fix_capture_depth(&mut self) { - self.capture_depth = self.capture_depth.max(MIN_CAPTURE_DEPTH); + self.source.capture_depth = self.source.capture_depth.max(MIN_CAPTURE_DEPTH); } /// Returns the effective string interner size in bytes. @@ -668,23 +256,23 @@ impl DogStatsDConfiguration { /// `dogstatsd_string_interner_size` (an entry count) is multiplied by 512 bytes per entry to derive the byte /// size. fn effective_context_string_interner_bytes(&self) -> ByteSize { - match self.context_string_interner_size_bytes { + match self.source.context_string_interner_size_bytes { Some(explicit_bytes) => explicit_bytes, - None => ByteSize::b(self.context_string_interner_entry_count * INTERNER_BASELINE_BYTES_PER_ENTRY), + None => ByteSize::b(self.source.context_string_interner_entry_count * INTERNER_BASELINE_BYTES_PER_ENTRY), } } fn eol_required(&self) -> EolRequired { - EolRequired::from_config_values(&self.eol_required) + EolRequired::from_config_values(&self.source.eol_required) } fn statsd_forward_target(&self) -> Option<(&MetaString, u16)> { - let host = self.statsd_forward_host.as_ref()?; - if self.statsd_forward_port == 0 { + let host = self.source.statsd_forward_host.as_ref()?; + if self.source.statsd_forward_port == 0 { return None; } - Some((host, self.statsd_forward_port)) + Some((host, self.source.statsd_forward_port)) } fn packet_forwarder_target(&self) -> Option { @@ -698,12 +286,12 @@ impl DogStatsDConfiguration { /// Returns `None` when autoscaling is disabled, which keeps the legacy single-socket behavior. The platform /// gate for `SO_REUSEPORT` lives inside the listener—this method intentionally stays platform-agnostic. fn udp_streams_to_yield(&self) -> Option { - if !self.autoscale_udp_listeners { + if !self.source.autoscale_udp_listeners { return None; } #[cfg(not(target_os = "linux"))] - if self.autoscale_udp_listeners { + if self.source.autoscale_udp_listeners { warn!("UDP stream handler autoscaling not supported on non-Linux platforms. Default to single stream handler."); return None; } @@ -713,15 +301,6 @@ impl DogStatsDConfiguration { NonZeroUsize::new(streams) } - /// Returns the effective maximum size of the I/O buffer pool. - /// - /// The pool can never hold fewer buffers than the configured baseline, so a `dogstatsd_buffer_count_max` below - /// `dogstatsd_buffer_count` (including a legacy config that only raised `dogstatsd_buffer_count`) is treated as - /// equal to the baseline rather than reducing capacity. - fn effective_max_buffer_count(&self) -> usize { - self.buffer_count_max.max(self.buffer_count) - } - /// Sets the workload provider to use for configuring origin detection/enrichment. /// /// A workload provider must be set otherwise origin detection/enrichment won't be enabled. @@ -769,34 +348,6 @@ impl DogStatsDConfiguration { DogStatsDReplayAPIHandler::new(self.replay_control.clone()) } - fn fix_empty_capture_path(&mut self, config: &GenericConfiguration) { - if self.capture_path.parent().is_some() { - return; - } - - let capture_path = match config.try_get_typed::("run_path") { - Ok(Some(mut run_path)) => { - run_path.push(DOGSTATSD_CAPTURE_DIR); - run_path - } - Ok(None) => { - debug!( - "`dogstatsd_capture_path` and `run_path` were empty. Default DogStatsD capture path is unavailable." - ); - return; - } - Err(e) => { - debug!( - error = %e, - "Failed to read `run_path` from configuration. Default DogStatsD capture path is unavailable." - ); - return; - } - }; - - self.capture_path = capture_path; - } - /// Using the current configuration, determines which listeners should be created and adds an address for each into /// a `Vec`. This function has no side effects so that it can be unit tested whereas build_listeners` /// actually binds the listeners on the system. @@ -807,7 +358,7 @@ impl DogStatsDConfiguration { /// - `bind_host=Some(ip)` → `ip` /// - `bind_host=None` → `127.0.0.1` fn build_addresses(&self, bind_host: Option) -> Vec { - let bind_ip: std::net::IpAddr = if self.non_local_traffic { + let bind_ip: std::net::IpAddr = if self.source.non_local_traffic { [0, 0, 0, 0].into() } else { bind_host.unwrap_or_else(|| [127, 0, 0, 1].into()) @@ -815,19 +366,22 @@ impl DogStatsDConfiguration { let mut addresses: Vec = Vec::new(); - if self.port != 0 { - addresses.push(ListenAddress::Udp(std::net::SocketAddr::new(bind_ip, self.port))); + if self.source.port != 0 { + addresses.push(ListenAddress::Udp(std::net::SocketAddr::new(bind_ip, self.source.port))); } - if self.tcp_port != 0 { - addresses.push(ListenAddress::Tcp(std::net::SocketAddr::new(bind_ip, self.tcp_port))); + if self.source.tcp_port != 0 { + addresses.push(ListenAddress::Tcp(std::net::SocketAddr::new( + bind_ip, + self.source.tcp_port, + ))); } - if let Some(socket_path) = &self.socket_path { + if let Some(socket_path) = &self.source.socket_path { addresses.push(ListenAddress::Unixgram(socket_path.into())); } - if let Some(socket_stream_path) = &self.socket_stream_path { + if let Some(socket_stream_path) = &self.source.socket_stream_path { addresses.push(ListenAddress::Unix(socket_stream_path.into())); } @@ -839,10 +393,10 @@ impl DogStatsDConfiguration { // Resolve `bind_host` to an IP (via DNS if needed). Skip the lookup when // `non_local_traffic=true` since `bind_host` is ignored in that branch—matches Go's // laziness and avoids failing startup on an unresolvable hostname that wouldn't be used. - let bind_host: Option = if self.non_local_traffic { + let bind_host: Option = if self.source.non_local_traffic { None } else { - match &self.bind_host { + match &self.source.bind_host { Some(host) => Some(resolve_bind_host(host).await?), None => None, } @@ -851,7 +405,7 @@ impl DogStatsDConfiguration { let addresses = self.build_addresses(bind_host); let mut listeners = Vec::new(); let socket_receive_buffer_size = - (self.socket_receive_buffer_size != 0).then_some(self.socket_receive_buffer_size); + (self.source.socket_receive_buffer_size != 0).then_some(self.source.socket_receive_buffer_size); let udp_streams_to_yield = self.udp_streams_to_yield(); for address in addresses { let listener_type = address.listener_type(); @@ -881,7 +435,7 @@ impl SourceBuilder for DogStatsDConfiguration { // deadlocking any of the others. Connectionless listeners retain their buffer for the lifetime of the stream, // so multi-socket UDP listeners require one buffer per yielded socket. let min_buffers: usize = listeners.iter().map(Listener::min_buffer_reservation).sum(); - let max_buffers = self.effective_max_buffer_count(); + let max_buffers = self.source.buffer_count_max.max(self.source.buffer_count); if max_buffers < min_buffers { return Err(generic_error!( "The maximum I/O buffer count ({}) must be at least {} to service all configured listeners.", @@ -890,34 +444,34 @@ impl SourceBuilder for DogStatsDConfiguration { )); } - let origin_detection_enabled = self.origin_enrichment.enabled(); + let origin_detection_enabled = self.source.origin_enrichment.enabled; // Single CapturedTaggerHandle is cloned to both the resolver (reader of the captured store) and the replay // control surface (writer). Both sides reference the same atomic slot. let captured_tagger = CapturedTaggerHandle::new(); let maybe_origin_tags_resolver = self.workload_provider.clone().map(|provider| { - DogStatsDOriginTagResolver::new(self.origin_enrichment.clone(), provider, captured_tagger.clone()) + DogStatsDOriginTagResolver::new(self.source.origin_enrichment.clone(), provider, captured_tagger.clone()) }); let context_resolvers = ContextResolvers::new(self, &context, maybe_origin_tags_resolver) .error_context("Failed to create context resolvers.")?; let codec_config = DogStatsDCodecConfiguration::default() - .with_timestamps(self.no_aggregation_pipeline_support) - .with_permissive_mode(self.permissive_decoding) - .with_minimum_sample_rate(self.minimum_sample_rate) - .with_client_origin_detection(self.origin_enrichment.origin_detection_client); + .with_timestamps(self.source.no_aggregation_pipeline_support) + .with_permissive_mode(self.source.permissive_decoding) + .with_minimum_sample_rate(self.source.minimum_sample_rate) + .with_client_origin_detection(self.source.origin_enrichment.origin_detection_client); let codec = DogStatsDCodec::from_configuration(codec_config); let eol_required = self.eol_required(); let enable_payloads_filter = EnablePayloadsFilter::default() - .with_allow_series(self.enable_payloads.series) - .with_allow_sketches(self.enable_payloads.sketches) - .with_allow_events(self.enable_payloads.events) - .with_allow_service_checks(self.enable_payloads.service_checks); + .with_allow_series(self.source.enable_payloads.series) + .with_allow_sketches(self.source.enable_payloads.sketches) + .with_allow_events(self.source.enable_payloads.events) + .with_allow_service_checks(self.source.enable_payloads.service_checks); let traffic_capture = TrafficCapture::with_workload_provider( - self.capture_path.clone(), - self.capture_depth.max(MIN_CAPTURE_DEPTH), + self.source.capture_path.clone(), + self.source.capture_depth.max(MIN_CAPTURE_DEPTH), self.workload_provider.clone(), ); self.capture_control.bind(traffic_capture.clone()); @@ -929,7 +483,7 @@ impl SourceBuilder for DogStatsDConfiguration { // maximum is never below the baseline, so configs that only raise `dogstatsd_buffer_count` keep their full // capacity instead of being silently reduced to the `dogstatsd_buffer_count_max` default. let (io_buffer_pool, io_buffer_pool_shrinker) = - build_io_buffer_pool(self.buffer_count, max_buffers, self.buffer_size); + build_io_buffer_pool(self.source.buffer_count, max_buffers, self.source.buffer_size); Ok(Box::new(DogStatsD { listeners, @@ -939,8 +493,8 @@ impl SourceBuilder for DogStatsDConfiguration { context_resolvers, enabled_filter: enable_payloads_filter, origin_detection_enabled, - stream_log_too_big: self.stream_log_too_big, - disable_verbose_logs: self.disable_verbose_logs, + stream_log_too_big: self.source.stream_log_too_big, + disable_verbose_logs: self.source.disable_verbose_logs, eol_required, additional_tags: self.additional_tags().into(), capture_entity_resolver: self.capture_entity_resolver.clone(), @@ -963,8 +517,9 @@ impl SourceBuilder for DogStatsDConfiguration { impl MemoryBounds for DogStatsDConfiguration { fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) { - let additional_buffers = self.effective_max_buffer_count().saturating_sub(self.buffer_count); - let adjusted_buffer_size = get_adjusted_buffer_size(self.buffer_size); + let max_buffers = self.source.buffer_count_max.max(self.source.buffer_count); + let additional_buffers = max_buffers.saturating_sub(self.source.buffer_count); + let adjusted_buffer_size = get_adjusted_buffer_size(self.source.buffer_size); builder .minimum() @@ -973,7 +528,7 @@ impl MemoryBounds for DogStatsDConfiguration { // We allocate the baseline buffer pool up front. .with_expr(UsageExpr::product( "buffers", - UsageExpr::config("dogstatsd_buffer_count", self.buffer_count), + UsageExpr::config("dogstatsd_buffer_count", self.source.buffer_count), UsageExpr::config("dogstatsd_buffer_size", adjusted_buffer_size), )) // We also allocate the backing storage for the string interner up front, which is used by our context @@ -1876,14 +1431,13 @@ mod tests { collections::HashMap, io::ErrorKind, net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}, - path::PathBuf, sync::{Arc, OnceLock}, time::Duration, }; use bytes::Bytes; use bytesize::ByteSize; - use saluki_config_tools::ConfigurationLoader; + use saluki_component_config::dogstatsd::SourceConfig; use saluki_context::{origin::RawOrigin, ContextResolverBuilder, TagsResolverBuilder}; use saluki_core::{components::ComponentContext, pooling::ObjectPool as _, topology::ComponentId}; use saluki_env::workload::{CaptureEntityResolver, EntityId}; @@ -1892,19 +1446,17 @@ mod tests { net::{ConnectionAddress, ListenAddress, ProcessCredentials, ProcessIdentity}, }; use saluki_metrics::test::TestRecorder; - use serde_json::json; use stringtheory::MetaString; use tokio::{net::UdpSocket, sync::mpsc, time::timeout}; use super::{ - build_io_buffer_pool, default_buffer_size, + build_io_buffer_pool, forwarder::{ ConnectedPacketForwarder, ForwardPacket, PacketForwarder, PacketForwarderTarget, FORWARDER_QUEUE_CAPACITY, }, handle_metric_packet, metrics::build_metrics, - resolve_capture_container_id, ContextResolvers, DogStatsDConfiguration, DOGSTATSD_CAPTURE_DIR, - MIN_CAPTURE_DEPTH, + resolve_capture_container_id, ContextResolvers, DogStatsDConfiguration, MIN_CAPTURE_DEPTH, }; const LINUX_EAFNOSUPPORT: i32 = 97; @@ -2010,10 +1562,6 @@ mod tests { } } - fn deser_config(json: &str) -> DogStatsDConfiguration { - serde_json::from_str(json).expect("failed to deserialize config") - } - fn udp_listen_address() -> ListenAddress { ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8125))) } @@ -2024,71 +1572,45 @@ mod tests { #[test] fn interner_size_defaults_to_2mib() { - let config = deser_config("{}"); + let config = DogStatsDConfiguration::new(SourceConfig::default()); assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::mib(2)); } - #[test] - fn socket_receive_buffer_size_defaults_to_zero() { - let config = deser_config("{}"); - assert_eq!(config.socket_receive_buffer_size, 0); - } - - #[test] - fn socket_receive_buffer_size_from_config() { - let config = deser_config(r#"{"dogstatsd_so_rcvbuf": 131072}"#); - assert_eq!(config.socket_receive_buffer_size, 131_072); - } - - #[test] - fn stream_log_too_big_defaults_to_false() { - let config = deser_config("{}"); - assert!(!config.stream_log_too_big); - } - - #[test] - fn stream_log_too_big_from_config() { - let config = deser_config(r#"{"dogstatsd_stream_log_too_big": true}"#); - assert!(config.stream_log_too_big); - } - - #[test] - fn disable_verbose_logs_defaults_to_false() { - let config = deser_config("{}"); - assert!(!config.disable_verbose_logs); - } - - #[test] - fn disable_verbose_logs_from_config() { - let config = deser_config(r#"{"dogstatsd_disable_verbose_logs": true}"#); - assert!(config.disable_verbose_logs); - } - #[test] fn statsd_forward_defaults_disabled() { - let config = deser_config("{}"); - assert!(config.statsd_forward_host.is_none()); - assert_eq!(config.statsd_forward_port, 0); + let config = DogStatsDConfiguration::new(SourceConfig::default()); + assert!(config.source.statsd_forward_host.is_none()); + assert_eq!(config.source.statsd_forward_port, 0); assert!(config.statsd_forward_target().is_none()); } #[test] fn statsd_forward_empty_host_disabled() { - let config = deser_config(r#"{"statsd_forward_host": "", "statsd_forward_port": 9125}"#); - assert!(config.statsd_forward_host.is_none()); + let config = DogStatsDConfiguration::new(SourceConfig { + statsd_forward_port: 9125, + ..Default::default() + }); + assert!(config.source.statsd_forward_host.is_none()); assert!(config.statsd_forward_target().is_none()); } #[test] fn statsd_forward_zero_port_disabled() { - let config = deser_config(r#"{"statsd_forward_host": "127.0.0.1", "statsd_forward_port": 0}"#); - assert_eq!(config.statsd_forward_host.as_deref(), Some("127.0.0.1")); + let config = DogStatsDConfiguration::new(SourceConfig { + statsd_forward_host: Some("127.0.0.1".into()), + ..Default::default() + }); + assert_eq!(config.source.statsd_forward_host.as_deref(), Some("127.0.0.1")); assert!(config.statsd_forward_target().is_none()); } #[test] fn statsd_forward_host_and_port_enabled() { - let config = deser_config(r#"{"statsd_forward_host": "127.0.0.1", "statsd_forward_port": 9125}"#); + let config = DogStatsDConfiguration::new(SourceConfig { + statsd_forward_host: Some("127.0.0.1".into()), + statsd_forward_port: 9125, + ..Default::default() + }); let (host, port) = config.statsd_forward_target().expect("forwarding should be enabled"); assert_eq!(host.as_ref(), "127.0.0.1"); assert_eq!(port, 9125); @@ -2096,7 +1618,11 @@ mod tests { #[test] fn statsd_forward_invalid_target_still_builds_forwarder_handle() { - let config = deser_config(r#"{"statsd_forward_host": "not a valid host", "statsd_forward_port": 9125}"#); + let config = DogStatsDConfiguration::new(SourceConfig { + statsd_forward_host: Some("not a valid host".into()), + statsd_forward_port: 9125, + ..Default::default() + }); assert!(config.packet_forwarder_target().is_some()); } @@ -2256,32 +1782,25 @@ mod tests { #[test] fn autoscale_udp_listeners_defaults_to_false() { - let config = deser_config("{}"); - assert!(!config.autoscale_udp_listeners); + let config = DogStatsDConfiguration::new(SourceConfig::default()); + assert!(!config.source.autoscale_udp_listeners); assert!(config.udp_streams_to_yield().is_none()); } #[test] fn effective_max_buffer_count_never_below_baseline() { - // A legacy config that only raised `dogstatsd_buffer_count` keeps its full capacity rather than being capped - // to the `dogstatsd_buffer_count_max` default. - let legacy = deser_config(r#"{"dogstatsd_buffer_count": 1024}"#); - assert_eq!(legacy.effective_max_buffer_count(), 1024); - - // An explicit maximum above the baseline is honored as-is. - let explicit = deser_config(r#"{"dogstatsd_buffer_count": 128, "dogstatsd_buffer_count_max": 512}"#); - assert_eq!(explicit.effective_max_buffer_count(), 512); + let effective = |count: usize, max: usize| max.max(count); - // A maximum below the baseline is treated as equal to the baseline. - let below = deser_config(r#"{"dogstatsd_buffer_count": 200, "dogstatsd_buffer_count_max": 64}"#); - assert_eq!(below.effective_max_buffer_count(), 200); + assert_eq!(effective(1024, 256), 1024); + assert_eq!(effective(128, 512), 512); + assert_eq!(effective(200, 64), 200); } #[tokio::test] async fn dogstatsd_io_buffer_pool_grows_on_demand_until_limit() { let min_buffers = 2; let max_buffers = 3; - let (pool, shrinker) = build_io_buffer_pool(min_buffers, max_buffers, default_buffer_size()); + let (pool, shrinker) = build_io_buffer_pool(min_buffers, max_buffers, 8192); let mut initial_buffers = Vec::with_capacity(min_buffers); for _ in 0..min_buffers { @@ -2310,8 +1829,11 @@ mod tests { #[test] #[cfg(target_os = "linux")] fn autoscale_udp_listeners_from_config_linux() { - let config = deser_config(r#"{"dogstatsd_autoscale_udp_listeners": true}"#); - assert!(config.autoscale_udp_listeners); + let config = DogStatsDConfiguration::new(SourceConfig { + autoscale_udp_listeners: true, + ..Default::default() + }); + assert!(config.source.autoscale_udp_listeners); let streams = config .udp_streams_to_yield() @@ -2326,15 +1848,18 @@ mod tests { #[test] #[cfg(not(target_os = "linux"))] fn autoscale_udp_listeners_from_config_non_linux() { - let config = deser_config(r#"{"dogstatsd_autoscale_udp_listeners": true}"#); - assert!(config.autoscale_udp_listeners); + let config = DogStatsDConfiguration::new(SourceConfig { + autoscale_udp_listeners: true, + ..Default::default() + }); + assert!(config.source.autoscale_udp_listeners); assert_eq!(None, config.udp_streams_to_yield()); } #[test] fn eol_required_defaults_to_no_listeners() { - let config = deser_config("{}"); + let config = DogStatsDConfiguration::new(SourceConfig::default()); let eol_required = config.eol_required(); assert!(!eol_required.for_listener(&udp_listen_address())); @@ -2343,7 +1868,10 @@ mod tests { #[test] fn eol_required_matches_configured_listener_types() { - let config = deser_config(r#"{"dogstatsd_eol_required": ["udp", "uds"]}"#); + let config = DogStatsDConfiguration::new(SourceConfig { + eol_required: vec!["udp".to_string(), "uds".to_string()], + ..Default::default() + }); let eol_required = config.eol_required(); assert!(eol_required.for_listener(&udp_listen_address())); @@ -2356,14 +1884,6 @@ mod tests { } } - #[test] - fn eol_required_accepts_space_separated_string() { - let config = deser_config(r#"{"dogstatsd_eol_required": "udp uds"}"#); - let eol_required = config.eol_required(); - - assert!(eol_required.for_listener(&udp_listen_address())); - } - #[test] fn stream_log_too_big_only_warns_for_enabled_unix_invalid_frames() { let uds_stream = ListenAddress::Unix("/tmp/dsd-stream.sock".into()); @@ -2381,28 +1901,39 @@ mod tests { #[test] fn interner_size_from_entry_count() { // A Core Agent migration config with entry count 4096 should yield 2 MiB, not 4096 bytes. - let config = deser_config(r#"{"dogstatsd_string_interner_size": 4096}"#); + let config = DogStatsDConfiguration::new(SourceConfig { + context_string_interner_entry_count: 4096, + ..Default::default() + }); assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::mib(2)); } #[test] fn interner_size_from_explicit_bytes() { - let config = deser_config(r#"{"dogstatsd_string_interner_size_bytes": 4194304}"#); + let config = DogStatsDConfiguration::new(SourceConfig { + context_string_interner_size_bytes: Some(ByteSize::b(4194304)), + ..Default::default() + }); assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::b(4194304)); } #[test] fn interner_size_explicit_bytes_takes_priority() { - let config = deser_config( - r#"{"dogstatsd_string_interner_size": 4096, "dogstatsd_string_interner_size_bytes": 8388608}"#, - ); + let config = DogStatsDConfiguration::new(SourceConfig { + context_string_interner_entry_count: 4096, + context_string_interner_size_bytes: Some(ByteSize::b(8388608)), + ..Default::default() + }); // The _bytes key (8 MiB) takes priority over the entry count. assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::b(8388608)); } #[test] fn interner_size_custom_entry_count() { - let config = deser_config(r#"{"dogstatsd_string_interner_size": 8192}"#); + let config = DogStatsDConfiguration::new(SourceConfig { + context_string_interner_entry_count: 8192, + ..Default::default() + }); // 8192 entries * 512 bytes = 4 MiB assert_eq!(config.effective_context_string_interner_bytes(), ByteSize::mib(4)); } @@ -2435,11 +1966,14 @@ mod tests { #[test] fn build_addresses_assertion_function_works() { let config = DogStatsDConfiguration { - port: 0, - tcp_port: 123, - socket_path: None, - socket_stream_path: None, - non_local_traffic: false, + source: SourceConfig { + port: 0, + tcp_port: 123, + socket_path: None, + socket_stream_path: None, + non_local_traffic: false, + ..Default::default() + }, ..Default::default() }; let mut expected = vec![ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new( @@ -2455,11 +1989,14 @@ mod tests { #[test] fn build_addresses_no_listeners() { let config = DogStatsDConfiguration { - port: 0, - tcp_port: 0, - socket_path: None, - socket_stream_path: None, - non_local_traffic: false, + source: SourceConfig { + port: 0, + tcp_port: 0, + socket_path: None, + socket_stream_path: None, + non_local_traffic: false, + ..Default::default() + }, ..Default::default() }; let mut expected = vec![]; @@ -2471,11 +2008,14 @@ mod tests { #[test] fn build_addresses_udp_local_only() { let config = DogStatsDConfiguration { - port: 8125, - tcp_port: 0, - socket_path: None, - socket_stream_path: None, - non_local_traffic: false, + source: SourceConfig { + port: 8125, + tcp_port: 0, + socket_path: None, + socket_stream_path: None, + non_local_traffic: false, + ..Default::default() + }, ..Default::default() }; let mut expected = vec![ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new( @@ -2490,11 +2030,14 @@ mod tests { #[test] fn build_addresses_udp_non_local_only() { let config = DogStatsDConfiguration { - port: 8125, - tcp_port: 0, - socket_path: None, - socket_stream_path: None, - non_local_traffic: true, + source: SourceConfig { + port: 8125, + tcp_port: 0, + socket_path: None, + socket_stream_path: None, + non_local_traffic: true, + ..Default::default() + }, ..Default::default() }; let mut expected = vec![ListenAddress::Udp(SocketAddr::V4(SocketAddrV4::new( @@ -2509,11 +2052,14 @@ mod tests { #[test] fn build_addresses_tcp_local_only() { let config = DogStatsDConfiguration { - port: 0, - tcp_port: 9000, - socket_path: None, - socket_stream_path: None, - non_local_traffic: false, + source: SourceConfig { + port: 0, + tcp_port: 9000, + socket_path: None, + socket_stream_path: None, + non_local_traffic: false, + ..Default::default() + }, ..Default::default() }; let mut expected = vec![ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new( @@ -2528,11 +2074,14 @@ mod tests { #[test] fn build_addresses_tcp_non_local_only() { let config = DogStatsDConfiguration { - port: 0, - tcp_port: 9000, - socket_path: None, - socket_stream_path: None, - non_local_traffic: true, + source: SourceConfig { + port: 0, + tcp_port: 9000, + socket_path: None, + socket_stream_path: None, + non_local_traffic: true, + ..Default::default() + }, ..Default::default() }; let mut expected = vec![ListenAddress::Tcp(SocketAddr::V4(SocketAddrV4::new( @@ -2547,11 +2096,14 @@ mod tests { #[test] fn build_addresses_unixgram_only() { let config = DogStatsDConfiguration { - port: 0, - tcp_port: 0, - socket_path: Some("/tmp/dsd.sock".to_string()), - socket_stream_path: None, - non_local_traffic: false, + source: SourceConfig { + port: 0, + tcp_port: 0, + socket_path: Some("/tmp/dsd.sock".to_string()), + socket_stream_path: None, + non_local_traffic: false, + ..Default::default() + }, ..Default::default() }; let mut expected = vec![ListenAddress::Unixgram("/tmp/dsd.sock".into())]; @@ -2563,11 +2115,14 @@ mod tests { #[test] fn build_addresses_unix_stream_only() { let config = DogStatsDConfiguration { - port: 0, - tcp_port: 0, - socket_path: None, - socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()), - non_local_traffic: false, + source: SourceConfig { + port: 0, + tcp_port: 0, + socket_path: None, + socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()), + non_local_traffic: false, + ..Default::default() + }, ..Default::default() }; let mut expected = vec![ListenAddress::Unix("/tmp/dsd-stream.sock".into())]; @@ -2579,11 +2134,14 @@ mod tests { #[test] fn build_addresses_all_four_non_local() { let config = DogStatsDConfiguration { - port: 8125, - tcp_port: 9000, - socket_path: Some("/tmp/dsd.sock".to_string()), - socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()), - non_local_traffic: true, + source: SourceConfig { + port: 8125, + tcp_port: 9000, + socket_path: Some("/tmp/dsd.sock".to_string()), + socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()), + non_local_traffic: true, + ..Default::default() + }, ..Default::default() }; let mut expected = vec![ @@ -2600,11 +2158,14 @@ mod tests { #[test] fn build_addresses_all_four_local() { let config = DogStatsDConfiguration { - port: 8125, - tcp_port: 9000, - socket_path: Some("/tmp/dsd.sock".to_string()), - socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()), - non_local_traffic: false, + source: SourceConfig { + port: 8125, + tcp_port: 9000, + socket_path: Some("/tmp/dsd.sock".to_string()), + socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()), + non_local_traffic: false, + ..Default::default() + }, ..Default::default() }; let mut expected = vec![ @@ -2622,11 +2183,14 @@ mod tests { #[test] fn build_addresses_bind_host_applies_to_udp_and_tcp() { let config = DogStatsDConfiguration { - port: 8125, - tcp_port: 9000, - socket_path: Some("/tmp/dsd.sock".to_string()), - socket_stream_path: None, - non_local_traffic: false, + source: SourceConfig { + port: 8125, + tcp_port: 9000, + socket_path: Some("/tmp/dsd.sock".to_string()), + socket_stream_path: None, + non_local_traffic: false, + ..Default::default() + }, ..Default::default() }; let bind_host = Some(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 50))); @@ -2645,11 +2209,14 @@ mod tests { #[test] fn build_addresses_non_local_clobbers_bind_host() { let config = DogStatsDConfiguration { - port: 8125, - tcp_port: 9000, - socket_path: None, - socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()), - non_local_traffic: true, + source: SourceConfig { + port: 8125, + tcp_port: 9000, + socket_path: None, + socket_stream_path: Some("/tmp/dsd-stream.sock".to_string()), + non_local_traffic: true, + ..Default::default() + }, ..Default::default() }; let bind_host = Some(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 50))); @@ -2680,45 +2247,17 @@ mod tests { } } - #[tokio::test] - async fn fix_empty_capture_path_sets_path_from_run_path() { - const RUN_PATH: &str = "/my/little/run_path"; - - let base_config_values = json!({ "run_path": RUN_PATH }); - let (config, _) = ConfigurationLoader::for_tests(Some(base_config_values), None, false).await; - - let dogstatsd_config = DogStatsDConfiguration::from_configuration(&config).expect("should deserialize"); - - let expected = PathBuf::from(RUN_PATH).join(DOGSTATSD_CAPTURE_DIR); - assert_eq!(expected, dogstatsd_config.capture_path); - } - - #[tokio::test] - async fn fix_empty_capture_path_keeps_explicit_path() { - const RUN_PATH: &str = "/my/little/run_path"; - const CAPTURE_PATH: &str = "/custom/path/to/capture"; - - let base_config_values = json!({ "run_path": RUN_PATH, "dogstatsd_capture_path": CAPTURE_PATH }); - let (config, _) = ConfigurationLoader::for_tests(Some(base_config_values), None, false).await; - - let dogstatsd_config = DogStatsDConfiguration::from_configuration(&config).expect("should deserialize"); - - assert_eq!(PathBuf::from(CAPTURE_PATH), dogstatsd_config.capture_path); - } - - #[tokio::test] - async fn from_configuration_normalizes_capture_depth() { - let cases = [ - (json!({}), MIN_CAPTURE_DEPTH), - (json!({ "dogstatsd_capture_depth": 0 }), MIN_CAPTURE_DEPTH), - (json!({ "dogstatsd_capture_depth": 2048 }), 2048), - ]; + #[test] + fn new_normalizes_capture_depth() { + let cases = [(0, MIN_CAPTURE_DEPTH), (512, MIN_CAPTURE_DEPTH), (2048, 2048)]; - for (base_config_values, expected_depth) in cases { - let (config, _) = ConfigurationLoader::for_tests(Some(base_config_values), None, false).await; - let dogstatsd_config = DogStatsDConfiguration::from_configuration(&config).expect("should deserialize"); + for (configured_depth, expected_depth) in cases { + let config = DogStatsDConfiguration::new(SourceConfig { + capture_depth: configured_depth, + ..Default::default() + }); - assert_eq!(expected_depth, dogstatsd_config.capture_depth); + assert_eq!(expected_depth, config.source.capture_depth); } } @@ -2798,29 +2337,3 @@ mod tests { ); } } - -#[cfg(test)] -mod config_smoke { - use datadog_agent_config_testing::config_registry::structs; - use datadog_agent_config_testing::run_config_smoke_tests; - use serde_json::json; - - use super::DogStatsDConfiguration; - use crate::config::{DatadogRemapper, KEY_ALIASES}; - - #[tokio::test] - async fn smoke_test() { - run_config_smoke_tests( - structs::DOGSTATSD_CONFIGURATION, - &[], - json!({}), - |cfg| { - cfg.as_typed::() - .expect("DogStatsDConfiguration should deserialize") - }, - KEY_ALIASES, - DatadogRemapper::new, - ) - .await - } -} diff --git a/lib/saluki-components/src/sources/dogstatsd/origin.rs b/lib/saluki-components/src/sources/dogstatsd/origin.rs index 5d99ab1a03..7fe542fb55 100644 --- a/lib/saluki-components/src/sources/dogstatsd/origin.rs +++ b/lib/saluki-components/src/sources/dogstatsd/origin.rs @@ -1,12 +1,12 @@ use std::sync::Arc; +use saluki_component_config::dogstatsd::OriginEnrichmentConfiguration; use saluki_context::{ origin::{OriginTagCardinality, OriginTagsResolver, RawOrigin}, tags::SharedTagSet, }; use saluki_env::{workload::origin::ResolvedOrigin, WorkloadProvider}; use saluki_io::deser::codec::dogstatsd::{EventPacket, MetricPacket, ServiceCheckPacket}; -use serde::Deserialize; use tracing::trace; use super::{replay::CapturedTaggerHandle, tags::WellKnownTags}; @@ -25,99 +25,6 @@ fn captured_process_id_from_replay(process_id: u32) -> Option { } } -const fn default_tag_cardinality() -> OriginTagCardinality { - OriginTagCardinality::Low -} - -const fn default_origin_detection_optout() -> bool { - true -} - -/// Origin enrichment configuration. -/// -/// Origin enrichment controls the when and how of enriching metrics ingested via DogStatsD based on various sources of -/// "origin" information, such as specific metric tags or UDS socket credentials. Enrichment involves adding additional -/// metric tags that describe the origin of the metric, such as the Kubernetes pod or container. -#[derive(Clone, Deserialize)] -#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))] -pub struct OriginEnrichmentConfiguration { - /// Whether or not to enable origin detection. - /// - /// If disabled, no origin tags will be added to events even if the origin information is detected. - /// - /// Defaults to `false`. - #[serde(rename = "dogstatsd_origin_detection", default)] - enabled: bool, - - /// Whether or not a client-provided entity ID should take precedence over automatically detected origin metadata. - /// - /// When a client-provided entity ID is specified, and an origin process ID has automatically been detected, setting - /// this to `true` will cause the origin process ID to be ignored. - /// - /// Defaults to `false`. - #[serde(rename = "dogstatsd_entity_id_precedence", default)] - entity_id_precedence: bool, - - /// The default cardinality of tags to enrich metrics with. - #[serde(rename = "dogstatsd_tag_cardinality", default = "default_tag_cardinality")] - tag_cardinality: OriginTagCardinality, - - /// Whether or not to use the unified origin detection behavior. - /// - /// When set to `true`, all detected entity IDs -- UDS Origin Detection, `dd.internal.entity_id`, container ID from - /// DogStatsD payload -- will be used for querying tags to enrich with. When set to `false`, the original precedence - /// behavior will be used, which enriches with the entity ID detected via Origin Detection first [1], and then - /// potentially again with either the client-provided entity ID (`dd.internal.entity_id`) or the container ID from - /// the DogStatsD payload, with the client-provided entity ID taking precedence. - /// - /// Defaults to `false`. - /// - /// [1]: if an entity ID was detected via Origin Detection, it's only used if either no client-provided entity ID - /// was present or if `entity_id_precedence` is set to `false`. - #[serde(rename = "origin_detection_unified", default)] - origin_detection_unified: bool, - - /// Whether or not to opt out of origin detection for DogStatsD metrics. - /// - /// When set to `true`, and the metric explicitly denotes a cardinality of `"none"`, origin enrichment will be - /// skipped. This is only applicable to DogStatsD metrics when unified origin detection behavior isn't enabled. - /// - /// Defaults to `true`. - #[serde( - rename = "dogstatsd_origin_optout_enabled", - default = "default_origin_detection_optout" - )] - origin_detection_optout: bool, - - /// Whether or not to parse client-provided origin fields from DogStatsD payloads. - /// - /// When enabled, the `c:` (Local Data), `e:` (External Data), and `card:` (Cardinality) protocol fields are - /// parsed and used for origin enrichment. - /// - /// Defaults to `false`. - #[serde(rename = "dogstatsd_origin_detection_client", default)] - pub(super) origin_detection_client: bool, -} - -impl Default for OriginEnrichmentConfiguration { - fn default() -> Self { - Self { - enabled: false, - entity_id_precedence: false, - tag_cardinality: default_tag_cardinality(), - origin_detection_unified: false, - origin_detection_optout: default_origin_detection_optout(), - origin_detection_client: false, - } - } -} - -impl OriginEnrichmentConfiguration { - pub(super) const fn enabled(&self) -> bool { - self.enabled - } -} - #[derive(Clone)] pub(super) struct DogStatsDOriginTagResolver { config: OriginEnrichmentConfiguration, diff --git a/lib/saluki-components/src/sources/dogstatsd/resolver.rs b/lib/saluki-components/src/sources/dogstatsd/resolver.rs index 469b1f3f23..fa9d9cbdb8 100644 --- a/lib/saluki-components/src/sources/dogstatsd/resolver.rs +++ b/lib/saluki-components/src/sources/dogstatsd/resolver.rs @@ -32,10 +32,10 @@ impl ContextResolvers { NonZeroUsize::new(config.effective_context_string_interner_bytes().as_u64() as usize) .ok_or_else(|| generic_error!("context_string_interner_size must be greater than 0"))?; - let cached_contexts_limit = config.cached_contexts_limit; - let cached_tagsets_limit = config.cached_tagsets_limit; - let context_expiry_seconds = Duration::from_secs(config.context_expiry_seconds); - let allow_context_heap_allocations = config.allow_context_heap_allocations; + let cached_contexts_limit = config.source.cached_contexts_limit; + let cached_tagsets_limit = config.source.cached_tagsets_limit; + let context_expiry_seconds = Duration::from_secs(config.source.context_expiry_seconds); + let allow_context_heap_allocations = config.source.allow_context_heap_allocations; let interner = GenericMapInterner::new(context_string_interner_size); diff --git a/test/integration/cases/adp-config-internal-dogstatsd/config.yaml b/test/integration/cases/adp-config-internal-dogstatsd/config.yaml new file mode 100644 index 0000000000..6e35128bd0 --- /dev/null +++ b/test/integration/cases/adp-config-internal-dogstatsd/config.yaml @@ -0,0 +1,39 @@ +type: integration +name: "adp-config-internal-dogstatsd" +description: "Verifies DogStatsD source config set via DD_* is translated end-to-end and served on /config/internal." +timeout: 120s +runtimes: [linux, mac, windows] + +env: + DD_API_KEY: "00000000000000000000000000000000" + DD_HOSTNAME: "integration-test-config-internal" + DD_DATA_PLANE_ENABLED: "true" + DD_DATA_PLANE_STANDALONE_MODE: "false" + DD_DATA_PLANE_DOGSTATSD_ENABLED: "true" + # A witnessed source field (driven by the Datadog translation) and a Saluki-schema-only source + # field (seeded). Asserting both on /config/internal proves the translation runs end-to-end. + DD_DOGSTATSD_PORT: "9999" + DD_DOGSTATSD_TCP_PORT: "8200" + +container: + exposed_ports: + - "55101/tcp" + +procedure: + - assertion: adp_config_key_equals + key: components.dogstatsd.source.port + value: 9999 + endpoint: "https://localhost:55101/config/internal" + timeout: 60s + - assertion: adp_config_key_equals + key: components.dogstatsd.source.tcp_port + value: 8200 + endpoint: "https://localhost:55101/config/internal" + timeout: 60s + - parallel: + - assertion: process_stable_for + duration: 10s + - assertion: log_not_contains + pattern: "panic|PANIC" + regex: true + during: 10s