Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/publish-wasm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Publish WASM Packages
on:
push:
tags:
- 'wasm-v*'
- 'buttplug-wasm-*'

jobs:
publish:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ plans
docs
.deciduous
**/.DS_Store
.worktrees
node_modules
package-lock.json
**/dist/
11 changes: 11 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# CLAUDE.md

Last verified: 2026-05-17

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Build Commands
Expand Down Expand Up @@ -71,6 +73,15 @@ AwaitingHandshake → Connected { client_name, spec_version }
- `DeviceCommunicationManager` - Hardware discovery
- `ServerDeviceManager` - Orchestrates devices and protocols

**Output Observability** (opt-in):
When `emit_output_observations` is enabled, the server broadcasts `OutputObservation` events for every output command sent to a device. The data flows through broadcast channels:
```
DeviceHandle → ServerDeviceManager → ButtplugServer::output_observation_stream()
→ ButtplugRemoteServer (as ButtplugRemoteServerEvent::OutputObservation)
→ Frontend (as EngineMessage::DeviceOutputObservation)
```
Each observation carries `device_index`, `feature_index`, `output_type`, and `value`. Disabled by default to avoid overhead; enable via `ServerDeviceManagerBuilder::emit_output_observations(true)` or `EngineOptions::emit_output_observations`.

## Contributing

