Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ struct Bridge<Fut, Request> {
/// Accumulates batch items with their oneshot response senders.
///
/// Tracks the max-time timer (started when the first item arrives) and
/// dispatches results or errors to all collected senders on flush via
/// dispatches results or errors to all collected senders on flush via
/// [`notify`](Lot::notify).
#[derive(Debug)]
struct Lot<Fut> {
Expand Down Expand Up @@ -205,6 +205,8 @@ where
this.lot.add((msg.tx, Err(e.clone())));
this.lot.notify(Some(e.clone()));
}
this.state.set(State::Finished);
return Poll::Ready(());
}
}
}
Expand Down
70 changes: 62 additions & 8 deletions tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ async fn concurrent_clones_with_backpressure() {

let mut service2 = service.clone();

// Inner service starts not ready creates back-pressure.
// Inner service starts not ready creates back-pressure.
handle.allow(0);

// Clone 1 sends a request; it will be buffered but the inner service won't accept it yet.
Expand All @@ -317,7 +317,7 @@ async fn concurrent_clones_with_backpressure() {
assert_ready_ok!(service2.poll_ready());
let mut res2 = task::spawn(service2.call("from_clone2"));

// Let the worker attempt to process (it can't inner service not ready).
// Let the worker attempt to process (it can't inner service not ready).
tokio::time::sleep(Duration::from_millis(50)).await;
assert_pending!(res1.poll());
assert_pending!(res2.poll());
Expand Down Expand Up @@ -925,7 +925,7 @@ async fn call_after_worker_death() {
// Acquire permit while worker is still alive
assert_ready_ok!(service.poll_ready());

// Kill the worker channel receiver is dropped
// Kill the worker channel receiver is dropped
drop(worker);

// call() tries tx.send() which fails → ResponseFuture::failed()
Expand Down Expand Up @@ -959,7 +959,7 @@ async fn flush_phase_poll_ready_failure() {
let mut res2 = task::spawn(service.call("b"));

// Worker processes both items, batch is full, enters Flushing { flush_fut: None },
// tries poll_ready for Flush gets Pending (0 allows left)
// tries poll_ready for Flush gets Pending (0 allows left)
assert_pending!(worker.poll());

// Respond to item requests
Expand Down Expand Up @@ -998,7 +998,7 @@ async fn cancelled_request_in_channel() {
let mut service = mock::Spawn::new(service);
let mut worker = task::spawn(worker);

// Do NOT poll worker yet messages stay in the channel
// Do NOT poll worker yet messages stay in the channel

// Send request "a" and immediately drop its response future (cancels it)
assert_ready_ok!(service.poll_ready());
Expand All @@ -1013,9 +1013,9 @@ async fn cancelled_request_in_channel() {
handle.allow(3);

// First poll: worker enters poll_next_msg while-let loop.
// Receives "a" tx.is_closed() == true skips (line 377).
// Receives "b" processes normally. Lot timer starts. Batch not full.
// Tries to get next message channel empty → Pending.
// Receives "a" tx.is_closed() == true skips (line 377).
// Receives "b" processes normally. Lot timer starts. Batch not full.
// Tries to get next message channel empty → Pending.
assert_pending!(worker.poll());

// Only "b" should reach the mock handle (not "a")
Expand Down Expand Up @@ -1091,3 +1091,57 @@ async fn timer_does_not_overwrite_in_progress_flush() {
"flush future should have run to completion, not been dropped by the timer"
);
}

/// Regression: when poll_ready fails with multiple messages queued in the
/// channel, the worker must terminate immediately instead of calling
/// poll_ready again on the broken service (which could hang or behave
/// inconsistently).
#[tokio::test(flavor = "current_thread")]
async fn worker_terminates_on_poll_ready_error_with_queued_messages() {
let _guard = support::trace_init();

let (service, mut handle) = mock::pair::<BatchControl<&str>, &str>();
let (service, worker) = Batch::pair(service, 3, Duration::from_secs(1));
let mut service = mock::Spawn::new(service);
let mut worker = task::spawn(worker);

// Inner service not ready -- messages queue in the channel.
handle.allow(0);

assert_ready_ok!(service.poll_ready());
let mut res1 = task::spawn(service.call("msg1"));

assert_ready_ok!(service.poll_ready());
let mut res2 = task::spawn(service.call("msg2"));

// Worker tries to process msg1 but service is not ready -- returns Pending.
assert_pending!(worker.poll());

// Now make the next poll_ready fail.
handle.send_error("boom");

// Worker should hit the error on msg1 and terminate immediately,
// without attempting poll_ready for msg2.
assert_ready!(worker.poll());

// The first response gets the ServiceError (notified via lot).
let err1 = assert_ready_err!(res1.poll());
assert!(
err1.is::<error::ServiceError>(),
"res1 should be ServiceError, got: {:?}",
err1
);

// Drop the worker so the channel receiver and remaining messages are
// freed. This drops msg2's oneshot sender, unblocking res2.
drop(worker);

// The second response gets Closed (its oneshot sender was dropped
// when the worker was dropped without processing it).
let err2 = assert_ready_err!(res2.poll());
assert!(
err2.is::<error::Closed>(),
"res2 should be Closed, got: {:?}",
err2
);
}
Loading