diff --git a/Cargo.lock b/Cargo.lock index 6d54d234e023d..f90e929df2d1d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2746,6 +2746,7 @@ dependencies = [ "object_store", "pbjson-types", "prost", + "regex", "serde_json", "substrait", "tokio", diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 39cf7a9855de4..50b418b30c8f0 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -649,6 +649,15 @@ config_namespace! { /// # Default /// `false` — ANSI SQL mode is disabled by default. pub enable_ansi_mode: bool, default = false + + /// When set to true, all expressions in Substrait plans will be aliased with UUIDs + /// during conversion to avoid ambiguous column references. This ensures unique names + /// for all expressions, not just literals. This can help prevent naming conflicts + /// when the same expression appears multiple times in different relations. + /// + /// # Default + /// `false` — UUID aliasing is disabled by default. 
+ pub substrait_alias_all_expressions: bool, default = false } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 5e478de0416ce..dba4f790a4faa 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -269,6 +269,7 @@ datafusion.execution.sort_in_place_threshold_bytes 1048576 datafusion.execution.sort_spill_reservation_bytes 10485760 datafusion.execution.spill_compression uncompressed datafusion.execution.split_file_groups_by_statistics false +datafusion.execution.substrait_alias_all_expressions false datafusion.execution.target_partitions 7 datafusion.execution.time_zone NULL datafusion.execution.use_row_number_estimates_to_optimize_partitioning false @@ -397,6 +398,7 @@ datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below w datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). datafusion.execution.spill_compression uncompressed Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed. 
datafusion.execution.split_file_groups_by_statistics false Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental +datafusion.execution.substrait_alias_all_expressions false When set to true, all expressions in Substrait plans will be aliased with UUIDs during conversion to avoid ambiguous column references. This ensures unique names for all expressions, not just literals. This can help prevent naming conflicts when the same expression appears multiple times in different relations. # Default `false` — UUID aliasing is disabled by default. datafusion.execution.target_partitions 7 Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system datafusion.execution.time_zone NULL The default time zone Some functions, e.g. `now` return timestamps in this time zone datafusion.execution.use_row_number_estimates_to_optimize_partitioning false Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to `true` will likely produce better plans. if the source of statistics is accurate. We plan to make this the default in the future. 
diff --git a/datafusion/substrait/Cargo.toml b/datafusion/substrait/Cargo.toml
index 558a2a5586189..02515cd0ad985 100644
--- a/datafusion/substrait/Cargo.toml
+++ b/datafusion/substrait/Cargo.toml
@@ -47,6 +47,7 @@ substrait = { version = "0.62", features = ["serde"] }
 url = { workspace = true }
 tokio = { workspace = true, features = ["fs"] }
 uuid = { version = "1.17.0", features = ["v4"] }
 
 [dev-dependencies]
+regex = "1.12.2"
 datafusion = { workspace = true, features = ["nested_expressions", "unicode_expressions"] }
diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/project_rel.rs b/datafusion/substrait/src/logical_plan/consumer/rel/project_rel.rs
index 239073108ce50..1a1e30413fb38 100644
--- a/datafusion/substrait/src/logical_plan/consumer/rel/project_rel.rs
+++ b/datafusion/substrait/src/logical_plan/consumer/rel/project_rel.rs
@@ -62,18 +62,27 @@ pub async fn from_project_rel(
                 // to transform it into a column reference
                 window_exprs.insert(e.clone());
             }
             // Substrait plans are ordinal based, so they do not provide names for columns.
             // Names for columns are generated by Datafusion during conversion, and for literals
             // Datafusion produces names based on the literal value. It is possible to construct
             // valid Substrait plans that result in duplicated names if the same literal value is
             // used in multiple relations. To avoid this issue, we alias literals with unique names.
             // The name tracker will ensure that two literals in the same project would have
             // unique names but, it does not ensure that if a literal column exists in a previous
             // project say before a join that it is deduplicated with respect to those columns.
             // See: https://github.com/apache/datafusion/pull/17299
             let maybe_apply_alias = match e {
                 lit @ Expr::Literal(_, _) => lit.alias(uuid::Uuid::new_v4().to_string()),
+                // Already-aliased expressions are not aliased a second time, and plain column
+                // references keep the name of the column they refer to.
+                named @ (Expr::Alias(_) | Expr::Column(_)) => named,
+                // With `substrait_alias_all_expressions` enabled, every remaining expression
+                // also gets a UUID alias, so its generated name cannot collide with a name
+                // produced elsewhere in the plan.
+                other if consumer.alias_all_expressions() => {
+                    other.alias(uuid::Uuid::new_v4().to_string())
+                }
                 _ => e,
             };
             explicit_exprs.push(name_tracker.get_uniquely_named_expr(maybe_apply_alias)?);
         }
diff --git a/datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs b/datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs
index c734b9eb7a541..a5503036ecd53 100644
--- a/datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs
+++ b/datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs
@@ -164,6 +164,12 @@ pub trait SubstraitConsumer: Send + Sync + Sized {
     fn get_extensions(&self) -> &Extensions;
     fn get_function_registry(&self) -> &impl FunctionRegistry;
 
+    /// Returns `true` if the consumer should alias every project expression (other than
+    /// aliases and column references) with a UUID. Defaults to `false`.
+    fn alias_all_expressions(&self) -> bool {
+        false
+    }
+
     // Relation Methods
     // There is one method per Substrait relation to allow for easy overriding of consumer behaviour.
// These methods have default implementations calling the common handler code, to allow for users
@@ -465,6 +471,14 @@ impl SubstraitConsumer for DefaultSubstraitConsumer<'_> {
         self.state
     }
 
