diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 36fa95aa9f57d..00827edc2413a 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -387,17 +387,17 @@ mod test { column_statistics: vec![ ColumnStatistics { null_count: Precision::Exact(0), - max_value: Precision::Exact(ScalarValue::Null), - min_value: Precision::Exact(ScalarValue::Null), - sum_value: Precision::Exact(ScalarValue::Null), + max_value: Precision::Exact(ScalarValue::Int32(None)), + min_value: Precision::Exact(ScalarValue::Int32(None)), + sum_value: Precision::Exact(ScalarValue::Int32(None)), distinct_count: Precision::Exact(0), byte_size: Precision::Exact(16), }, ColumnStatistics { null_count: Precision::Exact(0), - max_value: Precision::Exact(ScalarValue::Null), - min_value: Precision::Exact(ScalarValue::Null), - sum_value: Precision::Exact(ScalarValue::Null), + max_value: Precision::Exact(ScalarValue::Date32(None)), + min_value: Precision::Exact(ScalarValue::Date32(None)), + sum_value: Precision::Exact(ScalarValue::Date32(None)), distinct_count: Precision::Exact(0), byte_size: Precision::Exact(16), // 4 rows * 4 bytes (Date32) }, @@ -416,17 +416,17 @@ mod test { column_statistics: vec![ ColumnStatistics { null_count: Precision::Exact(0), - max_value: Precision::Exact(ScalarValue::Null), - min_value: Precision::Exact(ScalarValue::Null), - sum_value: Precision::Exact(ScalarValue::Null), + max_value: Precision::Exact(ScalarValue::Int32(None)), + min_value: Precision::Exact(ScalarValue::Int32(None)), + sum_value: Precision::Exact(ScalarValue::Int32(None)), distinct_count: Precision::Exact(0), byte_size: Precision::Exact(8), }, ColumnStatistics { null_count: Precision::Exact(0), - max_value: Precision::Exact(ScalarValue::Null), - min_value: Precision::Exact(ScalarValue::Null), - sum_value: Precision::Exact(ScalarValue::Null), + max_value: Precision::Exact(ScalarValue::Date32(None)), + min_value: Precision::Exact(ScalarValue::Date32(None)), + sum_value: Precision::Exact(ScalarValue::Date32(None)), distinct_count: Precision::Exact(0), byte_size: Precision::Exact(8), // 2 rows * 4 bytes (Date32) }, diff --git a/datafusion/functions-nested/src/sort.rs b/datafusion/functions-nested/src/sort.rs index ba2da0f760eee..cbe101f111b26 100644 --- a/datafusion/functions-nested/src/sort.rs +++ b/datafusion/functions-nested/src/sort.rs @@ -18,16 +18,14 @@ //! [`ScalarUDFImpl`] definitions for array_sort function. use crate::utils::make_scalar_function; -use arrow::array::{ - Array, ArrayRef, GenericListArray, NullBufferBuilder, OffsetSizeTrait, new_null_array, -}; +use arrow::array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait, new_null_array}; use arrow::buffer::OffsetBuffer; use arrow::compute::SortColumn; use arrow::datatypes::{DataType, FieldRef}; use arrow::{compute, compute::SortOptions}; use datafusion_common::cast::{as_large_list_array, as_list_array, as_string_array}; use datafusion_common::utils::ListCoercion; -use datafusion_common::{Result, exec_err, plan_err}; +use datafusion_common::{Result, exec_err}; use datafusion_expr::{ ArrayFunctionArgument, ArrayFunctionSignature, ColumnarValue, Documentation, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -134,18 +132,7 @@ impl ScalarUDFImpl for ArraySort { } fn return_type(&self, arg_types: &[DataType]) -> Result { - match &arg_types[0] { - DataType::Null => Ok(DataType::Null), - DataType::List(field) => { - Ok(DataType::new_list(field.data_type().clone(), true)) - } - DataType::LargeList(field) => { - Ok(DataType::new_large_list(field.data_type().clone(), true)) - } - arg_type => { - plan_err!("{} does not support type {arg_type}", self.name()) - } - } + Ok(arg_types[0].clone()) } fn invoke_with_args( @@ -206,11 +193,11 @@ fn array_sort_inner(args: &[ArrayRef]) -> Result { } DataType::List(field) => { let array = as_list_array(&args[0])?; - array_sort_generic(array, field, sort_options) + array_sort_generic(array, Arc::clone(field), sort_options) } DataType::LargeList(field) => { let array = as_large_list_array(&args[0])?; - array_sort_generic(array, field, sort_options) + array_sort_generic(array, Arc::clone(field), sort_options) } // Signature should prevent this arm ever occurring _ => exec_err!("array_sort expects list for first argument"), @@ -219,18 +206,16 @@ fn array_sort_inner(args: &[ArrayRef]) -> Result { fn array_sort_generic( list_array: &GenericListArray, - field: &FieldRef, + field: FieldRef, sort_options: Option, ) -> Result { let row_count = list_array.len(); let mut array_lengths = vec![]; let mut arrays = vec![]; - let mut valid = NullBufferBuilder::new(row_count); for i in 0..row_count { if list_array.is_null(i) { array_lengths.push(0); - valid.append_null(); } else { let arr_ref = list_array.value(i); @@ -253,25 +238,22 @@ fn array_sort_generic( }; array_lengths.push(sorted_array.len()); arrays.push(sorted_array); - valid.append_non_null(); } } - let buffer = valid.finish(); - let elements = arrays .iter() .map(|a| a.as_ref()) .collect::>(); let list_arr = if elements.is_empty() { - GenericListArray::::new_null(Arc::clone(field), row_count) + GenericListArray::::new_null(field, row_count) } else { GenericListArray::::new( - Arc::clone(field), + field, OffsetBuffer::from_lengths(array_lengths), Arc::new(compute::concat(elements.as_slice())?), - buffer, + list_array.nulls().cloned(), ) }; Ok(Arc::new(list_arr)) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index e0187e6547be9..2bec5a1853528 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -979,7 +979,7 @@ impl AggregateExec { } else if fun_name.eq_ignore_ascii_case("max") { DynamicFilterAggregateType::Max } else { - continue; + return; }; // 2. arg should be only 1 column reference diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 64727b44ace69..743e9e327c0e2 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -232,6 +232,7 @@ impl FilterExec { let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity); let column_statistics = collect_new_statistics( + schema, &input_stats.column_statistics, analysis_ctx.boundaries, ); @@ -637,6 +638,7 @@ impl EmbeddedProjection for FilterExec { /// is adjusted by using the next/previous value for its data type to convert /// it into a closed bound. fn collect_new_statistics( + schema: &SchemaRef, input_column_stats: &[ColumnStatistics], analysis_boundaries: Vec, ) -> Vec { @@ -653,12 +655,17 @@ fn collect_new_statistics( }, )| { let Some(interval) = interval else { - // If the interval is `None`, we can say that there are no rows: + // If the interval is `None`, we can say that there are no rows. + // Use a typed null to preserve the column's data type, so that + // downstream interval analysis can still intersect intervals + // of the same type. + let typed_null = ScalarValue::try_from(schema.field(idx).data_type()) + .unwrap_or(ScalarValue::Null); return ColumnStatistics { null_count: Precision::Exact(0), - max_value: Precision::Exact(ScalarValue::Null), - min_value: Precision::Exact(ScalarValue::Null), - sum_value: Precision::Exact(ScalarValue::Null), + max_value: Precision::Exact(typed_null.clone()), + min_value: Precision::Exact(typed_null.clone()), + sum_value: Precision::Exact(typed_null), distinct_count: Precision::Exact(0), byte_size: input_column_stats[idx].byte_size, }; @@ -1351,17 +1358,17 @@ mod tests { statistics.column_statistics, vec![ ColumnStatistics { - min_value: Precision::Exact(ScalarValue::Null), - max_value: Precision::Exact(ScalarValue::Null), - sum_value: Precision::Exact(ScalarValue::Null), + min_value: Precision::Exact(ScalarValue::Int32(None)), + max_value: Precision::Exact(ScalarValue::Int32(None)), + sum_value: Precision::Exact(ScalarValue::Int32(None)), distinct_count: Precision::Exact(0), null_count: Precision::Exact(0), byte_size: Precision::Absent, }, ColumnStatistics { - min_value: Precision::Exact(ScalarValue::Null), - max_value: Precision::Exact(ScalarValue::Null), - sum_value: Precision::Exact(ScalarValue::Null), + min_value: Precision::Exact(ScalarValue::Int32(None)), + max_value: Precision::Exact(ScalarValue::Int32(None)), + sum_value: Precision::Exact(ScalarValue::Int32(None)), distinct_count: Precision::Exact(0), null_count: Precision::Exact(0), byte_size: Precision::Absent, @@ -1372,6 +1379,70 @@ mod tests { Ok(()) } + /// Regression test: stacking two FilterExecs where the inner filter + /// proves zero selectivity should not panic with a type mismatch + /// during interval intersection. + /// + /// Previously, when a filter proved no rows could match, the column + /// statistics used untyped `ScalarValue::Null` (data type `Null`). + /// If an outer FilterExec then tried to analyze its own predicate + /// against those statistics, `Interval::intersect` would fail with: + /// "Only intervals with the same data type are intersectable, lhs:Null, rhs:Int32" + #[tokio::test] + async fn test_nested_filter_with_zero_selectivity_inner() -> Result<()> { + // Inner table: a: [1, 100], b: [1, 3] + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ]); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(1000), + total_byte_size: Precision::Inexact(4000), + column_statistics: vec![ + ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), + ..Default::default() + }, + ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(3))), + ..Default::default() + }, + ], + }, + schema, + )); + + // Inner filter: a > 200 (impossible given a max=100 → zero selectivity) + let inner_predicate: Arc = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(200)))), + )); + let inner_filter: Arc = + Arc::new(FilterExec::try_new(inner_predicate, input)?); + + // Outer filter: a = 50 + // Before the fix, this would panic because the inner filter's + // zero-selectivity statistics produced Null-typed intervals for + // column `a`, which couldn't intersect with the Int32 literal. + let outer_predicate: Arc = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(50)))), + )); + let outer_filter: Arc = + Arc::new(FilterExec::try_new(outer_predicate, inner_filter)?); + + // Should succeed without error + let statistics = outer_filter.partition_statistics(None)?; + assert_eq!(statistics.num_rows, Precision::Inexact(0)); + + Ok(()) + } + #[tokio::test] async fn test_filter_statistics_more_inputs() -> Result<()> { let schema = Schema::new(vec![ diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index e6735675125bd..c5c794f5a8c68 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -639,6 +639,7 @@ impl HashJoinStream { filter, JoinSide::Left, None, + self.join_type, )? } else { (left_indices, right_indices) @@ -707,6 +708,7 @@ impl HashJoinStream { &right_indices, &self.column_indices, join_side, + self.join_type, )?; self.output_buffer.push_batch(batch)?; @@ -770,6 +772,7 @@ impl HashJoinStream { &right_side, &self.column_indices, JoinSide::Left, + self.join_type, )?; self.output_buffer.push_batch(batch)?; } diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 1f6bc703a0300..a75a9893e9f1a 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -930,6 +930,7 @@ pub(crate) fn build_side_determined_results( &probe_indices, column_indices, build_hash_joiner.build_side, + join_type, ) .map(|batch| (batch.num_rows() > 0).then_some(batch)) } else { @@ -993,6 +994,7 @@ pub(crate) fn join_with_probe_batch( filter, build_hash_joiner.build_side, None, + join_type, )? } else { (build_indices, probe_indices) @@ -1031,6 +1033,7 @@ pub(crate) fn join_with_probe_batch( &probe_indices, column_indices, build_hash_joiner.build_side, + join_type, ) .map(|batch| (batch.num_rows() > 0).then_some(batch)) } diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 7d6a469d8ce7a..6f69996dd6363 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -918,6 +918,7 @@ pub(crate) fn get_final_indices_from_bit_map( (left_indices, right_indices) } +#[expect(clippy::too_many_arguments)] pub(crate) fn apply_join_filter_to_indices( build_input_buffer: &RecordBatch, probe_batch: &RecordBatch, @@ -926,6 +927,7 @@ pub(crate) fn apply_join_filter_to_indices( filter: &JoinFilter, build_side: JoinSide, max_intermediate_size: Option, + join_type: JoinType, ) -> Result<(UInt64Array, UInt32Array)> { if build_indices.is_empty() && probe_indices.is_empty() { return Ok((build_indices, probe_indices)); @@ -946,6 +948,7 @@ pub(crate) fn apply_join_filter_to_indices( &probe_indices.slice(i, len), filter.column_indices(), build_side, + join_type, )?; let filter_result = filter .expression() @@ -967,6 +970,7 @@ pub(crate) fn apply_join_filter_to_indices( &probe_indices, filter.column_indices(), build_side, + join_type, )?; filter @@ -987,6 +991,7 @@ pub(crate) fn apply_join_filter_to_indices( /// Returns a new [RecordBatch] by combining the `left` and `right` according to `indices`. /// The resulting batch has [Schema] `schema`. +#[expect(clippy::too_many_arguments)] pub(crate) fn build_batch_from_indices( schema: &Schema, build_input_buffer: &RecordBatch, @@ -995,11 +1000,19 @@ pub(crate) fn build_batch_from_indices( probe_indices: &UInt32Array, column_indices: &[ColumnIndex], build_side: JoinSide, + join_type: JoinType, ) -> Result { if schema.fields().is_empty() { + // For RightAnti and RightSemi joins, after `adjust_indices_by_join_type` + // the build_indices were untouched so only probe_indices hold the actual + // row count. + let row_count = match join_type { + JoinType::RightAnti | JoinType::RightSemi => probe_indices.len(), + _ => build_indices.len(), + }; let options = RecordBatchOptions::new() .with_match_field_names(true) - .with_row_count(Some(build_indices.len())); + .with_row_count(Some(row_count)); return Ok(RecordBatch::try_new_with_options( Arc::new(schema.clone()), diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index c31f3d0702358..65df1e50b8981 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -2577,6 +2577,31 @@ NULL NULL NULL NULL NULL NULL +# maintains inner nullability +query ?T +select array_sort(column1), arrow_typeof(array_sort(column1)) +from values + (arrow_cast([], 'List(non-null Int32)')), + (arrow_cast(NULL, 'List(non-null Int32)')), + (arrow_cast([1, 3, 5, -5], 'List(non-null Int32)')) +; +---- +[] List(non-null Int32) +NULL List(non-null Int32) +[-5, 1, 3, 5] List(non-null Int32) + +query ?T +select column1, arrow_typeof(column1) +from values (array_sort(arrow_cast([1, 3, 5, -5], 'LargeList(non-null Int32)'))); +---- +[-5, 1, 3, 5] LargeList(non-null Int32) + +query ?T +select column1, arrow_typeof(column1) +from values (array_sort(arrow_cast([1, 3, 5, -5], 'FixedSizeList(4 x non-null Int32)'))); +---- +[-5, 1, 3, 5] List(non-null Int32) + query ? select array_sort([struct('foo', 3), struct('foo', 1), struct('bar', 1)]) ---- diff --git a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt index 3e403171e0718..6ee6ba0695cb8 100644 --- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt +++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt @@ -257,6 +257,25 @@ physical_plan 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/agg_data.parquet]]}, projection=[score], file_type=parquet, predicate=category@0 = alpha AND DynamicFilter [ empty ], pruning_predicate=category_null_count@2 != row_count@3 AND category_min@0 <= alpha AND alpha <= category_max@1, required_guarantees=[category in (alpha)] +# Test 4b: COUNT + MAX — DynamicFilter should NOT appear here in mixed aggregates + +query TT +EXPLAIN SELECT COUNT(*), MAX(score) FROM agg_parquet WHERE category = 'alpha'; +---- +logical_plan +01)Projection: count(Int64(1)) AS count(*), max(agg_parquet.score) +02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1)), max(agg_parquet.score)]] +03)----Projection: agg_parquet.score +04)------Filter: agg_parquet.category = Utf8View("alpha") +05)--------TableScan: agg_parquet projection=[category, score], partial_filters=[agg_parquet.category = Utf8View("alpha")] +physical_plan +01)ProjectionExec: expr=[count(Int64(1))@0 as count(*), max(agg_parquet.score)@1 as max(agg_parquet.score)] +02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1)), max(agg_parquet.score)] +03)----CoalescePartitionsExec +04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1)), max(agg_parquet.score)] +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/agg_data.parquet]]}, projection=[score], file_type=parquet, predicate=category@0 = alpha, pruning_predicate=category_null_count@2 != row_count@3 AND category_min@0 <= alpha AND alpha <= category_max@1, required_guarantees=[category in (alpha)] + # Disable aggregate dynamic filters only statement ok SET datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown = false; diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 42441fe787dbe..1155bc4f3b2bf 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -5226,3 +5226,46 @@ DROP TABLE issue_20437_small; statement count 0 DROP TABLE issue_20437_large; + +# Test count(*) with right semi/anti joins returns correct row counts +# issue: https://github.com/apache/datafusion/issues/20669 + +statement ok +CREATE TABLE t1 (k INT, v INT); + +statement ok +CREATE TABLE t2 (k INT, v INT); + +statement ok +INSERT INTO t1 SELECT i AS k, i AS v FROM generate_series(1, 100) t(i); + +statement ok +INSERT INTO t2 VALUES (1, 1); + +query I +WITH t AS ( + SELECT * + FROM t1 + LEFT ANTI JOIN t2 ON t1.k = t2.k +) +SELECT count(*) +FROM t; +---- +99 + +query I +WITH t AS ( + SELECT * + FROM t1 + LEFT SEMI JOIN t2 ON t1.k = t2.k +) +SELECT count(*) +FROM t; +---- +1 + +statement count 0 +DROP TABLE t1; + +statement count 0 +DROP TABLE t2; diff --git a/datafusion/substrait/tests/cases/logical_plans.rs b/datafusion/substrait/tests/cases/logical_plans.rs index 41f08c579f471..e3e45193e7e0c 100644 --- a/datafusion/substrait/tests/cases/logical_plans.rs +++ b/datafusion/substrait/tests/cases/logical_plans.rs @@ -220,6 +220,30 @@ mod tests { // Trigger execution to ensure plan validity DataFrame::new(ctx.state(), plan).show().await?; + Ok(()) + } + #[tokio::test] + async fn duplicate_name_in_union() -> Result<()> { + let proto_plan = + read_json("tests/testdata/test_plans/duplicate_name_in_union.substrait.json"); + let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?; + let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?; + + assert_snapshot!( + plan, + @r" + Projection: foo AS col1, bar AS col2 + Union + Projection: foo, bar + Values: (Int64(100), Int64(200)) + Projection: x, foo + Values: (Int32(300), Int64(400)) + " + ); + + // Trigger execution to ensure plan validity + DataFrame::new(ctx.state(), plan).show().await?; + Ok(()) } } diff --git a/datafusion/substrait/tests/testdata/test_plans/duplicate_name_in_union.substrait.json b/datafusion/substrait/tests/testdata/test_plans/duplicate_name_in_union.substrait.json new file mode 100644 index 0000000000000..1da2ff6131368 --- /dev/null +++ b/datafusion/substrait/tests/testdata/test_plans/duplicate_name_in_union.substrait.json @@ -0,0 +1,171 @@ +{ + "version": { + "minorNumber": 54, + "producer": "datafusion-test" + }, + "relations": [ + { + "root": { + "input": { + "set": { + "common": { + "direct": {} + }, + "inputs": [ + { + "project": { + "common": { + "emit": { + "outputMapping": [2, 3] + } + }, + "input": { + "read": { + "common": { + "direct": {} + }, + "baseSchema": { + "names": ["foo", "bar"], + "struct": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_REQUIRED" + } + }, + { + "i64": { + "nullability": "NULLABILITY_REQUIRED" + } + } + ], + "nullability": "NULLABILITY_REQUIRED" + } + }, + "virtualTable": { + "expressions": [ + { + "fields": [ + { + "literal": { + "i64": "100" + } + }, + { + "literal": { + "i64": "200" + } + } + ] + } + ] + } + } + }, + "expressions": [ + { + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": {} + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 1 + } + }, + "rootReference": {} + } + } + ] + } + }, + { + "project": { + "common": { + "emit": { + "outputMapping": [2, 3] + } + }, + "input": { + "read": { + "common": { + "direct": {} + }, + "baseSchema": { + "names": ["x", "foo"], + "struct": { + "types": [ + { + "i32": { + "nullability": "NULLABILITY_REQUIRED" + } + }, + { + "i64": { + "nullability": "NULLABILITY_REQUIRED" + } + } + ], + "nullability": "NULLABILITY_REQUIRED" + } + }, + "virtualTable": { + "expressions": [ + { + "fields": [ + { + "literal": { + "i32": 300 + } + }, + { + "literal": { + "i64": "400" + } + } + ] + } + ] + } + } + }, + "expressions": [ + { + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": {} + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 1 + } + }, + "rootReference": {} + } + } + ] + } + } + ], + "op": "SET_OP_UNION_ALL" + } + }, + "names": ["col1", "col2"] + } + } + ] +}