Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,8 @@ object CometConf extends ShimCometConf {
.doc(
"The type of memory pool to be used for Comet native execution. " +
"Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', " +
"'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global'. For off-heap " +
"types are 'unified' and `fair_unified`.")
"'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point there are so many options for memory pool that it is bound to confuse users. Would it be a good idea to have a bit of documentation that helps users decide what kind of memory pool to use?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. I also do not know some of the differences like 'fair_spill_task_shared' vs 'greedy_global'
I filed an issue #1388 and will on separately

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I can add the doc to the place

"For off-heap types are 'unified' and `fair_unified`.")
.stringConf
.createWithDefault("greedy_task_shared")

Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true |
| spark.comet.exec.initCap.enabled | Whether to enable initCap by default. | false |
| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true |
| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global'. For off-heap types are 'unified' and `fair_unified`. | greedy_task_shared |
| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. For off-heap types are 'unified' and `fair_unified`. | greedy_task_shared |
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
Expand Down
5 changes: 4 additions & 1 deletion native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use datafusion::{
prelude::{SessionConfig, SessionContext},
};
use datafusion_execution::memory_pool::{
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool,
};
use futures::poll;
use jni::{
Expand Down Expand Up @@ -118,6 +118,7 @@ enum MemoryPoolType {
FairSpillTaskShared,
GreedyGlobal,
FairSpillGlobal,
Unbounded,
}

struct MemoryPoolConfig {
Expand Down Expand Up @@ -319,6 +320,7 @@ fn parse_memory_pool_config(
"greedy_global" => MemoryPoolConfig::new(MemoryPoolType::GreedyGlobal, pool_size),
"fair_spill" => MemoryPoolConfig::new(MemoryPoolType::FairSpill, pool_size_per_task),
"greedy" => MemoryPoolConfig::new(MemoryPoolType::Greedy, pool_size_per_task),
"unbounded" => MemoryPoolConfig::new(MemoryPoolType::Unbounded, 0),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH, I don't know if it is ever a good idea to allow an unbounded memory pool. It doesn't hurt to have the option, but under what conditions is this choice useful?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found the unbounded memory pool is useful for the cases we do not want to allow any spilling and rather choose to fail the job. Spilling slows down the job a lot and the unbounded memory pool is one way to measure the best case without adjusting how much memory we provide for native exec

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we can add this advice in the documentation. Speeding up the job sounds like a win until the jobs start OOMing. :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the value in having the unbounded tool for development testing purposes but not for end user use.

I would like to use it to help with #1315

_ => {
return Err(CometError::Config(format!(
"Unsupported memory pool type: {}",
Expand Down Expand Up @@ -397,6 +399,7 @@ fn create_memory_pool(
per_task_memory_pool.num_plans += 1;
Arc::clone(&per_task_memory_pool.memory_pool)
}
MemoryPoolType::Unbounded => Arc::new(UnboundedMemoryPool::default()),
}
}

Expand Down
Loading