From fe997464b33e4279006d6bb7d5ecf0928633418a Mon Sep 17 00:00:00 2001 From: Giovanni Barillari Date: Sat, 20 Jun 2026 17:48:42 +0200 Subject: [PATCH] Review ws teardown flow in ASGI proto --- src/asgi/io.rs | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/src/asgi/io.rs b/src/asgi/io.rs index 5216a0df..9118ec84 100644 --- a/src/asgi/io.rs +++ b/src/asgi/io.rs @@ -355,6 +355,7 @@ pub(crate) struct ASGIWebsocketProtocol { init_tx: Arc, init_event: Arc, closed: Arc, + teardown: Arc, } impl ASGIWebsocketProtocol { @@ -378,6 +379,7 @@ impl ASGIWebsocketProtocol { init_tx: Arc::new(false.into()), init_event: Arc::new(Notify::new()), closed: Arc::new(false.into()), + teardown: Arc::new(Notify::new()), } } @@ -464,17 +466,22 @@ impl ASGIWebsocketProtocol { fn send_message<'p>(&self, py: Python<'p>, data: Message) -> PyResult> { let transport = self.ws_tx.clone(); let closed = self.closed.clone(); + let teardown = self.teardown.clone(); future_into_py_futlike(self.rt.clone(), py, async move { if let Some(ws) = &mut *(transport.lock().await) { - match ws.send(data).await { - Ok(()) => return FutureResultToPy::None, - _ => { - if closed.load(atomic::Ordering::Acquire) { - log::info!("Attempted to write to a closed websocket"); - return FutureResultToPy::None; + tokio::select! { + biased; + res = ws.send(data) => match res { + Ok(()) => return FutureResultToPy::None, + _ => { + if closed.load(atomic::Ordering::Acquire) { + log::info!("Attempted to write to a closed websocket"); + return FutureResultToPy::None; + } } - } + }, + () = teardown.notified() => return FutureResultToPy::None, } } FutureResultToPy::Err(error_flow!("Transport not initialized or closed")) @@ -510,11 +517,13 @@ impl ASGIWebsocketProtocol { Option>, WebsocketDetachedTransport, ) { - let mut ws_rx = self.ws_rx.blocking_lock(); + self.closed.store(true, atomic::Ordering::Release); + self.teardown.notify_one(); let mut ws_tx = self.ws_tx.blocking_lock(); + let ws_rx = self.ws_rx.try_lock().map_or(None, |mut guard| guard.take()); ( self.tx.lock().unwrap().take(), - WebsocketDetachedTransport::new(self.consumed(), ws_rx.take(), ws_tx.take(), None), + WebsocketDetachedTransport::new(self.consumed(), ws_rx, ws_tx.take(), None), ) } }