Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 63 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,69 @@
# ryder-bridge-rust
Bridge to ryder device in rust
# ryder-bridge

The bridge translates json http requests to serial commands and back.
A bridge to facilitate connections between Ryder devices and other applications through a WebSocket interface.

## Running the bridge
### Linux
On linux following packages are required: rustup/cargo, build-essential, libudev-dev
## Building and running

Requires Rust 1.61 or higher.

First, clone the repository:

```
git clone https://github.com/Light-Labs/ryder-bridge-rust/
```

Then, the bridge can be built with:

```
cargo build --release
```

and ran with:

```
cargo run <ryder-port from simulator> <server url>
./target/release/ryder-bridge <device serial port> <listening address>
```

where `<device serial port>` is the path to the serial port of the Ryder device or simulator. Applications can then connect to the bridge through `<listening address>`.

A simple bridge client is provided for testing, and can be ran with:

```
e.g.
cargo run --bin client -- <WebSocket address>
```
cargo run /dev/pts/10 localhost:8080
```

where `<WebSocket address>` is a URL to the bridge, such as `ws://localhost:8888`. 2-digit hexadecimal inputs can be entered to be sent to the bridge, and any responses from the bridge or device are printed.

### Running tests

Unit and integration tests can be ran with:

```
cargo test --all -- --test-threads 1
```

or simply `make test`. The tests will launch the bridge with a default listening port of 8080, but this can be changed by setting the `RYDER_BRIDGE_TEST_PORT` environment variable.

## Bridge responses for clients

The bridge may return several responses in addition to those received from the Ryder device itself. All of these extra responses are text messages, whereas device responses are binary messages.

Multiple clients may connect to the bridge at one time, and they will be placed in a queue and served in the order they connected in.

When a client connects, it will receive one of two responses:

- `RESPONSE_WAIT_IN_QUEUE` if another client currently has access to the device and this client must wait in the queue.
- `RESPONSE_BEING_SERVED` otherwise.

If the client is placed in the queue, it will eventually receive one of two responses:

- `RESPONSE_BEING_SERVED` if the client has moved to the front of the queue and is now being served.
- `RESPONSE_BRIDGE_SHUTDOWN` if the bridge itself is shutting down. The client will be disconnected.

Once `RESPONSE_BEING_SERVED` has been returned, all future responses will be one of the following:

- `RESPONSE_BRIDGE_SHUTDOWN` if the bridge itself is shutting down. The client will be disconnected.
- `RESPONSE_DEVICE_NOT_CONNECTED` if the device is not currently connected. This is only returned immediately after `RESPONSE_BEING_SERVED`. The client will be disconnected.
- `RESPONSE_DEVICE_DISCONNECTED` if the device has disconnected. This is returned only in cases where `RESPONSE_DEVICE_NOT_CONNECTED` is not. The client will be disconnected.

All other responses are binary messages from the device.
24 changes: 10 additions & 14 deletions mock/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@ type WSOutgoingSink = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Mess

/// A simple client that connects to the bridge and allows sending data and receiving responses.
pub struct WSClient {
incoming: WSIncomingStream,
outgoing: WSOutgoingSink,
/// Whether the connection has been closed.
terminated: bool,
// `Some` if the connection is open, or `None` otherwise
connection: Option<(WSOutgoingSink, WSIncomingStream)>,
}

