Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Optimize IsNotNullExpr #11586

Merged
merged 6 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datafusion/physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,7 @@ name = "in_list"
[[bench]]
harness = false
name = "case_when"

[[bench]]
harness = false
name = "is_null"
95 changes: 95 additions & 0 deletions datafusion/physical-expr/benches/is_null.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::datatypes::{Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow_array::builder::Int32Builder;
use arrow_schema::DataType;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion_physical_expr::expressions::{IsNotNullExpr, IsNullExpr};
use datafusion_physical_expr_common::expressions::column::Column;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use std::sync::Arc;

fn criterion_benchmark(c: &mut Criterion) {
// create input data
let mut c1 = Int32Builder::new();
let mut c2 = Int32Builder::new();
let mut c3 = Int32Builder::new();
for i in 0..1000 {
// c1 is always null
c1.append_null();
// c2 is never null
c2.append_value(i);
// c3 is a mix of values and nulls
if i % 7 == 0 {
c3.append_null();
} else {
c3.append_value(i);
}
}
let c1 = Arc::new(c1.finish());
let c2 = Arc::new(c2.finish());
let c3 = Arc::new(c3.finish());
let schema = Schema::new(vec![
Field::new("c1", DataType::Int32, true),
Field::new("c2", DataType::Int32, false),
Field::new("c3", DataType::Int32, true),
]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![c1, c2, c3]).unwrap();

c.bench_function("is_null: column is all nulls", |b| {
let expr = is_null("c1", 0);
b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap()))
});

c.bench_function("is_null: column is never null", |b| {
let expr = is_null("c2", 1);
b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap()))
});

c.bench_function("is_null: column is mix of values and nulls", |b| {
let expr = is_null("c3", 2);
b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap()))
});

c.bench_function("is_not_null: column is all nulls", |b| {
let expr = is_not_null("c1", 0);
b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap()))
});

c.bench_function("is_not_null: column is never null", |b| {
let expr = is_not_null("c2", 1);
b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap()))
});

c.bench_function("is_not_null: column is mix of values and nulls", |b| {
let expr = is_not_null("c3", 2);
b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap()))
});
}

fn is_null(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
Arc::new(IsNullExpr::new(Arc::new(Column::new(name, index))))
}

fn is_not_null(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
Arc::new(IsNotNullExpr::new(Arc::new(Column::new(name, index))))
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
4 changes: 1 addition & 3 deletions datafusion/physical-expr/src/expressions/is_not_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::{any::Any, sync::Arc};

use crate::physical_expr::down_cast_any_ref;
use crate::PhysicalExpr;
use arrow::compute;
use arrow::{
datatypes::{DataType, Schema},
record_batch::RecordBatch,
Expand Down Expand Up @@ -74,8 +73,7 @@ impl PhysicalExpr for IsNotNullExpr {
let arg = self.arg.evaluate(batch)?;
match arg {
ColumnarValue::Array(array) => {
let is_null = super::is_null::compute_is_null(array)?;
let is_not_null = compute::not(&is_null)?;
let is_not_null = super::is_null::compute_is_not_null(array)?;
Ok(ColumnarValue::Array(Arc::new(is_not_null)))
}
ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(
Expand Down
15 changes: 15 additions & 0 deletions datafusion/physical-expr/src/expressions/is_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,21 @@ pub(crate) fn compute_is_null(array: ArrayRef) -> Result<BooleanArray> {
}
}

/// workaround <https://github.com/apache/arrow-rs/issues/6017>,
/// this can be replaced with a direct call to `arrow::compute::is_not_null` once it's fixed.
pub(crate) fn compute_is_not_null(array: ArrayRef) -> Result<BooleanArray> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this code looks good to me -- I think it also ends up supporting UnionArray in IS NOT NULL

cc @samuelcolvin who added this code in #11321

if let Some(union_array) = array.as_any().downcast_ref::<UnionArray>() {
let is_null = if let Some(offsets) = union_array.offsets() {
dense_union_is_null(union_array, offsets)?
} else {
sparse_union_is_null(union_array)?
};
compute::not(&is_null).map_err(Into::into)
} else {
compute::is_not_null(array.as_ref()).map_err(Into::into)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This goes faster because it calls a single kernel (compute::is_not_null) rather than 2 (is_null and not)?

Could we add some basic tests for union? Perhaps following the model in #11321 ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This goes faster because it calls a single kernel (compute::is_not_null) rather than 2 (is_null and not)?

Yes, exactly. It avoids creating an interim vector that is then discarded.

Could we add some basic tests for union? Perhaps following the model in #11321 ?

We do already have at least one test for IS NOT NULL for union, that was added in #11321.

There is no functional change for union in this PR. The code in compute_is_not_null for union is copied from the compute_is_null method, and adds a call to not, so it is doing the same thing as before but the flow changed a little.

Union is the only case that this PR does not optimize for, because I didn't want to mess with the temporary workaround that is in place.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb I pushed a change to simplify the PR and remove the duplicated union code. Let me know if that makes things clearer (or not).

}
}

fn dense_union_is_null(
union_array: &UnionArray,
offsets: &ScalarBuffer<i32>,
Expand Down