6 changes: 6 additions & 0 deletions benchmarks/src/tpcds/run.rs
@@ -144,6 +144,10 @@ pub struct RunOpt {
/// The tables should have been created with the `--sort` option for this to have any effect.
#[arg(short = 't', long = "sorted")]
sorted: bool,

/// How many bytes to buffer on the probe side of hash joins.
#[arg(long, default_value = "1048576")]
hash_join_buffering_capacity: usize,
}

impl RunOpt {
@@ -162,6 +166,8 @@ impl RunOpt {
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
config.options_mut().optimizer.enable_piecewise_merge_join =
self.enable_piecewise_merge_join;
config.options_mut().execution.hash_join_buffering_capacity =
self.hash_join_buffering_capacity;
let rt_builder = self.common.runtime_env_builder()?;
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
// register tables
2 changes: 1 addition & 1 deletion datafusion-cli/src/exec.rs
@@ -269,7 +269,7 @@ impl StatementExecutor {
let options = task_ctx.session_config().options();

// Track memory usage for the query result if it's bounded
let mut reservation =
let reservation =
MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool());

if physical_plan.boundedness().is_unbounded() {
11 changes: 11 additions & 0 deletions datafusion/common/src/config.rs
@@ -669,6 +669,17 @@ config_namespace! {
/// # Default
/// `false` — ANSI SQL mode is disabled by default.
pub enable_ansi_mode: bool, default = false

/// How many bytes to buffer on the probe side of hash joins while the build side is
/// being built concurrently.
///
/// Without this, hash joins wait for the build side to be fully materialized before
/// polling the probe side. Buffering is useful when the query is not completely CPU
/// bound, since some probe-side work can proceed concurrently, reducing query latency.
///
/// Defaults to 1 MiB. Set to 0 to disable buffering.
pub hash_join_buffering_capacity: usize, default = 1024 * 1024
}
}

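For context, a minimal sketch of how the new option could be set on a session, assuming the standard `SessionConfig`/`SessionContext` APIs and that the option surfaces under `datafusion.execution.*` (inferred from the `execution` path used in the benchmark hunk above); the 4 MiB value is purely illustrative:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Raise the probe-side buffer to 4 MiB (hypothetical value for illustration).
    let mut config = SessionConfig::new();
    config.options_mut().execution.hash_join_buffering_capacity = 4 * 1024 * 1024;
    let ctx = SessionContext::new_with_config(config);

    // Alternatively, disable probe-side buffering for the session via SQL, assuming
    // the option is exposed as `datafusion.execution.hash_join_buffering_capacity`.
    ctx.sql("SET datafusion.execution.hash_join_buffering_capacity = 0")
        .await?;
    Ok(())
}
```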
2 changes: 1 addition & 1 deletion datafusion/core/src/execution/context/mod.rs
@@ -2186,7 +2186,7 @@ mod tests {
// configure with same memory / disk manager
let memory_pool = ctx1.runtime_env().memory_pool.clone();

let mut reservation = MemoryConsumer::new("test").register(&memory_pool);
let reservation = MemoryConsumer::new("test").register(&memory_pool);
reservation.grow(100);

let disk_manager = ctx1.runtime_env().disk_manager.clone();
16 changes: 10 additions & 6 deletions datafusion/core/tests/dataframe/mod.rs
@@ -2967,8 +2967,9 @@ async fn test_count_wildcard_on_where_in() -> Result<()> {
| physical_plan | HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(count(*)@0, CAST(t1.a AS Int64)@2)], projection=[a@0, b@1] |
| | ProjectionExec: expr=[4 as count(*)] |
| | PlaceholderRowExec |
| | ProjectionExec: expr=[a@0 as a, b@1 as b, CAST(a@0 AS Int64) as CAST(t1.a AS Int64)] |
| | DataSourceExec: partitions=1, partition_sizes=[1] |
| | BufferExec: capacity=1048576 |
| | ProjectionExec: expr=[a@0 as a, b@1 as b, CAST(a@0 AS Int64) as CAST(t1.a AS Int64)] |
| | DataSourceExec: partitions=1, partition_sizes=[1] |
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------+
"
@@ -3011,8 +3012,9 @@ async fn test_count_wildcard_on_where_in() -> Result<()> {
| physical_plan | HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(count(*)@0, CAST(t1.a AS Int64)@2)], projection=[a@0, b@1] |
| | ProjectionExec: expr=[4 as count(*)] |
| | PlaceholderRowExec |
| | ProjectionExec: expr=[a@0 as a, b@1 as b, CAST(a@0 AS Int64) as CAST(t1.a AS Int64)] |
| | DataSourceExec: partitions=1, partition_sizes=[1] |
| | BufferExec: capacity=1048576 |
| | ProjectionExec: expr=[a@0 as a, b@1 as b, CAST(a@0 AS Int64) as CAST(t1.a AS Int64)] |
| | DataSourceExec: partitions=1, partition_sizes=[1] |
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------+
"
@@ -3351,7 +3353,8 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
| | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 |
| | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))] |
| | DataSourceExec: partitions=1, partition_sizes=[1] |
| | DataSourceExec: partitions=1, partition_sizes=[1] |
| | BufferExec: capacity=1048576 |
| | DataSourceExec: partitions=1, partition_sizes=[1] |
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------------+
"
@@ -3407,7 +3410,8 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
| | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 |
| | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)] |
| | DataSourceExec: partitions=1, partition_sizes=[1] |
| | DataSourceExec: partitions=1, partition_sizes=[1] |
| | BufferExec: capacity=1048576 |
| | DataSourceExec: partitions=1, partition_sizes=[1] |
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------------+
"
5 changes: 3 additions & 2 deletions datafusion/core/tests/sql/explain_analyze.rs
@@ -804,8 +804,9 @@ async fn test_physical_plan_display_indent_multi_children() {
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c2@0)], projection=[c1@0]
RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=1
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true
RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=1
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1@0 as c2], file_type=csv, has_header=true
BufferExec: capacity=1048576
RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=1
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1@0 as c2], file_type=csv, has_header=true
"
);
}
10 changes: 5 additions & 5 deletions datafusion/datasource-parquet/src/file_format.rs
@@ -1360,7 +1360,7 @@ impl FileSink for ParquetSink {
parquet_props.clone(),
)
.await?;
let mut reservation = MemoryConsumer::new(format!("ParquetSink[{path}]"))
let reservation = MemoryConsumer::new(format!("ParquetSink[{path}]"))
.register(context.memory_pool());
file_write_tasks.spawn(async move {
while let Some(batch) = rx.recv().await {
@@ -1465,7 +1465,7 @@ impl DataSink for ParquetSink {
async fn column_serializer_task(
mut rx: Receiver<ArrowLeafColumn>,
mut writer: ArrowColumnWriter,
mut reservation: MemoryReservation,
reservation: MemoryReservation,
) -> Result<(ArrowColumnWriter, MemoryReservation)> {
while let Some(col) = rx.recv().await {
writer.write(&col)?;
@@ -1550,7 +1550,7 @@ fn spawn_rg_join_and_finalize_task(
rg_rows: usize,
pool: &Arc<dyn MemoryPool>,
) -> SpawnedTask<RBStreamSerializeResult> {
let mut rg_reservation =
let rg_reservation =
MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(pool);

SpawnedTask::spawn(async move {
@@ -1682,12 +1682,12 @@ async fn concatenate_parallel_row_groups(
mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
pool: Arc<dyn MemoryPool>,
) -> Result<ParquetMetaData> {
let mut file_reservation =
let file_reservation =
MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);

while let Some(task) = serialize_rx.recv().await {
let result = task.join_unwind().await;
let (serialized_columns, mut rg_reservation, _cnt) =
let (serialized_columns, rg_reservation, _cnt) =
result.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;

let mut rg_out = parquet_writer.next_row_group()?;
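The repeated `mut` removals in this file (and in `exec.rs` and the context tests above) suggest that `MemoryReservation`'s mutation methods now take `&self`. A minimal sketch under that assumption, using the existing `MemoryConsumer` and `GreedyMemoryPool` APIs:

```rust
use std::sync::Arc;

use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() {
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));

    // No `mut` binding needed any more: grow/shrink are assumed to take `&self`
    // after this change (inferred from the hunks above, not verified here).
    let reservation = MemoryConsumer::new("example").register(&pool);
    reservation.grow(100);
    assert_eq!(pool.reserved(), 100);
    reservation.shrink(100);
    // Any remaining reservation is released when `reservation` is dropped.
}
```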
26 changes: 26 additions & 0 deletions datafusion/datasource/src/source.rs
@@ -37,8 +37,10 @@ use itertools::Itertools;

use crate::file_scan_config::FileScanConfig;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{Constraints, Result, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_plan::SortOrderPushdownResult;
@@ -410,6 +412,30 @@ impl ExecutionPlan for DataSourceExec {
as Arc<dyn ExecutionPlan>
})
}

fn dynamic_filters(&self) -> Vec<Arc<DynamicFilterPhysicalExpr>> {
let Some(node) = self.data_source.as_any().downcast_ref::<FileScanConfig>()
else {
return vec![];
};

let Some(filter) = node.file_source.filter() else {
return vec![];
};

let mut filters = vec![];
let _ = filter.transform_down(|expr| {
if let Ok(dynamic_filter) =
Arc::downcast::<DynamicFilterPhysicalExpr>(Arc::clone(&expr) as _)
{
filters.push(dynamic_filter);
};

Ok(Transformed::no(expr))
});

filters
}
}

impl DataSourceExec {
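A sketch of how a caller might gather the dynamic filters exposed by this new hook, assuming `dynamic_filters()` is the `ExecutionPlan` trait method this PR introduces; the recursive helper below is hypothetical:

```rust
use std::sync::Arc;

use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
use datafusion_physical_plan::ExecutionPlan;

/// Recursively gather every DynamicFilterPhysicalExpr advertised by a plan tree.
fn collect_dynamic_filters(
    plan: &Arc<dyn ExecutionPlan>,
) -> Vec<Arc<DynamicFilterPhysicalExpr>> {
    // Filters reported by this node (e.g. a DataSourceExec wrapping a FileScanConfig).
    let mut filters = plan.dynamic_filters();
    // Plus whatever the children report.
    for child in plan.children() {
        filters.extend(collect_dynamic_filters(child));
    }
    filters
}
```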