diff --git a/lib/saluki-components/src/common/datadog/config.rs b/lib/saluki-components/src/common/datadog/config.rs index ac9a94a2802..fac9fd8a431 100644 --- a/lib/saluki-components/src/common/datadog/config.rs +++ b/lib/saluki-components/src/common/datadog/config.rs @@ -1,7 +1,7 @@ use std::time::Duration; use facet::Facet; -use saluki_config::GenericConfiguration; +use saluki_config::{DurationString, GenericConfiguration}; use saluki_error::GenericError; use saluki_io::net::client::http::{HttpProtocol, TlsMinimumVersion}; use serde::Deserialize; @@ -25,6 +25,10 @@ const fn default_endpoint_buffer_size() -> usize { 16 } +const fn default_tls_handshake_timeout() -> DurationString { + DurationString::new(Duration::from_secs(10)) +} + const fn default_forwarder_connection_reset_interval() -> u64 { 0 } @@ -166,6 +170,14 @@ pub struct ForwarderConfiguration { #[serde(default = "default_request_timeout_secs", rename = "forwarder_timeout")] request_timeout_secs: u64, + /// TLS handshake timeout. + /// + /// Defaults to 10 seconds. If the TLS handshake does not complete within this duration after the + /// TCP connection is established, the connection attempt fails with a timeout error. + #[serde(default = "default_tls_handshake_timeout", rename = "tls_handshake_timeout")] + #[facet(opaque)] + tls_handshake_timeout: DurationString, + /// Maximum number of pending requests for an individual endpoint. /// /// Defaults to 16. @@ -253,6 +265,11 @@ impl ForwarderConfiguration { Duration::from_secs(self.request_timeout_secs) } + /// Returns the TLS handshake timeout. + pub const fn tls_handshake_timeout(&self) -> Duration { + self.tls_handshake_timeout.as_duration() + } + /// Returns the maximum number of pending requests for an individual endpoint. pub const fn endpoint_buffer_size(&self) -> usize { self.endpoint_buffer_size @@ -474,6 +491,21 @@ mod tests { assert_eq!(proxies[0].uri().to_string(), PROXY_B_URI); } + #[tokio::test] + async fn tls_handshake_timeout_accepts_duration_string() { + let config = + forwarder_config_from(config_with(serde_json::json!({ "tls_handshake_timeout": "10s" })), None).await; + + assert_eq!(config.tls_handshake_timeout(), Duration::from_secs(10)); + } + + #[tokio::test] + async fn tls_handshake_timeout_defaults_to_10_seconds() { + let config = forwarder_config_from(base_config(), None).await; + + assert_eq!(config.tls_handshake_timeout(), Duration::from_secs(10)); + } + #[tokio::test] async fn forwarder_http_protocol_defaults_to_auto() { let config = forwarder_config_from(base_config(), None).await; diff --git a/lib/saluki-components/src/common/datadog/io.rs b/lib/saluki-components/src/common/datadog/io.rs index 26289e52f90..52575b403c3 100644 --- a/lib/saluki-components/src/common/datadog/io.rs +++ b/lib/saluki-components/src/common/datadog/io.rs @@ -161,6 +161,7 @@ where let endpoints = config.build_routable_endpoints(live_config.clone())?; let mut client_builder = HttpClient::builder() .with_request_timeout(config.request_timeout()) + .with_tls_handshake_timeout(config.tls_handshake_timeout()) .with_min_tls_version(config.min_tls_version()) .with_http_protocol(config.http_protocol()) .with_bytes_sent_counter(telemetry.bytes_sent().clone()) diff --git a/lib/saluki-components/src/config_registry/classifier.rs b/lib/saluki-components/src/config_registry/classifier.rs index b9b85833181..6b9484612df 100644 --- a/lib/saluki-components/src/config_registry/classifier.rs +++ b/lib/saluki-components/src/config_registry/classifier.rs @@ -117,7 +117,7 @@ mod tests { // tests or choose a different key. fn incompatible_non_default() { let c = classifier(); - let key = unsupported::TLS_HANDSHAKE_TIMEOUT.yaml_path(); + let key = unsupported::AGGREGATOR_BUFFER_SIZE.yaml_path(); let result = c.classify(key, &Value::Number(999.into())).unwrap(); assert!(matches!(result.support_level, SupportLevel::Incompatible(_))); assert!(!result.is_default); @@ -126,8 +126,8 @@ mod tests { #[test] fn incompatible_default() { let c = classifier(); - let ann = &unsupported::TLS_HANDSHAKE_TIMEOUT; - let result = c.classify(ann.yaml_path(), &Value::String("".into())).unwrap(); + let ann = &unsupported::AGGREGATOR_BUFFER_SIZE; + let result = c.classify(ann.yaml_path(), &Value::Number(100.into())).unwrap(); assert!(matches!(result.support_level, SupportLevel::Incompatible(_))); assert!(result.is_default); } diff --git a/lib/saluki-components/src/config_registry/datadog/forwarder.rs b/lib/saluki-components/src/config_registry/datadog/forwarder.rs index e34379a072d..21f249983f2 100644 --- a/lib/saluki-components/src/config_registry/datadog/forwarder.rs +++ b/lib/saluki-components/src/config_registry/datadog/forwarder.rs @@ -128,6 +128,18 @@ crate::declare_annotations! { pipeline_affinity: PipelineAffinity::CrossCutting, }; + /// `tls_handshake_timeout`—TLS handshake timeout as a duration string. + TLS_HANDSHAKE_TIMEOUT = SalukiAnnotation { + schema: &schema::TLS_HANDSHAKE_TIMEOUT, + support_level: SupportLevel::Full, + additional_yaml_paths: &[], + env_var_override: None, + used_by: &[structs::FORWARDER_CONFIGURATION], + value_type_override: Some(ValueType::String), + test_json: Some("\"30s\""), + pipeline_affinity: PipelineAffinity::CrossCutting, + }; + /// `forwarder_high_prio_buffer_size`—max pending requests per endpoint. Schema Float; field usize. FORWARDER_HIGH_PRIO_BUFFER_SIZE = SalukiAnnotation { schema: &schema::FORWARDER_HIGH_PRIO_BUFFER_SIZE, diff --git a/lib/saluki-components/src/config_registry/datadog/unsupported.rs b/lib/saluki-components/src/config_registry/datadog/unsupported.rs index b4b9a975ee8..32d1475adc8 100644 --- a/lib/saluki-components/src/config_registry/datadog/unsupported.rs +++ b/lib/saluki-components/src/config_registry/datadog/unsupported.rs @@ -248,20 +248,6 @@ crate::declare_annotations! { pipeline_affinity: PipelineAffinity::CrossCutting, }; - /// `tls_handshake_timeout` - HTTP TLS handshake timeout. - TLS_HANDSHAKE_TIMEOUT = SalukiAnnotation { - schema: &schema::TLS_HANDSHAKE_TIMEOUT, - // Not implemented. Request timeout covers the gap. #178 - support_level: SupportLevel::Incompatible(Severity::Medium), - additional_yaml_paths: &[], - env_var_override: None, - used_by: &[], - value_type_override: None, - test_json: None, - // TLS is process-wide. - pipeline_affinity: PipelineAffinity::CrossCutting, - }; - /// `aggregator_buffer_size` - aggregator input channel depth. AGGREGATOR_BUFFER_SIZE = SalukiAnnotation { schema: &schema::AGGREGATOR_BUFFER_SIZE, diff --git a/lib/saluki-io/Cargo.toml b/lib/saluki-io/Cargo.toml index 683131330ee..e4b59b64ece 100644 --- a/lib/saluki-io/Cargo.toml +++ b/lib/saluki-io/Cargo.toml @@ -79,5 +79,6 @@ tokio-vsock = { workspace = true } http-body-util = { workspace = true } proptest = { workspace = true } rand_distr = { workspace = true } +saluki-metrics = { workspace = true, features = ["test"] } tempfile = { workspace = true } tokio-test = { workspace = true } diff --git a/lib/saluki-io/src/net/client/http/client.rs b/lib/saluki-io/src/net/client/http/client.rs index 7cf2a797ed7..9cf3aec038e 100644 --- a/lib/saluki-io/src/net/client/http/client.rs +++ b/lib/saluki-io/src/net/client/http/client.rs @@ -171,6 +171,19 @@ impl HttpClientBuilder { self } + /// Sets the timeout for completing the TLS handshake after a connection is established. + /// + /// Defaults to 10 seconds. + /// + /// This timeout is independent of the per-request timeout set by [`Self::with_request_timeout`]. + /// When a new connection is needed, both timers run concurrently and whichever fires first + /// determines the outcome. Setting a short request timeout does not bound the TLS handshake + /// if the connection pool decides to open a new connection before the request timer starts. + pub fn with_tls_handshake_timeout(mut self, timeout: Duration) -> Self { + self.connector_builder = self.connector_builder.with_tls_handshake_timeout(timeout); + self + } + /// Sets the HTTP protocol selection for client connections. /// /// Defaults to [`HttpProtocol::Auto`], which automatically negotiates HTTP/2 with HTTP/1.1 fallback. diff --git a/lib/saluki-io/src/net/client/http/conn.rs b/lib/saluki-io/src/net/client/http/conn.rs index 6e76ebe141f..4dcf93b7b6e 100644 --- a/lib/saluki-io/src/net/client/http/conn.rs +++ b/lib/saluki-io/src/net/client/http/conn.rs @@ -1,25 +1,27 @@ +#[cfg(unix)] +use std::path::PathBuf; use std::{ future::Future, io, pin::Pin, + sync::Arc, task::{Context, Poll}, time::{Duration, Instant}, }; -#[cfg(unix)] -use std::{path::PathBuf, sync::Arc}; use hickory_resolver::net::NetError; use http::{Extensions, Uri}; -use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder, MaybeHttpsStream}; +use hyper_rustls::MaybeHttpsStream; use hyper_util::{ client::legacy::connect::{CaptureConnection, Connected, Connection, HttpConnector}, rt::TokioIo, }; use metrics::Counter; use pin_project_lite::pin_project; -use rustls::ClientConfig; +use rustls::{pki_types::ServerName, ClientConfig}; use saluki_error::{ErrorContext as _, GenericError}; use tokio::net::TcpStream; +use tokio_rustls::TlsConnector; #[cfg(target_os = "linux")] use tokio_vsock::{VsockAddr, VsockStream}; use tower::{BoxError, Service}; @@ -362,7 +364,9 @@ pub enum HttpProtocol { /// A connector that supports HTTP or HTTPS. #[derive(Clone)] pub struct HttpsCapableConnector { - inner: HttpsConnector, + inner: InnerConnector, + tls_config: Arc, + tls_handshake_timeout: Duration, bytes_sent: Option, error_telemetry: Option, conn_age_limit: Option, @@ -378,27 +382,57 @@ impl Service for HttpsCapableConnector { } fn call(&mut self, dst: Uri) -> Self::Future { - let inner = self.inner.call(dst); + let is_https = dst.scheme() == Some(&http::uri::Scheme::HTTPS); + let inner_fut = self.inner.call(dst.clone()); + let tls_config = self.tls_config.clone(); + let tls_handshake_timeout = self.tls_handshake_timeout; let bytes_sent = self.bytes_sent.clone(); let error_telemetry = self.error_telemetry.clone(); let conn_age_limit = self.conn_age_limit; + Box::pin(async move { - match inner.await { - Ok(inner) => Ok(HttpsCapableConnection { - inner, - bytes_sent, - error_telemetry, - conn_age_limit, - }), - Err(error) => { - if is_tls_error(error.as_ref()) { - if let Some(error_telemetry) = &error_telemetry { - error_telemetry.increment_tls_error(); - } - } - Err(error) + let transport = inner_fut.await?; + + let stream = if is_https { + let host = dst.host().unwrap_or_default(); + let mut hostname = host; + if let Some(trimmed) = hostname.strip_prefix('[').and_then(|h| h.strip_suffix(']')) { + hostname = trimmed; } - } + let server_name = ServerName::try_from(hostname) + .map_err(|e| BoxError::from(io::Error::other(e)))? + .to_owned(); + + let tls_connector = TlsConnector::from(tls_config); + let tls_stream = tokio::time::timeout( + tls_handshake_timeout, + tls_connector.connect(server_name, TokioIo::new(transport)), + ) + .await + .map_err(|_| { + if let Some(error_telemetry) = &error_telemetry { + error_telemetry.increment_tls_error(); + } + BoxError::from(io::Error::new(io::ErrorKind::TimedOut, "TLS handshake timed out")) + })? + .map_err(|e| { + if let Some(error_telemetry) = &error_telemetry { + error_telemetry.increment_tls_error(); + } + BoxError::from(io::Error::other(e)) + })?; + + MaybeHttpsStream::Https(TokioIo::new(tls_stream)) + } else { + MaybeHttpsStream::Http(transport) + }; + + Ok(HttpsCapableConnection { + inner: stream, + bytes_sent, + error_telemetry, + conn_age_limit, + }) }) } } @@ -418,6 +452,7 @@ fn build_dns_resolver( #[derive(Default)] pub struct HttpsCapableConnectorBuilder { connect_timeout: Option, + tls_handshake_timeout: Option, bytes_sent: Option, error_telemetry: Option, conn_age_limit: Option, @@ -437,6 +472,14 @@ impl HttpsCapableConnectorBuilder { self } + /// Sets the timeout for completing the TLS handshake after a connection is established. + /// + /// Defaults to 10 seconds. + pub fn with_tls_handshake_timeout(mut self, timeout: Duration) -> Self { + self.tls_handshake_timeout = Some(timeout); + self + } + /// Sets the HTTP protocol selection for client connections. /// /// Defaults to [`HttpProtocol::Auto`]. @@ -503,8 +546,9 @@ impl HttpsCapableConnectorBuilder { } /// Builds the `HttpsCapableConnector` from the given TLS configuration. - pub fn build(self, tls_config: ClientConfig) -> Result { + pub fn build(self, mut tls_config: ClientConfig) -> Result { let connect_timeout = self.connect_timeout.unwrap_or(Duration::from_secs(30)); + let tls_handshake_timeout = self.tls_handshake_timeout.unwrap_or(Duration::from_secs(10)); // On Linux with vsock configured, the DNS resolver is never called — vsock connections // bypass the TCP/DNS stack entirely. Use a noop resolver to avoid failures in environments @@ -537,17 +581,15 @@ impl HttpsCapableConnectorBuilder { vsock_addr: self.vsock_addr, }; - // Create the HTTPS connector. - let https_connector_builder = HttpsConnectorBuilder::new().with_tls_config(tls_config).https_or_http(); - let https_connector = match self.http_protocol { - HttpProtocol::Auto => https_connector_builder - .enable_all_versions() - .wrap_connector(inner_connector), - HttpProtocol::Http1 => https_connector_builder.enable_http1().wrap_connector(inner_connector), + tls_config.alpn_protocols = match self.http_protocol { + HttpProtocol::Auto => vec![b"h2".to_vec(), b"http/1.1".to_vec()], + HttpProtocol::Http1 => vec![b"http/1.1".to_vec()], }; Ok(HttpsCapableConnector { - inner: https_connector, + inner: inner_connector, + tls_config: Arc::new(tls_config), + tls_handshake_timeout, bytes_sent: self.bytes_sent, error_telemetry: self.error_telemetry, conn_age_limit: self.conn_age_limit, @@ -555,31 +597,6 @@ impl HttpsCapableConnectorBuilder { } } -#[cfg(test)] -fn configure_tls_alpn_for_http_protocol(mut tls_config: ClientConfig, protocol: HttpProtocol) -> ClientConfig { - match protocol { - HttpProtocol::Auto => { - tls_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; - } - HttpProtocol::Http1 => { - tls_config.alpn_protocols.clear(); - } - } - - tls_config -} - -fn is_tls_error(error: &(dyn std::error::Error + 'static)) -> bool { - let mut current = Some(error); - while let Some(error) = current { - if error.downcast_ref::().is_some() { - return true; - } - current = error.source(); - } - false -} - fn is_dns_error(error: &(dyn std::error::Error + 'static)) -> bool { let mut current = Some(error); while let Some(error) = current { @@ -611,9 +628,15 @@ pub(super) fn check_connection_state(captured_conn: CaptureConnection) { #[cfg(test)] mod tests { - use super::{configure_tls_alpn_for_http_protocol, HttpProtocol}; + use std::time::Duration; - fn empty_tls_config() -> rustls::ClientConfig { + use saluki_metrics::{test::TestRecorder, MetricsBuilder}; + use tower::Service as _; + + use super::{HttpProtocol, HttpsCapableConnectorBuilder}; + use crate::net::client::http::telemetry::HttpTransactionErrorTelemetry; + + fn test_tls_config() -> rustls::ClientConfig { rustls::ClientConfig::builder_with_provider(rustls::crypto::aws_lc_rs::default_provider().into()) .with_safe_default_protocol_versions() .expect("AWS-LC default protocol versions should be valid") @@ -621,18 +644,88 @@ mod tests { .with_no_client_auth() } - #[test] - fn auto_protocol_advertises_h2_and_http1_alpn() { - let tls_config = configure_tls_alpn_for_http_protocol(empty_tls_config(), HttpProtocol::Auto); + #[tokio::test] + async fn tls_handshake_timeout_fires() { + use tokio::net::TcpListener; + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + let (_stream, _) = listener.accept().await.unwrap(); + // Accept the TCP connection but never send TLS data, simulating a stalled handshake. + tokio::time::sleep(Duration::from_secs(60)).await; + }); - assert_eq!(tls_config.alpn_protocols, vec![b"h2".to_vec(), b"http/1.1".to_vec()]); + let mut connector = HttpsCapableConnectorBuilder::default() + .with_tls_handshake_timeout(Duration::from_millis(100)) + .build(test_tls_config()) + .unwrap(); + + let uri: http::Uri = format!("https://127.0.0.1:{}/", addr.port()).parse().unwrap(); + let err = connector.call(uri).await.err().expect("expected a timeout error"); + assert!( + err.to_string().contains("timed out"), + "expected TLS handshake timeout error, got: {err}" + ); + } + + #[tokio::test] + async fn tls_handshake_timeout_increments_tls_error_telemetry() { + use tokio::net::TcpListener; + + let recorder = TestRecorder::default(); + let _recorder_guard = metrics::set_default_local_recorder(&recorder); + let metrics_builder = MetricsBuilder::default(); + let error_telemetry = HttpTransactionErrorTelemetry::from_builder(&metrics_builder); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + let (_stream, _) = listener.accept().await.unwrap(); + tokio::time::sleep(Duration::from_secs(60)).await; + }); + + let mut connector = HttpsCapableConnectorBuilder::default() + .with_tls_handshake_timeout(Duration::from_millis(100)) + .with_error_telemetry(error_telemetry) + .build(test_tls_config()) + .unwrap(); + + let uri: http::Uri = format!("https://127.0.0.1:{}/", addr.port()).parse().unwrap(); + let _ = connector.call(uri).await.err().expect("expected a timeout error"); + + assert_eq!( + recorder.counter(( + "network_http_requests_errors_total", + &[("error_type", "tls_error"), ("error_scope", "phase")], + )), + Some(1) + ); + } + + #[test] + fn auto_http_protocol_advertises_h2_and_http1_alpn() { + let connector = HttpsCapableConnectorBuilder::default() + .with_http_protocol(HttpProtocol::Auto) + .build(test_tls_config()) + .unwrap(); + + assert_eq!( + connector.tls_config.alpn_protocols, + vec![b"h2".to_vec(), b"http/1.1".to_vec()] + ); } #[test] - fn http1_protocol_leaves_alpn_empty() { - let tls_config = configure_tls_alpn_for_http_protocol(empty_tls_config(), HttpProtocol::Http1); + fn http1_protocol_advertises_http1_alpn() { + let connector = HttpsCapableConnectorBuilder::default() + .with_http_protocol(HttpProtocol::Http1) + .build(test_tls_config()) + .unwrap(); - assert!(tls_config.alpn_protocols.is_empty()); + assert_eq!(connector.tls_config.alpn_protocols, vec![b"http/1.1".to_vec()]); } // vsock takes priority over unix when both are configured, matching Agent behavior. @@ -643,8 +736,6 @@ mod tests { async fn vsock_takes_priority_over_unix_when_both_set() { use std::sync::Arc; - use tower::Service as _; - use super::{InnerConnector, VsockAddr}; use crate::net::dns::HickoryResolver;