**Issues must be filed and discussed before PRs are submitted.** Approval from @qdot required. Non-issue PRs will be closed.
Expand Down
19 changes: 19 additions & 0 deletions crates/buttplug_server/src/device/device_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use buttplug_server_device_config::{
use dashmap::DashMap;
use futures::future::{self, BoxFuture, FutureExt};
use tokio::sync::{
broadcast,
mpsc::{Sender, channel},
oneshot,
};
Expand All @@ -56,6 +57,7 @@ use crate::{

use super::{
InternalDeviceEvent,
OutputObservation,
device_task::{DeviceTaskConfig, spawn_device_task},
hardware::{Hardware, HardwareCommand, HardwareConnector, HardwareEvent},
protocol::{ProtocolHandler, ProtocolKeepaliveStrategy, ProtocolSpecializer},
Expand Down Expand Up @@ -108,6 +110,7 @@ pub struct DeviceHandle {
last_output_command: Arc<DashMap<Uuid, CheckedOutputCmdV4>>,
stop_commands: Arc<Vec<ButtplugDeviceCommandMessageUnionV4>>,
internal_hw_msg_sender: Sender<Vec<HardwareCommand>>,
output_observation_sender: Option<broadcast::Sender<OutputObservation>>,
}

impl DeviceHandle {
Expand All @@ -119,6 +122,7 @@ impl DeviceHandle {
identifier: UserDeviceIdentifier,
stop_commands: Vec<ButtplugDeviceCommandMessageUnionV4>,
internal_hw_msg_sender: Sender<Vec<HardwareCommand>>,
output_observation_sender: Option<broadcast::Sender<OutputObservation>>,
) -> Self {
Self {
hardware,
Expand All @@ -129,6 +133,7 @@ impl DeviceHandle {
last_output_command: Arc::new(DashMap::new()),
stop_commands: Arc::new(stop_commands),
internal_hw_msg_sender,
output_observation_sender,
}
}

Expand Down Expand Up @@ -268,6 +273,18 @@ impl DeviceHandle {
self
.last_output_command
.insert(msg.feature_id(), msg.clone());

if let Some(sender) = &self.output_observation_sender {
// OutputType derives Display via strum, producing clean names like "Vibrate", "Rotate".
// The design uses format!("{:?}") but to_string() is preferred for clean output.
let _ = sender.send(OutputObservation {
device_index: msg.device_index(),
feature_index: msg.feature_index(),
output_type: msg.output_command().as_output_type().to_string(),
value: msg.output_command().value() as f64,
});
}

self.handle_generic_command_result(self.handler.handle_output_cmd(msg))
}

Expand Down Expand Up @@ -437,6 +454,7 @@ pub(super) async fn build_device_handle(
mut hardware_connector: Box<dyn HardwareConnector>,
protocol_specializers: Vec<ProtocolSpecializer>,
device_event_sender: tokio::sync::mpsc::Sender<InternalDeviceEvent>,
output_observation_sender: Option<broadcast::Sender<OutputObservation>>,
) -> Result<DeviceHandle, ButtplugDeviceError> {
// At this point, we know we've got hardware that is waiting to connect, and enough protocol
// info to actually do something after we connect. So go ahead and connect.
Expand Down Expand Up @@ -571,6 +589,7 @@ pub(super) async fn build_device_handle(
identifier,
stop_commands,
internal_hw_msg_sender,
output_observation_sender,
);

// If we need a keepalive with a packet replay, set this up via stopping the device on connect.
Expand Down
2 changes: 1 addition & 1 deletion crates/buttplug_server/src/device/device_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ async fn run_device_task(

// Priority 2: Batch deadline reached - flush pending commands
_ = batch_fut => {
debug!("Batch deadline reached, sending {} commands", pending_commands.len());
trace!("Batch deadline reached, sending {} commands", pending_commands.len());
while let Some(cmd) = pending_commands.pop_front() {
let _ = hardware.parse_message(&cmd).await;
if track_keepalive {
Expand Down
2 changes: 2 additions & 0 deletions crates/buttplug_server/src/device/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ pub mod protocol;
pub mod protocol_impl;
mod server_device_manager;
mod server_device_manager_event_loop;
mod output_observation;

pub use device_handle::{DeviceCommand, DeviceEvent, DeviceHandle};
pub use output_observation::OutputObservation;

use crate::message::ButtplugServerDeviceMessage;
use buttplug_server_device_config::UserDeviceIdentifier;
Expand Down
16 changes: 16 additions & 0 deletions crates/buttplug_server/src/device/output_observation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Buttplug Rust Source Code File - See https://buttplug.io for more info.
//
// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved.
//
// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
// for full license information.

//! Output observation type for output command tracking and observability

#[derive(Clone, Debug)]
pub struct OutputObservation {
pub device_index: u32,
pub feature_index: u32,
pub output_type: String,
pub value: f64,
}
23 changes: 23 additions & 0 deletions crates/buttplug_server/src/device/server_device_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
ButtplugServerResultFuture,
device::{
DeviceHandle,
OutputObservation,
hardware::communication::{HardwareCommunicationManager, HardwareCommunicationManagerBuilder},
server_device_manager_event_loop::ServerDeviceManagerEventLoop,
},
Expand Down Expand Up @@ -72,13 +73,15 @@ pub struct ServerDeviceInfo {
pub struct ServerDeviceManagerBuilder {
device_configuration_manager: Arc<DeviceConfigurationManager>,
comm_managers: Vec<Box<dyn HardwareCommunicationManagerBuilder>>,
emit_output_observations: bool,
}

impl ServerDeviceManagerBuilder {
pub fn new(device_configuration_manager: DeviceConfigurationManager) -> Self {
Self {
device_configuration_manager: Arc::new(device_configuration_manager),
comm_managers: vec![],
emit_output_observations: false,
}
}

Expand All @@ -88,6 +91,7 @@ impl ServerDeviceManagerBuilder {
Self {
device_configuration_manager,
comm_managers: vec![],
emit_output_observations: false,
}
}

Expand All @@ -99,6 +103,11 @@ impl ServerDeviceManagerBuilder {
self
}

pub fn emit_output_observations(&mut self, enabled: bool) -> &mut Self {
self.emit_output_observations = enabled;
self
}

pub fn finish(&mut self) -> Result<ServerDeviceManager, ButtplugServerError> {
let (device_command_sender, device_command_receiver) = mpsc::channel(256);
let (device_event_sender, device_event_receiver) = mpsc::channel(256);
Expand Down Expand Up @@ -149,6 +158,11 @@ impl ServerDeviceManagerBuilder {
let loop_cancellation_token = CancellationToken::new();

let output_sender = broadcast::channel(255).0;
let output_observation_sender = if self.emit_output_observations {
Some(broadcast::channel(256).0)
} else {
None
};

let mut event_loop = ServerDeviceManagerEventLoop::new(
comm_managers,
Expand All @@ -158,6 +172,7 @@ impl ServerDeviceManagerBuilder {
output_sender.clone(),
device_event_receiver,
device_command_receiver,
output_observation_sender.clone(),
);
buttplug_core::spawn!("ServerDeviceManager event loop", async move {
event_loop.run().await;
Expand All @@ -169,6 +184,7 @@ impl ServerDeviceManagerBuilder {
loop_cancellation_token,
running: Arc::new(AtomicBool::new(true)),
output_sender,
output_observation_sender,
})
}
}
Expand All @@ -183,6 +199,7 @@ pub struct ServerDeviceManager {
loop_cancellation_token: CancellationToken,
running: Arc<AtomicBool>,
output_sender: broadcast::Sender<ButtplugServerMessageV4>,
output_observation_sender: Option<broadcast::Sender<OutputObservation>>,
}

impl ServerDeviceManager {
Expand All @@ -192,6 +209,12 @@ impl ServerDeviceManager {
convert_broadcast_receiver_to_stream(self.output_sender.subscribe())
}

pub fn output_observation_stream(&self) -> Option<impl Stream<Item = OutputObservation>> {
self.output_observation_sender.as_ref().map(|sender| {
convert_broadcast_receiver_to_stream(sender.subscribe())
})
}

fn start_scanning(&self) -> ButtplugServerResultFuture {
let command_sender = self.device_command_sender.clone();
async move {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use super::server_device_manager::DeviceManagerCommand;
use crate::device::{
DeviceHandle,
InternalDeviceEvent,
OutputObservation,
device_handle::build_device_handle,
hardware::communication::{HardwareCommunicationManager, HardwareCommunicationManagerEvent},
protocol::ProtocolManager,
Expand Down Expand Up @@ -64,6 +65,8 @@ pub(super) struct ServerDeviceManagerEventLoop {
loop_cancellation_token: CancellationToken,
/// Protocol map, for mapping user definitions to protocols
protocol_manager: ProtocolManager,
/// Optional sender for output observations, None when disabled
output_observation_sender: Option<broadcast::Sender<OutputObservation>>,
}

impl ServerDeviceManagerEventLoop {
Expand All @@ -75,6 +78,7 @@ impl ServerDeviceManagerEventLoop {
server_sender: broadcast::Sender<ButtplugServerMessageV4>,
device_comm_receiver: mpsc::Receiver<HardwareCommunicationManagerEvent>,
device_command_receiver: mpsc::Receiver<DeviceManagerCommand>,
output_observation_sender: Option<broadcast::Sender<OutputObservation>>,
) -> Self {
let (device_event_sender, device_event_receiver) = mpsc::channel(256);
Self {
Expand All @@ -90,6 +94,7 @@ impl ServerDeviceManagerEventLoop {
connecting_devices: Arc::new(DashSet::new()),
loop_cancellation_token,
protocol_manager: ProtocolManager::default(),
output_observation_sender,
}
}

Expand Down Expand Up @@ -283,6 +288,7 @@ impl ServerDeviceManagerEventLoop {

let device_config_manager = self.device_config_manager.clone();
let connecting_devices = self.connecting_devices.clone();
let output_observation_sender = self.output_observation_sender.clone();
let span = info_span!(
"device creation",
name = tracing::field::display(name),
Expand All @@ -299,6 +305,7 @@ impl ServerDeviceManagerEventLoop {
creator,
protocol_specializers,
device_event_sender_for_forwarding,
output_observation_sender,
)
.await
{
Expand Down
8 changes: 7 additions & 1 deletion crates/buttplug_server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::server_message_conversion::ButtplugServerDeviceEventMessageConverter;

use super::{
ButtplugServerResultFuture,
device::ServerDeviceManager,
device::{OutputObservation, ServerDeviceManager},
message::{
ButtplugClientMessageVariant,
ButtplugServerMessageVariant,
Expand Down Expand Up @@ -194,6 +194,12 @@ impl ButtplugServer {
self.device_manager.clone()
}

/// Returns a stream of output observations if emission is enabled, otherwise None.
/// This stream emits observations for all output commands processed through the server.
pub fn output_observation_stream(&self) -> Option<impl Stream<Item = OutputObservation>> {
self.device_manager.output_observation_stream()
}

/// If true, client is currently connected to the server.
pub fn connected(&self) -> bool {
matches!(
Expand Down
1 change: 1 addition & 0 deletions crates/buttplug_tests/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,5 @@ Tests run across multiple protocol spec versions (v0–v4) via version-specific
- `test_serializers.rs` — Message serialization/deserialization across protocol versions
- `test_message_downgrades.rs` — Protocol version downgrade path tests
- `test_disabled_device_features.rs` — Tests for user config feature disabling
- `test_output_observations.rs` — Integration tests for output observability (observation stream, filtering, multi-device, enable/disable)
- `test_websocket_connectors.rs` / `test_websocket_device_comm_manager.rs` — WebSocket transport integration tests
Loading