Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,15 @@ config_namespace! {
/// # Default
/// `false` — ANSI SQL mode is disabled by default.
pub enable_ansi_mode: bool, default = false

/// When set to true, all expressions in Substrait plans will be aliased with UUIDs
/// during conversion to avoid ambiguous column references. This ensures unique names
/// for all expressions, not just literals. This can help prevent naming conflicts
/// when the same expression appears multiple times in different relations.
///
/// # Default
/// `false` — UUID aliasing is disabled by default.
pub substrait_alias_all_expressions: bool, default = false
}
}

Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ datafusion.execution.sort_in_place_threshold_bytes 1048576
datafusion.execution.sort_spill_reservation_bytes 10485760
datafusion.execution.spill_compression uncompressed
datafusion.execution.split_file_groups_by_statistics false
datafusion.execution.substrait_alias_all_expressions false
datafusion.execution.target_partitions 7
datafusion.execution.time_zone NULL
datafusion.execution.use_row_number_estimates_to_optimize_partitioning false
Expand Down Expand Up @@ -397,6 +398,7 @@ datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below w
datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured).
datafusion.execution.spill_compression uncompressed Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed.
datafusion.execution.split_file_groups_by_statistics false Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental
datafusion.execution.substrait_alias_all_expressions false When set to true, all expressions in Substrait plans will be aliased with UUIDs during conversion to avoid ambiguous column references. This ensures unique names for all expressions, not just literals. This can help prevent naming conflicts when the same expression appears multiple times in different relations. # Default `false` — UUID aliasing is disabled by default.
datafusion.execution.target_partitions 7 Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system
datafusion.execution.time_zone NULL The default time zone Some functions, e.g. `now` return timestamps in this time zone
datafusion.execution.use_row_number_estimates_to_optimize_partitioning false Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to `true` will likely produce better plans. if the source of statistics is accurate. We plan to make this the default in the future.
Expand Down
1 change: 1 addition & 0 deletions datafusion/substrait/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ substrait = { version = "0.62", features = ["serde"] }
url = { workspace = true }
tokio = { workspace = true, features = ["fs"] }
uuid = { version = "1.17.0", features = ["v4"] }
regex = "1.12.2"

