Skip to content

Commit c39329e

Browse files
committed
reclaimer state
1 parent 23c357d commit c39329e

4 files changed

Lines changed: 124 additions & 43 deletions

File tree

datafusion/execution/src/memory_pool/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub use datafusion_common::{
3939
human_readable_count, human_readable_duration, human_readable_size, units,
4040
};
4141
pub use pool::*;
42-
pub use reclaimer::MemoryReclaimer;
42+
pub use reclaimer::{MemoryReclaimer, reclaimer_state};
4343

4444
/// Tracks and potentially limits memory use across operators during execution.
4545
///

datafusion/execution/src/memory_pool/pool.rs

Lines changed: 69 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use crate::memory_pool::reclaimer::reclaimer_state;
1819
use crate::memory_pool::{
1920
MemoryConsumer, MemoryLimit, MemoryPool, MemoryReclaimer, MemoryReservation,
2021
human_readable_size,
@@ -29,7 +30,7 @@ use std::pin::Pin;
2930
use std::sync::Arc;
3031
use std::{
3132
num::NonZeroUsize,
32-
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
33+
sync::atomic::{AtomicU8, AtomicUsize, Ordering},
3334
};
3435

3536
/// A [`MemoryPool`] that enforces no limit
@@ -329,34 +330,49 @@ struct TrackedConsumer {
329330
reserved: AtomicUsize,
330331
peak: AtomicUsize,
331332
reclaimer: Option<Arc<dyn MemoryReclaimer>>,
332-
/// Set to `true` while a reclaim is in-flight on this consumer, so
333-
/// concurrent `try_grow_async` callers skip this victim instead of
334-
/// double-reclaiming. Reset via [`ReclaimInFlightGuard`] on drop
335-
/// (cancel-safe).
336-
reclaim_in_flight: Arc<AtomicBool>,
333+
/// Tri-state eligibility flag for [`reclaimer`], encoded per
334+
/// [`reclaimer_state`]. The pool flips `AVAILABLE` ↔ `IN_FLIGHT`
335+
/// for dedup; the reclaimer's owner may sticky-set `DISABLED` once
336+
/// it can no longer free memory. Shared `Arc` so the reclaimer
337+
/// side and the pool see the same cell. `None` reclaimer ⇒ flag
338+
/// is unused but still allocated.
339+
reclaimer_state: Arc<AtomicU8>,
337340
}
338341

339-
/// RAII guard that clears [`TrackedConsumer::reclaim_in_flight`] on drop.
340-
struct ReclaimInFlightGuard {
341-
flag: Arc<AtomicBool>,
342+
/// RAII guard for the [`IN_FLIGHT`] slot of a [`TrackedConsumer`]'s
343+
/// `reclaimer_state` flag. `Drop` only restores `AVAILABLE` if the
344+
/// state is still `IN_FLIGHT` — leaves a sticky `DISABLED` alone.
345+
///
346+
/// [`IN_FLIGHT`]: reclaimer_state::IN_FLIGHT
347+
struct ReclaimerStateGuard {
348+
flag: Arc<AtomicU8>,
342349
}
343350

344-
impl Drop for ReclaimInFlightGuard {
351+
impl Drop for ReclaimerStateGuard {
345352
fn drop(&mut self) {
346-
self.flag.store(false, Ordering::Release);
353+
let _ = self.flag.compare_exchange(
354+
reclaimer_state::IN_FLIGHT,
355+
reclaimer_state::AVAILABLE,
356+
Ordering::AcqRel,
357+
Ordering::Relaxed,
358+
);
347359
}
348360
}
349361

350-
impl ReclaimInFlightGuard {
351-
/// Atomically claims the in-flight slot, returning `Some(guard)` on
352-
/// success or `None` if another reclaim is already running on this
353-
/// consumer.
354-
fn try_acquire(flag: &Arc<AtomicBool>) -> Option<Self> {
355-
flag.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
356-
.ok()
357-
.map(|_| Self {
358-
flag: Arc::clone(flag),
359-
})
362+
impl ReclaimerStateGuard {
363+
/// Try to transition the flag from `AVAILABLE` to `IN_FLIGHT`.
364+
/// Fails on contention or on a sticky `DISABLED`.
365+
fn try_acquire(flag: &Arc<AtomicU8>) -> Option<Self> {
366+
flag.compare_exchange(
367+
reclaimer_state::AVAILABLE,
368+
reclaimer_state::IN_FLIGHT,
369+
Ordering::AcqRel,
370+
Ordering::Relaxed,
371+
)
372+
.ok()
373+
.map(|_| Self {
374+
flag: Arc::clone(flag),
375+
})
360376
}
361377
}
362378

@@ -598,6 +614,16 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
598614
fn register(&self, consumer: &MemoryConsumer) {
599615
self.inner.register(consumer);
600616

617+
let reclaimer = consumer.reclaimer().cloned();
618+
// Reuse the reclaimer's own flag when it provides one — that
619+
// way the reclaimer side can sticky-set `DISABLED` and the
620+
// pool sees it on the next filter pass. Otherwise allocate a
621+
// fresh `AVAILABLE` flag for in-flight dedup only.
622+
let state = reclaimer
623+
.as_ref()
624+
.and_then(|r| r.reclaimer_state())
625+
.unwrap_or_else(|| Arc::new(AtomicU8::new(reclaimer_state::AVAILABLE)));
626+
601627
let mut guard = self.tracked_consumers.write();
602628
let existing = guard.insert(
603629
consumer.id(),
@@ -606,8 +632,8 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
606632
can_spill: consumer.can_spill(),
607633
reserved: Default::default(),
608634
peak: Default::default(),
609-
reclaimer: consumer.reclaimer().cloned(),
610-
reclaim_in_flight: Arc::new(AtomicBool::new(false)),
635+
reclaimer,
636+
reclaimer_state: state,
611637
},
612638
);
613639

@@ -690,11 +716,10 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
690716
// mutual-reclaim cycle (A targets B while B targets A) — at
691717
// most one side of any pair can hold strictly more memory,
692718
// so the other side has no candidates and surfaces an error
693-
// for the caller's self-spill fallback. Skip zero-byte
694-
// consumers and drop the read guard before awaiting any
695-
// reclaim. Clone each victim's `reclaim_in_flight` flag so
696-
// concurrent `try_grow_async` callers dedup against in-flight
697-
// reclaims on the same victim.
719+
// for the caller's self-spill fallback. Filter out anyone
720+
// whose `reclaimer_state` flag is not `AVAILABLE` (in-flight or
721+
// sticky-disabled). Drop the read guard before awaiting any
722+
// reclaim.
698723
let requestor_id = reservation.consumer().id();
699724
let requestor_reserved = {
700725
let guard = self.tracked_consumers.read();
@@ -703,7 +728,7 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
703728
.map(|tc| tc.reserved())
704729
.unwrap_or(0)
705730
};
706-
let mut candidates: Vec<(usize, Arc<dyn MemoryReclaimer>, Arc<AtomicBool>)> = {
731+
let mut candidates: Vec<(usize, Arc<dyn MemoryReclaimer>, Arc<AtomicU8>)> = {
707732
let guard = self.tracked_consumers.read();
708733
guard
709734
.iter()
@@ -712,10 +737,15 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
712737
if *cid == requestor_id || tc.reserved() <= requestor_reserved {
713738
return None;
714739
}
740+
if tc.reclaimer_state.load(Ordering::Acquire)
741+
!= reclaimer_state::AVAILABLE
742+
{
743+
return None;
744+
}
715745
Some((
716746
tc.reserved(),
717747
Arc::clone(reclaimer),
718-
Arc::clone(&tc.reclaim_in_flight),
748+
Arc::clone(&tc.reclaimer_state),
719749
))
720750
})
721751
.collect()
@@ -728,15 +758,15 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
728758
candidates.truncate(self.reclaim_candidate_limit.get());
729759

730760
// For each candidate: try to claim its in-flight slot
731-
// (skip on contention so we work on a different victim
732-
// rather than serializing behind a sibling's reclaim);
733-
// re-check `try_grow` before reclaiming in case a sibling
734-
// already freed enough; reclaim; retry `try_grow`. The
735-
// retry path goes through `self.try_grow`, which already
736-
// updates the tracked consumer's atomic reservation — no
737-
// manual accounting needed here.
738-
for (_, reclaimer, in_flight) in candidates {
739-
let _g = match ReclaimInFlightGuard::try_acquire(&in_flight) {
761+
// (skip on contention or sticky-disabled so we work on a
762+
// different victim rather than serializing behind a
763+
// sibling's reclaim); re-check `try_grow` before reclaiming
764+
// in case a sibling already freed enough; reclaim; retry
765+
// `try_grow`. The retry path goes through `self.try_grow`,
766+
// which already updates the tracked consumer's atomic
767+
// reservation — no manual accounting needed here.
768+
for (_, reclaimer, flag) in candidates {
769+
let _g = match ReclaimerStateGuard::try_acquire(&flag) {
740770
Some(g) => g,
741771
None => continue,
742772
};

datafusion/execution/src/memory_pool/reclaimer.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,22 @@
2222
2323
use datafusion_common::Result;
2424
use std::fmt::Debug;
25+
use std::sync::Arc;
26+
use std::sync::atomic::AtomicU8;
27+
28+
/// Encoded values stored in the [`reclaimer_state`] tri-state.
29+
///
30+
/// [`reclaimer_state`]: MemoryReclaimer::reclaimer_state
31+
pub mod reclaimer_state {
32+
/// Reclaimer is idle and may be selected as a victim.
33+
pub const AVAILABLE: u8 = 0;
34+
/// A pool task is currently driving `reclaim` on this reclaimer.
35+
pub const IN_FLIGHT: u8 = 1;
36+
/// Reclaimer has been retired (e.g. operator entered a phase where
37+
/// it can no longer free memory). Sticky — never returns to
38+
/// `AVAILABLE`.
39+
pub const DISABLED: u8 = 2;
40+
}
2541

2642
/// Hook attached to a [`MemoryConsumer`] via
2743
/// [`MemoryConsumer::with_reclaimer`]. On
@@ -58,4 +74,15 @@ pub trait MemoryReclaimer: Send + Sync + Debug {
5874
fn priority(&self) -> i32 {
5975
0
6076
}
77+
78+
/// Optional shared tri-state flag controlling whether the pool
79+
/// currently considers this reclaimer eligible. Values are defined
80+
/// in [`reclaimer_state`]. Returning `Some(arc)` lets the
81+
/// reclaimer's owner flip itself to `DISABLED` once it can no
82+
/// longer free memory (e.g., on entering a merge phase), which
83+
/// the pool observes immediately. Returning `None` lets the pool
84+
/// allocate its own private flag — used only for in-flight dedup.
85+
fn reclaimer_state(&self) -> Option<Arc<AtomicU8>> {
86+
None
87+
}
6188
}

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ use datafusion_common::{
6666
};
6767
use datafusion_execution::TaskContext;
6868
use datafusion_execution::memory_pool::{
69-
MemoryConsumer, MemoryReclaimer, MemoryReservation,
69+
MemoryConsumer, MemoryReclaimer, MemoryReservation, reclaimer_state,
7070
};
7171
use datafusion_execution::runtime_env::RuntimeEnv;
7272
use datafusion_physical_expr::LexOrdering;
@@ -82,6 +82,10 @@ use log::{debug, trace};
8282
#[derive(Debug)]
8383
struct ExternalSorterReclaimer {
8484
tx: tokio::sync::mpsc::Sender<tokio::sync::oneshot::Sender<usize>>,
85+
/// Shared with the pool's `TrackedConsumer` entry. Stream loop
86+
/// flips it to `DISABLED` on merge entry so the pool stops
87+
/// targeting this consumer.
88+
reclaimer_state: Arc<std::sync::atomic::AtomicU8>,
8589
}
8690

8791
#[async_trait::async_trait]
@@ -94,6 +98,10 @@ impl MemoryReclaimer for ExternalSorterReclaimer {
9498
}
9599
Ok(resp_rx.await.unwrap_or(0))
96100
}
101+
102+
fn reclaimer_state(&self) -> Option<Arc<std::sync::atomic::AtomicU8>> {
103+
Some(Arc::clone(&self.reclaimer_state))
104+
}
97105
}
98106

99107
struct ExternalSorterMetrics {
@@ -1282,8 +1290,14 @@ impl ExecutionPlan for SortExec {
12821290
// Spill-request channel; drained by the stream loop below.
12831291
let (reclaim_tx, mut reclaim_rx) =
12841292
tokio::sync::mpsc::channel::<tokio::sync::oneshot::Sender<usize>>(4);
1293+
let state = Arc::new(std::sync::atomic::AtomicU8::new(
1294+
reclaimer_state::AVAILABLE,
1295+
));
12851296
let reclaimer: Arc<dyn MemoryReclaimer> =
1286-
Arc::new(ExternalSorterReclaimer { tx: reclaim_tx });
1297+
Arc::new(ExternalSorterReclaimer {
1298+
tx: reclaim_tx,
1299+
reclaimer_state: Arc::clone(&state),
1300+
});
12871301

12881302
let mut sorter = ExternalSorter::new(
12891303
partition,
@@ -1338,7 +1352,17 @@ impl ExecutionPlan for SortExec {
13381352
}
13391353
}
13401354
}
1341-
// Late reclaim requests now resolve to Ok(0).
1355+
// Sticky-disable so concurrent `try_grow_async`
1356+
// callers stop targeting this consumer once we
1357+
// enter the merge phase. Set before dropping
1358+
// the receiver to close any window where the
1359+
// pool would observe `AVAILABLE` after the
1360+
// channel is gone (and hence get `Ok(0)` from a
1361+
// wasted `reclaim`).
1362+
state.store(
1363+
reclaimer_state::DISABLED,
1364+
std::sync::atomic::Ordering::Release,
1365+
);
13421366
drop(reclaim_rx);
13431367
sorter.sort().await
13441368
})

0 commit comments

Comments
 (0)