Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ntex-net/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changes

## [Unreleased]

* polling (neon) driver: do not panic with "called `Option::unwrap()` on a `None`
value" when a filter emits a large (>= write_buf_threshold) write burst during
read processing; the reentrant write is now deferred until the streams slab is
released instead of re-taking it

## [3.12.0] - 2026-05-15

* Update ntex-io
Expand Down
4 changes: 2 additions & 2 deletions ntex-net/src/polling/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{any, future::poll_fn, task::Poll};
use ntex_io::{Handle, IoContext, Readiness, types};
use ntex_rt::spawn;

use super::stream::{StreamCtl, StreamItem, WeakStreamCtl};
use super::stream::{StreamCtl, WeakStreamCtl};

impl ntex_io::IoStream for super::TcpStream {
fn start(self, ctx: IoContext) -> Box<dyn Handle> {
Expand Down Expand Up @@ -39,7 +39,7 @@ impl Handle for HandleWrapper {
}

fn write(&self, _: &IoContext) {
self.0.with(StreamItem::write_direct);
self.0.write();
}
}

Expand Down
63 changes: 54 additions & 9 deletions ntex-net/src/polling/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ struct StreamOpsInner {
api: DriverApi,
delayed_drop: Cell<bool>,
delayed_feed: Cell<Option<Vec<IdType>>>,
delayed_write: Cell<bool>,
write_feed: Cell<Option<Vec<u32>>>,
streams: Cell<Option<Box<Slab<StreamItem>>>>,
}

Expand All @@ -67,6 +69,8 @@ impl StreamOps {
api,
delayed_drop: Cell::new(false),
delayed_feed: Cell::new(Some(Vec::new())),
delayed_write: Cell::new(false),
write_feed: Cell::new(Some(Vec::new())),
streams: Cell::new(Some(Box::new(Slab::new()))),
});
inner = Some(ops.clone());
Expand Down Expand Up @@ -201,6 +205,7 @@ impl Handler for StreamOpsHandler {
}
}
self.inner.delayed_feed.take();
self.inner.write_feed.take();
}
}