+    fn alias_all_expressions(&self) -> bool {
+        self.state
+            .config()
+            .options()
+            .execution
+            .substrait_alias_all_expressions
+    }
+
     async fn consume_extension_leaf(
         &self,
         rel: &ExtensionLeafRel,
diff --git a/datafusion/substrait/tests/cases/logical_plans.rs b/datafusion/substrait/tests/cases/logical_plans.rs
index 426f3c12e5a15..6d911256c03ae 100644
--- a/datafusion/substrait/tests/cases/logical_plans.rs
+++ b/datafusion/substrait/tests/cases/logical_plans.rs
@@ -22,9 +22,10 @@ mod tests {
     use crate::utils::test::{add_plan_schemas_to_ctx, read_json};
     use datafusion::common::Result;
     use datafusion::dataframe::DataFrame;
-    use datafusion::prelude::SessionContext;
+    use datafusion::prelude::{SessionConfig, SessionContext};
     use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
     use insta::assert_snapshot;
+    use regex::Regex;
 
     #[tokio::test]
     async fn scalar_function_compound_signature() -> Result<()> {
@@ -185,6 +186,80 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn alias_all_expressions_flag() -> Result<()> {
+        // With `substrait_alias_all_expressions` enabled, project expressions other than
+        // aliases and plain column references are aliased with UUIDs, not just literals.
+
+        // Reuse the `disambiguate_literals_with_same_name` plan: its projections contain a
+        // literal, a cast and plain column references.
+        let proto_plan =
+            read_json("tests/testdata/test_plans/disambiguate_literals_with_same_name.substrait.json");
+
+        // Create a context with the flag enabled
+        let mut config = SessionConfig::new();
+        config
+            .options_mut()
+            .execution
+            .substrait_alias_all_expressions = true;
+        let ctx = add_plan_schemas_to_ctx(
+            SessionContext::new_with_config(config),
+            &proto_plan,
+        )?;
+
+        let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;
+        let plan_str = format!("{plan}");
+
+        // UUIDs differ on every run, so replace each distinct UUID with a stable label
+        // (`[UUID1]`, `[UUID2]`, ...) numbered in order of first appearance. Repeated
+        // occurrences of one UUID map to the same label.
+        let uuid_regex =
+            Regex::new(r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")
+                .unwrap();
+        let mut labels = std::collections::HashMap::new();
+        let labeled_plan = uuid_regex
+            .replace_all(&plan_str, |caps: &regex::Captures| {
+                let next_label = labels.len() + 1;
+                labels
+                    .entry(caps[0].to_string())
+                    .or_insert_with(|| format!("[UUID{next_label}]"))
+                    .clone()
+            })
+            .into_owned();
+
+        // At least one expression must have received a UUID alias
+        assert!(
+            labeled_plan.contains("AS [UUID"),
+            "Plan should contain UUID aliases"
+        );
+
+        // What the snapshot shows:
+        // - In the union inputs, the literal and the cast are aliased ([UUID1], [UUID3]) while
+        //   the plain column references `A.A` and `B.A` keep their names.
+        // - The top projection refers to the union's second column as `left.[UUID1]`, i.e. by
+        //   the same UUID that was assigned in the first union input.
+        // - The final output names `C` and `E` show up as an additional alias on top.
+        assert_snapshot!(
+            labeled_plan,
+            @r#"
+        Projection: left.A, left.[UUID1] AS C, right.D, Utf8(NULL) AS [UUID2] AS E
+          Left Join: left.A = right.A
+            SubqueryAlias: left
+              Union
+                Projection: A.A, Utf8(NULL) AS [UUID1]
+                  TableScan: A
+                Projection: B.A, CAST(B.C AS Utf8) AS [UUID3]
+                  TableScan: B
+            SubqueryAlias: right
+              TableScan: C
+        "#
+        );
+        // Trigger execution to ensure plan validity
+        DataFrame::new(ctx.state(), plan).show().await?;
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn non_nullable_lists() -> Result<()> {
         //
DataFusion's Substrait consumer treats all lists as nullable, even if the Substrait plan specifies them as non-nullable. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 77d6ff8be97ed..d6e00e52a1337 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -131,6 +131,7 @@ The following configuration settings are available: | datafusion.execution.enforce_batch_size_in_joins | false | Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. | | datafusion.execution.objectstore_writer_buffer_size | 10485760 | Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point. | | datafusion.execution.enable_ansi_mode | false | Whether to enable ANSI SQL mode. The flag is experimental and relevant only for DataFusion Spark built-in functions When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL semantics for expressions, casting, and error handling. This means: - **Strict type coercion rules:** implicit casts between incompatible types are disallowed. - **Standard SQL arithmetic behavior:** operations such as division by zero, numeric overflow, or invalid casts raise runtime errors rather than returning `NULL` or adjusted values. - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling. When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive, non-ANSI mode designed for user convenience and backward compatibility. 
In this mode: - Implicit casts between types are allowed (e.g., string to integer when possible). - Arithmetic operations are more lenient — for example, `abs()` on the minimum representable integer value returns the input value instead of raising overflow. - Division by zero or invalid casts may return `NULL` instead of failing. # Default `false` — ANSI SQL mode is disabled by default. | +| datafusion.execution.substrait_alias_all_expressions | false | When set to true, all expressions in Substrait plans will be aliased with UUIDs during conversion to avoid ambiguous column references. This ensures unique names for all expressions, not just literals. This can help prevent naming conflicts when the same expression appears multiple times in different relations. # Default `false` — UUID aliasing is disabled by default. | | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |