diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 04aaf5a890a8..d4626134acbf 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -263,7 +263,7 @@ impl DataFrame {
         self.unnest_columns_with_options(&[column], options)
     }
 
-    /// Expand multiple list columns into a set of rows.
+    /// Expand multiple list/struct columns into a set of rows and new columns.
     ///
     /// See also:
    ///
@@ -277,8 +277,8 @@ impl DataFrame {
     /// # #[tokio::main]
     /// # async fn main() -> Result<()> {
     /// let ctx = SessionContext::new();
-    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
-    /// let df = df.unnest_columns(&["a", "b"])?;
+    /// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
+    /// let df = df.unnest_columns(&["b","c","d"])?;
     /// # Ok(())
     /// # }
     /// ```
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 090b1d59d9a0..bc5818361b7d 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -49,7 +49,7 @@ use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGro
 use crate::physical_plan::analyze::AnalyzeExec;
 use crate::physical_plan::empty::EmptyExec;
 use crate::physical_plan::explain::ExplainExec;
-use crate::physical_plan::expressions::{Column, PhysicalSortExpr};
+use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::filter::FilterExec;
 use crate::physical_plan::joins::utils as join_utils;
 use crate::physical_plan::joins::{
@@ -1112,24 +1112,18 @@ impl DefaultPhysicalPlanner {
                 Arc::new(GlobalLimitExec::new(input, *skip, *fetch))
             }
             LogicalPlan::Unnest(Unnest {
-                columns,
+                list_type_columns,
+                struct_type_columns,
                 schema,
                 options,
                 ..
             }) => {
                 let input = children.one()?;
-                let column_execs = columns
-                    .iter()
-                    .map(|column| {
-                        schema
-                            .index_of_column(column)
-                            .map(|idx| Column::new(&column.name, idx))
-                    })
-                    .collect::<Result<Vec<_>>>()?;
                 let schema = SchemaRef::new(schema.as_ref().to_owned().into());
                 Arc::new(UnnestExec::new(
                     input,
-                    column_execs,
+                    list_type_columns.clone(),
+                    struct_type_columns.clone(),
                     schema,
                     options.clone(),
                 ))
diff --git a/datafusion/core/tests/data/unnest.json b/datafusion/core/tests/data/unnest.json
new file mode 100644
index 000000000000..5999171c2886
--- /dev/null
+++ b/datafusion/core/tests/data/unnest.json
@@ -0,0 +1,2 @@
+{"a":1, "b":[2.0, 1.3, -6.1], "c":[false, true],"d":{"e":1,"f":2}}
+{"a":2, "b":[3.0, 2.3, -7.1], "c":[false, true]}
diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs
index 009f45b28057..9b7cb85614c2 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -1231,11 +1231,11 @@ async fn unnest_aggregate_columns() -> Result<()> {
         .collect()
         .await?;
     let expected = [
-        r#"+--------------------+"#,
-        r#"| COUNT(shapes.tags) |"#,
-        r#"+--------------------+"#,
-        r#"| 9                  |"#,
-        r#"+--------------------+"#,
+        r#"+-------------+"#,
+        r#"| COUNT(tags) |"#,
+        r#"+-------------+"#,
+        r#"| 9           |"#,
+        r#"+-------------+"#,
     ];
     assert_batches_sorted_eq!(expected, &results);
@@ -1384,7 +1384,7 @@ async fn unnest_with_redundant_columns() -> Result<()> {
     let optimized_plan = df.clone().into_optimized_plan()?;
     let expected = vec![
         "Projection: shapes.shape_id [shape_id:UInt32]",
-        "  Unnest: shape_id2 [shape_id:UInt32, shape_id2:UInt32;N]",
+        "  Unnest: lists[shape_id2] structs[] [shape_id:UInt32, shape_id2:UInt32;N]",
         "    Aggregate: groupBy=[[shapes.shape_id]], aggr=[[ARRAY_AGG(shapes.shape_id) AS shape_id2]] [shape_id:UInt32, shape_id2:List(Field { name: \"item\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} });N]",
         "      TableScan: shapes projection=[shape_id] [shape_id:UInt32]",
     ];
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 64763a973687..2a2bb75f1884 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -19,7 +19,7 @@
 use crate::expr::{
     AggregateFunction, BinaryExpr, Cast, Exists, GroupingSet, InList, InSubquery,
-    Placeholder, TryCast,
+    Placeholder, TryCast, Unnest,
 };
 use crate::function::{
     AccumulatorArgs, AccumulatorFactoryFunction, PartitionEvaluatorFactory,
@@ -489,6 +489,13 @@ pub fn when(when: Expr, then: Expr) -> CaseBuilder {
     CaseBuilder::new(None, vec![when], vec![then], None)
 }
 
+/// Create an `Unnest` expression
+pub fn unnest(expr: Expr) -> Expr {
+    Expr::Unnest(Unnest {
+        expr: Box::new(expr),
+    })
+}
+
 /// Convenience method to create a new user defined scalar function (UDF) with a
 /// specific signature and specific return type.
 ///
diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs
index 8b7f30d245b4..01c9edff306e 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -121,7 +121,7 @@ impl ExprSchemable for Expr {
                     Ok(field.data_type().clone())
                 }
                 DataType::Struct(_) => {
-                    not_impl_err!("unnest() does not support struct yet")
+                    Ok(arg_data_type)
                 }
                 DataType::Null => {
                     not_impl_err!("unnest() does not support null yet")
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 3b1b1196f1c0..8483525d7f55 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -51,9 +51,9 @@
 use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
 use datafusion_common::config::FormatOptions;
 use datafusion_common::display::ToStringifiedPlan;
 use datafusion_common::{
-    get_target_functional_dependencies, not_impl_err, plan_datafusion_err, plan_err,
-    Column, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, TableReference,
-    ToDFSchema, UnnestOptions,
+    get_target_functional_dependencies, internal_err, not_impl_err, plan_datafusion_err,
+    plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
+    TableReference, ToDFSchema, UnnestOptions,
 };
 
 /// Default table name for unnamed table
@@ -1592,7 +1592,53 @@ impl TableSource for LogicalTableSource {
 
 /// Create a [`LogicalPlan::Unnest`] plan
 pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
-    unnest_with_options(input, columns, UnnestOptions::new())
+    unnest_with_options(input, columns, UnnestOptions::default())
+}
+
+// Based on the data type (struct or a list variant), return the set of columns
+// produced by unnesting the input column.
+// For example, given a column with name "a":
+// - List(Element) returns ["a"] with data type Element
+// - Struct(field1, field2) returns ["a.field1", "a.field2"]
+pub fn get_unnested_columns(
+    col_name: &String,
+    data_type: &DataType,
+) -> Result<Vec<(Column, Arc<Field>)>> {
+    let mut qualified_columns = Vec::with_capacity(1);
+
+    match data_type {
+        DataType::List(field)
+        | DataType::FixedSizeList(field, _)
+        | DataType::LargeList(field) => {
+            let new_field = Arc::new(Field::new(
+                col_name.clone(),
+                field.data_type().clone(),
+                // Unnesting may produce NULLs even if the list is not null.
+                // For example: unnest([1], []) -> 1, null
+                true,
+            ));
+            let column = Column::from_name(col_name);
+            qualified_columns.push((column, new_field));
+        }
+        DataType::Struct(fields) => {
+            qualified_columns.extend(fields.iter().map(|f| {
+                let new_name = format!("{}.{}", col_name, f.name());
+                let column = Column::from_name(&new_name);
+                let new_field = f.as_ref().clone().with_name(new_name);
+                (column, Arc::new(new_field))
+            }))
+        }
+        _ => {
+            return internal_err!(
+                "trying to unnest on invalid data type {:?}",
+                data_type
+            );
+        }
+    };
+    Ok(qualified_columns)
 }
 
 /// Create a [`LogicalPlan::Unnest`] plan with options
@@ -1601,41 +1647,59 @@ pub fn unnest_with_options(
     input: LogicalPlan,
     columns: Vec<Column>,
     options: UnnestOptions,
 ) -> Result<LogicalPlan> {
-    // Extract the type of the nested field in the list.
-    let mut unnested_fields: HashMap<usize, _> = HashMap::with_capacity(columns.len());
-    // Add qualifiers to the columns.
-    let mut qualified_columns = Vec::with_capacity(columns.len());
-    for c in &columns {
-        let index = input.schema().index_of_column(c)?;
-        let (unnest_qualifier, unnest_field) = input.schema().qualified_field(index);
-        let unnested_field = match unnest_field.data_type() {
-            DataType::List(field)
-            | DataType::FixedSizeList(field, _)
-            | DataType::LargeList(field) => Arc::new(Field::new(
-                unnest_field.name(),
-                field.data_type().clone(),
-                // Unnesting may produce NULLs even if the list is not null.
-                // For example: unnset([1], []) -> 1, null
-                true,
-            )),
-            _ => {
-                // If the unnest field is not a list type return the input plan.
-                return Ok(input);
-            }
-        };
-        qualified_columns.push(Column::from((unnest_qualifier, &unnested_field)));
-        unnested_fields.insert(index, unnested_field);
-    }
+    let mut list_columns = Vec::with_capacity(columns.len());
+    let mut struct_columns = Vec::with_capacity(columns.len());
+    let column_by_original_index = columns
+        .iter()
+        .map(|c| Ok((input.schema().index_of_column(c)?, c)))
+        .collect::<Result<HashMap<usize, &Column>>>()?;
 
-    // Update the schema with the unnest column types changed to contain the nested types.
     let input_schema = input.schema();
+
+    let mut dependency_indices = vec![];
+    // Transform input schema into new schema,
+    // e.g. int, unnest([]int), unnest(struct(varchar,varchar))
+    // becomes int, int, varchar, varchar
     let fields = input_schema
         .iter()
         .enumerate()
-        .map(|(index, (q, f))| match unnested_fields.get(&index) {
-            Some(unnested_field) => (q.cloned(), unnested_field.clone()),
-            None => (q.cloned(), f.clone()),
+        .map(|(index, (original_qualifier, original_field))| {
+            match column_by_original_index.get(&index) {
+                Some(&column_to_unnest) => {
+                    let flatten_columns = get_unnested_columns(
+                        &column_to_unnest.name,
+                        original_field.data_type(),
+                    )?;
+                    match original_field.data_type() {
+                        DataType::List(_)
+                        | DataType::FixedSizeList(_, _)
+                        | DataType::LargeList(_) => list_columns.push(index),
+                        DataType::Struct(_) => struct_columns.push(index),
+                        _ => {
+                            panic!(
+                                "not reachable, should be caught by get_unnested_columns"
+                            )
+                        }
+                    }
+                    // new columns dependent on the same original index
+                    dependency_indices
+                        .extend(std::iter::repeat(index).take(flatten_columns.len()));
+                    Ok(flatten_columns
+                        .iter()
+                        .map(|col: &(Column, Arc<Field>)| {
+                            (col.0.relation.to_owned(), col.1.to_owned())
+                        })
+                        .collect())
+                }
+                None => {
+                    dependency_indices.push(index);
+                    Ok(vec![(original_qualifier.cloned(), original_field.clone())])
+                }
+            }
         })
+        .collect::<Result<Vec<_>>>()?
+        .into_iter()
+        .flatten()
         .collect::<Vec<_>>();
 
     let metadata = input_schema.metadata().clone();
@@ -1643,9 +1707,13 @@ pub fn unnest_with_options(
     // We can use the existing functional dependencies:
     let deps = input_schema.functional_dependencies().clone();
     let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);
+
     Ok(LogicalPlan::Unnest(Unnest {
         input: Arc::new(input),
-        columns: qualified_columns,
+        exec_columns: columns,
+        list_type_columns: list_columns,
+        struct_type_columns: struct_columns,
+        dependency_indices,
         schema,
         options,
     }))
@@ -2074,13 +2142,13 @@ mod tests {
 
     #[test]
     fn plan_builder_unnest() -> Result<()> {
-        // Unnesting a simple column should return the child plan.
-        let plan = nested_table_scan("test_table")?
-            .unnest_column("scalar")?
-            .build()?;
-
-        let expected = "TableScan: test_table";
-        assert_eq!(expected, format!("{plan:?}"));
+        // Cannot unnest a scalar column
+        let err = nested_table_scan("test_table")?
+            .unnest_column("scalar")
+            .unwrap_err();
+        assert!(err
+            .to_string()
+            .starts_with("Internal error: trying to unnest on invalid data type UInt32"));
 
         // Unnesting the strings list.
         let plan = nested_table_scan("test_table")?
@@ -2088,36 +2156,65 @@ mod tests {
             .build()?;
 
         let expected = "\
-        Unnest: test_table.strings\
+        Unnest: lists[test_table.strings] structs[]\
         \n  TableScan: test_table";
         assert_eq!(expected, format!("{plan:?}"));
 
         // Check unnested field is a scalar
-        let field = plan
-            .schema()
-            .field_with_name(Some(&TableReference::bare("test_table")), "strings")
-            .unwrap();
+        let field = plan.schema().field_with_name(None, "strings").unwrap();
         assert_eq!(&DataType::Utf8, field.data_type());
 
-        // Unnesting multiple fields.
+        // Unnesting the singular struct column results in 2 new columns, one for each subfield
+        let plan = nested_table_scan("test_table")?
+            .unnest_column("struct_singular")?
+            .build()?;
+
+        let expected = "\
+        Unnest: lists[] structs[test_table.struct_singular]\
+        \n  TableScan: test_table";
+        assert_eq!(expected, format!("{plan:?}"));
+
+        for field_name in &["a", "b"] {
+            // Check unnested struct field is a scalar
+            let field = plan
+                .schema()
+                .field_with_name(None, &format!("struct_singular.{}", field_name))
+                .unwrap();
+            assert_eq!(&DataType::UInt32, field.data_type());
+        }
+
+        // Unnesting multiple fields in separate plans
         let plan = nested_table_scan("test_table")?
             .unnest_column("strings")?
             .unnest_column("structs")?
+            .unnest_column("struct_singular")?
             .build()?;
 
         let expected = "\
-        Unnest: test_table.structs\
-        \n  Unnest: test_table.strings\
-        \n    TableScan: test_table";
+        Unnest: lists[] structs[test_table.struct_singular]\
+        \n  Unnest: lists[test_table.structs] structs[]\
+        \n    Unnest: lists[test_table.strings] structs[]\
+        \n      TableScan: test_table";
         assert_eq!(expected, format!("{plan:?}"));
 
         // Check unnested struct list field should be a struct.
-        let field = plan
-            .schema()
-            .field_with_name(Some(&TableReference::bare("test_table")), "structs")
-            .unwrap();
+        let field = plan.schema().field_with_name(None, "structs").unwrap();
         assert!(matches!(field.data_type(), DataType::Struct(_)));
 
+        // Unnesting multiple fields at the same time
+        let cols = vec!["strings", "structs", "struct_singular"]
+            .into_iter()
+            .map(|c| c.into())
+            .collect();
+        let plan = nested_table_scan("test_table")?
+            .unnest_columns_with_options(cols, UnnestOptions::default())?
+            .build()?;
+
+        let expected = "\
+        Unnest: lists[test_table.strings, test_table.structs] structs[test_table.struct_singular]\
+        \n  TableScan: test_table";
+        assert_eq!(expected, format!("{plan:?}"));
+
         // Unnesting missing column should fail.
         let plan = nested_table_scan("test_table")?.unnest_column("missing");
         assert!(plan.is_err());
@@ -2126,8 +2223,9 @@ mod tests {
     }
 
     fn nested_table_scan(table_name: &str) -> Result<LogicalPlanBuilder> {
-        // Create a schema with a scalar field, a list of strings, and a list of structs.
-        let struct_field = Field::new_struct(
+        // Create a schema with a scalar field, a list of strings, a list of structs
+        // and a singular struct
+        let struct_field_in_list = Field::new_struct(
             "item",
             vec![
                 Field::new("a", DataType::UInt32, false),
@@ -2139,7 +2237,15 @@ mod tests {
         let schema = Schema::new(vec![
             Field::new("scalar", DataType::UInt32, false),
             Field::new_list("strings", string_field, false),
-            Field::new_list("structs", struct_field, false),
+            Field::new_list("structs", struct_field_in_list.clone(), false),
+            Field::new(
+                "struct_singular",
+                DataType::Struct(Fields::from(vec![
+                    Field::new("a", DataType::UInt32, false),
+                    Field::new("b", DataType::UInt32, false),
+                ])),
+                false,
+            ),
         ]);
 
         table_scan(Some(table_name), &schema, None)
diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs
index 3a2ed9ffc2d8..f3765fb184bb 100644
--- a/datafusion/expr/src/logical_plan/display.rs
+++ b/datafusion/expr/src/logical_plan/display.rs
@@ -30,7 +30,7 @@
 use crate::dml::CopyTo;
 use arrow::datatypes::Schema;
 use datafusion_common::display::GraphvizBuilder;
 use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
-use datafusion_common::DataFusionError;
+use datafusion_common::{Column, DataFusionError};
 use serde_json::json;
 
 /// Formats plans with a single line per node. For example:
@@ -638,10 +638,25 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
                     "Node Type": "DescribeTable"
                 })
             }
-            LogicalPlan::Unnest(Unnest { columns, .. }) => {
+            LogicalPlan::Unnest(Unnest {
+                input: plan,
+                list_type_columns: list_col_indices,
+                struct_type_columns: struct_col_indices,
+                ..
+            }) => {
+                let input_columns = plan.schema().columns();
+                let list_type_columns = list_col_indices
+                    .iter()
+                    .map(|i| &input_columns[*i])
+                    .collect::<Vec<&Column>>();
+                let struct_type_columns = struct_col_indices
+                    .iter()
+                    .map(|i| &input_columns[*i])
+                    .collect::<Vec<&Column>>();
                 json!({
                     "Node Type": "Unnest",
-                    "Column": expr_vec_fmt!(columns),
+                    "ListColumn": expr_vec_fmt!(list_type_columns),
+                    "StructColumn": expr_vec_fmt!(struct_type_columns),
                 })
             }
         }
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 42f3e1f163a7..97592c05abe2 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -667,12 +667,12 @@ impl LogicalPlan {
             LogicalPlan::DescribeTable(_) => Ok(self),
             LogicalPlan::Unnest(Unnest {
                 input,
-                columns,
-                schema: _,
+                exec_columns,
                 options,
+                ..
             }) => {
                 // Update schema with unnested column type.
-                unnest_with_options(unwrap_arc(input), columns, options)
+                unnest_with_options(unwrap_arc(input), exec_columns, options)
             }
         }
     }
@@ -1017,11 +1017,15 @@ impl LogicalPlan {
             }
             LogicalPlan::DescribeTable(_) => Ok(self.clone()),
             LogicalPlan::Unnest(Unnest {
-                columns, options, ..
+                exec_columns: columns,
+                options,
+                ..
             }) => {
                 // Update schema with unnested column type.
                 let input = inputs.swap_remove(0);
-                unnest_with_options(input, columns.clone(), options.clone())
+                let new_plan =
+                    unnest_with_options(input, columns.clone(), options.clone())?;
+                Ok(new_plan)
             }
         }
     }
@@ -1790,8 +1794,23 @@ impl LogicalPlan {
                     LogicalPlan::DescribeTable(DescribeTable { .. }) => {
                         write!(f, "DescribeTable")
                     }
-                    LogicalPlan::Unnest(Unnest { columns, .. }) => {
-                        write!(f, "Unnest: {}", expr_vec_fmt!(columns))
+                    LogicalPlan::Unnest(Unnest {
+                        input: plan,
+                        list_type_columns: list_col_indices,
+                        struct_type_columns: struct_col_indices,
+                        ..
+                    }) => {
+                        let input_columns = plan.schema().columns();
+                        let list_type_columns = list_col_indices
+                            .iter()
+                            .map(|i| &input_columns[*i])
+                            .collect::<Vec<&Column>>();
+                        let struct_type_columns = struct_col_indices
+                            .iter()
+                            .map(|i| &input_columns[*i])
+                            .collect::<Vec<&Column>>();
+                        // get items from input_columns indexed by list_col_indices
+                        write!(f, "Unnest: lists[{}] structs[{}]",
+                            expr_vec_fmt!(list_type_columns),
+                            expr_vec_fmt!(struct_type_columns))
                     }
                 }
             }
@@ -2783,8 +2802,17 @@ pub enum Partitioning {
 pub struct Unnest {
     /// The incoming logical plan
     pub input: Arc<LogicalPlan>,
-    /// The columns to unnest
-    pub columns: Vec<Column>,
+    /// Columns to run unnest on; these can be List or Struct columns
+    pub exec_columns: Vec<Column>,
+    /// The indices (in the input schema) of the list-typed columns
+    /// to run unnest on
+    pub list_type_columns: Vec<usize>,
+    /// The indices (in the input schema) of the struct-typed columns
+    /// to run unnest on
+    pub struct_type_columns: Vec<usize>,
+    /// Aligned with the output columns; records which column in the
+    /// input schema each output column depends on
+    pub dependency_indices: Vec<usize>,
     /// The output schema, containing the unnested field column.
     pub schema: DFSchemaRef,
     /// Options
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs
index ea1f1c3c85f7..215b2cb4d4cf 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -313,13 +313,19 @@ impl TreeNode for LogicalPlan {
             }
             LogicalPlan::Unnest(Unnest {
                 input,
-                columns,
+                exec_columns: input_columns,
+                list_type_columns,
+                struct_type_columns,
+                dependency_indices,
                 schema,
                 options,
             }) => rewrite_arc(input, f)?.update_data(|input| {
                 LogicalPlan::Unnest(Unnest {
                     input,
-                    columns,
+                    exec_columns: input_columns,
+                    dependency_indices,
+                    list_type_columns,
+                    struct_type_columns,
                     schema,
                     options,
                 })
@@ -492,7 +498,9 @@ impl LogicalPlan {
             LogicalPlan::TableScan(TableScan { filters, .. }) => {
                 filters.iter().apply_until_stop(f)
             }
-            LogicalPlan::Unnest(Unnest { columns, .. }) => {
+            LogicalPlan::Unnest(unnest) => {
+                let columns = unnest.exec_columns.clone();
+
                 let exprs = columns
                     .iter()
                     .map(|c| Expr::Column(c.clone()))
diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs
index 49b52aa53ae9..af51814c9686 100644
--- a/datafusion/optimizer/src/optimize_projections/mod.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -30,6 +30,7 @@ use datafusion_common::{
     JoinType, Result,
 };
 use datafusion_expr::expr::Alias;
+use datafusion_expr::Unnest;
 use datafusion_expr::{
     logical_plan::LogicalPlan, projection_schema, Aggregate, Distinct, Expr, Projection,
     TableScan, Window,
@@ -289,7 +290,6 @@ fn optimize_projections(
         LogicalPlan::Sort(_)
         | LogicalPlan::Filter(_)
         | LogicalPlan::Repartition(_)
-        | LogicalPlan::Unnest(_)
         | LogicalPlan::Union(_)
         | LogicalPlan::SubqueryAlias(_)
         | LogicalPlan::Distinct(Distinct::On(_)) => {
@@ -399,6 +399,13 @@ fn optimize_projections(
                 "OptimizeProjection: should have handled in the match statement above"
             );
         }
+        LogicalPlan::Unnest(Unnest {
+            dependency_indices, ..
+        }) => {
+            vec![RequiredIndicies::new_from_indices(
+                dependency_indices.clone(),
+            )]
+        }
     };
 
     // Required indices are currently ordered (child0, child1, ...)
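A quick sketch of how `dependency_indices` feeds the projection pushdown above (the schema and values are illustrative, not taken from this patch): for an input schema `[a: Int64, b: List<Int64>, c: Struct<x, y>]` unnested on `b` and `c`, the output columns are `[a, b, c.x, c.y]` and `dependency_indices` is `[0, 1, 2, 2]`. Each required output column therefore maps straight back to the input column it was derived from, which is exactly what `RequiredIndicies::new_from_indices` consumes.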
diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs
index 06dd8230d39e..a8151fe0220b 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -23,8 +23,8 @@ use std::{any::Any, sync::Arc};
 use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
 use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
 use crate::{
-    expressions::Column, DisplayFormatType, Distribution, ExecutionPlan, PhysicalExpr,
-    RecordBatchStream, SendableRecordBatchStream,
+    DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream,
+    SendableRecordBatchStream,
 };
 
 use arrow::array::{
@@ -36,18 +36,24 @@ use arrow::compute::kernels::zip::zip;
 use arrow::compute::{cast, is_not_null, kernels, sum};
 use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
-use arrow_array::{Int64Array, Scalar};
+use arrow_array::{Int64Array, Scalar, StructArray};
 use arrow_ord::cmp::lt;
-use datafusion_common::{exec_datafusion_err, exec_err, Result, UnnestOptions};
+use datafusion_common::{
+    exec_datafusion_err, exec_err, internal_err, Result, UnnestOptions,
+};
 use datafusion_execution::TaskContext;
+use datafusion_expr::ColumnarValue;
 use datafusion_physical_expr::EquivalenceProperties;
 
 use async_trait::async_trait;
 use futures::{Stream, StreamExt};
+use hashbrown::HashSet;
 use log::trace;
 
-/// Unnest the given columns by joining the row with each value in the
-/// nested type.
+/// Unnest the given columns (either with type struct or list).
+/// For list unnesting, each row is vertically transformed into multiple rows.
+/// For struct unnesting, each column is horizontally transformed into multiple columns.
+/// Thus the original RecordBatch with dimension (n x m) may have a new dimension (n' x m').
 ///
 /// See [`UnnestOptions`] for more details and an example.
 #[derive(Debug)]
@@ -56,8 +62,10 @@ pub struct UnnestExec {
     input: Arc<dyn ExecutionPlan>,
     /// The schema once the unnest is applied
     schema: SchemaRef,
-    /// The unnest columns
-    columns: Vec<Column>,
+    /// Indices of the list-typed columns in the input schema
+    list_column_indices: Vec<usize>,
+    /// Indices of the struct-typed columns in the input schema
+    struct_column_indices: Vec<usize>,
     /// Options
     options: UnnestOptions,
     /// Execution metrics
@@ -70,15 +78,18 @@ impl UnnestExec {
     /// Create a new [UnnestExec].
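+    ///
+    /// A rough usage sketch (the plan and index values are illustrative, not
+    /// from this patch): for an input schema `[a: Int64, b: List<Int64>,
+    /// c: Struct<x, y>]`, unnesting `b` and `c` would be
+    /// `UnnestExec::new(input, vec![1], vec![2], output_schema, options)`,
+    /// where `output_schema` is the already-flattened schema `[a, b, c.x, c.y]`.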
     pub fn new(
         input: Arc<dyn ExecutionPlan>,
-        columns: Vec<Column>,
+        list_column_indices: Vec<usize>,
+        struct_column_indices: Vec<usize>,
         schema: SchemaRef,
         options: UnnestOptions,
     ) -> Self {
         let cache = Self::compute_properties(&input, schema.clone());
+
         UnnestExec {
             input,
             schema,
-            columns,
+            list_column_indices,
+            struct_column_indices,
             options,
             metrics: Default::default(),
             cache,
@@ -137,7 +148,8 @@ impl ExecutionPlan for UnnestExec {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(Arc::new(UnnestExec::new(
             children[0].clone(),
-            self.columns.clone(),
+            self.list_column_indices.clone(),
+            self.struct_column_indices.clone(),
             self.schema.clone(),
             self.options.clone(),
         )))
@@ -158,7 +170,8 @@ impl ExecutionPlan for UnnestExec {
         Ok(Box::pin(UnnestStream {
             input,
             schema: self.schema.clone(),
-            columns: self.columns.clone(),
+            list_type_columns: self.list_column_indices.clone(),
+            struct_column_indices: self.struct_column_indices.iter().copied().collect(),
             options: self.options.clone(),
             metrics,
         }))
@@ -214,7 +227,8 @@ struct UnnestStream {
     /// Unnested schema
     schema: Arc<Schema>,
     /// The unnest columns
-    columns: Vec<Column>,
+    list_type_columns: Vec<usize>,
+    struct_column_indices: HashSet<usize>,
     /// Options
     options: UnnestOptions,
     /// Metrics
@@ -251,8 +265,13 @@ impl UnnestStream {
             .map(|maybe_batch| match maybe_batch {
                 Some(Ok(batch)) => {
                     let timer = self.metrics.elapsed_compute.timer();
-                    let result =
-                        build_batch(&batch, &self.schema, &self.columns, &self.options);
+                    let result = build_batch(
+                        &batch,
+                        &self.schema,
+                        &self.list_type_columns,
+                        &self.struct_column_indices,
+                        &self.options,
+                    );
                     self.metrics.input_batches.add(1);
                     self.metrics.input_rows.add(batch.num_rows());
                     if let Ok(ref batch) = result {
@@ -279,48 +298,105 @@ impl UnnestStream {
     }
 }
 
-/// For each row in a `RecordBatch`, some list columns need to be unnested.
-/// We will expand the values in each list into multiple rows,
+/// Given a set of struct column indices to flatten,
+/// try converting each targeted column in the input into multiple subfield columns.
+/// For example
+/// struct_col: [a: struct(item: int, name: string), b: int]
+/// with a batch
+/// {a: {item: 1, name: "a"}, b: 2},
+/// {a: {item: 3, name: "b"}, b: 4}
+/// will be converted into
+/// {a.item: 1, a.name: "a", b: 2},
+/// {a.item: 3, a.name: "b", b: 4}
+fn flatten_struct_cols(
+    input_batch: &[Arc<dyn Array>],
+    schema: &SchemaRef,
+    struct_column_indices: &HashSet<usize>,
+) -> Result<RecordBatch> {
+    // horizontal expansion because of struct unnest
+    let columns_expanded = input_batch
+        .iter()
+        .enumerate()
+        .map(|(idx, column_data)| match struct_column_indices.get(&idx) {
+            Some(_) => match column_data.data_type() {
+                DataType::Struct(_) => {
+                    let struct_arr =
+                        column_data.as_any().downcast_ref::<StructArray>().unwrap();
+                    Ok(struct_arr.columns().to_vec())
+                }
+                data_type => internal_err!(
+                    "expecting column {} from input plan to be a struct, got {:?}",
+                    idx,
+                    data_type
+                ),
+            },
+            None => Ok(vec![column_data.clone()]),
+        })
+        .collect::<Result<Vec<_>>>()?
+        .into_iter()
+        .flatten()
+        .collect();
+    Ok(RecordBatch::try_new(schema.clone(), columns_expanded)?)
+}
+
+/// For each row in a `RecordBatch`, some list/struct columns need to be unnested.
+/// - For list columns: We will expand the values in each list into multiple rows,
 ///   taking the longest length among these lists, and shorter lists are padded with NULLs.
-//
+/// - For struct columns: We will expand the struct columns into multiple subfield columns.
 /// For columns that don't need to be unnested, repeat their values until reaching the longest length.
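+///
+/// A small worked example (values are illustrative): given one input row
+/// `{a: [1, 2], b: {x: 7, y: 8}, c: 3}` where `a` is list-unnested and `b` is
+/// struct-unnested, the output has columns `[a, b.x, b.y, c]` and two rows,
+/// `(1, 7, 8, 3)` and `(2, 7, 8, 3)`: the list drives the row count, while the
+/// struct only widens the schema.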
 fn build_batch(
     batch: &RecordBatch,
     schema: &SchemaRef,
-    columns: &[Column],
+    list_type_columns: &[usize],
+    struct_column_indices: &HashSet<usize>,
     options: &UnnestOptions,
 ) -> Result<RecordBatch> {
-    let list_arrays: Vec<ArrayRef> = columns
-        .iter()
-        .map(|column| column.evaluate(batch)?.into_array(batch.num_rows()))
-        .collect::<Result<_>>()?;
+    let transformed = match list_type_columns.len() {
+        0 => flatten_struct_cols(batch.columns(), schema, struct_column_indices),
+        _ => {
+            let list_arrays: Vec<ArrayRef> = list_type_columns
+                .iter()
+                .map(|index| {
+                    ColumnarValue::Array(batch.column(*index).clone())
+                        .into_array(batch.num_rows())
+                })
+                .collect::<Result<_>>()?;
+
+            let longest_length = find_longest_length(&list_arrays, options)?;
+            let unnested_length = longest_length.as_primitive::<Int64Type>();
+            let total_length = if unnested_length.is_empty() {
+                0
+            } else {
+                sum(unnested_length).ok_or_else(|| {
+                    exec_datafusion_err!("Failed to calculate the total unnested length")
+                })? as usize
+            };
+            if total_length == 0 {
+                return Ok(RecordBatch::new_empty(schema.clone()));
+            }
 
-    let longest_length = find_longest_length(&list_arrays, options)?;
-    let unnested_length = longest_length.as_primitive::<Int64Type>();
-    let total_length = if unnested_length.is_empty() {
-        0
-    } else {
-        sum(unnested_length).ok_or_else(|| {
-            exec_datafusion_err!("Failed to calculate the total unnested length")
-        })? as usize
+            // Unnest all the list arrays
+            let unnested_arrays =
+                unnest_list_arrays(&list_arrays, unnested_length, total_length)?;
+            let unnested_array_map: HashMap<_, _> = unnested_arrays
+                .into_iter()
+                .zip(list_type_columns.iter())
+                .map(|(array, column)| (*column, array))
+                .collect();
+
+            // Create the take indices array for other columns
+            let take_indicies = create_take_indicies(unnested_length, total_length);
+
+            // vertical expansion because of list unnest
+            let ret = flatten_list_cols_from_indices(
+                batch,
+                &unnested_array_map,
+                &take_indicies,
+            )?;
+            flatten_struct_cols(&ret, schema, struct_column_indices)
+        }
     };
-    if total_length == 0 {
-        return Ok(RecordBatch::new_empty(schema.clone()));
-    }
-
-    // Unnest all the list arrays
-    let unnested_arrays =
-        unnest_list_arrays(&list_arrays, unnested_length, total_length)?;
-    let unnested_array_map: HashMap<_, _> = unnested_arrays
-        .into_iter()
-        .zip(columns.iter())
-        .map(|(array, column)| (column.index(), array))
-        .collect();
-
-    // Create the take indices array for other columns
-    let take_indicies = create_take_indicies(unnested_length, total_length);
-
-    batch_from_indices(batch, schema, &unnested_array_map, &take_indicies)
+    transformed
 }
 
 /// Find the longest list length among the given list arrays for each row.
@@ -505,7 +581,8 @@ fn unnest_list_array(
     )?)
 }
 
-/// Creates take indicies that will be used to expand all columns except for the unnest [`columns`](UnnestExec::columns).
+/// Creates take indicies that will be used to expand all columns except for the list-type
+/// [`columns`](UnnestExec::list_column_indices) that are being unnested.
 /// Every column value needs to be repeated multiple times according to the length array.
 ///
 /// If the length array looks like this:
@@ -568,12 +645,11 @@ fn create_take_indicies(
 /// c2: 'a', 'b', 'c', 'c', 'c', null, 'd', 'd'
 /// ```
 ///
-fn batch_from_indices(
+fn flatten_list_cols_from_indices(
     batch: &RecordBatch,
-    schema: &SchemaRef,
     unnested_list_arrays: &HashMap<usize, ArrayRef>,
     indices: &PrimitiveArray<Int64Type>,
-) -> Result<RecordBatch> {
+) -> Result<Vec<Arc<dyn Array>>> {
     let arrays = batch
         .columns()
         .iter()
         .enumerate()
         .map(|(col_idx, arr)| match unnested_list_arrays.get(&col_idx) {
             Some(unnested_array) => Ok(unnested_array.clone()),
             None => Ok(kernels::take::take(arr, indices, None)?),
         })
         .collect::<Result<Vec<_>>>()?;
-
-    Ok(RecordBatch::try_new(schema.clone(), arrays.to_vec())?)
+    Ok(arrays)
 }
 
 #[cfg(test)]
diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs
index 7abe5ecdae9f..1f8492b9ba47 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -358,10 +358,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 match arg.get_type(schema)? {
                     DataType::List(_)
                     | DataType::LargeList(_)
-                    | DataType::FixedSizeList(_, _) => Ok(()),
-                    DataType::Struct(_) => {
-                        not_impl_err!("unnest() does not support struct yet")
-                    }
+                    | DataType::FixedSizeList(_, _)
+                    | DataType::Struct(_) => Ok(()),
                     DataType::Null => {
                         not_impl_err!("unnest() does not support null yet")
                     }
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index 730e84cd094b..d2cd1bcf3a06 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -20,14 +20,14 @@ use std::sync::Arc;
 
 use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
 use crate::utils::{
-    check_columns_satisfy_exprs, extract_aliases, rebase_expr, resolve_aliases_to_exprs,
-    resolve_columns, resolve_positions_to_exprs,
+    check_columns_satisfy_exprs, extract_aliases, rebase_expr,
+    recursive_transform_unnest, resolve_aliases_to_exprs, resolve_columns,
+    resolve_positions_to_exprs,
 };
 
-use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
 use datafusion_common::{Column, UnnestOptions};
-use datafusion_expr::expr::{Alias, Unnest};
+use datafusion_expr::expr::Alias;
 use datafusion_expr::expr_rewriter::{
     normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_cols,
 };
@@ -298,47 +298,29 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         select_exprs: Vec<Expr>,
     ) -> Result<LogicalPlan> {
         let mut unnest_columns = vec![];
+        // Columns for the inner projection, computed before the unnest happens;
+        // this includes both non-unnest columns and unnest columns.
         let mut inner_projection_exprs = vec![];
 
-        let outer_projection_exprs = select_exprs
+        // The exprs returned here may differ from the originals in inner_projection_exprs.
+        // For example:
+        // - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2
+        // - unnest(array_col) will be transformed into unnest(array_col).element
+        // - unnest(array_col) + 1 will be transformed into unnest(array_col).element + 1
+        let outer_projection_exprs: Vec<Expr> = select_exprs
             .into_iter()
             .map(|expr| {
-                let Transformed {
-                    data: transformed_expr,
-                    transformed,
-                    tnr: _,
-                } = expr.transform_up(|expr: Expr| {
-                    if let Expr::Unnest(Unnest { expr: ref arg }) = expr {
-                        let column_name = expr.display_name()?;
-                        unnest_columns.push(column_name.clone());
-                        // Add alias for the argument expression, to avoid naming conflicts with other expressions
-                        // in the select list. For example: `select unnest(col1), col1 from t`.
-                        inner_projection_exprs
-                            .push(arg.clone().alias(column_name.clone()));
-                        Ok(Transformed::yes(Expr::Column(Column::from_name(
-                            column_name,
-                        ))))
-                    } else {
-                        Ok(Transformed::no(expr))
-                    }
-                })?;
-
-                if !transformed {
-                    if matches!(&transformed_expr, Expr::Column(_)) {
-                        inner_projection_exprs.push(transformed_expr.clone());
-                        Ok(transformed_expr)
-                    } else {
-                        // We need to evaluate the expr in the inner projection,
-                        // outer projection just select its name
-                        let column_name = transformed_expr.display_name()?;
-                        inner_projection_exprs.push(transformed_expr);
-                        Ok(Expr::Column(Column::from_name(column_name)))
-                    }
-                } else {
-                    Ok(transformed_expr)
-                }
+                recursive_transform_unnest(
+                    &input,
+                    &mut unnest_columns,
+                    &mut inner_projection_exprs,
+                    expr,
+                )
             })
-            .collect::<Result<Vec<_>>>()?;
+            .collect::<Result<Vec<_>>>()?
+            .into_iter()
+            .flatten()
+            .collect();
 
         // Do the final projection
         if unnest_columns.is_empty() {
diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs
index 2c50d3af1f5e..4ae486ef1a59 100644
--- a/datafusion/sql/src/utils.rs
+++ b/datafusion/sql/src/utils.rs
@@ -26,9 +26,10 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{
     exec_err, internal_err, plan_err, Column, DataFusionError, Result, ScalarValue,
 };
-use datafusion_expr::expr::{Alias, GroupingSet, WindowFunction};
+use datafusion_expr::builder::get_unnested_columns;
+use datafusion_expr::expr::{Alias, GroupingSet, Unnest, WindowFunction};
 use datafusion_expr::utils::{expr_as_column_expr, find_column_exprs};
-use datafusion_expr::{expr_vec_fmt, Expr, LogicalPlan};
+use datafusion_expr::{expr_vec_fmt, Expr, ExprSchemable, LogicalPlan};
 use sqlparser::ast::Ident;
 
 /// Make a best-effort attempt at resolving all columns in the expression tree
@@ -255,3 +256,185 @@ pub(crate) fn normalize_ident(id: Ident) -> String {
         None => id.value.to_ascii_lowercase(),
     }
 }
+
+/// The context is that we want to rewrite unnest() into InnerProjection->Unnest->OuterProjection.
+/// Given an expression which contains an unnest expr as one of its children,
+/// the transformation depends on the unnest type:
+/// - For a list column: unnest(col) with type list -> unnest(col) with type list::item
+/// - For a struct column: unnest(struct(field1, field2)) -> unnest(struct).field1, unnest(struct).field2
+/// The transformed exprs will be used in the outer projection.
+pub(crate) fn recursive_transform_unnest(
+    input: &LogicalPlan,
+    unnest_placeholder_columns: &mut Vec<String>,
+    inner_projection_exprs: &mut Vec<Expr>,
+    original_expr: Expr,
+) -> Result<Vec<Expr>> {
+    let mut transform =
+        |unnest_expr: &Expr, expr_in_unnest: &Expr| -> Result<Vec<Expr>> {
+            // Full context: we are planning the execution as InnerProjection->Unnest->OuterProjection.
+            // Inside the unnest execution, each column in the inner projection
+            // is transformed into new columns, so we keep track of these placeholder column names.
+            let placeholder_name = unnest_expr.display_name()?;
+
+            unnest_placeholder_columns.push(placeholder_name.clone());
+            // Add alias for the argument expression, to avoid naming conflicts
+            // with other expressions in the select list. For example: `select unnest(col1), col1 from t`.
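+            // An illustrative sketch of the overall rewrite this feeds into
+            // (plan shape only; the names are examples, not from this patch):
+            //   select unnest(struct_col) from t
+            // becomes
+            //   Projection: unnest(struct_col).field1, unnest(struct_col).field2
+            //     Unnest: structs[unnest(struct_col)]
+            //       Projection: struct_col AS unnest(struct_col)
+            //         TableScan: t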
+            // This extra projection is used by the unnest transformation
+            inner_projection_exprs
+                .push(expr_in_unnest.clone().alias(placeholder_name.clone()));
+            let schema = input.schema();
+
+            let (data_type, _) = expr_in_unnest.data_type_and_nullable(schema)?;
+
+            let outer_projection_columns =
+                get_unnested_columns(&placeholder_name, &data_type)?;
+            let expr = outer_projection_columns
+                .iter()
+                .map(|col| Expr::Column(col.0.clone()))
+                .collect::<Vec<_>>();
+            Ok(expr)
+        };
+    // The transformed expr may be the same as, or different from, the original expr.
+    // For example:
+    // - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2
+    // - unnest(array_col) will be transformed into unnest(array_col)
+    // - unnest(array_col) + 1 will be transformed into unnest(array_col) + 1
+
+    // Specifically handle the root-level unnest expr; this is the only place
+    // where unnest on a struct can be handled
+    if let Expr::Unnest(Unnest { expr: ref arg }) = original_expr {
+        return transform(&original_expr, arg);
+    }
+    let Transformed {
+        data: transformed_expr,
+        transformed,
+        tnr: _,
+    } = original_expr.transform_up(|expr: Expr| {
+        if let Expr::Unnest(Unnest { expr: ref arg }) = expr {
+            let (data_type, _) = expr.data_type_and_nullable(input.schema())?;
+            if let DataType::Struct(_) = data_type {
+                return internal_err!("unnest on struct can only be applied at the root level of select expression");
+            }
+            let transformed_exprs = transform(&expr, arg)?;
+            Ok(Transformed::yes(transformed_exprs[0].clone()))
+        } else {
+            Ok(Transformed::no(expr))
+        }
+    })?;
+
+    if !transformed {
+        if matches!(&transformed_expr, Expr::Column(_)) {
+            inner_projection_exprs.push(transformed_expr.clone());
+            Ok(vec![transformed_expr])
+        } else {
+            // We need to evaluate the expr in the inner projection;
+            // the outer projection just selects it by name
+            let column_name = transformed_expr.display_name()?;
+            inner_projection_exprs.push(transformed_expr);
+            Ok(vec![Expr::Column(Column::from_name(column_name))])
+        }
+    } else {
+        Ok(vec![transformed_expr])
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{ops::Add, sync::Arc};
+
+    use arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
+    use arrow_schema::Fields;
+    use datafusion_common::{DFSchema, Result};
+    use datafusion_expr::{col, lit, unnest, EmptyRelation, LogicalPlan};
+
+    use crate::utils::recursive_transform_unnest;
+
+    #[test]
+    fn test_recursive_transform_unnest() -> Result<()> {
+        let schema = Schema::new(vec![
+            Field::new(
+                "struct_col",
+                ArrowDataType::Struct(Fields::from(vec![
+                    Field::new("field1", ArrowDataType::Int32, false),
+                    Field::new("field2", ArrowDataType::Int32, false),
+                ])),
+                false,
+            ),
+            Field::new(
+                "array_col",
+                ArrowDataType::List(Arc::new(Field::new(
+                    "item",
+                    ArrowDataType::Int64,
+                    true,
+                ))),
+                true,
+            ),
+            Field::new("int_col", ArrowDataType::Int32, false),
+        ]);
+
+        let dfschema = DFSchema::try_from(schema)?;
+
+        let input = LogicalPlan::EmptyRelation(EmptyRelation {
+            produce_one_row: false,
+            schema: Arc::new(dfschema),
+        });
+
+        let mut unnest_placeholder_columns = vec![];
+        let mut inner_projection_exprs = vec![];
+
+        // unnest(struct_col)
+        let original_expr = unnest(col("struct_col"));
+        let transformed_exprs = recursive_transform_unnest(
+            &input,
+            &mut unnest_placeholder_columns,
+            &mut inner_projection_exprs,
+            original_expr,
+        )?;
+        assert_eq!(
+            transformed_exprs,
+            vec![
+                col("unnest(struct_col).field1"),
+                col("unnest(struct_col).field2"),
+            ]
+        );
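+        // Note (illustrative): the root-level struct unnest fans out into one
+        // output column per subfield, while the placeholder checked below still
+        // refers to the single aliased input column.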
+        assert_eq!(unnest_placeholder_columns, vec!["unnest(struct_col)"]);
+        // still references struct_col in the original schema, but with an alias
+        // to avoid colliding with a projection on the column itself, if any
+        assert_eq!(
+            inner_projection_exprs,
+            vec![col("struct_col").alias("unnest(struct_col)"),]
+        );
+
+        // unnest(array_col) + 1
+        let original_expr = unnest(col("array_col")).add(lit(1i64));
+        let transformed_exprs = recursive_transform_unnest(
+            &input,
+            &mut unnest_placeholder_columns,
+            &mut inner_projection_exprs,
+            original_expr,
+        )?;
+        assert_eq!(
+            unnest_placeholder_columns,
+            vec!["unnest(struct_col)", "unnest(array_col)"]
+        );
+        // only transform the unnest children
+        assert_eq!(
+            transformed_exprs,
+            vec![col("unnest(array_col)").add(lit(1i64))]
+        );
+
+        // keep appending to the current vector;
+        // still references array_col in the original schema, but with an alias
+        // to avoid colliding with a projection on the column itself, if any
+        assert_eq!(
+            inner_projection_exprs,
+            vec![
+                col("struct_col").alias("unnest(struct_col)"),
+                col("array_col").alias("unnest(array_col)")
+            ]
+        );
+
+        Ok(())
+    }
+}
diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs
index bbedaca6a8bc..cca96b6eb944 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -2905,6 +2905,21 @@ impl ContextProvider for MockContextProvider {
                 Field::new("Id", DataType::UInt32, false),
                 Field::new("lower", DataType::UInt32, false),
             ])),
+            "unnest_table" => Ok(Schema::new(vec![
+                Field::new(
+                    "array_col",
+                    DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
+                    false,
+                ),
+                Field::new(
+                    "struct_col",
+                    DataType::Struct(Fields::from(vec![
+                        Field::new("field1", DataType::Int64, true),
+                        Field::new("field2", DataType::Utf8, true),
+                    ])),
+                    false,
+                ),
+            ])),
             _ => plan_err!("No table named: {} found", name.table()),
         };
 
@@ -4715,6 +4730,29 @@ fn roundtrip_crossjoin() -> Result<()> {
     Ok(())
 }
 
+#[test]
+fn test_unnest_logical_plan() -> Result<()> {
+    let query = "select unnest(struct_col), unnest(array_col), struct_col, array_col from unnest_table";
+
+    let dialect = GenericDialect {};
+    let statement = Parser::new(&dialect)
+        .try_with_sql(query)?
+        .parse_statement()?;
+
+    let context = MockContextProvider::default();
+    let sql_to_rel = SqlToRel::new(&context);
+    let plan = sql_to_rel.sql_statement_to_plan(statement).unwrap();
+
+    let expected = "Projection: unnest(unnest_table.struct_col).field1, unnest(unnest_table.struct_col).field2, unnest(unnest_table.array_col), unnest_table.struct_col, unnest_table.array_col\
+    \n  Unnest: lists[unnest(unnest_table.array_col)] structs[unnest(unnest_table.struct_col)]\
+    \n    Projection: unnest_table.struct_col AS unnest(unnest_table.struct_col), unnest_table.array_col AS unnest(unnest_table.array_col), unnest_table.struct_col, unnest_table.array_col\
+    \n      TableScan: unnest_table";
+
+    assert_eq!(format!("{plan:?}"), expected);
+
+    Ok(())
+}
+
 #[cfg(test)]
 #[ctor::ctor]
 fn init() {
diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt
index ca7e73cb87e0..7b7249d6d5fa 100644
--- a/datafusion/sqllogictest/test_files/unnest.slt
+++ b/datafusion/sqllogictest/test_files/unnest.slt
@@ -22,12 +22,19 @@
 statement ok
 CREATE TABLE unnest_table
 AS VALUES
-    ([1,2,3], [7], 1, [13, 14]),
-    ([4,5], [8,9,10], 2, [15, 16]),
-    ([6], [11,12], 3, null),
-    ([12], [null, 42, null], null, null),
+    ([1,2,3], [7], 1, [13, 14], struct(1,2)),
+    ([4,5], [8,9,10], 2, [15, 16], struct(3,4)),
+    ([6], [11,12], 3, null, null),
+    ([12], [null, 42, null], null, null, struct(7,8)),
     -- null array to verify the `preserve_nulls` option
-    (null, null, 4, [17, 18])
+    (null, null, 4, [17, 18], null)
+;
+
+statement ok
+CREATE TABLE nested_unnest_table
+AS VALUES
+    (struct('a', 'b', struct('c')), (struct('a', 'b', [10,20]))),
+    (struct('d', 'e', struct('f')), (struct('x', 'y', [30,40, 50])))
 ;
 
 ## Basic unnest expression in select list
 query I
 select unnest([1,2,3]);
 ----
 1
@@ -38,7 +45,13 @@ select unnest([1,2,3]);
 2
 3
 
-## Basic unnest expression in from clause
+## Basic unnest expression in select struct
+query III
+select unnest(struct(1,2,3));
+----
+1 2 3
+
+## Basic unnest list expression in from clause
 query I
 select * from unnest([1,2,3]);
 ----
@@ -46,6 +59,20 @@ select * from unnest([1,2,3]);
 2
 3
 
+## Basic unnest struct expression in from clause
+query III
+select * from unnest(struct(1,2,3));
+----
+1 2 3
+
+## Multiple unnest expressions in from clause
+query IIII
+select * from unnest(struct(1,2,3)), unnest([4,5,6]);
+----
+1 2 3 4
+1 2 3 5
+1 2 3 6
+
 ## Unnest null in select list
 query error DataFusion error: This feature is not implemented: unnest\(\) does not support null yet
@@ -145,10 +172,6 @@ select array_remove(column1, 4), unnest(column2), column3 * 10 from unnest_table
 [12] NULL NULL
 
-## Unnest column with scalars
-query error DataFusion error: Error during planning: unnest\(\) can only be applied to array, struct and null
-select unnest(column3) from unnest_table;
-
 ## Unnest doesn't work with untyped nulls
 query error DataFusion error: This feature is not implemented: unnest\(\) does not support null yet
 select unnest(null) from unnest_table;
@@ -233,12 +256,16 @@ select * from unnest([], NULL::int[]);
 
 
 ## Unnest struct expression in select list
-query error DataFusion error: This feature is not implemented: unnest\(\) does not support struct yet
+query ?
 select unnest(struct(null));
+----
+NULL
 
 ## Unnest struct expression in from clause
-query error DataFusion error: This feature is not implemented: unnest\(\) does not support struct yet
+query ?
 select * from unnest(struct(null));
+----
+NULL
 
 
 ## Unnest array expression
@@ -288,6 +315,18 @@ select unnest(array_remove(column1, 12)) from unnest_table;
 5
 6
 
+## unnest struct-typed column and list-typed column at the same time
+query I?II?
+select unnest(column1), column1, unnest(column5), column5 from unnest_table;
+----
+1 [1, 2, 3] 1 2 {c0: 1, c1: 2}
+2 [1, 2, 3] 1 2 {c0: 1, c1: 2}
+3 [1, 2, 3] 1 2 {c0: 1, c1: 2}
+4 [4, 5] 3 4 {c0: 3, c1: 4}
+5 [4, 5] 3 4 {c0: 3, c1: 4}
+6 [6] NULL NULL NULL
+12 [12] 7 8 {c0: 7, c1: 8}
+
 
 ## Unnest in from clause with alias
 query I
@@ -383,8 +422,26 @@ select unnest(array_remove(column1, 3)) - 1 as c1, column3 from unnest_table;
 5 3
 11 NULL
 
+## unnest for nested struct(struct)
+query TT?
+select unnest(column1) from nested_unnest_table;
+----
+a b {c0: c}
+d e {c0: f}
+
+## unnest for nested struct(list)
+query TT?
+select unnest(column2) from nested_unnest_table;
+----
+a b [10, 20]
+x y [30, 40, 50]
+
 query error DataFusion error: type_coercion\ncaused by\nThis feature is not implemented: Unnest should be rewritten to LogicalPlan::Unnest before type coercion
 select sum(unnest(generate_series(1,10)));
 
+## TODO: support unnest as a child expr
+query error DataFusion error: Internal error: unnest on struct can only be applied at the root level of select expression
+select arrow_typeof(unnest(column5)) from unnest_table;
+
 statement ok
 drop table unnest_table;
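+
+# A closing sketch of the two behaviours exercised above (comment only, not
+# executed): a struct unnest widens a single column into one column per
+# subfield, e.g. `select unnest(struct(1,2))` yields one row `1 2`, while a
+# list unnest multiplies rows, e.g. `select unnest([1,2])` yields two rows.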