Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ exclude = [
"README.md"
]

[lints.clippy]
pedantic = "warn"

[dependencies]

futures-core = "0.3"
Expand Down
4 changes: 2 additions & 2 deletions examples/sqlite_batch.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Batch-insert rows into an in-memory SQLite database using the rarray virtual table.
//! Batch-insert rows into an in-memory `SQLite` database using the rarray virtual table.
//!
//! Run with: `cargo run --example sqlite_batch`

Expand Down Expand Up @@ -107,7 +107,7 @@ async fn main() -> Result<(), BoxError> {
batch
.call(InsertRow {
name: format!("task{task_id}_row{i}"),
value: (task_id * 50 + i) as i64,
value: i64::from(task_id * 50 + i),
})
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ impl<Request> BatchLayer<Request> {
///
/// * `size` – the maximum number of items per batch.
/// * `time` – the maximum duration before a batch is flushed.
#[must_use]
pub fn new(size: usize, time: Duration) -> Self {
Self {
size,
Expand Down
8 changes: 4 additions & 4 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ where
fn call(&mut self, request: Request) -> Self::Future {
tracing::debug!("sending request to batch worker");

let _permit = self
let permit = self
.permit
.take()
.expect("batch full; poll_ready must be called first");
Expand All @@ -172,12 +172,12 @@ where
// The worker is in control of completing the request now.
match self.tx.send(Message {
request,
span,
tx,
_permit,
span,
_permit: permit,
}) {
Err(_) => ResponseFuture::failed(self.get_worker_error()),
Ok(_) => ResponseFuture::new(rx),
Ok(()) => ResponseFuture::new(rx),
}
}
}
Expand Down
103 changes: 50 additions & 53 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ where
{
type Output = ();

#[allow(clippy::too_many_lines)]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
trace!("polling worker");

Expand All @@ -166,53 +167,50 @@ where
loop {
match this.state.as_mut().project() {
StateProj::Collecting => {
match ready!(this.bridge.poll_next_msg(cx)) {
Some((msg, first)) => {
let _guard = msg.span.enter();

trace!(resumed = !first, message = "worker received request");

// Wait for the service to be ready
trace!(message = "waiting for service readiness");
match this.service.poll_ready(cx) {
Poll::Ready(Ok(())) => {
debug!(service.ready = true, message = "adding item");

let response = this.service.call(msg.request.into());
this.lot.add((msg.tx, Ok(response)));

// Flush if the batch is full.
if this.lot.is_full() {
this.state.set(State::flushing("size".to_owned(), None));
} else if this.lot.poll_max_time(cx).is_ready() {
// Or flush if the max time has elapsed.
this.state.set(State::flushing("time".to_owned(), None));
}
}
Poll::Pending => {
drop(_guard);
debug!(service.ready = false, message = "delay item addition");
this.bridge.return_msg(msg);
return Poll::Pending;
if let Some((msg, first)) = ready!(this.bridge.poll_next_msg(cx)) {
let guard = msg.span.enter();

trace!(resumed = !first, message = "worker received request");

// Wait for the service to be ready
trace!(message = "waiting for service readiness");
match this.service.poll_ready(cx) {
Poll::Ready(Ok(())) => {
debug!(service.ready = true, message = "adding item");

let response = this.service.call(msg.request.into());
this.lot.add((msg.tx, Ok(response)));

// Flush if the batch is full.
if this.lot.is_full() {
this.state.set(State::flushing("size".to_owned(), None));
} else if this.lot.poll_max_time(cx).is_ready() {
// Or flush if the max time has elapsed.
this.state.set(State::flushing("time".to_owned(), None));
}
Poll::Ready(Err(e)) => {
drop(_guard);
this.bridge.failed("item addition", e.into());
if let Some(ref e) = this.bridge.failed {
// Ensure the current caller is notified too.
this.lot.add((msg.tx, Err(e.clone())));
this.lot.notify(Some(e.clone()));
}
this.state.set(State::Finished);
return Poll::Ready(());
}
Poll::Pending => {
drop(guard);
debug!(service.ready = false, message = "delay item addition");
this.bridge.return_msg(msg);
return Poll::Pending;
}
Poll::Ready(Err(e)) => {
drop(guard);
this.bridge.failed("item addition", e.into());
if let Some(ref e) = this.bridge.failed {
// Ensure the current caller is notified too.
this.lot.add((msg.tx, Err(e.clone())));
this.lot.notify(Some(e));
}
this.state.set(State::Finished);
return Poll::Ready(());
}
}
None => {
trace!("shutting down, no more requests _ever_");
this.state.set(State::Finished);
return Poll::Ready(());
}
} else {
trace!("shutting down, no more requests _ever_");
this.state.set(State::Finished);
return Poll::Ready(());
}
}
StateProj::Flushing { reason, flush_fut } => match flush_fut.as_pin_mut() {
Expand Down Expand Up @@ -243,7 +241,7 @@ where
Poll::Ready(Err(e)) => {
this.bridge.failed("flush", e.into());
if let Some(ref e) = this.bridge.failed {
this.lot.notify(Some(e.clone()));
this.lot.notify(Some(e));
}
this.state.set(State::Finished);
return Poll::Ready(());
Expand All @@ -254,12 +252,12 @@ where
Ok(_) => {
debug!(reason = reason.as_mut().unwrap().as_str(), "batch flushed");
this.lot.notify(None);
this.state.set(State::Collecting)
this.state.set(State::Collecting);
}
Err(e) => {
this.bridge.failed("flush", e.into());
if let Some(ref e) = this.bridge.failed {
this.lot.notify(Some(e.clone()));
this.lot.notify(Some(e));
}
this.state.set(State::Finished);
return Poll::Ready(());
Expand Down Expand Up @@ -290,7 +288,7 @@ impl<Fut> State<Fut> {

impl<Fut, Request> Drop for Bridge<Fut, Request> {
fn drop(&mut self) {
self.close_semaphore()
self.close_semaphore();
}
}

Expand Down Expand Up @@ -385,7 +383,7 @@ impl<Fut, Request> Bridge<Fut, Request> {
}

fn return_msg(&mut self, msg: Message<Request, Fut>) {
self.current_message = Some(msg)
self.current_message = Some(msg);
}
}

Expand Down Expand Up @@ -434,10 +432,10 @@ impl<Fut> Lot<Fut> {
self.responses.push(item);
}

fn notify(&mut self, err: Option<ServiceError>) {
fn notify(&mut self, err: Option<&ServiceError>) {
for (tx, response) in mem::replace(&mut self.responses, Vec::with_capacity(self.max_size)) {
if let Some(ref response) = err {
let _ = tx.send(Err(response.clone()));
if let Some(err) = err {
let _ = tx.send(Err(err.clone()));
} else {
let _ = tx.send(response);
}
Expand All @@ -455,8 +453,7 @@ impl Handle {
.lock()
.unwrap()
.as_ref()
.map(|svc_err| svc_err.clone().into())
.unwrap_or_else(|| Closed::new().into())
.map_or_else(|| Closed::new().into(), |svc_err| svc_err.clone().into())
}
}

Expand Down
Loading
Loading