diff --git a/Cargo.toml b/Cargo.toml index 8810430..0ac9433 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,9 @@ exclude = [ "README.md" ] +[lints.clippy] +pedantic = "warn" + [dependencies] futures-core = "0.3" diff --git a/examples/sqlite_batch.rs b/examples/sqlite_batch.rs index f633989..215711b 100644 --- a/examples/sqlite_batch.rs +++ b/examples/sqlite_batch.rs @@ -1,4 +1,4 @@ -//! Batch-insert rows into an in-memory SQLite database using the rarray virtual table. +//! Batch-insert rows into an in-memory `SQLite` database using the rarray virtual table. //! //! Run with: `cargo run --example sqlite_batch` @@ -107,7 +107,7 @@ async fn main() -> Result<(), BoxError> { batch .call(InsertRow { name: format!("task{task_id}_row{i}"), - value: (task_id * 50 + i) as i64, + value: i64::from(task_id * 50 + i), }) .await .unwrap(); diff --git a/src/layer.rs b/src/layer.rs index 5a0feb2..afb52f1 100644 --- a/src/layer.rs +++ b/src/layer.rs @@ -22,6 +22,7 @@ impl BatchLayer { /// /// * `size` – the maximum number of items per batch. /// * `time` – the maximum duration before a batch is flushed. + #[must_use] pub fn new(size: usize, time: Duration) -> Self { Self { size, diff --git a/src/service.rs b/src/service.rs index 1f8e02d..0a710a9 100644 --- a/src/service.rs +++ b/src/service.rs @@ -155,7 +155,7 @@ where fn call(&mut self, request: Request) -> Self::Future { tracing::debug!("sending request to batch worker"); - let _permit = self + let permit = self .permit .take() .expect("batch full; poll_ready must be called first"); @@ -172,12 +172,12 @@ where // The worker is in control of completing the request now. match self.tx.send(Message { request, - span, tx, - _permit, + span, + _permit: permit, }) { Err(_) => ResponseFuture::failed(self.get_worker_error()), - Ok(_) => ResponseFuture::new(rx), + Ok(()) => ResponseFuture::new(rx), } } } diff --git a/src/worker.rs b/src/worker.rs index f32d2f5..b25c26c 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -151,6 +151,7 @@ where { type Output = (); + #[allow(clippy::too_many_lines)] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { trace!("polling worker"); @@ -166,53 +167,50 @@ where loop { match this.state.as_mut().project() { StateProj::Collecting => { - match ready!(this.bridge.poll_next_msg(cx)) { - Some((msg, first)) => { - let _guard = msg.span.enter(); - - trace!(resumed = !first, message = "worker received request"); - - // Wait for the service to be ready - trace!(message = "waiting for service readiness"); - match this.service.poll_ready(cx) { - Poll::Ready(Ok(())) => { - debug!(service.ready = true, message = "adding item"); - - let response = this.service.call(msg.request.into()); - this.lot.add((msg.tx, Ok(response))); - - // Flush if the batch is full. - if this.lot.is_full() { - this.state.set(State::flushing("size".to_owned(), None)); - } else if this.lot.poll_max_time(cx).is_ready() { - // Or flush if the max time has elapsed. - this.state.set(State::flushing("time".to_owned(), None)); - } - } - Poll::Pending => { - drop(_guard); - debug!(service.ready = false, message = "delay item addition"); - this.bridge.return_msg(msg); - return Poll::Pending; + if let Some((msg, first)) = ready!(this.bridge.poll_next_msg(cx)) { + let guard = msg.span.enter(); + + trace!(resumed = !first, message = "worker received request"); + + // Wait for the service to be ready + trace!(message = "waiting for service readiness"); + match this.service.poll_ready(cx) { + Poll::Ready(Ok(())) => { + debug!(service.ready = true, message = "adding item"); + + let response = this.service.call(msg.request.into()); + this.lot.add((msg.tx, Ok(response))); + + // Flush if the batch is full. + if this.lot.is_full() { + this.state.set(State::flushing("size".to_owned(), None)); + } else if this.lot.poll_max_time(cx).is_ready() { + // Or flush if the max time has elapsed. + this.state.set(State::flushing("time".to_owned(), None)); } - Poll::Ready(Err(e)) => { - drop(_guard); - this.bridge.failed("item addition", e.into()); - if let Some(ref e) = this.bridge.failed { - // Ensure the current caller is notified too. - this.lot.add((msg.tx, Err(e.clone()))); - this.lot.notify(Some(e.clone())); - } - this.state.set(State::Finished); - return Poll::Ready(()); + } + Poll::Pending => { + drop(guard); + debug!(service.ready = false, message = "delay item addition"); + this.bridge.return_msg(msg); + return Poll::Pending; + } + Poll::Ready(Err(e)) => { + drop(guard); + this.bridge.failed("item addition", e.into()); + if let Some(ref e) = this.bridge.failed { + // Ensure the current caller is notified too. + this.lot.add((msg.tx, Err(e.clone()))); + this.lot.notify(Some(e)); } + this.state.set(State::Finished); + return Poll::Ready(()); } } - None => { - trace!("shutting down, no more requests _ever_"); - this.state.set(State::Finished); - return Poll::Ready(()); - } + } else { + trace!("shutting down, no more requests _ever_"); + this.state.set(State::Finished); + return Poll::Ready(()); } } StateProj::Flushing { reason, flush_fut } => match flush_fut.as_pin_mut() { @@ -243,7 +241,7 @@ where Poll::Ready(Err(e)) => { this.bridge.failed("flush", e.into()); if let Some(ref e) = this.bridge.failed { - this.lot.notify(Some(e.clone())); + this.lot.notify(Some(e)); } this.state.set(State::Finished); return Poll::Ready(()); @@ -254,12 +252,12 @@ where Ok(_) => { debug!(reason = reason.as_mut().unwrap().as_str(), "batch flushed"); this.lot.notify(None); - this.state.set(State::Collecting) + this.state.set(State::Collecting); } Err(e) => { this.bridge.failed("flush", e.into()); if let Some(ref e) = this.bridge.failed { - this.lot.notify(Some(e.clone())); + this.lot.notify(Some(e)); } this.state.set(State::Finished); return Poll::Ready(()); @@ -290,7 +288,7 @@ impl State { impl Drop for Bridge { fn drop(&mut self) { - self.close_semaphore() + self.close_semaphore(); } } @@ -385,7 +383,7 @@ impl Bridge { } fn return_msg(&mut self, msg: Message) { - self.current_message = Some(msg) + self.current_message = Some(msg); } } @@ -434,10 +432,10 @@ impl Lot { self.responses.push(item); } - fn notify(&mut self, err: Option) { + fn notify(&mut self, err: Option<&ServiceError>) { for (tx, response) in mem::replace(&mut self.responses, Vec::with_capacity(self.max_size)) { - if let Some(ref response) = err { - let _ = tx.send(Err(response.clone())); + if let Some(err) = err { + let _ = tx.send(Err(err.clone())); } else { let _ = tx.send(response); } @@ -455,8 +453,7 @@ impl Handle { .lock() .unwrap() .as_ref() - .map(|svc_err| svc_err.clone().into()) - .unwrap_or_else(|| Closed::new().into()) + .map_or_else(|| Closed::new().into(), |svc_err| svc_err.clone().into()) } } diff --git a/tests/main.rs b/tests/main.rs index c7e4639..84da79f 100644 --- a/tests/main.rs +++ b/tests/main.rs @@ -42,7 +42,7 @@ impl Aggregator { return false; } let items = &self.items.lock().unwrap(); - items.get(index).map(|v| v.len() == size).unwrap_or(false) + items.get(index).is_some_and(|v| v.len() == size) } fn batch_items(&self, index: usize) -> Option> @@ -119,7 +119,7 @@ async fn batch_flushes_on_max_size() -> Result<(), BoxError> { results.push(span.in_scope(|| batch.call(i))); } - while let Some(Ok(_)) = results.next().await {} + while let Some(Ok(())) = results.next().await {} assert!(aggregator.batch_has_size(0, 10)); @@ -144,7 +144,7 @@ async fn batch_flushes_on_elapsed_time() -> Result<(), BoxError> { results.push(span.in_scope(|| batch.call(i))); } - while let Some(Ok(_)) = results.next().await {} + while let Some(Ok(())) = results.next().await {} // Give it enough time to finish tokio::time::sleep(Duration::from_millis(500)).await; @@ -169,7 +169,7 @@ async fn batch_flushes_multiple_times() -> Result<(), BoxError> { results.push(span.in_scope(|| batch.call(i))); } - while let Some(Ok(_)) = results.next().await {} + while let Some(Ok(())) = results.next().await {} assert!(aggregator.batch_has_size(0, 10)); assert!(aggregator.batch_has_size(1, 10)); @@ -191,7 +191,7 @@ async fn batch_items_are_ordered() -> Result<(), BoxError> { results.push(batch.call(i)); } - while let Some(Ok(_)) = results.next().await {} + while let Some(Ok(())) = results.next().await {} let items = aggregator.batch_items(0).expect("batch 0 should exist"); assert_eq!(items, (0..10).collect::>()); @@ -237,7 +237,7 @@ async fn concurrent_clones_send_requests() -> Result<(), BoxError> { // Verify all 9 items were actually delivered to the aggregator. let items = aggregator.items.lock().unwrap(); - let delivered: usize = items.iter().map(|batch| batch.len()).sum(); + let delivered: usize = items.iter().map(Vec::len).sum(); assert_eq!(delivered, 9, "all 9 items should reach the aggregator"); Ok(()) @@ -257,7 +257,7 @@ async fn time_based_flush_triggers_multiple_batches() -> Result<(), BoxError> { batch.ready().await?; results.push(batch.call(i)); } - while let Some(Ok(_)) = results.next().await {} + while let Some(Ok(())) = results.next().await {} // Give the flush time to complete. tokio::time::sleep(Duration::from_millis(250)).await; @@ -267,7 +267,7 @@ async fn time_based_flush_triggers_multiple_batches() -> Result<(), BoxError> { batch.ready().await?; results.push(batch.call(i)); } - while let Some(Ok(_)) = results.next().await {} + while let Some(Ok(())) = results.next().await {} tokio::time::sleep(Duration::from_millis(250)).await; assert!( @@ -463,7 +463,7 @@ async fn when_inner_fails() { assert_eq!(e.to_string(), "foobar"); } else { - panic!("unexpected error type: {:?}", e); + panic!("unexpected error type: {e:?}"); } } @@ -480,7 +480,7 @@ async fn poll_ready_when_worker_is_dropped_early() { let err = assert_ready_err!(service.poll_ready()); - assert!(err.is::(), "should be a Closed: {:?}", err); + assert!(err.is::(), "should be a Closed: {err:?}"); } #[tokio::test(flavor = "current_thread")] @@ -502,7 +502,7 @@ async fn response_future_when_worker_is_dropped_early() { tokio::time::sleep(Duration::from_millis(100)).await; let err = assert_ready_err!(response.poll()); - assert!(err.is::(), "should be a Closed: {:?}", err); + assert!(err.is::(), "should be a Closed: {err:?}"); } #[tokio::test(flavor = "current_thread")] @@ -655,8 +655,7 @@ async fn wakes_pending_waiters_on_close() -> Result<(), BoxError> { let err = assert_ready_err!(response.poll()); assert!( err.is::(), - "response should fail with a Closed, got: {:?}", - err + "response should fail with a Closed, got: {err:?}" ); assert!( @@ -666,8 +665,7 @@ async fn wakes_pending_waiters_on_close() -> Result<(), BoxError> { let err = assert_ready_err!(ready1.poll()); assert!( err.is::(), - "ready 1 should fail with a Closed, got: {:?}", - err + "ready 1 should fail with a Closed, got: {err:?}" ); assert!( @@ -677,8 +675,7 @@ async fn wakes_pending_waiters_on_close() -> Result<(), BoxError> { let err = assert_ready_err!(ready2.poll()); assert!( err.is::(), - "ready 2 should fail with a Closed, got: {:?}", - err + "ready 2 should fail with a Closed, got: {err:?}" ); Ok(()) @@ -716,8 +713,7 @@ async fn wakes_pending_waiters_on_failure() -> Result<(), BoxError> { let err = assert_ready_err!(response.poll()); assert!( err.is::(), - "response should fail with a ServiceError, got: {:?}", - err + "response should fail with a ServiceError, got: {err:?}" ); assert!( @@ -727,8 +723,7 @@ async fn wakes_pending_waiters_on_failure() -> Result<(), BoxError> { let err = assert_ready_err!(ready1.poll()); assert!( err.is::(), - "ready 1 should fail with a ServiceError, got: {:?}", - err + "ready 1 should fail with a ServiceError, got: {err:?}" ); assert!( @@ -738,8 +733,7 @@ async fn wakes_pending_waiters_on_failure() -> Result<(), BoxError> { let err = assert_ready_err!(ready2.poll()); assert!( err.is::(), - "ready 2 should fail with a ServiceError, got: {:?}", - err + "ready 2 should fail with a ServiceError, got: {err:?}" ); Ok(()) @@ -832,11 +826,10 @@ async fn batch_layer_wraps_service() { let layer = BatchLayer::::new(10, Duration::from_secs(1)); // Cover Debug impl - let debug_str = format!("{:?}", layer); + let debug_str = format!("{layer:?}"); assert!( debug_str.contains("BatchLayer"), - "Debug should contain 'BatchLayer', got: {}", - debug_str + "Debug should contain 'BatchLayer', got: {debug_str}", ); // Cover Layer::layer() which delegates to Batch::new() @@ -871,13 +864,12 @@ async fn error_display_and_debug_formatting() { let closed = err .downcast_ref::() .expect("should be Closed"); - let debug_str = format!("{:?}", closed); - assert!(debug_str.contains("Closed"), "Debug: {}", debug_str); - let display_str = format!("{}", closed); + let debug_str = format!("{closed:?}"); + assert!(debug_str.contains("Closed"), "Debug: {debug_str}"); + let display_str = format!("{closed}"); assert!( display_str.contains("batch's worker closed unexpectedly"), - "Display: {}", - display_str + "Display: {display_str}", ); } @@ -902,11 +894,10 @@ async fn error_display_and_debug_formatting() { let svc_err = err .downcast_ref::() .expect("should be ServiceError"); - let display_str = format!("{}", svc_err); + let display_str = format!("{svc_err}"); assert!( display_str.contains("batch service failed:"), - "Display: {}", - display_str + "Display: {display_str}", ); // Also check source() let source = svc_err.source().unwrap(); @@ -933,11 +924,7 @@ async fn call_after_worker_death() { // Exercises ResponseState::Failed arm let err = assert_ready_err!(response.poll()); - assert!( - err.is::(), - "should be Closed, got: {:?}", - err - ); + assert!(err.is::(), "should be Closed, got: {err:?}"); } #[tokio::test(flavor = "current_thread")] @@ -977,14 +964,12 @@ async fn flush_phase_poll_ready_failure() { let err1 = assert_ready_err!(res1.poll()); assert!( err1.is::(), - "res1 should be ServiceError, got: {:?}", - err1 + "res1 should be ServiceError, got: {err1:?}" ); let err2 = assert_ready_err!(res2.poll()); assert!( err2.is::(), - "res2 should be ServiceError, got: {:?}", - err2 + "res2 should be ServiceError, got: {err2:?}" ); } @@ -1033,12 +1018,13 @@ async fn cancelled_request_in_channel() { assert_eq!(assert_ready_ok!(res_b.poll()), "rb"); } -/// Regression test: poll_max_time must not overwrite an in-progress flush. +/// Regression test: `poll_max_time` must not overwrite an in-progress flush. /// /// With batch size 1 and a 1ms timer, the size-based flush fires immediately. /// The 1ms timer expires well before the 50ms flush completes. Before the fix, /// the timer would reset the Flushing state, dropping the flush future. #[tokio::test] +#[allow(clippy::items_after_statements)] async fn timer_does_not_overwrite_in_progress_flush() { let _guard = support::trace_init(); @@ -1092,9 +1078,9 @@ async fn timer_does_not_overwrite_in_progress_flush() { ); } -/// Regression: when poll_ready fails with multiple messages queued in the +/// Regression: when `poll_ready` fails with multiple messages queued in the /// channel, the worker must terminate immediately instead of calling -/// poll_ready again on the broken service (which could hang or behave +/// `poll_ready` again on the broken service (which could hang or behave /// inconsistently). #[tokio::test(flavor = "current_thread")] async fn worker_terminates_on_poll_ready_error_with_queued_messages() { @@ -1128,8 +1114,7 @@ async fn worker_terminates_on_poll_ready_error_with_queued_messages() { let err1 = assert_ready_err!(res1.poll()); assert!( err1.is::(), - "res1 should be ServiceError, got: {:?}", - err1 + "res1 should be ServiceError, got: {err1:?}" ); // Drop the worker so the channel receiver and remaining messages are @@ -1141,7 +1126,6 @@ async fn worker_terminates_on_poll_ready_error_with_queued_messages() { let err2 = assert_ready_err!(res2.poll()); assert!( err2.is::(), - "res2 should be Closed, got: {:?}", - err2 + "res2 should be Closed, got: {err2:?}" ); } diff --git a/tests/support.rs b/tests/support.rs index da0b354..e0dca37 100644 --- a/tests/support.rs +++ b/tests/support.rs @@ -9,6 +9,7 @@ use std::{ use tower::Service; use tower_batch::BatchControl; +#[must_use] pub fn trace_init() -> tracing::subscriber::DefaultGuard { let subscriber = tracing_subscriber::fmt() .with_test_writer() @@ -40,6 +41,7 @@ impl fmt::Display for AssertSpanError { impl std::error::Error for AssertSpanError {} impl AssertSpanSvc { + #[must_use] pub fn new(span: tracing::Span) -> Self { Self { span } } @@ -57,8 +59,8 @@ impl AssertSpanSvc { } Err(AssertSpanError(format!( - "{} called outside expected span\n expected: {:?}\n current: {:?}", - func, self.span, current_span + "{func} called outside expected span\n expected: {span:?}\n current: {current_span:?}", + span = self.span, ))) } }