From 6c80a0d0b0bacaaa79f7daf3cb55aeea29217480 Mon Sep 17 00:00:00 2001 From: Borislav Borisov Date: Tue, 7 Apr 2026 08:17:44 +0100 Subject: [PATCH] Fix worker hang after poll_ready error in Collecting state When service.poll_ready() returned Err during item collection, the worker continued looping in the Collecting state, calling poll_ready again on the broken service for each remaining channel message. If the service returned Pending, the worker would hang forever. Transition to Finished immediately after propagating the error, matching the pattern already used by the Flushing state error handler. --- src/worker.rs | 4 ++- tests/main.rs | 70 +++++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 65 insertions(+), 9 deletions(-) diff --git a/src/worker.rs b/src/worker.rs index 643c429..c05e6d9 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -49,7 +49,7 @@ struct Bridge { /// Accumulates batch items with their oneshot response senders. /// /// Tracks the max-time timer (started when the first item arrives) and -/// dispatches results — or errors — to all collected senders on flush via +/// dispatches results – or errors – to all collected senders on flush via /// [`notify`](Lot::notify). #[derive(Debug)] struct Lot { @@ -205,6 +205,8 @@ where this.lot.add((msg.tx, Err(e.clone()))); this.lot.notify(Some(e.clone())); } + this.state.set(State::Finished); + return Poll::Ready(()); } } } diff --git a/tests/main.rs b/tests/main.rs index b49f1ec..ba09027 100644 --- a/tests/main.rs +++ b/tests/main.rs @@ -306,7 +306,7 @@ async fn concurrent_clones_with_backpressure() { let mut service2 = service.clone(); - // Inner service starts not ready — creates back-pressure. + // Inner service starts not ready – creates back-pressure. handle.allow(0); // Clone 1 sends a request; it will be buffered but the inner service won't accept it yet. @@ -317,7 +317,7 @@ async fn concurrent_clones_with_backpressure() { assert_ready_ok!(service2.poll_ready()); let mut res2 = task::spawn(service2.call("from_clone2")); - // Let the worker attempt to process (it can't — inner service not ready). + // Let the worker attempt to process (it can't – inner service not ready). tokio::time::sleep(Duration::from_millis(50)).await; assert_pending!(res1.poll()); assert_pending!(res2.poll()); @@ -925,7 +925,7 @@ async fn call_after_worker_death() { // Acquire permit while worker is still alive assert_ready_ok!(service.poll_ready()); - // Kill the worker — channel receiver is dropped + // Kill the worker – channel receiver is dropped drop(worker); // call() tries tx.send() which fails → ResponseFuture::failed() @@ -959,7 +959,7 @@ async fn flush_phase_poll_ready_failure() { let mut res2 = task::spawn(service.call("b")); // Worker processes both items, batch is full, enters Flushing { flush_fut: None }, - // tries poll_ready for Flush — gets Pending (0 allows left) + // tries poll_ready for Flush – gets Pending (0 allows left) assert_pending!(worker.poll()); // Respond to item requests @@ -998,7 +998,7 @@ async fn cancelled_request_in_channel() { let mut service = mock::Spawn::new(service); let mut worker = task::spawn(worker); - // Do NOT poll worker yet — messages stay in the channel + // Do NOT poll worker yet – messages stay in the channel // Send request "a" and immediately drop its response future (cancels it) assert_ready_ok!(service.poll_ready()); @@ -1013,9 +1013,9 @@ async fn cancelled_request_in_channel() { handle.allow(3); // First poll: worker enters poll_next_msg while-let loop. - // Receives "a" — tx.is_closed() == true — skips (line 377). - // Receives "b" — processes normally. Lot timer starts. Batch not full. - // Tries to get next message — channel empty → Pending. + // Receives "a" – tx.is_closed() == true – skips (line 377). + // Receives "b" – processes normally. Lot timer starts. Batch not full. + // Tries to get next message – channel empty → Pending. assert_pending!(worker.poll()); // Only "b" should reach the mock handle (not "a") @@ -1091,3 +1091,57 @@ async fn timer_does_not_overwrite_in_progress_flush() { "flush future should have run to completion, not been dropped by the timer" ); } + +/// Regression: when poll_ready fails with multiple messages queued in the +/// channel, the worker must terminate immediately instead of calling +/// poll_ready again on the broken service (which could hang or behave +/// inconsistently). +#[tokio::test(flavor = "current_thread")] +async fn worker_terminates_on_poll_ready_error_with_queued_messages() { + let _guard = support::trace_init(); + + let (service, mut handle) = mock::pair::, &str>(); + let (service, worker) = Batch::pair(service, 3, Duration::from_secs(1)); + let mut service = mock::Spawn::new(service); + let mut worker = task::spawn(worker); + + // Inner service not ready -- messages queue in the channel. + handle.allow(0); + + assert_ready_ok!(service.poll_ready()); + let mut res1 = task::spawn(service.call("msg1")); + + assert_ready_ok!(service.poll_ready()); + let mut res2 = task::spawn(service.call("msg2")); + + // Worker tries to process msg1 but service is not ready -- returns Pending. + assert_pending!(worker.poll()); + + // Now make the next poll_ready fail. + handle.send_error("boom"); + + // Worker should hit the error on msg1 and terminate immediately, + // without attempting poll_ready for msg2. + assert_ready!(worker.poll()); + + // The first response gets the ServiceError (notified via lot). + let err1 = assert_ready_err!(res1.poll()); + assert!( + err1.is::(), + "res1 should be ServiceError, got: {:?}", + err1 + ); + + // Drop the worker so the channel receiver and remaining messages are + // freed. This drops msg2's oneshot sender, unblocking res2. + drop(worker); + + // The second response gets Closed (its oneshot sender was dropped + // when the worker was dropped without processing it). + let err2 = assert_ready_err!(res2.poll()); + assert!( + err2.is::(), + "res2 should be Closed, got: {:?}", + err2 + ); +}