mem: Cleanup resources of done streams immediately #22064
    // No data at all — go directly to Done
    self.left_exhausted = true;
you forgot to update this as well
No, because above that point it can still be set back to false.
I only placed it there because if it is set to true at that point, the input is actually depleted.
    // If the left stream is fully exhausted, release its resources so the
    // upstream pipeline can be torn down before we move on to probing.
    if self.left_exhausted {
        active.left_stream = None;
    }
Maybe there are cases where left_exhausted is true and left_stream is not None? Otherwise, what is the reason to have both an exhausted flag and an Option?
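For readers following this thread, here is a minimal sketch of how the flag and the Option can be kept in step. It is not the PR's code: `Upstream`, `Active`, `poll_left`, and `drain_and_check` are hypothetical stand-ins, and a plain `Iterator` replaces the real async stream so the example needs only the standard library. It assumes the flag is set at exactly the point where the input returns `None`, which is also where the input is dropped.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for an upstream pipeline that holds resources.
/// Dropping it flips a shared flag so the release can be observed.
struct Upstream {
    remaining: u32,
    released: Rc<Cell<bool>>,
}

impl Iterator for Upstream {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        if self.remaining == 0 {
            None
        } else {
            self.remaining -= 1;
            Some(self.remaining)
        }
    }
}

impl Drop for Upstream {
    fn drop(&mut self) {
        self.released.set(true);
    }
}

/// Simplified operator state: the Option owns the input, and the flag
/// records that the input reported end-of-stream.
struct Active {
    left_stream: Option<Upstream>,
    left_exhausted: bool,
}

impl Active {
    /// Pull one item. On end-of-stream, set the flag and drop the input
    /// in the same place, so the two can never disagree.
    fn poll_left(&mut self) -> Option<u32> {
        let item = self.left_stream.as_mut().and_then(|s| s.next());
        if item.is_none() {
            self.left_exhausted = true;
            self.left_stream = None;
        }
        item
    }
}

/// Drain a three-item input and report
/// (items seen, exhausted flag, whether the upstream was dropped).
fn drain_and_check() -> (u32, bool, bool) {
    let released = Rc::new(Cell::new(false));
    let mut active = Active {
        left_stream: Some(Upstream {
            remaining: 3,
            released: Rc::clone(&released),
        }),
        left_exhausted: false,
    };
    let mut count = 0;
    while active.poll_left().is_some() {
        count += 1;
    }
    (count, active.left_exhausted, released.get())
}
```

In this sketch the flag answers "did the input finish?" while the Option answers "do we still own the input?", and updating both in one branch is what keeps them consistent.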
    // Release the input pipeline's resources.
    this.inner =
        Box::pin(EmptyRecordBatchStream::new(Arc::clone(&this.schema)));
I think that, to avoid bugs, the schema for inner should be kept the same rather than using this.schema (even when they are the same schema), because you only want to release the stream.
Yeah fair point, applied
Applied
    // Release the right input pipeline's resources.
    let right_schema = self.right_stream.schema();
    self.right_stream = Box::pin(EmptyRecordBatchStream::new(right_schema));
nit: extract this as a helper function
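As a rough illustration of what such a helper could look like, here is a standard-library-only sketch. Everything in it is an assumption made for the example: `Schema`, `BatchStream`, `EmptyStream`, and `BufferedStream` are simplified stand-ins for the real Arrow/DataFusion types, and the stream is synchronous. The helper takes the schema from the stream being replaced, following the earlier review point about not substituting the operator's own schema.

```rust
use std::sync::Arc;

/// Hypothetical, simplified schema type (stands in for an Arrow schema).
#[derive(Debug, PartialEq)]
struct Schema {
    fields: Vec<String>,
}

/// Hypothetical, simplified stream trait: a schema plus a pull method.
trait BatchStream {
    fn schema(&self) -> Arc<Schema>;
    fn next_batch(&mut self) -> Option<Vec<i64>>;
}

/// A stream that yields nothing and owns nothing except its schema.
struct EmptyStream {
    schema: Arc<Schema>,
}

impl BatchStream for EmptyStream {
    fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.schema)
    }

    fn next_batch(&mut self) -> Option<Vec<i64>> {
        None
    }
}

/// An input that buffers batches, standing in for a real upstream pipeline.
struct BufferedStream {
    schema: Arc<Schema>,
    batches: Vec<Vec<i64>>,
}

impl BatchStream for BufferedStream {
    fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.schema)
    }

    fn next_batch(&mut self) -> Option<Vec<i64>> {
        self.batches.pop()
    }
}

/// Replace `stream` with an empty stream that reports the same schema.
/// The old stream is dropped by the assignment, releasing whatever it held.
fn release_input_stream(stream: &mut Box<dyn BatchStream>) {
    let schema = stream.schema();
    *stream = Box::new(EmptyStream { schema });
}
```

A call site would then be a single line, `release_input_stream(&mut self.right_stream);`, and the rationale for early release could live in the helper's doc comment.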
run benchmarks
🤖 Benchmark running (GKE): comparing resource-cleanup (e2e4372) to 3b634aa (merge-base) using tpcds
🤖 Benchmark running (GKE): comparing resource-cleanup (e2e4372) to 3b634aa (merge-base) using clickbench_partitioned
🤖 Benchmark running (GKE): comparing resource-cleanup (e2e4372) to 3b634aa (merge-base) using tpch
🤖 Benchmark completed (GKE): tpch, base (merge-base) vs. branch
🤖 Benchmark completed (GKE): tpcds, base (merge-base) vs. branch
🤖 Benchmark completed (GKE): clickbench_partitioned, base (merge-base) vs. branch
alamb left a comment
This is a nice optimization. Thank you @EmilyMatt
    Some(Err(e)) => Err(e),
    None => {
        this.finished = true;
        // Release the input pipeline's resources before finalization.
A suggestion for a follow-on PR would be to create a function that returns this empty stream, like
release_input_stream(&mut this.input, this.input.schema());
Then you can document the rationale for early release in the doc comments, and it might be easier to discover.
The current method of sprinkling the reset code around will work too; I think it just might be easier to lose in a refactoring or be forgotten when working on new operators.
It might also be more defensive to use some sort of stream that returns an error if it is polled, rather than always being empty, to catch logic errors (now or in the future) where the input wasn't really done but was accidentally reset early.
I agree about the first part. As for the defensive stream, apparently it is very much valid behaviour to re-poll a stream after it returns None, so I wouldn't want to cause this error for a user; maybe in tests.
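To make the trade-off in this exchange concrete, here is a small sketch of the two placeholder behaviours side by side. It is only an illustration under assumptions: `BatchStream`, `EmptyStream`, `ReleasedStream`, and `released_placeholder` are hypothetical names, the stream is synchronous, and the `strict` switch stands in for whatever mechanism a test build might use to opt in.

```rust
/// Hypothetical, simplified stream trait whose items can carry an error.
trait BatchStream {
    fn next_batch(&mut self) -> Option<Result<Vec<i64>, String>>;
}

/// Lenient placeholder: re-polling after release simply reports end-of-stream.
struct EmptyStream;

impl BatchStream for EmptyStream {
    fn next_batch(&mut self) -> Option<Result<Vec<i64>, String>> {
        None
    }
}

/// Strict placeholder: any poll after release surfaces as an error, which
/// can expose an input that was reset before it was really done.
struct ReleasedStream;

impl BatchStream for ReleasedStream {
    fn next_batch(&mut self) -> Option<Result<Vec<i64>, String>> {
        Some(Err("input stream polled after it was released".to_string()))
    }
}

/// Choose the placeholder. `strict` is an assumed switch for this sketch;
/// the idea from the thread is that only tests would turn it on.
fn released_placeholder(strict: bool) -> Box<dyn BatchStream> {
    if strict {
        Box::new(ReleasedStream)
    } else {
        Box::new(EmptyStream)
    }
}
```

The lenient variant keeps re-polling harmless for users, while the strict variant turns an accidental early reset into a visible failure.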
        self.metrics.baseline_metrics.elapsed_compute(),
    );
    if other.is_none() {
        // Release the input pipeline's resources.
I think this comment would be more valuable if it explained "why" releasing the input pipeline is OK only when other.is_none() is true.
It's a specific case because the match can yield either an Err or None, but I've added a couple of comments there to make the flow readable even at a skim.
Which issue does this PR close?
Rationale for this change
Reduces memory pressure, cleans up resources eagerly, and makes pools aware that operators are done by dropping their MemoryReservation and MemoryConsumers.
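The effect described here can be sketched with a toy pool. This is not DataFusion's memory pool API: `Pool`, `PoolState`, `Reservation`, and `OperatorStream` are hypothetical types built only on the standard library, assuming a reservation unregisters itself from its pool when it is dropped.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical, simplified pool state: registered consumers and bytes.
#[derive(Default)]
struct PoolState {
    consumers: usize,
    reserved_bytes: usize,
}

/// Cheaply cloneable handle to the shared pool state.
#[derive(Clone, Default)]
struct Pool(Rc<RefCell<PoolState>>);

impl Pool {
    /// Register a consumer holding `bytes` and return its reservation.
    fn register(&self, bytes: usize) -> Reservation {
        let mut state = self.0.borrow_mut();
        state.consumers += 1;
        state.reserved_bytes += bytes;
        Reservation { pool: self.clone(), bytes }
    }

    fn consumers(&self) -> usize {
        self.0.borrow().consumers
    }

    fn reserved_bytes(&self) -> usize {
        self.0.borrow().reserved_bytes
    }
}

/// Returns its bytes to the pool and unregisters when dropped.
struct Reservation {
    pool: Pool,
    bytes: usize,
}

impl Drop for Reservation {
    fn drop(&mut self) {
        let mut state = self.pool.0.borrow_mut();
        state.consumers -= 1;
        state.reserved_bytes -= self.bytes;
    }
}

/// An operator stream that keeps its reservation for as long as it lives.
struct OperatorStream {
    _reservation: Reservation,
}
```

With these types, holding `Some(OperatorStream { .. })` keeps one consumer registered, and dropping the stream as soon as it is depleted brings the pool's counts back to zero without waiting for the parent operator to finish.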
What changes are included in this PR?
Whenever a stream is polled and returns None (that is, it is depleted), this PR drops that stream or replaces it with an EmptyRecordBatchStream.
Are these changes tested?
This should have no effect on logic, as the streams are already depleted.
Are there any user-facing changes?
No. Users implementing their own memory pool can expect to see the consumer count drop whenever a stream is released, but that is well within expected behaviour, and I don't think it counts as a change at all.