
fix: bound RepartitionExec channel memory with a byte-budget gate #22092

Closed
JanKaul wants to merge 3 commits into apache:main from Embucket:repartition-backpressure

JanKaul commented May 10, 2026

Which issue does this PR close?

Closes #22090.

Rationale for this change

See #22090. Today's "all-channels-non-empty" gate doesn't catch the case where one consumer lags behind a balanced producer: the slow channel's buffer then grows linearly with the number of input batches.

What changes are included in this PR?

  • New config datafusion.execution.repartition_buffer_size_bytes (default 100 MiB).
  • distributor_channels.rs: gate predicate becomes empty_channels == 0 || buffered_bytes >= max_buffered_bytes. Items are stored as (T, usize) so receivers refund bytes on pop. Overdraw escape: an empty channel always accepts a push (a single oversize batch can't head-of-line block its consumer). The Mutex<Option<Vec<Waker>>> double-state is collapsed to Mutex<Vec<Waker>> with the counters as the single source of truth.
  • mod.rs: plumbs the config through channels / partition_aware_channels; passes batch size at the data-path send and 0 for spill-marker / sentinel sends.
  • 5 new unit tests: a skewed producer parking on the byte-budget gate (B), oversize overdraw, the existing all-channels-non-empty gate (A) still working under a generous byte budget, multiple parked senders waking on release, and receiver-drop refund.
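
The gate logic in the list above can be sketched as a small single-threaded model. This is not the code in distributor_channels.rs: `GateSketch`, `try_send`, and `recv` are hypothetical names, wakers and locking are left out, and the way the overdraw escape bypasses the gate is an assumption based on the description.

```rust
use std::collections::VecDeque;

/// Hypothetical single-threaded model of the gate (illustration only).
/// Each queued item carries its byte size so a pop can refund it.
pub struct GateSketch<T> {
    channels: Vec<VecDeque<(T, usize)>>,
    pub empty_channels: usize,
    pub buffered_bytes: usize,
    pub max_buffered_bytes: usize,
}

impl<T> GateSketch<T> {
    pub fn new(n_channels: usize, max_buffered_bytes: usize) -> Self {
        Self {
            channels: (0..n_channels).map(|_| VecDeque::new()).collect(),
            empty_channels: n_channels,
            buffered_bytes: 0,
            max_buffered_bytes,
        }
    }

    /// Gate predicate from the PR description: closed when no channel is
    /// empty (gate A) or the byte budget is exhausted (gate B).
    pub fn gate_closed(&self) -> bool {
        self.empty_channels == 0 || self.buffered_bytes >= self.max_buffered_bytes
    }

    /// Returns `Err(item)` where a real sender would park and wait.
    pub fn try_send(&mut self, channel: usize, item: T, bytes: usize) -> Result<(), T> {
        let target_empty = self.channels[channel].is_empty();
        // Overdraw escape (assumed shape): an empty channel always accepts
        // a push, so one oversize batch cannot block its own consumer.
        if !target_empty && self.gate_closed() {
            return Err(item);
        }
        if target_empty {
            self.empty_channels -= 1;
        }
        self.buffered_bytes += bytes;
        self.channels[channel].push_back((item, bytes));
        Ok(())
    }

    /// Pops one item and refunds its bytes to the budget.
    pub fn recv(&mut self, channel: usize) -> Option<T> {
        let (item, bytes) = self.channels[channel].pop_front()?;
        self.buffered_bytes -= bytes;
        if self.channels[channel].is_empty() {
            self.empty_channels += 1;
        }
        Some(item)
    }
}
```

In this model a sender targeting an already-backed-up channel is rejected once the budget is spent, even while another channel is still empty. That is the skewed case the old gate let through.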

Are these changes tested?

Yes. 21 distributor_channels unit tests and 48 repartition tests pass.

Are there any user-facing changes?

One new config option; otherwise transparent. The default caps in-memory repartition buffering at 100 MiB, so workloads that rely on the previous unbounded buffering may want to set it higher.

github-actions bot added the documentation, sqllogictest, common, and physical-plan labels May 10, 2026
github-actions Bot commented May 10, 2026

Thank you for opening this pull request!

Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).

Details
     Cloning apache/main
    Building datafusion-common v53.1.0 (current)
       Built [  34.382s] (current)
     Parsing datafusion-common v53.1.0 (current)
      Parsed [   0.059s] (current)
    Building datafusion-common v53.1.0 (baseline)
       Built [  34.002s] (baseline)
     Parsing datafusion-common v53.1.0 (baseline)
      Parsed [   0.057s] (baseline)
    Checking datafusion-common v53.1.0 -> v53.1.0 (no change; assume patch)
     Checked [   0.626s] 222 checks: 221 pass, 1 fail, 0 warn, 30 skip

--- failure constructible_struct_adds_field: externally-constructible struct adds field ---

Description:
A pub struct constructible with a struct literal has a new pub field. Existing struct literals must be updated to include the new field.
        ref: https://doc.rust-lang.org/reference/expressions/struct-expr.html
       impl: https://github.com/obi1kenobi/cargo-semver-checks/tree/v0.47.0/src/lints/constructible_struct_adds_field.ron

Failed in:
  field ExecutionOptions.repartition_buffer_size_bytes in /home/runner/work/datafusion/datafusion/datafusion/common/src/config.rs:469

     Summary semver requires new major version: 1 major and 0 minor checks failed
    Finished [  70.446s] datafusion-common
    Building datafusion-physical-plan v53.1.0 (current)
       Built [  33.435s] (current)
     Parsing datafusion-physical-plan v53.1.0 (current)
      Parsed [   0.130s] (current)
    Building datafusion-physical-plan v53.1.0 (baseline)
       Built [  33.394s] (baseline)
     Parsing datafusion-physical-plan v53.1.0 (baseline)
      Parsed [   0.134s] (baseline)
    Checking datafusion-physical-plan v53.1.0 -> v53.1.0 (no change; assume patch)
     Checked [   0.630s] 222 checks: 222 pass, 30 skip
     Summary no semver update required
    Finished [  69.030s] datafusion-physical-plan
    Building datafusion-sqllogictest v53.1.0 (current)
       Built [ 137.962s] (current)
     Parsing datafusion-sqllogictest v53.1.0 (current)
      Parsed [   0.023s] (current)
    Building datafusion-sqllogictest v53.1.0 (baseline)
       Built [ 139.432s] (baseline)
     Parsing datafusion-sqllogictest v53.1.0 (baseline)
      Parsed [   0.024s] (baseline)
    Checking datafusion-sqllogictest v53.1.0 -> v53.1.0 (no change; assume patch)
     Checked [   0.087s] 222 checks: 222 pass, 30 skip
     Summary no semver update required
    Finished [ 280.465s] datafusion-sqllogictest
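
The constructible_struct_adds_field failure above comes down to struct-literal construction. A minimal sketch follows, with a hypothetical `OptionsSketch` standing in for `ExecutionOptions`. Only the `repartition_buffer_size_bytes` name and its 100 MiB default come from this PR; the other field and its default are illustrative assumptions.

```rust
/// Hypothetical stand-in for a pub config struct whose fields are all pub,
/// so downstream crates can build it with a struct literal.
#[derive(Debug, Clone, PartialEq)]
pub struct OptionsSketch {
    pub batch_size: usize,
    /// The newly added field (name and default taken from the PR text).
    pub repartition_buffer_size_bytes: usize,
}

impl Default for OptionsSketch {
    fn default() -> Self {
        Self {
            batch_size: 8192, // illustrative default, an assumption
            repartition_buffer_size_bytes: 100 * 1024 * 1024, // 100 MiB
        }
    }
}

/// Downstream code written as an exhaustive literal,
/// `OptionsSketch { batch_size: 4096 }`, stops compiling once the new
/// field exists. Struct-update syntax keeps compiling across additions.
pub fn downstream_options() -> OptionsSketch {
    OptionsSketch {
        batch_size: 4096,
        ..Default::default()
    }
}
```

Callers that already use `..Default::default()` are unaffected by the new field. The lint fires because the struct permits the exhaustive form.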

github-actions bot added the "auto detected api change" label May 10, 2026

alamb commented May 10, 2026

The current behavior of RepartitionExec is required to prevent deadlock in certain cases (like repartitioned Merge). I haven't looked at the code in this PR, but given the description I worry that this will cause new deadlocks.

I left some more detailed comments here: #22090 (comment)

JanKaul closed this May 11, 2026
Development

Successfully merging this pull request may close these issues.

RepartitionExec channels grow unboundedly with one slow consumer
