From ac6bfa1a8fa151e538230ffb292d8b85fcd5deac Mon Sep 17 00:00:00 2001 From: Hasan Khan Date: Sun, 28 Jun 2026 11:36:01 -0700 Subject: [PATCH 1/2] fix(ssh-console): recover conflicting IPMI SOL sessions Signed-off-by: Hasan Khan --- crates/ssh-console/src/bmc/client.rs | 144 +++- crates/ssh-console/src/bmc/client_pool.rs | 8 + crates/ssh-console/src/bmc/connection.rs | 6 + .../src/bmc/connection_impl/ipmi.rs | 766 ++++++++++++++++-- crates/ssh-console/tests/main.rs | 57 +- crates/ssh-console/tests/util/ipmi_sim.rs | 99 +++ 6 files changed, 992 insertions(+), 88 deletions(-) diff --git a/crates/ssh-console/src/bmc/client.rs b/crates/ssh-console/src/bmc/client.rs index 7f236dbbb2..89ebafbae3 100644 --- a/crates/ssh-console/src/bmc/client.rs +++ b/crates/ssh-console/src/bmc/client.rs @@ -142,6 +142,7 @@ impl BmcClient { // Connect and reconnect, in a loop, until the client is shut down. let mut retries = 0; + let mut previous_connection_close_was_sol_recovery = false; 'retry: loop { // Every retry after the first time, emit a disconnected message if was_disconnected { @@ -209,6 +210,7 @@ impl BmcClient { { Ok(handle) => handle, Err(error) => { + previous_connection_close_was_sol_recovery = false; tracing::error!( %error, %machine_id, @@ -259,10 +261,24 @@ impl BmcClient { // The connection should go forever, so if it doesn't, retry. res = connection_result.clone() => { let connection_time = try_start_time.elapsed(); - if connection_time > self.config.successful_connection_minimum_duration { - tracing::debug!(%machine_id, "last connection lasted {}s, resetting backoff to 0s", connection_time.as_secs()); + let recovered_conflicting_sol_session = res + .as_ref() + .is_err_and(|error| error.retry_immediately()); + if should_reset_retry_backoff( + connection_time, + self.config.successful_connection_minimum_duration, + &res, + previous_connection_close_was_sol_recovery, + ) { + if recovered_conflicting_sol_session { + tracing::debug!(%machine_id, "retrying immediately after IPMI SOL session recovery"); + } else { + tracing::debug!(%machine_id, "last connection lasted {}s, resetting backoff to 0s", connection_time.as_secs()); + } next_retry = Instant::now(); } + previous_connection_close_was_sol_recovery = + recovered_conflicting_sol_session; let error_string = res.err().map(|e| format!("{:?}", e.as_ref())).unwrap_or("".to_string()); tracing::warn!( %machine_id, @@ -445,6 +461,25 @@ fn dev_null(mut rx: broadcast::Receiver) { }); } +fn should_reset_retry_backoff( + connection_time: Duration, + successful_connection_minimum_duration: Duration, + connection_result: &Result<(), Arc>, + previous_connection_close_was_sol_recovery: bool, +) -> bool { + let recovered_conflicting_sol_session = connection_result + .as_ref() + .is_err_and(|error| error.retry_immediately()); + + if recovered_conflicting_sol_session { + // Retry one activation immediately after recovery, then use normal backoff if the conflict + // persists so competing SOL clients cannot cause a tight deactivate/activate loop. + !previous_connection_close_was_sol_recovery + } else { + connection_time > successful_connection_minimum_duration + } +} + /// Calculate the next exponential backoff duration for retrying connections to a console fn next_retry_backoff(config: &Config, prev: Duration) -> Duration { let duration = if prev == Duration::ZERO { @@ -523,3 +558,108 @@ impl Drop for BmcConnectionSubscription { ); } } + +#[cfg(test)] +mod tests { + use std::os::unix::process::ExitStatusExt; + + use super::*; + use crate::bmc::connection_impl::ipmi; + + #[test] + fn retry_backoff_resets_only_after_healthy_connection_or_successful_sol_recovery() { + let minimum_healthy_duration = Duration::from_secs(60); + let cases = [ + ( + "short successful connection", + Duration::from_secs(1), + Ok(()), + false, + false, + ), + ( + "exact minimum connection duration", + minimum_healthy_duration, + Ok(()), + false, + false, + ), + ( + "long-lived connection", + minimum_healthy_duration + Duration::from_secs(1), + Ok(()), + false, + true, + ), + ( + "ordinary IPMI failure", + Duration::from_secs(1), + Err(Arc::new(connection::SpawnError::Ipmi( + ipmi::SpawnError::IpmitoolUnexpectedExit { + exit_status: failed_exit_status(), + output: "authentication failed".to_string(), + }, + ))), + false, + false, + ), + ( + "successful conflicting SOL session recovery", + Duration::from_secs(1), + Err(Arc::new(connection::SpawnError::Ipmi( + ipmi::SpawnError::ConflictingSolSessionDeactivated { + exit_status: failed_exit_status(), + output: "SOL payload already active on another session".to_string(), + }, + ))), + false, + true, + ), + ( + "repeated successful conflicting SOL session recovery", + Duration::from_secs(1), + Err(Arc::new(connection::SpawnError::Ipmi( + ipmi::SpawnError::ConflictingSolSessionDeactivated { + exit_status: failed_exit_status(), + output: "SOL payload already active on another session".to_string(), + }, + ))), + true, + false, + ), + ( + "failed conflicting SOL session recovery", + Duration::from_secs(1), + Err(Arc::new(connection::SpawnError::Ipmi( + ipmi::SpawnError::ConflictingSolSessionDeactivationFailed { + exit_status: failed_exit_status(), + output: "SOL payload already active on another session".to_string(), + error: ipmi::SolDeactivateError::Failure { + exit_status: failed_exit_status(), + output: "deactivation failed".to_string(), + }, + }, + ))), + false, + false, + ), + ]; + + for (scenario, connection_time, result, previous_was_recovery, expected) in cases { + assert_eq!( + should_reset_retry_backoff( + connection_time, + minimum_healthy_duration, + &result, + previous_was_recovery, + ), + expected, + "{scenario}", + ); + } + } + + fn failed_exit_status() -> std::process::ExitStatus { + std::process::ExitStatus::from_raw(256) + } +} diff --git a/crates/ssh-console/src/bmc/client_pool.rs b/crates/ssh-console/src/bmc/client_pool.rs index 390aebd6e5..c504ed21c4 100644 --- a/crates/ssh-console/src/bmc/client_pool.rs +++ b/crates/ssh-console/src/bmc/client_pool.rs @@ -279,6 +279,14 @@ impl BmcPoolMetrics { .build(), } } + + #[cfg(test)] + pub(crate) fn for_test() -> Self { + Self::new( + &opentelemetry::global::meter("ssh-console-ipmi-test"), + Arc::default(), + ) + } } impl BmcPool { diff --git a/crates/ssh-console/src/bmc/connection.rs b/crates/ssh-console/src/bmc/connection.rs index 1fd8048abf..fbfbe7e0ee 100644 --- a/crates/ssh-console/src/bmc/connection.rs +++ b/crates/ssh-console/src/bmc/connection.rs @@ -69,6 +69,12 @@ pub enum SpawnError { Ipmi(#[from] connection_impl::ipmi::SpawnError), } +impl SpawnError { + pub(crate) fn retry_immediately(&self) -> bool { + matches!(self, Self::Ipmi(error) if error.retry_immediately()) + } +} + /// Get the address and auth details to use for a connection to a given machine or instance ID. /// /// This information is normally gotten by calling GetBMCMetadData on carbide-api, but it can diff --git a/crates/ssh-console/src/bmc/connection_impl/ipmi.rs b/crates/ssh-console/src/bmc/connection_impl/ipmi.rs index 26514aa6e2..7cf4934782 100644 --- a/crates/ssh-console/src/bmc/connection_impl/ipmi.rs +++ b/crates/ssh-console/src/bmc/connection_impl/ipmi.rs @@ -15,12 +15,14 @@ * limitations under the License. */ -use std::borrow::Cow; +use std::collections::VecDeque; use std::fmt::Debug; +use std::future::Future; use std::net::SocketAddr; use std::os::fd::{AsRawFd, OwnedFd}; use std::process::{ExitStatus, Stdio}; use std::sync::Arc; +use std::time::Duration; use carbide_uuid::machine::MachineId; use chrono::{DateTime, Utc}; @@ -45,6 +47,11 @@ use crate::io_util::{ self, PtyAllocError, set_controlling_terminal_on_exec, write_data_to_async_fd, }; +const SOL_PAYLOAD_ALREADY_ACTIVE: &str = "SOL payload already active on another session"; +const SOL_SESSION_OPERATIONAL: &[u8] = b"SOL Session operational"; +const MAX_CAPTURED_IPMITOOL_OUTPUT_SIZE: usize = 4096; +const SOL_DEACTIVATE_TIMEOUT: Duration = Duration::from_secs(10); + /// Spawn ipmitool in the background to connect to the given BMC specified by `connection_details`, /// and proxy data between it and the SSH frontend. /// @@ -74,18 +81,8 @@ pub async fn spawn( let pty_master = AsyncFd::new(pty_master).expect("BUG: not in tokio runtime?"); // Run `ipmitool sol activate` with the appropriate args - let mut command = tokio::process::Command::new("ipmitool"); + let mut command = sol_activate_command(&connection_details, &config); command - .arg("-I") - .arg("lanplus") - .arg("-H") - .arg(connection_details.addr.ip().to_string()) - .arg("-p") - .arg(connection_details.addr.port().to_string()) - .arg("-U") - .arg(&connection_details.user) - .arg("-P") - .arg(&connection_details.password) // connect stdin/stdout/stderr to the pty .stdin( pty_slave @@ -114,16 +111,13 @@ pub async fn spawn( // Set the xterm env var as a reasonable default. .env("TERM", "xterm"); - if config.insecure_ipmi_ciphers { - command.arg("-C").arg("3"); // use SHA1 ciphers, useful for ipmi_sim - } - command.arg("sol").arg("activate"); - // Spawn ipmitool in the controlling pty set_controlling_terminal_on_exec(&mut command, pty_slave.as_raw_fd()); let ipmitool_process = command .spawn() .map_err(|error| SpawnError::SpawningIpmitool { error })?; + drop(command); + drop(pty_slave); // Make a channel the frontend can use to send messages to us let (from_frontend_tx, from_frontend_rx) = mpsc::channel::(1); @@ -133,12 +127,14 @@ pub async fn spawn( config, ipmitool_process, output_buf: [0u8; 4096], + captured_output: VecDeque::with_capacity(MAX_CAPTURED_IPMITOOL_OUTPUT_SIZE), shutdown_rx, pty_master, from_frontend_rx, to_frontend_tx, ready_tx, metrics, + sol_session_operational: false, escape_was_pending: false, pending_line: PendingOutputLine::with_max_size(1024), connected_since: Utc::now(), @@ -153,7 +149,7 @@ pub async fn spawn( .await .map_err(|error| SpawnError::ProcessLoop { error, - output: ipmitool_proxy.output_buf_str().to_string(), + output: ipmitool_proxy.captured_output_string(), })?; let exit_status = ipmitool_proxy @@ -161,17 +157,22 @@ pub async fn spawn( .try_wait() .map_err(|error| SpawnError::CheckingIpmitoolExitStatus { error, - output: ipmitool_proxy.output_buf_str().to_string(), + output: ipmitool_proxy.captured_output_string(), })?; match exit_status { Some(exit_status) => { // Any exit from ipmitool is unexpected: It's supposed to run forever until we shut - // it down. - Err(SpawnError::IpmitoolUnexpectedExit { + // it down. A conflicting SOL session is recoverable, so deactivate it before the + // client retries the connection. + let output = ipmitool_proxy.captured_output_string(); + Err(handle_unexpected_ipmitool_exit( exit_status, - output: ipmitool_proxy.output_buf_str().to_string(), - }) + output, + ipmitool_proxy.sol_session_operational, + || deactivate_sol(&ipmitool_proxy.connection_details, &ipmitool_proxy.config), + ) + .await) } None => { // Process is still running (normal shutdown), we can kill it. @@ -221,6 +222,22 @@ pub enum SpawnError { exit_status: ExitStatus, output: String, }, + #[error( + "conflicting IPMI SOL session was deactivated after ipmitool exited unexpectedly: {exit_status}, output: {output}" + )] + ConflictingSolSessionDeactivated { + exit_status: ExitStatus, + output: String, + }, + #[error( + "failed to deactivate conflicting IPMI SOL session after ipmitool exited unexpectedly: {exit_status}, activation output: {output}: {error}" + )] + ConflictingSolSessionDeactivationFailed { + exit_status: ExitStatus, + output: String, + #[source] + error: SolDeactivateError, + }, #[error("Unknown error waiting for ipmitool to be ready")] WaitingForReady, #[error("error running ipmitool: {error}. output: {output}")] @@ -230,6 +247,117 @@ pub enum SpawnError { }, } +impl SpawnError { + pub(crate) fn retry_immediately(&self) -> bool { + matches!(self, Self::ConflictingSolSessionDeactivated { .. }) + } +} + +#[derive(thiserror::Error, Debug)] +pub enum SolDeactivateError { + #[error("error spawning ipmitool for SOL deactivation: {error}")] + Spawning { error: std::io::Error }, + #[error("error waiting for ipmitool SOL deactivation: {error}")] + Waiting { error: std::io::Error }, + #[error("ipmitool SOL deactivation timed out after {timeout:?}")] + Timeout { timeout: Duration }, + #[error("ipmitool SOL deactivation failed with {exit_status}: {output}")] + Failure { + exit_status: ExitStatus, + output: String, + }, +} + +async fn handle_unexpected_ipmitool_exit( + exit_status: ExitStatus, + output: String, + sol_session_operational: bool, + deactivate_sol: Deactivate, +) -> SpawnError +where + Deactivate: FnOnce() -> DeactivateFuture, + DeactivateFuture: Future>, +{ + if sol_session_operational || !is_sol_payload_already_active(&output) { + return SpawnError::IpmitoolUnexpectedExit { + exit_status, + output, + }; + } + + match deactivate_sol().await { + Ok(()) => SpawnError::ConflictingSolSessionDeactivated { + exit_status, + output, + }, + Err(error) => SpawnError::ConflictingSolSessionDeactivationFailed { + exit_status, + output, + error, + }, + } +} + +fn is_sol_payload_already_active(output: &str) -> bool { + output.contains(SOL_PAYLOAD_ALREADY_ACTIVE) +} + +async fn deactivate_sol( + connection_details: &ConnectionDetails, + config: &Config, +) -> Result<(), SolDeactivateError> { + let machine_id = connection_details.machine_id; + // ssh-console multiplexes all frontends over one long-lived SOL session, so recovery here + // intentionally replaces any out-of-band session that prevents it from becoming the owner. + tracing::warn!( + %machine_id, + "conflicting IPMI SOL session detected; deactivating it before reconnecting" + ); + + let result = run_sol_deactivate_command( + sol_deactivate_command(connection_details, config), + SOL_DEACTIVATE_TIMEOUT, + ) + .await; + + if result.is_ok() { + tracing::info!( + %machine_id, + "conflicting IPMI SOL session deactivated; retrying connection immediately" + ); + } + + result +} + +async fn run_sol_deactivate_command( + mut command: tokio::process::Command, + timeout: Duration, +) -> Result<(), SolDeactivateError> { + let child = command + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .kill_on_drop(true) + .spawn() + .map_err(|error| SolDeactivateError::Spawning { error })?; + let output = tokio::time::timeout(timeout, child.wait_with_output()) + .await + .map_err(|_| SolDeactivateError::Timeout { timeout })? + .map_err(|error| SolDeactivateError::Waiting { error })?; + + if output.status.success() { + Ok(()) + } else { + let stdout = String::from_utf8_lossy(&output.stdout); + let stderr = String::from_utf8_lossy(&output.stderr); + Err(SolDeactivateError::Failure { + exit_status: output.status, + output: format!("stdout: {stdout:?}, stderr: {stderr:?}"), + }) + } +} + #[derive(thiserror::Error, Debug)] pub enum ProcessLoopError { #[error("Error polling from pty master fd: {error}")] @@ -238,6 +366,12 @@ pub enum ProcessLoopError { WritingToFrontendChannel, #[error("error reading ipmitool output: {error}")] ReadingFromIpmitoolPty { error: std::io::Error }, + #[error("error waiting for ipmitool to exit: {error}")] + WaitingForIpmitool { error: std::io::Error }, + #[error("error checking ipmitool after its pty closed: {error}")] + CheckingIpmitoolAfterPtyClosed { error: std::io::Error }, + #[error("error killing ipmitool after its pty closed: {error}")] + KillingIpmitoolAfterPtyClosed { error: std::io::Error }, #[error("error sending frontend message to ipmi console: {0}")] SendingFrontendMessageToIpmiConsole(#[from] SendFrontendMessageToIpmiConsoleError), #[error("error resetting power: {0}")] @@ -265,12 +399,15 @@ struct IpmitoolMessageProxy { config: Arc, ipmitool_process: Child, output_buf: [u8; 4096], + captured_output: VecDeque, shutdown_rx: oneshot::Receiver<()>, pty_master: AsyncFd, from_frontend_rx: mpsc::Receiver, to_frontend_tx: broadcast::Sender, ready_tx: Option>, metrics: Arc, + // Once ipmitool confirms activation, later console output must not trigger activation recovery. + sol_session_operational: bool, // Keep track of whether the last byte sent from the client was the first byte of an escape sequence. escape_was_pending: bool, // Keep track of the last data we saw after a newline, so that we can replay it when clients join. @@ -282,6 +419,12 @@ struct IpmitoolMessageProxy { output_last_received: Option>, } +enum PtyReadResult { + Data(usize), + WouldBlock, + Closed, +} + impl IpmitoolMessageProxy { /// Poll from the SSH frontend and the ipmitool PTY in the foreground, pumping messages between /// them, until either the frontend closes or ipmitool exits. @@ -292,49 +435,68 @@ impl IpmitoolMessageProxy { async fn manage_ipmitool_process(&mut self) -> Result<(), ProcessLoopError> { let machine_id = self.connection_details.machine_id; let metrics_attrs = vec![KeyValue::new("machine_id", machine_id.to_string())]; + let mut ipmitool_exited = false; + let mut pty_closed = false; + let mut kill_requested = false; loop { tokio::select! { - // Poll for any data to be available in pty_master - guard = self.pty_master.readable() => { - let mut guard = guard.map_err(|error| ProcessLoopError::PollingFromPty { error })?; - // Read the available data - match unistd::read(guard.get_inner(), &mut self.output_buf) { - Ok(n) => { - if n == 0 { - tracing::debug!(%machine_id, "eof from pty fd"); - break; + // Break if we're shut down + _ = &mut self.shutdown_rx => { + tracing::debug!("ipmitool_process_loop shutdown received"); + break; + } + // Record the exit, then keep polling the PTY until all diagnostic output is drained. + exit_status = self.ipmitool_process.wait(), if !ipmitool_exited => { + let exit_status = exit_status + .map_err(|error| ProcessLoopError::WaitingForIpmitool { error })?; + tracing::warn!(%machine_id, ?exit_status, "ipmitool exited"); + ipmitool_exited = true; + loop { + let read_result = read_ipmitool_pty( + self.pty_master.get_ref(), + &mut self.output_buf, + ) + .inspect_err(|_| { + self.metrics + .bmc_rx_errors_total + .add(1, metrics_attrs.as_slice()); + })?; + match read_result { + PtyReadResult::Data(n) => { + self.handle_ipmitool_pty_data(n, &metrics_attrs)?; } - self.output_buf[n] = b'\0'; // null-terminate in case we need to print it later - // We've gotten at least one byte, we're now ready (ipmitool always outputs a message when connected.) - if let Some(ch) = self.ready_tx.take() { - self.connected_since = Utc::now(); - ch.send(()).ok(); + PtyReadResult::WouldBlock | PtyReadResult::Closed => { + pty_closed = true; + break; } - let data = &self.output_buf[0..n]; - self.output_last_received = Some(Utc::now()); - self.metrics.bmc_bytes_received_total.add(n as _, metrics_attrs.as_slice()); - self.bytes_received += n; - self.pending_line.extend(data); - self.to_frontend_tx.send(ToFrontendMessage::Channel(Arc::new(ChannelMsg::Data { data: data.to_vec().into() }))) - .map_err(|_| ProcessLoopError::WritingToFrontendChannel)?; - // Note, we're not clearing the ready state, so the fd will stay readable. - // The next time through the loop we'll get EWOULDBLOCK and clear the - // status. This lets us handle cases where there's more data to read than - // the buf size. } - Err(e) if e == Errno::EWOULDBLOCK => { + } + } + // Poll for any data to be available in pty_master + guard = self.pty_master.readable(), if !pty_closed => { + let mut guard = guard.map_err(|error| ProcessLoopError::PollingFromPty { error })?; + let read_result = read_ipmitool_pty(guard.get_inner(), &mut self.output_buf) + .inspect_err(|_| { + self.metrics.bmc_rx_errors_total.add(1, metrics_attrs.as_slice()); + })?; + match read_result { + PtyReadResult::Data(n) => { + drop(guard); + self.handle_ipmitool_pty_data(n, &metrics_attrs)?; + // Keep the readiness set so the next loop drains any remaining data. + } + PtyReadResult::WouldBlock => { // clear the readiness so we go back to polling guard.clear_ready(); } - Err(e) => { - self.metrics.bmc_rx_errors_total.add(1, metrics_attrs.as_slice()); - return Err(std::io::Error::from_raw_os_error(e as _)) - .map_err(|error| ProcessLoopError::ReadingFromIpmitoolPty { error }) + PtyReadResult::Closed => { + tracing::debug!(%machine_id, "eof from closed ipmitool pty"); + pty_closed = true; } - }; + } } // Poll for any messages from the SSH frontend - res = self.from_frontend_rx.recv() => match res { + res = self.from_frontend_rx.recv(), if !pty_closed => match res { Some(msg) => { self.send_frontend_message_to_ipmi_console(msg).await.inspect_err(|_| { self.metrics.bmc_tx_errors_total.add(1, metrics_attrs.as_slice()); @@ -345,18 +507,63 @@ impl IpmitoolMessageProxy { break; } }, - // Break if ipmitool exits - exit_status = self.ipmitool_process.wait() => { - tracing::warn!("ipmitool exited with status {:?}", exit_status); - break; - } - // Break if we're shut down - _ = &mut self.shutdown_rx => { - tracing::debug!("ipmitool_process_loop shutdown received"); - break; + } + + if pty_closed && !ipmitool_exited { + match self.ipmitool_process.try_wait() { + Ok(Some(exit_status)) => { + tracing::warn!(%machine_id, ?exit_status, "ipmitool exited after closing its pty"); + ipmitool_exited = true; + } + Ok(None) if !kill_requested => { + self.ipmitool_process.start_kill().map_err(|error| { + ProcessLoopError::KillingIpmitoolAfterPtyClosed { error } + })?; + kill_requested = true; + } + Ok(None) => {} + Err(error) => { + return Err(ProcessLoopError::CheckingIpmitoolAfterPtyClosed { error }); + } } } + + if pty_closed && ipmitool_exited { + break; + } + } + + Ok(()) + } + + fn handle_ipmitool_pty_data( + &mut self, + n: usize, + metrics_attrs: &[KeyValue], + ) -> Result<(), ProcessLoopError> { + let data = &self.output_buf[0..n]; + append_captured_output(&mut self.captured_output, data); + if !self.sol_session_operational + && captured_output_contains(&self.captured_output, SOL_SESSION_OPERATIONAL) + { + self.sol_session_operational = true; + } + // ipmitool always emits a message after either connecting or rejecting activation. + if let Some(ready_tx) = self.ready_tx.take() { + self.connected_since = Utc::now(); + ready_tx.send(()).ok(); } + self.output_last_received = Some(Utc::now()); + self.metrics + .bmc_bytes_received_total + .add(n as _, metrics_attrs); + self.bytes_received += n; + self.pending_line.extend(data); + self.to_frontend_tx + .send(ToFrontendMessage::Channel(Arc::new(ChannelMsg::Data { + data: data.to_vec().into(), + }))) + .map_err(|_| ProcessLoopError::WritingToFrontendChannel)?; Ok(()) } @@ -456,25 +663,12 @@ impl IpmitoolMessageProxy { } async fn power_reset(&mut self) -> Result<(), PowerResetError> { - let mut command = tokio::process::Command::new("ipmitool"); + let mut command = ipmitool_command(&self.connection_details, &self.config); command - .arg("-I") - .arg("lanplus") - .arg("-H") - .arg(self.connection_details.addr.ip().to_string()) - .arg("-p") - .arg(self.connection_details.addr.port().to_string()) - .arg("-U") - .arg(&self.connection_details.user) - .arg("-P") - .arg(&self.connection_details.password) .stdin(Stdio::null()) .stdout(Stdio::piped()) .stderr(Stdio::piped()); - if self.config.insecure_ipmi_ciphers { - command.arg("-C").arg("3"); // use SHA1 ciphers, useful for ipmi_sim - } command.arg("power").arg("reset"); let output = command @@ -493,14 +687,86 @@ impl IpmitoolMessageProxy { } } - // output_buf is a 4096-byte array, get the output up to the first null terminator. - fn output_buf_str(&'_ self) -> Cow<'_, str> { - if let Some(null_idx) = self.output_buf.iter().position(|c| *c == b'\0') { - String::from_utf8_lossy(&self.output_buf[0..null_idx]) - } else { - String::from_utf8_lossy(&self.output_buf) - } + fn captured_output_string(&self) -> String { + let output: Vec<_> = self.captured_output.iter().copied().collect(); + String::from_utf8_lossy(&output).into_owned() + } +} + +fn read_ipmitool_pty( + pty_master: &OwnedFd, + output_buf: &mut [u8], +) -> Result { + match unistd::read(pty_master, output_buf) { + Ok(0) | Err(Errno::EIO) => Ok(PtyReadResult::Closed), + Ok(n) => Ok(PtyReadResult::Data(n)), + Err(Errno::EWOULDBLOCK) => Ok(PtyReadResult::WouldBlock), + Err(error) => Err(std::io::Error::from_raw_os_error(error as _)) + .map_err(|error| ProcessLoopError::ReadingFromIpmitoolPty { error }), + } +} + +fn append_captured_output(output: &mut VecDeque, data: &[u8]) { + output.extend(data.iter().copied()); + let excess = output + .len() + .saturating_sub(MAX_CAPTURED_IPMITOOL_OUTPUT_SIZE); + output.drain(..excess); +} + +fn captured_output_contains(output: &VecDeque, needle: &[u8]) -> bool { + !needle.is_empty() + && needle.len() <= output.len() + && (0..=output.len() - needle.len()).any(|start| { + output + .iter() + .skip(start) + .take(needle.len()) + .copied() + .eq(needle.iter().copied()) + }) +} + +fn ipmitool_command( + connection_details: &ConnectionDetails, + config: &Config, +) -> tokio::process::Command { + let mut command = tokio::process::Command::new("ipmitool"); + command + .arg("-I") + .arg("lanplus") + .arg("-H") + .arg(connection_details.addr.ip().to_string()) + .arg("-p") + .arg(connection_details.addr.port().to_string()) + .arg("-U") + .arg(&connection_details.user) + .arg("-P") + .arg(&connection_details.password); + + if config.insecure_ipmi_ciphers { + command.arg("-C").arg("3"); // use SHA1 ciphers, useful for ipmi_sim } + + command +} + +fn sol_deactivate_command( + connection_details: &ConnectionDetails, + config: &Config, +) -> tokio::process::Command { + let mut command = ipmitool_command(connection_details, config); + command.arg("sol").arg("deactivate"); + command +} + +fn sol_activate_command( + connection_details: &ConnectionDetails, + config: &Config, +) -> tokio::process::Command { + let mut command = ipmitool_command(connection_details, config); + command.arg("sol").arg("activate"); + command } #[derive(Clone)] @@ -521,3 +787,333 @@ impl Debug for ConnectionDetails { .finish() } } + +#[cfg(test)] +mod tests { + use std::ffi::OsStr; + use std::net::{IpAddr, Ipv4Addr}; + use std::os::unix::process::ExitStatusExt; + use std::sync::atomic::{AtomicBool, Ordering}; + + use carbide_test_support::value_scenarios; + use carbide_uuid::machine::{MachineIdSource, MachineType}; + + use super::*; + + #[test] + fn sol_payload_already_active_output_is_detected_narrowly() { + value_scenarios!( + run = |output: &str| is_sol_payload_already_active(output); + + "conflicting SOL session" { + "Info: SOL payload already active on another session" => true, + "prefix\r\nInfo: SOL payload already active on another session\r\nsuffix" => true, + } + + "unrelated ipmitool output" { + "" => false, + "Error: Unable to establish IPMI v2 / RMCP+ session" => false, + "Info: SOL payload disabled" => false, + "Info: SOL payload activation limit reached" => false, + "Info: SOL payload already de-activated" => false, + } + ); + } + + #[tokio::test] + async fn unrelated_ipmitool_exit_does_not_deactivate_sol() { + let deactivation_called = AtomicBool::new(false); + + let error = handle_unexpected_ipmitool_exit( + failed_exit_status(), + "authentication failed".to_string(), + false, + || async { + deactivation_called.store(true, Ordering::Relaxed); + Ok(()) + }, + ) + .await; + + assert!(matches!(&error, SpawnError::IpmitoolUnexpectedExit { .. })); + assert!(!deactivation_called.load(Ordering::Relaxed)); + assert!(!error.retry_immediately()); + } + + #[tokio::test] + async fn conflicting_sol_session_is_deactivated_and_retried_immediately() { + let error = handle_unexpected_ipmitool_exit( + failed_exit_status(), + format!("Info: {SOL_PAYLOAD_ALREADY_ACTIVE}\r\n"), + false, + || async { Ok(()) }, + ) + .await; + + assert!(matches!( + &error, + SpawnError::ConflictingSolSessionDeactivated { .. } + )); + assert!(error.retry_immediately()); + } + + #[tokio::test] + async fn failed_sol_deactivation_preserves_normal_retry_backoff() { + let error = handle_unexpected_ipmitool_exit( + failed_exit_status(), + format!("Info: {SOL_PAYLOAD_ALREADY_ACTIVE}\r\n"), + false, + || async { + Err(SolDeactivateError::Failure { + exit_status: failed_exit_status(), + output: "deactivation failed".to_string(), + }) + }, + ) + .await; + + assert!(matches!( + &error, + SpawnError::ConflictingSolSessionDeactivationFailed { .. } + )); + assert!(!error.retry_immediately()); + assert!(error.to_string().contains("deactivation failed")); + } + + #[tokio::test] + async fn established_sol_session_never_treats_console_output_as_activation_failure() { + let deactivation_called = AtomicBool::new(false); + let error = handle_unexpected_ipmitool_exit( + failed_exit_status(), + format!("host output: {SOL_PAYLOAD_ALREADY_ACTIVE}\r\n"), + true, + || async { + deactivation_called.store(true, Ordering::Relaxed); + Ok(()) + }, + ) + .await; + + assert!(matches!(&error, SpawnError::IpmitoolUnexpectedExit { .. })); + assert!(!deactivation_called.load(Ordering::Relaxed)); + assert!(!error.retry_immediately()); + } + + #[test] + fn captured_output_detects_fragmented_activation_banner_and_stays_bounded() { + let mut output = VecDeque::new(); + append_captured_output(&mut output, b"prefix SOL Session oper"); + append_captured_output(&mut output, b"ational. Use ~? for help\r\n"); + + assert!(captured_output_contains(&output, SOL_SESSION_OPERATIONAL)); + + append_captured_output( + &mut output, + &vec![b'x'; MAX_CAPTURED_IPMITOOL_OUTPUT_SIZE + 1], + ); + assert_eq!(output.len(), MAX_CAPTURED_IPMITOOL_OUTPUT_SIZE); + assert!(output.iter().all(|byte| *byte == b'x')); + } + + #[tokio::test] + async fn exited_process_output_is_drained_from_the_pty() { + let OpenptyResult { + master: pty_master, + slave: pty_slave, + } = io_util::alloc_pty(80, 24).expect("allocate pty"); + let pty_master = AsyncFd::new(pty_master).expect("register pty with tokio"); + let mut command = tokio::process::Command::new("sh"); + command + .arg("-c") + .arg(format!( + "printf 'Info: {SOL_PAYLOAD_ALREADY_ACTIVE}\\r\\n'; exit 1" + )) + .stdin(Stdio::null()) + .stdout(pty_slave.try_clone().expect("clone pty for stdout")) + .stderr(pty_slave.try_clone().expect("clone pty for stderr")); + let ipmitool_process = command.spawn().expect("spawn fake ipmitool"); + drop(command); + drop(pty_slave); + + let (_shutdown_tx, shutdown_rx) = oneshot::channel(); + let (_from_frontend_tx, from_frontend_rx) = mpsc::channel(1); + let (to_frontend_tx, _to_frontend_rx) = broadcast::channel(8); + let (ready_tx, mut ready_rx) = oneshot::channel(); + let mut proxy = IpmitoolMessageProxy { + connection_details: Arc::new(connection_details()), + config: Arc::new(Config::default()), + ipmitool_process, + output_buf: [0; 4096], + captured_output: VecDeque::with_capacity(MAX_CAPTURED_IPMITOOL_OUTPUT_SIZE), + shutdown_rx, + pty_master, + from_frontend_rx, + to_frontend_tx, + ready_tx: Some(ready_tx), + metrics: Arc::new(BmcPoolMetrics::for_test()), + sol_session_operational: false, + escape_was_pending: false, + pending_line: PendingOutputLine::with_max_size(1024), + bytes_received: 0, + connected_since: Utc::now(), + output_last_received: None, + }; + + tokio::time::timeout(Duration::from_secs(2), proxy.manage_ipmitool_process()) + .await + .expect("PTY drain should not hang") + .expect("PTY drain should succeed"); + + assert!( + proxy + .captured_output_string() + .contains(SOL_PAYLOAD_ALREADY_ACTIVE) + ); + assert!(!proxy.sol_session_operational); + assert!( + ready_rx.try_recv().is_ok(), + "diagnostic output should mark the process ready" + ); + assert_eq!( + proxy + .ipmitool_process + .try_wait() + .expect("read child status") + .and_then(|status| status.code()), + Some(1) + ); + } + + #[test] + fn sol_commands_use_the_same_connection_details_and_cipher_configuration() { + let connection_details = connection_details(); + let config = Config { + insecure_ipmi_ciphers: true, + ..Default::default() + }; + let expected_common_args = [ + "-I", + "lanplus", + "-H", + "192.0.2.10", + "-p", + "1623", + "-U", + "admin", + "-P", + "password", + "-C", + "3", + "sol", + ]; + + for (scenario, command, action) in [ + ( + "activation", + sol_activate_command(&connection_details, &config), + "activate", + ), + ( + "deactivation", + sol_deactivate_command(&connection_details, &config), + "deactivate", + ), + ] { + assert_eq!(command.as_std().get_program(), OsStr::new("ipmitool")); + let args: Vec<_> = command + .as_std() + .get_args() + .map(|arg| arg.to_str().expect("ipmitool arguments should be UTF-8")) + .collect(); + let expected_args: Vec<_> = expected_common_args + .iter() + .copied() + .chain(std::iter::once(action)) + .collect(); + assert_eq!(args, expected_args, "{scenario}"); + } + + let command = sol_deactivate_command(&connection_details, &Config::default()); + assert!( + !command + .as_std() + .get_args() + .any(|arg| arg == OsStr::new("-C")), + "secure defaults should not force the test-only cipher suite" + ); + } + + #[tokio::test] + async fn sol_deactivate_command_accepts_successful_exit() { + let mut command = tokio::process::Command::new("sh"); + command.arg("-c").arg("exit 0"); + + run_sol_deactivate_command(command, SOL_DEACTIVATE_TIMEOUT) + .await + .expect("successful command should be accepted"); + } + + #[tokio::test] + async fn sol_deactivate_command_reports_exit_output() { + let mut command = tokio::process::Command::new("sh"); + command + .arg("-c") + .arg("printf 'standard output'; printf 'standard error' >&2; exit 7"); + + let error = run_sol_deactivate_command(command, SOL_DEACTIVATE_TIMEOUT) + .await + .expect_err("failed command should be reported"); + + match error { + SolDeactivateError::Failure { + exit_status, + output, + } => { + assert_eq!(exit_status.code(), Some(7)); + assert!(output.contains("standard output")); + assert!(output.contains("standard error")); + } + other => panic!("unexpected error: {other}"), + } + } + + #[tokio::test] + async fn sol_deactivate_command_reports_spawn_failure() { + let command = tokio::process::Command::new("/path/that/does/not/exist/ipmitool"); + + let error = run_sol_deactivate_command(command, SOL_DEACTIVATE_TIMEOUT) + .await + .expect_err("missing executable should be reported"); + + assert!(matches!(error, SolDeactivateError::Spawning { .. })); + } + + #[tokio::test] + async fn sol_deactivate_command_reports_timeout() { + let mut command = tokio::process::Command::new("sleep"); + command.arg("60"); + let timeout = Duration::from_millis(10); + + let error = run_sol_deactivate_command(command, timeout) + .await + .expect_err("hung command should time out"); + + assert!(matches!( + error, + SolDeactivateError::Timeout { timeout: actual } if actual == timeout + )); + } + + fn connection_details() -> ConnectionDetails { + ConnectionDetails { + machine_id: MachineId::new(MachineIdSource::Tpm, [0; 32], MachineType::Host), + addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 0, 2, 10)), 1623), + user: "admin".to_string(), + password: "password".to_string(), + } + } + + fn failed_exit_status() -> ExitStatus { + ExitStatus::from_raw(256) + } +} diff --git a/crates/ssh-console/tests/main.rs b/crates/ssh-console/tests/main.rs index ae231623ec..5ce8ceb110 100644 --- a/crates/ssh-console/tests/main.rs +++ b/crates/ssh-console/tests/main.rs @@ -33,7 +33,7 @@ use ::ssh_console::shutdown_handle::ShutdownHandle; use api_test_helper::utils::REPO_ROOT; use util::ssh_console_test_helper; -use crate::util::ssh_client::PermissiveSshClient; +use crate::util::ssh_client::{ConnectionConfig, PermissiveSshClient}; use crate::util::{BaselineTestAssertion, MockBmcType, run_baseline_test_environment}; static TENANT_SSH_PUBKEY: &str = include_str!("fixtures/tenant_ssh_key.pub"); @@ -160,6 +160,61 @@ async fn test_ssh_console() -> eyre::Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread")] +async fn test_ipmi_sol_conflict_recovery() -> eyre::Result<()> { + if std::env::var("REPO_ROOT").is_err() { + tracing::info!("Skipping running ssh-console integration tests, as REPO_ROOT is not set"); + return Ok(()); + } + let Some(env) = run_baseline_test_environment(vec![MockBmcType::Ipmi]).await? else { + return Ok(()); + }; + let mock_host = &env.mock_hosts[0]; + let active_sol_session = util::ipmi_sim::activate_sol( + mock_host + .ipmi_port + .expect("IPMI mock should provide an IPMI port"), + ) + .await?; + + let handle = ssh_console_test_helper::spawn( + env.mock_api_server.addr.port(), + Some(ssh_console_test_helper::ConfigOverrides { + reconnect_interval_base: Some(Duration::from_secs(30)), + reconnect_interval_max: Some(Duration::from_secs(30)), + successful_connection_minimum_duration: Some(Duration::from_secs(60)), + }), + ) + .await?; + + let user = mock_host.machine_id.to_string(); + let expected_prompt = format!("root@{} # ", mock_host.machine_id).into_bytes(); + // Connecting within twenty seconds proves the stale holder was evicted and retried immediately, + // rather than waiting for the configured 30-second normal reconnect delay. Keep the holder + // process alive until then so dropping its local PTY cannot make the conflict disappear. + tokio::time::timeout( + Duration::from_secs(20), + util::ssh_client::assert_connection_works_with_retries_and_timeout( + &ConnectionConfig { + connection_name: "ssh-console after conflicting IPMI SOL session recovery", + user: &user, + private_key_path: &ADMIN_SSH_KEY_PATH, + addr: handle.addr, + expected_prompt: &expected_prompt, + }, + 5, + Duration::from_secs(10), + ), + ) + .await + .context("ssh-console did not recover the conflicting SOL session before normal backoff")??; + + drop(active_sol_session); + + handle.spawn_handle.shutdown_and_wait().await; + Ok(()) +} + #[tokio::test(flavor = "multi_thread")] async fn test_ssh_console_reconnect() -> eyre::Result<()> { if std::env::var("REPO_ROOT").is_err() { diff --git a/crates/ssh-console/tests/util/ipmi_sim.rs b/crates/ssh-console/tests/util/ipmi_sim.rs index 07a9e92f0b..6fba2accd8 100644 --- a/crates/ssh-console/tests/util/ipmi_sim.rs +++ b/crates/ssh-console/tests/util/ipmi_sim.rs @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::os::fd::{AsRawFd, OwnedFd}; use std::os::unix::fs::PermissionsExt; use std::path::PathBuf; use std::process::Stdio; @@ -22,7 +23,12 @@ use std::time::Duration; use api_test_helper::utils::REPO_ROOT; use eyre::Context; use lazy_static::lazy_static; +use nix::errno::Errno; +use nix::fcntl::{FcntlArg, OFlag, fcntl}; +use nix::pty::openpty; +use nix::unistd; use temp_dir::TempDir; +use tokio::io::unix::AsyncFd; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::oneshot; @@ -40,6 +46,99 @@ pub struct IpmiSimHandle { pub port: u16, } +pub struct ActiveSolSession { + _ipmitool: tokio::process::Child, + _pty_master: AsyncFd, +} + +pub async fn activate_sol(port: u16) -> eyre::Result { + let pty = openpty(None, None).context("failed to allocate ipmitool pty")?; + set_nonblocking(&pty.master).context("failed to make ipmitool pty nonblocking")?; + + let mut command = tokio::process::Command::new("ipmitool"); + command + .arg("-I") + .arg("lanplus") + .arg("-H") + .arg("127.0.0.1") + .arg("-p") + .arg(port.to_string()) + .arg("-U") + .arg("root") + .arg("-P") + .arg("password") + .arg("-C") + .arg("3") + .arg("sol") + .arg("activate") + .stdin(pty.slave.try_clone().context("clone pty for stdin")?) + .stdout(pty.slave.try_clone().context("clone pty for stdout")?) + .stderr(pty.slave.try_clone().context("clone pty for stderr")?) + .kill_on_drop(true); + + let pty_slave_fd = pty.slave.as_raw_fd(); + // SAFETY: this runs in the child between fork and exec to give interactive ipmitool a terminal. + unsafe { + command.pre_exec(move || { + unistd::setsid()?; + if libc::ioctl(pty_slave_fd, libc::TIOCSCTTY, 0) < 0 { + return Err(std::io::Error::last_os_error()); + } + Ok(()) + }); + } + + let ipmitool = command + .spawn() + .context("failed to start conflicting SOL session")?; + drop(command); + drop(pty.slave); + let pty_master = AsyncFd::new(pty.master).context("failed to register ipmitool pty")?; + + tokio::time::timeout(Duration::from_secs(10), async { + let mut output = Vec::new(); + let mut buf = [0; 1024]; + loop { + let mut guard = pty_master.readable().await?; + match unistd::read(guard.get_inner(), &mut buf) { + Ok(0) | Err(Errno::EIO) => { + return Err(eyre::eyre!( + "ipmitool exited before activating SOL: {}", + String::from_utf8_lossy(&output) + )); + } + Ok(n) => { + output.extend_from_slice(&buf[..n]); + if output + .windows(b"SOL Session operational".len()) + .any(|window| window == b"SOL Session operational") + { + return Ok::<(), eyre::Report>(()); + } + } + Err(Errno::EWOULDBLOCK) => guard.clear_ready(), + Err(error) => return Err(error.into()), + } + } + }) + .await + .context("timed out waiting for the conflicting SOL session to activate")??; + + Ok(ActiveSolSession { + _ipmitool: ipmitool, + _pty_master: pty_master, + }) +} + +fn set_nonblocking(fd: &OwnedFd) -> nix::Result<()> { + let current_flags = fcntl(fd, FcntlArg::F_GETFL)?; + fcntl( + fd, + FcntlArg::F_SETFL(OFlag::from_bits_truncate(current_flags) | OFlag::O_NONBLOCK), + )?; + Ok(()) +} + /// Run an instance of ipmi_sim and a corresponding instance of a mock serial console, for tests to /// use. Accepts a `prompt` parameter which will be echoed back when the clients send data (for /// tests to assert that it's the expected host.) From 7fd9735a32ea4303b2aaa7583e657a4b247ea786 Mon Sep 17 00:00:00 2001 From: Hasan Khan Date: Sun, 28 Jun 2026 12:13:26 -0700 Subject: [PATCH 2/2] fix(ssh-console): discard operational console diagnostics Signed-off-by: Hasan Khan --- .../src/bmc/connection_impl/ipmi.rs | 61 +++++++++++++++---- 1 file changed, 48 insertions(+), 13 deletions(-) diff --git a/crates/ssh-console/src/bmc/connection_impl/ipmi.rs b/crates/ssh-console/src/bmc/connection_impl/ipmi.rs index 7cf4934782..d38f43ed3a 100644 --- a/crates/ssh-console/src/bmc/connection_impl/ipmi.rs +++ b/crates/ssh-console/src/bmc/connection_impl/ipmi.rs @@ -542,12 +542,11 @@ impl IpmitoolMessageProxy { metrics_attrs: &[KeyValue], ) -> Result<(), ProcessLoopError> { let data = &self.output_buf[0..n]; - append_captured_output(&mut self.captured_output, data); - if !self.sol_session_operational - && captured_output_contains(&self.captured_output, SOL_SESSION_OPERATIONAL) - { - self.sol_session_operational = true; - } + capture_ipmitool_startup_output( + &mut self.captured_output, + &mut self.sol_session_operational, + data, + ); // ipmitool always emits a message after either connecting or rejecting activation. if let Some(ready_tx) = self.ready_tx.take() { self.connected_since = Utc::now(); @@ -714,6 +713,22 @@ fn append_captured_output(output: &mut VecDeque, data: &[u8]) { output.drain(..excess); } +fn capture_ipmitool_startup_output( + output: &mut VecDeque, + sol_session_operational: &mut bool, + data: &[u8], +) { + if *sol_session_operational { + return; + } + + append_captured_output(output, data); + if captured_output_contains(output, SOL_SESSION_OPERATIONAL) { + *sol_session_operational = true; + output.clear(); + } +} + fn captured_output_contains(output: &VecDeque, needle: &[u8]) -> bool { !needle.is_empty() && needle.len() <= output.len() @@ -900,19 +915,39 @@ mod tests { } #[test] - fn captured_output_detects_fragmented_activation_banner_and_stays_bounded() { + fn startup_output_capture_is_bounded_and_stops_after_sol_becomes_operational() { let mut output = VecDeque::new(); - append_captured_output(&mut output, b"prefix SOL Session oper"); - append_captured_output(&mut output, b"ational. Use ~? for help\r\n"); - - assert!(captured_output_contains(&output, SOL_SESSION_OPERATIONAL)); - - append_captured_output( + let mut sol_session_operational = false; + capture_ipmitool_startup_output( &mut output, + &mut sol_session_operational, &vec![b'x'; MAX_CAPTURED_IPMITOOL_OUTPUT_SIZE + 1], ); assert_eq!(output.len(), MAX_CAPTURED_IPMITOOL_OUTPUT_SIZE); assert!(output.iter().all(|byte| *byte == b'x')); + + output.clear(); + capture_ipmitool_startup_output( + &mut output, + &mut sol_session_operational, + b"prefix SOL Session oper", + ); + assert!(!sol_session_operational); + + capture_ipmitool_startup_output( + &mut output, + &mut sol_session_operational, + b"ational. Use ~? for help\r\n", + ); + assert!(sol_session_operational); + assert!(output.is_empty()); + + capture_ipmitool_startup_output( + &mut output, + &mut sol_session_operational, + b"post-connect host console data", + ); + assert!(output.is_empty()); } #[tokio::test]