From c794490cd0576619a48ea265bc3ca422341323ee Mon Sep 17 00:00:00 2001
From: Brent Gardner
Date: Fri, 27 Feb 2026 13:36:38 -0700
Subject: [PATCH] Fix serde

---
 datafusion/physical-plan/src/windows/mod.rs   | 12 +++++
 .../proto/src/physical_plan/to_proto.rs       |  3 +-
 .../tests/cases/roundtrip_physical_plan.rs    | 45 +++++++++++++++++++
 3 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs
index d0e1eab099871..b72a65cf996be 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -226,6 +226,18 @@ impl WindowUDFExpr {
     pub fn fun(&self) -> &Arc<WindowUDF> {
         &self.fun
     }
+
+    /// Returns all arguments passed to this window function.
+    ///
+    /// Unlike [`StandardWindowFunctionExpr::expressions`], which returns
+    /// only the expressions that need batch evaluation (and may filter out
+    /// literal offset/default args like those for `lead`/`lag`), this
+    /// method returns the complete, unfiltered argument list. This is
+    /// needed for serialization so that all arguments survive a
+    /// protobuf round-trip.
+    pub fn args(&self) -> &[Arc<dyn PhysicalExpr>] {
+        &self.args
+    }
 }
 
 impl StandardWindowFunctionExpr for WindowUDFExpr {
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index a38e59acdab26..de2f36e81e3ba 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -109,7 +109,7 @@ pub fn serialize_physical_window_expr(
     proto_converter: &dyn PhysicalProtoConverterExtension,
 ) -> Result<protobuf::PhysicalWindowExprNode> {
     let expr = window_expr.as_any();
-    let args = window_expr.expressions().to_vec();
+    let mut args = window_expr.expressions().to_vec();
     let window_frame = window_expr.get_window_frame();
 
     let (window_function, fun_definition, ignore_nulls, distinct) =
@@ -145,6 +145,7 @@ pub fn serialize_physical_window_expr(
             {
                 let mut buf = Vec::new();
                 codec.try_encode_udwf(expr.fun(), &mut buf)?;
+                args = expr.args().to_vec();
                 (
                     physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(
                         expr.fun().name().to_string(),
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index bc310150d8982..230727c8c1d41 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -3055,3 +3055,48 @@ fn test_session_id_rotation_with_execution_plans() -> Result<()> {
 
     Ok(())
 }
+
+/// Tests that `lead` window function with offset and default value args
+/// survives a protobuf round-trip. This is a regression test for a bug
+/// where `expressions()` (used during serialization) returns only the
+/// column expression for lead/lag, silently dropping the offset and
+/// default value literal args.
+#[test]
+fn roundtrip_lead_with_default_value() -> Result<()> {
+    use datafusion::functions_window::lead_lag::lead_udwf;
+
+    let field_a = Field::new("a", DataType::Int64, false);
+    let field_b = Field::new("b", DataType::Int64, false);
+    let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+
+    // lead(a, 2, 42) — column a, offset 2, default value 42
+    let lead_window = create_udwf_window_expr(
+        &lead_udwf(),
+        &[col("a", &schema)?, lit(2i64), lit(42i64)],
+        schema.as_ref(),
+        "test lead with default".to_string(),
+        false,
+    )?;
+
+    let udwf_expr = Arc::new(StandardWindowExpr::new(
+        lead_window,
+        &[col("b", &schema)?],
+        &[PhysicalSortExpr {
+            expr: col("a", &schema)?,
+            options: SortOptions {
+                descending: false,
+                nulls_first: false,
+            },
+        }],
+        Arc::new(WindowFrame::new(None)),
+    ));
+
+    let input = Arc::new(EmptyExec::new(schema.clone()));
+
+    roundtrip_test(Arc::new(BoundedWindowAggExec::try_new(
+        vec![udwf_expr],
+        input,
+        InputOrderMode::Sorted,
+        true,
+    )?))
+}