Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions crates/integrations/datafusion/src/physical_plan/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ use iceberg::arrow::PROJECTED_PARTITION_VALUE_COLUMN;
/// # Returns
/// * `Ok(Arc<dyn ExecutionPlan>)` - A SortExec that sorts by partition values
/// * `Err` - If the partition column is not found
///
/// TODO remove dead_code mark when integrating with insert_into
#[allow(dead_code)]
pub(crate) fn sort_by_partition(input: Arc<dyn ExecutionPlan>) -> DFResult<Arc<dyn ExecutionPlan>> {
let schema = input.schema();

Expand Down
178 changes: 177 additions & 1 deletion crates/integrations/datafusion/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::inspect::MetadataTableType;
use iceberg::spec::TableProperties;
use iceberg::table::Table;
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
use metadata_table::IcebergMetadataTableProvider;
Expand All @@ -53,6 +54,7 @@ use crate::physical_plan::commit::IcebergCommitExec;
use crate::physical_plan::project::project_with_partition;
use crate::physical_plan::repartition::repartition;
use crate::physical_plan::scan::IcebergTableScan;
use crate::physical_plan::sort::sort_by_partition;
use crate::physical_plan::write::IcebergWriteExec;

/// Catalog-backed table provider with automatic metadata refresh.
Expand Down Expand Up @@ -185,9 +187,38 @@ impl TableProvider for IcebergTableProvider {
let repartitioned_plan =
repartition(plan_with_partition, table.metadata_ref(), target_partitions)?;

// Apply sort node when it's not fanout mode
let fanout_enabled = table
.metadata()
.properties()
.get(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED)
.map(|value| {
value
.parse::<bool>()
.map_err(|e| {
Error::new(
ErrorKind::DataInvalid,
format!(
"Invalid value for {}, expected 'true' or 'false'",
TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
),
)
.with_source(e)
})
.map_err(to_datafusion_error)
})
.transpose()?
.unwrap_or(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT);

let write_input = if fanout_enabled {
repartitioned_plan
} else {
sort_by_partition(repartitioned_plan)?
};

let write_plan = Arc::new(IcebergWriteExec::new(
table.clone(),
repartitioned_plan,
write_input,
self.schema.clone(),
));

Expand Down Expand Up @@ -321,6 +352,7 @@ mod tests {
use std::sync::Arc;

use datafusion::common::Column;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use iceberg::io::FileIO;
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
Expand Down Expand Up @@ -598,4 +630,148 @@ mod tests {
assert_eq!(logical_field.data_type(), physical_field.data_type());
}
}

async fn get_partitioned_test_catalog_and_table(
fanout_enabled: Option<bool>,
) -> (Arc<dyn Catalog>, NamespaceIdent, String, TempDir) {
use iceberg::spec::{Transform, UnboundPartitionSpec};

let temp_dir = TempDir::new().unwrap();
let warehouse_path = temp_dir.path().to_str().unwrap().to_string();

let catalog = MemoryCatalogBuilder::default()
.load(
"memory",
HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_path.clone())]),
)
.await
.unwrap();

let namespace = NamespaceIdent::new("test_ns".to_string());
catalog
.create_namespace(&namespace, HashMap::new())
.await
.unwrap();

let schema = Schema::builder()
.with_schema_id(0)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(2, "category", Type::Primitive(PrimitiveType::String)).into(),
])
.build()
.unwrap();

let partition_spec = UnboundPartitionSpec::builder()
.with_spec_id(0)
.add_partition_field(2, "category", Transform::Identity)
.unwrap()
.build();

let mut properties = HashMap::new();
if let Some(enabled) = fanout_enabled {
properties.insert(
iceberg::spec::TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
.to_string(),
enabled.to_string(),
);
}

let table_creation = TableCreation::builder()
.name("partitioned_table".to_string())
.location(format!("{warehouse_path}/partitioned_table"))
.schema(schema)
.partition_spec(partition_spec)
.properties(properties)
.build();

catalog
.create_table(&namespace, table_creation)
.await
.unwrap();

(
Arc::new(catalog),
namespace,
"partitioned_table".to_string(),
temp_dir,
)
}

/// Helper to check if a plan contains a SortExec node
fn plan_contains_sort(plan: &Arc<dyn ExecutionPlan>) -> bool {
if plan.name() == "SortExec" {
return true;
}
for child in plan.children() {
if plan_contains_sort(child) {
return true;
}
}
false
}

#[tokio::test]
async fn test_insert_plan_fanout_enabled_no_sort() {
use datafusion::datasource::TableProvider;
use datafusion::logical_expr::dml::InsertOp;
use datafusion::physical_plan::empty::EmptyExec;

// When fanout is enabled (default), no sort node should be added
let (catalog, namespace, table_name, _temp_dir) =
get_partitioned_test_catalog_and_table(Some(true)).await;

let provider =
IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
.await
.unwrap();

let ctx = SessionContext::new();
let input_schema = provider.schema();
let input = Arc::new(EmptyExec::new(input_schema)) as Arc<dyn ExecutionPlan>;

let state = ctx.state();
let insert_plan = provider
.insert_into(&state, input, InsertOp::Append)
.await
.unwrap();

// With fanout enabled, there should be no SortExec in the plan
assert!(
!plan_contains_sort(&insert_plan),
"Plan should NOT contain SortExec when fanout is enabled"
);
}

#[tokio::test]
async fn test_insert_plan_fanout_disabled_has_sort() {
use datafusion::datasource::TableProvider;
use datafusion::logical_expr::dml::InsertOp;
use datafusion::physical_plan::empty::EmptyExec;

// When fanout is disabled, a sort node should be added
let (catalog, namespace, table_name, _temp_dir) =
get_partitioned_test_catalog_and_table(Some(false)).await;

let provider =
IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
.await
.unwrap();

let ctx = SessionContext::new();
let input_schema = provider.schema();
let input = Arc::new(EmptyExec::new(input_schema)) as Arc<dyn ExecutionPlan>;

let state = ctx.state();
let insert_plan = provider
.insert_into(&state, input, InsertOp::Append)
.await
.unwrap();

// With fanout disabled, there should be a SortExec in the plan
assert!(
plan_contains_sort(&insert_plan),
"Plan should contain SortExec when fanout is disabled"
);
}
}
Loading