diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 0974b3a9114ef..7ae09a42de880 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -48,11 +48,15 @@ use crate::{ use arrow::array::{ new_null_array, Array, BooleanArray, BooleanBufferBuilder, RecordBatchOptions, + UInt64Array, }; use arrow::buffer::BooleanBuffer; -use arrow::compute::{concat_batches, filter, filter_record_batch, not, BatchCoalescer}; +use arrow::compute::{ + concat_batches, filter, filter_record_batch, not, take, BatchCoalescer, +}; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; +use arrow_schema::DataType; use datafusion_common::cast::as_boolean_array; use datafusion_common::{ arrow_err, internal_datafusion_err, internal_err, project_schema, @@ -1661,11 +1665,30 @@ fn build_row_join_batch( // Broadcast the single build-side row to match the filtered // probe-side batch length let original_left_array = build_side_batch.column(column_index.index); - let scalar_value = ScalarValue::try_from_array( - original_left_array.as_ref(), - build_side_index, - )?; - scalar_value.to_array_of_size(filtered_probe_batch.num_rows())? + // Avoid using `ScalarValue::to_array_of_size()` for `List(Utf8View)` to avoid + // deep copies for buffers inside `Utf8View` array. See below for details. + // https://github.com/apache/datafusion/issues/18159 + // + // In other cases, `to_array_of_size()` is faster. + match original_left_array.data_type() { + DataType::List(field) | DataType::LargeList(field) + if field.data_type() == &DataType::Utf8View => + { + let indices_iter = std::iter::repeat_n( + build_side_index as u64, + filtered_probe_batch.num_rows(), + ); + let indices_array = UInt64Array::from_iter_values(indices_iter); + take(original_left_array.as_ref(), &indices_array, None)? + } + _ => { + let scalar_value = ScalarValue::try_from_array( + original_left_array.as_ref(), + build_side_index, + )?; + scalar_value.to_array_of_size(filtered_probe_batch.num_rows())? + } + } } else { // Take the filtered probe-side column using compute::take Arc::clone(filtered_probe_batch.column(column_index.index)) diff --git a/datafusion/sqllogictest/test_files/join_lists.slt b/datafusion/sqllogictest/test_files/join_lists.slt new file mode 100644 index 0000000000000..c07bd85551f34 --- /dev/null +++ b/datafusion/sqllogictest/test_files/join_lists.slt @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + + +## Ensure test coverage for NLJ using joining on LISTS + +## Reproducer for https://github.com/apache/datafusion/issues/18070 + +statement ok +CREATE TABLE categories_raw +AS SELECT arrow_cast('cat_' || value, 'Utf8View') AS category_id FROM generate_series(1, 5); + +statement ok +CREATE TABLE places +AS SELECT column1 as id, column2 as fsq_category_ids, column3 as date_refreshed +FROM VALUES + (1, ['cat_1', 'cat_2', 'cat_3'], DATE '2023-05-10'), + (2, ['cat_4', 'cat_5'], DATE '2021-12-01'), + (3, ['cat_6', 'cat_7', 'cat_8', 'cat_9'], DATE '2024-01-15'); --> NOTE these categories do not exist in categories_raw + + +query I +WITH categories_arr AS ( + SELECT array_agg(category_id) AS category_ids FROM categories_raw LIMIT 500 +) +SELECT COUNT(*) + FROM places p + WHERE array_has_any(p.fsq_category_ids, (SELECT category_ids FROM categories_arr)); +---- +2 + +query I +WITH categories_arr AS ( + SELECT array_agg(category_id) AS category_ids FROM categories_raw LIMIT 500 +) +SELECT COUNT(*) + FROM places p + WHERE id <> 1 AND array_has_any(p.fsq_category_ids, (SELECT category_ids FROM categories_arr)); +---- +1 + +# cleanup +statement ok +DROP TABLE categories_raw; + +statement ok +DROP TABLE places; +