fix(hermes): bound WS/SSE connection memory via slow-consumer disconnect + 24h deadline #3782
keyvankhademi wants to merge 3 commits into
…ect + 24h deadline

Hermes WS and SSE connections that stalled or stopped reading were never reclaimed: their task, broadcast receiver, and buffers stayed pinned until the OS TCP timeout (hours), growing server memory without bound.

The root cause was timeout/deadline logic living inside a poll path that a stalled client freezes:
- WS: when a price update is selected, the loop awaits the handler to completion; a slow consumer parks it in `flush().await`, so the sibling `select!` branches (30s ping, 24h deadline) can never fire.
- SSE: the deadline and slow-consumer (Lagged) logic lived in the response body stream, which axum/hyper only polls while the socket can accept bytes, so a non-reading client froze it entirely.

Fix:
- WS: route every socket write through helpers wrapped in a 10s WS_SEND_TIMEOUT; on timeout, drop the connection (SlowConsumer metric). This bounds parked time so the ping and 24h-deadline branches stay reachable. Meter deadline closes (ConnectionTimeout) and keep best-effort teardown closes from being mislabeled as slow consumers.
- SSE: drive the broadcast drain + 24h deadline from an independent spawned task that forwards into a bounded 16-slot channel with a 10s send timeout; the response body is just a ReceiverStream. A client that stops reading fills the channel, the send times out, and the broadcast receiver is freed within ~10s regardless of whether the client reads. Slow consumers (lag or send timeout) are now disconnected instead of lingering.
- Add sse_slow_consumer_disconnects and sse_connection_timeouts metrics.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
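The SSE mechanism described in this commit message (an independent producer forwarding into a small bounded channel, with a timeout on each send) can be illustrated with a standard-library-only model. This is a hedged sketch, not the PR's code: the PR uses an async task and async channel, whereas this version uses a thread, `std::sync::mpsc::sync_channel`, and a polling `try_send` loop as a stand-in for a timed send. The names `Exit`, `send_with_timeout`, `run_producer`, and `spawn_producer` are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::thread;
use std::time::{Duration, Instant};

/// Why the producer stopped forwarding (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
pub enum Exit {
    /// Every update was forwarded.
    Drained,
    /// The channel stayed full for the whole send timeout.
    SlowConsumer,
    /// The receiving side was dropped (the client went away).
    ClientGone,
}

/// Try to enqueue `item`, giving up once `timeout` has elapsed.
/// Polling with `try_send` stands in for an async send wrapped in a timeout.
fn send_with_timeout<T>(tx: &SyncSender<T>, mut item: T, timeout: Duration) -> Result<(), Exit> {
    let deadline = Instant::now() + timeout;
    loop {
        match tx.try_send(item) {
            Ok(()) => return Ok(()),
            Err(TrySendError::Disconnected(_)) => return Err(Exit::ClientGone),
            Err(TrySendError::Full(back)) => {
                if Instant::now() >= deadline {
                    return Err(Exit::SlowConsumer);
                }
                // Take the item back and retry shortly.
                item = back;
                thread::sleep(Duration::from_millis(1));
            }
        }
    }
}

/// Forward `updates` values into the bounded channel; stop at the first failed send.
/// Returning drops `tx`, which is what releases the producer-side resources.
pub fn run_producer(tx: SyncSender<u64>, updates: u64, timeout: Duration) -> Exit {
    for n in 0..updates {
        if let Err(exit) = send_with_timeout(&tx, n, timeout) {
            return exit;
        }
    }
    Exit::Drained
}

/// Run the producer on its own thread so its progress never depends on the reader.
pub fn spawn_producer(
    capacity: usize,
    updates: u64,
    timeout: Duration,
) -> (Receiver<u64>, thread::JoinHandle<Exit>) {
    let (tx, rx) = sync_channel(capacity);
    let handle = thread::spawn(move || run_producer(tx, updates, timeout));
    (rx, handle)
}
```

With a capacity of 16 and a reader that never reads, the producer fills the 16 slots, waits out one send timeout, and exits with `Exit::SlowConsumer`. The point of the design is that the time a stalled client can pin the producer is bounded by the send timeout rather than by the client's socket.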
🚩 No tests for new slow-consumer and timeout logic
Neither the SSE nor the WebSocket modules have test modules (#[cfg(test)]). This PR introduces significant new behavior — spawned producer task, bounded mpsc channel, send timeouts, slow-consumer detection, broadcast lag handling, new metrics — none of which have corresponding tests. Per REVIEW.md: "New functionality should have tests." This is worth raising as a follow-up: integration tests verifying that a slow consumer is disconnected after the timeout, and that metrics are correctly incremented, would substantially increase confidence in these changes.
    Some(Err(BroadcastStreamRecvError::Lagged(skipped))) => {
        tracing::info!(
            skipped,
            "SSE client is a slow consumer (broadcast lag); closing connection.",
        );
        state.metrics.sse_slow_consumer_disconnects.inc();
        let _ = tx.try_send(Ok(Event::default().event("error").data(format!(
            "Slow consumer: dropped {skipped} updates. Closing connection.",
        ))));
        break;
    }
🚩 Behavioral change: broadcast lag now terminates SSE connections instead of continuing
In the old code, when the broadcast channel lagged (BroadcastStreamRecvError::Lagged), an error event was sent to the client and the stream continued (with gaps in price data). In the new code (sse.rs:166-175), broadcast lag is treated as a slow consumer — the connection is closed after a best-effort error event. This is an intentional behavioral change that forces clients to reconnect for fresh data, but it could be surprising for existing clients that previously handled lag gracefully by just continuing to consume the stream.
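To make the lag condition concrete, here is a toy standard-library model of a fixed-capacity broadcast ring and the "first lag closes the connection" policy. It is a sketch under stated assumptions, not the channel Hermes uses: `Ring`, `Recv`, and `drain` are hypothetical names, and a real broadcast channel's exact lag threshold may differ from this simplified model.

```rust
use std::collections::VecDeque;

/// Result of asking the ring for the next update (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum Recv {
    Update(u64),
    /// The reader fell behind and this many updates were overwritten.
    Lagged(u64),
    /// Nothing new has been published yet.
    Empty,
}

/// Fixed-capacity ring holding only the most recent updates.
pub struct Ring {
    capacity: usize,
    buf: VecDeque<u64>,
    /// Sequence number the next published update will receive.
    next_seq: u64,
}

impl Ring {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Ring { capacity, buf: VecDeque::with_capacity(capacity), next_seq: 0 }
    }

    /// Publish one update, evicting the oldest one when the ring is full.
    pub fn publish(&mut self, value: u64) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
        }
        self.buf.push_back(value);
        self.next_seq += 1;
    }

    /// Read the update at `cursor` and advance it. If the update at `cursor`
    /// was already evicted, report how many were skipped and jump the cursor
    /// to the oldest retained update.
    pub fn recv(&self, cursor: &mut u64) -> Recv {
        let oldest = self.next_seq - self.buf.len() as u64;
        if *cursor < oldest {
            let skipped = oldest - *cursor;
            *cursor = oldest;
            return Recv::Lagged(skipped);
        }
        if *cursor == self.next_seq {
            return Recv::Empty;
        }
        let value = self.buf[(*cursor - oldest) as usize];
        *cursor += 1;
        Recv::Update(value)
    }
}

/// New policy in this sketch: the first lag ends the connection with an error
/// message, instead of reporting the gap and continuing to stream.
pub fn drain(ring: &Ring, cursor: &mut u64) -> Result<Vec<u64>, String> {
    let mut out = Vec::new();
    loop {
        match ring.recv(cursor) {
            Recv::Update(v) => out.push(v),
            Recv::Empty => return Ok(out),
            Recv::Lagged(skipped) => {
                return Err(format!(
                    "Slow consumer: dropped {skipped} updates. Closing connection."
                ));
            }
        }
    }
}
```

In this model, a reader that is exactly 1000 updates behind a capacity-1000 ring still receives everything; one more publish before it reads produces `Lagged(1)`, and `drain` returns the error that would accompany the disconnect.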
main already had #3769 ("disconnect slow WS/SSE consumers to prevent OOM"), which solves the same problem this branch does but with a different mechanism (tungstenite write-buffer cap + an RPC_DISCONNECT_SLOW_CONSUMERS config flag + protocol-labelled metrics). Per request, this branch's solution is kept for the overlap:
- ws.rs, sse.rs, metrics_middleware.rs: kept this branch's versions (per-write WS_SEND_TIMEOUT; SSE producer task + bounded channel; the sse_slow_consumer_disconnects / sse_connection_timeouts counters).
- api.rs, config/rpc.rs, rest.rs: reverted #3769's StreamingConfig scaffolding, since this solution is always-on and does not read it (a config flag that silently did nothing would be a footgun).
- Cargo.toml: kept the version bump to 0.11.0; dropped #3769's now-unused direct `tungstenite` dependency (it remains transitively via axum).
- network/wormhole.rs: kept #3769's `dead_code` allow (orthogonal CI fix).

All other incoming main changes (CI workflow bumps, fortuna/quorum/etc.) are taken as-is. Verified: cargo check + clippy clean, 33/33 tests pass.
Problem
The Hermes server suffers unbounded memory growth. The cause is WS and SSE streaming connections that stall (or stop reading) and are never reclaimed — their task, broadcast receiver, and buffers stay pinned until the OS TCP timeout (potentially hours), so connections accumulate and memory grows without bound.
The broadcast channel itself is not the leak (it's capacity-1000 with tiny `{slot}` events). The real issue is that the timeout/deadline logic lived inside a poll path that a stalled client freezes:
- WS: when a price update is selected, the loop awaits `handle_price_feeds_update().await` to completion. A slow consumer parks it in `sender.flush().await`, so the sibling `tokio::select!` branches (the 30s ping timeout and the 24h deadline) can never fire. The connection stays pinned indefinitely.
- SSE: the deadline and slow-consumer (`Lagged`) detection lived inside the response-body stream. In axum 0.6 / hyper 0.14 a response body is only polled while the socket can accept bytes, so a client that stops reading freezes the stream entirely: neither the deadline nor the lag check ever runs.

Fix
Implements slow-consumer disconnect and a 24h hard deadline for both transports.

WebSocket (`api/ws.rs`)
- Every socket write goes through `send/feed/flush/close_to_client` helpers wrapped in `WS_SEND_TIMEOUT` (10s). On timeout, record `SlowConsumer` and drop the connection. This bounds how long a connection can be parked on a send, which in turn keeps the ping and 24h-deadline `select!` branches reliably reachable.
- The 24h deadline close is metered (`Interaction::ConnectionTimeout`) and sent best-effort, so an idle/slow client at the deadline isn't also miscounted as a slow consumer.
- Best-effort `close_client` timeouts (e.g. mass closes on deploy) are no longer mislabeled either.

SSE (`api/rest/v2/sse.rs`)
- An independent spawned task drives the broadcast drain and the 24h deadline (a `sleep_until` against the broadcast), and forwards events into a bounded 16-slot channel (`SSE_CHANNEL_CAPACITY`) with a 10s send timeout (`SSE_SEND_TIMEOUT`); the response body is just a `ReceiverStream`.
- A client that stops reading fills the channel, the send times out, and the broadcast receiver is freed within ~10s regardless of whether the client reads.
- Slow consumers (lag or send timeout) are now disconnected with a best-effort `error` event, instead of lingering.

Metrics (`api/metrics_middleware.rs`)
- Adds `sse_slow_consumer_disconnects`, `sse_connection_timeouts` (plus the WS `SlowConsumer` / `ConnectionTimeout` interaction labels) so the fix is observable in Prometheus.

Behavior change
SSE now disconnects a client that falls >1000 updates behind (previously it emitted an error event and kept streaming). This matches the WS behavior and is documented in the handler doc-comment: clients should reconnect on a `Slow consumer` / timeout `error` event.

Validation
`cargo check`, `cargo clippy` clean (only pre-existing protobuf warnings), `cargo test` 33/33 pass.

Notes / possible follow-ups (not in this PR)
- … `SO_SNDTIMEO`).
- A `tokio::time`-paused test would be high-value but needs new `ApiState` test scaffolding.

🤖 Generated with Claude Code