feat: extend unnest to support Struct datatype (#10429)
* feat: extend unnest for struct

* compile err

* debugging

* finish basic

* chore: complete impl

* chore: clean garbage

* chore: more test

* test: fix df test

* prettify display

* fix unit test

* chore: compile err

* chore: fix physical exec err

* add sqllogic test

* chore: more doc

* chore: refactor

* fix doc

* fmt

* fix doc

* ut for recursive transform unnest

* a small integration test

* fix comment
duongcongtoai authored May 22, 2024
1 parent b14e92f commit 0355713
Showing 17 changed files with 707 additions and 207 deletions.
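For a quick sense of the feature, here is a minimal sketch (not part of the commit) of exercising struct unnesting through the DataFrame API, using the tests/data/unnest.json file added below. The flattened output column names ("d.e", "d.f") are an assumption based on the "new columns" wording in the updated doc comment.

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Column "d" in unnest.json is a struct with fields "e" and "f".
    let df = ctx
        .read_json("tests/data/unnest.json", NdJsonReadOptions::default())
        .await?;
    // The list columns "b" and "c" are expanded into extra rows, while the
    // struct column "d" is expected to be flattened into one new column per
    // field (assumed to be named "d.e" and "d.f").
    let df = df.unnest_columns(&["b", "c", "d"])?;
    df.show().await?;
    Ok(())
}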
6 changes: 3 additions & 3 deletions datafusion/core/src/dataframe/mod.rs
@@ -263,7 +263,7 @@ impl DataFrame {
self.unnest_columns_with_options(&[column], options)
}

-/// Expand multiple list columns into a set of rows.
+/// Expand multiple list/struct columns into a set of rows and new columns.
///
/// See also:
///
@@ -277,8 +277,8 @@
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
-/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
-/// let df = df.unnest_columns(&["a", "b"])?;
+/// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
+/// let df = df.unnest_columns(&["b","c","d"])?;
/// # Ok(())
/// # }
/// ```
16 changes: 5 additions & 11 deletions datafusion/core/src/physical_planner.rs
@@ -49,7 +49,7 @@ use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGro
use crate::physical_plan::analyze::AnalyzeExec;
use crate::physical_plan::empty::EmptyExec;
use crate::physical_plan::explain::ExplainExec;
-use crate::physical_plan::expressions::{Column, PhysicalSortExpr};
+use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::joins::utils as join_utils;
use crate::physical_plan::joins::{
@@ -1112,24 +1112,18 @@ impl DefaultPhysicalPlanner {
Arc::new(GlobalLimitExec::new(input, *skip, *fetch))
}
LogicalPlan::Unnest(Unnest {
-columns,
+list_type_columns,
+struct_type_columns,
schema,
options,
..
}) => {
let input = children.one()?;
-let column_execs = columns
-    .iter()
-    .map(|column| {
-        schema
-            .index_of_column(column)
-            .map(|idx| Column::new(&column.name, idx))
-    })
-    .collect::<Result<_>>()?;
let schema = SchemaRef::new(schema.as_ref().to_owned().into());
Arc::new(UnnestExec::new(
input,
-column_execs,
+list_type_columns.clone(),
+struct_type_columns.clone(),
schema,
options.clone(),
))
2 changes: 2 additions & 0 deletions datafusion/core/tests/data/unnest.json
@@ -0,0 +1,2 @@
{"a":1, "b":[2.0, 1.3, -6.1], "c":[false, true],"d":{"e":1,"f":2}}
{"a":2, "b":[3.0, 2.3, -7.1], "c":[false, true]}
12 changes: 6 additions & 6 deletions datafusion/core/tests/dataframe/mod.rs
@@ -1231,11 +1231,11 @@ async fn unnest_aggregate_columns() -> Result<()> {
.collect()
.await?;
let expected = [
r#"+--------------------+"#,
r#"| COUNT(shapes.tags) |"#,
r#"+--------------------+"#,
r#"| 9 |"#,
r#"+--------------------+"#,
r#"+-------------+"#,
r#"| COUNT(tags) |"#,
r#"+-------------+"#,
r#"| 9 |"#,
r#"+-------------+"#,
];
assert_batches_sorted_eq!(expected, &results);

@@ -1384,7 +1384,7 @@ async fn unnest_with_redundant_columns() -> Result<()> {
let optimized_plan = df.clone().into_optimized_plan()?;
let expected = vec![
"Projection: shapes.shape_id [shape_id:UInt32]",
" Unnest: shape_id2 [shape_id:UInt32, shape_id2:UInt32;N]",
" Unnest: lists[shape_id2] structs[] [shape_id:UInt32, shape_id2:UInt32;N]",
" Aggregate: groupBy=[[shapes.shape_id]], aggr=[[ARRAY_AGG(shapes.shape_id) AS shape_id2]] [shape_id:UInt32, shape_id2:List(Field { name: \"item\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} });N]",
" TableScan: shapes projection=[shape_id] [shape_id:UInt32]",
];
9 changes: 8 additions & 1 deletion datafusion/expr/src/expr_fn.rs
@@ -19,7 +19,7 @@

use crate::expr::{
AggregateFunction, BinaryExpr, Cast, Exists, GroupingSet, InList, InSubquery,
-Placeholder, TryCast,
+Placeholder, TryCast, Unnest,
};
use crate::function::{
AccumulatorArgs, AccumulatorFactoryFunction, PartitionEvaluatorFactory,
@@ -489,6 +489,13 @@ pub fn when(when: Expr, then: Expr) -> CaseBuilder {
CaseBuilder::new(None, vec![when], vec![then], None)
}

+/// Create a Unnest expression
+pub fn unnest(expr: Expr) -> Expr {
+    Expr::Unnest(Unnest {
+        expr: Box::new(expr),
+    })
+}
+
/// Convenience method to create a new user defined scalar function (UDF) with a
/// specific signature and specific return type.
///
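A brief usage sketch for the new helper (illustration only, not from the commit); the import path assumes unnest is exposed from expr_fn alongside col like the other builders in this file.

use datafusion_expr::expr_fn::{col, unnest};
use datafusion_expr::Expr;

// Build an expression that unnests the struct column "d". Per the
// expr_schema.rs change below, its reported type is the struct's own DataType.
fn unnest_struct_column() -> Expr {
    unnest(col("d"))
}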
2 changes: 1 addition & 1 deletion datafusion/expr/src/expr_schema.rs
@@ -121,7 +121,7 @@ impl ExprSchemable for Expr {
Ok(field.data_type().clone())
}
DataType::Struct(_) => {
-not_impl_err!("unnest() does not support struct yet")
+Ok(arg_data_type)
}
DataType::Null => {
not_impl_err!("unnest() does not support null yet")
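In other words, the type rule becomes: unnest of a List(field) still yields the field's element type, while unnest of a Struct now reports the struct's own type instead of an error; the expansion into per-field output columns happens later, when the Unnest plan is built. A hedged, standalone illustration of that rule (not the actual get_type code):

use arrow_schema::DataType;

// Illustration only: mirrors the rule implied by the hunk above.
fn unnest_output_type(arg_type: DataType) -> DataType {
    match arg_type {
        // unnest(List<T>) has the element type T
        DataType::List(field) => field.data_type().clone(),
        // after this commit, unnest(Struct { .. }) keeps the struct type itself
        s @ DataType::Struct(_) => s,
        // other cases are unchanged by this commit
        other => other,
    }
}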