From 084e1670c58bc62888228f4ecb1bbd6c15912aba Mon Sep 17 00:00:00 2001 From: Brent Rager Date: Sun, 14 Jun 2026 00:57:13 -0400 Subject: [PATCH] =?UTF-8?q?SMOODEV-1892:=20Redis=20+=20NATS=20Backplane=20?= =?UTF-8?q?backends=20=E2=80=94=20cross-pod=20scale-out?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two distributed Backplane impls behind the existing trait, as sibling adapter crates (mirroring adapters/postgres): RedisBackplane (Redis/Valkey pub/sub) and NatsBackplane (NATS subjects). Each wraps a per-pod InMemoryBackplane for local registry + delivery and adds a bus for cross-pod fan-out — publish() delivers to local sockets, then broadcasts a BackplaneEnvelope so every other pod re-resolves the Target against its own registry and delivers to its sockets (origin skips its echo). Same publish() now reaches a socket on any replica → horizontal scale-out and the cross-pod path for non-AI publishers. - lib: Target is Serialize/Deserialize; new shared BackplaneEnvelope wire type. - server: SMOOTH_AGENT_BACKPLANE (memory|redis|valkey|nats) + _URL selection in build_state_from_env_async; default stays single-process in-memory. - tests: cross-pod fan-out proven end-to-end over real Redis + NATS via testcontainers (skip without Docker, like the PG adapter). Co-Authored-By: Claude Opus 4.8 (1M context) --- .changeset/backplane-impls.md | 5 + rust/Cargo.lock | 267 +++++++++++++++++- rust/Cargo.toml | 2 + rust/adapters/backplane-nats/Cargo.toml | 29 ++ rust/adapters/backplane-nats/src/lib.rs | 151 ++++++++++ .../backplane-nats/tests/cross_pod.rs | 111 ++++++++ rust/adapters/backplane-redis/Cargo.toml | 31 ++ rust/adapters/backplane-redis/src/lib.rs | 164 +++++++++++ .../backplane-redis/tests/cross_pod.rs | 119 ++++++++ rust/smooth-operator-server/Cargo.toml | 4 + rust/smooth-operator-server/src/config.rs | 2 + rust/smooth-operator-server/src/server.rs | 57 ++++ rust/smooth-operator/src/backplane.rs | 25 +- 13 files changed, 959 insertions(+), 8 deletions(-) create mode 100644 .changeset/backplane-impls.md create mode 100644 rust/adapters/backplane-nats/Cargo.toml create mode 100644 rust/adapters/backplane-nats/src/lib.rs create mode 100644 rust/adapters/backplane-nats/tests/cross_pod.rs create mode 100644 rust/adapters/backplane-redis/Cargo.toml create mode 100644 rust/adapters/backplane-redis/src/lib.rs create mode 100644 rust/adapters/backplane-redis/tests/cross_pod.rs diff --git a/.changeset/backplane-impls.md b/.changeset/backplane-impls.md new file mode 100644 index 0000000..ef4deb6 --- /dev/null +++ b/.changeset/backplane-impls.md @@ -0,0 +1,5 @@ +--- +'@smooai/smooth-operator': minor +--- + +Distributed Backplane backends (SMOODEV-1892): `RedisBackplane` and `NatsBackplane` — the horizontal scale-out seam. Both implement the `Backplane` trait by wrapping a per-pod `InMemoryBackplane` for local registry + delivery and adding a pub/sub bus (Redis/Valkey channel or NATS subject) for cross-pod fan-out: `publish(Target, event)` delivers to local sockets immediately, then broadcasts a `BackplaneEnvelope` so every other pod re-resolves the target against its own registry and delivers to its sockets (the origin pod skips its own echo). This makes the same `publish` call reach a socket on any replica — required to run the WS service with >1 pod, and the cross-pod path for non-AI publishers. Selected at runtime via `SMOOTH_AGENT_BACKPLANE` (`memory` | `redis`/`valkey` | `nats`) + `SMOOTH_AGENT_BACKPLANE_URL`; default stays single-process in-memory. `Target` is now `Serialize`/`Deserialize` and a shared `BackplaneEnvelope` is exposed so a host's own transport adapter can speak the same wire format. New crates: `adapters/backplane-redis`, `adapters/backplane-nats` (cross-pod fan-out proven end-to-end over real Redis + NATS via testcontainers). diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 7dd1e25..d996d6e 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -95,6 +95,42 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-nats" +version = "0.38.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76433c4de73442daedb3a59e991d94e85c14ebfc33db53dfcd347a21cd6ef4f8" +dependencies = [ + "base64", + "bytes", + "futures", + "memchr", + "nkeys", + "nuid", + "once_cell", + "pin-project", + "portable-atomic", + "rand 0.8.6", + "regex", + "ring", + "rustls-native-certs 0.7.3", + "rustls-pemfile", + "rustls-webpki 0.102.8", + "serde", + "serde_json", + "serde_nanos", + "serde_repr", + "thiserror 1.0.69", + "time", + "tokio", + "tokio-rustls 0.26.4", + "tokio-util", + "tokio-websockets", + "tracing", + "tryhard", + "url", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -457,7 +493,7 @@ dependencies = [ "pin-project-lite", "rustls 0.21.12", "rustls 0.23.40", - "rustls-native-certs", + "rustls-native-certs 0.8.4", "rustls-pki-types", "tokio", "tokio-rustls 0.26.4", @@ -763,7 +799,7 @@ dependencies = [ "log", "pin-project-lite", "rustls 0.23.40", - "rustls-native-certs", + "rustls-native-certs 0.8.4", "rustls-pemfile", "rustls-pki-types", "serde", @@ -973,6 +1009,20 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -985,6 +1035,16 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6ef517f0926dd24a1582492c791b6a4818a4d94e789a334894aa15b0d12f55c" +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation" version = "0.10.1" @@ -1324,6 +1384,7 @@ dependencies = [ "ed25519", "serde", "sha2 0.10.9", + "signature", "subtle", "zeroize", ] @@ -1957,7 +2018,7 @@ dependencies = [ "hyper-util", "log", "rustls 0.23.40", - "rustls-native-certs", + "rustls-native-certs 0.8.4", "tokio", "tokio-rustls 0.26.4", "tower-service", @@ -2202,6 +2263,15 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.14.0" @@ -2455,6 +2525,21 @@ dependencies = [ "uuid", ] +[[package]] +name = "nkeys" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879011babc47a1c7fdf5a935ae3cfe94f34645ca0cac1c7f6424b36fc743d1bf" +dependencies = [ + "data-encoding", + "ed25519", + "ed25519-dalek", + "getrandom 0.2.17", + "log", + "rand 0.8.6", + "signatory", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -2464,6 +2549,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "nuid" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc895af95856f929163a0aa20c26a78d26bfdc839f51b9d5aa7a5b79e52b7e83" +dependencies = [ + "rand 0.8.6", +] + [[package]] name = "num-bigint" version = "0.4.6" @@ -2612,6 +2706,12 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" +[[package]] +name = "openssl-probe" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" + [[package]] name = "openssl-probe" version = "0.2.1" @@ -2977,7 +3077,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b570b25f7617e43d59005d0990ccb79e950a423952cea19671b7a876da390adf" dependencies = [ "anyhow", - "itertools", + "itertools 0.14.0", "proc-macro2", "quote", "syn", @@ -3176,6 +3276,30 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69" +[[package]] +name = "redis" +version = "0.27.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09d8f99a4090c89cc489a94833c901ead69bfbf3877b4867d5482e321ee875bc" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "combine", + "futures-util", + "itertools 0.13.0", + "itoa", + "num-bigint", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2 0.5.10", + "tokio", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.3.5" @@ -3412,16 +3536,29 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +dependencies = [ + "openssl-probe 0.1.6", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework 2.11.1", +] + [[package]] name = "rustls-native-certs" version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dab5152771c58876a2146916e53e35057e1a4dfa2b9df0f0305b07f611fdea4d" dependencies = [ - "openssl-probe", + "openssl-probe 0.2.1", "rustls-pki-types", "schannel", - "security-framework", + "security-framework 3.7.0", ] [[package]] @@ -3453,6 +3590,17 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustls-webpki" +version = "0.102.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + [[package]] name = "rustls-webpki" version = "0.103.13" @@ -3558,6 +3706,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags 2.13.0", + "core-foundation 0.9.4", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + [[package]] name = "security-framework" version = "3.7.0" @@ -3565,7 +3726,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d" dependencies = [ "bitflags 2.13.0", - "core-foundation", + "core-foundation 0.10.1", "core-foundation-sys", "libc", "security-framework-sys", @@ -3634,6 +3795,15 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_nanos" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a93142f0367a4cc53ae0fead1bcda39e85beccfad3dcd717656cacab94b12985" +dependencies = [ + "serde", +] + [[package]] name = "serde_path_to_error" version = "0.1.20" @@ -3735,6 +3905,12 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.9" @@ -3782,6 +3958,18 @@ dependencies = [ "libc", ] +[[package]] +name = "signatory" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e303f8205714074f6068773f0e29527e0453937fe837c9717d066635b65f31" +dependencies = [ + "pkcs8", + "rand_core 0.6.4", + "signature", + "zeroize", +] + [[package]] name = "signature" version = "2.2.0" @@ -3853,6 +4041,38 @@ dependencies = [ "uuid", ] +[[package]] +name = "smooai-smooth-operator-adapter-backplane-nats" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-nats", + "async-trait", + "futures-util", + "serde_json", + "smooai-smooth-operator", + "testcontainers", + "tokio", + "tracing", + "uuid", +] + +[[package]] +name = "smooai-smooth-operator-adapter-backplane-redis" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "futures-util", + "redis", + "serde_json", + "smooai-smooth-operator", + "testcontainers", + "tokio", + "tracing", + "uuid", +] + [[package]] name = "smooai-smooth-operator-adapter-dynamodb" version = "0.1.0" @@ -4035,6 +4255,8 @@ dependencies = [ "serde", "serde_json", "smooai-smooth-operator", + "smooai-smooth-operator-adapter-backplane-nats", + "smooai-smooth-operator-adapter-backplane-redis", "smooai-smooth-operator-adapter-dynamodb", "smooai-smooth-operator-adapter-memory", "smooai-smooth-operator-adapter-postgres", @@ -4478,6 +4700,27 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-websockets" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f591660438b3038dd04d16c938271c79e7e06260ad2ea2885a4861bfb238605d" +dependencies = [ + "base64", + "bytes", + "futures-core", + "futures-sink", + "http 1.4.2", + "httparse", + "rand 0.8.6", + "ring", + "rustls-native-certs 0.8.4", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.4", + "tokio-util", +] + [[package]] name = "toml" version = "0.8.23" @@ -4729,6 +4972,16 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tryhard" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fe58ebd5edd976e0fe0f8a14d2a04b7c81ef153ea9a54eebc42e67c2c23b4e5" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tungstenite" version = "0.26.2" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 62df763..4f8f5b9 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -5,6 +5,8 @@ members = [ "adapters/in-memory", "adapters/postgres", "adapters/dynamodb", + "adapters/backplane-redis", + "adapters/backplane-nats", "smooth-operator-server", "smooth-operator-lambda", "ingestion", diff --git a/rust/adapters/backplane-nats/Cargo.toml b/rust/adapters/backplane-nats/Cargo.toml new file mode 100644 index 0000000..e5c023b --- /dev/null +++ b/rust/adapters/backplane-nats/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "smooai-smooth-operator-adapter-backplane-nats" +version = "0.1.0" +edition.workspace = true +license.workspace = true +repository.workspace = true +description = "NATS-backed distributed Backplane for smooth-operator — cross-pod WebSocket event fan-out over NATS subjects (scale-out + a shared event bus)." + +[lib] +name = "smooth_operator_adapter_backplane_nats" + +[dependencies] +smooth-operator = { workspace = true } +async-trait = { workspace = true } +anyhow = { workspace = true } +serde_json = { workspace = true } +uuid = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +futures-util = "0.3" +# NATS client (rustls by default — no OpenSSL, matching the workspace posture). +async-nats = "0.38" + +[dev-dependencies] +serde_json = { workspace = true } +tokio = { workspace = true } +# Throwaway NATS for the cross-pod fan-out integration test; skips when Docker +# is unavailable so CI without a daemon stays green (mirrors the PG adapter). +testcontainers = "0.24" diff --git a/rust/adapters/backplane-nats/src/lib.rs b/rust/adapters/backplane-nats/src/lib.rs new file mode 100644 index 0000000..570cbcd --- /dev/null +++ b/rust/adapters/backplane-nats/src/lib.rs @@ -0,0 +1,151 @@ +//! NATS [`Backplane`] — cross-pod scale-out + a shared event bus. +//! +//! Same shape as the Redis adapter (a per-pod [`InMemoryBackplane`] for local +//! registry + delivery, plus a bus for cross-pod fan-out) but over NATS subjects. +//! NATS is attractive here beyond raw fan-out: queue groups, JetStream +//! persistence/replay, and the fact that the same broker doubles as the +//! platform's multi-channel event bus — so non-AI publishers (job status, +//! ingestion progress, notifications) and other services can share it. +//! +//! Per call: +//! - [`attach`](Backplane::attach) / [`detach`](Backplane::detach) / +//! [`associate`](Backplane::associate) are **local** (a connection lives on one +//! pod). +//! - [`publish`](Backplane::publish) delivers to local sinks immediately +//! (returning that count), then publishes a [`BackplaneEnvelope`] on the +//! subject. Every pod's subscriber re-resolves the [`Target`] against its own +//! registry; the origin pod skips its own echo. + +use std::sync::Arc; + +use anyhow::Result; +use async_trait::async_trait; +use futures_util::StreamExt; +use serde_json::Value; +use uuid::Uuid; + +use smooth_operator::backplane::{ + Backplane, BackplaneEnvelope, InMemoryBackplane, LocalSink, Target, +}; + +/// Default subject. Override via [`NatsBackplane::connect_on_subject`]. +pub const DEFAULT_SUBJECT: &str = "smooth-operator.backplane"; + +/// A [`Backplane`] that fans `publish` out across pods over NATS. +pub struct NatsBackplane { + /// Per-pod registry + local delivery, shared with the subscriber task. + local: Arc, + /// This pod's id — stamped on outgoing envelopes so we skip our own echo. + pod_id: String, + /// NATS client (cheap to clone; used for publishing). + client: async_nats::Client, + /// Subject events are published on. + subject: String, + /// The background subscriber; aborted on drop. + subscriber: tokio::task::JoinHandle<()>, +} + +impl NatsBackplane { + /// Connect to `url` (e.g. `nats://nats:4222`) and fan out on + /// [`DEFAULT_SUBJECT`]. + /// + /// # Errors + /// Returns an error if the connection or subscription can't be established. + pub async fn connect(url: &str) -> Result { + Self::connect_on_subject(url, DEFAULT_SUBJECT).await + } + + /// Connect to `url`, broadcasting + subscribing on `subject` (lets two + /// independent deployments share a broker without crosstalk). + /// + /// # Errors + /// Returns an error if the connection or subscription can't be established. + pub async fn connect_on_subject(url: &str, subject: &str) -> Result { + let client = async_nats::connect(url).await?; + + let pod_id = Uuid::new_v4().to_string(); + let local = Arc::new(InMemoryBackplane::new()); + + // Subscribe BEFORE returning so the pod receives as soon as it exists. + let mut sub = client.subscribe(subject.to_string()).await?; + + let sub_local = Arc::clone(&local); + let sub_pod = pod_id.clone(); + let subscriber = tokio::spawn(async move { + while let Some(msg) = sub.next().await { + let env: BackplaneEnvelope = match serde_json::from_slice(&msg.payload) { + Ok(e) => e, + Err(e) => { + tracing::warn!(error = %e, "backplane: undecodable envelope, dropping"); + continue; + } + }; + // Our own publish already delivered locally — don't double-deliver. + if env.origin == sub_pod { + continue; + } + sub_local.publish(env.target, env.event).await; + } + }); + + Ok(Self { + local, + pod_id, + client, + subject: subject.to_string(), + subscriber, + }) + } + + /// Number of connections attached **to this pod** (local registry only). + #[must_use] + pub fn connection_count(&self) -> usize { + self.local.connection_count() + } +} + +impl Drop for NatsBackplane { + fn drop(&mut self) { + self.subscriber.abort(); + } +} + +#[async_trait] +impl Backplane for NatsBackplane { + async fn attach(&self, conn_id: &str, sink: LocalSink) { + self.local.attach(conn_id, sink).await; + } + + async fn detach(&self, conn_id: &str) { + self.local.detach(conn_id).await; + } + + async fn associate(&self, conn_id: &str, target: Target) { + self.local.associate(conn_id, target).await; + } + + async fn publish(&self, target: Target, event: Value) -> usize { + // Deliver to this pod's sockets now (the returned count). + let n = self.local.publish(target.clone(), event.clone()).await; + + // Fan out to every other pod. + let envelope = BackplaneEnvelope { + origin: self.pod_id.clone(), + target, + event, + }; + match serde_json::to_vec(&envelope) { + Ok(payload) => { + if let Err(e) = self + .client + .publish(self.subject.clone(), payload.into()) + .await + { + tracing::warn!(error = %e, "backplane: nats publish failed; cross-pod delivery skipped"); + } + } + Err(e) => tracing::warn!(error = %e, "backplane: envelope serialize failed"), + } + n + } +} diff --git a/rust/adapters/backplane-nats/tests/cross_pod.rs b/rust/adapters/backplane-nats/tests/cross_pod.rs new file mode 100644 index 0000000..1794d90 --- /dev/null +++ b/rust/adapters/backplane-nats/tests/cross_pod.rs @@ -0,0 +1,111 @@ +//! Cross-pod fan-out (SMOODEV-1892): two `NatsBackplane` instances sharing one +//! NATS broker stand in for two pods. A socket lives on pod B; an event +//! published on pod A still reaches pod B's socket over the bus — horizontal +//! scale-out. +//! +//! Requires a Docker daemon for the throwaway NATS. Skips (returns Ok) when +//! Docker is unavailable so CI without one stays green (mirrors the PG adapter). +//! +//! Both behaviours live in one test sharing a single container — one +//! container-per-binary keeps the suite robust under parallel test execution. + +use std::sync::mpsc::{channel, Receiver}; +use std::sync::Arc; +use std::time::Duration; + +use serde_json::{json, Value}; + +use testcontainers::core::{IntoContainerPort, WaitFor}; +use testcontainers::runners::AsyncRunner; +use testcontainers::{ContainerAsync, GenericImage}; + +use smooth_operator::backplane::{Backplane, LocalSink, Target}; +use smooth_operator_adapter_backplane_nats::NatsBackplane; + +/// Start a throwaway `nats:2.10-alpine`. Returns `Ok(None)` if Docker is +/// unavailable so the caller skips rather than fails. +async fn start_nats() -> anyhow::Result, String)>> { + let image = GenericImage::new("nats", "2.10-alpine") + .with_exposed_port(4222.tcp()) + .with_wait_for(WaitFor::message_on_stderr("Server is ready")); + + match image.start().await { + Ok(node) => { + let host = node.get_host().await?; + let port = node.get_host_port_ipv4(4222).await?; + Ok(Some((node, format!("nats://{host}:{port}")))) + } + Err(e) => { + eprintln!("SKIP: could not start nats container (Docker unavailable?): {e}"); + Ok(None) + } + } +} + +fn sink() -> (LocalSink, Receiver) { + let (tx, rx) = channel::(); + ( + Arc::new(move |v| { + let _ = tx.send(v); + }), + rx, + ) +} + +fn recv_within(rx: &Receiver) -> Option { + for _ in 0..200 { + if let Ok(v) = rx.try_recv() { + return Some(v); + } + std::thread::sleep(Duration::from_millis(10)); + } + None +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn nats_backplane_fans_out_across_pods() -> anyhow::Result<()> { + let Some((_node, url)) = start_nats().await? else { + return Ok(()); // Docker unavailable — skip. + }; + + let pod_a = NatsBackplane::connect(&url).await?; + let pod_b = NatsBackplane::connect(&url).await?; + + let (s, rx) = sink(); + pod_b.attach("conn-b", s).await; + pod_b + .associate("conn-b", Target::Agent("agent-9".into())) + .await; + assert_eq!(pod_a.connection_count(), 0); + assert_eq!(pod_b.connection_count(), 1); + + // Let pod B's subscriber task start polling before we publish. In prod the + // per-pod subscriber is live at boot, long before any socket attaches, so + // this only aligns the test with that ordering — it masks no real loss. + tokio::time::sleep(Duration::from_millis(300)).await; + + // (1) Cross-pod delivery. + let local = pod_a + .publish( + Target::Agent("agent-9".into()), + json!({"kind": "notify", "n": 1}), + ) + .await; + assert_eq!(local, 0, "pod A delivers to 0 local sockets"); + let got = recv_within(&rx).expect("pod B's socket should receive the cross-pod event"); + assert_eq!(got, json!({"kind": "notify", "n": 1})); + + // (2) No double-deliver on the owning pod. + let n = pod_b + .publish(Target::Agent("agent-9".into()), json!("once")) + .await; + assert_eq!(n, 1, "one local delivery on the owning pod"); + assert_eq!(recv_within(&rx), Some(json!("once"))); + std::thread::sleep(Duration::from_millis(250)); + assert!( + rx.try_recv().is_err(), + "no double delivery from the bus echo" + ); + + Ok(()) +} diff --git a/rust/adapters/backplane-redis/Cargo.toml b/rust/adapters/backplane-redis/Cargo.toml new file mode 100644 index 0000000..c6f3253 --- /dev/null +++ b/rust/adapters/backplane-redis/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "smooai-smooth-operator-adapter-backplane-redis" +version = "0.1.0" +edition.workspace = true +license.workspace = true +repository.workspace = true +description = "Redis/Valkey-backed distributed Backplane for smooth-operator — cross-pod WebSocket event fan-out over Redis pub/sub (the horizontal scale-out seam)." + +[lib] +name = "smooth_operator_adapter_backplane_redis" + +[dependencies] +smooth-operator = { workspace = true } +async-trait = { workspace = true } +anyhow = { workspace = true } +serde_json = { workspace = true } +uuid = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +futures-util = "0.3" +# Async Redis/Valkey client + pub/sub (tokio-comp pulls the `aio` async API, +# incl. `get_async_pubsub`). No TLS feature → no OpenSSL; in-cluster Valkey is +# plaintext. A host needing TLS enables a redis TLS feature in its own build. +redis = { version = "0.27", features = ["tokio-comp"] } + +[dev-dependencies] +serde_json = { workspace = true } +tokio = { workspace = true } +# Throwaway Redis for the cross-pod fan-out integration test; skips when Docker +# is unavailable so CI without a daemon stays green (mirrors the PG adapter). +testcontainers = "0.24" diff --git a/rust/adapters/backplane-redis/src/lib.rs b/rust/adapters/backplane-redis/src/lib.rs new file mode 100644 index 0000000..b6318f3 --- /dev/null +++ b/rust/adapters/backplane-redis/src/lib.rs @@ -0,0 +1,164 @@ +//! Redis / Valkey [`Backplane`] — the horizontal scale-out seam. +//! +//! The default [`InMemoryBackplane`] only reaches sockets held by the current +//! process, so with more than one replica an event produced on pod A can't reach +//! a socket on pod B. [`RedisBackplane`] closes that gap **without changing the +//! trait or any call site**: it keeps a per-pod [`InMemoryBackplane`] for the +//! local registry + delivery, and adds a Redis pub/sub bus for cross-pod fan-out. +//! +//! Per call: +//! - [`attach`](Backplane::attach) / [`detach`](Backplane::detach) / +//! [`associate`](Backplane::associate) are **local** — a connection lives on +//! exactly one pod, so only that pod registers it. +//! - [`publish`](Backplane::publish) delivers to local sinks immediately +//! (returning that count), then broadcasts a [`BackplaneEnvelope`] on the bus. +//! Every pod's background subscriber re-resolves the envelope's [`Target`] +//! against *its own* registry and delivers to its sockets — so the same +//! `publish` call fans out across the whole fleet. The origin pod skips its own +//! echo (it already delivered). +//! +//! This is the classic pub/sub fan-out (the shape Socket.IO's Redis adapter +//! uses): no shared connection registry, each pod authoritative for its own +//! sockets. + +use std::sync::Arc; + +use anyhow::Result; +use async_trait::async_trait; +use futures_util::StreamExt; +use redis::AsyncCommands; +use serde_json::Value; +use uuid::Uuid; + +use smooth_operator::backplane::{ + Backplane, BackplaneEnvelope, InMemoryBackplane, LocalSink, Target, +}; + +/// Default pub/sub channel. Override via [`RedisBackplane::connect_on_channel`]. +pub const DEFAULT_CHANNEL: &str = "smooth-operator:backplane"; + +/// A [`Backplane`] that fans `publish` out across pods over Redis/Valkey pub/sub. +pub struct RedisBackplane { + /// Per-pod registry + local delivery. Shared with the subscriber task so + /// remote envelopes deliver to the same sinks. + local: Arc, + /// This pod's id — stamped on outgoing envelopes so we skip our own echo. + pod_id: String, + /// Cloneable multiplexed connection used for publishing. + publisher: redis::aio::MultiplexedConnection, + /// Channel events are broadcast on. + channel: String, + /// The background subscriber; aborted on drop. + subscriber: tokio::task::JoinHandle<()>, +} + +impl RedisBackplane { + /// Connect to `url` (e.g. `redis://valkey:6379`) and fan out on + /// [`DEFAULT_CHANNEL`]. + /// + /// # Errors + /// Returns an error if the URL is invalid or the connection / subscription + /// can't be established. + pub async fn connect(url: &str) -> Result { + Self::connect_on_channel(url, DEFAULT_CHANNEL).await + } + + /// Connect to `url`, broadcasting + subscribing on `channel` (lets two + /// independent deployments share a Redis without crosstalk). + /// + /// # Errors + /// Returns an error if the URL is invalid or the connection / subscription + /// can't be established. + pub async fn connect_on_channel(url: &str, channel: &str) -> Result { + let client = redis::Client::open(url)?; + let publisher = client.get_multiplexed_async_connection().await?; + + let pod_id = Uuid::new_v4().to_string(); + let local = Arc::new(InMemoryBackplane::new()); + + // Subscribe BEFORE returning so the pod is receiving as soon as it exists. + let mut pubsub = client.get_async_pubsub().await?; + pubsub.subscribe(channel).await?; + + let sub_local = Arc::clone(&local); + let sub_pod = pod_id.clone(); + let subscriber = tokio::spawn(async move { + let mut stream = pubsub.on_message(); + while let Some(msg) = stream.next().await { + let payload = msg.get_payload_bytes(); + let env: BackplaneEnvelope = match serde_json::from_slice(payload) { + Ok(e) => e, + Err(e) => { + tracing::warn!(error = %e, "backplane: undecodable envelope, dropping"); + continue; + } + }; + // Our own publish already delivered locally — don't double-deliver. + if env.origin == sub_pod { + continue; + } + sub_local.publish(env.target, env.event).await; + } + }); + + Ok(Self { + local, + pod_id, + publisher, + channel: channel.to_string(), + subscriber, + }) + } + + /// Number of connections attached **to this pod** (local registry only). + #[must_use] + pub fn connection_count(&self) -> usize { + self.local.connection_count() + } +} + +impl Drop for RedisBackplane { + fn drop(&mut self) { + self.subscriber.abort(); + } +} + +#[async_trait] +impl Backplane for RedisBackplane { + async fn attach(&self, conn_id: &str, sink: LocalSink) { + self.local.attach(conn_id, sink).await; + } + + async fn detach(&self, conn_id: &str) { + self.local.detach(conn_id).await; + } + + async fn associate(&self, conn_id: &str, target: Target) { + self.local.associate(conn_id, target).await; + } + + async fn publish(&self, target: Target, event: Value) -> usize { + // Deliver to this pod's sockets now (the returned count). + let n = self.local.publish(target.clone(), event.clone()).await; + + // Fan out to every other pod. + let envelope = BackplaneEnvelope { + origin: self.pod_id.clone(), + target, + event, + }; + match serde_json::to_vec(&envelope) { + Ok(payload) => { + let mut conn = self.publisher.clone(); + if let Err(e) = conn + .publish::<_, _, ()>(self.channel.as_str(), payload) + .await + { + tracing::warn!(error = %e, "backplane: redis publish failed; cross-pod delivery skipped"); + } + } + Err(e) => tracing::warn!(error = %e, "backplane: envelope serialize failed"), + } + n + } +} diff --git a/rust/adapters/backplane-redis/tests/cross_pod.rs b/rust/adapters/backplane-redis/tests/cross_pod.rs new file mode 100644 index 0000000..6ab4af3 --- /dev/null +++ b/rust/adapters/backplane-redis/tests/cross_pod.rs @@ -0,0 +1,119 @@ +//! Cross-pod fan-out (SMOODEV-1892): two `RedisBackplane` instances sharing one +//! Redis stand in for two pods. A connection (sink) lives on pod B; an event +//! published on pod A — where that connection does NOT exist locally — still +//! reaches pod B's sink over the bus. This is horizontal scale-out: the same +//! `publish` call delivers to a socket on another replica. +//! +//! Requires a Docker daemon for the throwaway Redis. Skips (returns Ok) when +//! Docker is unavailable so CI without one stays green (mirrors the PG adapter). +//! +//! Both behaviours live in one test sharing a single container — the two +//! scenarios don't need separate containers, and one-container-per-binary keeps +//! the suite robust under parallel test execution. + +use std::sync::mpsc::{channel, Receiver}; +use std::sync::Arc; +use std::time::Duration; + +use serde_json::{json, Value}; + +use testcontainers::core::{IntoContainerPort, WaitFor}; +use testcontainers::runners::AsyncRunner; +use testcontainers::{ContainerAsync, GenericImage}; + +use smooth_operator::backplane::{Backplane, LocalSink, Target}; +use smooth_operator_adapter_backplane_redis::RedisBackplane; + +/// Start a throwaway `redis:7-alpine`. Returns `Ok(None)` if Docker is +/// unavailable so the caller skips rather than fails. +async fn start_redis() -> anyhow::Result, String)>> { + let image = GenericImage::new("redis", "7-alpine") + .with_exposed_port(6379.tcp()) + .with_wait_for(WaitFor::message_on_stdout("Ready to accept connections")); + + match image.start().await { + Ok(node) => { + let host = node.get_host().await?; + let port = node.get_host_port_ipv4(6379).await?; + Ok(Some((node, format!("redis://{host}:{port}")))) + } + Err(e) => { + eprintln!("SKIP: could not start redis container (Docker unavailable?): {e}"); + Ok(None) + } + } +} + +/// A test sink feeding a std channel (runtime-agnostic, like the lib's own). +fn sink() -> (LocalSink, Receiver) { + let (tx, rx) = channel::(); + ( + Arc::new(move |v| { + let _ = tx.send(v); + }), + rx, + ) +} + +/// Poll a std channel for up to ~2s (the bus round-trip is async). +fn recv_within(rx: &Receiver) -> Option { + for _ in 0..200 { + if let Ok(v) = rx.try_recv() { + return Some(v); + } + std::thread::sleep(Duration::from_millis(10)); + } + None +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn redis_backplane_fans_out_across_pods() -> anyhow::Result<()> { + let Some((_node, url)) = start_redis().await? else { + return Ok(()); // Docker unavailable — skip. + }; + + // Two pods on the same Redis. + let pod_a = RedisBackplane::connect(&url).await?; + let pod_b = RedisBackplane::connect(&url).await?; + + // A connection for session "s1" lives on pod B only. + let (s, rx) = sink(); + pod_b.attach("conn-b", s).await; + pod_b + .associate("conn-b", Target::Session("s1".into())) + .await; + assert_eq!(pod_a.connection_count(), 0, "pod A holds no sockets"); + assert_eq!(pod_b.connection_count(), 1); + + // Let pod B's subscriber task start polling before we publish. In prod the + // per-pod subscriber is live at boot, long before any socket attaches, so + // this only aligns the test with that ordering — it masks no real loss. + tokio::time::sleep(Duration::from_millis(300)).await; + + // (1) Cross-pod: publish on pod A — it has no local socket for s1 (0 local + // deliveries) but the event must still cross the bus to pod B's socket. + let local = pod_a + .publish( + Target::Session("s1".into()), + json!({"kind": "job_status", "state": "done"}), + ) + .await; + assert_eq!(local, 0, "pod A delivers to 0 local sockets"); + let got = recv_within(&rx).expect("pod B's socket should receive the cross-pod event"); + assert_eq!(got, json!({"kind": "job_status", "state": "done"})); + + // (2) No double-deliver: publishing on the pod that HOLDS the socket delivers + // exactly once (locally); the echoed bus message must be skipped. + let n = pod_b + .publish(Target::Session("s1".into()), json!("once")) + .await; + assert_eq!(n, 1, "one local delivery on the owning pod"); + assert_eq!(recv_within(&rx), Some(json!("once"))); + std::thread::sleep(Duration::from_millis(250)); + assert!( + rx.try_recv().is_err(), + "no double delivery from the bus echo" + ); + + Ok(()) +} diff --git a/rust/smooth-operator-server/Cargo.toml b/rust/smooth-operator-server/Cargo.toml index 6ff180c..52f1aac 100644 --- a/rust/smooth-operator-server/Cargo.toml +++ b/rust/smooth-operator-server/Cargo.toml @@ -21,6 +21,10 @@ smooai-smooth-operator-adapter-memory = { path = "../adapters/in-memory" } # configured storage backend (Postgres / DynamoDB; default in-memory). smooai-smooth-operator-adapter-postgres = { path = "../adapters/postgres" } smooai-smooth-operator-adapter-dynamodb = { path = "../adapters/dynamodb" } +# Distributed Backplane backends for horizontal scale-out, selected at runtime +# via SMOOTH_AGENT_BACKPLANE (default in-memory / single-process). +smooai-smooth-operator-adapter-backplane-redis = { path = "../adapters/backplane-redis" } +smooai-smooth-operator-adapter-backplane-nats = { path = "../adapters/backplane-nats" } # Admin API surfaces indexing-run status via the ingestion crate's IndexingStore. smooai-smooth-operator-ingestion = { path = "../ingestion" } diff --git a/rust/smooth-operator-server/src/config.rs b/rust/smooth-operator-server/src/config.rs index a5be134..0ecabf9 100644 --- a/rust/smooth-operator-server/src/config.rs +++ b/rust/smooth-operator-server/src/config.rs @@ -19,6 +19,8 @@ //! | `SMOOTH_AGENT_MAX_ITERATIONS` | `6` | Agent-loop iteration cap per turn. | //! | `SMOOTH_AGENT_MAX_TOKENS` | `512` | `max_tokens` sent to the gateway (kept low — paid endpoint). | //! | `SMOOTH_AGENT_STORAGE` | `memory` | Storage backend: `memory` \| `postgres` \| `dynamodb`. | +//! | `SMOOTH_AGENT_BACKPLANE` | `memory` | Connection backplane: `memory` (single-process) \| `redis`/`valkey` \| `nats`. A distributed backend is required for >1 replica and to let non-AI publishers push events via `Backplane::publish`. | +//! | `SMOOTH_AGENT_BACKPLANE_URL` | *(unset)* | Bus URL for `redis`/`nats` (e.g. `redis://valkey:6379`, `nats://nats:4222`); falls back to `SMOOTH_AGENT_REDIS_URL` / `SMOOTH_AGENT_NATS_URL`. | //! | `WIDGET_AUTH_STRICT` | *(unset → `false`)* | Fail-closed embeddable-widget auth: when `1`/`true`, a session for an agent the [`WidgetAuthProvider`](smooth_operator::widget_auth::WidgetAuthProvider) has no policy for is rejected. Origin + `authContext` are always enforced for policied agents. | //! //! ### Auth (load-bearing — the admin API's `require_role` reads these) diff --git a/rust/smooth-operator-server/src/server.rs b/rust/smooth-operator-server/src/server.rs index ce73946..1525e5a 100644 --- a/rust/smooth-operator-server/src/server.rs +++ b/rust/smooth-operator-server/src/server.rs @@ -179,9 +179,66 @@ pub async fn build_state_from_env_async(config: ServerConfig) -> Result1 replica (otherwise an event produced +/// on one pod can't reach a socket on another) and to let non-AI publishers push +/// realtime events via `Backplane::publish`. +/// +/// # Errors +/// Returns an error for an unknown backend value, a missing url, or a failed +/// connection — fail loud at boot rather than silently run single-process. +async fn install_backplane_from_env(state: AppState) -> Result { + let kind = std::env::var("SMOOTH_AGENT_BACKPLANE") + .unwrap_or_default() + .trim() + .to_lowercase(); + + let url = |specific: &str| -> Result { + std::env::var("SMOOTH_AGENT_BACKPLANE_URL") + .or_else(|_| std::env::var(specific)) + .map_err(|_| { + anyhow::anyhow!( + "{kind} backplane selected but neither SMOOTH_AGENT_BACKPLANE_URL nor {specific} is set" + ) + }) + }; + + match kind.as_str() { + "" | "memory" | "inmemory" => Ok(state), // default InMemoryBackplane already installed + "redis" | "valkey" => { + use smooth_operator_adapter_backplane_redis::RedisBackplane; + let backplane = RedisBackplane::connect(&url("SMOOTH_AGENT_REDIS_URL")?) + .await + .map_err(|e| anyhow::anyhow!("connecting Redis backplane: {e}"))?; + Ok(state.with_backplane(Arc::new(backplane))) + } + "nats" => { + use smooth_operator_adapter_backplane_nats::NatsBackplane; + let backplane = NatsBackplane::connect(&url("SMOOTH_AGENT_NATS_URL")?) + .await + .map_err(|e| anyhow::anyhow!("connecting NATS backplane: {e}"))?; + Ok(state.with_backplane(Arc::new(backplane))) + } + other => Err(anyhow::anyhow!( + "unknown SMOOTH_AGENT_BACKPLANE '{other}' (expected: memory | redis | valkey | nats)" + )), + } +} + /// Seed a couple of distinctive demo docs so knowledge-grounded E2E is /// deterministic. The 17-day return window is deliberately unusual so an /// ungrounded answer can't accidentally match it. Both docs are tagged into the diff --git a/rust/smooth-operator/src/backplane.rs b/rust/smooth-operator/src/backplane.rs index 2bd86df..88256a3 100644 --- a/rust/smooth-operator/src/backplane.rs +++ b/rust/smooth-operator/src/backplane.rs @@ -30,6 +30,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::{Arc, RwLock}; use async_trait::async_trait; +use serde::{Deserialize, Serialize}; use serde_json::Value; /// A connection's local delivery sink: given an event, write it to that @@ -40,7 +41,11 @@ pub type LocalSink = Arc; /// A delivery target: a single connection, or every connection associated with a /// session / user / org / agent. -#[derive(Clone, Debug, PartialEq, Eq, Hash)] +/// +/// `Serialize`/`Deserialize` so a distributed [`Backplane`] (Redis/NATS) can put +/// the target on the wire alongside the event, and each pod re-resolve it against +/// *its* local registry. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum Target { /// One specific connection. Connection(String), @@ -54,6 +59,24 @@ pub enum Target { Agent(String), } +/// The wire format a distributed [`Backplane`] broadcasts on its bus: who +/// published it (so a pod can skip its own echo), the [`Target`] to re-resolve +/// locally, and the event payload. +/// +/// Shared so the Redis and NATS adapters — and any host's own transport adapter — +/// speak the same envelope; a pod on the other end deserializes this and calls +/// `publish` on its **local** [`InMemoryBackplane`] to fan out to its sockets. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct BackplaneEnvelope { + /// Opaque id of the publishing node. A receiver compares it to its own id and + /// skips the message if equal — the origin already delivered locally. + pub origin: String, + /// The delivery target, re-resolved against the receiving pod's registry. + pub target: Target, + /// The event payload, delivered verbatim to matching local sinks. + pub event: Value, +} + /// The connection backplane: a per-pod sink registry + cross-pod event delivery. /// /// Implementations must be cheap to clone behind an `Arc` and safe to share