diff --git a/ntex-net/CHANGES.md b/ntex-net/CHANGES.md index b46f54d01..8befae90e 100644 --- a/ntex-net/CHANGES.md +++ b/ntex-net/CHANGES.md @@ -1,5 +1,12 @@ # Changes +## [Unreleased] + +* polling (neon) driver: do not panic with "called `Option::unwrap()` on a `None` + value" when a filter emits a large (>= write_buf_threshold) write burst during + read processing; the reentrant write is now deferred until the streams slab is + released instead of re-taking it + ## [3.12.0] - 2026-05-15 * Update ntex-io diff --git a/ntex-net/src/polling/io.rs b/ntex-net/src/polling/io.rs index d089cfd77..9fd2f8a49 100644 --- a/ntex-net/src/polling/io.rs +++ b/ntex-net/src/polling/io.rs @@ -3,7 +3,7 @@ use std::{any, future::poll_fn, task::Poll}; use ntex_io::{Handle, IoContext, Readiness, types}; use ntex_rt::spawn; -use super::stream::{StreamCtl, StreamItem, WeakStreamCtl}; +use super::stream::{StreamCtl, WeakStreamCtl}; impl ntex_io::IoStream for super::TcpStream { fn start(self, ctx: IoContext) -> Box { @@ -39,7 +39,7 @@ impl Handle for HandleWrapper { } fn write(&self, _: &IoContext) { - self.0.with(StreamItem::write_direct); + self.0.write(); } } diff --git a/ntex-net/src/polling/stream.rs b/ntex-net/src/polling/stream.rs index 89a396a6c..e82ef8c09 100644 --- a/ntex-net/src/polling/stream.rs +++ b/ntex-net/src/polling/stream.rs @@ -54,6 +54,8 @@ struct StreamOpsInner { api: DriverApi, delayed_drop: Cell, delayed_feed: Cell>>, + delayed_write: Cell, + write_feed: Cell>>, streams: Cell>>>, } @@ -67,6 +69,8 @@ impl StreamOps { api, delayed_drop: Cell::new(false), delayed_feed: Cell::new(Some(Vec::new())), + delayed_write: Cell::new(false), + write_feed: Cell::new(Some(Vec::new())), streams: Cell::new(Some(Box::new(Slab::new()))), }); inner = Some(ops.clone()); @@ -201,6 +205,7 @@ impl Handler for StreamOpsHandler { } } self.inner.delayed_feed.take(); + self.inner.write_feed.take(); } } @@ -211,10 +216,56 @@ impl StreamOpsInner { { let mut streams = self.streams.take().unwrap(); let result = f(&mut streams); + if self.delayed_write.get() { + self.check_delayed_writes(&mut streams); + } self.streams.set(Some(streams)); result } + /// Initiate write operation for the stream + fn write_stream(&self, id: u32) { + if let Some(mut streams) = self.streams.take() { + if let Some(item) = streams.get_mut(id as usize) { + item.write(); + } + if self.delayed_write.get() { + self.check_delayed_writes(&mut streams); + } + self.streams.set(Some(streams)); + } else { + // Write is initiated while `StreamOps` is handling an event + // (e.g. a filter generated data during read processing). + // Delay the write until the streams slab is released, + // dropping it would stall the connection. + self.delayed_write.set(true); + if let Some(mut feed) = self.write_feed.take() { + feed.push(id); + self.write_feed.set(Some(feed)); + } + } + } + + /// Process writes that were requested while the streams slab was taken + fn check_delayed_writes(&self, streams: &mut Slab) { + while let Some(mut feed) = self.write_feed.take() { + if feed.is_empty() { + self.write_feed.set(Some(feed)); + self.delayed_write.set(false); + break; + } + let ids = mem::take(&mut feed); + self.write_feed.set(Some(feed)); + for id in ids { + if let Some(item) = streams.get_mut(id as usize) + && !item.ctx.is_stopped() + { + item.write(); + } + } + } + } + fn drop_stream(&self, id: u32) { // Dropping while `StreamOps` handling event if let Some(mut streams) = self.streams.take() { @@ -389,11 +440,9 @@ impl WeakStreamCtl { self.inner.with(|streams| f(&streams[self.id as usize].io)) } - pub(super) fn with(&self, f: F) -> R - where - F: FnOnce(&mut StreamItem) -> R, - { - self.inner.with(|streams| f(&mut streams[self.id as usize])) + /// Initiate write operation for the stream + pub(super) fn write(&self) { + self.inner.write_stream(self.id); } } @@ -412,10 +461,6 @@ impl StreamItem { self.ctx.tag() } - pub(super) fn write_direct(&mut self) { - self.write(); - } - fn write(&mut self) -> IoTaskStatus { let res = self.ctx.with_write_buf(|wrt| { let mut pages: [Option; MAX_WRITE_ITEMS] = [ diff --git a/ntex/tests/io_filter.rs b/ntex/tests/io_filter.rs new file mode 100644 index 000000000..5cc9be058 --- /dev/null +++ b/ntex/tests/io_filter.rs @@ -0,0 +1,104 @@ +//! Io filter integration tests. +use std::io::{Read, Write}; +use std::{cell::Cell, io, net, time::Duration}; + +use ntex::codec::BytesCodec; +use ntex::io::{FilterBuf, FilterLayer, Io}; +use ntex::server::test_server; +use ntex::service::fn_service; +use ntex::util::Bytes; + +/// Must be greater than or equal to `IoConfig::write_buf_threshold` +/// (8192 by default), so that the io stream initiates a direct +/// (in-place) write while the read buffer is being processed. +const BURST_SIZE: usize = 16 * 1024; + +/// Filter that emits a large burst of write data while processing +/// incoming read data, similar to a TLS filter that writes handshake +/// or control data in response to incoming records. +#[derive(Debug, Default)] +struct BurstWriteFilter { + sent: Cell, +} + +impl FilterLayer for BurstWriteFilter { + fn process_read_buf(&self, buf: &FilterBuf<'_>) -> io::Result<()> { + // pass incoming data through + let got_data = buf.with_read_buffers(|src, dst| { + if let Some(src) = src.take() { + dst.extend_from_slice(&src); + !src.is_empty() + } else { + false + } + }); + + // the first incoming data triggers a large write + if got_data && !self.sent.get() { + self.sent.set(true); + buf.with_write_buffers(|_, dst| { + dst.extend_from_slice(&[b'x'; BURST_SIZE]); + }); + } + Ok(()) + } + + fn process_write_buf(&self, buf: &FilterBuf<'_>) -> io::Result<()> { + // pass outgoing data through + buf.with_write_buffers(|src, dst| { + if !src.is_empty() { + src.move_to(dst); + } + }); + Ok(()) + } +} + +/// A filter that produces a write burst (>= `write_buf_threshold`) +/// during read-buffer processing must not break the io stream. +/// +/// Regression test for the polling (neon) backend: the driver used to +/// panic on reentrant slab access ("called `Option::unwrap()` on a +/// `None` value") when a filter generated enough write data during +/// read processing to trigger a direct write. +#[ntex::test] +async fn test_filter_large_write_during_read_processing() { + let srv = test_server(async || { + fn_service(|io: Io| async move { + let io = io.add_filter(BurstWriteFilter::default()); + // notify the client that the filter is installed + io.send(Bytes::from_static(b"hi"), &BytesCodec) + .await + .unwrap(); + // echo service + while let Ok(Some(msg)) = io.recv(&BytesCodec).await { + io.send(msg, &BytesCodec).await.unwrap(); + } + Ok::<_, io::Error>(()) + }) + }); + + let mut client = net::TcpStream::connect(srv.addr()).unwrap(); + client + .set_read_timeout(Some(Duration::from_secs(5))) + .unwrap(); + + // wait for the server-side filter to be installed + let mut greeting = [0u8; 2]; + client.read_exact(&mut greeting).unwrap(); + assert_eq!(&greeting, b"hi"); + + // this write triggers the filter burst during read processing + // on the server side + client.write_all(b"ping").unwrap(); + + // burst must arrive first (it is written during read processing), + // followed by the echoed message + let mut data = vec![0u8; BURST_SIZE + 4]; + client + .read_exact(&mut data) + .expect("server did not send data, io driver is broken"); + + assert!(data[..BURST_SIZE].iter().all(|b| *b == b'x')); + assert_eq!(&data[BURST_SIZE..], b"ping"); +}