Skip to content

Commit 9510938

Browse files
committed
add self guard
1 parent c39329e commit 9510938

3 files changed

Lines changed: 131 additions & 66 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/execution/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ parking_lot = { workspace = true }
6666
parquet = { workspace = true, optional = true }
6767
rand = { workspace = true }
6868
tempfile = { workspace = true }
69+
tokio = { workspace = true, features = ["time"] }
6970
url = { workspace = true }
7071

7172
[dev-dependencies]

datafusion/execution/src/memory_pool/pool.rs

Lines changed: 129 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,24 @@ use std::fmt::{Display, Formatter};
2828
use std::future::Future;
2929
use std::pin::Pin;
3030
use std::sync::Arc;
31+
use std::time::Duration;
3132
use std::{
3233
num::NonZeroUsize,
3334
sync::atomic::{AtomicU8, AtomicUsize, Ordering},
3435
};
3536

37+
/// How long [`TrackConsumersPool::try_grow_async`] waits for an
38+
/// in-flight sibling to finish reclaiming before retrying. Kept short
39+
/// so we don't stall the requestor longer than the typical reclaim
40+
/// (mpsc send + spill commit).
41+
const RECLAIM_RETRY_SLEEP: Duration = Duration::from_millis(50);
42+
43+
/// Maximum number of times [`TrackConsumersPool::try_grow_async`]
44+
/// retries the candidate walk while siblings are still in-flight.
45+
/// Bounds the total sleep time at `MAX_RECLAIM_RETRIES * RECLAIM_RETRY_SLEEP`
46+
/// so a livelock surfaces as OOM rather than a hang.
47+
const MAX_RECLAIM_RETRIES: usize = 3;
48+
3649
/// A [`MemoryPool`] that enforces no limit
3750
#[derive(Debug, Default)]
3851
pub struct UnboundedMemoryPool {
@@ -709,77 +722,127 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
709722
Err(e) => e,
710723
};
711724