Expand All @@ -211,10 +216,56 @@ impl StreamOpsInner {
{
let mut streams = self.streams.take().unwrap();
let result = f(&mut streams);
if self.delayed_write.get() {
self.check_delayed_writes(&mut streams);
}
self.streams.set(Some(streams));
result
}

/// Initiate write operation for the stream
fn write_stream(&self, id: u32) {
if let Some(mut streams) = self.streams.take() {
if let Some(item) = streams.get_mut(id as usize) {
item.write();
}
if self.delayed_write.get() {
self.check_delayed_writes(&mut streams);
}
self.streams.set(Some(streams));
} else {
// Write is initiated while `StreamOps` is handling an event
// (e.g. a filter generated data during read processing).
// Delay the write until the streams slab is released,
// dropping it would stall the connection.
self.delayed_write.set(true);
if let Some(mut feed) = self.write_feed.take() {
feed.push(id);
self.write_feed.set(Some(feed));
}
}
}

/// Process writes that were requested while the streams slab was taken
fn check_delayed_writes(&self, streams: &mut Slab<StreamItem>) {
while let Some(mut feed) = self.write_feed.take() {
if feed.is_empty() {
self.write_feed.set(Some(feed));
self.delayed_write.set(false);
break;
}
let ids = mem::take(&mut feed);
self.write_feed.set(Some(feed));
for id in ids {
if let Some(item) = streams.get_mut(id as usize)
&& !item.ctx.is_stopped()
{
item.write();
}
}
}
}

fn drop_stream(&self, id: u32) {
// Dropping while `StreamOps` handling event
if let Some(mut streams) = self.streams.take() {
Expand Down Expand Up @@ -389,11 +440,9 @@ impl WeakStreamCtl {
self.inner.with(|streams| f(&streams[self.id as usize].io))
}

pub(super) fn with<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut StreamItem) -> R,
{
self.inner.with(|streams| f(&mut streams[self.id as usize]))
/// Initiate write operation for the stream
pub(super) fn write(&self) {
self.inner.write_stream(self.id);
}
}

Expand All @@ -412,10 +461,6 @@ impl StreamItem {
self.ctx.tag()
}

pub(super) fn write_direct(&mut self) {
self.write();
}

fn write(&mut self) -> IoTaskStatus {
let res = self.ctx.with_write_buf(|wrt| {
let mut pages: [Option<BytePage>; MAX_WRITE_ITEMS] = [
Expand Down
104 changes: 104 additions & 0 deletions ntex/tests/io_filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//! Io filter integration tests.
use std::io::{Read, Write};
use std::{cell::Cell, io, net, time::Duration};

use ntex::codec::BytesCodec;
use ntex::io::{FilterBuf, FilterLayer, Io};
use ntex::server::test_server;
use ntex::service::fn_service;
use ntex::util::Bytes;

/// Must be greater than or equal to `IoConfig::write_buf_threshold`
/// (8192 by default), so that the io stream initiates a direct
/// (in-place) write while the read buffer is being processed.
const BURST_SIZE: usize = 16 * 1024;

/// Filter that emits a large burst of write data while processing
/// incoming read data, similar to a TLS filter that writes handshake
/// or control data in response to incoming records.
#[derive(Debug, Default)]
struct BurstWriteFilter {
sent: Cell<bool>,
}

impl FilterLayer for BurstWriteFilter {
fn process_read_buf(&self, buf: &FilterBuf<'_>) -> io::Result<()> {
// pass incoming data through
let got_data = buf.with_read_buffers(|src, dst| {
if let Some(src) = src.take() {
dst.extend_from_slice(&src);
!src.is_empty()
} else {
false
}
});

// the first incoming data triggers a large write
if got_data && !self.sent.get() {
self.sent.set(true);
buf.with_write_buffers(|_, dst| {
dst.extend_from_slice(&[b'x'; BURST_SIZE]);
});
}
Ok(())
}

fn process_write_buf(&self, buf: &FilterBuf<'_>) -> io::Result<()> {
// pass outgoing data through
buf.with_write_buffers(|src, dst| {
if !src.is_empty() {
src.move_to(dst);
}
});
Ok(())
}
}

/// A filter that produces a write burst (>= `write_buf_threshold`)
/// during read-buffer processing must not break the io stream.
///
/// Regression test for the polling (neon) backend: the driver used to
/// panic on reentrant slab access ("called `Option::unwrap()` on a
/// `None` value") when a filter generated enough write data during
/// read processing to trigger a direct write.
#[ntex::test]
async fn test_filter_large_write_during_read_processing() {
let srv = test_server(async || {
fn_service(|io: Io| async move {
let io = io.add_filter(BurstWriteFilter::default());
// notify the client that the filter is installed
io.send(Bytes::from_static(b"hi"), &BytesCodec)
.await
.unwrap();
// echo service
while let Ok(Some(msg)) = io.recv(&BytesCodec).await {
io.send(msg, &BytesCodec).await.unwrap();
}
Ok::<_, io::Error>(())
})
});

let mut client = net::TcpStream::connect(srv.addr()).unwrap();
client
.set_read_timeout(Some(Duration::from_secs(5)))
.unwrap();

// wait for the server-side filter to be installed
let mut greeting = [0u8; 2];
client.read_exact(&mut greeting).unwrap();
assert_eq!(&greeting, b"hi");

// this write triggers the filter burst during read processing
// on the server side
client.write_all(b"ping").unwrap();

// burst must arrive first (it is written during read processing),
// followed by the echoed message
let mut data = vec![0u8; BURST_SIZE + 4];
client
.read_exact(&mut data)
.expect("server did not send data, io driver is broken");

assert!(data[..BURST_SIZE].iter().all(|b| *b == b'x'));
assert_eq!(&data[BURST_SIZE..], b"ping");
}
Loading