From 4a38083f94183b1598b0a9d589fad1e05d78b357 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 6 Jan 2026 09:55:03 -0800 Subject: [PATCH 1/3] Apply sort node --- .../datafusion/src/physical_plan/sort.rs | 2 -- .../integrations/datafusion/src/table/mod.rs | 33 ++++++++++++++++++- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/sort.rs b/crates/integrations/datafusion/src/physical_plan/sort.rs index ede2547535..4a7af22e28 100644 --- a/crates/integrations/datafusion/src/physical_plan/sort.rs +++ b/crates/integrations/datafusion/src/physical_plan/sort.rs @@ -42,8 +42,6 @@ use iceberg::arrow::PROJECTED_PARTITION_VALUE_COLUMN; /// # Returns /// * `Ok(Arc)` - A SortExec that sorts by partition values /// * `Err` - If the partition column is not found -/// -/// TODO remove dead_code mark when integrating with insert_into #[allow(dead_code)] pub(crate) fn sort_by_partition(input: Arc) -> DFResult> { let schema = input.schema(); diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 86a79611b3..8d14bcd101 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -44,6 +44,7 @@ use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use iceberg::arrow::schema_to_arrow_schema; use iceberg::inspect::MetadataTableType; +use iceberg::spec::TableProperties; use iceberg::table::Table; use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent}; use metadata_table::IcebergMetadataTableProvider; @@ -53,6 +54,7 @@ use crate::physical_plan::commit::IcebergCommitExec; use crate::physical_plan::project::project_with_partition; use crate::physical_plan::repartition::repartition; use crate::physical_plan::scan::IcebergTableScan; +use crate::physical_plan::sort::sort_by_partition; use crate::physical_plan::write::IcebergWriteExec; /// Catalog-backed table provider with automatic metadata refresh. @@ -185,9 +187,38 @@ impl TableProvider for IcebergTableProvider { let repartitioned_plan = repartition(plan_with_partition, table.metadata_ref(), target_partitions)?; + // Apply sort node when it's not fanout mode + let fanout_enabled = table + .metadata() + .properties() + .get(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED) + .map(|value| { + value + .parse::() + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid value for {}, expected 'true' or 'false'", + TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED + ), + ) + .with_source(e) + }) + .map_err(to_datafusion_error) + }) + .transpose()? + .unwrap_or(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT); + + let write_input = if fanout_enabled { + repartitioned_plan + } else { + sort_by_partition(repartitioned_plan)? + }; + let write_plan = Arc::new(IcebergWriteExec::new( table.clone(), - repartitioned_plan, + write_input, self.schema.clone(), )); From 6c3826c61f6594770b798caaa6d85eaaa28d5e59 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 6 Jan 2026 12:02:15 -0800 Subject: [PATCH 2/3] Add ut --- .../integrations/datafusion/src/table/mod.rs | 145 ++++++++++++++++++ 1 file changed, 145 insertions(+) diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 8d14bcd101..ad616542a4 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -352,6 +352,7 @@ mod tests { use std::sync::Arc; use datafusion::common::Column; + use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; use iceberg::io::FileIO; use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; @@ -629,4 +630,148 @@ mod tests { assert_eq!(logical_field.data_type(), physical_field.data_type()); } } + + async fn get_partitioned_test_catalog_and_table( + fanout_enabled: Option, + ) -> (Arc, NamespaceIdent, String, TempDir) { + use iceberg::spec::{Transform, UnboundPartitionSpec}; + + let temp_dir = TempDir::new().unwrap(); + let warehouse_path = temp_dir.path().to_str().unwrap().to_string(); + + let catalog = MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_path.clone())]), + ) + .await + .unwrap(); + + let namespace = NamespaceIdent::new("test_ns".to_string()); + catalog + .create_namespace(&namespace, HashMap::new()) + .await + .unwrap(); + + let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "category", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + let partition_spec = UnboundPartitionSpec::builder() + .with_spec_id(0) + .add_partition_field(2, "category", Transform::Identity) + .unwrap() + .build(); + + let mut properties = HashMap::new(); + if let Some(enabled) = fanout_enabled { + properties.insert( + iceberg::spec::TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED + .to_string(), + enabled.to_string(), + ); + } + + let table_creation = TableCreation::builder() + .name("partitioned_table".to_string()) + .location(format!("{warehouse_path}/partitioned_table")) + .schema(schema) + .partition_spec(partition_spec) + .properties(properties) + .build(); + + catalog + .create_table(&namespace, table_creation) + .await + .unwrap(); + + ( + Arc::new(catalog), + namespace, + "partitioned_table".to_string(), + temp_dir, + ) + } + + /// Helper to check if a plan contains a SortExec node + fn plan_contains_sort(plan: &Arc) -> bool { + if plan.name() == "SortExec" { + return true; + } + for child in plan.children() { + if plan_contains_sort(child) { + return true; + } + } + false + } + + #[tokio::test] + async fn test_insert_plan_fanout_enabled_no_sort() { + use datafusion::datasource::TableProvider; + use datafusion::logical_expr::dml::InsertOp; + use datafusion::physical_plan::empty::EmptyExec; + + // When fanout is enabled (default), no sort node should be added + let (catalog, namespace, table_name, _temp_dir) = + get_partitioned_test_catalog_and_table(Some(true)).await; + + let provider = + IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) + .await + .unwrap(); + + let ctx = SessionContext::new(); + let input_schema = provider.schema(); + let input = Arc::new(EmptyExec::new(input_schema)) as Arc; + + let state = ctx.state(); + let insert_plan = provider + .insert_into(&state, input, InsertOp::Append) + .await + .unwrap(); + + // With fanout enabled, there should be no SortExec in the plan + assert!( + !plan_contains_sort(&insert_plan), + "Plan should NOT contain SortExec when fanout is enabled" + ); + } + + #[tokio::test] + async fn test_insert_plan_fanout_disabled_has_sort() { + use datafusion::datasource::TableProvider; + use datafusion::logical_expr::dml::InsertOp; + use datafusion::physical_plan::empty::EmptyExec; + + // When fanout is disabled, a sort node should be added + let (catalog, namespace, table_name, _temp_dir) = + get_partitioned_test_catalog_and_table(Some(false)).await; + + let provider = + IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) + .await + .unwrap(); + + let ctx = SessionContext::new(); + let input_schema = provider.schema(); + let input = Arc::new(EmptyExec::new(input_schema)) as Arc; + + let state = ctx.state(); + let insert_plan = provider + .insert_into(&state, input, InsertOp::Append) + .await + .unwrap(); + + // With fanout disabled, there should be a SortExec in the plan + assert!( + plan_contains_sort(&insert_plan), + "Plan should contain SortExec when fanout is disabled" + ); + } } From 7c87a104692188ad2e7a8105f49ddc03bd5fe440 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 6 Jan 2026 12:03:16 -0800 Subject: [PATCH 3/3] minor --- crates/integrations/datafusion/src/physical_plan/sort.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/integrations/datafusion/src/physical_plan/sort.rs b/crates/integrations/datafusion/src/physical_plan/sort.rs index 4a7af22e28..587ab120ca 100644 --- a/crates/integrations/datafusion/src/physical_plan/sort.rs +++ b/crates/integrations/datafusion/src/physical_plan/sort.rs @@ -42,7 +42,6 @@ use iceberg::arrow::PROJECTED_PARTITION_VALUE_COLUMN; /// # Returns /// * `Ok(Arc)` - A SortExec that sorts by partition values /// * `Err` - If the partition column is not found -#[allow(dead_code)] pub(crate) fn sort_by_partition(input: Arc) -> DFResult> { let schema = input.schema();