diff --git a/.github/workflows/publish-wasm.yml b/.github/workflows/publish-wasm.yml index c579dcef9..54aec54b1 100644 --- a/.github/workflows/publish-wasm.yml +++ b/.github/workflows/publish-wasm.yml @@ -3,7 +3,7 @@ name: Publish WASM Packages on: push: tags: - - 'wasm-v*' + - 'buttplug-wasm-*' jobs: publish: diff --git a/.gitignore b/.gitignore index f471f85b2..ba11590b3 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,7 @@ plans docs .deciduous **/.DS_Store +.worktrees node_modules package-lock.json **/dist/ diff --git a/CLAUDE.md b/CLAUDE.md index b3b9788db..1c6519940 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,5 +1,7 @@ # CLAUDE.md +Last verified: 2026-05-17 + This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. ## Build Commands @@ -71,6 +73,15 @@ AwaitingHandshake → Connected { client_name, spec_version } - `DeviceCommunicationManager` - Hardware discovery - `ServerDeviceManager` - Orchestrates devices and protocols +**Output Observability** (opt-in): +When `emit_output_observations` is enabled, the server broadcasts `OutputObservation` events for every output command sent to a device. The data flows through broadcast channels: +``` +DeviceHandle → ServerDeviceManager → ButtplugServer::output_observation_stream() + → ButtplugRemoteServer (as ButtplugRemoteServerEvent::OutputObservation) + → Frontend (as EngineMessage::DeviceOutputObservation) +``` +Each observation carries `device_index`, `feature_index`, `output_type`, and `value`. Disabled by default to avoid overhead; enable via `ServerDeviceManagerBuilder::emit_output_observations(true)` or `EngineOptions::emit_output_observations`. + ## Contributing **Issues must be filed and discussed before PRs are submitted.** Approval from @qdot required. Non-issue PRs will be closed. diff --git a/crates/buttplug_server/src/device/device_handle.rs b/crates/buttplug_server/src/device/device_handle.rs index 9c3e31c3a..6640eed3b 100644 --- a/crates/buttplug_server/src/device/device_handle.rs +++ b/crates/buttplug_server/src/device/device_handle.rs @@ -37,6 +37,7 @@ use buttplug_server_device_config::{ use dashmap::DashMap; use futures::future::{self, BoxFuture, FutureExt}; use tokio::sync::{ + broadcast, mpsc::{Sender, channel}, oneshot, }; @@ -56,6 +57,7 @@ use crate::{ use super::{ InternalDeviceEvent, + OutputObservation, device_task::{DeviceTaskConfig, spawn_device_task}, hardware::{Hardware, HardwareCommand, HardwareConnector, HardwareEvent}, protocol::{ProtocolHandler, ProtocolKeepaliveStrategy, ProtocolSpecializer}, @@ -108,6 +110,7 @@ pub struct DeviceHandle { last_output_command: Arc>, stop_commands: Arc>, internal_hw_msg_sender: Sender>, + output_observation_sender: Option>, } impl DeviceHandle { @@ -119,6 +122,7 @@ impl DeviceHandle { identifier: UserDeviceIdentifier, stop_commands: Vec, internal_hw_msg_sender: Sender>, + output_observation_sender: Option>, ) -> Self { Self { hardware, @@ -129,6 +133,7 @@ impl DeviceHandle { last_output_command: Arc::new(DashMap::new()), stop_commands: Arc::new(stop_commands), internal_hw_msg_sender, + output_observation_sender, } } @@ -268,6 +273,18 @@ impl DeviceHandle { self .last_output_command .insert(msg.feature_id(), msg.clone()); + + if let Some(sender) = &self.output_observation_sender { + // OutputType derives Display via strum, producing clean names like "Vibrate", "Rotate". + // The design uses format!("{:?}") but to_string() is preferred for clean output. + let _ = sender.send(OutputObservation { + device_index: msg.device_index(), + feature_index: msg.feature_index(), + output_type: msg.output_command().as_output_type().to_string(), + value: msg.output_command().value() as f64, + }); + } + self.handle_generic_command_result(self.handler.handle_output_cmd(msg)) } @@ -437,6 +454,7 @@ pub(super) async fn build_device_handle( mut hardware_connector: Box, protocol_specializers: Vec, device_event_sender: tokio::sync::mpsc::Sender, + output_observation_sender: Option>, ) -> Result { // At this point, we know we've got hardware that is waiting to connect, and enough protocol // info to actually do something after we connect. So go ahead and connect. @@ -571,6 +589,7 @@ pub(super) async fn build_device_handle( identifier, stop_commands, internal_hw_msg_sender, + output_observation_sender, ); // If we need a keepalive with a packet replay, set this up via stopping the device on connect. diff --git a/crates/buttplug_server/src/device/device_task.rs b/crates/buttplug_server/src/device/device_task.rs index b6ce7e06b..bcf0a12fa 100644 --- a/crates/buttplug_server/src/device/device_task.rs +++ b/crates/buttplug_server/src/device/device_task.rs @@ -151,7 +151,7 @@ async fn run_device_task( // Priority 2: Batch deadline reached - flush pending commands _ = batch_fut => { - debug!("Batch deadline reached, sending {} commands", pending_commands.len()); + trace!("Batch deadline reached, sending {} commands", pending_commands.len()); while let Some(cmd) = pending_commands.pop_front() { let _ = hardware.parse_message(&cmd).await; if track_keepalive { diff --git a/crates/buttplug_server/src/device/mod.rs b/crates/buttplug_server/src/device/mod.rs index 967cb6b7c..4a6c90596 100644 --- a/crates/buttplug_server/src/device/mod.rs +++ b/crates/buttplug_server/src/device/mod.rs @@ -102,8 +102,10 @@ pub mod protocol; pub mod protocol_impl; mod server_device_manager; mod server_device_manager_event_loop; +mod output_observation; pub use device_handle::{DeviceCommand, DeviceEvent, DeviceHandle}; +pub use output_observation::OutputObservation; use crate::message::ButtplugServerDeviceMessage; use buttplug_server_device_config::UserDeviceIdentifier; diff --git a/crates/buttplug_server/src/device/output_observation.rs b/crates/buttplug_server/src/device/output_observation.rs new file mode 100644 index 000000000..1c79e9288 --- /dev/null +++ b/crates/buttplug_server/src/device/output_observation.rs @@ -0,0 +1,16 @@ +// Buttplug Rust Source Code File - See https://buttplug.io for more info. +// +// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved. +// +// Licensed under the BSD 3-Clause license. See LICENSE file in the project root +// for full license information. + +//! Output observation type for output command tracking and observability + +#[derive(Clone, Debug)] +pub struct OutputObservation { + pub device_index: u32, + pub feature_index: u32, + pub output_type: String, + pub value: f64, +} diff --git a/crates/buttplug_server/src/device/server_device_manager.rs b/crates/buttplug_server/src/device/server_device_manager.rs index af686e7aa..0d832496f 100644 --- a/crates/buttplug_server/src/device/server_device_manager.rs +++ b/crates/buttplug_server/src/device/server_device_manager.rs @@ -13,6 +13,7 @@ use crate::{ ButtplugServerResultFuture, device::{ DeviceHandle, + OutputObservation, hardware::communication::{HardwareCommunicationManager, HardwareCommunicationManagerBuilder}, server_device_manager_event_loop::ServerDeviceManagerEventLoop, }, @@ -72,6 +73,7 @@ pub struct ServerDeviceInfo { pub struct ServerDeviceManagerBuilder { device_configuration_manager: Arc, comm_managers: Vec>, + emit_output_observations: bool, } impl ServerDeviceManagerBuilder { @@ -79,6 +81,7 @@ impl ServerDeviceManagerBuilder { Self { device_configuration_manager: Arc::new(device_configuration_manager), comm_managers: vec![], + emit_output_observations: false, } } @@ -88,6 +91,7 @@ impl ServerDeviceManagerBuilder { Self { device_configuration_manager, comm_managers: vec![], + emit_output_observations: false, } } @@ -99,6 +103,11 @@ impl ServerDeviceManagerBuilder { self } + pub fn emit_output_observations(&mut self, enabled: bool) -> &mut Self { + self.emit_output_observations = enabled; + self + } + pub fn finish(&mut self) -> Result { let (device_command_sender, device_command_receiver) = mpsc::channel(256); let (device_event_sender, device_event_receiver) = mpsc::channel(256); @@ -149,6 +158,11 @@ impl ServerDeviceManagerBuilder { let loop_cancellation_token = CancellationToken::new(); let output_sender = broadcast::channel(255).0; + let output_observation_sender = if self.emit_output_observations { + Some(broadcast::channel(256).0) + } else { + None + }; let mut event_loop = ServerDeviceManagerEventLoop::new( comm_managers, @@ -158,6 +172,7 @@ impl ServerDeviceManagerBuilder { output_sender.clone(), device_event_receiver, device_command_receiver, + output_observation_sender.clone(), ); buttplug_core::spawn!("ServerDeviceManager event loop", async move { event_loop.run().await; @@ -169,6 +184,7 @@ impl ServerDeviceManagerBuilder { loop_cancellation_token, running: Arc::new(AtomicBool::new(true)), output_sender, + output_observation_sender, }) } } @@ -183,6 +199,7 @@ pub struct ServerDeviceManager { loop_cancellation_token: CancellationToken, running: Arc, output_sender: broadcast::Sender, + output_observation_sender: Option>, } impl ServerDeviceManager { @@ -192,6 +209,12 @@ impl ServerDeviceManager { convert_broadcast_receiver_to_stream(self.output_sender.subscribe()) } + pub fn output_observation_stream(&self) -> Option> { + self.output_observation_sender.as_ref().map(|sender| { + convert_broadcast_receiver_to_stream(sender.subscribe()) + }) + } + fn start_scanning(&self) -> ButtplugServerResultFuture { let command_sender = self.device_command_sender.clone(); async move { diff --git a/crates/buttplug_server/src/device/server_device_manager_event_loop.rs b/crates/buttplug_server/src/device/server_device_manager_event_loop.rs index 2ac5b1ba5..8de52e166 100644 --- a/crates/buttplug_server/src/device/server_device_manager_event_loop.rs +++ b/crates/buttplug_server/src/device/server_device_manager_event_loop.rs @@ -13,6 +13,7 @@ use super::server_device_manager::DeviceManagerCommand; use crate::device::{ DeviceHandle, InternalDeviceEvent, + OutputObservation, device_handle::build_device_handle, hardware::communication::{HardwareCommunicationManager, HardwareCommunicationManagerEvent}, protocol::ProtocolManager, @@ -64,6 +65,8 @@ pub(super) struct ServerDeviceManagerEventLoop { loop_cancellation_token: CancellationToken, /// Protocol map, for mapping user definitions to protocols protocol_manager: ProtocolManager, + /// Optional sender for output observations, None when disabled + output_observation_sender: Option>, } impl ServerDeviceManagerEventLoop { @@ -75,6 +78,7 @@ impl ServerDeviceManagerEventLoop { server_sender: broadcast::Sender, device_comm_receiver: mpsc::Receiver, device_command_receiver: mpsc::Receiver, + output_observation_sender: Option>, ) -> Self { let (device_event_sender, device_event_receiver) = mpsc::channel(256); Self { @@ -90,6 +94,7 @@ impl ServerDeviceManagerEventLoop { connecting_devices: Arc::new(DashSet::new()), loop_cancellation_token, protocol_manager: ProtocolManager::default(), + output_observation_sender, } } @@ -283,6 +288,7 @@ impl ServerDeviceManagerEventLoop { let device_config_manager = self.device_config_manager.clone(); let connecting_devices = self.connecting_devices.clone(); + let output_observation_sender = self.output_observation_sender.clone(); let span = info_span!( "device creation", name = tracing::field::display(name), @@ -299,6 +305,7 @@ impl ServerDeviceManagerEventLoop { creator, protocol_specializers, device_event_sender_for_forwarding, + output_observation_sender, ) .await { diff --git a/crates/buttplug_server/src/server.rs b/crates/buttplug_server/src/server.rs index a2522d030..f064de8a8 100644 --- a/crates/buttplug_server/src/server.rs +++ b/crates/buttplug_server/src/server.rs @@ -9,7 +9,7 @@ use crate::server_message_conversion::ButtplugServerDeviceEventMessageConverter; use super::{ ButtplugServerResultFuture, - device::ServerDeviceManager, + device::{OutputObservation, ServerDeviceManager}, message::{ ButtplugClientMessageVariant, ButtplugServerMessageVariant, @@ -194,6 +194,12 @@ impl ButtplugServer { self.device_manager.clone() } + /// Returns a stream of output observations if emission is enabled, otherwise None. + /// This stream emits observations for all output commands processed through the server. + pub fn output_observation_stream(&self) -> Option> { + self.device_manager.output_observation_stream() + } + /// If true, client is currently connected to the server. pub fn connected(&self) -> bool { matches!( diff --git a/crates/buttplug_tests/CLAUDE.md b/crates/buttplug_tests/CLAUDE.md index 45128d8f3..31f5b50a1 100644 --- a/crates/buttplug_tests/CLAUDE.md +++ b/crates/buttplug_tests/CLAUDE.md @@ -53,4 +53,5 @@ Tests run across multiple protocol spec versions (v0–v4) via version-specific - `test_serializers.rs` — Message serialization/deserialization across protocol versions - `test_message_downgrades.rs` — Protocol version downgrade path tests - `test_disabled_device_features.rs` — Tests for user config feature disabling +- `test_output_observations.rs` — Integration tests for output observability (observation stream, filtering, multi-device, enable/disable) - `test_websocket_connectors.rs` / `test_websocket_device_comm_manager.rs` — WebSocket transport integration tests diff --git a/crates/buttplug_tests/tests/test_output_observations.rs b/crates/buttplug_tests/tests/test_output_observations.rs new file mode 100644 index 000000000..6044e1177 --- /dev/null +++ b/crates/buttplug_tests/tests/test_output_observations.rs @@ -0,0 +1,508 @@ +// Buttplug Rust Source Code File - See https://buttplug.io for more info. +// +// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved. +// +// Licensed under the BSD 3-Clause license. See LICENSE file in the project root +// for full license information. + +mod util; + +use buttplug_core::message::{ + BUTTPLUG_CURRENT_API_MAJOR_VERSION, + BUTTPLUG_CURRENT_API_MINOR_VERSION, + ButtplugServerMessageV4, + OutputCommand, + OutputCmdV4, + OutputValue, + RequestServerInfoV4, + StartScanningV0, + StopCmdV4, +}; +use buttplug_server::message::ButtplugClientMessageVariant; +use futures::{StreamExt, pin_mut}; +use std::time::Duration; +use tokio::time::timeout; +use util::test_server_with_device_and_observations; + +#[tokio::test] +async fn test_ac2_1_observation_emission() { + // AC2.1: Send a vibrate command at value 50, verify one OutputObservation + // appears with correct device_index, feature_index, output_type="Vibrate", and value=50.0 + let (server, _device) = test_server_with_device_and_observations("Massage Demo"); + + // Subscribe to observation stream + let obs_stream = server + .output_observation_stream() + .expect("should be Some when enabled"); + pin_mut!(obs_stream); + + // Handshake + server + .parse_message(ButtplugClientMessageVariant::V4( + RequestServerInfoV4::new( + "Test", + BUTTPLUG_CURRENT_API_MAJOR_VERSION, + BUTTPLUG_CURRENT_API_MINOR_VERSION, + ) + .into(), + )) + .await + .unwrap(); + + // Start scanning and wait for device + let event_stream = server.server_version_event_stream(); + pin_mut!(event_stream); + server + .parse_message(ButtplugClientMessageVariant::V4( + StartScanningV0::default().into(), + )) + .await + .unwrap(); + + // Wait for DeviceList event to get device_index + let device_index = loop { + if let Some(ButtplugServerMessageV4::DeviceList(dl)) = event_stream.next().await { + if let Some((&idx, _)) = dl.devices().iter().next() { + break idx; + } + } + }; + + // Send vibrate command at value 50 + server + .parse_message(ButtplugClientMessageVariant::V4( + OutputCmdV4::new(device_index, 0, OutputCommand::Vibrate(OutputValue::new(50))) + .into(), + )) + .await + .unwrap(); + + // Verify observation appears with correct values + if let Ok(Some(obs)) = timeout(Duration::from_millis(500), obs_stream.next()).await { + assert_eq!(obs.device_index, device_index); + assert_eq!(obs.feature_index, 0); + assert_eq!(obs.output_type, "Vibrate"); + assert_eq!(obs.value, 50.0); + } else { + panic!("Expected observation but none received or timeout"); + } +} + +#[tokio::test] +async fn test_ac2_2_observation_dedup() { + // AC2.2: Send vibrate at value 50 twice. First should produce observation, + // second should not. Verify by using tokio::time::timeout on the stream — + // second read should time out. + let (server, _device) = test_server_with_device_and_observations("Massage Demo"); + + let obs_stream = server + .output_observation_stream() + .expect("should be Some when enabled"); + pin_mut!(obs_stream); + + // Handshake + server + .parse_message(ButtplugClientMessageVariant::V4( + RequestServerInfoV4::new( + "Test", + BUTTPLUG_CURRENT_API_MAJOR_VERSION, + BUTTPLUG_CURRENT_API_MINOR_VERSION, + ) + .into(), + )) + .await + .unwrap(); + + // Start scanning and wait for device + let event_stream = server.server_version_event_stream(); + pin_mut!(event_stream); + server + .parse_message(ButtplugClientMessageVariant::V4( + StartScanningV0::default().into(), + )) + .await + .unwrap(); + + let device_index = loop { + if let Some(ButtplugServerMessageV4::DeviceList(dl)) = event_stream.next().await { + if let Some((&idx, _)) = dl.devices().iter().next() { + break idx; + } + } + }; + + // Send first vibrate command at value 50 + server + .parse_message(ButtplugClientMessageVariant::V4( + OutputCmdV4::new(device_index, 0, OutputCommand::Vibrate(OutputValue::new(50))) + .into(), + )) + .await + .unwrap(); + + // Verify first observation appears + if let Ok(Some(obs)) = timeout(Duration::from_millis(500), obs_stream.next()).await { + assert_eq!(obs.device_index, device_index); + assert_eq!(obs.value, 50.0); + } else { + panic!("Expected first observation but none received or timeout"); + } + + // Send same vibrate command again at value 50 + server + .parse_message(ButtplugClientMessageVariant::V4( + OutputCmdV4::new(device_index, 0, OutputCommand::Vibrate(OutputValue::new(50))) + .into(), + )) + .await + .unwrap(); + + // Verify second observation does NOT appear (timeout expected) + let result = timeout(Duration::from_millis(100), obs_stream.next()).await; + assert!( + result.is_err(), + "Expected timeout (no observation) for deduplicated command" + ); +} + +#[tokio::test] +async fn test_ac2_3_observation_before_protocol() { + // AC2.3: Observations are emitted after the dedup check passes but before + // protocol processing. This is verified structurally by the tap point location + // (before handle_output_cmd). Test verifies observation arrives even when the + // test device channel hasn't consumed the hardware command yet. + let (server, _device) = test_server_with_device_and_observations("Massage Demo"); + + let obs_stream = server + .output_observation_stream() + .expect("should be Some when enabled"); + pin_mut!(obs_stream); + + // Handshake + server + .parse_message(ButtplugClientMessageVariant::V4( + RequestServerInfoV4::new( + "Test", + BUTTPLUG_CURRENT_API_MAJOR_VERSION, + BUTTPLUG_CURRENT_API_MINOR_VERSION, + ) + .into(), + )) + .await + .unwrap(); + + // Start scanning and wait for device + let event_stream = server.server_version_event_stream(); + pin_mut!(event_stream); + server + .parse_message(ButtplugClientMessageVariant::V4( + StartScanningV0::default().into(), + )) + .await + .unwrap(); + + let device_index = loop { + if let Some(ButtplugServerMessageV4::DeviceList(dl)) = event_stream.next().await { + if let Some((&idx, _)) = dl.devices().iter().next() { + break idx; + } + } + }; + + // Send vibrate command + server + .parse_message(ButtplugClientMessageVariant::V4( + OutputCmdV4::new(device_index, 0, OutputCommand::Vibrate(OutputValue::new(75))) + .into(), + )) + .await + .unwrap(); + + // Verify observation appears (not waiting on device to process hardware command) + if let Ok(Some(obs)) = timeout(Duration::from_millis(500), obs_stream.next()).await { + assert_eq!(obs.device_index, device_index); + assert_eq!(obs.output_type, "Vibrate"); + assert_eq!(obs.value, 75.0); + } else { + panic!("Expected observation but none received or timeout"); + } +} + +#[tokio::test] +async fn test_ac3_1_stop_as_zero() { + // AC3.1: Send vibrate at value 50, then send StopDeviceCmd for that device. + // Verify zero-value observation appears after stop. + let (server, _device) = test_server_with_device_and_observations("Massage Demo"); + + let obs_stream = server + .output_observation_stream() + .expect("should be Some when enabled"); + pin_mut!(obs_stream); + + // Handshake + server + .parse_message(ButtplugClientMessageVariant::V4( + RequestServerInfoV4::new( + "Test", + BUTTPLUG_CURRENT_API_MAJOR_VERSION, + BUTTPLUG_CURRENT_API_MINOR_VERSION, + ) + .into(), + )) + .await + .unwrap(); + + // Start scanning and wait for device + let event_stream = server.server_version_event_stream(); + pin_mut!(event_stream); + server + .parse_message(ButtplugClientMessageVariant::V4( + StartScanningV0::default().into(), + )) + .await + .unwrap(); + + let device_index = loop { + if let Some(ButtplugServerMessageV4::DeviceList(dl)) = event_stream.next().await { + if let Some((&idx, _)) = dl.devices().iter().next() { + break idx; + } + } + }; + + // Send vibrate command at value 50 + server + .parse_message(ButtplugClientMessageVariant::V4( + OutputCmdV4::new(device_index, 0, OutputCommand::Vibrate(OutputValue::new(50))) + .into(), + )) + .await + .unwrap(); + + // Verify first observation + if let Ok(Some(obs)) = timeout(Duration::from_millis(500), obs_stream.next()).await { + assert_eq!(obs.value, 50.0); + } else { + panic!("Expected first observation but none received or timeout"); + } + + // Send StopDeviceCmd for that specific device (device_index, None feature_index, outputs=true) + server + .parse_message(ButtplugClientMessageVariant::V4( + StopCmdV4::new(Some(device_index), None, false, true).into(), + )) + .await + .unwrap(); + + // Verify zero-value observation appears + if let Ok(Some(obs)) = timeout(Duration::from_millis(500), obs_stream.next()).await { + assert_eq!(obs.device_index, device_index); + assert_eq!(obs.feature_index, 0); + assert_eq!(obs.output_type, "Vibrate"); + assert_eq!(obs.value, 0.0); + } else { + panic!("Expected zero-value observation after stop but none received or timeout"); + } +} + +#[tokio::test] +async fn test_ac3_2_stop_all_devices() { + // AC3.2: Send vibrate command, then send StopAllDevices. + // Verify zero-value observation appears (stop all targets all devices). + let (server, _device) = test_server_with_device_and_observations("Massage Demo"); + + let obs_stream = server + .output_observation_stream() + .expect("should be Some when enabled"); + pin_mut!(obs_stream); + + // Handshake + server + .parse_message(ButtplugClientMessageVariant::V4( + RequestServerInfoV4::new( + "Test", + BUTTPLUG_CURRENT_API_MAJOR_VERSION, + BUTTPLUG_CURRENT_API_MINOR_VERSION, + ) + .into(), + )) + .await + .unwrap(); + + // Start scanning and wait for device + let event_stream = server.server_version_event_stream(); + pin_mut!(event_stream); + server + .parse_message(ButtplugClientMessageVariant::V4( + StartScanningV0::default().into(), + )) + .await + .unwrap(); + + let device_index = loop { + if let Some(ButtplugServerMessageV4::DeviceList(dl)) = event_stream.next().await { + if let Some((&idx, _)) = dl.devices().iter().next() { + break idx; + } + } + }; + + // Send vibrate command + server + .parse_message(ButtplugClientMessageVariant::V4( + OutputCmdV4::new(device_index, 0, OutputCommand::Vibrate(OutputValue::new(50))) + .into(), + )) + .await + .unwrap(); + + // Consume the emission observation + let _ = timeout(Duration::from_millis(500), obs_stream.next()).await; + + // Send StopAllDevices (StopCmdV4::default() with no device_index) + server + .parse_message(ButtplugClientMessageVariant::V4( + StopCmdV4::default().into(), + )) + .await + .unwrap(); + + // Verify zero-value observation appears + if let Ok(Some(obs)) = timeout(Duration::from_millis(500), obs_stream.next()).await { + assert_eq!(obs.device_index, device_index); + assert_eq!(obs.feature_index, 0); + assert_eq!(obs.output_type, "Vibrate"); + assert_eq!(obs.value, 0.0); + } else { + panic!("Expected zero-value observation after StopAllDevices but none received or timeout"); + } +} + +#[tokio::test] +async fn test_ac3_3_stop_dedup() { + // AC3.3: Stop-generated zero commands still go through the dedup path. + // When a device is already at zero, sending another stop should not generate + // an observation due to dedup (the zero-value command matches the previous state). + let (server, _device) = test_server_with_device_and_observations("Massage Demo"); + + let obs_stream = server + .output_observation_stream() + .expect("should be Some when enabled"); + pin_mut!(obs_stream); + + // Handshake + server + .parse_message(ButtplugClientMessageVariant::V4( + RequestServerInfoV4::new( + "Test", + BUTTPLUG_CURRENT_API_MAJOR_VERSION, + BUTTPLUG_CURRENT_API_MINOR_VERSION, + ) + .into(), + )) + .await + .unwrap(); + + // Start scanning and wait for device + let event_stream = server.server_version_event_stream(); + pin_mut!(event_stream); + server + .parse_message(ButtplugClientMessageVariant::V4( + StartScanningV0::default().into(), + )) + .await + .unwrap(); + + let device_index = loop { + if let Some(ButtplugServerMessageV4::DeviceList(dl)) = event_stream.next().await { + if let Some((&idx, _)) = dl.devices().iter().next() { + break idx; + } + } + }; + + // Test 1: Send a non-zero command, verify observation appears + server + .parse_message(ButtplugClientMessageVariant::V4( + OutputCmdV4::new(device_index, 0, OutputCommand::Vibrate(OutputValue::new(50))) + .into(), + )) + .await + .unwrap(); + + if let Ok(Some(obs)) = timeout(Duration::from_millis(500), obs_stream.next()).await { + assert_eq!(obs.value, 50.0); + } else { + panic!("Expected observation for non-zero command"); + } + + // Test 2: Send StopDeviceCmd to set to zero, verify observation(s) appear + // The device has multiple output features, so stop generates multiple observations + server + .parse_message(ButtplugClientMessageVariant::V4( + StopCmdV4::new(Some(device_index), None, false, true).into(), + )) + .await + .unwrap(); + + // Consume all stop observations for this device (one per output feature) + // The Aneros Vivi has 2 vibrate features, so we expect at least 2 observations + let mut stop_obs_count = 0; + for _ in 0..10 { + // Allow up to 10 observations to account for any feature combinations + match timeout(Duration::from_millis(100), obs_stream.next()).await { + Ok(Some(obs)) => { + assert_eq!(obs.value, 0.0); + stop_obs_count += 1; + // Keep consuming until we timeout + } + Err(_) => { + // Timeout - no more observations + break; + } + Ok(None) => { + // Stream ended unexpectedly + panic!("Observation stream ended unexpectedly during Test 2"); + } + } + } + assert!( + stop_obs_count >= 1, + "Expected at least one zero-value observation for stop command" + ); + + // Test 3: Verify that the stop command successfully set the device to zero + // by checking that a second stop doesn't generate an observation. + // The dedup check should prevent sending a zero-value command twice. + // AC3.3 requirement: "Stop dedup: no observation if already at zero." + server + .parse_message(ButtplugClientMessageVariant::V4( + StopCmdV4::new(Some(device_index), None, false, true).into(), + )) + .await + .unwrap(); + + // According to AC3.3, the second stop command should NOT produce an observation + // because the device is already at zero. The dedup logic in handle_outputcmd_v4 + // checks if the new message matches the last message and returns early if so, + // preventing the observation from being sent. + let result = timeout(Duration::from_millis(100), obs_stream.next()).await; + assert!( + result.is_err(), + "Expected timeout (no observation) for deduplicated stop command when device is already at zero" + ); +} + +#[tokio::test] +async fn test_ac5_1_disabled_no_observation_stream() { + // AC5.1: When emit_output_observations is false, output_observation_stream() + // returns None and there's no overhead. + let (server, _device) = util::test_server_with_device("Massage Demo"); + + // Verify observation stream is None when not enabled + let obs_stream = server.output_observation_stream(); + assert!( + obs_stream.is_none(), + "output_observation_stream should be None when disabled" + ); +} diff --git a/crates/buttplug_tests/tests/util/mod.rs b/crates/buttplug_tests/tests/util/mod.rs index 47d1d19cf..6365cf462 100644 --- a/crates/buttplug_tests/tests/util/mod.rs +++ b/crates/buttplug_tests/tests/util/mod.rs @@ -173,3 +173,21 @@ pub fn test_server_v4_with_device(device_type: &str) -> (ButtplugServer, TestDev (test_server_with_comm_manager(builder), device) } + +#[allow(dead_code)] +pub fn test_server_with_device_and_observations( + device_type: &str, +) -> (ButtplugServer, TestDeviceChannelHost) { + let mut builder = TestDeviceCommunicationManagerBuilder::default(); + let device = builder.add_test_device(&TestDeviceIdentifier::new(device_type, None)); + + let mut dm_builder = ServerDeviceManagerBuilder::new(create_test_dcm()); + dm_builder.comm_manager(builder); + dm_builder.emit_output_observations(true); + + let server = ButtplugServerBuilder::new(dm_builder.finish().unwrap()) + .finish() + .unwrap(); + + (server, device) +} diff --git a/crates/intiface_engine/src/buttplug_server.rs b/crates/intiface_engine/src/buttplug_server.rs index 8cd501a52..a50d3c95b 100644 --- a/crates/intiface_engine/src/buttplug_server.rs +++ b/crates/intiface_engine/src/buttplug_server.rs @@ -124,6 +124,9 @@ pub async fn setup_buttplug_server( }; setup_server_device_comm_managers(options, &mut dm_builder); + if options.emit_output_observations() { + dm_builder.emit_output_observations(true); + } let mut server_builder = ButtplugServerBuilder::new( dm_builder .finish() diff --git a/crates/intiface_engine/src/frontend/mod.rs b/crates/intiface_engine/src/frontend/mod.rs index e554ece95..8f4dd416b 100644 --- a/crates/intiface_engine/src/frontend/mod.rs +++ b/crates/intiface_engine/src/frontend/mod.rs @@ -100,6 +100,12 @@ pub async fn frontend_server_event_loop( .send(EngineMessage::DeviceDisconnected{index: device_id}) .await; } + ButtplugRemoteServerEvent::OutputObservation { device_index, feature_index, output_type, value } => { + trace!("Output observation: device {} feature {} {} = {}", device_index, feature_index, output_type, value); + frontend + .send(EngineMessage::DeviceOutputObservation { device_index, feature_index, output_type, value }) + .await; + } }, None => { info!("Lost connection with main thread, breaking."); diff --git a/crates/intiface_engine/src/frontend/process_messages.rs b/crates/intiface_engine/src/frontend/process_messages.rs index e8e6ca808..635492bdc 100644 --- a/crates/intiface_engine/src/frontend/process_messages.rs +++ b/crates/intiface_engine/src/frontend/process_messages.rs @@ -39,6 +39,12 @@ pub enum EngineMessage { ClientRejected { reason: String, }, + DeviceOutputObservation { + device_index: u32, + feature_index: u32, + output_type: String, + value: f64, + }, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/intiface_engine/src/options.rs b/crates/intiface_engine/src/options.rs index 857d96b3f..646c9bbbe 100644 --- a/crates/intiface_engine/src/options.rs +++ b/crates/intiface_engine/src/options.rs @@ -63,6 +63,8 @@ pub struct EngineOptions { repeater_remote_address: Option, #[getset(get_copy = "pub")] rest_api_port: Option, + #[getset(get_copy = "pub")] + emit_output_observations: bool, } #[derive(Default, Debug, Clone)] @@ -94,6 +96,7 @@ pub struct EngineOptionsExternal { pub repeater_local_port: Option, pub repeater_remote_address: Option, pub rest_api_port: Option, + pub emit_output_observations: bool, } impl From for EngineOptions { @@ -126,6 +129,7 @@ impl From for EngineOptions { repeater_local_port: other.repeater_local_port, repeater_remote_address: other.repeater_remote_address, rest_api_port: other.rest_api_port, + emit_output_observations: other.emit_output_observations, } } } @@ -279,6 +283,11 @@ impl EngineOptionsBuilder { self } + pub fn emit_output_observations(&mut self, value: bool) -> &mut Self { + self.options.emit_output_observations = value; + self + } + pub fn finish(&mut self) -> EngineOptions { self.options.clone() } diff --git a/crates/intiface_engine/src/remote_server.rs b/crates/intiface_engine/src/remote_server.rs index ab1a12a7c..9ccb010ee 100644 --- a/crates/intiface_engine/src/remote_server.rs +++ b/crates/intiface_engine/src/remote_server.rs @@ -42,6 +42,12 @@ pub enum ButtplugRemoteServerEvent { DeviceRemoved { index: u32, }, + OutputObservation { + device_index: u32, + feature_index: u32, + output_type: String, + value: f64, + }, //DeviceCommand(ButtplugDeviceCommandMessageUnion) } @@ -262,6 +268,26 @@ impl ButtplugRemoteServer { } }); } + { + let server = server.clone(); + let event_sender = event_sender.clone(); + tokio::spawn(async move { + if let Some(obs_stream) = server.output_observation_stream() { + futures::pin_mut!(obs_stream); + while let Some(obs) = futures::StreamExt::next(&mut obs_stream).await { + if event_sender.receiver_count() > 0 { + let _ = event_sender.send(ButtplugRemoteServerEvent::OutputObservation { + device_index: obs.device_index, + feature_index: obs.feature_index, + output_type: obs.output_type, + value: obs.value, + }); + } + } + trace!("Output observation stream ended"); + } + }); + } Self { event_sender, server, diff --git a/docs/test-plans/2026-05-17-output-observability.md b/docs/test-plans/2026-05-17-output-observability.md new file mode 100644 index 000000000..ec6e92f4a --- /dev/null +++ b/docs/test-plans/2026-05-17-output-observability.md @@ -0,0 +1,71 @@ +# Output Observability - Human Test Plan + +## Prerequisites + +- Build the project: `cargo build` +- All automated tests passing: `cargo test -p buttplug_tests --test test_output_observations` +- Intiface Central or equivalent Frontend implementation available +- A Buttplug-compatible device (physical or simulated via WebSocket device forwarder) +- A Buttplug client application capable of sending vibrate/stop commands + +## Phase 1: End-to-End Observation Flow (AC4.1, AC4.3) + +| Step | Action | Expected | +|------|--------|----------| +| 1 | Build Intiface Engine with `emit_output_observations: true` in EngineOptions. If using EngineOptionsBuilder: call `.emit_output_observations(true)` before `.finish()`. | Engine compiles and starts without errors. | +| 2 | Start the engine with a Frontend connected (Intiface Central or WebSocket frontend). | Engine reports "EngineStarted" to the frontend. | +| 3 | Connect a test device (physical BLE or WebSocket forwarder). | Frontend receives `DeviceConnected` message. | +| 4 | From a client, send a Vibrate command at 50%. | Frontend receives `DeviceOutputObservation` with `device_index` matching connected device, `feature_index: 0`, `output_type: "Vibrate"`, `value: 50.0`. | +| 5 | Send the same Vibrate at 50% again. | No `DeviceOutputObservation` appears (dedup). | +| 6 | Send Vibrate at 75%. | Frontend receives `DeviceOutputObservation` with `value: 75.0`. | +| 7 | Send StopDeviceCmd. | Frontend receives `DeviceOutputObservation` with `value: 0.0` for each stoppable feature. | +| 8 | Send StopDeviceCmd again (already at zero). | No `DeviceOutputObservation` appears (dedup at zero). | +| 9 | Disconnect client and stop engine. | Clean shutdown, no crashes or panics. | + +## Phase 2: Disabled Mode Verification + +| Step | Action | Expected | +|------|--------|----------| +| 1 | Start Intiface Engine with `emit_output_observations: false` (the default). | Engine starts normally. | +| 2 | Connect a device and send vibrate commands. | No `DeviceOutputObservation` messages appear. All other messages work normally. | +| 3 | Check engine logs. | No observation-related channel errors or panics. | + +## Phase 3: Multi-Device Stop-All (supplements AC3.2) + +| Step | Action | Expected | +|------|--------|----------| +| 1 | With observations enabled, connect two or more devices. | Frontend shows `DeviceConnected` for each. | +| 2 | Send vibrate commands to each device. | `DeviceOutputObservation` messages for each device/feature. | +| 3 | Send StopAllDevices. | `DeviceOutputObservation` with `value: 0.0` for every stoppable feature on every device. | + +## Full Session Lifecycle + +1. Start engine with observations enabled and Frontend connected. +2. Connect a client and device. +3. Send vibrate commands at 25, 50, 75, 100%. +4. Verify each non-duplicate produces a `DeviceOutputObservation`. +5. Disconnect client (not device). +6. Reconnect client. +7. Send vibrate at same value as before disconnect. +8. Document whether observation is emitted (dedup state may reset on reconnect). +9. Send StopAllDevices and verify zero-value observations. +10. Disconnect everything, verify clean shutdown. + +## Traceability + +| Acceptance Criterion | Automated Test | Manual Step | +|---------------------|---------------|-------------| +| AC1.1 - EngineOptions field | Structural | -- | +| AC1.2 - External options and builder | Structural | -- | +| AC1.3 - Disabled = no channel | `test_ac5_1_disabled_no_observation_stream` | Phase 2 step 2 | +| AC2.1 - Observation emission | `test_ac2_1_observation_emission` | Phase 1 step 4 | +| AC2.2 - Dedup suppression | `test_ac2_2_observation_dedup` | Phase 1 step 5 | +| AC2.3 - Tap point before protocol | `test_ac2_3_observation_before_protocol` | -- | +| AC3.1 - StopDevice as zero | `test_ac3_1_stop_as_zero` | Phase 1 step 7 | +| AC3.2 - StopAllDevices | `test_ac3_2_stop_all_devices` (single device) | Phase 3 (multi-device) | +| AC3.3 - Stop dedup at zero | `test_ac3_3_stop_dedup` | Phase 1 step 8 | +| AC4.1 - E2E observation flow | None (human only) | Phase 1 steps 1-9 | +| AC4.2 - EngineMessage fields | Structural | -- | +| AC4.3 - Frontend receives observations | None (human only) | Phase 1 step 4 | +| AC5.1 - Disabled = None stream | `test_ac5_1_disabled_no_observation_stream` | Phase 2 steps 1-3 | +| AC5.2 - No allocation when disabled | Structural | Phase 2 step 3 |