@@ -32,6 +32,7 @@ use arrow::datatypes::{
3232} ;
3333use arrow:: error:: ArrowError ;
3434use arrow:: util:: pretty:: pretty_format_batches;
35+ use arrow_schema:: { SortOptions , TimeUnit } ;
3536use datafusion:: { assert_batches_eq, dataframe} ;
3637use datafusion_functions_aggregate:: count:: { count_all, count_all_window} ;
3738use datafusion_functions_aggregate:: expr_fn:: {
@@ -64,8 +65,8 @@ use datafusion::test_util::{
6465use datafusion_catalog:: TableProvider ;
6566use datafusion_common:: test_util:: { batches_to_sort_string, batches_to_string} ;
6667use datafusion_common:: {
67- assert_contains, Constraint , Constraints , DataFusionError , ParamValues , ScalarValue ,
68- TableReference , UnnestOptions ,
68+ assert_contains, Constraint , Constraints , DFSchema , DataFusionError , ParamValues ,
69+ ScalarValue , TableReference , UnnestOptions ,
6970} ;
7071use datafusion_common_runtime:: SpawnedTask ;
7172use datafusion_datasource:: file_format:: format_as_file_type;
@@ -79,10 +80,16 @@ use datafusion_expr::{
7980 LogicalPlanBuilder , ScalarFunctionImplementation , SortExpr , WindowFrame ,
8081 WindowFrameBound , WindowFrameUnits , WindowFunctionDefinition ,
8182} ;
83+ use datafusion_physical_expr:: aggregate:: AggregateExprBuilder ;
8284use datafusion_physical_expr:: expressions:: Column ;
8385use datafusion_physical_expr:: Partitioning ;
8486use datafusion_physical_expr_common:: physical_expr:: PhysicalExpr ;
85- use datafusion_physical_plan:: { displayable, ExecutionPlanProperties } ;
87+ use datafusion_physical_expr_common:: sort_expr:: PhysicalSortExpr ;
88+ use datafusion_physical_plan:: aggregates:: {
89+ AggregateExec , AggregateMode , PhysicalGroupBy ,
90+ } ;
91+ use datafusion_physical_plan:: empty:: EmptyExec ;
92+ use datafusion_physical_plan:: { displayable, ExecutionPlan , ExecutionPlanProperties } ;
8693
8794// Get string representation of the plan
8895async fn physical_plan_to_string ( df : & DataFrame ) -> String {
@@ -6322,3 +6329,105 @@ async fn test_copy_to_preserves_order() -> Result<()> {
63226329 ) ;
63236330 Ok ( ( ) )
63246331}
6332+
6333+ #[ tokio:: test]
6334+ async fn test_duplicate_state_fields_for_dfschema_construct ( ) -> Result < ( ) > {
6335+ let ctx = SessionContext :: new ( ) ;
6336+
6337+ // Simple schema with just the fields we need
6338+ let file_schema = Arc :: new ( Schema :: new ( vec ! [
6339+ Field :: new(
6340+ "timestamp" ,
6341+ DataType :: Timestamp ( TimeUnit :: Nanosecond , Some ( "UTC" . into( ) ) ) ,
6342+ true ,
6343+ ) ,
6344+ Field :: new( "ticker" , DataType :: Utf8 , true ) ,
6345+ Field :: new( "value" , DataType :: Float64 , true ) ,
6346+ Field :: new( "date" , DataType :: Utf8 , false ) ,
6347+ ] ) ) ;
6348+
6349+ let df_schema = DFSchema :: try_from ( file_schema. clone ( ) ) ?;
6350+
6351+ let timestamp = col ( "timestamp" ) ;
6352+ let value = col ( "value" ) ;
6353+ let ticker = col ( "ticker" ) ;
6354+ let date = col ( "date" ) ;
6355+
6356+ let mock_exec = Arc :: new ( EmptyExec :: new ( file_schema. clone ( ) ) ) ;
6357+
6358+ // Build first_value aggregate
6359+ let first_value = Arc :: new (
6360+ AggregateExprBuilder :: new (
6361+ datafusion_functions_aggregate:: first_last:: first_value_udaf ( ) ,
6362+ vec ! [ ctx. create_physical_expr( value. clone( ) , & df_schema) ?] ,
6363+ )
6364+ . alias ( "first_value(value)" )
6365+ . order_by ( vec ! [ PhysicalSortExpr :: new(
6366+ ctx. create_physical_expr( timestamp. clone( ) , & df_schema) ?,
6367+ SortOptions :: new( false , false ) ,
6368+ ) ] )
6369+ . schema ( file_schema. clone ( ) )
6370+ . build ( )
6371+ . expect ( "Failed to build first_value" ) ,
6372+ ) ;
6373+
6374+ // Build last_value aggregate
6375+ let last_value = Arc :: new (
6376+ AggregateExprBuilder :: new (
6377+ datafusion_functions_aggregate:: first_last:: last_value_udaf ( ) ,
6378+ vec ! [ ctx. create_physical_expr( value. clone( ) , & df_schema) ?] ,
6379+ )
6380+ . alias ( "last_value(value)" )
6381+ . order_by ( vec ! [ PhysicalSortExpr :: new(
6382+ ctx. create_physical_expr( timestamp. clone( ) , & df_schema) ?,
6383+ SortOptions :: new( false , false ) ,
6384+ ) ] )
6385+ . schema ( file_schema. clone ( ) )
6386+ . build ( )
6387+ . expect ( "Failed to build last_value" ) ,
6388+ ) ;
6389+
6390+ let partial_agg = AggregateExec :: try_new (
6391+ AggregateMode :: Partial ,
6392+ PhysicalGroupBy :: new_single ( vec ! [
6393+ (
6394+ ctx. create_physical_expr( date. clone( ) , & df_schema) ?,
6395+ "date" . to_string( ) ,
6396+ ) ,
6397+ (
6398+ ctx. create_physical_expr( ticker. clone( ) , & df_schema) ?,
6399+ "ticker" . to_string( ) ,
6400+ ) ,
6401+ ] ) ,
6402+ vec ! [ first_value, last_value] ,
6403+ vec ! [ None , None ] ,
6404+ mock_exec,
6405+ file_schema,
6406+ )
6407+ . expect ( "Failed to build partial agg" ) ;
6408+
6409+ // Assert that the schema field names match the expected names
6410+ let expected_field_names = vec ! [
6411+ "date" ,
6412+ "ticker" ,
6413+ "first_value(value)[first_value]" ,
6414+ "timestamp@0" ,
6415+ "is_set" ,
6416+ "last_value(value)[last_value]" ,
6417+ "timestamp@0" ,
6418+ "is_set" ,
6419+ ] ;
6420+
6421+ let binding = partial_agg. schema ( ) ;
6422+ let actual_field_names: Vec < _ > = binding. fields ( ) . iter ( ) . map ( |f| f. name ( ) ) . collect ( ) ;
6423+ assert_eq ! ( actual_field_names, expected_field_names) ;
6424+
6425+ // Ensure that DFSchema::try_from does not fail
6426+ let partial_agg_exec_schema = DFSchema :: try_from ( partial_agg. schema ( ) ) ;
6427+ assert ! (
6428+ partial_agg_exec_schema. is_ok( ) ,
6429+ "Expected get AggregateExec schema to succeed with duplicate state fields"
6430+ ) ;
6431+
6432+ Ok ( ( ) )
6433+ }
0 commit comments