Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions datafusion/execution/src/memory_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,24 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug {

/// Return the total amount of memory reserved
fn reserved(&self) -> usize;

/// Return the memory limit of the pool
///
/// The default implementation of `MemoryPool::memory_limit`
/// will return `MemoryLimit::Unknown`.
/// If you are using your custom memory pool, but have the requirement to
/// know the memory usage limit of the pool, please implement this method
/// to return it(`Memory::Finite(limit)`).
fn memory_limit(&self) -> MemoryLimit {
MemoryLimit::Unknown
}
}

/// Memory limit of `MemoryPool`
pub enum MemoryLimit {
Infinite,
Finite(usize),
Unknown,
}

/// A memory consumer is a named allocation traced by a particular
Expand Down
18 changes: 17 additions & 1 deletion datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use crate::memory_pool::{MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation};
use datafusion_common::HashMap;
use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
use log::debug;
Expand Down Expand Up @@ -48,6 +48,10 @@ impl MemoryPool for UnboundedMemoryPool {
fn reserved(&self) -> usize {
self.used.load(Ordering::Relaxed)
}

fn memory_limit(&self) -> MemoryLimit {
MemoryLimit::Infinite
}
}

/// A [`MemoryPool`] that implements a greedy first-come first-serve limit.
Expand Down Expand Up @@ -100,6 +104,10 @@ impl MemoryPool for GreedyMemoryPool {
fn reserved(&self) -> usize {
self.used.load(Ordering::Relaxed)
}

fn memory_limit(&self) -> MemoryLimit {
MemoryLimit::Finite(self.pool_size)
}
}

/// A [`MemoryPool`] that prevents spillable reservations from using more than
Expand Down Expand Up @@ -233,6 +241,10 @@ impl MemoryPool for FairSpillPool {
let state = self.state.lock();
state.spillable + state.unspillable
}

fn memory_limit(&self) -> MemoryLimit {
MemoryLimit::Finite(self.pool_size)
}
}

/// Constructs a resources error based upon the individual [`MemoryReservation`].
Expand Down Expand Up @@ -408,6 +420,10 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
fn reserved(&self) -> usize {
self.inner.reserved()
}

fn memory_limit(&self) -> MemoryLimit {
self.inner.memory_limit()
}
}

fn provide_top_memory_consumers_to_error_msg(
Expand Down