From 01e0c22f431fab810356397d0dd81805736c959e Mon Sep 17 00:00:00 2001 From: Junyi Ou Date: Tue, 23 Jun 2026 13:47:05 -0400 Subject: [PATCH 1/3] refactor(runspace_pool): extract host-call construction/classification into host_call.rs Move the pure PipelineHostCall parsing and the needs-session-key classification out of pool.rs into a new private runspace_pool::host_call module (named to avoid collision with crate::host). Fragmentation-coupled send + the pending_host_calls queue stay in the pool. Behavior-preserving. --- crates/ironposh-client-core/src/runspace_pool/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/ironposh-client-core/src/runspace_pool/mod.rs b/crates/ironposh-client-core/src/runspace_pool/mod.rs index abf5543..ce9c9b4 100644 --- a/crates/ironposh-client-core/src/runspace_pool/mod.rs +++ b/crates/ironposh-client-core/src/runspace_pool/mod.rs @@ -2,6 +2,7 @@ pub mod creator; mod crypto; pub mod enums; pub mod expect_shell_connected; +mod host_call; pub mod expect_shell_created; mod host_call; pub mod pool; From 886d91fa108114a98c7ce76654b8b6bf470d2a6d Mon Sep 17 00:00:00 2001 From: Junyi Ou Date: Tue, 23 Jun 2026 13:55:35 -0400 Subject: [PATCH 2/3] refactor(runspace_pool): group outbound WSMan request builders into requests.rs Split the eight outbound-request methods (fire_receive/disconnect/reconnect, kill_pipeline, invoke_spec, send_pipeline_host_response, send_runspace_pool_message, build_public_key_blob_base64) out of pool.rs into a sibling requests.rs via a split impl RunspacePool block. Collects all &self.connection (WsMan) usage in one place as groundwork for a future transport seam. pool.rs 2029 -> 1594 lines; behavior-identical. --- .../src/runspace_pool/mod.rs | 2 +- .../src/runspace_pool/pool.rs | 234 +--------------- .../src/runspace_pool/requests.rs | 254 ++++++++++++++++++ 3 files changed, 257 insertions(+), 233 deletions(-) create mode 100644 crates/ironposh-client-core/src/runspace_pool/requests.rs diff --git a/crates/ironposh-client-core/src/runspace_pool/mod.rs b/crates/ironposh-client-core/src/runspace_pool/mod.rs index ce9c9b4..74b2acb 100644 --- a/crates/ironposh-client-core/src/runspace_pool/mod.rs +++ b/crates/ironposh-client-core/src/runspace_pool/mod.rs @@ -2,10 +2,10 @@ pub mod creator; mod crypto; pub mod enums; pub mod expect_shell_connected; -mod host_call; pub mod expect_shell_created; mod host_call; pub mod pool; +mod requests; pub mod types; // Re-export public types diff --git a/crates/ironposh-client-core/src/runspace_pool/pool.rs b/crates/ironposh-client-core/src/runspace_pool/pool.rs index 821a6b3..56a6a9b 100644 --- a/crates/ironposh-client-core/src/runspace_pool/pool.rs +++ b/crates/ironposh-client-core/src/runspace_pool/pool.rs @@ -14,7 +14,6 @@ use ironposh_winrm::{ ws_management::{OptionSetValue, WsAction, WsMan}, }; use ironposh_xml::parser::XmlDeserialize; -use rsa::traits::PublicKeyParts; use rsa::{RsaPrivateKey, pkcs1v15::Pkcs1v15Encrypt}; use tracing::{debug, error, info, instrument, trace, warn}; @@ -23,7 +22,7 @@ use uuid::Uuid; use crate::{ PwshCoreError, host::HostCall, - pipeline::{Pipeline, PipelineCommand, PipelineSpec}, + pipeline::{Pipeline, PipelineCommand}, powershell::PipelineHandle, runspace::win_rs::WinRunspace, runspace_pool::PsInvocationState, @@ -286,19 +285,6 @@ impl RunspacePool { )) } - // We should accept the pipeline id here, but for now let's ignore it - pub(crate) fn fire_receive( - &self, - desired_streams: Vec, - ) -> Result { - debug_assert!(!desired_streams.is_empty(), "At least one desired stream"); - Ok(self - .shell - .fire_receive(&self.connection, desired_streams) - .into() - .to_xml_string()?) - } - /// Server-assigned shell id (available after the shell has been created). pub fn shell_id(&self) -> Option<&str> { self.shell.shell_id() @@ -309,27 +295,6 @@ impl RunspacePool { self.application_private_data.as_ref() } - /// Build a Disconnect request for this pool's shell (MS-WSMV 3.1.4.13). - /// Valid only in `Opened` state; transitions the pool to `Disconnecting`. - #[instrument(skip(self))] - pub fn fire_disconnect(&mut self) -> Result { - if self.state != RunspacePoolState::Opened { - return Err(crate::PwshCoreError::InvalidState( - "RunspacePool must be in Opened state to disconnect", - )); - } - - let xml = self - .shell - .fire_disconnect(&self.connection) - .into() - .to_xml_string()?; - - self.state = RunspacePoolState::Disconnecting; - info!(runspace_pool_id = %self.id, "runspace pool disconnect requested"); - Ok(xml) - } - /// Accept the server's DisconnectResponse. /// Valid only in `Disconnecting` state; transitions the pool to `Disconnected`. #[instrument(skip(self, soap_envelope), fields(envelope_length = soap_envelope.len()))] @@ -395,27 +360,6 @@ impl RunspacePool { } } - /// Build a Reconnect request for this pool's shell (MS-WSMV 3.1.4.14). - /// Valid only in `Disconnected` state; transitions the pool to `Connecting`. - #[instrument(skip(self))] - pub fn fire_reconnect(&mut self) -> Result { - if self.state != RunspacePoolState::Disconnected { - return Err(crate::PwshCoreError::InvalidState( - "RunspacePool must be in Disconnected state to reconnect", - )); - } - - let xml = self - .shell - .fire_reconnect(&self.connection) - .into() - .to_xml_string()?; - - self.state = RunspacePoolState::Connecting; - info!(runspace_pool_id = %self.id, "runspace pool reconnect requested"); - Ok(xml) - } - /// Accept the server's ReconnectResponse. /// Valid only in `Connecting` state; transitions the pool back to `Opened`. /// The caller is responsible for resuming the receive loop afterwards. @@ -1481,34 +1425,6 @@ impl RunspacePool { Ok(request.into().to_xml_string()?) } - pub fn kill_pipeline(&mut self, handle: &PipelineHandle) -> Result { - let pipeline = self - .pipelines - .get_mut(&handle.id()) - .ok_or(PwshCoreError::InvalidState( - "Pipeline handle not found, pipeline_id", - )) - .inspect_err(|_| { - error!(pipeline_id = ?&handle.id(), "Pipeline handle not found "); - })?; - - if pipeline.is_terminal() { - return Err(PwshCoreError::InvalidState( - "Cannot kill a pipeline that is already stopped, completed, or failed", - )); - } - - // Set pipeline state to Stopping - pipeline.set_state(PsInvocationState::Stopping); - info!(pipeline_id = %handle.id(), "Killing pipeline"); - - let request = self - .shell - .terminal_pipeline_signal(&self.connection, handle.id())?; - - Ok(request.into().to_xml_string()?) - } - #[instrument(skip_all)] pub fn handle_pipeline_host_call( &mut self, @@ -1519,58 +1435,6 @@ impl RunspacePool { super::host_call::pipeline_host_call_from(ps_value, stream_name, command_id) } - /// Send a pipeline host response to the server - #[instrument( - skip_all, - fields( - command_id = %command_id, - call_id = host_response.call_id, - method = ?host_response.method - ) - )] - pub fn send_pipeline_host_response( - &mut self, - command_id: uuid::Uuid, - host_response: &ironposh_psrp::PipelineHostResponse, - ) -> Result { - let _span = tracing::trace_span!("send_pipeline_host_response").entered(); - - // Fragment the host response message - tracing::trace!(stage = "fragment"); - let fragmented = - self.fragmenter - .fragment(host_response, self.id, Some(command_id), None)?; - tracing::trace!(fragment_count = fragmented.len(), stage = "fragmented"); - - // Encode fragments as base64 - tracing::trace!(stage = "base64_encode"); - let arguments = fragmented - .into_iter() - .map(|bytes| base64::engine::general_purpose::STANDARD.encode(&bytes[..])) - .collect::>(); - tracing::trace!( - argument_count = arguments.len(), - first_len = arguments.first().map(String::len), - stage = "base64_encoded" - ); - - // Create WS-Man Send request (send data to stdin) - tracing::trace!(stage = "wsman_send_request"); - let request = - self.shell - .send_data_request(&self.connection, Some(command_id), &arguments)?; - tracing::trace!(stage = "wsman_send_request_built"); - - let element: ironposh_xml::builder::Element<'_> = request.into(); - tracing::trace!(stage = "serialize_xml"); - let xml = element.to_xml_string().map_err(|e| { - tracing::error!(error = %e, stage = "serialize_xml", "failed to serialize XML"); - e - })?; - tracing::trace!(xml_len = xml.len(), stage = "done"); - Ok(xml) - } - /// Send a runspace pool host response to the server #[instrument( skip_all, @@ -1630,37 +1494,7 @@ impl RunspacePool { Ok(xml) } - fn send_runspace_pool_message( - &mut self, - message: &dyn ironposh_psrp::PsObjectWithType, - ) -> Result { - let _span = tracing::trace_span!( - "send_runspace_pool_message", - message_type = ?message.message_type() - ) - .entered(); - - tracing::trace!(stage = "fragment"); - let fragmented = self.fragmenter.fragment(message, self.id, None, None)?; - tracing::trace!(fragment_count = fragmented.len(), stage = "fragmented"); - - tracing::trace!(stage = "base64_encode"); - let arguments = fragmented - .into_iter() - .map(|bytes| base64::engine::general_purpose::STANDARD.encode(&bytes[..])) - .collect::>(); - - tracing::trace!(stage = "wsman_send_request"); - let request = self - .shell - .send_data_request(&self.connection, None, &arguments)?; - - let element: ironposh_xml::builder::Element<'_> = request.into(); - let xml = element.to_xml_string()?; - Ok(xml) - } - - fn ensure_key_exchange_state( + pub(super) fn ensure_key_exchange_state( &mut self, ) -> Result<&mut super::crypto::KeyExchangeState, PwshCoreError> { if self.key_exchange.is_none() { @@ -1680,52 +1514,6 @@ impl RunspacePool { .expect("key exchange state initialized")) } - fn build_public_key_blob_base64(&mut self) -> Result { - const MAGIC: [u8; 4] = [0x06, 0x02, 0x00, 0x00]; - const KEYTYPE: [u8; 4] = [0x00, 0xA4, 0x00, 0x00]; - const RSA1: [u8; 4] = [0x52, 0x53, 0x41, 0x31]; - const BITLEN_2048: [u8; 4] = [0x00, 0x08, 0x00, 0x00]; - - let state = self.ensure_key_exchange_state()?; - let public_key = state.private_key.to_public_key(); - - let exponent_be_raw = public_key.e().to_bytes_be(); - if exponent_be_raw.is_empty() || exponent_be_raw.len() > 4 { - return Err(PwshCoreError::InternalError(format!( - "unexpected RSA exponent length: {} bytes", - exponent_be_raw.len() - ))); - } - let mut exponent_be_padded = [0u8; 4]; - exponent_be_padded[4 - exponent_be_raw.len()..].copy_from_slice(&exponent_be_raw); - let exponent_u32 = u32::from_be_bytes(exponent_be_padded); - let exponent_le_u32_bytes = exponent_u32.to_le_bytes(); - - let mut modulus_be = public_key.n().to_bytes_be(); - if modulus_be.len() > 256 { - return Err(PwshCoreError::InternalError(format!( - "RSA modulus too large: {} bytes", - modulus_be.len() - ))); - } - if modulus_be.len() < 256 { - let mut padded = vec![0u8; 256 - modulus_be.len()]; - padded.extend_from_slice(&modulus_be); - modulus_be = padded; - } - let modulus_le_bytes = modulus_be.into_iter().rev().collect::>(); - - let mut blob = Vec::with_capacity(4 + 4 + 4 + 4 + 4 + 256); - blob.extend_from_slice(&MAGIC); - blob.extend_from_slice(&KEYTYPE); - blob.extend_from_slice(&RSA1); - blob.extend_from_slice(&BITLEN_2048); - blob.extend_from_slice(&exponent_le_u32_bytes); - blob.extend_from_slice(&modulus_le_bytes); - - Ok(base64::engine::general_purpose::STANDARD.encode(blob)) - } - pub fn handle_pipeline_output( &mut self, ps_value: PsValue, @@ -1754,24 +1542,6 @@ impl RunspacePool { pipeline.add_command(command); Ok(()) } - - /// Create, populate, and invoke a pipeline in one operation - pub(crate) fn invoke_spec( - &mut self, - uuid: Uuid, - spec: PipelineSpec, - ) -> Result { - // 1) Create the pipeline - let handle = self.init_pipeline(uuid)?; - - // 2) Add all commands from the spec - for cmd in spec.commands { - self.add_command(&handle, cmd)?; - } - - // 3) Invoke the pipeline using existing logic - self.invoke_pipeline_request(&handle) - } } #[cfg(test)] diff --git a/crates/ironposh-client-core/src/runspace_pool/requests.rs b/crates/ironposh-client-core/src/runspace_pool/requests.rs new file mode 100644 index 0000000..1f9d938 --- /dev/null +++ b/crates/ironposh-client-core/src/runspace_pool/requests.rs @@ -0,0 +1,254 @@ +//! Outbound WSMan request builders for [`RunspacePool`]. +//! +//! This file is a continuation of the `impl RunspacePool` block that lives in +//! [`super::pool`]. It groups the methods that build outbound WSMan requests +//! (Receive / Disconnect / Reconnect / Signal / Send) and the PSRP message +//! serialization that feeds them, so that all `&self.connection` (WsMan) usage +//! is collected in one place as groundwork for a future transport seam. +//! +//! This is a pure file-organization split: the methods here are behavior- and +//! signature-identical to their previous definitions in `pool.rs`. + +use base64::Engine; +use rsa::traits::PublicKeyParts; +use tracing::{error, info, instrument}; +use uuid::Uuid; + +use crate::{ + PwshCoreError, pipeline::PipelineSpec, powershell::PipelineHandle, + runspace_pool::PsInvocationState, +}; + +use super::enums::RunspacePoolState; +use super::pool::{DesiredStream, RunspacePool}; + +impl RunspacePool { + // We should accept the pipeline id here, but for now let's ignore it + pub(crate) fn fire_receive( + &self, + desired_streams: Vec, + ) -> Result { + debug_assert!(!desired_streams.is_empty(), "At least one desired stream"); + Ok(self + .shell + .fire_receive(&self.connection, desired_streams) + .into() + .to_xml_string()?) + } + + /// Build a Disconnect request for this pool's shell (MS-WSMV 3.1.4.13). + /// Valid only in `Opened` state; transitions the pool to `Disconnecting`. + #[instrument(skip(self))] + pub fn fire_disconnect(&mut self) -> Result { + if self.state != RunspacePoolState::Opened { + return Err(crate::PwshCoreError::InvalidState( + "RunspacePool must be in Opened state to disconnect", + )); + } + + let xml = self + .shell + .fire_disconnect(&self.connection) + .into() + .to_xml_string()?; + + self.state = RunspacePoolState::Disconnecting; + info!(runspace_pool_id = %self.id, "runspace pool disconnect requested"); + Ok(xml) + } + + /// Build a Reconnect request for this pool's shell (MS-WSMV 3.1.4.14). + /// Valid only in `Disconnected` state; transitions the pool to `Connecting`. + #[instrument(skip(self))] + pub fn fire_reconnect(&mut self) -> Result { + if self.state != RunspacePoolState::Disconnected { + return Err(crate::PwshCoreError::InvalidState( + "RunspacePool must be in Disconnected state to reconnect", + )); + } + + let xml = self + .shell + .fire_reconnect(&self.connection) + .into() + .to_xml_string()?; + + self.state = RunspacePoolState::Connecting; + info!(runspace_pool_id = %self.id, "runspace pool reconnect requested"); + Ok(xml) + } + + pub fn kill_pipeline(&mut self, handle: &PipelineHandle) -> Result { + let pipeline = self + .pipelines + .get_mut(&handle.id()) + .ok_or(PwshCoreError::InvalidState( + "Pipeline handle not found, pipeline_id", + )) + .inspect_err(|_| { + error!(pipeline_id = ?&handle.id(), "Pipeline handle not found "); + })?; + + if pipeline.is_terminal() { + return Err(PwshCoreError::InvalidState( + "Cannot kill a pipeline that is already stopped, completed, or failed", + )); + } + + // Set pipeline state to Stopping + pipeline.set_state(PsInvocationState::Stopping); + info!(pipeline_id = %handle.id(), "Killing pipeline"); + + let request = self + .shell + .terminal_pipeline_signal(&self.connection, handle.id())?; + + Ok(request.into().to_xml_string()?) + } + + /// Send a pipeline host response to the server + #[instrument( + skip_all, + fields( + command_id = %command_id, + call_id = host_response.call_id, + method = ?host_response.method + ) + )] + pub fn send_pipeline_host_response( + &mut self, + command_id: uuid::Uuid, + host_response: &ironposh_psrp::PipelineHostResponse, + ) -> Result { + let _span = tracing::trace_span!("send_pipeline_host_response").entered(); + + // Fragment the host response message + tracing::trace!(stage = "fragment"); + let fragmented = + self.fragmenter + .fragment(host_response, self.id, Some(command_id), None)?; + tracing::trace!(fragment_count = fragmented.len(), stage = "fragmented"); + + // Encode fragments as base64 + tracing::trace!(stage = "base64_encode"); + let arguments = fragmented + .into_iter() + .map(|bytes| base64::engine::general_purpose::STANDARD.encode(&bytes[..])) + .collect::>(); + tracing::trace!( + argument_count = arguments.len(), + first_len = arguments.first().map(String::len), + stage = "base64_encoded" + ); + + // Create WS-Man Send request (send data to stdin) + tracing::trace!(stage = "wsman_send_request"); + let request = + self.shell + .send_data_request(&self.connection, Some(command_id), &arguments)?; + tracing::trace!(stage = "wsman_send_request_built"); + + let element: ironposh_xml::builder::Element<'_> = request.into(); + tracing::trace!(stage = "serialize_xml"); + let xml = element.to_xml_string().map_err(|e| { + tracing::error!(error = %e, stage = "serialize_xml", "failed to serialize XML"); + e + })?; + tracing::trace!(xml_len = xml.len(), stage = "done"); + Ok(xml) + } + + pub(super) fn send_runspace_pool_message( + &mut self, + message: &dyn ironposh_psrp::PsObjectWithType, + ) -> Result { + let _span = tracing::trace_span!( + "send_runspace_pool_message", + message_type = ?message.message_type() + ) + .entered(); + + tracing::trace!(stage = "fragment"); + let fragmented = self.fragmenter.fragment(message, self.id, None, None)?; + tracing::trace!(fragment_count = fragmented.len(), stage = "fragmented"); + + tracing::trace!(stage = "base64_encode"); + let arguments = fragmented + .into_iter() + .map(|bytes| base64::engine::general_purpose::STANDARD.encode(&bytes[..])) + .collect::>(); + + tracing::trace!(stage = "wsman_send_request"); + let request = self + .shell + .send_data_request(&self.connection, None, &arguments)?; + + let element: ironposh_xml::builder::Element<'_> = request.into(); + let xml = element.to_xml_string()?; + Ok(xml) + } + + pub(super) fn build_public_key_blob_base64(&mut self) -> Result { + const MAGIC: [u8; 4] = [0x06, 0x02, 0x00, 0x00]; + const KEYTYPE: [u8; 4] = [0x00, 0xA4, 0x00, 0x00]; + const RSA1: [u8; 4] = [0x52, 0x53, 0x41, 0x31]; + const BITLEN_2048: [u8; 4] = [0x00, 0x08, 0x00, 0x00]; + + let state = self.ensure_key_exchange_state()?; + let public_key = state.private_key.to_public_key(); + + let exponent_be_raw = public_key.e().to_bytes_be(); + if exponent_be_raw.is_empty() || exponent_be_raw.len() > 4 { + return Err(PwshCoreError::InternalError(format!( + "unexpected RSA exponent length: {} bytes", + exponent_be_raw.len() + ))); + } + let mut exponent_be_padded = [0u8; 4]; + exponent_be_padded[4 - exponent_be_raw.len()..].copy_from_slice(&exponent_be_raw); + let exponent_u32 = u32::from_be_bytes(exponent_be_padded); + let exponent_le_u32_bytes = exponent_u32.to_le_bytes(); + + let mut modulus_be = public_key.n().to_bytes_be(); + if modulus_be.len() > 256 { + return Err(PwshCoreError::InternalError(format!( + "RSA modulus too large: {} bytes", + modulus_be.len() + ))); + } + if modulus_be.len() < 256 { + let mut padded = vec![0u8; 256 - modulus_be.len()]; + padded.extend_from_slice(&modulus_be); + modulus_be = padded; + } + let modulus_le_bytes = modulus_be.into_iter().rev().collect::>(); + + let mut blob = Vec::with_capacity(4 + 4 + 4 + 4 + 4 + 256); + blob.extend_from_slice(&MAGIC); + blob.extend_from_slice(&KEYTYPE); + blob.extend_from_slice(&RSA1); + blob.extend_from_slice(&BITLEN_2048); + blob.extend_from_slice(&exponent_le_u32_bytes); + blob.extend_from_slice(&modulus_le_bytes); + + Ok(base64::engine::general_purpose::STANDARD.encode(blob)) + } + + /// Create, populate, and invoke a pipeline in one operation + pub(crate) fn invoke_spec( + &mut self, + uuid: Uuid, + spec: PipelineSpec, + ) -> Result { + // 1) Create the pipeline + let handle = self.init_pipeline(uuid)?; + + // 2) Add all commands from the spec + for cmd in spec.commands { + self.add_command(&handle, cmd)?; + } + + // 3) Invoke the pipeline using existing logic + self.invoke_pipeline_request(&handle) + } +} From 1aa31fa05cd0fd32f8ca2758c4d62aba58f5c3af Mon Sep 17 00:00:00 2001 From: Junyi Ou Date: Tue, 23 Jun 2026 14:07:32 -0400 Subject: [PATCH 3/3] refactor(runspace_pool): group inbound message handling into incoming.rs Split the inbound server-response / PSRP message-handling methods (accept_response, handle_pwsh_responses + its per-message handlers, handle_session_capability, handle_application_private_data, handle_pipeline_host_call/output, accept_disconnect/ reconnect_response, fault helpers) out of pool.rs into a sibling incoming.rs via a split impl RunspacePool block. Mirrors requests.rs (outbound); groundwork for a future event-driven on_message API. pool.rs ~1800 -> ~660 lines; no visibility widening; behavior-identical. --- .../src/runspace_pool/incoming.rs | 1078 +++++++++++++++++ .../src/runspace_pool/mod.rs | 1 + .../src/runspace_pool/pool.rs | 1058 +--------------- 3 files changed, 1084 insertions(+), 1053 deletions(-) create mode 100644 crates/ironposh-client-core/src/runspace_pool/incoming.rs diff --git a/crates/ironposh-client-core/src/runspace_pool/incoming.rs b/crates/ironposh-client-core/src/runspace_pool/incoming.rs new file mode 100644 index 0000000..01c0168 --- /dev/null +++ b/crates/ironposh-client-core/src/runspace_pool/incoming.rs @@ -0,0 +1,1078 @@ +//! Inbound server-response / PSRP message handling for [`RunspacePool`]. +//! +//! This file is a continuation of the `impl RunspacePool` block that lives in +//! [`super::pool`]. It groups the methods that accept and dispatch inbound +//! server traffic: the WSMan Disconnect/Reconnect responses, the SOAP-fault +//! helpers, the main `accept_response` entry point, and the per-message PSRP +//! handlers it fans out to (`handle_pwsh_responses` and friends). This is +//! groundwork for a future event-driven `on_message` API. +//! +//! This is a pure file-organization split: the methods here are behavior- and +//! signature-identical to their previous definitions in `pool.rs`. + +use std::collections::HashSet; + +use base64::Engine; +use ironposh_psrp::{ + ApplicationPrivateData, ErrorRecord, PipelineOutput, PsValue, RunspacePoolStateMessage, + SessionCapability, fragmentation, +}; +use ironposh_winrm::{soap::SoapEnvelope, ws_management::WsAction}; +use ironposh_xml::parser::XmlDeserialize; +use rsa::pkcs1v15::Pkcs1v15Encrypt; +use tracing::{debug, error, info, instrument, trace, warn}; +use uuid::Uuid; + +use crate::{ + PwshCoreError, host::HostCall, powershell::PipelineHandle, runspace::win_rs::WinRunspace, + runspace_pool::PsInvocationState, +}; + +use super::enums::RunspacePoolState; +use super::pool::{AcceptResponsResult, DesiredStream, RunspacePool}; + +impl RunspacePool { + /// Accept the server's DisconnectResponse. + /// Valid only in `Disconnecting` state; transitions the pool to `Disconnected`. + #[instrument(skip(self, soap_envelope), fields(envelope_length = soap_envelope.len()))] + pub fn accept_disconnect_response( + &mut self, + soap_envelope: &str, + ) -> Result<(), crate::PwshCoreError> { + if self.state != RunspacePoolState::Disconnecting { + return Err(crate::PwshCoreError::InvalidState( + "RunspacePool must be in Disconnecting state to accept a disconnect response", + )); + } + + let parsed = ironposh_xml::parser::parse(soap_envelope)?; + let soap_envelope = SoapEnvelope::from_node(parsed.root_element()) + .map_err(crate::PwshCoreError::XmlParsingError)?; + + Self::fault_to_error(&soap_envelope)?; + + // Real Windows servers answer shell Disconnect with an empty Body and + // identify the operation via the `a:Action` header only; the + // documented `rsp:DisconnectResponse` body element is accepted too. + if soap_envelope.body.as_ref().disconnect_response.is_none() + && !Self::header_action_is(&soap_envelope, &WsAction::DisconnectResponse) + { + return Err(crate::PwshCoreError::InvalidResponse( + "No DisconnectResponse found in response".into(), + )); + } + + self.state = RunspacePoolState::Disconnected; + info!(runspace_pool_id = %self.id, "runspace pool disconnected"); + Ok(()) + } + + /// Accept the server's ReconnectResponse. + /// Valid only in `Connecting` state; transitions the pool back to `Opened`. + /// The caller is responsible for resuming the receive loop afterwards. + #[instrument(skip(self, soap_envelope), fields(envelope_length = soap_envelope.len()))] + pub fn accept_reconnect_response( + &mut self, + soap_envelope: &str, + ) -> Result<(), crate::PwshCoreError> { + if self.state != RunspacePoolState::Connecting { + return Err(crate::PwshCoreError::InvalidState( + "RunspacePool must be in Connecting state to accept a reconnect response", + )); + } + + let parsed = ironposh_xml::parser::parse(soap_envelope)?; + let soap_envelope = SoapEnvelope::from_node(parsed.root_element()) + .map_err(crate::PwshCoreError::XmlParsingError)?; + + Self::fault_to_error(&soap_envelope)?; + + // Real Windows servers answer shell Reconnect with an empty Body and + // identify the operation via the `a:Action` header only; the + // documented `rsp:ReconnectResponse` body element is accepted too. + if soap_envelope.body.as_ref().reconnect_response.is_none() + && !Self::header_action_is(&soap_envelope, &WsAction::ReconnectResponse) + { + return Err(crate::PwshCoreError::InvalidResponse( + "No ReconnectResponse found in response".into(), + )); + } + + self.state = RunspacePoolState::Opened; + // The Receive that was in flight before the disconnect is gone; the caller + // must fire a fresh pool-stream Receive to resume the receive loop. + self.desired_stream_is_pooling = false; + info!(runspace_pool_id = %self.id, "runspace pool reconnected"); + Ok(()) + } + + /// Whether the envelope's `a:Action` header equals the given WSMan action. + fn header_action_is(soap_envelope: &SoapEnvelope<'_>, action: &WsAction) -> bool { + soap_envelope + .header + .as_ref() + .and_then(|header| header.as_ref().action.as_ref()) + .is_some_and(|tag| tag.as_ref().as_ref() == action.as_str()) + } + + /// Surface a WSMan SOAP fault as a `SoapFault` error. + pub(super) fn fault_to_error( + soap_envelope: &SoapEnvelope<'_>, + ) -> Result<(), crate::PwshCoreError> { + if let Some(fault_tag) = soap_envelope.body.as_ref().fault.as_ref() { + let fault = fault_tag.as_ref(); + let code = fault + .code + .as_ref() + .and_then(|c| c.as_ref().value.as_ref()) + .map_or("unknown", |v| <&str>::from(v.as_ref())) + .to_string(); + let reason = fault.reason_text().unwrap_or("unknown").to_string(); + error!(target: "accept_response", %code, %reason, "received SOAP fault"); + return Err(PwshCoreError::SoapFault { code, reason }); + } + Ok(()) + } + + #[expect(clippy::too_many_lines)] + #[instrument(skip(self, soap_envelope), fields(envelope_length = soap_envelope.len()))] + pub(crate) fn accept_response( + &mut self, + soap_envelope: &str, + ) -> Result, crate::PwshCoreError> { + debug!(target: "soap", "parsing SOAP envelope"); + + let parsed = ironposh_xml::parser::parse(soap_envelope).map_err(|e| { + error!(target: "xml", error = %e, xml = soap_envelope, "failed to parse XML"); + e + })?; + + let soap_envelope = SoapEnvelope::from_node(parsed.root_element()).map_err(|e| { + error!(target: "soap", error = %e, "failed to parse SOAP envelope"); + crate::PwshCoreError::XmlParsingError(e) + })?; + + let mut result = Vec::new(); + + if soap_envelope.body.as_ref().receive_response.is_some() { + debug!(target: "receive", "processing receive response"); + + let (streams, command_state) = WinRunspace::accept_receive_response(&soap_envelope) + .map_err(|e| { + error!(target: "receive", error = %e, "failed to accept receive response"); + e + })?; + + let streams_ids = streams + .iter() + .filter_map(|stream| stream.command_id().copied()) + .collect::>(); + + let is_there_a_stream_has_no_command_id = + streams.iter().any(|stream| stream.command_id().is_none()); + + if is_there_a_stream_has_no_command_id { + debug!( + target: "receive", + "stream without command_id found, should be runspace pool stream" + ); + self.desired_stream_is_pooling = false; + } + + debug!( + target: "receive", + stream_count = streams.len(), + stream_command_ids = ?streams_ids, + "processing streams" + ); + + let handle_results = self.handle_pwsh_responses(streams).map_err(|e| { + error!(target: "pwsh", error = %e, "failed to handle PowerShell responses"); + e + })?; + + let already_scheduled_receive = handle_results + .iter() + .any(|r| matches!(r, AcceptResponsResult::SendThenReceive { .. })); + + debug!( + target: "pwsh", + response_count = handle_results.len(), + already_scheduled_receive, + "handled PowerShell responses" + ); + + result.extend(handle_results); + + if let Some(command_state) = command_state + && command_state.is_done() + { + debug!( + target: "pipeline", + pipeline_id = ?command_state.command_id, + "command state done received, removing pipeline" + ); + // If command state is done, we can remove the pipeline from the pool + let pipeline = self.pipelines.remove(&command_state.command_id); + if pipeline.is_some() { + result.push(AcceptResponsResult::PipelineFinished(PipelineHandle { + id: command_state.command_id, + })); + } + } + + let desired_streams = if !streams_ids.is_empty() { + // find the intersetction of streams.id and self.pipelines.keys() + let next_desired_streams = streams_ids.into_iter().filter(|stream| { + self.pipelines + .keys() + .any(|pipeline_id| pipeline_id == stream) + }); + + // keep unique stream with the same id + let mut stream_set = HashSet::new(); + + for stream in next_desired_streams { + stream_set.insert(stream); + } + + stream_set + .into_iter() + .map(|stream| DesiredStream::new("stdout", stream.into())) + .collect::>() + } else if !self.desired_stream_is_pooling { + self.desired_stream_is_pooling = true; + DesiredStream::runspace_pool_streams() + } else { + vec![] + }; + + if !already_scheduled_receive && !desired_streams.is_empty() { + result.push(AcceptResponsResult::ReceiveResponse { desired_streams }); + } + } + + if soap_envelope.body.as_ref().command_response.is_some() { + let pipeline_id = self.shell.accept_commannd_response(&soap_envelope)?; + + self.pipelines + .get_mut(&pipeline_id) + .ok_or_else(|| { + crate::PwshCoreError::InvalidResponse( + "Pipeline not found for command response".into(), + ) + })? + .set_state(PsInvocationState::Running); + + result.push(AcceptResponsResult::ReceiveResponse { + desired_streams: vec![DesiredStream::stdout_for_command(pipeline_id)], + }); + + result.push(AcceptResponsResult::PipelineCreated(PipelineHandle { + id: pipeline_id, + })); + } + + if soap_envelope.body.as_ref().signal_response.is_some() { + let pipeline_id = self.shell.accept_signal_response(&soap_envelope)?; + match pipeline_id { + None => { + // Don't know what to do with it + } + Some(id) => match self.pipelines.remove(&id) { + None => { + warn!( + target: "signal", + pipeline_id = ?id, + "received signal response for unknown pipeline" + ); + } + Some(_) => { + result.push(AcceptResponsResult::PipelineFinished(PipelineHandle { id })); + } + }, + } + } + + // Handle SOAP faults (e.g. operation timeout heartbeats) + if let Some(fault_tag) = soap_envelope.body.as_ref().fault.as_ref() { + let fault = fault_tag.as_ref(); + if fault.is_timeout() { + info!( + target: "accept_response", + "received WS-Management timeout fault (heartbeat), re-issuing Receive" + ); + // Normal heartbeat - re-issue Receive for active streams + let desired_streams = self.compute_active_desired_streams(); + if !desired_streams.is_empty() { + result.push(AcceptResponsResult::ReceiveResponse { desired_streams }); + } + } else if fault.is_invalid_selectors() { + // Common cancel race: we had a Receive(CommandId=...) in flight while the + // server already tore down the command. Treat this as non-fatal and + // stop polling pipelines so the session remains usable. + let reason = fault.reason_text().unwrap_or("unknown"); + warn!( + target: "accept_response", + reason = %reason, + pipeline_count = self.pipelines.len(), + "received WS-Management InvalidSelectors fault; dropping active pipelines and continuing" + ); + + let finished: Vec = self.pipelines.keys().copied().collect(); + self.pipelines.clear(); + + for id in finished { + result.push(AcceptResponsResult::PipelineFinished(PipelineHandle { id })); + } + + let desired_streams = self.compute_active_desired_streams(); + if !desired_streams.is_empty() { + result.push(AcceptResponsResult::ReceiveResponse { desired_streams }); + } + } else { + // Real fault - propagate as error + let code = fault + .code + .as_ref() + .and_then(|c| c.as_ref().value.as_ref()) + .map_or("unknown", |v| <&str>::from(v.as_ref())) + .to_string(); + let reason = fault.reason_text().unwrap_or("unknown").to_string(); + error!( + target: "accept_response", + %code, + %reason, + "received non-timeout SOAP fault" + ); + return Err(PwshCoreError::SoapFault { code, reason }); + } + } + + debug!( + target: "accept_response", + result_count = result.len(), + result_types = ?result.iter().map(std::mem::discriminant).collect::>(), + "accept response results" + ); + + Ok(result) + } + + /// Fire create pipeline for a specific pipeline handle (used by service API) + #[expect(clippy::too_many_lines)] + #[instrument(skip(self, responses))] + fn handle_pwsh_responses( + &mut self, + responses: Vec, + ) -> Result, crate::PwshCoreError> { + let mut result = Vec::new(); + + for (stream_index, stream) in responses.into_iter().enumerate() { + debug!( + target: "stream", + stream_index, + stream_name = ?stream.name(), + pipeline_id = ?stream.command_id(), + "processing stream" + ); + + let messages = match self.defragmenter.defragment(stream.value()).map_err(|e| { + error!(target: "defragment", stream_index, error = %e, "failed to defragment stream"); + e + })? { + fragmentation::DefragmentResult::Incomplete => { + debug!(target: "defragment", stream_index, "stream incomplete, continuing"); + continue; + } + fragmentation::DefragmentResult::Complete(power_shell_remoting_messages) => { + debug!( + target: "defragment", + stream_index, + message_count = power_shell_remoting_messages.len(), + "stream complete" + ); + power_shell_remoting_messages + } + }; + + for (msg_index, message) in messages.into_iter().enumerate() { + let ps_value = message.parse_ps_message().map_err(|e| { + error!( + target: "ps_message", + stream_index, + ?message, + error = %e, + "failed to parse PS message" + ); + e + })?; + + info!( + target: "ps_message", + message_type = ?message.message_type, + stream_index, + msg_index, + "parsed PS message" + ); + + match message.message_type { + ironposh_psrp::MessageType::PublicKeyRequest => { + debug!( + target: "key_exchange", + stream_name = ?stream.name(), + command_id = ?stream.command_id(), + "handling PublicKeyRequest message" + ); + + // Validate the payload (best-effort). + if let Err(e) = ironposh_psrp::PublicKeyRequest::try_from(ps_value.clone()) + { + warn!( + target: "key_exchange", + error = %e, + payload = ?ps_value, + "unexpected PublicKeyRequest payload" + ); + } + + let public_key_b64 = self.build_public_key_blob_base64()?; + let public_key_msg = ironposh_psrp::PublicKey { + public_key: public_key_b64, + }; + let send_xml = self.send_runspace_pool_message(&public_key_msg)?; + + result.push(AcceptResponsResult::SendThenReceive { + send_xml, + desired_streams: DesiredStream::runspace_pool_streams(), + }); + } + ironposh_psrp::MessageType::EncryptedSessionKey => { + debug!( + target: "key_exchange", + stream_name = ?stream.name(), + command_id = ?stream.command_id(), + "handling EncryptedSessionKey message" + ); + + let PsValue::Object(obj) = ps_value else { + return Err(crate::PwshCoreError::InvalidResponse( + "Expected EncryptedSessionKey as PsValue::Object".into(), + )); + }; + + let encrypted = ironposh_psrp::EncryptedSessionKey::try_from(obj)?; + let decoded = base64::engine::general_purpose::STANDARD + .decode(encrypted.encrypted_session_key) + .map_err(|e| { + crate::PwshCoreError::InvalidResponse( + format!("Invalid base64 EncryptedSessionKey: {e}").into(), + ) + })?; + + if decoded.len() < 12 + 256 { + return Err(crate::PwshCoreError::InvalidResponse( + format!( + "EncryptedSessionKey blob too short: {} bytes", + decoded.len() + ) + .into(), + )); + } + + let encrypted_bytes = &decoded[12..12 + 256]; + let state = self.ensure_key_exchange_state()?; + + let decrypted = state + .private_key + .decrypt(Pkcs1v15Encrypt, encrypted_bytes) + .or_else(|e| { + // Some stacks may provide a representation that requires reversing. + // Try best-effort before failing hard. + let mut reversed = encrypted_bytes.to_vec(); + reversed.reverse(); + state + .private_key + .decrypt(Pkcs1v15Encrypt, &reversed) + .map_err(|_e2| e) + }) + .map_err(|e| { + crate::PwshCoreError::InternalError(format!( + "failed to decrypt EncryptedSessionKey: {e}" + )) + })?; + + if decrypted.len() != 32 { + return Err(crate::PwshCoreError::InvalidResponse( + format!( + "Unexpected decrypted PSRP session key length: {} bytes", + decrypted.len() + ) + .into(), + )); + } + + info!( + target: "key_exchange", + session_key_len = decrypted.len(), + "stored decrypted PSRP session key" + ); + state.session_key = Some(decrypted); + + self.psrp_key_exchange_pending = false; + while let Some(host_call) = self.pending_host_calls.pop_front() { + debug!( + target: "key_exchange", + host_call = ?host_call, + "releasing deferred host call after key exchange" + ); + result.push(AcceptResponsResult::HostCall(host_call)); + } + } + ironposh_psrp::MessageType::SessionCapability => { + debug!(target: "session", "handling SessionCapability message"); + self.handle_session_capability(ps_value).map_err(|e| { + error!(target: "session", error = %e, "failed to handle SessionCapability"); + e + })?; + } + ironposh_psrp::MessageType::ApplicationPrivateData => { + debug!(target: "session", "handling ApplicationPrivateData message"); + self.handle_application_private_data(ps_value) + .map_err(|e| { + error!(target: "session", error = %e, "failed to handle ApplicationPrivateData"); + e + })?; + } + ironposh_psrp::MessageType::RunspacepoolState => { + debug!(target: "runspace", "handling RunspacepoolState message"); + self.handle_runspacepool_state(ps_value).map_err(|e| { + error!(target: "runspace", error = %e, "failed to handle RunspacepoolState"); + e + })?; + } + ironposh_psrp::MessageType::ProgressRecord => { + debug!( + target: "progress", + stream_name = ?stream.name(), + command_id = ?stream.command_id(), + "handling ProgressRecord message" + ); + let record = + self.handle_progress_record(ps_value, stream.name(), stream.command_id()) + .map_err(|e| { + error!(target: "progress", error = %e, "failed to handle ProgressRecord"); + e + })?; + + let cmd = *stream.command_id().ok_or_else(|| { + crate::PwshCoreError::InvalidResponse( + "ProgressRecord message must have a command_id".into(), + ) + })?; + let message_type = message.message_type.clone(); + let message_type_value = message_type.value(); + result.push(AcceptResponsResult::PipelineRecord { + record: crate::psrp_record::PsrpRecord::Progress { + meta: crate::psrp_record::PsrpRecordMeta { + message_type, + message_type_value, + stream: stream.name().to_string(), + command_id: Some(cmd), + data_len: message.data.len(), + }, + record, + }, + handle: PipelineHandle { id: cmd }, + }); + } + ironposh_psrp::MessageType::InformationRecord => { + debug!( + target: "information", + stream_name = ?stream.name(), + command_id = ?stream.command_id(), + "handling InformationRecord message" + ); + let Some(cmd) = stream.command_id().copied() else { + warn!( + target: "ps_message", + message_type = ?message.message_type, + message_type_value = message.message_type.value(), + stream = %stream.name(), + command_id = ?stream.command_id(), + "InformationRecord message missing command_id; ignoring" + ); + continue; + }; + + let record = self + .handle_information_record(ps_value, stream.name(), &cmd) + .map_err(|e| { + error!( + target: "information", + error = %e, + "failed to handle InformationRecord" + ); + e + })?; + let message_type = message.message_type.clone(); + let message_type_value = message_type.value(); + result.push(AcceptResponsResult::PipelineRecord { + record: crate::psrp_record::PsrpRecord::Information { + meta: crate::psrp_record::PsrpRecordMeta { + message_type, + message_type_value, + stream: stream.name().to_string(), + command_id: Some(cmd), + data_len: message.data.len(), + }, + record, + }, + handle: PipelineHandle { id: cmd }, + }); + } + ironposh_psrp::MessageType::DebugRecord + | ironposh_psrp::MessageType::VerboseRecord + | ironposh_psrp::MessageType::WarningRecord => { + let Some(cmd) = stream.command_id().copied() else { + warn!( + target: "ps_message", + message_type = ?message.message_type, + message_type_value = message.message_type.value(), + stream = %stream.name(), + command_id = ?stream.command_id(), + "record message missing command_id; ignoring" + ); + continue; + }; + + let msg = ps_value.as_string().unwrap_or_else(|| ps_value.to_string()); + + let message_type = message.message_type.clone(); + let message_type_value = message_type.value(); + let meta = crate::psrp_record::PsrpRecordMeta { + message_type: message_type.clone(), + message_type_value, + stream: stream.name().to_string(), + command_id: Some(cmd), + data_len: message.data.len(), + }; + + let record = match message_type { + ironposh_psrp::MessageType::DebugRecord => { + crate::psrp_record::PsrpRecord::Debug { meta, message: msg } + } + ironposh_psrp::MessageType::VerboseRecord => { + crate::psrp_record::PsrpRecord::Verbose { meta, message: msg } + } + ironposh_psrp::MessageType::WarningRecord => { + crate::psrp_record::PsrpRecord::Warning { meta, message: msg } + } + _ => unreachable!("guarded by match arm"), + }; + + result.push(AcceptResponsResult::PipelineRecord { + record, + handle: PipelineHandle { id: cmd }, + }); + } + ironposh_psrp::MessageType::PipelineState => { + debug!( + target: "pipeline", + stream_name = ?stream.name(), + command_id = ?stream.command_id(), + "handling PipelineState message" + ); + self.handle_pipeline_state(ps_value, stream.name(), stream.command_id()) + .map_err(|e| { + error!(target: "pipeline", error = %e, "failed to handle PipelineState"); + e + })?; + } + ironposh_psrp::MessageType::PipelineHostCall => { + debug!( + target: "host_call", + stream_name = ?stream.name(), + pipeline_id = ?stream.command_id(), + "handling PipelineHostCall message" + ); + + let host_call = self + .handle_pipeline_host_call(ps_value, stream.name(), stream.command_id()) + .map_err(|e| { + error!(target: "host_call", error = %e, "failed to handle PipelineHostCall"); + e + })?; + debug!(target: "host_call", host_call = ?host_call, "successfully created host call"); + + let needs_session_key = super::host_call::needs_session_key(&host_call); + + let has_session_key = self + .key_exchange + .as_ref() + .and_then(|s| s.session_key.as_ref()) + .is_some(); + + if needs_session_key && !has_session_key { + info!( + target: "key_exchange", + host_call_method = host_call.method_name(), + "deferring host call until PSRP session key is established" + ); + self.pending_host_calls.push_back(host_call); + + if !self.psrp_key_exchange_pending { + self.psrp_key_exchange_pending = true; + + info!( + target: "key_exchange", + "starting client-initiated PSRP key exchange" + ); + let public_key_b64 = self.build_public_key_blob_base64()?; + let public_key_msg = ironposh_psrp::PublicKey { + public_key: public_key_b64, + }; + let send_xml = self.send_runspace_pool_message(&public_key_msg)?; + result.push(AcceptResponsResult::SendThenReceive { + send_xml, + desired_streams: DesiredStream::runspace_pool_streams(), + }); + } + } else { + result.push(AcceptResponsResult::HostCall(host_call)); + } + } + ironposh_psrp::MessageType::PipelineOutput => { + debug!( + target: "pipeline_output", + stream_name = ?stream.name(), + command_id = ?stream.command_id(), + "handling PipelineOutput message" + ); + + let output = self.handle_pipeline_output(ps_value)?; + + debug!(target: "pipeline_output", output = ?output, "successfully handled PipelineOutput"); + result.push(AcceptResponsResult::PipelineOutput { + output, + handle: PipelineHandle { + id: *stream.command_id().ok_or_else(|| { + crate::PwshCoreError::InvalidResponse( + "PipelineOutput message must have a command_id".into(), + ) + })?, + }, + }); + } + ironposh_psrp::MessageType::ErrorRecord => { + debug!( + target: "error_record", + stream_name = ?stream.name(), + command_id = ?stream.command_id(), + "handling ErrorRecord message" + ); + + let PsValue::Object(complex_object) = ps_value else { + return Err(crate::PwshCoreError::InvalidResponse( + "Expected ErrorRecord as PsValue::Object".into(), + )); + }; + + let error_record = ErrorRecord::try_from(complex_object).map_err(|e| { + error!(target: "error_record", error = %e, "failed to parse ErrorRecord"); + e + })?; + + debug!(target: "error_record", error_record = ?error_record, "successfully parsed ErrorRecord"); + result.push(AcceptResponsResult::ErrorRecord { + error_record, + handle: PipelineHandle { + id: *stream.command_id().ok_or_else(|| { + crate::PwshCoreError::InvalidResponse( + "ErrorRecord message must have a command_id".into(), + ) + })?, + }, + }); + } + _ => { + let data_len = message.data.len(); + let data_preview = String::from_utf8_lossy( + &message.data[..std::cmp::min(message.data.len(), 512)], + ); + error!( + target: "ps_message", + message_type = ?message.message_type, + message_type_value = message.message_type.value(), + stream = %stream.name(), + command_id = ?stream.command_id(), + data_len, + data_preview = %data_preview, + "received message type but no handler implemented" + ); + + let Some(cmd) = stream.command_id().copied() else { + // No pipeline to attach to; log only (do not crash the session). + continue; + }; + let message_type = message.message_type.clone(); + let message_type_value = message_type.value(); + + result.push(AcceptResponsResult::PipelineRecord { + record: crate::psrp_record::PsrpRecord::Unsupported { + meta: crate::psrp_record::PsrpRecordMeta { + message_type, + message_type_value, + stream: stream.name().to_string(), + command_id: Some(cmd), + data_len, + }, + data_preview: data_preview.to_string(), + }, + handle: PipelineHandle { id: cmd }, + }); + } + } + } + } + + info!( + target: "pwsh_responses", + result_count = result.len(), + "processed PowerShell responses" + ); + Ok(result) + } + + #[instrument(skip(self, session_capability), fields(protocol_version = tracing::field::Empty, ps_version = tracing::field::Empty))] + pub(super) fn handle_session_capability( + &mut self, + session_capability: PsValue, + ) -> Result<(), crate::PwshCoreError> { + let PsValue::Object(session_capability) = session_capability else { + return Err(PwshCoreError::InvalidResponse( + "Expected SessionCapability as PsValue::Object".into(), + )); + }; + + let session_capability = SessionCapability::try_from(session_capability)?; + + debug!( + target: "session", + capability = ?session_capability, + "received SessionCapability" + ); + self.session_capability = Some(session_capability); + Ok(()) + } + + #[instrument(skip(self, app_data))] + pub(super) fn handle_application_private_data( + &mut self, + app_data: PsValue, + ) -> Result<(), crate::PwshCoreError> { + let PsValue::Object(app_data) = app_data else { + return Err(PwshCoreError::InvalidResponse( + "Expected ApplicationPrivateData as PsValue::Object".into(), + )); + }; + + let app_data = ApplicationPrivateData::try_from(app_data)?; + trace!(target: "session", app_data = ?app_data, "received ApplicationPrivateData"); + self.application_private_data = Some(app_data); + Ok(()) + } + + #[instrument(skip(self, ps_value), fields(runspace_state = tracing::field::Empty))] + fn handle_runspacepool_state(&mut self, ps_value: PsValue) -> Result<(), crate::PwshCoreError> { + let PsValue::Object(runspacepool_state) = ps_value else { + return Err(PwshCoreError::InvalidResponse( + "Expected RunspacepoolState as PsValue::Object".into(), + )); + }; + + let runspacepool_state = RunspacePoolStateMessage::try_from(runspacepool_state)?; + + // Record the state in the span + let span = tracing::Span::current(); + span.record( + "runspace_state", + format!("{:?}", runspacepool_state.runspace_state), + ); + + trace!(target: "runspace", state = ?runspacepool_state, "received RunspacePoolState"); + + self.state = RunspacePoolState::from(&runspacepool_state.runspace_state); + + Ok(()) + } + + #[instrument(skip(self, ps_value), fields(stream_name, command_id = ?command_id))] + fn handle_progress_record( + &mut self, + ps_value: PsValue, + stream_name: &str, + command_id: Option<&Uuid>, + ) -> Result { + let PsValue::Object(progress_record) = ps_value else { + return Err(PwshCoreError::InvalidResponse( + "Expected ProgressRecord as PsValue::Object".into(), + )); + }; + + let progress_record = ironposh_psrp::ProgressRecord::try_from(progress_record)?; + + // Question: Can we have a Optional command id here? + let Some(command_id) = command_id else { + return Err(PwshCoreError::InvalidResponse( + "Expected command_id to be Some".into(), + )); + }; + + trace!( + target: "progress", + progress_record = ?progress_record, + stream_name = stream_name, + command_id = ?command_id, + "received ProgressRecord" + ); + + // Find the pipeline by command_id + let pipeline = self.pipelines.get_mut(command_id).ok_or_else(|| { + PwshCoreError::InvalidResponse("Pipeline not found for command_id".into()) + })?; + + pipeline.add_progress_record(progress_record.clone()); + + Ok(progress_record) + } + + #[instrument(skip(self, ps_value, stream_name, command_id))] + fn handle_information_record( + &mut self, + ps_value: PsValue, + stream_name: &str, + command_id: &Uuid, + ) -> Result { + let (info_record_obj, lossy_fallback_str) = match ps_value { + PsValue::Object(obj) => { + let fallback = obj + .to_string + .clone() + .or_else(|| { + obj.properties + .get("MessageData") + .or_else(|| obj.properties.get("InformationalRecord_Message")) + .map(ToString::to_string) + }) + .unwrap_or_else(|| "".to_string()); + (obj, fallback) + } + other @ PsValue::Primitive(_) => { + warn!( + target: "information", + stream_name = stream_name, + command_id = %command_id, + "InformationRecord payload was not an object; using lossy string" + ); + return Ok(ironposh_psrp::InformationRecord::builder() + .message_data(ironposh_psrp::InformationMessageData::String( + other.to_string(), + )) + .build()); + } + }; + + let info_record = match ironposh_psrp::InformationRecord::try_from(info_record_obj) { + Ok(info_record) => info_record, + Err(e) => { + // `Write-Information -MessageData` is typed as `object` and does not always serialize + // as a primitive string. Keep the session alive and fall back to a best-effort + // string representation. + warn!( + target: "information", + error = %e, + stream_name = stream_name, + command_id = %command_id, + "failed to parse InformationRecord; using lossy message_data" + ); + ironposh_psrp::InformationRecord::builder() + .message_data(ironposh_psrp::InformationMessageData::String( + lossy_fallback_str, + )) + .build() + } + }; + trace!( + ?info_record, + stream_name = stream_name, + command_id = %command_id, + "Received InformationRecord" + ); + + // Find the pipeline by command_id + let pipeline = self.pipelines.get_mut(command_id).ok_or_else(|| { + PwshCoreError::InvalidResponse("Pipeline not found for command_id".into()) + })?; + + pipeline.add_information_record(info_record.clone()); + + Ok(info_record) + } + + #[instrument(skip(self, ps_value, stream_name, command_id))] + fn handle_pipeline_state( + &mut self, + ps_value: PsValue, + stream_name: &str, + command_id: Option<&Uuid>, + ) -> Result<(), crate::PwshCoreError> { + let PsValue::Object(pipeline_state) = ps_value else { + return Err(PwshCoreError::InvalidResponse( + "Expected PipelineState as PsValue::Object".into(), + )); + }; + + let pipeline_state = ironposh_psrp::PipelineStateMessage::try_from(pipeline_state)?; + trace!( + ?pipeline_state, + stream_name = stream_name, + command_id = ?command_id, + "Received PipelineState" + ); + // Question: Can we have a Optional command id here? + let Some(command_id) = command_id else { + return Err(PwshCoreError::InvalidResponse( + "Expected command_id to be Some".into(), + )); + }; + + // Find the pipeline by command_id + let pipeline = self.pipelines.get_mut(command_id).ok_or_else(|| { + PwshCoreError::InvalidResponse("Pipeline not found for command_id".into()) + })?; + // Update the pipeline state + pipeline.set_state(PsInvocationState::from(pipeline_state.pipeline_state)); + + Ok(()) + } + + #[instrument(skip_all)] + pub fn handle_pipeline_host_call( + &mut self, + ps_value: PsValue, + stream_name: &str, + command_id: Option<&Uuid>, + ) -> Result { + super::host_call::pipeline_host_call_from(ps_value, stream_name, command_id) + } + + pub fn handle_pipeline_output( + &mut self, + ps_value: PsValue, + ) -> Result { + let pipeline_output = PipelineOutput::from(ps_value); + + Ok(pipeline_output) + } +} diff --git a/crates/ironposh-client-core/src/runspace_pool/mod.rs b/crates/ironposh-client-core/src/runspace_pool/mod.rs index 74b2acb..0297d5a 100644 --- a/crates/ironposh-client-core/src/runspace_pool/mod.rs +++ b/crates/ironposh-client-core/src/runspace_pool/mod.rs @@ -4,6 +4,7 @@ pub mod enums; pub mod expect_shell_connected; pub mod expect_shell_created; mod host_call; +mod incoming; pub mod pool; mod requests; pub mod types; diff --git a/crates/ironposh-client-core/src/runspace_pool/pool.rs b/crates/ironposh-client-core/src/runspace_pool/pool.rs index 56a6a9b..5bfce3c 100644 --- a/crates/ironposh-client-core/src/runspace_pool/pool.rs +++ b/crates/ironposh-client-core/src/runspace_pool/pool.rs @@ -1,5 +1,5 @@ use std::{ - collections::{HashMap, HashSet, VecDeque}, + collections::{HashMap, VecDeque}, sync::Arc, }; @@ -7,15 +7,11 @@ use base64::Engine; use ironposh_psrp::{ ApartmentState, ApplicationArguments, ApplicationPrivateData, ConnectRunspacePool, CreatePipeline, Defragmenter, ErrorRecord, HostInfo, InitRunspacePool, PSThreadOptions, - PipelineOutput, PsValue, RunspacePoolStateMessage, SessionCapability, fragmentation, + PipelineOutput, SessionCapability, fragmentation, }; -use ironposh_winrm::{ - soap::SoapEnvelope, - ws_management::{OptionSetValue, WsAction, WsMan}, -}; -use ironposh_xml::parser::XmlDeserialize; -use rsa::{RsaPrivateKey, pkcs1v15::Pkcs1v15Encrypt}; -use tracing::{debug, error, info, instrument, trace, warn}; +use ironposh_winrm::ws_management::{OptionSetValue, WsMan}; +use rsa::RsaPrivateKey; +use tracing::{debug, info, instrument, trace, warn}; use uuid::Uuid; @@ -295,41 +291,6 @@ impl RunspacePool { self.application_private_data.as_ref() } - /// Accept the server's DisconnectResponse. - /// Valid only in `Disconnecting` state; transitions the pool to `Disconnected`. - #[instrument(skip(self, soap_envelope), fields(envelope_length = soap_envelope.len()))] - pub fn accept_disconnect_response( - &mut self, - soap_envelope: &str, - ) -> Result<(), crate::PwshCoreError> { - if self.state != RunspacePoolState::Disconnecting { - return Err(crate::PwshCoreError::InvalidState( - "RunspacePool must be in Disconnecting state to accept a disconnect response", - )); - } - - let parsed = ironposh_xml::parser::parse(soap_envelope)?; - let soap_envelope = SoapEnvelope::from_node(parsed.root_element()) - .map_err(crate::PwshCoreError::XmlParsingError)?; - - Self::fault_to_error(&soap_envelope)?; - - // Real Windows servers answer shell Disconnect with an empty Body and - // identify the operation via the `a:Action` header only; the - // documented `rsp:DisconnectResponse` body element is accepted too. - if soap_envelope.body.as_ref().disconnect_response.is_none() - && !Self::header_action_is(&soap_envelope, &WsAction::DisconnectResponse) - { - return Err(crate::PwshCoreError::InvalidResponse( - "No DisconnectResponse found in response".into(), - )); - } - - self.state = RunspacePoolState::Disconnected; - info!(runspace_pool_id = %self.id, "runspace pool disconnected"); - Ok(()) - } - /// Abort an in-flight Disconnect after the server faulted the request. /// Valid only in `Disconnecting` state; reverts the pool to `Opened`. pub(crate) fn abort_disconnect(&mut self) { @@ -360,298 +321,6 @@ impl RunspacePool { } } - /// Accept the server's ReconnectResponse. - /// Valid only in `Connecting` state; transitions the pool back to `Opened`. - /// The caller is responsible for resuming the receive loop afterwards. - #[instrument(skip(self, soap_envelope), fields(envelope_length = soap_envelope.len()))] - pub fn accept_reconnect_response( - &mut self, - soap_envelope: &str, - ) -> Result<(), crate::PwshCoreError> { - if self.state != RunspacePoolState::Connecting { - return Err(crate::PwshCoreError::InvalidState( - "RunspacePool must be in Connecting state to accept a reconnect response", - )); - } - - let parsed = ironposh_xml::parser::parse(soap_envelope)?; - let soap_envelope = SoapEnvelope::from_node(parsed.root_element()) - .map_err(crate::PwshCoreError::XmlParsingError)?; - - Self::fault_to_error(&soap_envelope)?; - - // Real Windows servers answer shell Reconnect with an empty Body and - // identify the operation via the `a:Action` header only; the - // documented `rsp:ReconnectResponse` body element is accepted too. - if soap_envelope.body.as_ref().reconnect_response.is_none() - && !Self::header_action_is(&soap_envelope, &WsAction::ReconnectResponse) - { - return Err(crate::PwshCoreError::InvalidResponse( - "No ReconnectResponse found in response".into(), - )); - } - - self.state = RunspacePoolState::Opened; - // The Receive that was in flight before the disconnect is gone; the caller - // must fire a fresh pool-stream Receive to resume the receive loop. - self.desired_stream_is_pooling = false; - info!(runspace_pool_id = %self.id, "runspace pool reconnected"); - Ok(()) - } - - /// Whether the envelope's `a:Action` header equals the given WSMan action. - fn header_action_is(soap_envelope: &SoapEnvelope<'_>, action: &WsAction) -> bool { - soap_envelope - .header - .as_ref() - .and_then(|header| header.as_ref().action.as_ref()) - .is_some_and(|tag| tag.as_ref().as_ref() == action.as_str()) - } - - /// Surface a WSMan SOAP fault as a `SoapFault` error. - pub(super) fn fault_to_error( - soap_envelope: &SoapEnvelope<'_>, - ) -> Result<(), crate::PwshCoreError> { - if let Some(fault_tag) = soap_envelope.body.as_ref().fault.as_ref() { - let fault = fault_tag.as_ref(); - let code = fault - .code - .as_ref() - .and_then(|c| c.as_ref().value.as_ref()) - .map_or("unknown", |v| <&str>::from(v.as_ref())) - .to_string(); - let reason = fault.reason_text().unwrap_or("unknown").to_string(); - error!(target: "accept_response", %code, %reason, "received SOAP fault"); - return Err(PwshCoreError::SoapFault { code, reason }); - } - Ok(()) - } - - #[expect(clippy::too_many_lines)] - #[instrument(skip(self, soap_envelope), fields(envelope_length = soap_envelope.len()))] - pub(crate) fn accept_response( - &mut self, - soap_envelope: &str, - ) -> Result, crate::PwshCoreError> { - debug!(target: "soap", "parsing SOAP envelope"); - - let parsed = ironposh_xml::parser::parse(soap_envelope).map_err(|e| { - error!(target: "xml", error = %e, xml = soap_envelope, "failed to parse XML"); - e - })?; - - let soap_envelope = SoapEnvelope::from_node(parsed.root_element()).map_err(|e| { - error!(target: "soap", error = %e, "failed to parse SOAP envelope"); - crate::PwshCoreError::XmlParsingError(e) - })?; - - let mut result = Vec::new(); - - if soap_envelope.body.as_ref().receive_response.is_some() { - debug!(target: "receive", "processing receive response"); - - let (streams, command_state) = WinRunspace::accept_receive_response(&soap_envelope) - .map_err(|e| { - error!(target: "receive", error = %e, "failed to accept receive response"); - e - })?; - - let streams_ids = streams - .iter() - .filter_map(|stream| stream.command_id().copied()) - .collect::>(); - - let is_there_a_stream_has_no_command_id = - streams.iter().any(|stream| stream.command_id().is_none()); - - if is_there_a_stream_has_no_command_id { - debug!( - target: "receive", - "stream without command_id found, should be runspace pool stream" - ); - self.desired_stream_is_pooling = false; - } - - debug!( - target: "receive", - stream_count = streams.len(), - stream_command_ids = ?streams_ids, - "processing streams" - ); - - let handle_results = self.handle_pwsh_responses(streams).map_err(|e| { - error!(target: "pwsh", error = %e, "failed to handle PowerShell responses"); - e - })?; - - let already_scheduled_receive = handle_results - .iter() - .any(|r| matches!(r, AcceptResponsResult::SendThenReceive { .. })); - - debug!( - target: "pwsh", - response_count = handle_results.len(), - already_scheduled_receive, - "handled PowerShell responses" - ); - - result.extend(handle_results); - - if let Some(command_state) = command_state - && command_state.is_done() - { - debug!( - target: "pipeline", - pipeline_id = ?command_state.command_id, - "command state done received, removing pipeline" - ); - // If command state is done, we can remove the pipeline from the pool - let pipeline = self.pipelines.remove(&command_state.command_id); - if pipeline.is_some() { - result.push(AcceptResponsResult::PipelineFinished(PipelineHandle { - id: command_state.command_id, - })); - } - } - - let desired_streams = if !streams_ids.is_empty() { - // find the intersetction of streams.id and self.pipelines.keys() - let next_desired_streams = streams_ids.into_iter().filter(|stream| { - self.pipelines - .keys() - .any(|pipeline_id| pipeline_id == stream) - }); - - // keep unique stream with the same id - let mut stream_set = HashSet::new(); - - for stream in next_desired_streams { - stream_set.insert(stream); - } - - stream_set - .into_iter() - .map(|stream| DesiredStream::new("stdout", stream.into())) - .collect::>() - } else if !self.desired_stream_is_pooling { - self.desired_stream_is_pooling = true; - DesiredStream::runspace_pool_streams() - } else { - vec![] - }; - - if !already_scheduled_receive && !desired_streams.is_empty() { - result.push(AcceptResponsResult::ReceiveResponse { desired_streams }); - } - } - - if soap_envelope.body.as_ref().command_response.is_some() { - let pipeline_id = self.shell.accept_commannd_response(&soap_envelope)?; - - self.pipelines - .get_mut(&pipeline_id) - .ok_or_else(|| { - crate::PwshCoreError::InvalidResponse( - "Pipeline not found for command response".into(), - ) - })? - .set_state(PsInvocationState::Running); - - result.push(AcceptResponsResult::ReceiveResponse { - desired_streams: vec![DesiredStream::stdout_for_command(pipeline_id)], - }); - - result.push(AcceptResponsResult::PipelineCreated(PipelineHandle { - id: pipeline_id, - })); - } - - if soap_envelope.body.as_ref().signal_response.is_some() { - let pipeline_id = self.shell.accept_signal_response(&soap_envelope)?; - match pipeline_id { - None => { - // Don't know what to do with it - } - Some(id) => match self.pipelines.remove(&id) { - None => { - warn!( - target: "signal", - pipeline_id = ?id, - "received signal response for unknown pipeline" - ); - } - Some(_) => { - result.push(AcceptResponsResult::PipelineFinished(PipelineHandle { id })); - } - }, - } - } - - // Handle SOAP faults (e.g. operation timeout heartbeats) - if let Some(fault_tag) = soap_envelope.body.as_ref().fault.as_ref() { - let fault = fault_tag.as_ref(); - if fault.is_timeout() { - info!( - target: "accept_response", - "received WS-Management timeout fault (heartbeat), re-issuing Receive" - ); - // Normal heartbeat - re-issue Receive for active streams - let desired_streams = self.compute_active_desired_streams(); - if !desired_streams.is_empty() { - result.push(AcceptResponsResult::ReceiveResponse { desired_streams }); - } - } else if fault.is_invalid_selectors() { - // Common cancel race: we had a Receive(CommandId=...) in flight while the - // server already tore down the command. Treat this as non-fatal and - // stop polling pipelines so the session remains usable. - let reason = fault.reason_text().unwrap_or("unknown"); - warn!( - target: "accept_response", - reason = %reason, - pipeline_count = self.pipelines.len(), - "received WS-Management InvalidSelectors fault; dropping active pipelines and continuing" - ); - - let finished: Vec = self.pipelines.keys().copied().collect(); - self.pipelines.clear(); - - for id in finished { - result.push(AcceptResponsResult::PipelineFinished(PipelineHandle { id })); - } - - let desired_streams = self.compute_active_desired_streams(); - if !desired_streams.is_empty() { - result.push(AcceptResponsResult::ReceiveResponse { desired_streams }); - } - } else { - // Real fault - propagate as error - let code = fault - .code - .as_ref() - .and_then(|c| c.as_ref().value.as_ref()) - .map_or("unknown", |v| <&str>::from(v.as_ref())) - .to_string(); - let reason = fault.reason_text().unwrap_or("unknown").to_string(); - error!( - target: "accept_response", - %code, - %reason, - "received non-timeout SOAP fault" - ); - return Err(PwshCoreError::SoapFault { code, reason }); - } - } - - debug!( - target: "accept_response", - result_count = result.len(), - result_types = ?result.iter().map(std::mem::discriminant).collect::>(), - "accept response results" - ); - - Ok(result) - } - /// Compute desired streams for all currently active pipelines, plus the runspace pool stream. /// Used to re-issue a Receive after a timeout heartbeat. pub(crate) fn compute_active_desired_streams(&self) -> Vec { @@ -683,704 +352,6 @@ impl RunspacePool { Ok(PipelineHandle { id: uuid }) } - /// Fire create pipeline for a specific pipeline handle (used by service API) - #[expect(clippy::too_many_lines)] - #[instrument(skip(self, responses))] - fn handle_pwsh_responses( - &mut self, - responses: Vec, - ) -> Result, crate::PwshCoreError> { - let mut result = Vec::new(); - - for (stream_index, stream) in responses.into_iter().enumerate() { - debug!( - target: "stream", - stream_index, - stream_name = ?stream.name(), - pipeline_id = ?stream.command_id(), - "processing stream" - ); - - let messages = match self.defragmenter.defragment(stream.value()).map_err(|e| { - error!(target: "defragment", stream_index, error = %e, "failed to defragment stream"); - e - })? { - fragmentation::DefragmentResult::Incomplete => { - debug!(target: "defragment", stream_index, "stream incomplete, continuing"); - continue; - } - fragmentation::DefragmentResult::Complete(power_shell_remoting_messages) => { - debug!( - target: "defragment", - stream_index, - message_count = power_shell_remoting_messages.len(), - "stream complete" - ); - power_shell_remoting_messages - } - }; - - for (msg_index, message) in messages.into_iter().enumerate() { - let ps_value = message.parse_ps_message().map_err(|e| { - error!( - target: "ps_message", - stream_index, - ?message, - error = %e, - "failed to parse PS message" - ); - e - })?; - - info!( - target: "ps_message", - message_type = ?message.message_type, - stream_index, - msg_index, - "parsed PS message" - ); - - match message.message_type { - ironposh_psrp::MessageType::PublicKeyRequest => { - debug!( - target: "key_exchange", - stream_name = ?stream.name(), - command_id = ?stream.command_id(), - "handling PublicKeyRequest message" - ); - - // Validate the payload (best-effort). - if let Err(e) = ironposh_psrp::PublicKeyRequest::try_from(ps_value.clone()) - { - warn!( - target: "key_exchange", - error = %e, - payload = ?ps_value, - "unexpected PublicKeyRequest payload" - ); - } - - let public_key_b64 = self.build_public_key_blob_base64()?; - let public_key_msg = ironposh_psrp::PublicKey { - public_key: public_key_b64, - }; - let send_xml = self.send_runspace_pool_message(&public_key_msg)?; - - result.push(AcceptResponsResult::SendThenReceive { - send_xml, - desired_streams: DesiredStream::runspace_pool_streams(), - }); - } - ironposh_psrp::MessageType::EncryptedSessionKey => { - debug!( - target: "key_exchange", - stream_name = ?stream.name(), - command_id = ?stream.command_id(), - "handling EncryptedSessionKey message" - ); - - let PsValue::Object(obj) = ps_value else { - return Err(crate::PwshCoreError::InvalidResponse( - "Expected EncryptedSessionKey as PsValue::Object".into(), - )); - }; - - let encrypted = ironposh_psrp::EncryptedSessionKey::try_from(obj)?; - let decoded = base64::engine::general_purpose::STANDARD - .decode(encrypted.encrypted_session_key) - .map_err(|e| { - crate::PwshCoreError::InvalidResponse( - format!("Invalid base64 EncryptedSessionKey: {e}").into(), - ) - })?; - - if decoded.len() < 12 + 256 { - return Err(crate::PwshCoreError::InvalidResponse( - format!( - "EncryptedSessionKey blob too short: {} bytes", - decoded.len() - ) - .into(), - )); - } - - let encrypted_bytes = &decoded[12..12 + 256]; - let state = self.ensure_key_exchange_state()?; - - let decrypted = state - .private_key - .decrypt(Pkcs1v15Encrypt, encrypted_bytes) - .or_else(|e| { - // Some stacks may provide a representation that requires reversing. - // Try best-effort before failing hard. - let mut reversed = encrypted_bytes.to_vec(); - reversed.reverse(); - state - .private_key - .decrypt(Pkcs1v15Encrypt, &reversed) - .map_err(|_e2| e) - }) - .map_err(|e| { - crate::PwshCoreError::InternalError(format!( - "failed to decrypt EncryptedSessionKey: {e}" - )) - })?; - - if decrypted.len() != 32 { - return Err(crate::PwshCoreError::InvalidResponse( - format!( - "Unexpected decrypted PSRP session key length: {} bytes", - decrypted.len() - ) - .into(), - )); - } - - info!( - target: "key_exchange", - session_key_len = decrypted.len(), - "stored decrypted PSRP session key" - ); - state.session_key = Some(decrypted); - - self.psrp_key_exchange_pending = false; - while let Some(host_call) = self.pending_host_calls.pop_front() { - debug!( - target: "key_exchange", - host_call = ?host_call, - "releasing deferred host call after key exchange" - ); - result.push(AcceptResponsResult::HostCall(host_call)); - } - } - ironposh_psrp::MessageType::SessionCapability => { - debug!(target: "session", "handling SessionCapability message"); - self.handle_session_capability(ps_value).map_err(|e| { - error!(target: "session", error = %e, "failed to handle SessionCapability"); - e - })?; - } - ironposh_psrp::MessageType::ApplicationPrivateData => { - debug!(target: "session", "handling ApplicationPrivateData message"); - self.handle_application_private_data(ps_value) - .map_err(|e| { - error!(target: "session", error = %e, "failed to handle ApplicationPrivateData"); - e - })?; - } - ironposh_psrp::MessageType::RunspacepoolState => { - debug!(target: "runspace", "handling RunspacepoolState message"); - self.handle_runspacepool_state(ps_value).map_err(|e| { - error!(target: "runspace", error = %e, "failed to handle RunspacepoolState"); - e - })?; - } - ironposh_psrp::MessageType::ProgressRecord => { - debug!( - target: "progress", - stream_name = ?stream.name(), - command_id = ?stream.command_id(), - "handling ProgressRecord message" - ); - let record = - self.handle_progress_record(ps_value, stream.name(), stream.command_id()) - .map_err(|e| { - error!(target: "progress", error = %e, "failed to handle ProgressRecord"); - e - })?; - - let cmd = *stream.command_id().ok_or_else(|| { - crate::PwshCoreError::InvalidResponse( - "ProgressRecord message must have a command_id".into(), - ) - })?; - let message_type = message.message_type.clone(); - let message_type_value = message_type.value(); - result.push(AcceptResponsResult::PipelineRecord { - record: crate::psrp_record::PsrpRecord::Progress { - meta: crate::psrp_record::PsrpRecordMeta { - message_type, - message_type_value, - stream: stream.name().to_string(), - command_id: Some(cmd), - data_len: message.data.len(), - }, - record, - }, - handle: PipelineHandle { id: cmd }, - }); - } - ironposh_psrp::MessageType::InformationRecord => { - debug!( - target: "information", - stream_name = ?stream.name(), - command_id = ?stream.command_id(), - "handling InformationRecord message" - ); - let Some(cmd) = stream.command_id().copied() else { - warn!( - target: "ps_message", - message_type = ?message.message_type, - message_type_value = message.message_type.value(), - stream = %stream.name(), - command_id = ?stream.command_id(), - "InformationRecord message missing command_id; ignoring" - ); - continue; - }; - - let record = self - .handle_information_record(ps_value, stream.name(), &cmd) - .map_err(|e| { - error!( - target: "information", - error = %e, - "failed to handle InformationRecord" - ); - e - })?; - let message_type = message.message_type.clone(); - let message_type_value = message_type.value(); - result.push(AcceptResponsResult::PipelineRecord { - record: crate::psrp_record::PsrpRecord::Information { - meta: crate::psrp_record::PsrpRecordMeta { - message_type, - message_type_value, - stream: stream.name().to_string(), - command_id: Some(cmd), - data_len: message.data.len(), - }, - record, - }, - handle: PipelineHandle { id: cmd }, - }); - } - ironposh_psrp::MessageType::DebugRecord - | ironposh_psrp::MessageType::VerboseRecord - | ironposh_psrp::MessageType::WarningRecord => { - let Some(cmd) = stream.command_id().copied() else { - warn!( - target: "ps_message", - message_type = ?message.message_type, - message_type_value = message.message_type.value(), - stream = %stream.name(), - command_id = ?stream.command_id(), - "record message missing command_id; ignoring" - ); - continue; - }; - - let msg = ps_value.as_string().unwrap_or_else(|| ps_value.to_string()); - - let message_type = message.message_type.clone(); - let message_type_value = message_type.value(); - let meta = crate::psrp_record::PsrpRecordMeta { - message_type: message_type.clone(), - message_type_value, - stream: stream.name().to_string(), - command_id: Some(cmd), - data_len: message.data.len(), - }; - - let record = match message_type { - ironposh_psrp::MessageType::DebugRecord => { - crate::psrp_record::PsrpRecord::Debug { meta, message: msg } - } - ironposh_psrp::MessageType::VerboseRecord => { - crate::psrp_record::PsrpRecord::Verbose { meta, message: msg } - } - ironposh_psrp::MessageType::WarningRecord => { - crate::psrp_record::PsrpRecord::Warning { meta, message: msg } - } - _ => unreachable!("guarded by match arm"), - }; - - result.push(AcceptResponsResult::PipelineRecord { - record, - handle: PipelineHandle { id: cmd }, - }); - } - ironposh_psrp::MessageType::PipelineState => { - debug!( - target: "pipeline", - stream_name = ?stream.name(), - command_id = ?stream.command_id(), - "handling PipelineState message" - ); - self.handle_pipeline_state(ps_value, stream.name(), stream.command_id()) - .map_err(|e| { - error!(target: "pipeline", error = %e, "failed to handle PipelineState"); - e - })?; - } - ironposh_psrp::MessageType::PipelineHostCall => { - debug!( - target: "host_call", - stream_name = ?stream.name(), - pipeline_id = ?stream.command_id(), - "handling PipelineHostCall message" - ); - - let host_call = self - .handle_pipeline_host_call(ps_value, stream.name(), stream.command_id()) - .map_err(|e| { - error!(target: "host_call", error = %e, "failed to handle PipelineHostCall"); - e - })?; - debug!(target: "host_call", host_call = ?host_call, "successfully created host call"); - - let needs_session_key = super::host_call::needs_session_key(&host_call); - - let has_session_key = self - .key_exchange - .as_ref() - .and_then(|s| s.session_key.as_ref()) - .is_some(); - - if needs_session_key && !has_session_key { - info!( - target: "key_exchange", - host_call_method = host_call.method_name(), - "deferring host call until PSRP session key is established" - ); - self.pending_host_calls.push_back(host_call); - - if !self.psrp_key_exchange_pending { - self.psrp_key_exchange_pending = true; - - info!( - target: "key_exchange", - "starting client-initiated PSRP key exchange" - ); - let public_key_b64 = self.build_public_key_blob_base64()?; - let public_key_msg = ironposh_psrp::PublicKey { - public_key: public_key_b64, - }; - let send_xml = self.send_runspace_pool_message(&public_key_msg)?; - result.push(AcceptResponsResult::SendThenReceive { - send_xml, - desired_streams: DesiredStream::runspace_pool_streams(), - }); - } - } else { - result.push(AcceptResponsResult::HostCall(host_call)); - } - } - ironposh_psrp::MessageType::PipelineOutput => { - debug!( - target: "pipeline_output", - stream_name = ?stream.name(), - command_id = ?stream.command_id(), - "handling PipelineOutput message" - ); - - let output = self.handle_pipeline_output(ps_value)?; - - debug!(target: "pipeline_output", output = ?output, "successfully handled PipelineOutput"); - result.push(AcceptResponsResult::PipelineOutput { - output, - handle: PipelineHandle { - id: *stream.command_id().ok_or_else(|| { - crate::PwshCoreError::InvalidResponse( - "PipelineOutput message must have a command_id".into(), - ) - })?, - }, - }); - } - ironposh_psrp::MessageType::ErrorRecord => { - debug!( - target: "error_record", - stream_name = ?stream.name(), - command_id = ?stream.command_id(), - "handling ErrorRecord message" - ); - - let PsValue::Object(complex_object) = ps_value else { - return Err(crate::PwshCoreError::InvalidResponse( - "Expected ErrorRecord as PsValue::Object".into(), - )); - }; - - let error_record = ErrorRecord::try_from(complex_object).map_err(|e| { - error!(target: "error_record", error = %e, "failed to parse ErrorRecord"); - e - })?; - - debug!(target: "error_record", error_record = ?error_record, "successfully parsed ErrorRecord"); - result.push(AcceptResponsResult::ErrorRecord { - error_record, - handle: PipelineHandle { - id: *stream.command_id().ok_or_else(|| { - crate::PwshCoreError::InvalidResponse( - "ErrorRecord message must have a command_id".into(), - ) - })?, - }, - }); - } - _ => { - let data_len = message.data.len(); - let data_preview = String::from_utf8_lossy( - &message.data[..std::cmp::min(message.data.len(), 512)], - ); - error!( - target: "ps_message", - message_type = ?message.message_type, - message_type_value = message.message_type.value(), - stream = %stream.name(), - command_id = ?stream.command_id(), - data_len, - data_preview = %data_preview, - "received message type but no handler implemented" - ); - - let Some(cmd) = stream.command_id().copied() else { - // No pipeline to attach to; log only (do not crash the session). - continue; - }; - let message_type = message.message_type.clone(); - let message_type_value = message_type.value(); - - result.push(AcceptResponsResult::PipelineRecord { - record: crate::psrp_record::PsrpRecord::Unsupported { - meta: crate::psrp_record::PsrpRecordMeta { - message_type, - message_type_value, - stream: stream.name().to_string(), - command_id: Some(cmd), - data_len, - }, - data_preview: data_preview.to_string(), - }, - handle: PipelineHandle { id: cmd }, - }); - } - } - } - } - - info!( - target: "pwsh_responses", - result_count = result.len(), - "processed PowerShell responses" - ); - Ok(result) - } - - #[instrument(skip(self, session_capability), fields(protocol_version = tracing::field::Empty, ps_version = tracing::field::Empty))] - pub(super) fn handle_session_capability( - &mut self, - session_capability: PsValue, - ) -> Result<(), crate::PwshCoreError> { - let PsValue::Object(session_capability) = session_capability else { - return Err(PwshCoreError::InvalidResponse( - "Expected SessionCapability as PsValue::Object".into(), - )); - }; - - let session_capability = SessionCapability::try_from(session_capability)?; - - debug!( - target: "session", - capability = ?session_capability, - "received SessionCapability" - ); - self.session_capability = Some(session_capability); - Ok(()) - } - - #[instrument(skip(self, app_data))] - pub(super) fn handle_application_private_data( - &mut self, - app_data: PsValue, - ) -> Result<(), crate::PwshCoreError> { - let PsValue::Object(app_data) = app_data else { - return Err(PwshCoreError::InvalidResponse( - "Expected ApplicationPrivateData as PsValue::Object".into(), - )); - }; - - let app_data = ApplicationPrivateData::try_from(app_data)?; - trace!(target: "session", app_data = ?app_data, "received ApplicationPrivateData"); - self.application_private_data = Some(app_data); - Ok(()) - } - - #[instrument(skip(self, ps_value), fields(runspace_state = tracing::field::Empty))] - fn handle_runspacepool_state(&mut self, ps_value: PsValue) -> Result<(), crate::PwshCoreError> { - let PsValue::Object(runspacepool_state) = ps_value else { - return Err(PwshCoreError::InvalidResponse( - "Expected RunspacepoolState as PsValue::Object".into(), - )); - }; - - let runspacepool_state = RunspacePoolStateMessage::try_from(runspacepool_state)?; - - // Record the state in the span - let span = tracing::Span::current(); - span.record( - "runspace_state", - format!("{:?}", runspacepool_state.runspace_state), - ); - - trace!(target: "runspace", state = ?runspacepool_state, "received RunspacePoolState"); - - self.state = RunspacePoolState::from(&runspacepool_state.runspace_state); - - Ok(()) - } - - #[instrument(skip(self, ps_value), fields(stream_name, command_id = ?command_id))] - fn handle_progress_record( - &mut self, - ps_value: PsValue, - stream_name: &str, - command_id: Option<&Uuid>, - ) -> Result { - let PsValue::Object(progress_record) = ps_value else { - return Err(PwshCoreError::InvalidResponse( - "Expected ProgressRecord as PsValue::Object".into(), - )); - }; - - let progress_record = ironposh_psrp::ProgressRecord::try_from(progress_record)?; - - // Question: Can we have a Optional command id here? - let Some(command_id) = command_id else { - return Err(PwshCoreError::InvalidResponse( - "Expected command_id to be Some".into(), - )); - }; - - trace!( - target: "progress", - progress_record = ?progress_record, - stream_name = stream_name, - command_id = ?command_id, - "received ProgressRecord" - ); - - // Find the pipeline by command_id - let pipeline = self.pipelines.get_mut(command_id).ok_or_else(|| { - PwshCoreError::InvalidResponse("Pipeline not found for command_id".into()) - })?; - - pipeline.add_progress_record(progress_record.clone()); - - Ok(progress_record) - } - - #[instrument(skip(self, ps_value, stream_name, command_id))] - fn handle_information_record( - &mut self, - ps_value: PsValue, - stream_name: &str, - command_id: &Uuid, - ) -> Result { - let (info_record_obj, lossy_fallback_str) = match ps_value { - PsValue::Object(obj) => { - let fallback = obj - .to_string - .clone() - .or_else(|| { - obj.properties - .get("MessageData") - .or_else(|| obj.properties.get("InformationalRecord_Message")) - .map(ToString::to_string) - }) - .unwrap_or_else(|| "".to_string()); - (obj, fallback) - } - other @ PsValue::Primitive(_) => { - warn!( - target: "information", - stream_name = stream_name, - command_id = %command_id, - "InformationRecord payload was not an object; using lossy string" - ); - return Ok(ironposh_psrp::InformationRecord::builder() - .message_data(ironposh_psrp::InformationMessageData::String( - other.to_string(), - )) - .build()); - } - }; - - let info_record = match ironposh_psrp::InformationRecord::try_from(info_record_obj) { - Ok(info_record) => info_record, - Err(e) => { - // `Write-Information -MessageData` is typed as `object` and does not always serialize - // as a primitive string. Keep the session alive and fall back to a best-effort - // string representation. - warn!( - target: "information", - error = %e, - stream_name = stream_name, - command_id = %command_id, - "failed to parse InformationRecord; using lossy message_data" - ); - ironposh_psrp::InformationRecord::builder() - .message_data(ironposh_psrp::InformationMessageData::String( - lossy_fallback_str, - )) - .build() - } - }; - trace!( - ?info_record, - stream_name = stream_name, - command_id = %command_id, - "Received InformationRecord" - ); - - // Find the pipeline by command_id - let pipeline = self.pipelines.get_mut(command_id).ok_or_else(|| { - PwshCoreError::InvalidResponse("Pipeline not found for command_id".into()) - })?; - - pipeline.add_information_record(info_record.clone()); - - Ok(info_record) - } - - #[instrument(skip(self, ps_value, stream_name, command_id))] - fn handle_pipeline_state( - &mut self, - ps_value: PsValue, - stream_name: &str, - command_id: Option<&Uuid>, - ) -> Result<(), crate::PwshCoreError> { - let PsValue::Object(pipeline_state) = ps_value else { - return Err(PwshCoreError::InvalidResponse( - "Expected PipelineState as PsValue::Object".into(), - )); - }; - - let pipeline_state = ironposh_psrp::PipelineStateMessage::try_from(pipeline_state)?; - trace!( - ?pipeline_state, - stream_name = stream_name, - command_id = ?command_id, - "Received PipelineState" - ); - // Question: Can we have a Optional command id here? - let Some(command_id) = command_id else { - return Err(PwshCoreError::InvalidResponse( - "Expected command_id to be Some".into(), - )); - }; - - // Find the pipeline by command_id - let pipeline = self.pipelines.get_mut(command_id).ok_or_else(|| { - PwshCoreError::InvalidResponse("Pipeline not found for command_id".into()) - })?; - // Update the pipeline state - pipeline.set_state(PsInvocationState::from(pipeline_state.pipeline_state)); - - Ok(()) - } - #[instrument(skip_all)] pub fn invoke_pipeline_request( &mut self, @@ -1425,16 +396,6 @@ impl RunspacePool { Ok(request.into().to_xml_string()?) } - #[instrument(skip_all)] - pub fn handle_pipeline_host_call( - &mut self, - ps_value: PsValue, - stream_name: &str, - command_id: Option<&Uuid>, - ) -> Result { - super::host_call::pipeline_host_call_from(ps_value, stream_name, command_id) - } - /// Send a runspace pool host response to the server #[instrument( skip_all, @@ -1514,15 +475,6 @@ impl RunspacePool { .expect("key exchange state initialized")) } - pub fn handle_pipeline_output( - &mut self, - ps_value: PsValue, - ) -> Result { - let pipeline_output = PipelineOutput::from(ps_value); - - Ok(pipeline_output) - } - pub(crate) fn add_command( &mut self, powershell: &PipelineHandle,