diff --git a/Cargo.lock b/Cargo.lock index 786ef67883..bedc28c48b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -49,6 +49,7 @@ dependencies = [ "saluki-api", "saluki-app", "saluki-common", + "saluki-component-config", "saluki-components", "saluki-config-tools", "saluki-context", @@ -85,6 +86,7 @@ name = "agent-data-plane-config-system" version = "0.1.0" dependencies = [ "agent-data-plane-config", + "arc-swap", "bytesize", "datadog-agent-config", "datadog-agent-config-overlay-model", @@ -97,6 +99,7 @@ dependencies = [ "serde_json", "stringtheory", "tokio", + "tracing", ] [[package]] diff --git a/bin/agent-data-plane/Cargo.toml b/bin/agent-data-plane/Cargo.toml index 5c240f631e..901fb4d480 100644 --- a/bin/agent-data-plane/Cargo.toml +++ b/bin/agent-data-plane/Cargo.toml @@ -40,6 +40,7 @@ resource-accounting = { workspace = true } saluki-api = { workspace = true } saluki-app = { workspace = true } saluki-common = { workspace = true } +saluki-component-config = { workspace = true } saluki-components = { workspace = true } saluki-config-tools = { workspace = true } saluki-context = { workspace = true } diff --git a/bin/agent-data-plane/src/cli/run.rs b/bin/agent-data-plane/src/cli/run.rs index 2b41e86ccd..d7732765cd 100644 --- a/bin/agent-data-plane/src/cli/run.rs +++ b/bin/agent-data-plane/src/cli/run.rs @@ -47,7 +47,7 @@ use crate::{ components::{ apm_onboarding::ApmOnboardingConfiguration, dogstatsd_post_aggregate_filter::DogStatsDPostAggregateFilterConfiguration, - dogstatsd_prefix_filter::DogStatsDPrefixFilterConfiguration, host_tags::HostTagsConfiguration, + dogstatsd_prefix_filter::DogStatsDPrefixFilterBuilder, host_tags::HostTagsConfiguration, ottl_filter_processor::OttlFilterConfiguration, ottl_transform_processor::OttlTransformConfiguration, tag_filterlist::TagFilterlistConfiguration, }, @@ -710,7 +710,8 @@ async fn add_dsd_pipeline_to_blueprint( let dsd_config = DogStatsDConfiguration::new(system.saluki().components.dogstatsd.source) .with_workload_provider(env_provider.workload().clone()) .with_capture_entity_resolver(env_provider.workload().clone()); - let dsd_prefix_filter_configuration = DogStatsDPrefixFilterConfiguration::from_configuration(config)?; + let dsd_prefix_filter_configuration = + DogStatsDPrefixFilterBuilder::new(system.dynamic_handles().prefix_filter.clone()); let dsd_mapper_config = DogStatsDMapperConfiguration::from_configuration(config)?; let dsd_enrich_config = ChainedConfiguration::default().with_transform_builder("dogstatsd_mapper", dsd_mapper_config); diff --git a/bin/agent-data-plane/src/components/dogstatsd_filterlist.rs b/bin/agent-data-plane/src/components/dogstatsd_filterlist.rs index 37103bc798..a8d8e39bcd 100644 --- a/bin/agent-data-plane/src/components/dogstatsd_filterlist.rs +++ b/bin/agent-data-plane/src/components/dogstatsd_filterlist.rs @@ -70,7 +70,7 @@ impl Blocklist { /// /// The current `metric_filterlist` takes precedence when configured. Otherwise, the legacy /// `statsd_metric_blocklist` values are active. -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, PartialEq)] pub(super) struct EffectiveFilterlist { metric_filterlist: Vec, metric_filterlist_match_prefix: bool, @@ -107,6 +107,7 @@ impl EffectiveFilterlist { } /// Returns whether the current `metric_filterlist` is active. + #[allow(dead_code)] pub(super) fn metric_filterlist_is_active(&self) -> bool { !self.metric_filterlist.is_empty() } diff --git a/bin/agent-data-plane/src/components/dogstatsd_prefix_filter/mod.rs b/bin/agent-data-plane/src/components/dogstatsd_prefix_filter/mod.rs index 3e88fd5120..c5840c1b51 100644 --- a/bin/agent-data-plane/src/components/dogstatsd_prefix_filter/mod.rs +++ b/bin/agent-data-plane/src/components/dogstatsd_prefix_filter/mod.rs @@ -3,7 +3,8 @@ use async_trait::async_trait; use metrics::{Counter, Gauge}; use resource_accounting::{MemoryBounds, MemoryBoundsBuilder}; -use saluki_config_tools::GenericConfiguration; +use saluki_component_config::dogstatsd::PrefixFilterConfig; +use saluki_component_config::DynamicConfig; use saluki_core::data_model::event::{metric::Metric, EventType}; use saluki_core::{ components::{ @@ -15,92 +16,33 @@ use saluki_core::{ }; use saluki_error::GenericError; use saluki_metrics::MetricsBuilder; -use serde::Deserialize; use tokio::select; use tracing::{debug, error}; -use crate::components::dogstatsd_filterlist::{ - Blocklist, EffectiveFilterlist, METRIC_FILTERLIST_CONFIG_KEY, METRIC_FILTERLIST_MATCH_PREFIX_CONFIG_KEY, - STATSD_METRIC_BLOCKLIST_CONFIG_KEY, STATSD_METRIC_BLOCKLIST_MATCH_PREFIX_CONFIG_KEY, -}; +use crate::components::dogstatsd_filterlist::{Blocklist, EffectiveFilterlist}; const METRIC_FILTERLIST_SIZE_METRIC: &str = "metric_filterlist_size"; const METRIC_FILTERLIST_UPDATES_METRIC: &str = "metric_filterlist_updates_total"; const LISTENER_FILTERED_POINTS_METRIC: &str = "dogstatsd_listener_filtered_points_total"; -/// DogStatsD prefix filter transform. -/// -/// Appends a prefix to every metric if specified. +/// DogStatsD prefix filter transform builder. /// -/// Checks if a metric name should be allowed. -#[derive(Deserialize)] -#[cfg_attr(test, derive(Debug, derive_where::DeriveWhere, serde::Serialize))] -#[cfg_attr(test, derive_where(PartialEq))] -pub struct DogStatsDPrefixFilterConfiguration { - #[serde(default, rename = "statsd_metric_namespace")] - metric_prefix: String, - - #[serde( - default = "default_metric_prefix_blocklist", - rename = "statsd_metric_namespace_blocklist", - alias = "statsd_metric_namespace_blacklist" - )] - metric_prefix_blocklist: Vec, - - #[serde(default)] - metric_filterlist: Vec, - - #[serde(default)] - metric_filterlist_match_prefix: bool, - - #[serde(default, rename = "statsd_metric_blocklist")] - metric_blocklist: Vec, - - #[serde(default, rename = "statsd_metric_blocklist_match_prefix")] - metric_blocklist_match_prefix: bool, - - #[serde(skip)] - #[cfg_attr(test, derive_where(skip))] - configuration: Option, -} - -fn default_metric_prefix_blocklist() -> Vec { - vec![ - "datadog.agent".to_string(), - "datadog.dogstatsd".to_string(), - "datadog.process".to_string(), - "datadog.trace_agent".to_string(), - "datadog.tracer".to_string(), - "activemq".to_string(), - "activemq_58".to_string(), - "airflow".to_string(), - "cassandra".to_string(), - "confluent".to_string(), - "hazelcast".to_string(), - "hive".to_string(), - "ignite".to_string(), - "jboss".to_string(), - "jvm".to_string(), - "kafka".to_string(), - "presto".to_string(), - "sidekiq".to_string(), - "solr".to_string(), - "tomcat".to_string(), - "runtime".to_string(), - ] +/// Builds a [`DogStatsDPrefixFilter`] from a typed dynamic config handle. The builder holds no +/// raw-map references; it receives the initial and live prefix-filter configuration through a +/// [`DynamicConfig`] handle. +pub struct DogStatsDPrefixFilterBuilder { + config: DynamicConfig, } -impl DogStatsDPrefixFilterConfiguration { - /// Creates a new `DogStatsDPrefixFilterConfiguration` from the given configuration. - pub fn from_configuration(config: &GenericConfiguration) -> Result { - let mut typed_config: DogStatsDPrefixFilterConfiguration = config.as_typed()?; - typed_config.configuration = Some(config.clone()); - Ok(typed_config) +impl DogStatsDPrefixFilterBuilder { + /// Creates a new builder backed by the given typed config handle. + pub fn new(config: DynamicConfig) -> Self { + Self { config } } } #[async_trait] -impl TransformBuilder for DogStatsDPrefixFilterConfiguration { +impl TransformBuilder for DogStatsDPrefixFilterBuilder { fn input_event_type(&self) -> EventType { EventType::Metric } @@ -111,27 +53,29 @@ impl TransformBuilder for DogStatsDPrefixFilterConfiguration { } async fn build(&self, context: ComponentContext) -> Result, GenericError> { - // Ensure our metric prefix has a trailing period so that we don't have to check for, and possibly add it, when we're - // actually processing metrics. - let mut metric_prefix = self.metric_prefix.clone(); - if !metric_prefix.is_empty() && !metric_prefix.ends_with(".") { + let snapshot = self.config.current(); + + let mut metric_prefix = snapshot.metric_prefix.clone(); + if !metric_prefix.is_empty() && !metric_prefix.ends_with('.') { metric_prefix.push('.'); } + let metrics_builder = MetricsBuilder::from_component_context(&context); let effective_filterlist = EffectiveFilterlist::new( - self.metric_filterlist.clone(), - self.metric_filterlist_match_prefix, - self.metric_blocklist.clone(), - self.metric_blocklist_match_prefix, + snapshot.metric_filterlist.clone(), + snapshot.metric_filterlist_match_prefix, + snapshot.metric_blocklist.clone(), + snapshot.metric_blocklist_match_prefix, ); let telemetry = FilterlistTelemetry::new(&metrics_builder); + let mut filter = DogStatsDPrefixFilter { metric_prefix, - metric_prefix_blocklist: self.metric_prefix_blocklist.clone(), + metric_prefix_blocklist: snapshot.metric_prefix_blocklist.clone(), matcher: Blocklist::default(), effective_filterlist, telemetry, - configuration: self.configuration.clone(), + config: self.config.clone(), }; filter.sync_effective_blocklist(false); @@ -139,9 +83,8 @@ impl TransformBuilder for DogStatsDPrefixFilterConfiguration { } } -impl MemoryBounds for DogStatsDPrefixFilterConfiguration { +impl MemoryBounds for DogStatsDPrefixFilterBuilder { fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) { - // Capture the size of the heap allocation when the component is built. builder .minimum() .with_single_value::("component struct"); @@ -192,7 +135,7 @@ struct DogStatsDPrefixFilter { matcher: Blocklist, effective_filterlist: EffectiveFilterlist, telemetry: FilterlistTelemetry, - configuration: Option, + config: DynamicConfig, } impl DogStatsDPrefixFilter { @@ -205,32 +148,6 @@ impl DogStatsDPrefixFilter { } } - fn update_metric_filterlist(&mut self, metric_filterlist: Vec) { - let count_update = self.effective_filterlist.metric_filterlist_is_active() || !metric_filterlist.is_empty(); - self.effective_filterlist.set_metric_filterlist(metric_filterlist); - self.sync_effective_blocklist(count_update); - } - - fn update_metric_blocklist(&mut self, metric_blocklist: Vec) { - let count_update = !self.effective_filterlist.metric_filterlist_is_active(); - self.effective_filterlist.set_metric_blocklist(metric_blocklist); - self.sync_effective_blocklist(count_update); - } - - fn update_metric_filterlist_match_prefix(&mut self, match_prefix: bool) { - let count_update = self.effective_filterlist.metric_filterlist_is_active(); - self.effective_filterlist - .set_metric_filterlist_match_prefix(match_prefix); - self.sync_effective_blocklist(count_update); - } - - fn update_metric_blocklist_match_prefix(&mut self, match_prefix: bool) { - let count_update = !self.effective_filterlist.metric_filterlist_is_active(); - self.effective_filterlist - .set_metric_blocklist_match_prefix(match_prefix); - self.sync_effective_blocklist(count_update); - } - fn process_metric(&self, metric: &mut Metric) -> bool { let metric_name = metric.context().name().as_ref(); @@ -241,8 +158,6 @@ impl DogStatsDPrefixFilter { return false; } } else { - // We don't want to prefix the metric if it has a prefix that is on our _prefix_ blocklist, - // which ensures we don't prefix metrics that are already prefixed. let new_metric_name = if self.has_excluded_prefix(metric_name) { metric.context().name().clone() } else { @@ -257,7 +172,6 @@ impl DogStatsDPrefixFilter { return false; } - // Update metric with new name. let new_context = metric.context().with_name(new_metric_name); let existing_context = metric.context_mut(); *existing_context = new_context; @@ -281,13 +195,6 @@ impl Transform for DogStatsDPrefixFilter { let mut health = context.take_health_handle(); health.mark_ready(); - let config = self.configuration.as_ref().unwrap(); - let mut filterlist_watcher = config.watch_for_updates(METRIC_FILTERLIST_CONFIG_KEY); - let mut filterlist_match_prefix_watcher = config.watch_for_updates(METRIC_FILTERLIST_MATCH_PREFIX_CONFIG_KEY); - let mut blocklist_watcher = config.watch_for_updates(STATSD_METRIC_BLOCKLIST_CONFIG_KEY); - let mut blocklist_match_prefix_watcher = - config.watch_for_updates(STATSD_METRIC_BLOCKLIST_MATCH_PREFIX_CONFIG_KEY); - debug!("DogStatsD Prefix Filter transform started."); loop { @@ -296,8 +203,6 @@ impl Transform for DogStatsDPrefixFilter { maybe_events = context.events().next() => match maybe_events { Some(mut events) => { events.remove_if(|event| match event.try_as_metric_mut() { - // `process_metric` returns `true` if the metric should be kept, so we have to invert that - // here to match the predicate structure, which will _remove_ the event if `true` is returned. Some(metric) => !self.process_metric(metric), None => true, }); @@ -308,28 +213,17 @@ impl Transform for DogStatsDPrefixFilter { }, None => break, }, - (_, maybe_new_metric_filterlist) = filterlist_watcher.changed::>() => { - if let Some(new_filterlist) = maybe_new_metric_filterlist { - debug!(?new_filterlist, "Updated metric filterlist."); - self.update_metric_filterlist(new_filterlist); - } - }, - (_, maybe_new_filterlist_match_prefix) = filterlist_match_prefix_watcher.changed::() => { - if let Some(new_match_prefix) = maybe_new_filterlist_match_prefix { - debug!(new_match_prefix, "Updated metric filterlist match prefix."); - self.update_metric_filterlist_match_prefix(new_match_prefix); - } - }, - (_, maybe_new_blocklist) = blocklist_watcher.changed::>() => { - if let Some(new_blocklist) = maybe_new_blocklist { - debug!(?new_blocklist, "Updated metric blocklist."); - self.update_metric_blocklist(new_blocklist); - } - }, - (_, maybe_new_blocklist_match_prefix) = blocklist_match_prefix_watcher.changed::() => { - if let Some(new_match_prefix) = maybe_new_blocklist_match_prefix { - debug!(new_match_prefix, "Updated metric blocklist match prefix."); - self.update_metric_blocklist_match_prefix(new_match_prefix); + _ = self.config.changed() => { + let new_config = self.config.current(); + let new_filterlist = EffectiveFilterlist::new( + new_config.metric_filterlist, + new_config.metric_filterlist_match_prefix, + new_config.metric_blocklist, + new_config.metric_blocklist_match_prefix, + ); + if new_filterlist != self.effective_filterlist { + self.effective_filterlist = new_filterlist; + self.sync_effective_blocklist(true); } }, } @@ -344,21 +238,40 @@ impl Transform for DogStatsDPrefixFilter { #[cfg(test)] mod tests { use metrics::set_default_local_recorder; - use saluki_config_tools::{dynamic::ConfigUpdate, ConfigurationLoader}; + use saluki_component_config::DynamicConfig; use saluki_metrics::{test::TestRecorder, MetricsBuilder}; use super::*; + fn fixed_filter(pf: PrefixFilterConfig) -> DogStatsDPrefixFilter { + let effective_filterlist = EffectiveFilterlist::new( + pf.metric_filterlist.clone(), + pf.metric_filterlist_match_prefix, + pf.metric_blocklist.clone(), + pf.metric_blocklist_match_prefix, + ); + DogStatsDPrefixFilter { + metric_prefix: { + let mut p = pf.metric_prefix.clone(); + if !p.is_empty() && !p.ends_with('.') { + p.push('.'); + } + p + }, + metric_prefix_blocklist: pf.metric_prefix_blocklist.clone(), + matcher: effective_filterlist.to_matcher(), + effective_filterlist, + telemetry: FilterlistTelemetry::noop(), + config: DynamicConfig::fixed(pf), + } + } + #[test] fn test_metric_prefix_add() { - let filter = DogStatsDPrefixFilter { - metric_prefix: "foo.".to_string(), - metric_prefix_blocklist: vec![], - matcher: Blocklist::default(), - effective_filterlist: EffectiveFilterlist::default(), - telemetry: FilterlistTelemetry::noop(), - configuration: None, - }; + let filter = fixed_filter(PrefixFilterConfig { + metric_prefix: "foo".to_string(), + ..Default::default() + }); let mut metric = Metric::gauge("bar", 1.0); assert!(filter.process_metric(&mut metric)); @@ -367,14 +280,11 @@ mod tests { #[test] fn test_metric_prefix_blocklist() { - let filter = DogStatsDPrefixFilter { + let filter = fixed_filter(PrefixFilterConfig { metric_prefix: "foo".to_string(), metric_prefix_blocklist: vec!["foo".to_string(), "bar".to_string()], - matcher: Blocklist::default(), - effective_filterlist: EffectiveFilterlist::default(), - telemetry: FilterlistTelemetry::noop(), - configuration: None, - }; + ..Default::default() + }); let mut metric = Metric::gauge("barbar", 1.0); assert!(filter.process_metric(&mut metric)); @@ -389,7 +299,7 @@ mod tests { matcher: Blocklist::new(["foobar", "test"], false), effective_filterlist: EffectiveFilterlist::default(), telemetry: FilterlistTelemetry::noop(), - configuration: None, + config: DynamicConfig::fixed(PrefixFilterConfig::default()), }; let mut metric = Metric::gauge("foobar", 1.0); @@ -408,7 +318,7 @@ mod tests { matcher: Blocklist::new(["foo.bar", "test"], false), effective_filterlist: EffectiveFilterlist::default(), telemetry: FilterlistTelemetry::noop(), - configuration: None, + config: DynamicConfig::fixed(PrefixFilterConfig::default()), }; let mut metric = Metric::gauge("bar", 1.0); @@ -420,7 +330,7 @@ mod tests { matcher: Blocklist::default(), effective_filterlist: EffectiveFilterlist::default(), telemetry: FilterlistTelemetry::noop(), - configuration: None, + config: DynamicConfig::fixed(PrefixFilterConfig::default()), }; let mut metric = Metric::gauge("foo", 1.0); @@ -436,14 +346,12 @@ mod tests { matcher: Blocklist::new(["b", "test"], true), effective_filterlist: EffectiveFilterlist::default(), telemetry: FilterlistTelemetry::noop(), - configuration: None, + config: DynamicConfig::fixed(PrefixFilterConfig::default()), }; - // match prefix is true, "bar" has prefix "b" let mut metric = Metric::gauge("bar", 1.0); assert!(!filter.process_metric(&mut metric)); - // match prefix is true, "test" has prefix "test" let mut metric = Metric::gauge("test", 1.0); assert!(!filter.process_metric(&mut metric)); } @@ -456,158 +364,65 @@ mod tests { matcher: Blocklist::new(["fo", "test"], true), effective_filterlist: EffectiveFilterlist::default(), telemetry: FilterlistTelemetry::noop(), - configuration: None, + config: DynamicConfig::fixed(PrefixFilterConfig::default()), }; - // new_metric is "foo.bar", match prefix is true, "foo.bar" has prefix "fo" let mut metric = Metric::gauge("bar", 1.0); assert!(!filter.process_metric(&mut metric)); } #[tokio::test] - async fn test_metric_blocklist_dynamic_update() { - let (cfg, sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, true).await; - let sender = sender.expect("sender should exist"); - sender - .send(ConfigUpdate::Snapshot(serde_json::json!({}))) - .await - .unwrap(); + async fn dynamic_update_changes_filterlist() { + let (tx, rx) = tokio::sync::watch::channel(PrefixFilterConfig { + metric_blocklist: vec!["foobar".to_string(), "test".to_string()], + ..Default::default() + }); - cfg.ready().await; + let mut handle = DynamicConfig::live(PrefixFilterConfig::default(), rx); + handle.changed().await; + let snap = handle.current(); + + let effective_filterlist = EffectiveFilterlist::new( + snap.metric_filterlist.clone(), + snap.metric_filterlist_match_prefix, + snap.metric_blocklist.clone(), + snap.metric_blocklist_match_prefix, + ); let mut filter = DogStatsDPrefixFilter { metric_prefix: "".to_string(), metric_prefix_blocklist: vec![], - matcher: Blocklist::new(["foobar", "test"], false), - effective_filterlist: EffectiveFilterlist::new( - Vec::new(), - false, - vec!["foobar".to_string(), "test".to_string()], - false, - ), + matcher: effective_filterlist.to_matcher(), + effective_filterlist, telemetry: FilterlistTelemetry::noop(), - configuration: Some(cfg.clone()), + config: handle, }; let mut metric = Metric::gauge("foobar", 1.0); assert!(!filter.process_metric(&mut metric)); - let mut metric = Metric::gauge("foo", 1.0); - assert!(filter.process_metric(&mut metric)); - assert_eq!(metric.context().name(), "foo"); - - let mut blocklist_watcher = cfg.watch_for_updates("statsd_metric_blocklist"); - - sender - .send(ConfigUpdate::Partial { - key: "statsd_metric_blocklist".to_string(), - value: serde_json::json!(["foo".to_string()]), - }) - .await - .unwrap(); - - let (_, new) = tokio::time::timeout( - std::time::Duration::from_secs(2), - blocklist_watcher.changed::>(), - ) - .await - .expect("timed out waiting for statsd_metric_blocklist update"); - - assert_eq!(new, Some(vec!["foo".to_string()])); - - // Apply the dynamic update to the filter under test. - filter.update_metric_blocklist(new.unwrap()); + tx.send(PrefixFilterConfig { + metric_blocklist: vec!["foo".to_string()], + ..Default::default() + }) + .unwrap(); + + filter.config.changed().await; + let new_config = filter.config.current(); + let new_filterlist = EffectiveFilterlist::new( + new_config.metric_filterlist, + new_config.metric_filterlist_match_prefix, + new_config.metric_blocklist, + new_config.metric_blocklist_match_prefix, + ); + filter.effective_filterlist = new_filterlist; + filter.sync_effective_blocklist(true); - // "foobar" is taken off the blocklist let mut metric = Metric::gauge("foobar", 1.0); assert!(filter.process_metric(&mut metric)); - assert_eq!(metric.context().name(), "foobar"); - // "foo" is added to the blocklist let mut metric = Metric::gauge("foo", 1.0); assert!(!filter.process_metric(&mut metric)); - - let mut metric_filterlist_watcher = cfg.watch_for_updates("metric_filterlist"); - sender - .send(ConfigUpdate::Partial { - key: "metric_filterlist".to_string(), - value: serde_json::json!(["baz".to_string()]), - }) - .await - .unwrap(); - - let (_, new) = tokio::time::timeout( - std::time::Duration::from_secs(2), - metric_filterlist_watcher.changed::>(), - ) - .await - .expect("timed out waiting for metric_filterlist update"); - - assert_eq!(new, Some(vec!["baz".to_string()])); - - // Apply the dynamic update to the filter under test. - filter.update_metric_filterlist(new.unwrap()); - - // "baz" is added to the filterlist - let mut metric = Metric::gauge("baz", 1.0); - assert!(!filter.process_metric(&mut metric)); - } - - #[tokio::test] - async fn test_metric_filterlist_match_prefix_dynamic_update_is_applied() { - let (cfg, sender) = ConfigurationLoader::for_tests( - Some(serde_json::json!({ - "metric_filterlist": ["foo"], - "metric_filterlist_match_prefix": false - })), - None, - true, - ) - .await; - let sender = sender.expect("sender should exist"); - - sender - .send(ConfigUpdate::Snapshot(serde_json::json!({}))) - .await - .unwrap(); - cfg.ready().await; - - let mut filter = DogStatsDPrefixFilter { - metric_prefix: "".to_string(), - metric_prefix_blocklist: vec![], - matcher: Blocklist::new(["foo"], false), - effective_filterlist: EffectiveFilterlist::new(vec!["foo".to_string()], false, Vec::new(), false), - telemetry: FilterlistTelemetry::noop(), - configuration: Some(cfg.clone()), - }; - - let mut metric = Metric::gauge("foo.bar", 1.0); - assert!(filter.process_metric(&mut metric)); - - let mut match_prefix_watcher = cfg.watch_for_updates("metric_filterlist_match_prefix"); - sender - .send(ConfigUpdate::Partial { - key: "metric_filterlist_match_prefix".to_string(), - value: serde_json::json!(true), - }) - .await - .unwrap(); - - let (_, new) = tokio::time::timeout( - std::time::Duration::from_secs(2), - match_prefix_watcher.changed::(), - ) - .await - .expect("timed out waiting for metric_filterlist_match_prefix update"); - - assert_eq!(new, Some(true)); - - // Apply the dynamic update to the filter under test. - filter.update_metric_filterlist_match_prefix(new.unwrap()); - - let mut metric = Metric::gauge("foo.bar", 1.0); - assert!(!filter.process_metric(&mut metric)); - assert_eq!(filter.matcher, Blocklist::new(["foo"], true)); } #[test] @@ -627,11 +442,20 @@ mod tests { false, ), telemetry, - configuration: None, + config: DynamicConfig::fixed(PrefixFilterConfig::default()), }; filter.sync_effective_blocklist(false); - filter.update_metric_blocklist(vec!["ignored".to_string(), "still_ignored".to_string()]); + + let new_filterlist = EffectiveFilterlist::new( + vec!["preferred".to_string()], + false, + vec!["ignored".to_string(), "still_ignored".to_string()], + false, + ); + // Blocklist update while filterlist is active: should not count as update. + filter.effective_filterlist = new_filterlist; + filter.sync_effective_blocklist(false); assert_eq!(recorder.counter(METRIC_FILTERLIST_UPDATES_METRIC), Some(0)); assert_eq!(recorder.gauge(METRIC_FILTERLIST_SIZE_METRIC), Some(1.0)); @@ -642,7 +466,15 @@ mod tests { let mut metric = Metric::gauge("ignored", 1.0); assert!(filter.process_metric(&mut metric)); - filter.update_metric_filterlist(Vec::new()); + // Clear filterlist: blocklist becomes active, counts as update. + let new_filterlist = EffectiveFilterlist::new( + Vec::new(), + false, + vec!["ignored".to_string(), "still_ignored".to_string()], + false, + ); + filter.effective_filterlist = new_filterlist; + filter.sync_effective_blocklist(true); assert_eq!(recorder.counter(METRIC_FILTERLIST_UPDATES_METRIC), Some(1)); assert_eq!(recorder.gauge(METRIC_FILTERLIST_SIZE_METRIC), Some(2.0)); @@ -651,31 +483,6 @@ mod tests { assert!(!filter.process_metric(&mut metric)); } - #[test] - fn telemetry_counts_active_reconfiguration_even_if_matcher_is_unchanged() { - let recorder = TestRecorder::default(); - let _local = set_default_local_recorder(&recorder); - - let telemetry = FilterlistTelemetry::new(&MetricsBuilder::default()); - let mut filter = DogStatsDPrefixFilter { - metric_prefix: "".to_string(), - metric_prefix_blocklist: vec![], - matcher: Blocklist::default(), - effective_filterlist: EffectiveFilterlist::new(vec!["foo".to_string()], true, Vec::new(), false), - telemetry, - configuration: None, - }; - - filter.sync_effective_blocklist(false); - filter.update_metric_filterlist(vec!["foo".to_string(), "foobar".to_string()]); - - assert_eq!(recorder.counter(METRIC_FILTERLIST_UPDATES_METRIC), Some(1)); - assert_eq!(recorder.gauge(METRIC_FILTERLIST_SIZE_METRIC), Some(2.0)); - - let mut metric = Metric::gauge("foobar.baz", 1.0); - assert!(!filter.process_metric(&mut metric)); - } - #[test] fn telemetry_counts_listener_filtered_points() { let recorder = TestRecorder::default(); @@ -688,7 +495,7 @@ mod tests { matcher: Blocklist::new(["foo", "bar"], true), effective_filterlist: EffectiveFilterlist::default(), telemetry, - configuration: None, + config: DynamicConfig::fixed(PrefixFilterConfig::default()), }; let mut exact_metric = Metric::gauge("foo", 1.0); @@ -700,29 +507,3 @@ mod tests { assert_eq!(recorder.counter(LISTENER_FILTERED_POINTS_METRIC), Some(2)); } } - -#[cfg(test)] -mod config_smoke { - use datadog_agent_config_testing::config_registry::structs; - use datadog_agent_config_testing::run_config_smoke_tests; - use saluki_components::config::{DatadogRemapper, KEY_ALIASES}; - use serde_json::json; - - use super::DogStatsDPrefixFilterConfiguration; - - #[tokio::test] - async fn smoke_test() { - run_config_smoke_tests( - structs::DOGSTATSD_PREFIX_FILTER_CONFIGURATION, - &[], - json!({}), - |cfg| { - cfg.as_typed::() - .expect("DogStatsDPrefixFilterConfiguration should deserialize") - }, - KEY_ALIASES, - DatadogRemapper::new, - ) - .await - } -} diff --git a/lib/agent-data-plane-config-system/Cargo.toml b/lib/agent-data-plane-config-system/Cargo.toml index 2b8c254d9a..a3b24b59e8 100644 --- a/lib/agent-data-plane-config-system/Cargo.toml +++ b/lib/agent-data-plane-config-system/Cargo.toml @@ -10,6 +10,7 @@ workspace = true [dependencies] agent-data-plane-config = { workspace = true } +arc-swap = { workspace = true } bytesize = { workspace = true } datadog-agent-config = { workspace = true } saluki-component-config = { workspace = true } @@ -18,6 +19,8 @@ saluki-context = { workspace = true } saluki-error = { workspace = true } serde_json = { workspace = true } stringtheory = { workspace = true } +tokio = { workspace = true, features = ["sync"] } +tracing = { workspace = true } [build-dependencies] datadog-agent-config-overlay-model = { workspace = true } diff --git a/lib/agent-data-plane-config-system/src/dynamic.rs b/lib/agent-data-plane-config-system/src/dynamic.rs new file mode 100644 index 0000000000..ee96fed1a3 --- /dev/null +++ b/lib/agent-data-plane-config-system/src/dynamic.rs @@ -0,0 +1,258 @@ +//! Dynamic typed-config router. +//! +//! Watches for raw-map update signals on [`GenericConfiguration`], re-translates the full Datadog +//! config, atomically swaps the current [`SalukiConfiguration`], and publishes changed slices to +//! their [`DynamicConfig`] handles. + +use std::sync::Arc; + +use agent_data_plane_config::{SalukiConfiguration, SalukiOnlyConfiguration}; +use arc_swap::ArcSwap; +use datadog_agent_config::DatadogConfiguration; +use saluki_component_config::dogstatsd::PrefixFilterConfig; +use saluki_component_config::DynamicConfig; +use saluki_config_tools::GenericConfiguration; +use tokio::sync::watch; +use tracing::warn; + +use crate::translate::translate; + +/// Typed dynamic config handles exposed to components. +/// +/// Each field is a [`DynamicConfig`] handle for one native config slice. The handle is either +/// [`DynamicConfig::Live`] (when the retained [`GenericConfiguration`] has an update subscription) +/// or [`DynamicConfig::Fixed`] (when dynamic updates are disabled). +pub struct DynamicConfigHandles { + /// Live handle for the DogStatsD prefix-filter config slice. + pub prefix_filter: DynamicConfig, +} + +/// Starts the dynamic config router if the retained map supports update subscriptions. +/// +/// Returns the handles and (when live) spawns a background task that re-translates on each raw-map +/// update. When dynamic updates are disabled, returns fixed handles seeded from `initial`. +pub(crate) fn start_dynamic_router( + config: &GenericConfiguration, current: &Arc>, saluki_only: &SalukiOnlyConfiguration, +) -> DynamicConfigHandles { + let initial_pf = current.load().components.dogstatsd.prefix_filter.clone(); + + let events_rx = config.subscribe_for_updates(); + if events_rx.is_none() { + return DynamicConfigHandles { + prefix_filter: DynamicConfig::fixed(initial_pf), + }; + } + let mut events_rx = events_rx.unwrap(); + + let (pf_tx, mut pf_rx) = watch::channel(initial_pf.clone()); + // Mark the initial value as seen so `changed()` only wakes on actual updates. + pf_rx.borrow_and_update(); + + let current = Arc::clone(current); + let config = config.clone(); + let saluki_only = saluki_only.clone(); + + tokio::spawn(async move { + loop { + let event = events_rx.recv().await; + match event { + Ok(_) => {} + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => { + // Missed signals; the retained map reflects the latest state. + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + + let datadog: DatadogConfiguration = match config.as_typed() { + Ok(d) => d, + Err(e) => { + warn!(error = %e, "Dynamic config translation failed to parse; keeping last-good config."); + continue; + } + }; + + let new_saluki = match translate(&saluki_only, &datadog) { + Ok(s) => s, + Err(e) => { + warn!(error = %e, "Dynamic config translation failed; keeping last-good config."); + continue; + } + }; + + let new_pf = new_saluki.components.dogstatsd.prefix_filter.clone(); + + current.store(Arc::new(new_saluki)); + + pf_tx.send_if_modified(|current_pf| { + if *current_pf != new_pf { + *current_pf = new_pf; + true + } else { + false + } + }); + } + }); + + DynamicConfigHandles { + prefix_filter: DynamicConfig::live(initial_pf, pf_rx), + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use saluki_config_tools::{dynamic::ConfigUpdate, ConfigurationLoader}; + use serde_json::json; + use tokio::time::timeout; + + use super::*; + use crate::system::ConfigurationSystem; + + async fn system_with_dynamic( + initial: serde_json::Value, + ) -> (Arc, tokio::sync::mpsc::Sender) { + let (cfg, sender) = ConfigurationLoader::for_tests(Some(initial), None, true).await; + let sender = sender.expect("dynamic sender"); + sender.send(ConfigUpdate::Snapshot(json!({}))).await.unwrap(); + cfg.ready().await; + + let system = Arc::new(ConfigurationSystem::load(cfg).start().await.expect("start succeeds")); + (system, sender) + } + + #[tokio::test] + async fn dynamic_update_wakes_prefix_handle() { + let (system, sender) = system_with_dynamic(json!({})).await; + let mut handle = system.dynamic_handles().prefix_filter.clone(); + assert!(handle.is_live()); + + sender + .send(ConfigUpdate::Partial { + key: "metric_filterlist".to_string(), + value: json!(["test.prefix"]), + }) + .await + .unwrap(); + + timeout(Duration::from_secs(2), handle.changed()) + .await + .expect("handle should wake on metric_filterlist update"); + + let snap = handle.current(); + assert_eq!(snap.metric_filterlist, vec!["test.prefix"]); + } + + #[tokio::test] + async fn repeated_same_slice_does_not_wake_handle() { + let (system, sender) = system_with_dynamic(json!({})).await; + let mut handle = system.dynamic_handles().prefix_filter.clone(); + + // Set metric_filterlist to ["a"] to establish a baseline. + sender + .send(ConfigUpdate::Partial { + key: "metric_filterlist".to_string(), + value: json!(["a"]), + }) + .await + .unwrap(); + timeout(Duration::from_secs(2), handle.changed()) + .await + .expect("first update should wake"); + assert_eq!(handle.current().metric_filterlist, vec!["a"]); + + // Send the same value again; the handle should not wake. + sender + .send(ConfigUpdate::Partial { + key: "metric_filterlist".to_string(), + value: json!(["a"]), + }) + .await + .unwrap(); + + // Give the router time to process before asserting. + tokio::task::yield_now().await; + let result = timeout(Duration::from_millis(200), handle.changed()).await; + assert!(result.is_err(), "handle should not wake when slice is unchanged"); + } + + #[tokio::test] + async fn unrelated_key_update_does_not_wake_handle() { + let (system, sender) = system_with_dynamic(json!({})).await; + let mut handle = system.dynamic_handles().prefix_filter.clone(); + + sender + .send(ConfigUpdate::Partial { + key: "log_level".to_string(), + value: json!("debug"), + }) + .await + .unwrap(); + + let result = timeout(Duration::from_millis(200), handle.changed()).await; + assert!(result.is_err(), "handle should not wake on unrelated key"); + } + + #[tokio::test] + async fn malformed_update_keeps_last_good_config() { + let (system, sender) = system_with_dynamic(json!({ "metric_filterlist": ["original"] })).await; + let mut handle = system.dynamic_handles().prefix_filter.clone(); + + let original = handle.current(); + assert_eq!(original.metric_filterlist, vec!["original"]); + + // Send a value that will cause DatadogConfiguration parse to accept but translation to + // produce a different model; verify the original is retained if we send something that + // changes a different field, then confirm we can still get updates after recovery. + sender + .send(ConfigUpdate::Partial { + key: "dogstatsd_tag_cardinality".to_string(), + value: json!("bogus_not_a_real_cardinality"), + }) + .await + .unwrap(); + + // The malformed cardinality causes a translation error; the last-good config is retained. + let result = timeout(Duration::from_millis(200), handle.changed()).await; + assert!(result.is_err(), "handle should not wake on malformed update"); + + assert_eq!(handle.current().metric_filterlist, vec!["original"]); + assert_eq!( + system.saluki().components.dogstatsd.prefix_filter.metric_filterlist, + vec!["original"] + ); + } + + #[tokio::test] + async fn no_dynamic_returns_fixed_handles() { + let cfg = ConfigurationLoader::for_tests(Some(json!({})), None, false).await.0; + + let system = ConfigurationSystem::load(cfg).start().await.expect("start succeeds"); + + assert!(!system.dynamic_handles().prefix_filter.is_live()); + } + + #[tokio::test] + async fn saluki_reflects_dynamic_update() { + let (system, sender) = system_with_dynamic(json!({})).await; + + sender + .send(ConfigUpdate::Partial { + key: "metric_filterlist".to_string(), + value: json!(["updated"]), + }) + .await + .unwrap(); + + let mut handle = system.dynamic_handles().prefix_filter.clone(); + timeout(Duration::from_secs(2), handle.changed()) + .await + .expect("handle should wake"); + + assert_eq!( + system.saluki().components.dogstatsd.prefix_filter.metric_filterlist, + vec!["updated"] + ); + } +} diff --git a/lib/agent-data-plane-config-system/src/lib.rs b/lib/agent-data-plane-config-system/src/lib.rs index 9b370e0307..2024ece1da 100644 --- a/lib/agent-data-plane-config-system/src/lib.rs +++ b/lib/agent-data-plane-config-system/src/lib.rs @@ -49,9 +49,11 @@ //! // TODO(visibility): add crate-boundary architectural guard when arch tests are wired up +mod dynamic; mod system; pub(crate) mod translate; +pub use self::dynamic::DynamicConfigHandles; pub use self::system::{ConfigurationSystem, ConfigurationSystemLoader}; #[cfg(test)] diff --git a/lib/agent-data-plane-config-system/src/system.rs b/lib/agent-data-plane-config-system/src/system.rs index 93336e9073..a0cd2ff0fa 100644 --- a/lib/agent-data-plane-config-system/src/system.rs +++ b/lib/agent-data-plane-config-system/src/system.rs @@ -22,26 +22,33 @@ //! [`SalukiConfiguration`]. The master is translated once at `start()` and frozen for the process //! lifetime; only the DogStatsD source is real in this stage. +use std::sync::Arc; + use agent_data_plane_config::{SalukiConfiguration, SalukiOnlyConfiguration}; +use arc_swap::ArcSwap; use bytesize::ByteSize; use datadog_agent_config::DatadogConfiguration; use saluki_config_tools::GenericConfiguration; use saluki_error::{ErrorContext as _, GenericError}; +use crate::dynamic::{self, DynamicConfigHandles}; use crate::translate::translate; /// The running configuration system. /// -/// Holds the retained source map and the translated master [`SalukiConfiguration`]. The master is a -/// plain, immutable field: translated once at [`ConfigurationSystemLoader::start`] and never -/// mutated. A later dynamic flip swaps this for a swappable container without changing -/// [`saluki`](Self::saluki) or [`raw_map`](Self::raw_map). +/// Holds the retained source map and the translated [`SalukiConfiguration`]. The current model is +/// stored in an [`ArcSwap`] so the dynamic router can atomically publish retranslated configs +/// without blocking readers. [`saluki`](Self::saluki) loads the current snapshot; +/// [`dynamic_handles`](Self::dynamic_handles) exposes typed live handles for components. pub struct ConfigurationSystem { /// The retained source map. The single source of raw config for un-flipped components. config: GenericConfiguration, - /// The translated master. Frozen at `start()`; only the DogStatsD source is real in this stage. - saluki: SalukiConfiguration, + /// The atomically-swappable current translated model. + current: Arc>, + + /// Typed dynamic config handles for components. + handles: DynamicConfigHandles, } impl ConfigurationSystem { @@ -63,12 +70,16 @@ impl ConfigurationSystem { &self.config } - /// Returns an owned clone of the translated master [`SalukiConfiguration`]. + /// Returns an owned clone of the current [`SalukiConfiguration`]. /// - /// The clone is cheap relative to startup cost and lets a flipped helper take a fully owned - /// slice. In this stage only `components.dogstatsd.source` carries real translated values. + /// After dynamic updates the clone reflects the latest accepted translation. pub fn saluki(&self) -> SalukiConfiguration { - self.saluki.clone() + (**self.current.load()).clone() + } + + /// Returns the typed dynamic config handles for components. + pub fn dynamic_handles(&self) -> &DynamicConfigHandles { + &self.handles } } @@ -103,9 +114,13 @@ impl ConfigurationSystemLoader { let saluki = translate(&saluki_only, &datadog).error_context("Failed to translate configuration.")?; + let current = Arc::new(ArcSwap::from_pointee(saluki)); + let handles = dynamic::start_dynamic_router(&self.config, ¤t, &saluki_only); + Ok(ConfigurationSystem { config: self.config, - saluki, + current, + handles, }) } } diff --git a/lib/agent-data-plane-config-system/src/translate/datadog/mod.rs b/lib/agent-data-plane-config-system/src/translate/datadog/mod.rs index 4739b27387..6da07eedbb 100644 --- a/lib/agent-data-plane-config-system/src/translate/datadog/mod.rs +++ b/lib/agent-data-plane-config-system/src/translate/datadog/mod.rs @@ -232,8 +232,17 @@ impl DatadogConfigConsumer for Translator { fn consume_log_format_rfc3339(&mut self, _value: bool) {} fn consume_log_level(&mut self, _value: String) {} fn consume_log_payloads(&mut self, _value: bool) {} - fn consume_metric_filterlist(&mut self, _value: Vec) {} - fn consume_metric_filterlist_match_prefix(&mut self, _value: bool) {} + fn consume_metric_filterlist(&mut self, value: Vec) { + self.saluki.components.dogstatsd.prefix_filter.metric_filterlist = value; + } + + fn consume_metric_filterlist_match_prefix(&mut self, value: bool) { + self.saluki + .components + .dogstatsd + .prefix_filter + .metric_filterlist_match_prefix = value; + } fn consume_min_tls_version(&mut self, _value: String) {} fn consume_multi_region_failover_api_key(&mut self, _value: String) {} fn consume_multi_region_failover_dd_url(&mut self, _value: String) {} @@ -290,10 +299,25 @@ impl DatadogConfigConsumer for Translator { self.saluki.components.dogstatsd.source.statsd_forward_port = value.clamp(0, u16::MAX as i64) as u16; } - fn consume_statsd_metric_blocklist(&mut self, _value: Vec) {} - fn consume_statsd_metric_blocklist_match_prefix(&mut self, _value: bool) {} - fn consume_statsd_metric_namespace(&mut self, _value: String) {} - fn consume_statsd_metric_namespace_blacklist(&mut self, _value: Vec) {} + fn consume_statsd_metric_blocklist(&mut self, value: Vec) { + self.saluki.components.dogstatsd.prefix_filter.metric_blocklist = value; + } + + fn consume_statsd_metric_blocklist_match_prefix(&mut self, value: bool) { + self.saluki + .components + .dogstatsd + .prefix_filter + .metric_blocklist_match_prefix = value; + } + + fn consume_statsd_metric_namespace(&mut self, value: String) { + self.saluki.components.dogstatsd.prefix_filter.metric_prefix = value; + } + + fn consume_statsd_metric_namespace_blacklist(&mut self, value: Vec) { + self.saluki.components.dogstatsd.prefix_filter.metric_prefix_blocklist = value; + } fn consume_syslog_rfc(&mut self, _value: bool) {} fn consume_syslog_uri(&mut self, _value: String) {} fn consume_use_proxy_for_cloud_metadata(&mut self, _value: bool) {} diff --git a/lib/agent-data-plane-config-system/src/translate/mod.rs b/lib/agent-data-plane-config-system/src/translate/mod.rs index 5a5dc003a9..190171d075 100644 --- a/lib/agent-data-plane-config-system/src/translate/mod.rs +++ b/lib/agent-data-plane-config-system/src/translate/mod.rs @@ -20,7 +20,6 @@ use datadog_agent_config::{drive, DatadogConfiguration, TranslateError}; /// /// The accumulator composes the output type [`SalukiConfiguration`], along with accumulated /// translation errors. -#[allow(dead_code)] pub(crate) struct Translator { /// The in-progress native model. Seeded from the Saluki-only base, then overlaid by the drive. pub(crate) saluki: SalukiConfiguration, @@ -35,7 +34,6 @@ impl Translator { /// `base` is the lowest-precedence starting point (typically `saluki_only.seed()`). The Datadog /// drive overlays its schema fields on top, so the base only usefully carries disjoint /// Saluki-schema-only fields. - #[allow(dead_code)] pub(crate) fn new(base: SalukiConfiguration) -> Self { Self { saluki: base, @@ -44,13 +42,11 @@ impl Translator { } /// Returns the finished native model. - #[allow(dead_code)] pub(crate) fn finish(self) -> SalukiConfiguration { self.saluki } /// Records a semantic translation error. The first recorded error is surfaced by `drive`. - #[allow(dead_code)] pub(crate) fn record_error(&mut self, e: TranslateError) { self.errors.push(e); } @@ -66,7 +62,6 @@ impl Translator { /// Returns the first [`TranslateError`] recorded while consuming a witnessed value (for example, a /// value that cannot be parsed into its native destination). Callers decide policy: startup bails; /// a dynamic update is rejected and the last-good config retained. -#[allow(dead_code)] pub(crate) fn translate( saluki_only: &SalukiOnlyConfiguration, datadog: &DatadogConfiguration, ) -> Result { @@ -149,4 +144,43 @@ mod tests { }; assert!(translate(&saluki_only, &datadog).is_err()); } + + #[test] + fn drive_overlays_prefix_filter_slice() { + let saluki_only = SalukiOnlyConfiguration::default(); + let datadog = DatadogConfiguration { + statsd_metric_namespace: "myapp".to_string(), + statsd_metric_namespace_blacklist: vec!["jvm".to_string()], + metric_filterlist: vec!["allowed.metric".to_string()], + metric_filterlist_match_prefix: true, + statsd_metric_blocklist: vec!["blocked.metric".to_string()], + statsd_metric_blocklist_match_prefix: true, + ..Default::default() + }; + + let config = translate(&saluki_only, &datadog).expect("translation succeeds"); + let pf = &config.components.dogstatsd.prefix_filter; + assert_eq!(pf.metric_prefix, "myapp"); + assert_eq!(pf.metric_prefix_blocklist, vec!["jvm"]); + assert_eq!(pf.metric_filterlist, vec!["allowed.metric"]); + assert!(pf.metric_filterlist_match_prefix); + assert_eq!(pf.metric_blocklist, vec!["blocked.metric"]); + assert!(pf.metric_blocklist_match_prefix); + } + + #[test] + fn default_namespace_blacklist_reaches_prefix_filter() { + let saluki_only = SalukiOnlyConfiguration::default(); + let datadog = DatadogConfiguration::default(); + let config = translate(&saluki_only, &datadog).expect("translation succeeds"); + assert!( + !config + .components + .dogstatsd + .prefix_filter + .metric_prefix_blocklist + .is_empty(), + "generated default blocklist should flow through" + ); + } } diff --git a/lib/agent-data-plane-config/src/dogstatsd.rs b/lib/agent-data-plane-config/src/dogstatsd.rs index c19cad7c8f..7ab7cdb4d1 100644 --- a/lib/agent-data-plane-config/src/dogstatsd.rs +++ b/lib/agent-data-plane-config/src/dogstatsd.rs @@ -16,4 +16,7 @@ pub struct Config { /// DogStatsD source configuration (listeners, parser/decoding options). pub source: saluki_component_config::dogstatsd::SourceConfig, + + /// DogStatsD prefix and listener-side metric filter configuration. + pub prefix_filter: saluki_component_config::dogstatsd::PrefixFilterConfig, } diff --git a/lib/saluki-component-config/src/dogstatsd.rs b/lib/saluki-component-config/src/dogstatsd.rs index 4a8f52425d..22bc589412 100644 --- a/lib/saluki-component-config/src/dogstatsd.rs +++ b/lib/saluki-component-config/src/dogstatsd.rs @@ -302,3 +302,44 @@ impl Default for OriginEnrichmentConfiguration { } } } + +/// Configuration data for the DogStatsD metric prefix and listener-side metric filter. +/// +/// Plain data only: no behavior, no runtime handles, no `Deserialize`. The translated-config system +/// builds this field-by-field from Datadog keys; it is never deserialized into directly. +/// +/// `metric_prefix_blocklist` keeps the existing component name (it is an exemption list for +/// prefixing, not a list of metrics to drop). Datadog defaults for this field, including the long +/// default list from `statsd_metric_namespace_blacklist`, come from the witness drive. +#[derive(Clone, Debug, Default, Eq, PartialEq, serde::Serialize)] +pub struct PrefixFilterConfig { + /// The metric namespace prefix prepended to every metric name. + /// + /// Defaults to empty (no prefix). + pub metric_prefix: String, + + /// Metric name prefixes exempt from namespace prefixing. + /// + /// Defaults to empty; Datadog defaults come from the witness drive. + pub metric_prefix_blocklist: Vec, + + /// The metric allowlist. When non-empty, only metrics matching this list are forwarded. + /// + /// Defaults to empty (all metrics forwarded). + pub metric_filterlist: Vec, + + /// Whether `metric_filterlist` entries match as a prefix rather than exact. + /// + /// Defaults to `false`. + pub metric_filterlist_match_prefix: bool, + + /// The metric blocklist. Metrics matching this list are dropped. + /// + /// Defaults to empty (no metrics dropped). + pub metric_blocklist: Vec, + + /// Whether `metric_blocklist` entries match as a prefix rather than exact. + /// + /// Defaults to `false`. + pub metric_blocklist_match_prefix: bool, +} diff --git a/lib/saluki-component-config/src/lib.rs b/lib/saluki-component-config/src/lib.rs index ddc612e19c..931d421025 100644 --- a/lib/saluki-component-config/src/lib.rs +++ b/lib/saluki-component-config/src/lib.rs @@ -29,5 +29,7 @@ pub mod dogstatsd; pub mod dynamic; -pub use self::dogstatsd::{EnablePayloadsConfiguration, OriginEnrichmentConfiguration, SourceConfig}; +pub use self::dogstatsd::{ + EnablePayloadsConfiguration, OriginEnrichmentConfiguration, PrefixFilterConfig, SourceConfig, +}; pub use self::dynamic::DynamicConfig; diff --git a/test/integration/cases/adp-config-dynamic-prefix-filter/config.yaml b/test/integration/cases/adp-config-dynamic-prefix-filter/config.yaml new file mode 100644 index 0000000000..7839e9adaf --- /dev/null +++ b/test/integration/cases/adp-config-dynamic-prefix-filter/config.yaml @@ -0,0 +1,43 @@ +type: integration +name: "adp-config-dynamic-prefix-filter" +description: "Verifies a Core Agent runtime config mutation propagates through the typed translation pipeline to /config/internal." +timeout: 120s +runtimes: [linux, mac, windows] + +env: + DD_API_KEY: "00000000000000000000000000000000" + DD_HOSTNAME: "integration-test-dynamic-prefix-filter" + DD_DATA_PLANE_ENABLED: "true" + DD_DATA_PLANE_STANDALONE_MODE: "false" + DD_DATA_PLANE_USE_NEW_CONFIG_STREAM_ENDPOINT: "true" + DD_DATA_PLANE_DOGSTATSD_ENABLED: "true" + DD_LOG_LEVEL: "info" + +container: + exposed_ports: + - "55101/tcp" + +procedure: + - assertion: adp_config_key_equals + key: components.dogstatsd.prefix_filter.metric_filterlist + value: [] + endpoint: "https://localhost:55101/config/internal" + timeout: 60s + - action: core_agent_config_set + key: metric_filterlist + value: ["test.prefix"] + - assertion: adp_config_key_equals + key: components.dogstatsd.prefix_filter.metric_filterlist + value: ["test.prefix"] + endpoint: "https://localhost:55101/config/internal" + timeout: 60s + - parallel: + - assertion: process_stable_for + duration: 10s + - assertion: log_not_contains + pattern: "Error while reading config event stream" + during: 10s + - assertion: log_not_contains + pattern: "panic|PANIC" + regex: true + during: 10s