Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/agent-data-plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ fips = ["saluki-app/tls-fips", "saluki-components/fips"]
antithesis = ["dep:antithesis_sdk", "antithesis_sdk/full", "dep:antithesis-instrumentation", "saluki-components/antithesis"]

[dependencies]
agent-data-plane-config-system = { workspace = true }
antithesis-instrumentation = { workspace = true, optional = true }
antithesis_sdk = { workspace = true, optional = true }
argh = { workspace = true, features = ["help"] }
Expand Down
34 changes: 24 additions & 10 deletions bin/agent-data-plane/src/cli/run.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::{
collections::HashSet,
path::PathBuf,
sync::Arc,
time::{Duration, Instant},
};

use agent_data_plane_config_system::ConfigurationSystem;
use argh::FromArgs;
use datadog_agent_commons::platform::PlatformSettings;
use datadog_agent_config::classifier::{ConfigClassifier, Pipeline, PipelineAffinity, Severity, SupportLevel};
Expand Down Expand Up @@ -154,22 +156,29 @@ pub async fn handle_run_command(
return Ok(());
}

// The configuration system takes ownership of the now-final configuration, establishing it as
// the single source. Components that have not yet been flipped onto typed config read through
// `raw_map()`, which stays live in remote-agent stream mode exactly as before.
let system = Arc::new(ConfigurationSystem::load(config).start().await?);
let config = system.raw_map();

let active_pipelines = active_pipelines(&dp_config);
check_and_warn_config(&config, &active_pipelines).error_context("Incompatible configuration detected.")?;
check_and_warn_config(config, &active_pipelines).error_context("Incompatible configuration detected.")?;

// Set up all of the building blocks for building our topologies and launching internal processes.
let component_registry = ComponentRegistry::default();
let health_registry = HealthRegistry::new();
let (env_provider, maybe_env_supervisor) =
ADPEnvironmentProvider::from_configuration(&config, &dp_config, &component_registry, &health_registry).await?;
ADPEnvironmentProvider::from_configuration(config, &dp_config, &component_registry, &health_registry).await?;

// Create the blueprint for our primary topology.
let (mut blueprint, control_surfaces) =
create_topology(&config, &dp_config, &env_provider, &component_registry).await?;
create_topology(&system, &dp_config, &env_provider, &component_registry).await?;

