From 21265b7092283c87e04488925617b4c792b468f6 Mon Sep 17 00:00:00 2001
From: zhuqi-lucas <821684824@qq.com>
Date: Mon, 28 Jul 2025 19:11:04 +0800
Subject: [PATCH 1/4] feat: support sum distinct for window

---
 datafusion/functions-aggregate/src/sum.rs     | 128 +++++++++++++++++-
 datafusion/sqllogictest/test_files/window.slt |  85 ++++++++++--
 2 files changed, 196 insertions(+), 17 deletions(-)

diff --git a/datafusion/functions-aggregate/src/sum.rs b/datafusion/functions-aggregate/src/sum.rs
index 9495e087d2501..8c370b1640b67 100644
--- a/datafusion/functions-aggregate/src/sum.rs
+++ b/datafusion/functions-aggregate/src/sum.rs
@@ -20,10 +20,10 @@ use ahash::RandomState;
 use datafusion_expr::utils::AggregateOrderSensitivity;
 use std::any::Any;
-use std::collections::HashSet;
+use std::collections::{HashSet, VecDeque};
 use std::mem::{size_of, size_of_val};
 
-use arrow::array::Array;
+use arrow::array::{Array, PrimitiveArray};
 use arrow::array::ArrowNativeTypeOp;
 use arrow::array::{ArrowNumericType, AsArray};
 use arrow::datatypes::ArrowPrimitiveType;
@@ -33,9 +33,7 @@ use arrow::datatypes::{
     DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION,
 };
 use arrow::{array::ArrayRef, datatypes::Field};
-use datafusion_common::{
-    exec_err, not_impl_err, utils::take_function_args, Result, ScalarValue,
-};
+use datafusion_common::{exec_err, not_impl_err, utils::take_function_args, HashMap, Result, ScalarValue};
 use datafusion_expr::function::AccumulatorArgs;
 use datafusion_expr::function::StateFieldsArgs;
 use datafusion_expr::utils::format_state_name;
@@ -243,12 +241,23 @@ impl AggregateUDFImpl for Sum {
         &self,
         args: AccumulatorArgs,
     ) -> Result<Box<dyn Accumulator>> {
-        macro_rules! helper {
+        if args.is_distinct {
+            // distinct path: use our sliding-window distinct-sum
+            macro_rules! helper_distinct {
+            ($t:ty, $dt:expr) => {
+                Ok(Box::new(SlidingDistinctSumAccumulator::try_new(&$dt)?))
+            };
+        }
+            downcast_sum!(args, helper_distinct)
+        } else {
+            // non-distinct path: existing sliding sum
+            macro_rules! helper {
             ($t:ty, $dt:expr) => {
                 Ok(Box::new(SlidingSumAccumulator::<$t>::new($dt.clone())))
             };
         }
-        downcast_sum!(args, helper)
+            downcast_sum!(args, helper)
+        }
     }
 
     fn reverse_expr(&self) -> ReversedUDAF {
@@ -477,3 +486,108 @@ impl<T: ArrowNumericType> Accumulator for DistinctSumAccumulator<T> {
         size_of_val(self) + self.values.capacity() * size_of::<T::Native>()
     }
 }
+
+
+
+/// A sliding-window accumulator for `SUM(DISTINCT)` over Int64 columns.
+/// Maintains a running sum so that `evaluate()` is O(1).
+#[derive(Debug)]
+pub struct SlidingDistinctSumAccumulator {
+    /// Map each distinct value → its current count in the window
+    counts: HashMap<i64, usize>,
+    /// Running sum of all distinct keys currently in the window
+    sum: i64,
+    /// Data type (must be Int64)
+    data_type: DataType,
+}
+
+impl SlidingDistinctSumAccumulator {
+    /// Create a new accumulator; only `DataType::Int64` is supported.
+    pub fn try_new(data_type: &DataType) -> Result<Self> {
+        if *data_type != DataType::Int64 {
+            return exec_err!("SlidingDistinctSumAccumulator only supports Int64");
+        }
+        Ok(Self {
+            counts: HashMap::default(),
+            sum: 0,
+            data_type: data_type.clone(),
+        })
+    }
+}
+
+impl Accumulator for SlidingDistinctSumAccumulator {
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let arr = values[0].as_primitive::<Int64Type>();
+        // iterate through the validity mask so null slots never enter the map
+        for v in arr.iter().flatten() {
+            let cnt = self.counts.entry(v).or_insert(0);
+            if *cnt == 0 {
+                // first occurrence in window
+                self.sum = self.sum.wrapping_add(v);
+            }
+            *cnt += 1;
+        }
+        Ok(())
+    }
+
+    fn evaluate(&mut self) -> Result<ScalarValue> {
+        // O(1): return the running sum maintained by update/retract
+        Ok(ScalarValue::Int64(Some(self.sum)))
+    }
+
+    fn size(&self) -> usize {
+        // account for the map's heap allocation as well as the struct itself
+        size_of_val(self) + self.counts.capacity() * size_of::<(i64, usize)>()
+    }
+
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
+        // Serialize distinct keys for cross-partition merge if needed
+        let keys = self
+            .counts
+            .keys()
+            .cloned()
+            .map(Some)
+            .map(ScalarValue::Int64)
+            .collect::<Vec<_>>();
+        Ok(vec![ScalarValue::List(ScalarValue::new_list_nullable(
+            &keys,
+            &self.data_type,
+        ))])
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        // Merge distinct keys from other partitions
+        let list_arr = states[0].as_list::<i32>();
+        for maybe_inner in list_arr.iter().flatten() {
+            for idx in 0..maybe_inner.len() {
+                if let ScalarValue::Int64(Some(v)) =
+                    ScalarValue::try_from_array(&*maybe_inner, idx)?
+                {
+                    let cnt = self.counts.entry(v).or_insert(0);
+                    if *cnt == 0 {
+                        self.sum = self.sum.wrapping_add(v);
+                    }
+                    *cnt += 1;
+                }
+            }
+        }
+        Ok(())
+    }
+
+    fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let arr = values[0].as_primitive::<Int64Type>();
+        // mirror update_batch: skip nulls so the counts stay balanced
+        for v in arr.iter().flatten() {
+            if let Some(cnt) = self.counts.get_mut(&v) {
+                *cnt -= 1;
+                if *cnt == 0 {
+                    // last copy leaving window
+                    self.sum = self.sum.wrapping_sub(v);
+                    self.counts.remove(&v);
+                }
+            }
+        }
+        Ok(())
+    }
+
+    fn supports_retract_batch(&self) -> bool {
+        true
+    }
+}
\ No newline at end of file
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index bed9121eec3fe..44677fd5b951e 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -5715,17 +5715,82 @@ EXPLAIN SELECT
     RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
   ) AS distinct_count
 FROM table_test_distinct_count
-ODER BY k, time;
+ORDER BY k, time;
 ----
 logical_plan
-01)Projection: oder.k, oder.time, count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS normal_count, count(DISTINCT oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS distinct_count
-02)--WindowAggr: windowExpr=[[count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW, count(DISTINCT oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS count(DISTINCT oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW]]
-03)----SubqueryAlias: oder
+01)Sort: table_test_distinct_count.k ASC NULLS LAST, table_test_distinct_count.time ASC NULLS LAST
+02)--Projection: table_test_distinct_count.k, table_test_distinct_count.time, count(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS normal_count, count(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS distinct_count
+03)----WindowAggr: windowExpr=[[count(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS count(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW, count(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS count(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW]]
 04)------TableScan: table_test_distinct_count projection=[k, v, time]
 physical_plan
-01)ProjectionExec: expr=[k@0 as k, time@2 as time, count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@3 as normal_count, count(DISTINCT oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@4 as distinct_count]
-02)--BoundedWindowAggExec: wdw=[count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { name: "count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW, count(DISTINCT oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { name: "count(DISTINCT oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW], mode=[Sorted]
-03)----SortExec: expr=[k@0 ASC NULLS LAST, time@2 ASC NULLS LAST], preserve_partitioning=[true]
-04)------CoalesceBatchesExec: target_batch_size=1
-05)--------RepartitionExec: partitioning=Hash([k@0], 2), input_partitions=2
-06)----------DataSourceExec: partitions=2, partition_sizes=[5, 4]
+01)SortPreservingMergeExec: [k@0 ASC NULLS LAST, time@1 ASC NULLS LAST]
+02)--ProjectionExec: expr=[k@0 as k, time@2 as time, count(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@3 as normal_count, count(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@4 as distinct_count]
+03)----BoundedWindowAggExec: wdw=[count(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { name: "count(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW, count(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { name: "count(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW], mode=[Sorted]
+04)------SortExec: expr=[k@0 ASC NULLS LAST, time@2 ASC NULLS LAST], preserve_partitioning=[true]
+05)--------CoalesceBatchesExec: target_batch_size=1
+06)----------RepartitionExec: partitioning=Hash([k@0], 2), input_partitions=2
+07)------------DataSourceExec: partitions=2, partition_sizes=[5, 4]
+
+
+# Add testing for distinct sum
+query TPII
+SELECT
+  k,
+  time,
+  SUM(v) OVER (
+    PARTITION BY k
+    ORDER BY time
+    RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
+  ) AS sum_v,
+  SUM(DISTINCT v) OVER (
+    PARTITION BY k
+    ORDER BY time
+    RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
+  ) AS sum_distinct_v
+FROM table_test_distinct_count
+ORDER BY k, time;
+----
+a 1970-01-01T00:01:00Z 1 1
+a 1970-01-01T00:02:00Z 2 1
+a 1970-01-01T00:03:00Z 5 3
+a 1970-01-01T00:03:00Z 5 3
+a 1970-01-01T00:04:00Z 5 3
+b 1970-01-01T00:01:00Z 3 3
+b 1970-01-01T00:02:00Z 6 3
+b 1970-01-01T00:03:00Z 14 7
+b 1970-01-01T00:03:00Z 14 7
+
+
+query TT
+EXPLAIN SELECT
+  k,
+  time,
+  SUM(v) OVER (
+    PARTITION BY k
+    ORDER BY time
+    RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
+  ) AS sum_v,
+  SUM(DISTINCT v) OVER (
+    PARTITION BY k
+    ORDER BY time
+    RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
+  ) AS sum_distinct_v
+FROM table_test_distinct_count
+ORDER BY k, time;
+----
+logical_plan
+01)Sort: table_test_distinct_count.k ASC NULLS LAST, table_test_distinct_count.time ASC NULLS LAST
+02)--Projection: table_test_distinct_count.k, table_test_distinct_count.time, sum(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS sum_v, sum(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS sum_distinct_v
+03)----WindowAggr: windowExpr=[[sum(__common_expr_1) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS sum(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW, sum(DISTINCT __common_expr_1) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS sum(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW]]
+04)------Projection: CAST(table_test_distinct_count.v AS Int64) AS __common_expr_1, table_test_distinct_count.k, table_test_distinct_count.time
+05)--------TableScan: table_test_distinct_count projection=[k, v, time]
+physical_plan
+01)SortPreservingMergeExec: [k@0 ASC NULLS LAST, time@1 ASC NULLS LAST]
+02)--ProjectionExec: expr=[k@1 as k, time@2 as time, sum(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@3 as sum_v, sum(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@4 as sum_distinct_v]
+03)----BoundedWindowAggExec: wdw=[sum(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { name: "sum(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW, sum(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { name: "sum(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW], mode=[Sorted]
+04)------SortExec: expr=[k@1 ASC NULLS LAST, time@2 ASC NULLS LAST], preserve_partitioning=[true]
+05)--------CoalesceBatchesExec: target_batch_size=1
+06)----------RepartitionExec: partitioning=Hash([k@1], 2), input_partitions=2
+07)------------ProjectionExec: expr=[CAST(v@1 AS Int64) as __common_expr_1, k@0 as k, time@2 as time]
+08)--------------DataSourceExec: partitions=2, partition_sizes=[5, 4]

From b30b19513632bf7d2f4b6beb52dd6e4254fed605 Mon Sep 17 00:00:00 2001
From: zhuqi-lucas <821684824@qq.com>
Date: Mon, 28 Jul 2025 19:13:10 +0800
Subject: [PATCH 2/4] fmt

---
 datafusion/functions-aggregate/src/sum.rs | 28 +++++++++++------------
 1 file changed, 14 insertions(+), 14 deletions(-)

diff --git a/datafusion/functions-aggregate/src/sum.rs b/datafusion/functions-aggregate/src/sum.rs
index 8c370b1640b67..4866c2cdc7ff2 100644
--- a/datafusion/functions-aggregate/src/sum.rs
+++ b/datafusion/functions-aggregate/src/sum.rs
@@ -20,11 +20,11 @@ use ahash::RandomState;
 use datafusion_expr::utils::AggregateOrderSensitivity;
 use std::any::Any;
-use std::collections::{HashSet, VecDeque};
+use std::collections::HashSet;
 use std::mem::{size_of, size_of_val};
 
-use arrow::array::{Array, PrimitiveArray};
+use arrow::array::Array;
 use arrow::array::ArrowNativeTypeOp;
 use arrow::array::{ArrowNumericType, AsArray};
 use arrow::datatypes::ArrowPrimitiveType;
 use arrow::datatypes::{ArrowNativeType, FieldRef};
@@ -33,7 +33,9 @@ use arrow::datatypes::{
     DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION,
 };
 use arrow::{array::ArrayRef, datatypes::Field};
-use datafusion_common::{exec_err, not_impl_err, utils::take_function_args, HashMap, Result, ScalarValue};
+use datafusion_common::{
+    exec_err, not_impl_err, utils::take_function_args, HashMap, Result, ScalarValue,
+};
 use datafusion_expr::function::AccumulatorArgs;
 use datafusion_expr::function::StateFieldsArgs;
 use datafusion_expr::utils::format_state_name;
@@ -244,18 +246,18 @@ impl AggregateUDFImpl for Sum {
         if args.is_distinct {
             // distinct path: use our sliding-window distinct-sum
             macro_rules! helper_distinct {
-            ($t:ty, $dt:expr) => {
-                Ok(Box::new(SlidingDistinctSumAccumulator::try_new(&$dt)?))
-            };
-        }
+                ($t:ty, $dt:expr) => {
+                    Ok(Box::new(SlidingDistinctSumAccumulator::try_new(&$dt)?))
+                };
+            }
             downcast_sum!(args, helper_distinct)
         } else {
             // non-distinct path: existing sliding sum
             macro_rules! helper {
-            ($t:ty, $dt:expr) => {
-                Ok(Box::new(SlidingSumAccumulator::<$t>::new($dt.clone())))
-            };
-        }
+                ($t:ty, $dt:expr) => {
+                    Ok(Box::new(SlidingSumAccumulator::<$t>::new($dt.clone())))
+                };
+            }
             downcast_sum!(args, helper)
         }
     }
@@ -487,8 +489,6 @@ impl<T: ArrowNumericType> Accumulator for DistinctSumAccumulator<T> {
     }
 }
 
-
-
 /// A sliding-window accumulator for `SUM(DISTINCT)` over Int64 columns.
 /// Maintains a running sum so that `evaluate()` is O(1).
 #[derive(Debug)]
@@ -590,4 +590,4 @@ impl Accumulator for SlidingDistinctSumAccumulator {
     fn supports_retract_batch(&self) -> bool {
         true
     }
-}
\ No newline at end of file
+}

From de3e8af284a3283a4566ce54140c524e7c5de1c5 Mon Sep 17 00:00:00 2001
From: zhuqi-lucas <821684824@qq.com>
Date: Mon, 28 Jul 2025 19:17:31 +0800
Subject: [PATCH 3/4] fmt

---
 datafusion/functions-aggregate/src/sum.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/functions-aggregate/src/sum.rs b/datafusion/functions-aggregate/src/sum.rs
index 4866c2cdc7ff2..2dfa32433b013 100644
--- a/datafusion/functions-aggregate/src/sum.rs
+++ b/datafusion/functions-aggregate/src/sum.rs
@@ -23,8 +23,8 @@ use std::any::Any;
 use std::collections::HashSet;
 use std::mem::{size_of, size_of_val};
 
-use arrow::array::ArrowNativeTypeOp;
 use arrow::array::Array;
+use arrow::array::ArrowNativeTypeOp;
 use arrow::array::{ArrowNumericType, AsArray};
 use arrow::datatypes::ArrowPrimitiveType;
 use arrow::datatypes::{ArrowNativeType, FieldRef};

From 95fa1f1858c94dcea019e7325aeb3fe340d249a5 Mon Sep 17 00:00:00 2001
From: zhuqi-lucas <821684824@qq.com>
Date: Mon, 28 Jul 2025 21:05:35 +0800
Subject: [PATCH 4/4] fix test

---
 datafusion/ffi/src/udaf/mod.rs            | 3 ++-
 datafusion/functions-aggregate/src/sum.rs | 1 +
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/datafusion/ffi/src/udaf/mod.rs b/datafusion/ffi/src/udaf/mod.rs
index 63d44110a657d..0e635718606ef 100644
--- a/datafusion/ffi/src/udaf/mod.rs
+++ b/datafusion/ffi/src/udaf/mod.rs
@@ -717,6 +717,7 @@ mod tests {
         let foreign_udaf = create_test_foreign_udaf(Sum::new())?;
 
         let schema = Schema::new(vec![Field::new("a", DataType::Float64, true)]);
+        // Note: SUM(DISTINCT) currently supports only Int64, so exercise the non-distinct path here
         let acc_args = AccumulatorArgs {
             return_field: Field::new("f", DataType::Float64, true).into(),
             schema: &schema,
@@ -724,7 +725,7 @@ mod tests {
             order_bys: &[PhysicalSortExpr::new_default(col("a", &schema)?)],
             is_reversed: false,
             name: "round_trip",
-            is_distinct: true,
+            is_distinct: false,
             exprs: &[col("a", &schema)?],
         };

diff --git a/datafusion/functions-aggregate/src/sum.rs b/datafusion/functions-aggregate/src/sum.rs
index 2dfa32433b013..97c0bbb976b33 100644
--- a/datafusion/functions-aggregate/src/sum.rs
+++ b/datafusion/functions-aggregate/src/sum.rs
@@ -504,6 +504,7 @@ pub struct SlidingDistinctSumAccumulator {
 impl SlidingDistinctSumAccumulator {
     /// Create a new accumulator; only `DataType::Int64` is supported.
     pub fn try_new(data_type: &DataType) -> Result<Self> {
+        // TODO support other numeric types
        if *data_type != DataType::Int64 {
            return exec_err!("SlidingDistinctSumAccumulator only supports Int64");
        }
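
A closing note on the approach: the accumulator reference-counts each distinct value in the window, so `update_batch` adds a value to the running sum only on its first occurrence and `retract_batch` subtracts it only when its last copy leaves. That is what keeps `evaluate()` O(1) no matter how wide the RANGE frame grows. Below is a minimal standalone sketch of that idea, not DataFusion code; the type and method names (`SlidingDistinctSum`, `insert`, `remove`, `distinct_sum`) are invented for illustration, and the asserted values mirror the `b` rows at 00:03 in the new slt test (window values 3, 3, 4, 4: plain sum 14, distinct sum 7).

use std::collections::HashMap;

/// Reference-counted sliding distinct sum (illustrative, not the DataFusion type).
struct SlidingDistinctSum {
    /// distinct value -> number of copies currently inside the window
    counts: HashMap<i64, usize>,
    /// sum of the keys of `counts`, maintained incrementally
    sum: i64,
}

impl SlidingDistinctSum {
    fn new() -> Self {
        Self { counts: HashMap::new(), sum: 0 }
    }

    /// A value enters the window; it joins the sum only on its first occurrence.
    fn insert(&mut self, v: i64) {
        let cnt = self.counts.entry(v).or_insert(0);
        if *cnt == 0 {
            self.sum = self.sum.wrapping_add(v);
        }
        *cnt += 1;
    }

    /// A value leaves the window; it leaves the sum only when its last copy goes.
    fn remove(&mut self, v: i64) {
        if let Some(cnt) = self.counts.get_mut(&v) {
            *cnt -= 1;
            if *cnt == 0 {
                self.sum = self.sum.wrapping_sub(v);
                self.counts.remove(&v);
            }
        }
    }

    /// O(1) regardless of how many rows the window currently holds.
    fn distinct_sum(&self) -> i64 {
        self.sum
    }
}

fn main() {
    let mut acc = SlidingDistinctSum::new();
    // Window for key `b` at 00:03 holds [3, 3, 4, 4]: plain sum 14, distinct sum 7.
    for v in [3, 3, 4, 4] {
        acc.insert(v);
    }
    assert_eq!(acc.distinct_sum(), 7);
    // Slide forward: one 3 drops out, but another 3 remains, so the sum is unchanged.
    acc.remove(3);
    assert_eq!(acc.distinct_sum(), 7);
    // The last 3 drops out and the distinct sum finally shrinks.
    acc.remove(3);
    assert_eq!(acc.distinct_sum(), 4);
}

The sketch keeps the patch's `wrapping_add`/`wrapping_sub` overflow behavior; whether SUM(DISTINCT) should instead report overflow is a separate design question the patch does not address.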