Skip to content

Commit b6bc0bb

Browse files
committed
Merge remote-tracking branch 'upstream/main' into concat_batches_for_sort
2 parents ddc9773 + a0eaf51 commit b6bc0bb

File tree

28 files changed

+595
-340
lines changed

28 files changed

+595
-340
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ arrow-flight = { version = "55.1.0", features = [
9898
] }
9999
arrow-ipc = { version = "55.0.0", default-features = false, features = [
100100
"lz4",
101+
"zstd",
101102
] }
102103
arrow-ord = { version = "55.0.0", default-features = false }
103104
arrow-schema = { version = "55.0.0", default-features = false }

datafusion/common/src/config.rs

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
//! Runtime configuration, via [`ConfigOptions`]
1919
20+
use arrow_ipc::CompressionType;
21+
2022
use crate::error::_config_err;
2123
use crate::parsers::CompressionTypeVariant;
2224
use crate::utils::get_available_parallelism;
@@ -274,6 +276,61 @@ config_namespace! {
274276
}
275277
}
276278

279+
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
280+
pub enum SpillCompression {
281+
Zstd,
282+
Lz4Frame,
283+
#[default]
284+
Uncompressed,
285+
}
286+
287+
impl FromStr for SpillCompression {
288+
type Err = DataFusionError;
289+
290+
fn from_str(s: &str) -> Result<Self, Self::Err> {
291+
match s.to_ascii_lowercase().as_str() {
292+
"zstd" => Ok(Self::Zstd),
293+
"lz4_frame" => Ok(Self::Lz4Frame),
294+
"uncompressed" | "" => Ok(Self::Uncompressed),
295+
other => Err(DataFusionError::Configuration(format!(
296+
"Invalid Spill file compression type: {other}. Expected one of: zstd, lz4_frame, uncompressed"
297+
))),
298+
}
299+
}
300+
}
301+
302+
impl ConfigField for SpillCompression {
303+
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
304+
v.some(key, self, description)
305+
}
306+
307+
fn set(&mut self, _: &str, value: &str) -> Result<()> {
308+
*self = SpillCompression::from_str(value)?;
309+
Ok(())
310+
}
311+
}
312+
313+
impl Display for SpillCompression {
314+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
315+
let str = match self {
316+
Self::Zstd => "zstd",
317+
Self::Lz4Frame => "lz4_frame",
318+
Self::Uncompressed => "uncompressed",
319+
};
320+
write!(f, "{str}")
321+
}
322+
}
323+
324+
impl From<SpillCompression> for Option<CompressionType> {
325+
fn from(c: SpillCompression) -> Self {
326+
match c {
327+
SpillCompression::Zstd => Some(CompressionType::ZSTD),
328+
SpillCompression::Lz4Frame => Some(CompressionType::LZ4_FRAME),
329+
SpillCompression::Uncompressed => None,
330+
}
331+
}
332+
}
333+
277334
config_namespace! {
278335
/// Options related to query execution
279336
///
@@ -294,8 +351,8 @@ config_namespace! {
294351

295352
/// Should DataFusion collect statistics when first creating a table.
296353
/// Has no effect after the table is created. Applies to the default
297-
/// `ListingTableProvider` in DataFusion. Defaults to false.
298-
pub collect_statistics: bool, default = false
354+
/// `ListingTableProvider` in DataFusion. Defaults to true.
355+
pub collect_statistics: bool, default = true
299356

300357
/// Number of partitions for query execution. Increasing partitions can increase
301358
/// concurrency.
@@ -330,6 +387,16 @@ config_namespace! {
330387
/// the new schema verification step.
331388
pub skip_physical_aggregate_schema_check: bool, default = false
332389

390+
/// Sets the compression codec used when spilling data to disk.
391+
///
392+
/// Since datafusion writes spill files using the Arrow IPC Stream format,
393+
/// only codecs supported by the Arrow IPC Stream Writer are allowed.
394+
/// Valid values are: uncompressed, lz4_frame, zstd.
395+
/// Note: lz4_frame offers faster (de)compression, but typically results in
396+
/// larger spill files. In contrast, zstd achieves
397+
/// higher compression ratios at the cost of slower (de)compression speed.
398+
pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed
399+
333400
/// Specifies the reserved memory for each spillable sort operation to
334401
/// facilitate an in-memory merge.
335402
///

datafusion/core/src/execution/context/parquet.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,12 @@ impl SessionContext {
3434
///
3535
/// # Note: Statistics
3636
///
37-
/// NOTE: by default, statistics are not collected when reading the Parquet
38-
/// files as this can slow down the initial DataFrame creation. However,
39-
/// collecting statistics can greatly accelerate queries with certain
40-
/// filters.
37+
/// NOTE: by default, statistics are collected when reading the Parquet
38+
/// files. This can slow down the initial DataFrame creation while
39+
/// greatly accelerating queries with certain filters.
4140
///
42-
/// To enable collect statistics, set the [config option]
43-
/// `datafusion.execution.collect_statistics` to `true`. See
41+
/// To disable statistics collection, set the [config option]
42+
/// `datafusion.execution.collect_statistics` to `false`. See
4443
/// [`ConfigOptions`] and [`ExecutionOptions::collect_statistics`] for more
4544
/// details.
4645
///
@@ -171,28 +170,28 @@ mod tests {
171170

172171
#[tokio::test]
173172
async fn register_parquet_respects_collect_statistics_config() -> Result<()> {
174-
// The default is false
173+
// The default is true
175174
let mut config = SessionConfig::new();
176175
config.options_mut().explain.physical_plan_only = true;
177176
config.options_mut().explain.show_statistics = true;
178177
let content = explain_query_all_with_config(config).await?;
179-
assert_contains!(content, "statistics=[Rows=Absent,");
178+
assert_contains!(content, "statistics=[Rows=Exact(");
180179

181-
// Explicitly set to false
180+
// Explicitly set to true
182181
let mut config = SessionConfig::new();
183182
config.options_mut().explain.physical_plan_only = true;
184183
config.options_mut().explain.show_statistics = true;
185-
config.options_mut().execution.collect_statistics = false;
184+
config.options_mut().execution.collect_statistics = true;
186185
let content = explain_query_all_with_config(config).await?;
187-
assert_contains!(content, "statistics=[Rows=Absent,");
186+
assert_contains!(content, "statistics=[Rows=Exact(");
188187

189-
// Explicitly set to true
188+
// Explicitly set to false
190189
let mut config = SessionConfig::new();
191190
config.options_mut().explain.physical_plan_only = true;
192191
config.options_mut().explain.show_statistics = true;
193-
config.options_mut().execution.collect_statistics = true;
192+
config.options_mut().execution.collect_statistics = false;
194193
let content = explain_query_all_with_config(config).await?;
195-
assert_contains!(content, "statistics=[Rows=Exact(10),");
194+
assert_contains!(content, "statistics=[Rows=Absent,");
196195

197196
Ok(())
198197
}

datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use datafusion_execution::memory_pool::{
3131
};
3232
use datafusion_expr::display_schema;
3333
use datafusion_physical_plan::spill::get_record_batch_memory_size;
34+
use itertools::Itertools;
3435
use std::time::Duration;
3536

3637
use datafusion_execution::{memory_pool::FairSpillPool, runtime_env::RuntimeEnvBuilder};
@@ -72,6 +73,43 @@ async fn sort_query_fuzzer_runner() {
7273
fuzzer.run().await.unwrap();
7374
}
7475

76+
/// Reproduce the bug with specific seeds from the
77+
/// [failing test case](https://github.com/apache/datafusion/issues/16452).
78+
#[tokio::test(flavor = "multi_thread")]
79+
async fn test_reproduce_sort_query_issue_16452() {
80+
// Seeds from the failing test case
81+
let init_seed = 10313160656544581998u64;
82+
let query_seed = 15004039071976572201u64;
83+
let config_seed_1 = 11807432710583113300u64;
84+
let config_seed_2 = 759937414670321802u64;
85+
86+
let random_seed = 1u64; // Use a fixed seed to ensure consistent behavior
87+
88+
let mut test_generator = SortFuzzerTestGenerator::new(
89+
2000,
90+
3,
91+
"sort_fuzz_table".to_string(),
92+
get_supported_types_columns(random_seed),
93+
false,
94+
random_seed,
95+
);
96+
97+
let mut results = vec![];
98+
99+
for config_seed in [config_seed_1, config_seed_2] {
100+
let r = test_generator
101+
.fuzzer_run(init_seed, query_seed, config_seed)
102+
.await
103+
.unwrap();
104+
105+
results.push(r);
106+
}
107+
108+
for (lhs, rhs) in results.iter().tuple_windows() {
109+
check_equality_of_batches(lhs, rhs).unwrap();
110+
}
111+
}
112+
75113
/// SortQueryFuzzer holds the runner configuration for executing sort query fuzz tests. The fuzzing details are managed inside `SortFuzzerTestGenerator`.
76114
///
77115
/// It defines:

datafusion/core/tests/memory_limit/mod.rs

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use arrow::compute::SortOptions;
2828
use arrow::datatypes::{Int32Type, SchemaRef};
2929
use arrow_schema::{DataType, Field, Schema};
3030
use datafusion::assert_batches_eq;
31+
use datafusion::config::SpillCompression;
3132
use datafusion::datasource::memory::MemorySourceConfig;
3233
use datafusion::datasource::source::DataSourceExec;
3334
use datafusion::datasource::{MemTable, TableProvider};
@@ -545,10 +546,11 @@ async fn test_external_sort_zero_merge_reservation() {
545546
// Tests for disk limit (`max_temp_directory_size` in `DiskManager`)
546547
// ------------------------------------------------------------------
547548

548-
// Create a new `SessionContext` with speicified disk limit and memory pool limit
549+
// Create a new `SessionContext` with specified disk limit, memory pool limit, and spill compression codec
549550
async fn setup_context(
550551
disk_limit: u64,
551552
memory_pool_limit: usize,
553+
spill_compression: SpillCompression,
552554
) -> Result<SessionContext> {
553555
let disk_manager = DiskManagerBuilder::default()
554556
.with_mode(DiskManagerMode::OsTmpDirectory)
@@ -570,6 +572,7 @@ async fn setup_context(
570572
let config = SessionConfig::new()
571573
.with_sort_spill_reservation_bytes(64 * 1024) // 256KB
572574
.with_sort_in_place_threshold_bytes(0)
575+
.with_spill_compression(spill_compression)
573576
.with_batch_size(64) // To reduce test memory usage
574577
.with_target_partitions(1);
575578

@@ -580,7 +583,8 @@ async fn setup_context(
580583
/// (specified by `max_temp_directory_size` in `DiskManager`)
581584
#[tokio::test]
582585
async fn test_disk_spill_limit_reached() -> Result<()> {
583-
let ctx = setup_context(1024 * 1024, 1024 * 1024).await?; // 1MB disk limit, 1MB memory limit
586+
let spill_compression = SpillCompression::Uncompressed;
587+
let ctx = setup_context(1024 * 1024, 1024 * 1024, spill_compression).await?; // 1MB disk limit, 1MB memory limit
584588

585589
let df = ctx
586590
.sql("select * from generate_series(1, 1000000000000) as t1(v1) order by v1")
@@ -602,7 +606,8 @@ async fn test_disk_spill_limit_reached() -> Result<()> {
602606
#[tokio::test]
603607
async fn test_disk_spill_limit_not_reached() -> Result<()> {
604608
let disk_spill_limit = 1024 * 1024; // 1MB
605-
let ctx = setup_context(disk_spill_limit, 128 * 1024).await?; // 1MB disk limit, 128KB memory limit
609+
let spill_compression = SpillCompression::Uncompressed;
610+
let ctx = setup_context(disk_spill_limit, 128 * 1024, spill_compression).await?; // 1MB disk limit, 128KB memory limit
606611

607612
let df = ctx
608613
.sql("select * from generate_series(1, 10000) as t1(v1) order by v1")
@@ -630,6 +635,77 @@ async fn test_disk_spill_limit_not_reached() -> Result<()> {
630635
Ok(())
631636
}
632637

638+
/// External query should succeed using zstd as spill compression codec and
639+
/// and all temporary spill files are properly cleaned up after execution.
640+
/// Note: This test does not inspect file contents (e.g. magic number),
641+
/// as spill files are automatically deleted on drop.
642+
#[tokio::test]
643+
async fn test_spill_file_compressed_with_zstd() -> Result<()> {
644+
let disk_spill_limit = 1024 * 1024; // 1MB
645+
let spill_compression = SpillCompression::Zstd;
646+
let ctx = setup_context(disk_spill_limit, 128 * 1024, spill_compression).await?; // 1MB disk limit, 128KB memory limit, zstd
647+
648+
let df = ctx
649+
.sql("select * from generate_series(1, 100000) as t1(v1) order by v1")
650+
.await
651+
.unwrap();
652+
let plan = df.create_physical_plan().await.unwrap();
653+
654+
let task_ctx = ctx.task_ctx();
655+
let _ = collect_batches(Arc::clone(&plan), task_ctx)
656+
.await
657+
.expect("Query execution failed");
658+
659+
let spill_count = plan.metrics().unwrap().spill_count().unwrap();
660+
let spilled_bytes = plan.metrics().unwrap().spilled_bytes().unwrap();
661+
662+
println!("spill count {spill_count}");
663+
assert!(spill_count > 0);
664+
assert!((spilled_bytes as u64) < disk_spill_limit);
665+
666+
// Verify that all temporary files have been properly cleaned up by checking
667+
// that the total disk usage tracked by the disk manager is zero
668+
let current_disk_usage = ctx.runtime_env().disk_manager.used_disk_space();
669+
assert_eq!(current_disk_usage, 0);
670+
671+
Ok(())
672+
}
673+
674+
/// External query should succeed using lz4_frame as spill compression codec and
675+
/// all temporary spill files are properly cleaned up after execution.
676+
/// Note: This test does not inspect file contents (e.g. magic number),
677+
/// as spill files are automatically deleted on drop.
678+
#[tokio::test]
679+
async fn test_spill_file_compressed_with_lz4_frame() -> Result<()> {
680+
let disk_spill_limit = 1024 * 1024; // 1MB
681+
let spill_compression = SpillCompression::Lz4Frame;
682+
let ctx = setup_context(disk_spill_limit, 128 * 1024, spill_compression).await?; // 1MB disk limit, 128KB memory limit, lz4_frame
683+
684+
let df = ctx
685+
.sql("select * from generate_series(1, 100000) as t1(v1) order by v1")
686+
.await
687+
.unwrap();
688+
let plan = df.create_physical_plan().await.unwrap();
689+
690+
let task_ctx = ctx.task_ctx();
691+
let _ = collect_batches(Arc::clone(&plan), task_ctx)
692+
.await
693+
.expect("Query execution failed");
694+
695+
let spill_count = plan.metrics().unwrap().spill_count().unwrap();
696+
let spilled_bytes = plan.metrics().unwrap().spilled_bytes().unwrap();
697+
698+
println!("spill count {spill_count}");
699+
assert!(spill_count > 0);
700+
assert!((spilled_bytes as u64) < disk_spill_limit);
701+
702+
// Verify that all temporary files have been properly cleaned up by checking
703+
// that the total disk usage tracked by the disk manager is zero
704+
let current_disk_usage = ctx.runtime_env().disk_manager.used_disk_space();
705+
assert_eq!(current_disk_usage, 0);
706+
707+
Ok(())
708+
}
633709
/// Run the query with the specified memory limit,
634710
/// and verifies the expected errors are returned
635711
#[derive(Clone, Debug)]

datafusion/core/tests/parquet/row_group_pruning.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ macro_rules! int_tests {
421421
.with_query(&format!("SELECT * FROM t where i{} in (100)", $bits))
422422
.with_expected_errors(Some(0))
423423
.with_matched_by_stats(Some(0))
424-
.with_pruned_by_stats(Some(4))
424+
.with_pruned_by_stats(Some(0))
425425
.with_matched_by_bloom_filter(Some(0))
426426
.with_pruned_by_bloom_filter(Some(0))
427427
.with_expected_rows(0)
@@ -1316,7 +1316,7 @@ async fn test_row_group_with_null_values() {
13161316
.with_query("SELECT * FROM t WHERE \"i32\" > 7")
13171317
.with_expected_errors(Some(0))
13181318
.with_matched_by_stats(Some(0))
1319-
.with_pruned_by_stats(Some(3))
1319+
.with_pruned_by_stats(Some(0))
13201320
.with_expected_rows(0)
13211321
.with_matched_by_bloom_filter(Some(0))
13221322
.with_pruned_by_bloom_filter(Some(0))

0 commit comments

Comments
 (0)