diff --git a/README.md b/README.md index 5e69988..103a5b5 100644 --- a/README.md +++ b/README.md @@ -1,16 +1,69 @@ -# ryder-bridge-rust -Bridge to ryder device in rust +# ryder-bridge -The bridge translates json http requests to serial commands and back. +A bridge to facilitate connections between Ryder devices and other applications through a WebSocket interface. -## Running the bridge -### Linux -On linux following packages are required: rustup/cargo, build-essential, libudev-dev +## Building and running + +Requires Rust 1.61 or higher. + +First, clone the repository: + +``` +git clone https://github.com/Light-Labs/ryder-bridge-rust/ +``` + +Then, the bridge can be built with: + +``` +cargo build --release +``` + +and ran with: ``` -cargo run +./target/release/ryder-bridge +``` + +where `` is the path to the serial port of the Ryder device or simulator. Applications can then connect to the bridge through ``. + +A simple bridge client is provided for testing, and can be ran with: + ``` -e.g. +cargo run --bin client -- ``` -cargo run /dev/pts/10 localhost:8080 -``` + +where `` is a URL to the bridge, such as `ws://localhost:8888`. 2-digit hexadecimal inputs can be entered to be sent to the bridge, and any responses from the bridge or device are printed. + +### Running tests + +Unit and integration tests can be ran with: + +``` +cargo test --all -- --test-threads 1 +``` + +or simply `make test`. The tests will launch the bridge with a default listening port of 8080, but this can be changed by setting the `RYDER_BRIDGE_TEST_PORT` environment variable. + +## Bridge responses for clients + +The bridge may return several responses in addition to those received from the Ryder device itself. All of these extra responses are text messages, whereas device responses are binary messages. + +Multiple clients may connect to the bridge at one time, and they will be placed in a queue and served in the order they connected in. + +When a client connects, it will receive one of two responses: + +- `RESPONSE_WAIT_IN_QUEUE` if another client currently has access to the device and this client must wait in the queue. +- `RESPONSE_BEING_SERVED` otherwise. + +If the client is placed in the queue, it will eventually receive one of two responses: + +- `RESPONSE_BEING_SERVED` if the client has moved to the front of the queue and is now being served. +- `RESPONSE_BRIDGE_SHUTDOWN` if the bridge itself is shutting down. The client will be disconnected. + +Once `RESPONSE_BEING_SERVED` has been returned, all future responses will be one of the following: + +- `RESPONSE_BRIDGE_SHUTDOWN` if the bridge itself is shutting down. The client will be disconnected. +- `RESPONSE_DEVICE_NOT_CONNECTED` if the device is not currently connected. This is only returned immediately after `RESPONSE_BEING_SERVED`. The client will be disconnected. +- `RESPONSE_DEVICE_DISCONNECTED` if the device has disconnected. This is returned only in cases where `RESPONSE_DEVICE_NOT_CONNECTED` is not. The client will be disconnected. + +All other responses are binary messages from the device. diff --git a/mock/src/client.rs b/mock/src/client.rs index cf72458..b37f751 100644 --- a/mock/src/client.rs +++ b/mock/src/client.rs @@ -19,16 +19,14 @@ type WSOutgoingSink = SplitSink>, Mess /// A simple client that connects to the bridge and allows sending data and receiving responses. pub struct WSClient { - incoming: WSIncomingStream, - outgoing: WSOutgoingSink, - /// Whether the connection has been closed. - terminated: bool, + // `Some` if the connection is open, or `None` otherwise + connection: Option<(WSOutgoingSink, WSIncomingStream)>, } impl WSClient { /// Returns a new `WSClient` that connects to `ws://localhost:port`. pub async fn new(port: u16) -> Result { - let url = Url::parse(&format!("ws://localhost:{}", port)).unwrap(); + let url = Url::parse(&format!("ws://127.0.0.1:{}", port)).unwrap(); let (stream, _) = select! { s = tokio_tungstenite::connect_async(url).fuse() => s?, _ = time::sleep(Duration::from_millis(1000)).fuse() => { @@ -39,15 +37,13 @@ impl WSClient { let (outgoing, incoming) = stream.split(); Ok(WSClient { - incoming, - outgoing, - terminated: false, + connection: Some((outgoing, incoming)), }) } /// Sends `message` to the bridge. pub async fn send(&mut self, message: Message) -> Result<(), Error> { - self.outgoing.send(message).await + self.connection.as_mut().unwrap().0.send(message).await } /// Waits for the next response from the bridge and returns `Some(Ok)` if one was received, @@ -55,8 +51,8 @@ impl WSClient { /// will be received. pub async fn next_response(&mut self) -> Option> { // Only call `next` if `None` has not been returned and the connection is still open - if !self.terminated { - let res = self.incoming.next().await; + if let Some((_, ref mut incoming)) = self.connection { + let res = incoming.next().await; if res.is_none() { let _ = self.close(); @@ -71,9 +67,9 @@ impl WSClient { /// Closes the client's WebSocket connection. pub async fn close(&mut self) -> Result<(), Error> { // Only close the connection if it is open - if !self.terminated { - self.terminated = true; - self.outgoing.close().await + // `take` is used so the connection is dropped to fully close it + if let Some((mut outgoing, _)) = self.connection.take() { + outgoing.close().await } else { Ok(()) } diff --git a/mock/src/serial.rs b/mock/src/serial.rs index d8a8646..9cd688b 100644 --- a/mock/src/serial.rs +++ b/mock/src/serial.rs @@ -5,6 +5,7 @@ use serialport::{ClearBuffer, DataBits, FlowControl, Parity, SerialPort, StopBit use std::io::{self, Read, Write}; use std::sync::{Arc, Mutex, MutexGuard}; use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; use std::time::Duration; /// A simple serial port implementation that echoes any data written to it. This type is a handle @@ -82,6 +83,9 @@ impl Read for TestPort { return Err(io::ErrorKind::TimedOut.into()); } + // Wait before returning data to simulate delayed responses + thread::sleep(Duration::from_millis(50)); + // Read bytes equal to the smaller of the lengths of the target buffer and the internal // buffer let bytes = buf.len().min(port_buf.len()); diff --git a/src/connection.rs b/src/connection.rs index 88fd5ea..a81ea45 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -20,7 +20,8 @@ use crate::queue::TicketNotifier; /// Returned if another client is currently controlling the device. The current client is placed in /// a queue. pub const RESPONSE_WAIT_IN_QUEUE: &str = "RESPONSE_WAIT_IN_QUEUE"; -/// Returned to clients after [`RESPONSE_WAIT_IN_QUEUE`] when they move to the front of the queue. +/// Returned to clients when they are at the front of the queue. This includes when a client first +/// connects if the queue is empty. pub const RESPONSE_BEING_SERVED: &str = "RESPONSE_BEING_SERVED"; /// Returned to the current client if the device disconnects for any reason. pub const RESPONSE_DEVICE_DISCONNECTED: &str = "RESPONSE_DEVICE_DISCONNECTED"; @@ -102,7 +103,9 @@ impl WSConnection { self.state = State::Active(active); } State::Active(s) => { - s.process().await; + // Discard any errors here because they were already printed and the connection + // is shutting down anyways + let _ = s.process().await; break; } } @@ -162,17 +165,14 @@ impl Waiting { close(&mut self.shared.ws_outgoing).await; return Err(()); }, - // This connection is being served now - _ = &mut self.ticket_rx => { - // Notify the client that the device is ready - send_or_close(&mut self.shared.ws_outgoing, RESPONSE_BEING_SERVED).await?; - } // The bridge is shutting down _ = self.shared.terminate_rx.changed().fuse() => { let _ = self.shared.ws_outgoing.send(Message::text(RESPONSE_BRIDGE_SHUTDOWN)).await; close(&mut self.shared.ws_outgoing).await; return Err(()); } + // This connection is being served now + _ = &mut self.ticket_rx => {} } // The connection is now being served, so return the next state @@ -194,7 +194,10 @@ impl Active { /// Relays data between the WebSocket client and the serial port IO server until the connection /// is closed, the bridge shuts down, or the serial device disconnects. - async fn process(mut self) { + async fn process(mut self) -> Result<(), ()> { + // Notify the client that it has access to the device + send_or_close(&mut self.shared.ws_outgoing, RESPONSE_BEING_SERVED).await?; + // Take control of the serial client connection let mut serial_client = self.shared.serial_client .try_lock() @@ -209,13 +212,16 @@ impl Active { let serial_tx = tx; let serial_rx = rx; + // Clear data intended for previous clients + while let Ok(Some(_)) = serial_rx.try_next() {} + // If the serial device is not connected, notify the client and return if *device_state.borrow() == DeviceState::NotConnected { let _ = self.shared .ws_outgoing .send(Message::text(RESPONSE_DEVICE_NOT_CONNECTED)).await; close(&mut self.shared.ws_outgoing).await; - return; + return Ok(()); } // Set up message receiver for the WebSocket @@ -280,6 +286,8 @@ impl Active { // Close the WebSocket connection close(&mut self.shared.ws_outgoing).await; + + Ok(()) } } diff --git a/src/lib.rs b/src/lib.rs index 7ae7e4f..e3e4c7f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,7 +21,7 @@ use tokio::{signal, task}; use tokio::sync::{watch, Mutex as TokioMutex, oneshot}; use tokio::task::JoinHandle; -use std::net::SocketAddr; +use std::io; use std::path::PathBuf; use std::sync::{Arc, Mutex}; @@ -29,6 +29,9 @@ use crate::queue::ConnectionQueue; use crate::serial::{OpenPort, Server}; use crate::connection::WSConnection; +/// The result type returned by the bridge. +pub type BridgeResult = Result<(), io::Error>; + /// A token that signals that a `tokio` task is still alive as long as it has not been dropped. #[derive(Clone)] pub struct TaskAliveToken(mpsc::Sender<()>); @@ -55,18 +58,18 @@ impl BridgeHandle { /// handle to the bridge's task that should be `await`ed, and a [`BridgeHandle`] that can be used /// to control the bridge. pub fn launch( - listening_addr: SocketAddr, + listening_addr: String, serial_port_path: PathBuf, -) -> (JoinHandle<()>, BridgeHandle) { +) -> (JoinHandle, BridgeHandle) { launch_with_port_open_fn(listening_addr, serial_port_path, serial::open_serial_port) } /// Like [`launch`], but uses a custom function for opening the serial port. pub fn launch_with_port_open_fn( - listening_addr: SocketAddr, + listening_addr: String, serial_port_path: PathBuf, port_open_fn: F, -) -> (JoinHandle<()>, BridgeHandle) { +) -> (JoinHandle, BridgeHandle) { // Create a handle for the bridge let (bridge_handle, terminate_rx) = BridgeHandle::new(); @@ -81,17 +84,16 @@ pub fn launch_with_port_open_fn( /// Launches the Ryder Bridge for the given serial port and listening address. `handle_terminate_rx` /// is watched for a signal to terminate the bridge and all connections. async fn launch_internal( - listening_addr: SocketAddr, + listening_addr: String, serial_port_path: PathBuf, port_open_fn: Box, handle_terminate_rx: oneshot::Receiver<()>, -) { - println!("Listening on: {}", listening_addr); - println!("Ryder port: {}", serial_port_path.display()); - +) -> BridgeResult { // Create the event loop and TCP listener we'll accept connections on. - let try_socket = TcpListener::bind(listening_addr).await; - let listener = try_socket.expect("Failed to bind"); + let listener = TcpListener::bind(listening_addr).await?; + + println!("Ryder port: {}", serial_port_path.display()); + println!("Listening on: {}", listener.local_addr()?); let queue = Arc::new(Mutex::new(ConnectionQueue::new())); // Set up channel to wait for all tasks to finish @@ -106,6 +108,7 @@ async fn launch_internal( let (serial_server, serial_client, error) = Server::with_port_open_fn(serial_port_path, port_open_fn, terminate_rx.clone()); let serial_client = Arc::new(TokioMutex::new(serial_client)); + let serial_client_clone = serial_client.clone(); if let Err(e) = error { eprintln!("Failed to open serial port: {}", e); @@ -134,7 +137,7 @@ async fn launch_internal( let connection = WSConnection::new( stream, addr, - serial_client.clone(), + serial_client_clone.clone(), terminate_rx_copy.clone(), ticket_rx, task_alive_token.clone(), @@ -207,7 +210,13 @@ async fn launch_internal( server_handle.await.unwrap(); } + // Ensure that the serial `Client` is dropped only after the serial `Server` exits to avoid + // closed channel issues + let _ = serial_client; + println!("Shutting down"); + + Ok(()) } #[cfg(test)] @@ -222,8 +231,8 @@ mod tests { use super::*; /// Launches the bridge for testing using a nonexistent serial port. - fn launch_bridge_test() -> (JoinHandle<()>, BridgeHandle) { - launch(mock::get_bridge_test_addr(), Path::new("./nonexistent").into()) + fn launch_bridge_test() -> (JoinHandle, BridgeHandle) { + launch(mock::get_bridge_test_addr().to_string(), Path::new("./nonexistent").into()) } #[tokio::test] diff --git a/src/main.rs b/src/main.rs index 5be7e3d..a395038 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,6 @@ use clap::Parser; -use std::net::SocketAddr; use std::path::PathBuf; #[derive(Parser)] @@ -12,7 +11,7 @@ struct Cli { #[arg(name = "serial port path")] serial_port: PathBuf, #[arg(name = "listening address")] - addr: SocketAddr, + addr: String, } #[tokio::main] @@ -23,5 +22,7 @@ async fn main() { let (task_handle, _) = ryder_bridge::launch(cli.addr, cli.serial_port); // Wait for it to close - task_handle.await.unwrap(); + if let Err(e) = task_handle.await.unwrap() { + eprintln!("Error: {}", e); + } } diff --git a/tests/tests.rs b/tests/tests.rs index 9e32b94..1113c06 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -34,7 +34,7 @@ where // Launch the bridge let addr = mock::get_bridge_test_addr(); let (bridge_task_handle, bridge_handle) = ryder_bridge::launch_with_port_open_fn( - addr, + addr.to_string(), Path::new("./nonexistent").into(), move |_: &Path| test_port.try_clone(), ); @@ -59,7 +59,7 @@ where } // Wait for the bridge to close - bridge_task_handle.await.unwrap(); + bridge_task_handle.await.unwrap().unwrap(); // Verify that the test succeeded assert!(result.is_ok()); @@ -78,6 +78,12 @@ async fn test_echo() { run_test(|bridge_port, _, _| async move { let mut client = WSClient::new(bridge_port).await.unwrap(); + // There is no queue + assert_eq!( + next_response_timeout(&mut client).await.unwrap(), + Message::text(RESPONSE_BEING_SERVED), + ); + // No response is available yet assert!(next_response_timeout(&mut client).await.is_err()); @@ -106,7 +112,13 @@ async fn test_device_not_connected() { let mut client = WSClient::new(bridge_port).await.unwrap(); - // The bridge notifies the client and disconnects it + // There is no queue + assert_eq!( + next_response_timeout(&mut client).await.unwrap(), + Message::text(RESPONSE_BEING_SERVED), + ); + + // The bridge notifies the client about the device state and disconnects it assert_eq!( next_response_timeout(&mut client).await.unwrap(), Message::text(RESPONSE_DEVICE_NOT_CONNECTED), @@ -127,6 +139,12 @@ async fn run_test_device_disconnected( let mut client_1 = WSClient::new(bridge_port).await.unwrap(); let mut client_2 = WSClient::new(bridge_port).await.unwrap(); + // The first client is the first in the queue + assert_eq!( + next_response_timeout(&mut client_1).await.unwrap(), + Message::text(RESPONSE_BEING_SERVED), + ); + // The second client must wait in the queue assert_eq!( next_response_timeout(&mut client_2).await.unwrap(), @@ -148,8 +166,8 @@ async fn run_test_device_disconnected( Message::Close(None), ); - // The second client is in the queue, so it first receives `RESPONSE_DEVICE_READY` when the - // first client disconnects + // The second client is in the queue, so it first receives `RESPONSE_DEVICE_BEING_SERVED` + // when the first client disconnects assert_eq!( next_response_timeout(&mut client_2).await.unwrap(), Message::text(RESPONSE_BEING_SERVED), @@ -186,6 +204,12 @@ async fn test_bridge_shutdown() { let mut client_1 = WSClient::new(bridge_port).await.unwrap(); let mut client_2 = WSClient::new(bridge_port).await.unwrap(); + // The first client is the first in the queue + assert_eq!( + next_response_timeout(&mut client_1).await.unwrap(), + Message::text(RESPONSE_BEING_SERVED), + ); + // The second client must wait in the queue assert_eq!( next_response_timeout(&mut client_2).await.unwrap(), @@ -225,6 +249,12 @@ async fn test_queue() { let mut client_1 = WSClient::new(bridge_port).await.unwrap(); let mut client_2 = WSClient::new(bridge_port).await.unwrap(); + // The first client is the first in the queue + assert_eq!( + next_response_timeout(&mut client_1).await.unwrap(), + Message::text(RESPONSE_BEING_SERVED), + ); + // The second client must wait in the queue assert_eq!( next_response_timeout(&mut client_2).await.unwrap(), @@ -259,3 +289,38 @@ async fn test_queue() { ); }).await; } + +#[tokio::test] +async fn test_discard_data_if_no_clients() { + run_test(|bridge_port, _, _| async move { + let mut client_1 = WSClient::new(bridge_port).await.unwrap(); + + // There is no queue + assert_eq!( + next_response_timeout(&mut client_1).await.unwrap(), + Message::text(RESPONSE_BEING_SERVED), + ); + + // A client sends data but disconnects before receiving a response + client_1.send(Message::binary([1, 2, 3])).await.unwrap(); + client_1.close().await.unwrap(); + + // Give the bridge time to receive the response to the previous message + // Otherwise, the bridge will receive the message while the second client is already + // connected, and it will send it to that client without discarding it. This is not possible + // to prevent in general, as the serial device may have arbitrary delays in its responses. + time::sleep(Duration::from_millis(250)).await; + + // A second client connects, but it should not receive the response that could not be sent + // to the first client + let mut client_2 = WSClient::new(bridge_port).await.unwrap(); + + // There is no queue because the first client has already disconnected + assert_eq!( + next_response_timeout(&mut client_2).await.unwrap(), + Message::text(RESPONSE_BEING_SERVED), + ); + + assert!(next_response_timeout(&mut client_2).await.is_err()); + }).await; +}