impl WSClient {
/// Returns a new `WSClient` that connects to `ws://localhost:port`.
pub async fn new(port: u16) -> Result<Self, Error> {
let url = Url::parse(&format!("ws://localhost:{}", port)).unwrap();
let url = Url::parse(&format!("ws://127.0.0.1:{}", port)).unwrap();
let (stream, _) = select! {
s = tokio_tungstenite::connect_async(url).fuse() => s?,
_ = time::sleep(Duration::from_millis(1000)).fuse() => {
Expand All @@ -39,24 +37,22 @@ impl WSClient {
let (outgoing, incoming) = stream.split();

Ok(WSClient {
incoming,
outgoing,
terminated: false,
connection: Some((outgoing, incoming)),
})
}

/// Sends `message` to the bridge.
pub async fn send(&mut self, message: Message) -> Result<(), Error> {
self.outgoing.send(message).await
self.connection.as_mut().unwrap().0.send(message).await
}

/// Waits for the next response from the bridge and returns `Some(Ok)` if one was received,
/// `Some(Err)` if there was an error, or `None` if there was no error but no more responses
/// will be received.
pub async fn next_response(&mut self) -> Option<Result<Message, Error>> {
// Only call `next` if `None` has not been returned and the connection is still open
if !self.terminated {
let res = self.incoming.next().await;
if let Some((_, ref mut incoming)) = self.connection {
let res = incoming.next().await;

if res.is_none() {
let _ = self.close();
Expand All @@ -71,9 +67,9 @@ impl WSClient {
/// Closes the client's WebSocket connection.
pub async fn close(&mut self) -> Result<(), Error> {
// Only close the connection if it is open
if !self.terminated {
self.terminated = true;
self.outgoing.close().await
// `take` is used so the connection is dropped to fully close it
if let Some((mut outgoing, _)) = self.connection.take() {
outgoing.close().await
} else {
Ok(())
}
Expand Down
4 changes: 4 additions & 0 deletions mock/src/serial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use serialport::{ClearBuffer, DataBits, FlowControl, Parity, SerialPort, StopBit
use std::io::{self, Read, Write};
use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

/// A simple serial port implementation that echoes any data written to it. This type is a handle
Expand Down Expand Up @@ -82,6 +83,9 @@ impl Read for TestPort {
return Err(io::ErrorKind::TimedOut.into());
}

// Wait before returning data to simulate delayed responses
thread::sleep(Duration::from_millis(50));

// Read bytes equal to the smaller of the lengths of the target buffer and the internal
// buffer
let bytes = buf.len().min(port_buf.len());
Expand Down
26 changes: 17 additions & 9 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use crate::queue::TicketNotifier;
/// Returned if another client is currently controlling the device. The current client is placed in
/// a queue.
pub const RESPONSE_WAIT_IN_QUEUE: &str = "RESPONSE_WAIT_IN_QUEUE";
/// Returned to clients after [`RESPONSE_WAIT_IN_QUEUE`] when they move to the front of the queue.
/// Returned to clients when they are at the front of the queue. This includes when a client first
/// connects if the queue is empty.
pub const RESPONSE_BEING_SERVED: &str = "RESPONSE_BEING_SERVED";
/// Returned to the current client if the device disconnects for any reason.
pub const RESPONSE_DEVICE_DISCONNECTED: &str = "RESPONSE_DEVICE_DISCONNECTED";
Expand Down Expand Up @@ -102,7 +103,9 @@ impl WSConnection {
self.state = State::Active(active);
}
State::Active(s) => {
s.process().await;
// Discard any errors here because they were already printed and the connection
// is shutting down anyways
let _ = s.process().await;
break;
}
}
Expand Down Expand Up @@ -162,17 +165,14 @@ impl Waiting {
close(&mut self.shared.ws_outgoing).await;
return Err(());
},
// This connection is being served now
_ = &mut self.ticket_rx => {
// Notify the client that the device is ready
send_or_close(&mut self.shared.ws_outgoing, RESPONSE_BEING_SERVED).await?;
}
// The bridge is shutting down
_ = self.shared.terminate_rx.changed().fuse() => {
let _ = self.shared.ws_outgoing.send(Message::text(RESPONSE_BRIDGE_SHUTDOWN)).await;
close(&mut self.shared.ws_outgoing).await;
return Err(());
}
// This connection is being served now
_ = &mut self.ticket_rx => {}
}

// The connection is now being served, so return the next state
Expand All @@ -194,7 +194,10 @@ impl Active {

/// Relays data between the WebSocket client and the serial port IO server until the connection
/// is closed, the bridge shuts down, or the serial device disconnects.
async fn process(mut self) {
async fn process(mut self) -> Result<(), ()> {
// Notify the client that it has access to the device
send_or_close(&mut self.shared.ws_outgoing, RESPONSE_BEING_SERVED).await?;

// Take control of the serial client connection
let mut serial_client = self.shared.serial_client
.try_lock()
Expand All @@ -209,13 +212,16 @@ impl Active {
let serial_tx = tx;
let serial_rx = rx;

// Clear data intended for previous clients
while let Ok(Some(_)) = serial_rx.try_next() {}

// If the serial device is not connected, notify the client and return
if *device_state.borrow() == DeviceState::NotConnected {
let _ = self.shared
.ws_outgoing
.send(Message::text(RESPONSE_DEVICE_NOT_CONNECTED)).await;
close(&mut self.shared.ws_outgoing).await;
return;
return Ok(());
}

// Set up message receiver for the WebSocket
Expand Down Expand Up @@ -280,6 +286,8 @@ impl Active {

// Close the WebSocket connection
close(&mut self.shared.ws_outgoing).await;

Ok(())
}
}

Expand Down
39 changes: 24 additions & 15 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ use tokio::{signal, task};
use tokio::sync::{watch, Mutex as TokioMutex, oneshot};
use tokio::task::JoinHandle;

use std::net::SocketAddr;
use std::io;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};

use crate::queue::ConnectionQueue;
use crate::serial::{OpenPort, Server};
use crate::connection::WSConnection;

/// The result type returned by the bridge.
pub type BridgeResult = Result<(), io::Error>;

/// A token that signals that a `tokio` task is still alive as long as it has not been dropped.
#[derive(Clone)]
pub struct TaskAliveToken(mpsc::Sender<()>);
Expand All @@ -55,18 +58,18 @@ impl BridgeHandle {
/// handle to the bridge's task that should be `await`ed, and a [`BridgeHandle`] that can be used
/// to control the bridge.
pub fn launch(
listening_addr: SocketAddr,
listening_addr: String,
serial_port_path: PathBuf,
) -> (JoinHandle<()>, BridgeHandle) {
) -> (JoinHandle<BridgeResult>, BridgeHandle) {
launch_with_port_open_fn(listening_addr, serial_port_path, serial::open_serial_port)
}

/// Like [`launch`], but uses a custom function for opening the serial port.
pub fn launch_with_port_open_fn<F: OpenPort + 'static>(
listening_addr: SocketAddr,
listening_addr: String,
serial_port_path: PathBuf,
port_open_fn: F,
) -> (JoinHandle<()>, BridgeHandle) {
) -> (JoinHandle<BridgeResult>, BridgeHandle) {
// Create a handle for the bridge
let (bridge_handle, terminate_rx) = BridgeHandle::new();

Expand All @@ -81,17 +84,16 @@ pub fn launch_with_port_open_fn<F: OpenPort + 'static>(
/// Launches the Ryder Bridge for the given serial port and listening address. `handle_terminate_rx`
/// is watched for a signal to terminate the bridge and all connections.
async fn launch_internal(
listening_addr: SocketAddr,
listening_addr: String,
serial_port_path: PathBuf,
port_open_fn: Box<dyn OpenPort>,
handle_terminate_rx: oneshot::Receiver<()>,
) {
println!("Listening on: {}", listening_addr);
println!("Ryder port: {}", serial_port_path.display());

) -> BridgeResult {
// Create the event loop and TCP listener we'll accept connections on.
let try_socket = TcpListener::bind(listening_addr).await;
let listener = try_socket.expect("Failed to bind");
let listener = TcpListener::bind(listening_addr).await?;

println!("Ryder port: {}", serial_port_path.display());
println!("Listening on: {}", listener.local_addr()?);

let queue = Arc::new(Mutex::new(ConnectionQueue::new()));
// Set up channel to wait for all tasks to finish
Expand All @@ -106,6 +108,7 @@ async fn launch_internal(
let (serial_server, serial_client, error) =
Server::with_port_open_fn(serial_port_path, port_open_fn, terminate_rx.clone());
let serial_client = Arc::new(TokioMutex::new(serial_client));
let serial_client_clone = serial_client.clone();

if let Err(e) = error {
eprintln!("Failed to open serial port: {}", e);
Expand Down Expand Up @@ -134,7 +137,7 @@ async fn launch_internal(
let connection = WSConnection::new(
stream,
addr,
serial_client.clone(),
serial_client_clone.clone(),
terminate_rx_copy.clone(),
ticket_rx,
task_alive_token.clone(),
Expand Down Expand Up @@ -207,7 +210,13 @@ async fn launch_internal(
server_handle.await.unwrap();
}

// Ensure that the serial `Client` is dropped only after the serial `Server` exits to avoid
// closed channel issues
let _ = serial_client;

println!("Shutting down");

Ok(())
}

#[cfg(test)]
Expand All @@ -222,8 +231,8 @@ mod tests {
use super::*;

/// Launches the bridge for testing using a nonexistent serial port.
fn launch_bridge_test() -> (JoinHandle<()>, BridgeHandle) {
launch(mock::get_bridge_test_addr(), Path::new("./nonexistent").into())
fn launch_bridge_test() -> (JoinHandle<BridgeResult>, BridgeHandle) {
launch(mock::get_bridge_test_addr().to_string(), Path::new("./nonexistent").into())
}

#[tokio::test]
Expand Down
7 changes: 4 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use clap::Parser;

use std::net::SocketAddr;
use std::path::PathBuf;

#[derive(Parser)]
Expand All @@ -12,7 +11,7 @@ struct Cli {
#[arg(name = "serial port path")]
serial_port: PathBuf,
#[arg(name = "listening address")]
addr: SocketAddr,
addr: String,
}

#[tokio::main]
Expand All @@ -23,5 +22,7 @@ async fn main() {
let (task_handle, _) = ryder_bridge::launch(cli.addr, cli.serial_port);

// Wait for it to close
task_handle.await.unwrap();
if let Err(e) = task_handle.await.unwrap() {
eprintln!("Error: {}", e);
}
}
Loading