// Create the internal supervisor which drives our control plane and internal observability.
let mut internal_supervisor = create_internal_supervisor(
&config,
config,
Arc::clone(&system),
&dp_config,
&component_registry,
health_registry.clone(),
Expand All @@ -181,7 +190,7 @@ pub async fn handle_run_command(
.error_context("Failed to create internal supervisor.")?;

// Run memory bounds validation to ensure that we can launch the topology with our configured memory limit, if any.
let bounds_config = MemoryBoundsConfiguration::try_from_config(&config)?;
let bounds_config = MemoryBoundsConfiguration::try_from_config(config)?;
let memory_limiter = initialize_memory_bounds(bounds_config, component_registry.root())?;

if let Ok(val) = std::env::var("DD_ADP_WRITE_SIZING_GUIDE") {
Expand Down Expand Up @@ -358,9 +367,12 @@ fn is_a_pipeline_affected(active_pipelines: &HashSet<Pipeline>, pipeline_affinit
}

async fn create_topology(
config: &GenericConfiguration, dp_config: &DataPlaneConfiguration, env_provider: &ADPEnvironmentProvider,
system: &ConfigurationSystem, dp_config: &DataPlaneConfiguration, env_provider: &ADPEnvironmentProvider,
component_registry: &ComponentRegistry,
) -> Result<(TopologyBlueprint, TopologyControlSurfaces), GenericError> {
// Un-flipped components read the raw map; the DogStatsD helper takes the whole system so it can
// reach typed config for the flipped source.
let config = system.raw_map();
let mut blueprint = TopologyBlueprint::new("primary", component_registry);
blueprint.with_shutdown_timeout(dp_config.stop_timeout());

Expand Down Expand Up @@ -417,7 +429,7 @@ async fn create_topology(
}

if dp_config.dogstatsd().enabled() {
let dsd_control_surface = add_dsd_pipeline_to_blueprint(&mut blueprint, config, env_provider).await?;
let dsd_control_surface = add_dsd_pipeline_to_blueprint(&mut blueprint, system, env_provider).await?;
control_surfaces.attach_dogstatsd(dsd_control_surface);
}

Expand Down Expand Up @@ -655,8 +667,11 @@ async fn add_baseline_traces_pipeline_to_blueprint(
}

async fn add_dsd_pipeline_to_blueprint(
blueprint: &mut TopologyBlueprint, config: &GenericConfiguration, env_provider: &ADPEnvironmentProvider,
blueprint: &mut TopologyBlueprint, system: &ConfigurationSystem, env_provider: &ADPEnvironmentProvider,
) -> Result<DogStatsDControlSurface, GenericError> {
// The whole helper reads the raw map. The signature takes the system so each sub-component can
// move onto typed config independently without further changing the parameter list.
let config = system.raw_map();
// We're creating the "front half" of the DogStatsD pipeline, which deals solely with accepting DogStatsD payloads,
// and enriching/processing them in DSD-specific ways, relevant to how the Datadog Agent is expected to behave.
//
Expand Down Expand Up @@ -692,8 +707,7 @@ async fn add_dsd_pipeline_to_blueprint(
// │ (destination) │ │ (Datadog Platform) │
// └─────────────────────┘ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘

let dsd_config = DogStatsDConfiguration::from_configuration(config)
.error_context("Failed to configure DogStatsD source.")?
let dsd_config = DogStatsDConfiguration::new(system.saluki().components.dogstatsd.source)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve the run_path capture fallback

In deployments that rely on run_path for the default DogStatsD capture directory, this new construction path skips the old from_configuration fixup that derived run_path/dsd_capture when dogstatsd_capture_path was empty. The translator now leaves the default as an empty PathBuf, and TrafficCapture rejects implicit captures with no configured path, so agent-data-plane dogstatsd capture without an explicit path regresses. Preserve the run_path fallback in the translated/new path.

Useful? React with 👍 / 👎.

.with_workload_provider(env_provider.workload().clone())
.with_capture_entity_resolver(env_provider.workload().clone());
let dsd_prefix_filter_configuration = DogStatsDPrefixFilterConfiguration::from_configuration(config)?;
Expand Down
109 changes: 109 additions & 0 deletions bin/agent-data-plane/src/internal/config_internal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
//! Internal configuration API handler.
//!
//! Serves the ADP-native [`SalukiConfiguration`] as JSON on the privileged `/config/internal`
//! route. This is the observable surface used to verify translation end-to-end, and a secondary
//! operator-debugging aid alongside the source-shaped `/config` route.
//!
//! The handler holds the shared [`ConfigurationSystem`] and reads the translated master per request,
//! so it automatically reflects any later re-translation without changing this worker. The body is
//! served raw, without scrubbing, exactly like the existing privileged `/config` route; scrubbing
//! is a client-side display concern.
//!
//! [`SalukiConfiguration`]: agent-data-plane-config

use std::sync::Arc;

use agent_data_plane_config_system::ConfigurationSystem;
use async_trait::async_trait;
use http::StatusCode;
use saluki_api::{
extract::State,
response::IntoResponse,
routing::{get, Router},
APIHandler, DynamicRoute, EndpointType,
};
use saluki_common::sync::shutdown::ShutdownHandle;
use saluki_core::runtime::{state::DataspaceRegistry, InitializationError, Supervisable, SupervisorFuture};
use saluki_error::generic_error;

/// State for the internal configuration API handler.
#[derive(Clone)]
pub struct InternalConfigState {
system: Arc<ConfigurationSystem>,
}

/// An API handler for returning the ADP-native configuration.
///
/// Exposes a single route -- `/config/internal` -- that serializes the translated
/// `SalukiConfiguration` to JSON.
pub struct InternalConfigAPIHandler {
state: InternalConfigState,
}

impl InternalConfigAPIHandler {
fn new(system: Arc<ConfigurationSystem>) -> Self {
Self {
state: InternalConfigState { system },
}
}

async fn config_handler(State(state): State<InternalConfigState>) -> impl IntoResponse {
match serde_json::to_string(&state.system.saluki()) {
Ok(body) => (StatusCode::OK, body).into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to serialize configuration: {}", e),
)
.into_response(),
}
}
}

impl APIHandler for InternalConfigAPIHandler {
type State = InternalConfigState;

fn generate_initial_state(&self) -> Self::State {
self.state.clone()
}

fn generate_routes(&self) -> Router<Self::State> {
Router::new().route("/config/internal", get(Self::config_handler))
}
}

/// A worker for exposing the ADP-native configuration.
///
/// Asserts the `/config/internal` route on the privileged API endpoint. As the configuration may
/// contain sensitive data, the route is only present on the privileged endpoint.
pub struct InternalConfigWorker {
handler: InternalConfigAPIHandler,
}

impl InternalConfigWorker {
/// Creates a new [`InternalConfigWorker`] backed by the shared configuration system.
pub fn new(system: Arc<ConfigurationSystem>) -> Self {
Self {
handler: InternalConfigAPIHandler::new(system),
}
}
}

#[async_trait]
impl Supervisable for InternalConfigWorker {
fn name(&self) -> &str {
"config-internal-api"
}

async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
let config_route = DynamicRoute::http(EndpointType::Privileged, &self.handler);

Ok(Box::pin(async move {
DataspaceRegistry::try_current()
.ok_or_else(|| generic_error!("Dataspace not available."))?
.assert(config_route, "config-internal-api");

process_shutdown.await;
Ok(())
}))
}
}
12 changes: 8 additions & 4 deletions bin/agent-data-plane/src/internal/control_plane.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::sync::Arc;

use agent_data_plane_config_system::ConfigurationSystem;
use datadog_agent_commons::ipc::{config::IpcAuthConfiguration, tls::build_ipc_server_tls_config};
use resource_accounting::ComponentRegistry;
use saluki_api::EndpointType;
Expand All @@ -15,8 +18,8 @@ use saluki_error::GenericError;
use crate::{
config::DataPlaneConfiguration,
internal::{
logging::DynamicLogLevelWorker, remote_agent::RemoteAgentBootstrap, telemetry::InternalTelemetryAPIWorker,
TopologyControlSurfaces,
config_internal::InternalConfigWorker, logging::DynamicLogLevelWorker, remote_agent::RemoteAgentBootstrap,
telemetry::InternalTelemetryAPIWorker, TopologyControlSurfaces,
},
};

Expand All @@ -31,8 +34,8 @@ use crate::{
///
/// If the supervisor can't be created, an error is returned.
pub async fn create_control_plane_supervisor(
config: &GenericConfiguration, dp_config: &DataPlaneConfiguration, component_registry: &ComponentRegistry,
health_registry: HealthRegistry, control_surfaces: TopologyControlSurfaces,
config: &GenericConfiguration, system: Arc<ConfigurationSystem>, dp_config: &DataPlaneConfiguration,
component_registry: &ComponentRegistry, health_registry: HealthRegistry, control_surfaces: TopologyControlSurfaces,
ra_bootstrap: Option<RemoteAgentBootstrap>, logging_controller: LoggingOverrideController,
) -> Result<Supervisor, GenericError> {
let mut supervisor = Supervisor::new("ctrl-pln")?
Expand All @@ -44,6 +47,7 @@ pub async fn create_control_plane_supervisor(
supervisor.add_worker(InternalTelemetryAPIWorker::new());
supervisor.add_worker(DynamicLogLevelWorker::new(config, logging_controller));
supervisor.add_worker(ConfigWorker::new(config.clone()));
supervisor.add_worker(InternalConfigWorker::new(system));

supervisor.add_worker(DynamicAPIBuilder::new(
EndpointType::Unprivileged,
Expand Down
10 changes: 8 additions & 2 deletions bin/agent-data-plane/src/internal/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::sync::Arc;

use agent_data_plane_config_system::ConfigurationSystem;
use resource_accounting::ComponentRegistry;
use saluki_app::logging::LoggingOverrideController;
use saluki_config_tools::GenericConfiguration;
Expand All @@ -7,6 +10,8 @@ use saluki_error::GenericError;

use crate::config::DataPlaneConfiguration;

mod config_internal;

mod control_plane;
pub use self::control_plane::create_control_plane_supervisor;

Expand Down Expand Up @@ -35,8 +40,8 @@ mod telemetry;
///
/// If the supervisor can't be created, an error is returned.
pub async fn create_internal_supervisor(
config: &GenericConfiguration, dp_config: &DataPlaneConfiguration, component_registry: &ComponentRegistry,
health_registry: HealthRegistry, control_surfaces: TopologyControlSurfaces,
config: &GenericConfiguration, system: Arc<ConfigurationSystem>, dp_config: &DataPlaneConfiguration,
component_registry: &ComponentRegistry, health_registry: HealthRegistry, control_surfaces: TopologyControlSurfaces,
ra_bootstrap: Option<RemoteAgentBootstrap>, logging_controller: LoggingOverrideController,
) -> Result<Supervisor, GenericError> {
// The root supervisor runs in ambient mode (caller's runtime) since its children each have their own
Expand All @@ -48,6 +53,7 @@ pub async fn create_internal_supervisor(
root.add_worker(
create_control_plane_supervisor(
config,
system,
dp_config,
component_registry,
health_registry.clone(),
Expand Down
14 changes: 14 additions & 0 deletions lib/agent-data-plane-config-system/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,22 @@ workspace = true

[dependencies]
agent-data-plane-config = { workspace = true }
bytesize = { workspace = true }
datadog-agent-config = { workspace = true }
saluki-component-config = { workspace = true }
saluki-config-tools = { workspace = true }
saluki-context = { workspace = true }
saluki-error = { workspace = true }
serde_json = { workspace = true }
stringtheory = { workspace = true }

[build-dependencies]
datadog-agent-config-overlay-model = { workspace = true }
indexmap = { workspace = true }

[dev-dependencies]
agent-data-plane-config = { workspace = true }
figment = { workspace = true }
saluki-config-tools = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt"] }
Loading
Loading