diff --git a/rust/arrow/src/compute/kernels/aggregate.rs b/rust/arrow/src/compute/kernels/aggregate.rs
index 996e1667c14..444e2454a1c 100644
--- a/rust/arrow/src/compute/kernels/aggregate.rs
+++ b/rust/arrow/src/compute/kernels/aggregate.rs
@@ -125,6 +125,7 @@ where
 /// Returns the sum of values in the array.
 ///
 /// Returns `None` if the array is empty or only contains null values.
+#[cfg(not(all(any(target_arch = "x86", target_arch = "x86_64"), feature = "simd")))]
 pub fn sum<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
 where
     T: ArrowNumericType,
@@ -136,23 +137,126 @@ where
         return None;
     }
 
-    let mut n: T::Native = T::default_value();
-    let data = array.data();
-    let m = array.value_slice(0, data.len());
+    let data: &[T::Native] = array.value_slice(0, array.len());
 
-    if null_count == 0 {
-        // optimized path for arrays without null values
-        for item in m.iter().take(data.len()) {
-            n = n + *item;
+    match array.data().null_buffer() {
+        None => {
+            let sum = data.iter().fold(T::default_value(), |accumulator, value| {
+                accumulator + *value
+            });
+
+            Some(sum)
         }
-    } else {
-        for (i, item) in m.iter().enumerate() {
-            if data.is_valid(i) {
-                n = n + *item;
-            }
+        Some(buffer) => {
+            let mut sum = T::default_value();
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = buffer.bit_chunks(array.offset(), array.len());
+            data_chunks
+                .zip(bit_chunks.iter())
+                .for_each(|(chunk, mask)| {
+                    chunk.iter().enumerate().for_each(|(i, value)| {
+                        if (mask & (1 << i)) != 0 {
+                            sum = sum + *value;
+                        }
+                    });
+                });
+
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    sum = sum + *value;
+                }
+            });
+
+            Some(sum)
         }
     }
-
-    Some(n)
+}
+
+/// Returns the sum of values in the array.
+///
+/// Returns `None` if the array is empty or only contains null values.
+#[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), feature = "simd"))]
+pub fn sum<T: ArrowNumericType>(array: &PrimitiveArray<T>) -> Option<T::Native>
+where
+    T::Native: Add<Output = T::Native>,
+{
+    let null_count = array.null_count();
+
+    if null_count == array.len() {
+        return None;
+    }
+
+    let data: &[T::Native] = array.value_slice(0, array.len());
+
+    let mut vector_sum = T::init(T::default_value());
+    let mut remainder_sum = T::default_value();
+
+    match array.data().null_buffer() {
+        None => {
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            data_chunks.for_each(|chunk| {
+                chunk.chunks_exact(T::lanes()).for_each(|chunk| {
+                    let chunk = T::load(&chunk);
+                    vector_sum = vector_sum + chunk;
+                });
+            });
+
+            remainder.iter().for_each(|value| {
+                remainder_sum = remainder_sum + *value;
+            });
+        }
+        Some(buffer) => {
+            // process data in chunks of 64 elements since we also get 64 bits of validity information at a time
+            let data_chunks = data.chunks_exact(64);
+            let remainder = data_chunks.remainder();
+
+            let bit_chunks = buffer.bit_chunks(array.offset(), array.len());
+            let remainder_bits = bit_chunks.remainder_bits();
+
+            data_chunks.zip(bit_chunks).for_each(|(chunk, mut mask)| {
+                // split chunks further into slices corresponding to the vector length
+                // the compiler is able to unroll this inner loop and remove bounds checks
+                // since the outer chunk size (64) is always a multiple of the number of lanes
+                chunk.chunks_exact(T::lanes()).for_each(|chunk| {
+                    let zero = T::init(T::default_value());
+                    let vecmask = T::mask_from_u64(mask);
+                    let chunk = T::load(&chunk);
+                    let blended = T::mask_select(vecmask, chunk, zero);
+
+                    vector_sum = vector_sum + blended;
+
+                    mask = mask >> T::lanes();
+                });
+            });
+
+            remainder.iter().enumerate().for_each(|(i, value)| {
+                if remainder_bits & (1 << i) != 0 {
+                    remainder_sum = remainder_sum + *value;
+                }
+            });
+        }
+    }
+
+    // calculate horizontal sum of accumulator by writing to a temporary
+    // this is probably faster than extracting individual lanes
+    // the compiler is free to optimize this to something faster
+    let tmp = &mut [T::default_value(); 64];
+    T::write(vector_sum, &mut tmp[0..T::lanes()]);
+
+    let mut total_sum = T::default_value();
+    tmp[0..T::lanes()]
+        .iter()
+        .for_each(|lane| total_sum = total_sum + *lane);
+
+    total_sum = total_sum + remainder_sum;
+
+    Some(total_sum)
 }
 
 #[cfg(test)]
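The scalar null path above pairs each run of 64 values with one `u64` read from the validity bitmap and only adds values whose bit is set. Below is a minimal, self-contained sketch of that pattern on plain slices; the `masked_sum` helper and its inputs are illustrative only, not part of this patch, and the tail handling done via `remainder_bits` in the kernel is omitted:

```rust
/// Sums only the values whose validity bit is set. `values` stands in for the
/// array's value slice and `validity` for the 64-bit chunks of the null bitmap.
fn masked_sum(values: &[i64], validity: &[u64]) -> i64 {
    let mut sum = 0;
    // one u64 of validity bits covers exactly one chunk of 64 values
    for (chunk, mask) in values.chunks_exact(64).zip(validity.iter()) {
        for (i, value) in chunk.iter().enumerate() {
            if (mask & (1 << i)) != 0 {
                sum += *value;
            }
        }
    }
    sum
}

fn main() {
    let values: Vec<i64> = (0..128).collect();
    // bits 1, 3, 5, ... are set, so only odd-indexed values count as valid
    let validity = vec![0xAAAA_AAAA_AAAA_AAAAu64; 2];
    // 1 + 3 + 5 + ... + 127 = 4096
    assert_eq!(masked_sum(&values, &validity), 4096);
}
```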
diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs
index 1e2c38adc57..0d05f826d37 100644
--- a/rust/arrow/src/datatypes.rs
+++ b/rust/arrow/src/datatypes.rs
@@ -537,6 +537,9 @@ where
     /// Creates a new SIMD mask for this SIMD type filling it with `value`
     fn mask_init(value: bool) -> Self::SimdMask;
 
+    /// Creates a new SIMD mask for this SIMD type from the lower-most bits of the given `mask`
+    fn mask_from_u64(mask: u64) -> Self::SimdMask;
+
     /// Gets the value of a single lane in a SIMD mask
     fn mask_get(mask: &Self::SimdMask, idx: usize) -> bool;
 
@@ -597,22 +600,109 @@ macro_rules! make_numeric_type {
             type SimdMask = $simd_mask_ty;
 
+            #[inline]
             fn lanes() -> usize {
                 Self::Simd::lanes()
             }
 
+            #[inline]
             fn init(value: Self::Native) -> Self::Simd {
                 Self::Simd::splat(value)
             }
 
+            #[inline]
             fn load(slice: &[Self::Native]) -> Self::Simd {
                 unsafe { Self::Simd::from_slice_unaligned_unchecked(slice) }
             }
 
+            #[inline]
             fn mask_init(value: bool) -> Self::SimdMask {
                 Self::SimdMask::splat(value)
             }
 
+            #[inline]
+            fn mask_from_u64(mask: u64) -> Self::SimdMask {
+                match Self::lanes() {
+                    8 => {
+                        let vecidx = i64x8::new(128, 64, 32, 16, 8, 4, 2, 1);
+
+                        let vecmask = i64x8::splat((mask & 0xFF) as i64);
+                        let vecmask = (vecidx & vecmask).eq(vecidx);
+
+                        unsafe { std::mem::transmute(vecmask) }
+                    }
+                    16 => {
+                        let vecidx = i32x16::new(
+                            32768, 16384, 8192, 4096, 2048, 1024, 512, 256, 128, 64, 32,
+                            16, 8, 4, 2, 1,
+                        );
+
+                        let vecmask = i32x16::splat((mask & 0xFFFF) as i32);
+                        let vecmask = (vecidx & vecmask).eq(vecidx);
+
+                        unsafe { std::mem::transmute(vecmask) }
+                    }
+                    32 => {
+                        let tmp = &mut [0_i16; 32];
+
+                        let vecidx = i32x16::new(
+                            32768, 16384, 8192, 4096, 2048, 1024, 512, 256, 128, 64, 32,
+                            16, 8, 4, 2, 1,
+                        );
+
+                        let vecmask = i32x16::splat((mask & 0xFFFF) as i32);
+                        let vecmask = (vecidx & vecmask).eq(vecidx);
+
+                        i16x16::from_cast(vecmask)
+                            .write_to_slice_unaligned(&mut tmp[0..16]);
+
+                        let vecmask = i32x16::splat(((mask >> 16) & 0xFFFF) as i32);
+                        let vecmask = (vecidx & vecmask).eq(vecidx);
+
+                        i16x16::from_cast(vecmask)
+                            .write_to_slice_unaligned(&mut tmp[16..32]);
+
+                        unsafe { std::mem::transmute(i16x32::from_slice_unaligned(tmp)) }
+                    }
+                    64 => {
+                        let tmp = &mut [0_i8; 64];
+
+                        let vecidx = i32x16::new(
+                            32768, 16384, 8192, 4096, 2048, 1024, 512, 256, 128, 64, 32,
+                            16, 8, 4, 2, 1,
+                        );
+
+                        let vecmask = i32x16::splat((mask & 0xFFFF) as i32);
+                        let vecmask = (vecidx & vecmask).eq(vecidx);
+
+                        i8x16::from_cast(vecmask)
+                            .write_to_slice_unaligned(&mut tmp[0..16]);
+
+                        let vecmask = i32x16::splat(((mask >> 16) & 0xFFFF) as i32);
+                        let vecmask = (vecidx & vecmask).eq(vecidx);
+
+                        i8x16::from_cast(vecmask)
+                            .write_to_slice_unaligned(&mut tmp[16..32]);
+
+                        let vecmask = i32x16::splat(((mask >> 32) & 0xFFFF) as i32);
+                        let vecmask = (vecidx & vecmask).eq(vecidx);
+
+                        i8x16::from_cast(vecmask)
+                            .write_to_slice_unaligned(&mut tmp[32..48]);
+
+                        let vecmask = i32x16::splat(((mask >> 48) & 0xFFFF) as i32);
+                        let vecmask = (vecidx & vecmask).eq(vecidx);
+
+                        i8x16::from_cast(vecmask)
+                            .write_to_slice_unaligned(&mut tmp[48..64]);
+
+                        unsafe { std::mem::transmute(i8x64::from_slice_unaligned(tmp)) }
+                    }
+                    _ => panic!("Invalid number of vector lanes"),
+                }
+            }
+
+            #[inline]
             fn mask_get(mask: &Self::SimdMask, idx: usize) -> bool {
                 unsafe { mask.extract_unchecked(idx) }
             }
@@ -624,11 +714,13 @@ macro_rules! make_numeric_type {
                 action(mask.bitmask().to_byte_slice());
             }
 
+            #[inline]
             fn mask_set(mask: Self::SimdMask, idx: usize, value: bool) -> Self::SimdMask {
                 unsafe { mask.replace_unchecked(idx, value) }
            }
 
             /// Selects elements of `a` and `b` using `mask`
+            #[inline]
             fn mask_select(
                 mask: Self::SimdMask,
                 a: Self::Simd,
@@ -637,10 +729,12 @@ make_numeric_type {
                 mask.select(a, b)
             }
 
+            #[inline]
             fn mask_any(mask: Self::SimdMask) -> bool {
                 mask.any()
             }
 
+            #[inline]
             fn bin_op<F: Fn(Self::Simd, Self::Simd) -> Self::Simd>(
                 left: Self::Simd,
                 right: Self::Simd,
@@ -649,30 +743,37 @@ macro_rules! make_numeric_type {
                 op(left, right)
             }
 
+            #[inline]
             fn eq(left: Self::Simd, right: Self::Simd) -> Self::SimdMask {
                 left.eq(right)
             }
 
+            #[inline]
             fn ne(left: Self::Simd, right: Self::Simd) -> Self::SimdMask {
                 left.ne(right)
             }
 
+            #[inline]
             fn lt(left: Self::Simd, right: Self::Simd) -> Self::SimdMask {
                 left.lt(right)
             }
 
+            #[inline]
             fn le(left: Self::Simd, right: Self::Simd) -> Self::SimdMask {
                 left.le(right)
             }
 
+            #[inline]
             fn gt(left: Self::Simd, right: Self::Simd) -> Self::SimdMask {
                 left.gt(right)
             }
 
+            #[inline]
             fn ge(left: Self::Simd, right: Self::Simd) -> Self::SimdMask {
                 left.ge(right)
             }
 
+            #[inline]
             fn write(simd_result: Self::Simd, slice: &mut [Self::Native]) {
                 unsafe { simd_result.write_to_slice_unaligned_unchecked(slice) };
             }
@@ -2580,3 +2681,86 @@ mod tests {
         Ok(())
     }
 }
+
+#[cfg(all(
+    test,
+    any(target_arch = "x86", target_arch = "x86_64"),
+    feature = "simd"
+))]
+mod arrow_numeric_type_tests {
+    use crate::datatypes::{
+        ArrowNumericType, Float32Type, Float64Type, Int32Type, Int64Type, Int8Type,
+        UInt16Type,
+    };
+    use packed_simd::*;
+    use FromCast;
+
+    #[test]
+    fn test_mask_f64() {
+        let mask = Float64Type::mask_from_u64(0b10101010);
+
+        let expected =
+            m64x8::from_cast(i64x8::from_slice_unaligned(&[-1, 0, -1, 0, -1, 0, -1, 0]));
+
+        assert_eq!(expected, mask);
+    }
+
+    #[test]
+    fn test_mask_u64() {
+        let mask = Int64Type::mask_from_u64(0b01010101);
+
+        let expected =
+            m64x8::from_cast(i64x8::from_slice_unaligned(&[0, -1, 0, -1, 0, -1, 0, -1]));
+
+        assert_eq!(expected, mask);
+    }
+
+    #[test]
+    fn test_mask_f32() {
+        let mask = Float32Type::mask_from_u64(0b10101010_10101010);
+
+        let expected = m32x16::from_cast(i32x16::from_slice_unaligned(&[
+            -1, 0, -1, 0, -1, 0, -1, 0, -1, 0, -1, 0, -1, 0, -1, 0,
+        ]));
+
+        assert_eq!(expected, mask);
+    }
+
+    #[test]
+    fn test_mask_i32() {
+        let mask = Int32Type::mask_from_u64(0b01010101_01010101);
+
+        let expected = m32x16::from_cast(i32x16::from_slice_unaligned(&[
+            0, -1, 0, -1, 0, -1, 0, -1, 0, -1, 0, -1, 0, -1, 0, -1,
+        ]));
+
+        assert_eq!(expected, mask);
+    }
+
+    #[test]
+    fn test_mask_u16() {
+        let mask = UInt16Type::mask_from_u64(0b01010101_01010101_10101010_10101010);
+
+        let expected = m16x32::from_cast(i16x32::from_slice_unaligned(&[
+            -1, 0, -1, 0, -1, 0, -1, 0, -1, 0, -1, 0, -1, 0, -1, 0, 0, -1, 0, -1, 0, -1,
+            0, -1, 0, -1, 0, -1, 0, -1, 0, -1,
+        ]));
+
+        assert_eq!(expected, mask);
+    }
+
+    #[test]
+    fn test_mask_i8() {
+        let mask = Int8Type::mask_from_u64(
+            0b01010101_01010101_10101010_10101010_01010101_01010101_10101010_10101010,
+        );
+
+        let expected = m8x64::from_cast(i8x64::from_slice_unaligned(&[
+            -1, 0, -1, 0, -1, 0, -1, 0, -1, 0, -1, 0, -1, 0, -1, 0, 0, -1, 0, -1, 0, -1,
+            0, -1, 0, -1, 0, -1, 0, -1, 0, -1, -1, 0, -1, 0, -1, 0, -1, 0, -1, 0, -1, 0,
+            -1, 0, -1, 0, 0, -1, 0, -1, 0, -1, 0, -1, 0, -1, 0, -1, 0, -1, 0, -1,
+        ]));
+
+        assert_eq!(expected, mask);
+    }
+}
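`mask_from_u64` expands the low `lanes()` bits of the mask into full SIMD lanes by AND-ing a broadcast copy of the mask with per-lane powers of two and comparing the result against those same constants; a set bit becomes an all-ones lane and a clear bit an all-zeros lane. A scalar sketch of that bit test, using a hypothetical `expand_mask_scalar` helper that mirrors the descending `vecidx` ordering above (lane 0 tests the highest selected bit):

```rust
/// Expands the low `lanes` bits of `mask` into one bool per lane, following the
/// same lane-to-bit mapping as the descending `vecidx` constants above.
fn expand_mask_scalar(mask: u64, lanes: usize) -> Vec<bool> {
    (0..lanes)
        .map(|lane| {
            let idx = 1u64 << (lanes - 1 - lane);
            // a set bit survives the AND and compares equal to its power of two
            (mask & idx) == idx
        })
        .collect()
}

fn main() {
    // mirrors the `test_mask_f64` expectation: 0b10101010 selects lanes 0, 2, 4 and 6
    assert_eq!(
        expand_mask_scalar(0b1010_1010, 8),
        vec![true, false, true, false, true, false, true, false]
    );
}
```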
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 5640daa5303..432ae593bae 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -779,6 +779,18 @@ async fn csv_query_external_table_count() {
     assert_eq!(expected, actual);
 }
 
+#[tokio::test]
+async fn csv_query_external_table_sum() {
+    let mut ctx = ExecutionContext::new();
+    // cast smallint and int to bigint to avoid overflow during calculation
+    register_aggregate_csv_by_sql(&mut ctx).await;
+    let sql =
+        "SELECT SUM(CAST(c7 AS BIGINT)), SUM(CAST(c8 AS BIGINT)) FROM aggregate_test_100";
+    let actual = execute(&mut ctx, sql).await;
+    let expected = vec![vec!["13060", "3017641"]];
+    assert_eq!(expected, actual);
+}
+
 #[tokio::test]
 async fn csv_query_count_star() {
     let mut ctx = ExecutionContext::new();
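For reference, a small end-to-end sketch of how the reworked kernel is exercised directly from the arrow crate, assuming the crate's `Int64Array` constructor from `Option` values and the `sum` kernel path shown in this patch; it is not part of the change itself:

```rust
use arrow::array::Int64Array;
use arrow::compute::kernels::aggregate::sum;

fn main() {
    // an array with a null in the middle; the cleared validity bit keeps it out of the sum
    let array = Int64Array::from(vec![Some(1), None, Some(3), Some(5)]);

    // both the scalar and the SIMD implementation should return Some(9) here
    assert_eq!(sum(&array), Some(9));
}
```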