From 01e0c22f431fab810356397d0dd81805736c959e Mon Sep 17 00:00:00 2001 From: Junyi Ou Date: Tue, 23 Jun 2026 13:47:05 -0400 Subject: [PATCH 1/2] refactor(runspace_pool): extract host-call construction/classification into host_call.rs Move the pure PipelineHostCall parsing and the needs-session-key classification out of pool.rs into a new private runspace_pool::host_call module (named to avoid collision with crate::host). Fragmentation-coupled send + the pending_host_calls queue stay in the pool. Behavior-preserving. --- crates/ironposh-client-core/src/runspace_pool/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/ironposh-client-core/src/runspace_pool/mod.rs b/crates/ironposh-client-core/src/runspace_pool/mod.rs index abf5543..ce9c9b4 100644 --- a/crates/ironposh-client-core/src/runspace_pool/mod.rs +++ b/crates/ironposh-client-core/src/runspace_pool/mod.rs @@ -2,6 +2,7 @@ pub mod creator; mod crypto; pub mod enums; pub mod expect_shell_connected; +mod host_call; pub mod expect_shell_created; mod host_call; pub mod pool; From 886d91fa108114a98c7ce76654b8b6bf470d2a6d Mon Sep 17 00:00:00 2001 From: Junyi Ou Date: Tue, 23 Jun 2026 13:55:35 -0400 Subject: [PATCH 2/2] refactor(runspace_pool): group outbound WSMan request builders into requests.rs Split the eight outbound-request methods (fire_receive/disconnect/reconnect, kill_pipeline, invoke_spec, send_pipeline_host_response, send_runspace_pool_message, build_public_key_blob_base64) out of pool.rs into a sibling requests.rs via a split impl RunspacePool block. Collects all &self.connection (WsMan) usage in one place as groundwork for a future transport seam. pool.rs 2029 -> 1594 lines; behavior-identical. --- .../src/runspace_pool/mod.rs | 2 +- .../src/runspace_pool/pool.rs | 234 +--------------- .../src/runspace_pool/requests.rs | 254 ++++++++++++++++++ 3 files changed, 257 insertions(+), 233 deletions(-) create mode 100644 crates/ironposh-client-core/src/runspace_pool/requests.rs diff --git a/crates/ironposh-client-core/src/runspace_pool/mod.rs b/crates/ironposh-client-core/src/runspace_pool/mod.rs index ce9c9b4..74b2acb 100644 --- a/crates/ironposh-client-core/src/runspace_pool/mod.rs +++ b/crates/ironposh-client-core/src/runspace_pool/mod.rs @@ -2,10 +2,10 @@ pub mod creator; mod crypto; pub mod enums; pub mod expect_shell_connected; -mod host_call; pub mod expect_shell_created; mod host_call; pub mod pool; +mod requests; pub mod types; // Re-export public types diff --git a/crates/ironposh-client-core/src/runspace_pool/pool.rs b/crates/ironposh-client-core/src/runspace_pool/pool.rs index 821a6b3..56a6a9b 100644 --- a/crates/ironposh-client-core/src/runspace_pool/pool.rs +++ b/crates/ironposh-client-core/src/runspace_pool/pool.rs @@ -14,7 +14,6 @@ use ironposh_winrm::{ ws_management::{OptionSetValue, WsAction, WsMan}, }; use ironposh_xml::parser::XmlDeserialize; -use rsa::traits::PublicKeyParts; use rsa::{RsaPrivateKey, pkcs1v15::Pkcs1v15Encrypt}; use tracing::{debug, error, info, instrument, trace, warn}; @@ -23,7 +22,7 @@ use uuid::Uuid; use crate::{ PwshCoreError, host::HostCall, - pipeline::{Pipeline, PipelineCommand, PipelineSpec}, + pipeline::{Pipeline, PipelineCommand}, powershell::PipelineHandle, runspace::win_rs::WinRunspace, runspace_pool::PsInvocationState, @@ -286,19 +285,6 @@ impl RunspacePool { )) } - // We should accept the pipeline id here, but for now let's ignore it - pub(crate) fn fire_receive( - &self, - desired_streams: Vec, - ) -> Result { - debug_assert!(!desired_streams.is_empty(), "At least one desired stream"); - Ok(self - .shell - .fire_receive(&self.connection, desired_streams) - .into() - .to_xml_string()?) - } - /// Server-assigned shell id (available after the shell has been created). pub fn shell_id(&self) -> Option<&str> { self.shell.shell_id() @@ -309,27 +295,6 @@ impl RunspacePool { self.application_private_data.as_ref() } - /// Build a Disconnect request for this pool's shell (MS-WSMV 3.1.4.13). - /// Valid only in `Opened` state; transitions the pool to `Disconnecting`. - #[instrument(skip(self))] - pub fn fire_disconnect(&mut self) -> Result { - if self.state != RunspacePoolState::Opened { - return Err(crate::PwshCoreError::InvalidState( - "RunspacePool must be in Opened state to disconnect", - )); - } - - let xml = self - .shell - .fire_disconnect(&self.connection) - .into() - .to_xml_string()?; - - self.state = RunspacePoolState::Disconnecting; - info!(runspace_pool_id = %self.id, "runspace pool disconnect requested"); - Ok(xml) - } - /// Accept the server's DisconnectResponse. /// Valid only in `Disconnecting` state; transitions the pool to `Disconnected`. #[instrument(skip(self, soap_envelope), fields(envelope_length = soap_envelope.len()))] @@ -395,27 +360,6 @@ impl RunspacePool { } } - /// Build a Reconnect request for this pool's shell (MS-WSMV 3.1.4.14). - /// Valid only in `Disconnected` state; transitions the pool to `Connecting`. - #[instrument(skip(self))] - pub fn fire_reconnect(&mut self) -> Result { - if self.state != RunspacePoolState::Disconnected { - return Err(crate::PwshCoreError::InvalidState( - "RunspacePool must be in Disconnected state to reconnect", - )); - } - - let xml = self - .shell - .fire_reconnect(&self.connection) - .into() - .to_xml_string()?; - - self.state = RunspacePoolState::Connecting; - info!(runspace_pool_id = %self.id, "runspace pool reconnect requested"); - Ok(xml) - } - /// Accept the server's ReconnectResponse. /// Valid only in `Connecting` state; transitions the pool back to `Opened`. /// The caller is responsible for resuming the receive loop afterwards. @@ -1481,34 +1425,6 @@ impl RunspacePool { Ok(request.into().to_xml_string()?) } - pub fn kill_pipeline(&mut self, handle: &PipelineHandle) -> Result { - let pipeline = self - .pipelines - .get_mut(&handle.id()) - .ok_or(PwshCoreError::InvalidState( - "Pipeline handle not found, pipeline_id", - )) - .inspect_err(|_| { - error!(pipeline_id = ?&handle.id(), "Pipeline handle not found "); - })?; - - if pipeline.is_terminal() { - return Err(PwshCoreError::InvalidState( - "Cannot kill a pipeline that is already stopped, completed, or failed", - )); - } - - // Set pipeline state to Stopping - pipeline.set_state(PsInvocationState::Stopping); - info!(pipeline_id = %handle.id(), "Killing pipeline"); - - let request = self - .shell - .terminal_pipeline_signal(&self.connection, handle.id())?; - - Ok(request.into().to_xml_string()?) - } - #[instrument(skip_all)] pub fn handle_pipeline_host_call( &mut self, @@ -1519,58 +1435,6 @@ impl RunspacePool { super::host_call::pipeline_host_call_from(ps_value, stream_name, command_id) } - /// Send a pipeline host response to the server - #[instrument( - skip_all, - fields( - command_id = %command_id, - call_id = host_response.call_id, - method = ?host_response.method - ) - )] - pub fn send_pipeline_host_response( - &mut self, - command_id: uuid::Uuid, - host_response: &ironposh_psrp::PipelineHostResponse, - ) -> Result { - let _span = tracing::trace_span!("send_pipeline_host_response").entered(); - - // Fragment the host response message - tracing::trace!(stage = "fragment"); - let fragmented = - self.fragmenter - .fragment(host_response, self.id, Some(command_id), None)?; - tracing::trace!(fragment_count = fragmented.len(), stage = "fragmented"); - - // Encode fragments as base64 - tracing::trace!(stage = "base64_encode"); - let arguments = fragmented - .into_iter() - .map(|bytes| base64::engine::general_purpose::STANDARD.encode(&bytes[..])) - .collect::>(); - tracing::trace!( - argument_count = arguments.len(), - first_len = arguments.first().map(String::len), - stage = "base64_encoded" - ); - - // Create WS-Man Send request (send data to stdin) - tracing::trace!(stage = "wsman_send_request"); - let request = - self.shell - .send_data_request(&self.connection, Some(command_id), &arguments)?; - tracing::trace!(stage = "wsman_send_request_built"); - - let element: ironposh_xml::builder::Element<'_> = request.into(); - tracing::trace!(stage = "serialize_xml"); - let xml = element.to_xml_string().map_err(|e| { - tracing::error!(error = %e, stage = "serialize_xml", "failed to serialize XML"); - e - })?; - tracing::trace!(xml_len = xml.len(), stage = "done"); - Ok(xml) - } - /// Send a runspace pool host response to the server #[instrument( skip_all, @@ -1630,37 +1494,7 @@ impl RunspacePool { Ok(xml) } - fn send_runspace_pool_message( - &mut self, - message: &dyn ironposh_psrp::PsObjectWithType, - ) -> Result { - let _span = tracing::trace_span!( - "send_runspace_pool_message", - message_type = ?message.message_type() - ) - .entered(); - - tracing::trace!(stage = "fragment"); - let fragmented = self.fragmenter.fragment(message, self.id, None, None)?; - tracing::trace!(fragment_count = fragmented.len(), stage = "fragmented"); - - tracing::trace!(stage = "base64_encode"); - let arguments = fragmented - .into_iter() - .map(|bytes| base64::engine::general_purpose::STANDARD.encode(&bytes[..])) - .collect::>(); - - tracing::trace!(stage = "wsman_send_request"); - let request = self - .shell - .send_data_request(&self.connection, None, &arguments)?; - - let element: ironposh_xml::builder::Element<'_> = request.into(); - let xml = element.to_xml_string()?; - Ok(xml) - } - - fn ensure_key_exchange_state( + pub(super) fn ensure_key_exchange_state( &mut self, ) -> Result<&mut super::crypto::KeyExchangeState, PwshCoreError> { if self.key_exchange.is_none() { @@ -1680,52 +1514,6 @@ impl RunspacePool { .expect("key exchange state initialized")) } - fn build_public_key_blob_base64(&mut self) -> Result { - const MAGIC: [u8; 4] = [0x06, 0x02, 0x00, 0x00]; - const KEYTYPE: [u8; 4] = [0x00, 0xA4, 0x00, 0x00]; - const RSA1: [u8; 4] = [0x52, 0x53, 0x41, 0x31]; - const BITLEN_2048: [u8; 4] = [0x00, 0x08, 0x00, 0x00]; - - let state = self.ensure_key_exchange_state()?; - let public_key = state.private_key.to_public_key(); - - let exponent_be_raw = public_key.e().to_bytes_be(); - if exponent_be_raw.is_empty() || exponent_be_raw.len() > 4 { - return Err(PwshCoreError::InternalError(format!( - "unexpected RSA exponent length: {} bytes", - exponent_be_raw.len() - ))); - } - let mut exponent_be_padded = [0u8; 4]; - exponent_be_padded[4 - exponent_be_raw.len()..].copy_from_slice(&exponent_be_raw); - let exponent_u32 = u32::from_be_bytes(exponent_be_padded); - let exponent_le_u32_bytes = exponent_u32.to_le_bytes(); - - let mut modulus_be = public_key.n().to_bytes_be(); - if modulus_be.len() > 256 { - return Err(PwshCoreError::InternalError(format!( - "RSA modulus too large: {} bytes", - modulus_be.len() - ))); - } - if modulus_be.len() < 256 { - let mut padded = vec![0u8; 256 - modulus_be.len()]; - padded.extend_from_slice(&modulus_be); - modulus_be = padded; - } - let modulus_le_bytes = modulus_be.into_iter().rev().collect::>(); - - let mut blob = Vec::with_capacity(4 + 4 + 4 + 4 + 4 + 256); - blob.extend_from_slice(&MAGIC); - blob.extend_from_slice(&KEYTYPE); - blob.extend_from_slice(&RSA1); - blob.extend_from_slice(&BITLEN_2048); - blob.extend_from_slice(&exponent_le_u32_bytes); - blob.extend_from_slice(&modulus_le_bytes); - - Ok(base64::engine::general_purpose::STANDARD.encode(blob)) - } - pub fn handle_pipeline_output( &mut self, ps_value: PsValue, @@ -1754,24 +1542,6 @@ impl RunspacePool { pipeline.add_command(command); Ok(()) } - - /// Create, populate, and invoke a pipeline in one operation - pub(crate) fn invoke_spec( - &mut self, - uuid: Uuid, - spec: PipelineSpec, - ) -> Result { - // 1) Create the pipeline - let handle = self.init_pipeline(uuid)?; - - // 2) Add all commands from the spec - for cmd in spec.commands { - self.add_command(&handle, cmd)?; - } - - // 3) Invoke the pipeline using existing logic - self.invoke_pipeline_request(&handle) - } } #[cfg(test)] diff --git a/crates/ironposh-client-core/src/runspace_pool/requests.rs b/crates/ironposh-client-core/src/runspace_pool/requests.rs new file mode 100644 index 0000000..1f9d938 --- /dev/null +++ b/crates/ironposh-client-core/src/runspace_pool/requests.rs @@ -0,0 +1,254 @@ +//! Outbound WSMan request builders for [`RunspacePool`]. +//! +//! This file is a continuation of the `impl RunspacePool` block that lives in +//! [`super::pool`]. It groups the methods that build outbound WSMan requests +//! (Receive / Disconnect / Reconnect / Signal / Send) and the PSRP message +//! serialization that feeds them, so that all `&self.connection` (WsMan) usage +//! is collected in one place as groundwork for a future transport seam. +//! +//! This is a pure file-organization split: the methods here are behavior- and +//! signature-identical to their previous definitions in `pool.rs`. + +use base64::Engine; +use rsa::traits::PublicKeyParts; +use tracing::{error, info, instrument}; +use uuid::Uuid; + +use crate::{ + PwshCoreError, pipeline::PipelineSpec, powershell::PipelineHandle, + runspace_pool::PsInvocationState, +}; + +use super::enums::RunspacePoolState; +use super::pool::{DesiredStream, RunspacePool}; + +impl RunspacePool { + // We should accept the pipeline id here, but for now let's ignore it + pub(crate) fn fire_receive( + &self, + desired_streams: Vec, + ) -> Result { + debug_assert!(!desired_streams.is_empty(), "At least one desired stream"); + Ok(self + .shell + .fire_receive(&self.connection, desired_streams) + .into() + .to_xml_string()?) + } + + /// Build a Disconnect request for this pool's shell (MS-WSMV 3.1.4.13). + /// Valid only in `Opened` state; transitions the pool to `Disconnecting`. + #[instrument(skip(self))] + pub fn fire_disconnect(&mut self) -> Result { + if self.state != RunspacePoolState::Opened { + return Err(crate::PwshCoreError::InvalidState( + "RunspacePool must be in Opened state to disconnect", + )); + } + + let xml = self + .shell + .fire_disconnect(&self.connection) + .into() + .to_xml_string()?; + + self.state = RunspacePoolState::Disconnecting; + info!(runspace_pool_id = %self.id, "runspace pool disconnect requested"); + Ok(xml) + } + + /// Build a Reconnect request for this pool's shell (MS-WSMV 3.1.4.14). + /// Valid only in `Disconnected` state; transitions the pool to `Connecting`. + #[instrument(skip(self))] + pub fn fire_reconnect(&mut self) -> Result { + if self.state != RunspacePoolState::Disconnected { + return Err(crate::PwshCoreError::InvalidState( + "RunspacePool must be in Disconnected state to reconnect", + )); + } + + let xml = self + .shell + .fire_reconnect(&self.connection) + .into() + .to_xml_string()?; + + self.state = RunspacePoolState::Connecting; + info!(runspace_pool_id = %self.id, "runspace pool reconnect requested"); + Ok(xml) + } + + pub fn kill_pipeline(&mut self, handle: &PipelineHandle) -> Result { + let pipeline = self + .pipelines + .get_mut(&handle.id()) + .ok_or(PwshCoreError::InvalidState( + "Pipeline handle not found, pipeline_id", + )) + .inspect_err(|_| { + error!(pipeline_id = ?&handle.id(), "Pipeline handle not found "); + })?; + + if pipeline.is_terminal() { + return Err(PwshCoreError::InvalidState( + "Cannot kill a pipeline that is already stopped, completed, or failed", + )); + } + + // Set pipeline state to Stopping + pipeline.set_state(PsInvocationState::Stopping); + info!(pipeline_id = %handle.id(), "Killing pipeline"); + + let request = self + .shell + .terminal_pipeline_signal(&self.connection, handle.id())?; + + Ok(request.into().to_xml_string()?) + } + + /// Send a pipeline host response to the server + #[instrument( + skip_all, + fields( + command_id = %command_id, + call_id = host_response.call_id, + method = ?host_response.method + ) + )] + pub fn send_pipeline_host_response( + &mut self, + command_id: uuid::Uuid, + host_response: &ironposh_psrp::PipelineHostResponse, + ) -> Result { + let _span = tracing::trace_span!("send_pipeline_host_response").entered(); + + // Fragment the host response message + tracing::trace!(stage = "fragment"); + let fragmented = + self.fragmenter + .fragment(host_response, self.id, Some(command_id), None)?; + tracing::trace!(fragment_count = fragmented.len(), stage = "fragmented"); + + // Encode fragments as base64 + tracing::trace!(stage = "base64_encode"); + let arguments = fragmented + .into_iter() + .map(|bytes| base64::engine::general_purpose::STANDARD.encode(&bytes[..])) + .collect::>(); + tracing::trace!( + argument_count = arguments.len(), + first_len = arguments.first().map(String::len), + stage = "base64_encoded" + ); + + // Create WS-Man Send request (send data to stdin) + tracing::trace!(stage = "wsman_send_request"); + let request = + self.shell + .send_data_request(&self.connection, Some(command_id), &arguments)?; + tracing::trace!(stage = "wsman_send_request_built"); + + let element: ironposh_xml::builder::Element<'_> = request.into(); + tracing::trace!(stage = "serialize_xml"); + let xml = element.to_xml_string().map_err(|e| { + tracing::error!(error = %e, stage = "serialize_xml", "failed to serialize XML"); + e + })?; + tracing::trace!(xml_len = xml.len(), stage = "done"); + Ok(xml) + } + + pub(super) fn send_runspace_pool_message( + &mut self, + message: &dyn ironposh_psrp::PsObjectWithType, + ) -> Result { + let _span = tracing::trace_span!( + "send_runspace_pool_message", + message_type = ?message.message_type() + ) + .entered(); + + tracing::trace!(stage = "fragment"); + let fragmented = self.fragmenter.fragment(message, self.id, None, None)?; + tracing::trace!(fragment_count = fragmented.len(), stage = "fragmented"); + + tracing::trace!(stage = "base64_encode"); + let arguments = fragmented + .into_iter() + .map(|bytes| base64::engine::general_purpose::STANDARD.encode(&bytes[..])) + .collect::>(); + + tracing::trace!(stage = "wsman_send_request"); + let request = self + .shell + .send_data_request(&self.connection, None, &arguments)?; + + let element: ironposh_xml::builder::Element<'_> = request.into(); + let xml = element.to_xml_string()?; + Ok(xml) + } + + pub(super) fn build_public_key_blob_base64(&mut self) -> Result { + const MAGIC: [u8; 4] = [0x06, 0x02, 0x00, 0x00]; + const KEYTYPE: [u8; 4] = [0x00, 0xA4, 0x00, 0x00]; + const RSA1: [u8; 4] = [0x52, 0x53, 0x41, 0x31]; + const BITLEN_2048: [u8; 4] = [0x00, 0x08, 0x00, 0x00]; + + let state = self.ensure_key_exchange_state()?; + let public_key = state.private_key.to_public_key(); + + let exponent_be_raw = public_key.e().to_bytes_be(); + if exponent_be_raw.is_empty() || exponent_be_raw.len() > 4 { + return Err(PwshCoreError::InternalError(format!( + "unexpected RSA exponent length: {} bytes", + exponent_be_raw.len() + ))); + } + let mut exponent_be_padded = [0u8; 4]; + exponent_be_padded[4 - exponent_be_raw.len()..].copy_from_slice(&exponent_be_raw); + let exponent_u32 = u32::from_be_bytes(exponent_be_padded); + let exponent_le_u32_bytes = exponent_u32.to_le_bytes(); + + let mut modulus_be = public_key.n().to_bytes_be(); + if modulus_be.len() > 256 { + return Err(PwshCoreError::InternalError(format!( + "RSA modulus too large: {} bytes", + modulus_be.len() + ))); + } + if modulus_be.len() < 256 { + let mut padded = vec![0u8; 256 - modulus_be.len()]; + padded.extend_from_slice(&modulus_be); + modulus_be = padded; + } + let modulus_le_bytes = modulus_be.into_iter().rev().collect::>(); + + let mut blob = Vec::with_capacity(4 + 4 + 4 + 4 + 4 + 256); + blob.extend_from_slice(&MAGIC); + blob.extend_from_slice(&KEYTYPE); + blob.extend_from_slice(&RSA1); + blob.extend_from_slice(&BITLEN_2048); + blob.extend_from_slice(&exponent_le_u32_bytes); + blob.extend_from_slice(&modulus_le_bytes); + + Ok(base64::engine::general_purpose::STANDARD.encode(blob)) + } + + /// Create, populate, and invoke a pipeline in one operation + pub(crate) fn invoke_spec( + &mut self, + uuid: Uuid, + spec: PipelineSpec, + ) -> Result { + // 1) Create the pipeline + let handle = self.init_pipeline(uuid)?; + + // 2) Add all commands from the spec + for cmd in spec.commands { + self.add_command(&handle, cmd)?; + } + + // 3) Invoke the pipeline using existing logic + self.invoke_pipeline_request(&handle) + } +}