From f02918302748a9280bc1d3eecc4bb9cb62cfa021 Mon Sep 17 00:00:00 2001 From: Borislav Borisov Date: Tue, 7 Apr 2026 11:51:47 +0100 Subject: [PATCH 1/2] Add unreliable_api example demonstrating layer composition Shows Batch composed with Tower's Retry and Timeout layers via ServiceBuilder to batch-write events to a simulated remote API that fails ~30% of the time. Retry with exponential backoff handles transient flush failures transparently. --- README.md | 2 + examples/unreliable_api.rs | 232 +++++++++++++++++++++++++++++++++++++ 2 files changed, 234 insertions(+) create mode 100644 examples/unreliable_api.rs diff --git a/README.md b/README.md index cdf27d5..0799785 100644 --- a/README.md +++ b/README.md @@ -46,11 +46,13 @@ Your inner service must implement `Service<BatchControl<R>>` where `R` is the re See the [`examples/`](examples/) directory: - **[`sqlite_batch`](examples/sqlite_batch.rs)** – batch-insert rows into an in-memory SQLite database using the rarray virtual table. +- **[`unreliable_api`](examples/unreliable_api.rs)** – batch-write events to a simulated unreliable remote API, composing `Batch` with Tower's `Retry` and `Timeout` layers. Run an example with: ```sh cargo run --example sqlite_batch +cargo run --example unreliable_api ``` ## License diff --git a/examples/unreliable_api.rs b/examples/unreliable_api.rs new file mode 100644 index 0000000..442196c --- /dev/null +++ b/examples/unreliable_api.rs @@ -0,0 +1,232 @@ +//! Batch-write events to a simulated unreliable remote API with retry and timeout. +//! +//! Demonstrates composing `tower-batch` with other Tower layers: +//! +//! ```text +//! Client → Batch → Retry → Timeout → UnreliableApi +//! ``` +//! +//! - **Batch** collects individual events and flushes them in groups. +//! - **Retry** retries transient flush failures with exponential backoff. +//! - **Timeout** caps how long a single flush attempt can take. +//! - **`UnreliableApi`** simulates a remote endpoint that fails ~30% of the time. +//! +//! 
The retry policy only retries `Flush` errors – item buffering is an in-memory +//! operation that never fails. Because the Batch worker drives the inner service +//! sequentially (Item, Item, …, Flush), no new items arrive between a failed +//! Flush and its retry, so the shared pending buffer stays consistent. +//! +//! Run with: `cargo run --example unreliable_api` + +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; +use std::time::Duration; + +use tower::retry::Policy; +use tower::{Service, ServiceBuilder, ServiceExt}; +use tower_batch::{BatchControl, BatchLayer, BoxError}; + +// ── Domain types ──────────────────────────────────────────────────────── + +/// A single event to be batch-written to the remote API. +#[allow(dead_code)] +#[derive(Clone)] +struct Event { + payload: String, +} + +// ── Retry policy ──────────────────────────────────────────────────────── + +/// Retry policy that retries flush failures with exponential backoff. +/// +/// Only `Flush` calls are retried – `Item` calls are in-memory and infallible. +#[derive(Clone, Debug)] +struct FlushRetryPolicy { + max_retries: usize, + attempt: usize, +} + +impl FlushRetryPolicy { + fn new(max_retries: usize) -> Self { + Self { + max_retries, + attempt: 0, + } + } +} + +impl Policy<BatchControl<Event>, (), BoxError> for FlushRetryPolicy { + type Future = Pin<Box<dyn Future<Output = ()> + Send>>; + + fn retry( + &mut self, + req: &mut BatchControl<Event>, + result: &mut Result<(), BoxError>, + ) -> Option<Self::Future> { + // Only retry Flush failures. 
+ if !matches!(req, BatchControl::Flush) { + return None; + } + + if let Err(e) = result { + if self.attempt < self.max_retries { + self.attempt += 1; + let backoff = Duration::from_millis(10 * (1 << self.attempt)); + tracing::warn!( + attempt = self.attempt, + max = self.max_retries, + backoff_ms = backoff.as_millis(), + error = %e, + "flush failed, retrying" + ); + return Some(Box::pin(tokio::time::sleep(backoff))); + } + tracing::error!( + attempts = self.attempt, + error = %e, + "flush failed, retries exhausted" + ); + } + + // Reset attempt counter on success or when giving up, so the next + // flush starts fresh. + self.attempt = 0; + None + } + + fn clone_request(&mut self, req: &BatchControl<Event>) -> Option<BatchControl<Event>> { + match req { + BatchControl::Item(event) => Some(BatchControl::Item(event.clone())), + BatchControl::Flush => Some(BatchControl::Flush), + } + } +} + +// ── Simulated unreliable API service ──────────────────────────────────── + +/// Simulates a remote bulk-write endpoint that occasionally fails. +/// +/// Pending items are stored behind an `Arc<Mutex<_>>` so that they survive +/// a failed flush – the Retry layer replays the Flush call, and the items +/// are still there. 
+#[derive(Clone)] +struct UnreliableApi { + pending: Arc<Mutex<Vec<Event>>>, + flush_count: Arc<AtomicUsize>, + written: Arc<AtomicUsize>, +} + +impl UnreliableApi { + fn new() -> Self { + Self { + pending: Arc::new(Mutex::new(Vec::new())), + flush_count: Arc::new(AtomicUsize::new(0)), + written: Arc::new(AtomicUsize::new(0)), + } + } +} + +impl Service<BatchControl<Event>> for UnreliableApi { + type Response = (); + type Error = BoxError; + type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: BatchControl<Event>) -> Self::Future { + match req { + BatchControl::Item(event) => { + self.pending.lock().unwrap().push(event); + Box::pin(std::future::ready(Ok(()))) + } + BatchControl::Flush => { + let pending = self.pending.clone(); + let flush_count = self.flush_count.clone(); + let written = self.written.clone(); + + Box::pin(async move { + // Simulate network latency. + tokio::time::sleep(Duration::from_millis(1)).await; + + let n = flush_count.fetch_add(1, Ordering::SeqCst); + + // Fail every 3rd flush to simulate transient errors. + if n % 3 == 1 { + let count = pending.lock().unwrap().len(); + return Err(format!( + "simulated transient error on flush attempt #{n} ({count} events)" + ) + .into()); + } + + // Success – drain the buffer. 
+ let events = std::mem::take(&mut *pending.lock().unwrap()); + let count = events.len(); + written.fetch_add(count, Ordering::SeqCst); + tracing::info!(flush = n, events = count, "bulk write succeeded"); + Ok(()) + }) + } + } + } +} + +// ── Main ──────────────────────────────────────────────────────────────── + +#[tokio::main] +async fn main() -> Result<(), BoxError> { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + .init(); + + let api = UnreliableApi::new(); + let written = api.written.clone(); + + // Build the full stack using ServiceBuilder: + // Client → Batch → Retry → Timeout → UnreliableApi + // + // ServiceBuilder applies layers outside-in, so BatchLayer is listed + // first (outermost, what the client sees) and .service(api) is last. + let batch = ServiceBuilder::new() + .layer(BatchLayer::new(10, Duration::from_millis(50))) + .retry(FlushRetryPolicy::new(3)) + .timeout(Duration::from_secs(5)) + .service(api); + + // Spawn 4 tasks, each sending 50 events. + let mut handles = Vec::new(); + for task_id in 0u64..4 { + let mut svc = batch.clone(); + handles.push(tokio::spawn(async move { + for i in 0u64..50 { + svc.ready().await.unwrap(); + svc.call(Event { + payload: format!("task={task_id} seq={i}"), + }) + .await + .unwrap(); + } + tracing::info!(task_id, "task finished sending"); + })); + } + + for handle in handles { + handle.await?; + } + + // Drop the last Batch handle so the worker knows no more requests are + // coming, then give it time to flush. 
+ drop(batch); + tokio::time::sleep(Duration::from_millis(100)).await; + + let total = written.load(Ordering::SeqCst); + tracing::info!(total, "all events written"); + assert_eq!(total, 200, "expected 200 events written"); + + Ok(()) +} From cc38f6cad1447bf10b9c975d41fc2c93e164f58f Mon Sep 17 00:00:00 2001 From: Borislav Borisov Date: Tue, 7 Apr 2026 11:55:10 +0100 Subject: [PATCH 2/2] Run examples in CI --- .github/workflows/ci.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index efa982c..68894b6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -45,6 +45,11 @@ jobs: with: token: ${{ secrets.CODECOV_TOKEN }} + - name: Run examples + run: | + cargo run --example sqlite_batch + cargo run --example unreliable_api + release: name: Release needs: build