712-
// Snapshot reclaimers. Only consumers strictly larger than
713-
// the requestor are eligible: smaller-or-equal siblings would
714-
// free less than the requestor itself can, so the requestor
715-
// should self-spill instead. This rule also breaks the
716-
// mutual-reclaim cycle (A targets B while B targets A) — at
717-
// most one side of any pair can hold strictly more memory,
718-
// so the other side has no candidates and surfaces an error
719-
// for the caller's self-spill fallback. Filter out anyone
720-
// whose `reclaimer_state` flag is not `AVAILABLE` (in-flight or
721-
// sticky-disabled). Drop the read guard before awaiting any
722-
// reclaim.
725+
// Mark the requestor as IN_FLIGHT for the duration of this
726+
// walk. Without this, a victim's reclaim handler that
727+
// recursively triggers `pool.reclaim` (e.g. a merge stream
728+
// started inside an `ExternalSorter` spill) could pick the
729+
// requestor as its own victim, send it a reclaim oneshot,
730+
// and deadlock — the requestor is blocked here at
731+
// `reclaimer.reclaim().await` and can't drain its own
732+
// reclaim channel. Sticky-disabled or already-in-flight
733+
// requestors fail to acquire the guard; the walk proceeds without
734+
// protection (the candidate filter still rejects the
735+
// requestor by id).
723736
let requestor_id = reservation.consumer().id();
724-
let requestor_reserved = {
725-
let guard = self.tracked_consumers.read();
726-
guard
727-
.get(&requestor_id)
728-
.map(|tc| tc.reserved())
729-
.unwrap_or(0)
730-
};
731-
let mut candidates: Vec<(usize, Arc<dyn MemoryReclaimer>, Arc<AtomicU8>)> = {
732-
let guard = self.tracked_consumers.read();
733-
guard
734-
.iter()
735-
.filter_map(|(cid, tc)| {
736-
let reclaimer = tc.reclaimer.as_ref()?;
737-
if *cid == requestor_id || tc.reserved() <= requestor_reserved {
738-
return None;
739-
}
740-
if tc.reclaimer_state.load(Ordering::Acquire)
741-
!= reclaimer_state::AVAILABLE
742-
{
743-
return None;
744-
}
745-
Some((
746-
tc.reserved(),
747-
Arc::clone(reclaimer),
748-
Arc::clone(&tc.reclaimer_state),
749-
))
750-
})
751-
.collect()
752-
};
753-
// Order: priority desc, then reservation size desc.
754-
candidates.sort_by(|(lr, l, _), (rr, r, _)| {
755-
r.priority().cmp(&l.priority()).then_with(|| rr.cmp(lr))
756-
});
757-
// Cap reclaim work — only consider the top-ranked candidates.
758-
candidates.truncate(self.reclaim_candidate_limit.get());
759-
760-
// For each candidate: try to claim its in-flight slot
761-
// (skip on contention or sticky-disabled so we work on a
762-
// different victim rather than serializing behind a
763-
// sibling's reclaim); re-check `try_grow` before reclaiming
764-
// in case a sibling already freed enough; reclaim; retry
765-
// `try_grow`. The retry path goes through `self.try_grow`,
766-
// which already updates the tracked consumer's atomic
767-
// reservation — no manual accounting needed here.
768-
for (_, reclaimer, flag) in candidates {
769-
let _g = match ReclaimerStateGuard::try_acquire(&flag) {
770-
Some(g) => g,
771-
None => continue,
737+
let _self_guard = self
738+
.tracked_consumers
739+
.read()
740+
.get(&requestor_id)
741+
.and_then(|tc| ReclaimerStateGuard::try_acquire(&tc.reclaimer_state));
742+
743+
let mut retries: usize = 0;
744+
loop {
745+
// Snapshot reclaimers. Only consumers strictly larger than
746+
// the requestor are eligible: smaller-or-equal siblings would
747+
// free less than the requestor itself can, so the requestor
748+
// should self-spill instead. This rule also breaks the
749+
// mutual-reclaim cycle (A targets B while B targets A) — at
750+
// most one side of any pair can hold strictly more memory,
751+
// so the other side has no candidates and surfaces an error
752+
// for the caller's self-spill fallback. Filter out anyone
753+
// whose `reclaimer_state` flag is not `AVAILABLE` (in-flight or
754+
// sticky-disabled). Also count IN_FLIGHT siblings so we know
755+
// whether to wait briefly for them to finish before giving up.
756+
// Drop the read guard before awaiting any reclaim.
757+
let requestor_reserved = {
758+
let guard = self.tracked_consumers.read();
759+
guard
760+
.get(&requestor_id)
761+
.map(|tc| tc.reserved())
762+
.unwrap_or(0)
763+
};
764+
let mut in_flight_seen: usize = 0;
765+
let mut candidates: Vec<(
766+
usize,
767+
Arc<dyn MemoryReclaimer>,
768+
Arc<AtomicU8>,
769+
)> = {
770+
let guard = self.tracked_consumers.read();
771+
guard
772+
.iter()
773+
.filter_map(|(cid, tc)| {
774+
if *cid == requestor_id {
775+
return None;
776+
}
777+
// Track in-flight siblings (any size) so we can
778+
// decide whether a retry has any chance of helping.
779+
let state = tc.reclaimer_state.load(Ordering::Acquire);
780+
if state == reclaimer_state::IN_FLIGHT {
781+
in_flight_seen += 1;
782+
}
783+
let reclaimer = tc.reclaimer.as_ref()?;
784+
if tc.reserved() <= requestor_reserved {
785+
return None;
786+
}
787+
if state != reclaimer_state::AVAILABLE {
788+
return None;
789+
}
790+
Some((
791+
tc.reserved(),
792+
Arc::clone(reclaimer),
793+
Arc::clone(&tc.reclaimer_state),
794+
))
795+
})
796+
.collect()
772797
};
773-
if self.try_grow(reservation, additional).is_ok() {
774-
return Ok(());
798+
// Order: priority desc, then reservation size desc.
799+
candidates.sort_by(|(lr, l, _), (rr, r, _)| {
800+
r.priority().cmp(&l.priority()).then_with(|| rr.cmp(lr))
801+
});
802+
// Cap reclaim work — only consider the top-ranked candidates.
803+
candidates.truncate(self.reclaim_candidate_limit.get());
804+
805+
// For each candidate: try to claim its in-flight slot
806+
// (skip on contention or sticky-disabled so we work on a
807+
// different victim rather than serializing behind a
808+
// sibling's reclaim); re-check `try_grow` before reclaiming
809+
// in case a sibling already freed enough; reclaim; retry
810+
// `try_grow`. The retry path goes through `self.try_grow`,
811+
// which already updates the tracked consumer's atomic
812+
// reservation — no manual accounting needed here.
813+
for (_, reclaimer, flag) in candidates {
814+
let _g = match ReclaimerStateGuard::try_acquire(&flag) {
815+
Some(g) => g,
816+
None => continue,
817+
};
818+
if self.try_grow(reservation, additional).is_ok() {
819+
return Ok(());
820+
}
821+
if let Err(e) = reclaimer.reclaim(additional).await {
822+
debug!("memory reclaimer returned error: {e}");
823+
continue;
824+
}
825+
if self.try_grow(reservation, additional).is_ok() {
826+
return Ok(());
827+
}
775828
}
776-
if let Err(e) = reclaimer.reclaim(additional).await {
777-
debug!("memory reclaimer returned error: {e}");
829+
830+
// Walk produced nothing usable. If other consumers are
831+
// currently reclaiming for someone else, their freed bytes
832+
// may land in the pool shortly — wait briefly and retry
833+
// before falling through to OOM. Bounded so we don't stall
834+
// forever on a livelock.
835+
if in_flight_seen > 0 && retries < MAX_RECLAIM_RETRIES {
836+
retries += 1;
837+
tokio::time::sleep(RECLAIM_RETRY_SLEEP).await;
838+
// Quick fast-path retry: an in-flight sibling may have
839+
// freed bytes during the sleep.
840+
if self.try_grow(reservation, additional).is_ok() {
841+
return Ok(());
842+
}
778843
continue;
779844
}
780-
if self.try_grow(reservation, additional).is_ok() {
781-
return Ok(());
782-
}
845+
break;
783846
}
784847

785848
// Fall through to the inner pool's own reclaim path, if any.

0 commit comments

Comments
 (0)