diff --git a/src/worker.rs b/src/worker.rs index 643c429..c05e6d9 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -49,7 +49,7 @@ struct Bridge { /// Accumulates batch items with their oneshot response senders. /// /// Tracks the max-time timer (started when the first item arrives) and -/// dispatches results — or errors — to all collected senders on flush via +/// dispatches results – or errors – to all collected senders on flush via /// [`notify`](Lot::notify). #[derive(Debug)] struct Lot { @@ -205,6 +205,8 @@ where this.lot.add((msg.tx, Err(e.clone()))); this.lot.notify(Some(e.clone())); } + this.state.set(State::Finished); + return Poll::Ready(()); } } } diff --git a/tests/main.rs b/tests/main.rs index b49f1ec..ba09027 100644 --- a/tests/main.rs +++ b/tests/main.rs @@ -306,7 +306,7 @@ async fn concurrent_clones_with_backpressure() { let mut service2 = service.clone(); - // Inner service starts not ready — creates back-pressure. + // Inner service starts not ready – creates back-pressure. handle.allow(0); // Clone 1 sends a request; it will be buffered but the inner service won't accept it yet. @@ -317,7 +317,7 @@ async fn concurrent_clones_with_backpressure() { assert_ready_ok!(service2.poll_ready()); let mut res2 = task::spawn(service2.call("from_clone2")); - // Let the worker attempt to process (it can't — inner service not ready). + // Let the worker attempt to process (it can't – inner service not ready). tokio::time::sleep(Duration::from_millis(50)).await; assert_pending!(res1.poll()); assert_pending!(res2.poll()); @@ -925,7 +925,7 @@ async fn call_after_worker_death() { // Acquire permit while worker is still alive assert_ready_ok!(service.poll_ready()); - // Kill the worker — channel receiver is dropped + // Kill the worker – channel receiver is dropped drop(worker); // call() tries tx.send() which fails → ResponseFuture::failed() @@ -959,7 +959,7 @@ async fn flush_phase_poll_ready_failure() { let mut res2 = task::spawn(service.call("b")); // Worker processes both items, batch is full, enters Flushing { flush_fut: None }, - // tries poll_ready for Flush — gets Pending (0 allows left) + // tries poll_ready for Flush – gets Pending (0 allows left) assert_pending!(worker.poll()); // Respond to item requests @@ -998,7 +998,7 @@ async fn cancelled_request_in_channel() { let mut service = mock::Spawn::new(service); let mut worker = task::spawn(worker); - // Do NOT poll worker yet — messages stay in the channel + // Do NOT poll worker yet – messages stay in the channel // Send request "a" and immediately drop its response future (cancels it) assert_ready_ok!(service.poll_ready()); @@ -1013,9 +1013,9 @@ async fn cancelled_request_in_channel() { handle.allow(3); // First poll: worker enters poll_next_msg while-let loop. - // Receives "a" — tx.is_closed() == true — skips (line 377). - // Receives "b" — processes normally. Lot timer starts. Batch not full. - // Tries to get next message — channel empty → Pending. + // Receives "a" – tx.is_closed() == true – skips (line 377). + // Receives "b" – processes normally. Lot timer starts. Batch not full. + // Tries to get next message – channel empty → Pending. assert_pending!(worker.poll()); // Only "b" should reach the mock handle (not "a") @@ -1091,3 +1091,57 @@ async fn timer_does_not_overwrite_in_progress_flush() { "flush future should have run to completion, not been dropped by the timer" ); } + +/// Regression: when poll_ready fails with multiple messages queued in the +/// channel, the worker must terminate immediately instead of calling +/// poll_ready again on the broken service (which could hang or behave +/// inconsistently). +#[tokio::test(flavor = "current_thread")] +async fn worker_terminates_on_poll_ready_error_with_queued_messages() { + let _guard = support::trace_init(); + + let (service, mut handle) = mock::pair::, &str>(); + let (service, worker) = Batch::pair(service, 3, Duration::from_secs(1)); + let mut service = mock::Spawn::new(service); + let mut worker = task::spawn(worker); + + // Inner service not ready -- messages queue in the channel. + handle.allow(0); + + assert_ready_ok!(service.poll_ready()); + let mut res1 = task::spawn(service.call("msg1")); + + assert_ready_ok!(service.poll_ready()); + let mut res2 = task::spawn(service.call("msg2")); + + // Worker tries to process msg1 but service is not ready -- returns Pending. + assert_pending!(worker.poll()); + + // Now make the next poll_ready fail. + handle.send_error("boom"); + + // Worker should hit the error on msg1 and terminate immediately, + // without attempting poll_ready for msg2. + assert_ready!(worker.poll()); + + // The first response gets the ServiceError (notified via lot). + let err1 = assert_ready_err!(res1.poll()); + assert!( + err1.is::(), + "res1 should be ServiceError, got: {:?}", + err1 + ); + + // Drop the worker so the channel receiver and remaining messages are + // freed. This drops msg2's oneshot sender, unblocking res2. + drop(worker); + + // The second response gets Closed (its oneshot sender was dropped + // when the worker was dropped without processing it). + let err2 = assert_ready_err!(res2.poll()); + assert!( + err2.is::(), + "res2 should be Closed, got: {:?}", + err2 + ); +}