[dev-dependencies]
datafusion = { workspace = true, features = ["nested_expressions", "unicode_expressions"] }
Expand Down
39 changes: 27 additions & 12 deletions datafusion/substrait/src/logical_plan/consumer/rel/project_rel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,33 @@ pub async fn from_project_rel(
// to transform it into a column reference
window_exprs.insert(e.clone());
}
// Substrait plans are ordinal based, so they do not provide names for columns.
// Names for columns are generated by Datafusion during conversion, and for literals
// Datafusion produces names based on the literal value. It is possible to construct
// valid Substrait plans that result in duplicated names if the same literal value is
// used in multiple relations. To avoid this issue, we alias literals with unique names.
// The name tracker will ensure that two literals in the same project would have
// unique names but, it does not ensure that if a literal column exists in a previous
// project say before a join that it is deduplicated with respect to those columns.
// See: https://github.com/apache/datafusion/pull/17299
let maybe_apply_alias = match e {
lit @ Expr::Literal(_, _) => lit.alias(uuid::Uuid::new_v4().to_string()),
_ => e,
// Check if we should alias all expressions with UUIDs
let maybe_apply_alias = if consumer.alias_all_expressions() {
// When the flag is set, alias expressions with UUIDs to avoid any potential
// ambiguous column references across the entire plan.
// Two kinds of expression are left untouched: expressions that already carry an
// alias (so they are not double-aliased) and plain column references.
match &e {
Expr::Alias(_) => e, // Already aliased, don't alias again
Expr::Column(_) => e,
_ => e.alias(uuid::Uuid::new_v4().to_string()),
}
} else {
// Original behavior: only alias literals
// Substrait plans are ordinal based, so they do not provide names for columns.
// Names for columns are generated by Datafusion during conversion, and for literals
// Datafusion produces names based on the literal value. It is possible to construct
// valid Substrait plans that result in duplicated names if the same literal value is
// used in multiple relations. To avoid this issue, we alias literals with unique names.
// The name tracker will ensure that two literals in the same project would have
// unique names but, it does not ensure that if a literal column exists in a previous
// project say before a join that it is deduplicated with respect to those columns.
// See: https://github.com/apache/datafusion/pull/17299
match e {
lit @ Expr::Literal(_, _) => {
lit.alias(uuid::Uuid::new_v4().to_string())
}
_ => e,
}
};
explicit_exprs.push(name_tracker.get_uniquely_named_expr(maybe_apply_alias)?);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ pub trait SubstraitConsumer: Send + Sync + Sized {
fn get_extensions(&self) -> &Extensions;
fn get_function_registry(&self) -> &impl FunctionRegistry;

/// Get a flag indicating whether all expressions should be aliased with UUIDs
/// during Substrait conversion.
///
/// The default implementation returns `false`. Consumers that want the
/// aliasing behaviour override this method.
fn alias_all_expressions(&self) -> bool {
false
}

// Relation Methods
// There is one method per Substrait relation to allow for easy overriding of consumer behaviour.
// These methods have default implementations calling the common handler code, to allow for users
Expand Down Expand Up @@ -465,6 +471,14 @@ impl SubstraitConsumer for DefaultSubstraitConsumer<'_> {
self.state
}

/// Reads the `substrait_alias_all_expressions` execution option from the
/// configuration of the wrapped session state.
fn alias_all_expressions(&self) -> bool {
    let execution_options = &self.state.config().options().execution;
    execution_options.substrait_alias_all_expressions
}

async fn consume_extension_leaf(
&self,
rel: &ExtensionLeafRel,
Expand Down
86 changes: 85 additions & 1 deletion datafusion/substrait/tests/cases/logical_plans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ mod tests {
use crate::utils::test::{add_plan_schemas_to_ctx, read_json};
use datafusion::common::Result;
use datafusion::dataframe::DataFrame;
use datafusion::prelude::SessionContext;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use insta::assert_snapshot;
use regex::Regex;

#[tokio::test]
async fn scalar_function_compound_signature() -> Result<()> {
Expand Down Expand Up @@ -185,6 +186,89 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn alias_all_expressions_flag() -> Result<()> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to take ideas on the best way to write this test but I wanted to show that the UUIDs actually light up correctly.

// Test that when the substrait_alias_all_expressions flag is set,
// all expressions (not just literals) are aliased with UUIDs

// Use the same test plan that can demonstrate the behavior
let proto_plan =
read_json("tests/testdata/test_plans/disambiguate_literals_with_same_name.substrait.json");

// Create a context with the flag enabled
let mut config = SessionConfig::new();
config
.options_mut()
.execution
.substrait_alias_all_expressions = true;
let ctx = add_plan_schemas_to_ctx(
SessionContext::new_with_config(config),
&proto_plan,
)?;

let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;

// Convert the plan to string to analyze the UUIDs
let plan_str = format!("{plan}");

// Extract UUIDs and map them to labels for better readability
let uuid_regex =
Regex::new(r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")
.unwrap();
let mut uuid_map = std::collections::HashMap::new();
let mut uuid_counter = 1;

// First pass: collect all unique UUIDs
for cap in uuid_regex.captures_iter(&plan_str) {
let uuid = cap.get(0).unwrap().as_str();
if !uuid_map.contains_key(uuid) {
uuid_map.insert(uuid.to_string(), format!("[UUID{uuid_counter}]"));
uuid_counter += 1;
}
}

// Second pass: replace UUIDs with their labels
let mut labeled_plan = plan_str.clone();
for (uuid, label) in &uuid_map {
labeled_plan = labeled_plan.replace(uuid, label);
}

// Verify that the plan has the expected structure with consistent UUID references
// The same UUID should be used when referencing the same column across the plan
assert!(
labeled_plan.contains("AS [UUID"),
"Plan should contain UUID aliases"
);

// Snapshot test showing the actual UUID numbers to demonstrate correspondence
// This shows how the same UUID appears in multiple places when referencing the same expression
// For example, [UUID1] appears in:
// - The top projection (referencing the column from the union)
// - The join condition (left side)
// - The first union projection (A.A AS [UUID1])
// This proves that all references to column A.A use the same UUID
// Note: Expressions already aliased are not double-aliased
assert_snapshot!(
labeled_plan,
@r#"
Projection: left.A, left.[UUID1] AS C, right.D, Utf8(NULL) AS [UUID2] AS E
Left Join: left.A = right.A
SubqueryAlias: left
Union
Projection: A.A, Utf8(NULL) AS [UUID1]
TableScan: A
Projection: B.A, CAST(B.C AS Utf8) AS [UUID3]
TableScan: B
SubqueryAlias: right
TableScan: C
"#
);
// Trigger execution to ensure plan validity
DataFrame::new(ctx.state(), plan).show().await?;

Ok(())
}

#[tokio::test]
async fn non_nullable_lists() -> Result<()> {
// DataFusion's Substrait consumer treats all lists as nullable, even if the Substrait plan specifies them as non-nullable.
Expand Down
Loading