From 3d5d3b4418abb55256be269eb624d219736a0bef Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Wed, 24 Apr 2024 16:45:32 +0000 Subject: [PATCH 1/2] add date_bin benchmark --- datafusion/functions/Cargo.toml | 5 +++ datafusion/functions/benches/date_bin.rs | 57 ++++++++++++++++++++++++ 2 files changed, 62 insertions(+) create mode 100644 datafusion/functions/benches/date_bin.rs diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml index 577ecdb7461d..0886dee03479 100644 --- a/datafusion/functions/Cargo.toml +++ b/datafusion/functions/Cargo.toml @@ -112,6 +112,11 @@ harness = false name = "make_date" required-features = ["datetime_expressions"] +[[bench]] +harness = false +name = "date_bin" +required-features = ["datetime_expressions"] + [[bench]] harness = false name = "to_char" diff --git a/datafusion/functions/benches/date_bin.rs b/datafusion/functions/benches/date_bin.rs new file mode 100644 index 000000000000..c881947354fd --- /dev/null +++ b/datafusion/functions/benches/date_bin.rs @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +extern crate criterion; + +use std::sync::Arc; + +use arrow::array::{ArrayRef, TimestampSecondArray}; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use datafusion_common::ScalarValue; +use rand::rngs::ThreadRng; +use rand::Rng; + +use datafusion_expr::ColumnarValue; +use datafusion_functions::datetime::date_bin; + +fn timestamps(rng: &mut ThreadRng) -> TimestampSecondArray { + let mut seconds = vec![]; + for _ in 0..1000 { + seconds.push(rng.gen_range(0..1_000_000)); + } + + TimestampSecondArray::from(seconds) +} + +fn criterion_benchmark(c: &mut Criterion) { + c.bench_function("date_bin_1000", |b| { + let mut rng = rand::thread_rng(); + let interval = ColumnarValue::Scalar(ScalarValue::new_interval_dt(0, 1_000_000)); + let timestamps = ColumnarValue::Array(Arc::new(timestamps(&mut rng)) as ArrayRef); + let udf = date_bin(); + + b.iter(|| { + black_box( + udf.invoke(&[interval.clone(), timestamps.clone()]) + .expect("date_bin should work on valid values"), + ) + }) + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); From 462d7320e1ee9fd042393f5fa8ca313e62a35d3f Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Wed, 24 Apr 2024 16:49:40 +0000 Subject: [PATCH 2/2] optimize date_bin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit As mentioned in the docs for `PrimitiveArray::unary` it is faster to apply an infallible operation across both valid and invalid values, rather than branching at every value. 
1) Make stride function infallible 2) Use `unary` method This gives this speedup on my machine: Before: 22.345 µs After: 10.558 µs So around 2x faster --- datafusion/functions/src/datetime/date_bin.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index 7f5d9bb5d921..da1797cdae81 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -320,14 +320,14 @@ fn date_bin_impl( origin: i64, stride: i64, stride_fn: fn(i64, i64, i64) -> i64, - ) -> impl Fn(Option) -> Option { + ) -> impl Fn(i64) -> i64 { let scale = match T::UNIT { Nanosecond => 1, Microsecond => NANOSECONDS / 1_000_000, Millisecond => NANOSECONDS / 1_000, Second => NANOSECONDS, }; - move |x: Option| x.map(|x| stride_fn(stride, x * scale, origin) / scale) + move |x: i64| stride_fn(stride, x * scale, origin) / scale } Ok(match array { @@ -335,7 +335,7 @@ fn date_bin_impl( let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( - apply_stride_fn(*v), + v.map(apply_stride_fn), tz_opt.clone(), )) } @@ -343,7 +343,7 @@ fn date_bin_impl( let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( - apply_stride_fn(*v), + v.map(apply_stride_fn), tz_opt.clone(), )) } @@ -351,7 +351,7 @@ fn date_bin_impl( let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( - apply_stride_fn(*v), + v.map(apply_stride_fn), tz_opt.clone(), )) } @@ -359,7 +359,7 @@ fn date_bin_impl( let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); ColumnarValue::Scalar(ScalarValue::TimestampSecond( - apply_stride_fn(*v), + v.map(apply_stride_fn), tz_opt.clone(), )) } @@ -377,14 +377,13 @@ fn date_bin_impl( { let array = as_primitive_array::(array)?; 
let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); - let array = array - .iter() - .map(apply_stride_fn) - .collect::>() + let array: PrimitiveArray = array + .unary(apply_stride_fn) .with_timezone_opt(tz_opt.clone()); Ok(ColumnarValue::Array(Arc::new(array))) } + match array.data_type() { Timestamp(Nanosecond, tz_opt) => { transform_array_with_stride::(