From a591301a1fe46899f1bfe66edd9c7741ed726ac3 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 29 Jul 2024 16:52:52 -0400 Subject: [PATCH] Merge `string-view2` branch: reading from parquet up to 2x faster for some ClickBench queries (not on by default) (#11667) * Pin to pre-release version of arrow 52.2.0 * Update for deprecated method * Add a config to force using string view in benchmark (#11514) * add a knob to force string view in benchmark * fix sql logic test * update doc * fix ci * fix ci only test * Update benchmarks/src/util/options.rs Co-authored-by: Andrew Lamb * Update datafusion/common/src/config.rs Co-authored-by: Andrew Lamb * update tests --------- Co-authored-by: Andrew Lamb * Add String view helper functions (#11517) * add functions * add tests for hash util * Add ArrowBytesViewMap and ArrowBytesViewSet (#11515) * Update `string-view` branch to arrow-rs main (#10966) * Pin to arrow main * Fix clippy with latest arrow * Uncomment test that needs new arrow-rs to work * Update datafusion-cli Cargo.lock * Update Cargo.lock * tapelo * merge * update cast * consistent dep * fix ci * add more tests * make doc happy * update new implementation * fix bug * avoid unused dep * update dep * update * fix cargo check * update doc * pick up the comments change again --------- Co-authored-by: Andrew Lamb * Enable `GroupValueBytesView` for aggregation with StringView types (#11519) * add functions * Update `string-view` branch to arrow-rs main (#10966) * Pin to arrow main * Fix clippy with latest arrow * Uncomment test that needs new arrow-rs to work * Update datafusion-cli Cargo.lock * Update Cargo.lock * tapelo * merge * update cast * consistent dep * fix ci * avoid unused dep * update dep * update * fix cargo check * better group value view aggregation * update --------- Co-authored-by: Andrew Lamb * Initial support for regex_replace on `StringViewArray` (#11556) * initial support for string view regex * update tests * Add support for Utf8View for date/temporal codepaths (#11518) * Add StringView support for date_part and make_date funcs * run cargo update in datafusion-cli * cargo fmt --------- Co-authored-by: Andrew Lamb * GC `StringViewArray` in `CoalesceBatchesStream` (#11587) * gc string view when appropriate * make clippy happy * address comments * make doc happy * update style * Add comments and tests for gc_string_view_batch * better herustic * update test * Update datafusion/physical-plan/src/coalesce_batches.rs Co-authored-by: Andrew Lamb --------- Co-authored-by: Andrew Lamb * [Bug] fix bug in return type inference of `utf8_to_int_type` (#11662) * fix bug in return type inference * update doc * add tests --------- Co-authored-by: Andrew Lamb * Fix clippy * Increase ByteViewMap block size to 2MB (#11674) * better default block size * fix related test * Change `--string-view` to only apply to parquet formats (#11663) * use inferenced schema, don't load schema again * move config to parquet-only * update * update * better format * format * update * Implement native support StringView for character length (#11676) * native support for character length * Update datafusion/functions/src/unicode/character_length.rs --------- Co-authored-by: Andrew Lamb * Remove uneeded patches * cargo fmt --------- Co-authored-by: Xiangpeng Hao Co-authored-by: Xiangpeng Hao Co-authored-by: Andrew Duffy --- benchmarks/src/clickbench.rs | 8 +- benchmarks/src/tpch/run.rs | 7 + benchmarks/src/util/options.rs | 5 + datafusion-cli/Cargo.lock | 5 +- datafusion/common/src/cast.rs | 11 + 
datafusion/common/src/config.rs | 4 + .../common/src/file_options/parquet_writer.rs | 4 + datafusion/common/src/hash_utils.rs | 125 +++- .../core/src/datasource/file_format/mod.rs | 23 + .../src/datasource/file_format/parquet.rs | 13 +- .../core/src/datasource/listing/table.rs | 4 +- .../datasource/physical_plan/parquet/mod.rs | 4 + .../physical_plan/parquet/opener.rs | 25 +- datafusion/expr/src/type_coercion/binary.rs | 26 +- datafusion/functions-aggregate/src/count.rs | 4 + .../functions/src/datetime/date_part.rs | 30 +- .../functions/src/datetime/date_trunc.rs | 25 +- .../functions/src/datetime/make_date.rs | 4 +- .../functions/src/regex/regexpreplace.rs | 217 ++++-- .../functions/src/unicode/character_length.rs | 131 ++-- datafusion/functions/src/utils.rs | 23 +- .../src/aggregate/count_distinct/bytes.rs | 61 ++ .../src/aggregate/count_distinct/mod.rs | 1 + .../physical-expr-common/src/binary_map.rs | 6 + .../src/binary_view_map.rs | 690 ++++++++++++++++++ datafusion/physical-expr-common/src/lib.rs | 1 + .../physical-expr/src/aggregate/min_max.rs | 39 + .../src/aggregates/group_values/bytes_view.rs | 129 ++++ .../src/aggregates/group_values/mod.rs | 33 +- .../physical-plan/src/coalesce_batches.rs | 191 ++++- .../proto/datafusion_common.proto | 1 + datafusion/proto-common/src/from_proto/mod.rs | 2 +- .../proto-common/src/generated/pbjson.rs | 18 + .../proto-common/src/generated/prost.rs | 3 + datafusion/proto-common/src/to_proto/mod.rs | 1 + .../src/generated/datafusion_proto_common.rs | 3 + .../engines/datafusion_engine/normalize.rs | 5 + .../test_files/information_schema.slt | 2 + .../sqllogictest/test_files/string_view.slt | 21 + docs/source/user-guide/configs.md | 1 + 40 files changed, 1714 insertions(+), 192 deletions(-) create mode 100644 datafusion/physical-expr-common/src/binary_view_map.rs create mode 100644 datafusion/physical-plan/src/aggregates/group_values/bytes_view.rs diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs index 41dffc55f371..a0f051d17623 100644 --- a/benchmarks/src/clickbench.rs +++ b/benchmarks/src/clickbench.rs @@ -116,7 +116,13 @@ impl RunOpt { None => queries.min_query_id()..=queries.max_query_id(), }; - let config = self.common.config(); + let mut config = self.common.config(); + config + .options_mut() + .execution + .parquet + .schema_force_string_view = self.common.string_view; + let ctx = SessionContext::new_with_config(config); self.register_hits(&ctx).await?; diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index f2a93d2ea549..a72dfaa0f58c 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -120,6 +120,11 @@ impl RunOpt { .config() .with_collect_statistics(!self.disable_statistics); config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join; + config + .options_mut() + .execution + .parquet + .schema_force_string_view = self.common.string_view; let ctx = SessionContext::new_with_config(config); // register tables @@ -339,6 +344,7 @@ mod tests { partitions: Some(2), batch_size: 8192, debug: false, + string_view: false, }; let opt = RunOpt { query: Some(query), @@ -372,6 +378,7 @@ mod tests { partitions: Some(2), batch_size: 8192, debug: false, + string_view: false, }; let opt = RunOpt { query: Some(query), diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index b9398e5b522f..02591e293272 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -37,6 +37,11 @@ pub struct CommonOpt { /// Activate debug mode to see more 
details #[structopt(short, long)] pub debug: bool, + + /// If true, will use StringView/BinaryViewArray instead of String/BinaryArray + /// when reading ParquetFiles + #[structopt(long)] + pub string_view: bool, } impl CommonOpt { diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index e2851cfb4057..5884e424c781 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -3356,11 +3356,12 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.120" +version = "1.0.121" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e0d21c9a8cae1235ad58a00c11cb40d4b1e5c784f1ef2c537876ed6ffd8b7c5" +checksum = "4ab380d7d9f22ef3f21ad3e6c1ebe8e4fc7a2000ccba2e4d71fc96f15b2cb609" dependencies = [ "itoa", + "memchr", "ryu", "serde", ] diff --git a/datafusion/common/src/cast.rs b/datafusion/common/src/cast.rs index 0dc0532bbb6f..0586fcf5e2ae 100644 --- a/datafusion/common/src/cast.rs +++ b/datafusion/common/src/cast.rs @@ -36,6 +36,7 @@ use arrow::{ }, datatypes::{ArrowDictionaryKeyType, ArrowPrimitiveType}, }; +use arrow_array::{BinaryViewArray, StringViewArray}; // Downcast ArrayRef to Date32Array pub fn as_date32_array(array: &dyn Array) -> Result<&Date32Array> { @@ -87,6 +88,11 @@ pub fn as_string_array(array: &dyn Array) -> Result<&StringArray> { Ok(downcast_value!(array, StringArray)) } +// Downcast ArrayRef to StringViewArray +pub fn as_string_view_array(array: &dyn Array) -> Result<&StringViewArray> { + Ok(downcast_value!(array, StringViewArray)) +} + // Downcast ArrayRef to UInt32Array pub fn as_uint32_array(array: &dyn Array) -> Result<&UInt32Array> { Ok(downcast_value!(array, UInt32Array)) @@ -221,6 +227,11 @@ pub fn as_binary_array(array: &dyn Array) -> Result<&BinaryArray> { Ok(downcast_value!(array, BinaryArray)) } +// Downcast ArrayRef to BinaryViewArray +pub fn as_binary_view_array(array: &dyn Array) -> Result<&BinaryViewArray> { + Ok(downcast_value!(array, BinaryViewArray)) +} + // Downcast ArrayRef to FixedSizeListArray pub fn as_fixed_size_list_array(array: &dyn Array) -> Result<&FixedSizeListArray> { Ok(downcast_value!(array, FixedSizeListArray)) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 2b932b26cad6..9f8aa1cbdcaa 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -469,6 +469,10 @@ config_namespace! { /// writing out already in-memory data, such as from a cached /// data frame. pub maximum_buffered_record_batches_per_stream: usize, default = 2 + + /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, + /// and `Binary/BinaryLarge` with `BinaryView`. 
+ pub schema_force_string_view: bool, default = false } } diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 80b751858398..4a229fe01b54 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -175,6 +175,7 @@ impl ParquetOptions { maximum_parallel_row_group_writers: _, maximum_buffered_record_batches_per_stream: _, bloom_filter_on_read: _, // reads not used for writer props + schema_force_string_view: _, } = self; let mut builder = WriterProperties::builder() @@ -440,6 +441,7 @@ mod tests { maximum_buffered_record_batches_per_stream: defaults .maximum_buffered_record_batches_per_stream, bloom_filter_on_read: defaults.bloom_filter_on_read, + schema_force_string_view: defaults.schema_force_string_view, } } @@ -540,6 +542,8 @@ mod tests { maximum_buffered_record_batches_per_stream: global_options_defaults .maximum_buffered_record_batches_per_stream, bloom_filter_on_read: global_options_defaults.bloom_filter_on_read, + schema_force_string_view: global_options_defaults + .schema_force_string_view, }, column_specific_options, key_value_metadata, diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index 5e1324e80702..f57ec0152e3f 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -31,9 +31,9 @@ use arrow_buffer::IntervalMonthDayNano; #[cfg(not(feature = "force_hash_collisions"))] use crate::cast::{ - as_boolean_array, as_fixed_size_list_array, as_generic_binary_array, - as_large_list_array, as_list_array, as_map_array, as_primitive_array, - as_string_array, as_struct_array, + as_binary_view_array, as_boolean_array, as_fixed_size_list_array, + as_generic_binary_array, as_large_list_array, as_list_array, as_map_array, + as_primitive_array, as_string_array, as_string_view_array, as_struct_array, }; use crate::error::Result; #[cfg(not(feature = "force_hash_collisions"))] @@ -415,8 +415,10 @@ pub fn create_hashes<'a>( DataType::Null => hash_null(random_state, hashes_buffer, rehash), DataType::Boolean => hash_array(as_boolean_array(array)?, random_state, hashes_buffer, rehash), DataType::Utf8 => hash_array(as_string_array(array)?, random_state, hashes_buffer, rehash), + DataType::Utf8View => hash_array(as_string_view_array(array)?, random_state, hashes_buffer, rehash), DataType::LargeUtf8 => hash_array(as_largestring_array(array), random_state, hashes_buffer, rehash), DataType::Binary => hash_array(as_generic_binary_array::(array)?, random_state, hashes_buffer, rehash), + DataType::BinaryView => hash_array(as_binary_view_array(array)?, random_state, hashes_buffer, rehash), DataType::LargeBinary => hash_array(as_generic_binary_array::(array)?, random_state, hashes_buffer, rehash), DataType::FixedSizeBinary(_) => { let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap(); @@ -540,22 +542,57 @@ mod tests { Ok(()) } - #[test] - fn create_hashes_binary() -> Result<()> { - let byte_array = Arc::new(BinaryArray::from_vec(vec![ - &[4, 3, 2], - &[4, 3, 2], - &[1, 2, 3], - ])); + macro_rules! 
create_hash_binary { + ($NAME:ident, $ARRAY:ty) => { + #[cfg(not(feature = "force_hash_collisions"))] + #[test] + fn $NAME() { + let binary = [ + Some(b"short".to_byte_slice()), + None, + Some(b"long but different 12 bytes string"), + Some(b"short2"), + Some(b"Longer than 12 bytes string"), + Some(b"short"), + Some(b"Longer than 12 bytes string"), + ]; + + let binary_array = Arc::new(binary.iter().cloned().collect::<$ARRAY>()); + let ref_array = Arc::new(binary.iter().cloned().collect::()); + + let random_state = RandomState::with_seeds(0, 0, 0, 0); + + let mut binary_hashes = vec![0; binary.len()]; + create_hashes(&[binary_array], &random_state, &mut binary_hashes) + .unwrap(); + + let mut ref_hashes = vec![0; binary.len()]; + create_hashes(&[ref_array], &random_state, &mut ref_hashes).unwrap(); + + // Null values result in a zero hash, + for (val, hash) in binary.iter().zip(binary_hashes.iter()) { + match val { + Some(_) => assert_ne!(*hash, 0), + None => assert_eq!(*hash, 0), + } + } - let random_state = RandomState::with_seeds(0, 0, 0, 0); - let hashes_buff = &mut vec![0; byte_array.len()]; - let hashes = create_hashes(&[byte_array], &random_state, hashes_buff)?; - assert_eq!(hashes.len(), 3,); + // same logical values should hash to the same hash value + assert_eq!(binary_hashes, ref_hashes); - Ok(()) + // Same values should map to same hash values + assert_eq!(binary[0], binary[5]); + assert_eq!(binary[4], binary[6]); + + // different binary should map to different hash values + assert_ne!(binary[0], binary[2]); + } + }; } + create_hash_binary!(binary_array, BinaryArray); + create_hash_binary!(binary_view_array, BinaryViewArray); + #[test] fn create_hashes_fixed_size_binary() -> Result<()> { let input_arg = vec![vec![1, 2], vec![5, 6], vec![5, 6]]; @@ -571,6 +608,64 @@ mod tests { Ok(()) } + macro_rules! 
create_hash_string { + ($NAME:ident, $ARRAY:ty) => { + #[cfg(not(feature = "force_hash_collisions"))] + #[test] + fn $NAME() { + let strings = [ + Some("short"), + None, + Some("long but different 12 bytes string"), + Some("short2"), + Some("Longer than 12 bytes string"), + Some("short"), + Some("Longer than 12 bytes string"), + ]; + + let string_array = Arc::new(strings.iter().cloned().collect::<$ARRAY>()); + let dict_array = Arc::new( + strings + .iter() + .cloned() + .collect::>(), + ); + + let random_state = RandomState::with_seeds(0, 0, 0, 0); + + let mut string_hashes = vec![0; strings.len()]; + create_hashes(&[string_array], &random_state, &mut string_hashes) + .unwrap(); + + let mut dict_hashes = vec![0; strings.len()]; + create_hashes(&[dict_array], &random_state, &mut dict_hashes).unwrap(); + + // Null values result in a zero hash, + for (val, hash) in strings.iter().zip(string_hashes.iter()) { + match val { + Some(_) => assert_ne!(*hash, 0), + None => assert_eq!(*hash, 0), + } + } + + // same logical values should hash to the same hash value + assert_eq!(string_hashes, dict_hashes); + + // Same values should map to same hash values + assert_eq!(strings[0], strings[5]); + assert_eq!(strings[4], strings[6]); + + // different strings should map to different hash values + assert_ne!(strings[0], strings[2]); + } + }; + } + + create_hash_string!(string_array, StringArray); + create_hash_string!(large_string_array, LargeStringArray); + create_hash_string!(string_view_array, StringArray); + create_hash_string!(dict_string_array, DictionaryArray); + #[test] // Tests actual values of hashes, which are different if forcing collisions #[cfg(not(feature = "force_hash_collisions"))] diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 7154b50b9dd9..a324a4578424 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -42,6 +42,7 @@ use crate::error::Result; use crate::execution::context::SessionState; use crate::physical_plan::{ExecutionPlan, Statistics}; +use arrow_schema::{DataType, Field, Schema}; use datafusion_common::file_options::file_type::FileType; use datafusion_common::{internal_err, not_impl_err, GetExt}; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; @@ -204,6 +205,28 @@ pub fn file_type_to_format( } } +/// Transform a schema to use view types for Utf8 and Binary +pub fn transform_schema_to_view(schema: &Schema) -> Schema { + let transformed_fields: Vec> = schema + .fields + .iter() + .map(|field| match field.data_type() { + DataType::Utf8 | DataType::LargeUtf8 => Arc::new(Field::new( + field.name(), + DataType::Utf8View, + field.is_nullable(), + )), + DataType::Binary | DataType::LargeBinary => Arc::new(Field::new( + field.name(), + DataType::BinaryView, + field.is_nullable(), + )), + _ => field.clone(), + }) + .collect(); + Schema::new_with_metadata(transformed_fields, schema.metadata.clone()) +} + #[cfg(test)] pub(crate) mod test_util { use std::ops::Range; diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index b50e9389ad9e..8a1cd2a147c7 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use super::write::demux::start_demuxer_task; use super::write::{create_writer, SharedBuffer}; -use super::{FileFormat, FileFormatFactory, 
FileScanConfig}; +use super::{transform_schema_to_view, FileFormat, FileFormatFactory, FileScanConfig}; use crate::arrow::array::RecordBatch; use crate::arrow::datatypes::{Fields, Schema, SchemaRef}; use crate::datasource::file_format::file_compression_type::FileCompressionType; @@ -316,6 +316,17 @@ impl FileFormat for ParquetFormat { Schema::try_merge(schemas) }?; + let schema = if state + .config_options() + .execution + .parquet + .schema_force_string_view + { + transform_schema_to_view(&schema) + } else { + schema + }; + Ok(Arc::new(schema)) } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 3af4d41bcf03..72c6e0d84c04 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -410,7 +410,9 @@ impl ListingOptions { .try_collect() .await?; - self.format.infer_schema(state, &store, &files).await + let schema = self.format.infer_schema(state, &store, &files).await?; + + Ok(schema) } /// Infers the partition columns stored in `LOCATION` and compares diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index a5a7b50a008a..ed71d871b3fd 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -711,6 +711,10 @@ impl ExecutionPlan for ParquetExec { enable_page_index: self.enable_page_index(), enable_bloom_filter: self.bloom_filter_on_read(), schema_adapter_factory, + schema_force_string_view: self + .table_parquet_options + .global + .schema_force_string_view, }; let stream = diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs index ffe879eb8de0..4edc0ac525de 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs @@ -17,6 +17,7 @@ //! 
[`ParquetOpener`] for opening Parquet files +use crate::datasource::file_format::transform_schema_to_view; use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter; use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter; use crate::datasource::physical_plan::parquet::{ @@ -33,7 +34,7 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::{StreamExt, TryStreamExt}; use log::debug; -use parquet::arrow::arrow_reader::ArrowReaderOptions; +use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; use std::sync::Arc; @@ -56,6 +57,7 @@ pub(super) struct ParquetOpener { pub enable_page_index: bool, pub enable_bloom_filter: bool, pub schema_adapter_factory: Arc, + pub schema_force_string_view: bool, } impl FileOpener for ParquetOpener { @@ -66,7 +68,7 @@ impl FileOpener for ParquetOpener { let file_metrics = ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics); - let reader: Box = + let mut reader: Box = self.parquet_file_reader_factory.create_reader( self.partition_index, file_meta, @@ -90,12 +92,27 @@ impl FileOpener for ParquetOpener { ); let enable_bloom_filter = self.enable_bloom_filter; let limit = self.limit; + let schema_force_string_view = self.schema_force_string_view; Ok(Box::pin(async move { let options = ArrowReaderOptions::new().with_page_index(enable_page_index); + + let metadata = + ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?; + let mut schema = metadata.schema().clone(); + + if schema_force_string_view { + schema = Arc::new(transform_schema_to_view(&schema)); + } + + let options = ArrowReaderOptions::new() + .with_page_index(enable_page_index) + .with_schema(schema.clone()); + let metadata = + ArrowReaderMetadata::try_new(metadata.metadata().clone(), options)?; + let mut builder = - ParquetRecordBatchStreamBuilder::new_with_options(reader, options) - .await?; + ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata); let file_schema = builder.schema().clone(); diff --git a/datafusion/expr/src/type_coercion/binary.rs b/datafusion/expr/src/type_coercion/binary.rs index ae5bdc88b115..a657f4df0e3d 100644 --- a/datafusion/expr/src/type_coercion/binary.rs +++ b/datafusion/expr/src/type_coercion/binary.rs @@ -527,7 +527,7 @@ fn string_numeric_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option Option { match (l, r) { - // Coerce Utf8/LargeUtf8 to Date32/Date64/Time32/Time64/Timestamp - (Utf8, temporal) | (LargeUtf8, temporal) => match temporal { - Date32 | Date64 => Some(temporal.clone()), - Time32(_) | Time64(_) => { - if is_time_with_valid_unit(temporal.to_owned()) { - Some(temporal.to_owned()) - } else { - None + // Coerce Utf8View/Utf8/LargeUtf8 to Date32/Date64/Time32/Time64/Timestamp + (Utf8, temporal) | (LargeUtf8, temporal) | (Utf8View, temporal) => { + match temporal { + Date32 | Date64 => Some(temporal.clone()), + Time32(_) | Time64(_) => { + if is_time_with_valid_unit(temporal.to_owned()) { + Some(temporal.to_owned()) + } else { + None + } } + Timestamp(_, tz) => Some(Timestamp(TimeUnit::Nanosecond, tz.clone())), + _ => None, } - Timestamp(_, tz) => Some(Timestamp(TimeUnit::Nanosecond, tz.clone())), - _ => None, - }, + } _ => None, } } diff --git a/datafusion/functions-aggregate/src/count.rs 
b/datafusion/functions-aggregate/src/count.rs index 56850d0e02a1..69eac84f890d 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -16,6 +16,7 @@ // under the License. use ahash::RandomState; +use datafusion_physical_expr_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator; use std::collections::HashSet; use std::ops::BitAnd; use std::{fmt::Debug, sync::Arc}; @@ -235,6 +236,9 @@ impl AggregateUDFImpl for Count { DataType::Utf8 => { Box::new(BytesDistinctCountAccumulator::::new(OutputType::Utf8)) } + DataType::Utf8View => { + Box::new(BytesViewDistinctCountAccumulator::new(OutputType::Utf8)) + } DataType::LargeUtf8 => { Box::new(BytesDistinctCountAccumulator::::new(OutputType::Utf8)) } diff --git a/datafusion/functions/src/datetime/date_part.rs b/datafusion/functions/src/datetime/date_part.rs index e1efb4811ec0..e24b11aeb71f 100644 --- a/datafusion/functions/src/datetime/date_part.rs +++ b/datafusion/functions/src/datetime/date_part.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use arrow::array::{Array, ArrayRef, Float64Array}; use arrow::compute::{binary, cast, date_part, DatePart}; use arrow::datatypes::DataType::{ - Date32, Date64, Float64, Time32, Time64, Timestamp, Utf8, + Date32, Date64, Float64, Time32, Time64, Timestamp, Utf8, Utf8View, }; use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second}; use arrow::datatypes::{DataType, TimeUnit}; @@ -56,31 +56,57 @@ impl DatePartFunc { signature: Signature::one_of( vec![ Exact(vec![Utf8, Timestamp(Nanosecond, None)]), + Exact(vec![Utf8View, Timestamp(Nanosecond, None)]), Exact(vec![ Utf8, Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())), ]), + Exact(vec![ + Utf8View, + Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())), + ]), Exact(vec![Utf8, Timestamp(Millisecond, None)]), + Exact(vec![Utf8View, Timestamp(Millisecond, None)]), Exact(vec![ Utf8, Timestamp(Millisecond, Some(TIMEZONE_WILDCARD.into())), ]), + Exact(vec![ + Utf8View, + Timestamp(Millisecond, Some(TIMEZONE_WILDCARD.into())), + ]), Exact(vec![Utf8, Timestamp(Microsecond, None)]), + Exact(vec![Utf8View, Timestamp(Microsecond, None)]), Exact(vec![ Utf8, Timestamp(Microsecond, Some(TIMEZONE_WILDCARD.into())), ]), + Exact(vec![ + Utf8View, + Timestamp(Microsecond, Some(TIMEZONE_WILDCARD.into())), + ]), Exact(vec![Utf8, Timestamp(Second, None)]), + Exact(vec![Utf8View, Timestamp(Second, None)]), Exact(vec![ Utf8, Timestamp(Second, Some(TIMEZONE_WILDCARD.into())), ]), + Exact(vec![ + Utf8View, + Timestamp(Second, Some(TIMEZONE_WILDCARD.into())), + ]), Exact(vec![Utf8, Date64]), + Exact(vec![Utf8View, Date64]), Exact(vec![Utf8, Date32]), + Exact(vec![Utf8View, Date32]), Exact(vec![Utf8, Time32(Second)]), + Exact(vec![Utf8View, Time32(Second)]), Exact(vec![Utf8, Time32(Millisecond)]), + Exact(vec![Utf8View, Time32(Millisecond)]), Exact(vec![Utf8, Time64(Microsecond)]), + Exact(vec![Utf8View, Time64(Microsecond)]), Exact(vec![Utf8, Time64(Nanosecond)]), + Exact(vec![Utf8View, Time64(Nanosecond)]), ], Volatility::Immutable, ), @@ -114,6 +140,8 @@ impl ScalarUDFImpl for DatePartFunc { let part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = part { v + } else if let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(v))) = part { + v } else { return exec_err!( "First argument of `DATE_PART` must be non-null scalar Utf8" diff --git a/datafusion/functions/src/datetime/date_trunc.rs b/datafusion/functions/src/datetime/date_trunc.rs index 6b52507a9c6f..308ea668d3d7 100644 --- 
a/datafusion/functions/src/datetime/date_trunc.rs +++ b/datafusion/functions/src/datetime/date_trunc.rs @@ -29,7 +29,7 @@ use arrow::array::types::{ TimestampNanosecondType, TimestampSecondType, }; use arrow::array::{Array, PrimitiveArray}; -use arrow::datatypes::DataType::{self, Null, Timestamp, Utf8}; +use arrow::datatypes::DataType::{self, Null, Timestamp, Utf8, Utf8View}; use arrow::datatypes::TimeUnit::{self, Microsecond, Millisecond, Nanosecond, Second}; use datafusion_common::cast::as_primitive_array; use datafusion_common::{exec_err, plan_err, DataFusionError, Result, ScalarValue}; @@ -61,25 +61,45 @@ impl DateTruncFunc { signature: Signature::one_of( vec![ Exact(vec![Utf8, Timestamp(Nanosecond, None)]), + Exact(vec![Utf8View, Timestamp(Nanosecond, None)]), Exact(vec![ Utf8, Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())), ]), + Exact(vec![ + Utf8View, + Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())), + ]), Exact(vec![Utf8, Timestamp(Microsecond, None)]), + Exact(vec![Utf8View, Timestamp(Microsecond, None)]), Exact(vec![ Utf8, Timestamp(Microsecond, Some(TIMEZONE_WILDCARD.into())), ]), + Exact(vec![ + Utf8View, + Timestamp(Microsecond, Some(TIMEZONE_WILDCARD.into())), + ]), Exact(vec![Utf8, Timestamp(Millisecond, None)]), + Exact(vec![Utf8View, Timestamp(Millisecond, None)]), Exact(vec![ Utf8, Timestamp(Millisecond, Some(TIMEZONE_WILDCARD.into())), ]), + Exact(vec![ + Utf8View, + Timestamp(Millisecond, Some(TIMEZONE_WILDCARD.into())), + ]), Exact(vec![Utf8, Timestamp(Second, None)]), + Exact(vec![Utf8View, Timestamp(Second, None)]), Exact(vec![ Utf8, Timestamp(Second, Some(TIMEZONE_WILDCARD.into())), ]), + Exact(vec![ + Utf8View, + Timestamp(Second, Some(TIMEZONE_WILDCARD.into())), + ]), ], Volatility::Immutable, ), @@ -119,6 +139,9 @@ impl ScalarUDFImpl for DateTruncFunc { let granularity = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = granularity + { + v.to_lowercase() + } else if let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(v))) = granularity { v.to_lowercase() } else { diff --git a/datafusion/functions/src/datetime/make_date.rs b/datafusion/functions/src/datetime/make_date.rs index 6aa72572bc4d..ded7b454f9eb 100644 --- a/datafusion/functions/src/datetime/make_date.rs +++ b/datafusion/functions/src/datetime/make_date.rs @@ -23,7 +23,7 @@ use arrow::array::cast::AsArray; use arrow::array::types::{Date32Type, Int32Type}; use arrow::array::PrimitiveArray; use arrow::datatypes::DataType; -use arrow::datatypes::DataType::{Date32, Int32, Int64, UInt32, UInt64, Utf8}; +use arrow::datatypes::DataType::{Date32, Int32, Int64, UInt32, UInt64, Utf8, Utf8View}; use chrono::prelude::*; use datafusion_common::{exec_err, Result, ScalarValue}; @@ -45,7 +45,7 @@ impl MakeDateFunc { Self { signature: Signature::uniform( 3, - vec![Int32, Int64, UInt32, UInt64, Utf8], + vec![Int32, Int64, UInt32, UInt64, Utf8, Utf8View], Volatility::Immutable, ), } diff --git a/datafusion/functions/src/regex/regexpreplace.rs b/datafusion/functions/src/regex/regexpreplace.rs index d820f991be18..d28c6cd36d65 100644 --- a/datafusion/functions/src/regex/regexpreplace.rs +++ b/datafusion/functions/src/regex/regexpreplace.rs @@ -17,11 +17,14 @@ //! 
Regx expressions use arrow::array::new_null_array; +use arrow::array::ArrayAccessor; use arrow::array::ArrayDataBuilder; use arrow::array::BufferBuilder; use arrow::array::GenericStringArray; +use arrow::array::StringViewBuilder; use arrow::array::{Array, ArrayRef, OffsetSizeTrait}; use arrow::datatypes::DataType; +use datafusion_common::cast::as_string_view_array; use datafusion_common::exec_err; use datafusion_common::plan_err; use datafusion_common::ScalarValue; @@ -54,6 +57,7 @@ impl RegexpReplaceFunc { signature: Signature::one_of( vec![ Exact(vec![Utf8, Utf8, Utf8]), + Exact(vec![Utf8View, Utf8, Utf8]), Exact(vec![Utf8, Utf8, Utf8, Utf8]), ], Volatility::Immutable, @@ -80,6 +84,7 @@ impl ScalarUDFImpl for RegexpReplaceFunc { Ok(match &arg_types[0] { LargeUtf8 | LargeBinary => LargeUtf8, Utf8 | Binary => Utf8, + Utf8View | BinaryView => Utf8View, Null => Null, Dictionary(_, t) => match **t { LargeUtf8 | LargeBinary => LargeUtf8, @@ -118,15 +123,18 @@ impl ScalarUDFImpl for RegexpReplaceFunc { } } } + fn regexp_replace_func(args: &[ColumnarValue]) -> Result { match args[0].data_type() { DataType::Utf8 => specialize_regexp_replace::(args), DataType::LargeUtf8 => specialize_regexp_replace::(args), + DataType::Utf8View => specialize_regexp_replace::(args), other => { internal_err!("Unsupported data type {other:?} for function regexp_replace") } } } + /// replace POSIX capture groups (like \1) with Rust Regex group (like ${1}) /// used by regexp_replace fn regex_replace_posix_groups(replacement: &str) -> String { @@ -280,8 +288,8 @@ pub fn regexp_replace(args: &[ArrayRef]) -> Result } } -fn _regexp_replace_early_abort( - input_array: &GenericStringArray, +fn _regexp_replace_early_abort( + input_array: T, sz: usize, ) -> Result { // Mimicking the existing behavior of regexp_replace, if any of the scalar arguments @@ -290,13 +298,14 @@ fn _regexp_replace_early_abort( // Also acts like an early abort mechanism when the input array is empty. Ok(new_null_array(input_array.data_type(), sz)) } + /// Get the first argument from the given string array. /// /// Note: If the array is empty or the first argument is null, /// then calls the given early abort function. macro_rules! fetch_string_arg { ($ARG:expr, $NAME:expr, $T:ident, $EARLY_ABORT:ident, $ARRAY_SIZE:expr) => {{ - let array = as_generic_string_array::($ARG)?; + let array = as_generic_string_array::<$T>($ARG)?; if array.len() == 0 || array.is_null(0) { return $EARLY_ABORT(array, $ARRAY_SIZE); } else { @@ -313,25 +322,24 @@ macro_rules! fetch_string_arg { fn _regexp_replace_static_pattern_replace( args: &[ArrayRef], ) -> Result { - let string_array = as_generic_string_array::(&args[0])?; - let array_size = string_array.len(); + let array_size = args[0].len(); let pattern = fetch_string_arg!( &args[1], "pattern", - T, + i32, _regexp_replace_early_abort, array_size ); let replacement = fetch_string_arg!( &args[2], "replacement", - T, + i32, _regexp_replace_early_abort, array_size ); let flags = match args.len() { 3 => None, - 4 => Some(fetch_string_arg!(&args[3], "flags", T, _regexp_replace_early_abort, array_size)), + 4 => Some(fetch_string_arg!(&args[3], "flags", i32, _regexp_replace_early_abort, array_size)), other => { return exec_err!( "regexp_replace was called with {other} arguments. It requires at least 3 and at most 4." @@ -358,32 +366,61 @@ fn _regexp_replace_static_pattern_replace( // with rust ones. 
let replacement = regex_replace_posix_groups(replacement); - // We are going to create the underlying string buffer from its parts - // to be able to re-use the existing null buffer for sparse arrays. - let mut vals = BufferBuilder::::new({ - let offsets = string_array.value_offsets(); - (offsets[string_array.len()] - offsets[0]) - .to_usize() - .expect("Failed to convert usize") - }); - let mut new_offsets = BufferBuilder::::new(string_array.len() + 1); - new_offsets.append(T::zero()); - - string_array.iter().for_each(|val| { - if let Some(val) = val { - let result = re.replacen(val, limit, replacement.as_str()); - vals.append_slice(result.as_bytes()); + let string_array_type = args[0].data_type(); + match string_array_type { + DataType::Utf8 | DataType::LargeUtf8 => { + let string_array = as_generic_string_array::(&args[0])?; + + // We are going to create the underlying string buffer from its parts + // to be able to re-use the existing null buffer for sparse arrays. + let mut vals = BufferBuilder::::new({ + let offsets = string_array.value_offsets(); + (offsets[string_array.len()] - offsets[0]) + .to_usize() + .unwrap() + }); + let mut new_offsets = BufferBuilder::::new(string_array.len() + 1); + new_offsets.append(T::zero()); + + string_array.iter().for_each(|val| { + if let Some(val) = val { + let result = re.replacen(val, limit, replacement.as_str()); + vals.append_slice(result.as_bytes()); + } + new_offsets.append(T::from_usize(vals.len()).unwrap()); + }); + + let data = ArrayDataBuilder::new(GenericStringArray::::DATA_TYPE) + .len(string_array.len()) + .nulls(string_array.nulls().cloned()) + .buffers(vec![new_offsets.finish(), vals.finish()]) + .build()?; + let result_array = GenericStringArray::::from(data); + Ok(Arc::new(result_array) as ArrayRef) } - new_offsets.append(T::from_usize(vals.len()).unwrap()); - }); - - let data = ArrayDataBuilder::new(GenericStringArray::::DATA_TYPE) - .len(string_array.len()) - .nulls(string_array.nulls().cloned()) - .buffers(vec![new_offsets.finish(), vals.finish()]) - .build()?; - let result_array = GenericStringArray::::from(data); - Ok(Arc::new(result_array) as ArrayRef) + DataType::Utf8View => { + let string_view_array = as_string_view_array(&args[0])?; + + let mut builder = StringViewBuilder::with_capacity(string_view_array.len()) + .with_block_size(1024 * 1024 * 2); + + for val in string_view_array.iter() { + if let Some(val) = val { + let result = re.replacen(val, limit, replacement.as_str()); + builder.append_value(result); + } else { + builder.append_null(); + } + } + + let result = builder.finish(); + Ok(Arc::new(result) as ArrayRef) + } + _ => unreachable!( + "Invalid data type for regexp_replace: {}", + string_array_type + ), + } } /// Determine which implementation of the regexp_replace to use based @@ -469,43 +506,91 @@ mod tests { use super::*; - #[test] - fn test_static_pattern_regexp_replace() { - let values = StringArray::from(vec!["abc"; 5]); - let patterns = StringArray::from(vec!["b"; 5]); - let replacements = StringArray::from(vec!["foo"; 5]); - let expected = StringArray::from(vec!["afooc"; 5]); - - let re = _regexp_replace_static_pattern_replace::(&[ - Arc::new(values), - Arc::new(patterns), - Arc::new(replacements), - ]) - .unwrap(); - - assert_eq!(re.as_ref(), &expected); + macro_rules! 
static_pattern_regexp_replace { + ($name:ident, $T:ty, $O:ty) => { + #[test] + fn $name() { + let values = vec!["abc", "acd", "abcd1234567890123", "123456789012abc"]; + let patterns = vec!["b"; 4]; + let replacement = vec!["foo"; 4]; + let expected = + vec!["afooc", "acd", "afoocd1234567890123", "123456789012afooc"]; + + let values = <$T>::from(values); + let patterns = StringArray::from(patterns); + let replacements = StringArray::from(replacement); + let expected = <$T>::from(expected); + + let re = _regexp_replace_static_pattern_replace::<$O>(&[ + Arc::new(values), + Arc::new(patterns), + Arc::new(replacements), + ]) + .unwrap(); + + assert_eq!(re.as_ref(), &expected); + } + }; } - #[test] - fn test_static_pattern_regexp_replace_with_flags() { - let values = StringArray::from(vec!["abc", "ABC", "aBc", "AbC", "aBC"]); - let patterns = StringArray::from(vec!["b"; 5]); - let replacements = StringArray::from(vec!["foo"; 5]); - let flags = StringArray::from(vec!["i"; 5]); - let expected = - StringArray::from(vec!["afooc", "AfooC", "afooc", "AfooC", "afooC"]); - - let re = _regexp_replace_static_pattern_replace::(&[ - Arc::new(values), - Arc::new(patterns), - Arc::new(replacements), - Arc::new(flags), - ]) - .unwrap(); - - assert_eq!(re.as_ref(), &expected); + static_pattern_regexp_replace!(string_array, StringArray, i32); + static_pattern_regexp_replace!(string_view_array, StringViewArray, i32); + static_pattern_regexp_replace!(large_string_array, LargeStringArray, i64); + + macro_rules! static_pattern_regexp_replace_with_flags { + ($name:ident, $T:ty, $O: ty) => { + #[test] + fn $name() { + let values = vec![ + "abc", + "aBc", + "acd", + "abcd1234567890123", + "aBcd1234567890123", + "123456789012abc", + "123456789012aBc", + ]; + let expected = vec![ + "afooc", + "afooc", + "acd", + "afoocd1234567890123", + "afoocd1234567890123", + "123456789012afooc", + "123456789012afooc", + ]; + + let values = <$T>::from(values); + let patterns = StringArray::from(vec!["b"; 7]); + let replacements = StringArray::from(vec!["foo"; 7]); + let flags = StringArray::from(vec!["i"; 5]); + let expected = <$T>::from(expected); + + let re = _regexp_replace_static_pattern_replace::<$O>(&[ + Arc::new(values), + Arc::new(patterns), + Arc::new(replacements), + Arc::new(flags), + ]) + .unwrap(); + + assert_eq!(re.as_ref(), &expected); + } + }; } + static_pattern_regexp_replace_with_flags!(string_array_with_flags, StringArray, i32); + static_pattern_regexp_replace_with_flags!( + string_view_array_with_flags, + StringViewArray, + i32 + ); + static_pattern_regexp_replace_with_flags!( + large_string_array_with_flags, + LargeStringArray, + i64 + ); + #[test] fn test_static_pattern_regexp_replace_early_abort() { let values = StringArray::from(vec!["abc"; 5]); diff --git a/datafusion/functions/src/unicode/character_length.rs b/datafusion/functions/src/unicode/character_length.rs index 4f32f4c17776..cee1a57bc6d9 100644 --- a/datafusion/functions/src/unicode/character_length.rs +++ b/datafusion/functions/src/unicode/character_length.rs @@ -17,11 +17,10 @@ use crate::utils::{make_scalar_function, utf8_to_int_type}; use arrow::array::{ - ArrayRef, ArrowPrimitiveType, GenericStringArray, OffsetSizeTrait, PrimitiveArray, + Array, ArrayAccessor, ArrayIter, ArrayRef, ArrowPrimitiveType, AsArray, + OffsetSizeTrait, PrimitiveArray, }; use arrow::datatypes::{ArrowNativeType, DataType, Int32Type, Int64Type}; -use datafusion_common::cast::as_generic_string_array; -use datafusion_common::exec_err; use datafusion_common::Result; use 
datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; use std::any::Any; @@ -71,17 +70,7 @@ impl ScalarUDFImpl for CharacterLengthFunc { } fn invoke(&self, args: &[ColumnarValue]) -> Result { - match args[0].data_type() { - DataType::Utf8 => { - make_scalar_function(character_length::, vec![])(args) - } - DataType::LargeUtf8 => { - make_scalar_function(character_length::, vec![])(args) - } - other => { - exec_err!("Unsupported data type {other:?} for function character_length") - } - } + make_scalar_function(character_length, vec![])(args) } fn aliases(&self) -> &[String] { @@ -92,15 +81,32 @@ impl ScalarUDFImpl for CharacterLengthFunc { /// Returns number of characters in the string. /// character_length('josé') = 4 /// The implementation counts UTF-8 code points to count the number of characters -fn character_length(args: &[ArrayRef]) -> Result +fn character_length(args: &[ArrayRef]) -> Result { + match args[0].data_type() { + DataType::Utf8 => { + let string_array = args[0].as_string::(); + character_length_general::(string_array) + } + DataType::LargeUtf8 => { + let string_array = args[0].as_string::(); + character_length_general::(string_array) + } + DataType::Utf8View => { + let string_array = args[0].as_string_view(); + character_length_general::(string_array) + } + _ => unreachable!(), + } +} + +fn character_length_general<'a, T: ArrowPrimitiveType, V: ArrayAccessor>( + array: V, +) -> Result where T::Native: OffsetSizeTrait, { - let string_array: &GenericStringArray = - as_generic_string_array::(&args[0])?; - - let result = string_array - .iter() + let iter = ArrayIter::new(array); + let result = iter .map(|string| { string.map(|string: &str| { T::Native::from_usize(string.chars().count()) @@ -116,55 +122,54 @@ where mod tests { use crate::unicode::character_length::CharacterLengthFunc; use crate::utils::test::test_function; - use arrow::array::{Array, Int32Array}; - use arrow::datatypes::DataType::Int32; + use arrow::array::{Array, Int32Array, Int64Array}; + use arrow::datatypes::DataType::{Int32, Int64}; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::{ColumnarValue, ScalarUDFImpl}; + macro_rules! 
test_character_length { + ($INPUT:expr, $EXPECTED:expr) => { + test_function!( + CharacterLengthFunc::new(), + &[ColumnarValue::Scalar(ScalarValue::Utf8($INPUT))], + $EXPECTED, + i32, + Int32, + Int32Array + ); + + test_function!( + CharacterLengthFunc::new(), + &[ColumnarValue::Scalar(ScalarValue::LargeUtf8($INPUT))], + $EXPECTED, + i64, + Int64, + Int64Array + ); + + test_function!( + CharacterLengthFunc::new(), + &[ColumnarValue::Scalar(ScalarValue::Utf8View($INPUT))], + $EXPECTED, + i32, + Int32, + Int32Array + ); + }; + } + #[test] fn test_functions() -> Result<()> { #[cfg(feature = "unicode_expressions")] - test_function!( - CharacterLengthFunc::new(), - &[ColumnarValue::Scalar(ScalarValue::Utf8(Some( - String::from("chars") - )))], - Ok(Some(5)), - i32, - Int32, - Int32Array - ); - #[cfg(feature = "unicode_expressions")] - test_function!( - CharacterLengthFunc::new(), - &[ColumnarValue::Scalar(ScalarValue::Utf8(Some( - String::from("josé") - )))], - Ok(Some(4)), - i32, - Int32, - Int32Array - ); - #[cfg(feature = "unicode_expressions")] - test_function!( - CharacterLengthFunc::new(), - &[ColumnarValue::Scalar(ScalarValue::Utf8(Some( - String::from("") - )))], - Ok(Some(0)), - i32, - Int32, - Int32Array - ); - #[cfg(feature = "unicode_expressions")] - test_function!( - CharacterLengthFunc::new(), - &[ColumnarValue::Scalar(ScalarValue::Utf8(None))], - Ok(None), - i32, - Int32, - Int32Array - ); + { + test_character_length!(Some(String::from("chars")), Ok(Some(5))); + test_character_length!(Some(String::from("josé")), Ok(Some(4))); + // test long strings (more than 12 bytes for StringView) + test_character_length!(Some(String::from("joséjoséjoséjosé")), Ok(Some(16))); + test_character_length!(Some(String::from("")), Ok(Some(0))); + test_character_length!(None, Ok(None)); + } + #[cfg(not(feature = "unicode_expressions"))] test_function!( CharacterLengthFunc::new(), diff --git a/datafusion/functions/src/utils.rs b/datafusion/functions/src/utils.rs index 393dcc456a88..7b367174006d 100644 --- a/datafusion/functions/src/utils.rs +++ b/datafusion/functions/src/utils.rs @@ -15,12 +15,14 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use arrow::array::ArrayRef; use arrow::datatypes::DataType; + use datafusion_common::{Result, ScalarValue}; use datafusion_expr::function::Hint; use datafusion_expr::{ColumnarValue, ScalarFunctionImplementation}; -use std::sync::Arc; /// Creates a function to identify the optimal return type of a string function given /// the type of its first argument. @@ -29,6 +31,8 @@ use std::sync::Arc; /// `$largeUtf8Type`, /// /// If the input type is `Utf8` or `Binary` the return type is `$utf8Type`, +/// +/// If the input type is `Utf8View` the return type is $utf8Type, macro_rules! get_optimal_return_type { ($FUNC:ident, $largeUtf8Type:expr, $utf8Type:expr) => { pub(crate) fn $FUNC(arg_type: &DataType, name: &str) -> Result { @@ -37,6 +41,8 @@ macro_rules! 
get_optimal_return_type { DataType::LargeUtf8 | DataType::LargeBinary => $largeUtf8Type, // Binary inputs are automatically coerced to Utf8 DataType::Utf8 | DataType::Binary => $utf8Type, + // Utf8View max offset size is u32::MAX, the same as UTF8 + DataType::Utf8View | DataType::BinaryView => $utf8Type, DataType::Null => DataType::Null, DataType::Dictionary(_, value_type) => match **value_type { DataType::LargeUtf8 | DataType::LargeBinary => $largeUtf8Type, @@ -177,6 +183,21 @@ pub mod test { }; } + use arrow::datatypes::DataType; #[allow(unused_imports)] pub(crate) use test_function; + + use super::*; + + #[test] + fn string_to_int_type() { + let v = utf8_to_int_type(&DataType::Utf8, "test").unwrap(); + assert_eq!(v, DataType::Int32); + + let v = utf8_to_int_type(&DataType::Utf8View, "test").unwrap(); + assert_eq!(v, DataType::Int32); + + let v = utf8_to_int_type(&DataType::LargeUtf8, "test").unwrap(); + assert_eq!(v, DataType::Int64); + } } diff --git a/datafusion/physical-expr-common/src/aggregate/count_distinct/bytes.rs b/datafusion/physical-expr-common/src/aggregate/count_distinct/bytes.rs index 27094b0c819a..360d64ce0141 100644 --- a/datafusion/physical-expr-common/src/aggregate/count_distinct/bytes.rs +++ b/datafusion/physical-expr-common/src/aggregate/count_distinct/bytes.rs @@ -18,6 +18,7 @@ //! [`BytesDistinctCountAccumulator`] for Utf8/LargeUtf8/Binary/LargeBinary values use crate::binary_map::{ArrowBytesSet, OutputType}; +use crate::binary_view_map::ArrowBytesViewSet; use arrow::array::{ArrayRef, OffsetSizeTrait}; use datafusion_common::cast::as_list_array; use datafusion_common::utils::array_into_list_array_nullable; @@ -88,3 +89,63 @@ impl Accumulator for BytesDistinctCountAccumulator { std::mem::size_of_val(self) + self.0.size() } } + +/// Specialized implementation of +/// `COUNT DISTINCT` for [`StringViewArray`] and [`BinaryViewArray`]. 
+/// +/// [`StringViewArray`]: arrow::array::StringViewArray +/// [`BinaryViewArray`]: arrow::array::BinaryViewArray +#[derive(Debug)] +pub struct BytesViewDistinctCountAccumulator(ArrowBytesViewSet); + +impl BytesViewDistinctCountAccumulator { + pub fn new(output_type: OutputType) -> Self { + Self(ArrowBytesViewSet::new(output_type)) + } +} + +impl Accumulator for BytesViewDistinctCountAccumulator { + fn state(&mut self) -> datafusion_common::Result> { + let set = self.0.take(); + let arr = set.into_state(); + let list = Arc::new(array_into_list_array_nullable(arr)); + Ok(vec![ScalarValue::List(list)]) + } + + fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { + if values.is_empty() { + return Ok(()); + } + + self.0.insert(&values[0]); + + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> { + if states.is_empty() { + return Ok(()); + } + assert_eq!( + states.len(), + 1, + "count_distinct states must be single array" + ); + + let arr = as_list_array(&states[0])?; + arr.iter().try_for_each(|maybe_list| { + if let Some(list) = maybe_list { + self.0.insert(&list); + }; + Ok(()) + }) + } + + fn evaluate(&mut self) -> datafusion_common::Result { + Ok(ScalarValue::Int64(Some(self.0.non_null_len() as i64))) + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + self.0.size() + } +} diff --git a/datafusion/physical-expr-common/src/aggregate/count_distinct/mod.rs b/datafusion/physical-expr-common/src/aggregate/count_distinct/mod.rs index f216406d0dd7..7d772f7c649d 100644 --- a/datafusion/physical-expr-common/src/aggregate/count_distinct/mod.rs +++ b/datafusion/physical-expr-common/src/aggregate/count_distinct/mod.rs @@ -19,5 +19,6 @@ mod bytes; mod native; pub use bytes::BytesDistinctCountAccumulator; +pub use bytes::BytesViewDistinctCountAccumulator; pub use native::FloatDistinctCountAccumulator; pub use native::PrimitiveDistinctCountAccumulator; diff --git a/datafusion/physical-expr-common/src/binary_map.rs b/datafusion/physical-expr-common/src/binary_map.rs index a5da05d2a535..edf608a2054f 100644 --- a/datafusion/physical-expr-common/src/binary_map.rs +++ b/datafusion/physical-expr-common/src/binary_map.rs @@ -40,8 +40,12 @@ use std::sync::Arc; pub enum OutputType { /// `StringArray` or `LargeStringArray` Utf8, + /// `StringViewArray` + Utf8View, /// `BinaryArray` or `LargeBinaryArray` Binary, + /// `BinaryViewArray` + BinaryView, } /// HashSet optimized for storing string or binary values that can produce that @@ -318,6 +322,7 @@ where observe_payload_fn, ) } + _ => unreachable!("View types should use `ArrowBytesViewMap`"), }; } @@ -516,6 +521,7 @@ where GenericStringArray::new_unchecked(offsets, values, nulls) }) } + _ => unreachable!("View types should use `ArrowBytesViewMap`"), } } diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs new file mode 100644 index 000000000000..18bc6801aa60 --- /dev/null +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -0,0 +1,690 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`ArrowBytesViewMap`] and [`ArrowBytesViewSet`] for storing maps/sets of values from +//! `StringViewArray`/`BinaryViewArray`. +//! Much of the code is from `binary_map.rs`, but with simpler implementation because we directly use the +//! [`GenericByteViewBuilder`]. +use ahash::RandomState; +use arrow::array::cast::AsArray; +use arrow::array::{Array, ArrayBuilder, ArrayRef, GenericByteViewBuilder}; +use arrow::datatypes::{BinaryViewType, ByteViewType, DataType, StringViewType}; +use datafusion_common::hash_utils::create_hashes; +use datafusion_common::utils::proxy::{RawTableAllocExt, VecAllocExt}; +use std::fmt::Debug; +use std::sync::Arc; + +use crate::binary_map::OutputType; + +/// HashSet optimized for storing string or binary values that can produce that +/// the final set as a `GenericBinaryViewArray` with minimal copies. +#[derive(Debug)] +pub struct ArrowBytesViewSet(ArrowBytesViewMap<()>); + +impl ArrowBytesViewSet { + pub fn new(output_type: OutputType) -> Self { + Self(ArrowBytesViewMap::new(output_type)) + } + + /// Inserts each value from `values` into the set + pub fn insert(&mut self, values: &ArrayRef) { + fn make_payload_fn(_value: Option<&[u8]>) {} + fn observe_payload_fn(_payload: ()) {} + self.0 + .insert_if_new(values, make_payload_fn, observe_payload_fn); + } + + /// Return the contents of this map and replace it with a new empty map with + /// the same output type + pub fn take(&mut self) -> Self { + let mut new_self = Self::new(self.0.output_type); + std::mem::swap(self, &mut new_self); + new_self + } + + /// Converts this set into a `StringViewArray` or `BinaryViewArray` + /// containing each distinct value that was interned. + /// This is done without copying the values. + pub fn into_state(self) -> ArrayRef { + self.0.into_state() + } + + /// Returns the total number of distinct values (including nulls) seen so far + pub fn len(&self) -> usize { + self.0.len() + } + + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// returns the total number of distinct values (not including nulls) seen so far + pub fn non_null_len(&self) -> usize { + self.0.non_null_len() + } + + /// Return the total size, in bytes, of memory used to store the data in + /// this set, not including `self` + pub fn size(&self) -> usize { + self.0.size() + } +} + +/// Optimized map for storing Arrow "byte view" types (`StringView`, `BinaryView`) +/// values that can produce the set of keys on +/// output as `GenericBinaryViewArray` without copies. +/// +/// Equivalent to `HashSet` but with better performance for arrow +/// data. +/// +/// # Generic Arguments +/// +/// * `V`: payload type +/// +/// # Description +/// +/// This is a specialized HashMap with the following properties: +/// +/// 1. Optimized for storing and emitting Arrow byte types (e.g. +/// `StringViewArray` / `BinaryViewArray`) very efficiently by minimizing copying of +/// the string values themselves, both when inserting and when emitting the +/// final array. +/// +/// 2. Retains the insertion order of entries in the final array. The values are +/// in the same order as they were inserted. 
+/// +/// Note this structure can be used as a `HashSet` by specifying the value type +/// as `()`, as is done by [`ArrowBytesViewSet`]. +/// +/// This map is used by the special `COUNT DISTINCT` aggregate function to +/// store the distinct values, and by the `GROUP BY` operator to store +/// group values when they are a single string array. + +pub struct ArrowBytesViewMap +where + V: Debug + PartialEq + Eq + Clone + Copy + Default, +{ + /// Should the output be StringView or BinaryView? + output_type: OutputType, + /// Underlying hash set for each distinct value + map: hashbrown::raw::RawTable>, + /// Total size of the map in bytes + map_size: usize, + + /// Builder for output array + builder: GenericByteViewBuilder, + /// random state used to generate hashes + random_state: RandomState, + /// buffer that stores hash values (reused across batches to save allocations) + hashes_buffer: Vec, + /// `(payload, null_index)` for the 'null' value, if any + /// NOTE null_index is the logical index in the final array, not the index + /// in the buffer + null: Option<(V, usize)>, +} + +/// The size, in number of entries, of the initial hash table +const INITIAL_MAP_CAPACITY: usize = 512; + +impl ArrowBytesViewMap +where + V: Debug + PartialEq + Eq + Clone + Copy + Default, +{ + pub fn new(output_type: OutputType) -> Self { + Self { + output_type, + map: hashbrown::raw::RawTable::with_capacity(INITIAL_MAP_CAPACITY), + map_size: 0, + builder: GenericByteViewBuilder::new().with_block_size(2 * 1024 * 1024), + random_state: RandomState::new(), + hashes_buffer: vec![], + null: None, + } + } + + /// Return the contents of this map and replace it with a new empty map with + /// the same output type + pub fn take(&mut self) -> Self { + let mut new_self = Self::new(self.output_type); + std::mem::swap(self, &mut new_self); + new_self + } + + /// Inserts each value from `values` into the map, invoking `payload_fn` for + /// each value if *not* already present, deferring the allocation of the + /// payload until it is needed. + /// + /// Note that this is different than a normal map that would replace the + /// existing entry + /// + /// # Arguments: + /// + /// `values`: array whose values are inserted + /// + /// `make_payload_fn`: invoked for each value that is not already present + /// to create the payload, in order of the values in `values` + /// + /// `observe_payload_fn`: invoked once, for each value in `values`, that was + /// already present in the map, with corresponding payload value. + /// + /// # Returns + /// + /// The payload value for the entry, either the existing value or + /// the newly inserted value + /// + /// # Safety: + /// + /// Note that `make_payload_fn` and `observe_payload_fn` are only invoked + /// with valid values from `values`, not for the `NULL` value. 
+    pub fn insert_if_new<MP, OP>(
+        &mut self,
+        values: &ArrayRef,
+        make_payload_fn: MP,
+        observe_payload_fn: OP,
+    ) where
+        MP: FnMut(Option<&[u8]>) -> V,
+        OP: FnMut(V),
+    {
+        // Sanity check array type
+        match self.output_type {
+            OutputType::BinaryView => {
+                assert!(matches!(values.data_type(), DataType::BinaryView));
+                self.insert_if_new_inner::<MP, OP, BinaryViewType>(
+                    values,
+                    make_payload_fn,
+                    observe_payload_fn,
+                )
+            }
+            OutputType::Utf8View => {
+                assert!(matches!(values.data_type(), DataType::Utf8View));
+                self.insert_if_new_inner::<MP, OP, StringViewType>(
+                    values,
+                    make_payload_fn,
+                    observe_payload_fn,
+                )
+            }
+            _ => unreachable!("Utf8/Binary should use `ArrowBytesSet`"),
+        };
+    }
+
+    /// Generic version of [`Self::insert_if_new`] that handles `ByteViewType`
+    /// (both StringView and BinaryView)
+    ///
+    /// Note this is the only function that is generic on [`ByteViewType`], which
+    /// avoids having to template the entire structure, making the code
+    /// simpler to understand and reducing code bloat due to duplication.
+    ///
+    /// See comments on `insert_if_new` for more details
+    fn insert_if_new_inner<MP, OP, B>(
+        &mut self,
+        values: &ArrayRef,
+        mut make_payload_fn: MP,
+        mut observe_payload_fn: OP,
+    ) where
+        MP: FnMut(Option<&[u8]>) -> V,
+        OP: FnMut(V),
+        B: ByteViewType,
+    {
+        // step 1: compute hashes
+        let batch_hashes = &mut self.hashes_buffer;
+        batch_hashes.clear();
+        batch_hashes.resize(values.len(), 0);
+        create_hashes(&[values.clone()], &self.random_state, batch_hashes)
+            // hash is supported for all types and create_hashes only
+            // returns errors for unsupported types
+            .unwrap();
+
+        // step 2: insert each value into the set, if not already present
+        let values = values.as_byte_view::<B>();
+
+        // Ensure lengths are equivalent
+        assert_eq!(values.len(), batch_hashes.len());
+
+        for (value, &hash) in values.iter().zip(batch_hashes.iter()) {
+            // handle null value
+            let Some(value) = value else {
+                let payload = if let Some(&(payload, _offset)) = self.null.as_ref() {
+                    payload
+                } else {
+                    let payload = make_payload_fn(None);
+                    let null_index = self.builder.len();
+                    self.builder.append_null();
+                    self.null = Some((payload, null_index));
+                    payload
+                };
+                observe_payload_fn(payload);
+                continue;
+            };
+
+            // get the value as bytes
+            let value: &[u8] = value.as_ref();
+
+            let entry = self.map.get_mut(hash, |header| {
+                let v = self.builder.get_value(header.view_idx);
+
+                if v.len() != value.len() {
+                    return false;
+                }
+
+                v == value
+            });
+
+            let payload = if let Some(entry) = entry {
+                entry.payload
+            } else {
+                // no existing value, make a new one.
+                let payload = make_payload_fn(Some(value));
+
+                let inner_view_idx = self.builder.len();
+                let new_header = Entry {
+                    view_idx: inner_view_idx,
+                    hash,
+                    payload,
+                };
+
+                self.builder.append_value(value);
+
+                self.map
+                    .insert_accounted(new_header, |h| h.hash, &mut self.map_size);
+                payload
+            };
+            observe_payload_fn(payload);
+        }
+    }
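+
+    // Note: the hash table stores only `Entry { view_idx, hash, payload }` per
+    // distinct value; equality probes compare the candidate bytes against the
+    // bytes already appended to `builder` at `view_idx`.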
+
+    /// Converts this set into a `StringViewArray`, or `BinaryViewArray`,
+    /// containing each distinct value
+    /// that was inserted. This is done without copying the values.
+    ///
+    /// The values are guaranteed to be returned in the same order in which
+    /// they were first seen.
+    pub fn into_state(self) -> ArrayRef {
+        let mut builder = self.builder;
+        match self.output_type {
+            OutputType::BinaryView => {
+                let array = builder.finish();
+
+                Arc::new(array)
+            }
+            OutputType::Utf8View => {
+                // SAFETY:
+                // we asserted the input arrays were all the correct type, and
+                // thus all the values that went in were valid (e.g. utf8), so
+                // all the values that come out are valid utf8 too
+                let array = builder.finish();
+                let array = unsafe { array.to_string_view_unchecked() };
+                Arc::new(array)
+            }
+            _ => {
+                unreachable!("Utf8/Binary should use `ArrowBytesMap`")
+            }
+        }
+    }
+
+    /// Total number of entries (including null, if present)
+    pub fn len(&self) -> usize {
+        self.non_null_len() + self.null.map(|_| 1).unwrap_or(0)
+    }
+
+    /// Is the set empty?
+    pub fn is_empty(&self) -> bool {
+        self.map.is_empty() && self.null.is_none()
+    }
+
+    /// Number of non null entries
+    pub fn non_null_len(&self) -> usize {
+        self.map.len()
+    }
+
+    /// Return the total size, in bytes, of memory used to store the data in
+    /// this set, not including `self`
+    pub fn size(&self) -> usize {
+        self.map_size
+            + self.builder.allocated_size()
+            + self.hashes_buffer.allocated_size()
+    }
+}
+
+impl<V> Debug for ArrowBytesViewMap<V>
+where
+    V: Debug + PartialEq + Eq + Clone + Copy + Default,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ArrowBytesViewMap")
+            .field("map", &"<hidden>")
+            .field("map_size", &self.map_size)
+            .field("view_builder", &self.builder)
+            .field("random_state", &self.random_state)
+            .field("hashes_buffer", &self.hashes_buffer)
+            .finish()
+    }
+}
+
+/// Entry in the hash table -- see [`ArrowBytesViewMap`] for more details
+#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
+struct Entry<V>
+where
+    V: Debug + PartialEq + Eq + Clone + Copy + Default,
+{
+    /// The idx into the views array
+    view_idx: usize,
+
+    hash: u64,
+
+    /// value stored by the entry
+    payload: V,
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::array::{BinaryViewArray, GenericByteViewArray, StringViewArray};
+    use hashbrown::HashMap;
+
+    use super::*;
+
+    // asserts that the set contains the expected strings, in the same order
+    fn assert_set(set: ArrowBytesViewSet, expected: &[Option<&str>]) {
+        let strings = set.into_state();
+        let strings = strings.as_string_view();
+        let state = strings.into_iter().collect::<Vec<_>>();
+        assert_eq!(state, expected);
+    }
+
+    #[test]
+    fn string_view_set_empty() {
+        let mut set = ArrowBytesViewSet::new(OutputType::Utf8View);
+        let array: ArrayRef = Arc::new(StringViewArray::new_null(0));
+        set.insert(&array);
+        assert_eq!(set.len(), 0);
+        assert_eq!(set.non_null_len(), 0);
+        assert_set(set, &[]);
+    }
+
+    #[test]
+    fn string_view_set_one_null() {
+        let mut set = ArrowBytesViewSet::new(OutputType::Utf8View);
+        let array: ArrayRef = Arc::new(StringViewArray::new_null(1));
+        set.insert(&array);
+        assert_eq!(set.len(), 1);
+        assert_eq!(set.non_null_len(), 0);
+        assert_set(set, &[None]);
+    }
+
+    #[test]
+    fn string_view_set_many_null() {
+        let mut set = ArrowBytesViewSet::new(OutputType::Utf8View);
+        let array: ArrayRef = Arc::new(StringViewArray::new_null(11));
+        set.insert(&array);
+        assert_eq!(set.len(), 1);
+        assert_eq!(set.non_null_len(), 0);
+        assert_set(set, &[None]);
+    }
+
+    #[test]
+    fn test_string_view_set_basic() {
+        // basic test for mixed small and large string values
+        let values = GenericByteViewArray::from(vec![
+            Some("a"),
+            Some("b"),
+            Some("CXCCCCCCCCAABB"), // 14 bytes
+            Some(""),
+            Some("cbcxx"), // 5 bytes
+            None,
+            Some("AAAAAAAA"),     // 8 bytes
+            Some("BBBBBQBBBAAA"), // 12 bytes
+            Some("a"),
+            Some("cbcxx"),
+            Some("b"),
+            Some("cbcxx"),
+            Some(""),
+            None,
+            Some("BBBBBQBBBAAA"),
+            Some("BBBBBQBBBAAA"),
+            Some("AAAAAAAA"),
+            Some("CXCCCCCCCCAABB"),
+        ]);
+
+        let mut set = ArrowBytesViewSet::new(OutputType::Utf8View);
+        let array: ArrayRef = Arc::new(values);
+        set.insert(&array);
+        // values must appear in the order they were inserted
+        assert_set(
+            set,
+            &[
+                Some("a"),
+                Some("b"),
+                Some("CXCCCCCCCCAABB"),
+                Some(""),
+                Some("cbcxx"),
+                None,
+                Some("AAAAAAAA"),
+                Some("BBBBBQBBBAAA"),
+            ],
+        );
+    }
+
+    #[test]
+    fn test_string_set_non_utf8() {
+        // basic test for mixed small and large non-ASCII string values
+        let values = GenericByteViewArray::from(vec![
+            Some("a"),
+            Some("✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥"),
+            Some("🔥"),
+            Some("✨✨✨"),
+            Some("foobarbaz"),
+            Some("🔥"),
+            Some("✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥"),
+        ]);
+
+        let mut set = ArrowBytesViewSet::new(OutputType::Utf8View);
+        let array: ArrayRef = Arc::new(values);
+        set.insert(&array);
+        // strings must appear in the order they were inserted
+        assert_set(
+            set,
+            &[
+                Some("a"),
+                Some("✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥"),
+                Some("🔥"),
+                Some("✨✨✨"),
+                Some("foobarbaz"),
+            ],
+        );
+    }
+
+    // Test use of binary output type
+    #[test]
+    fn test_binary_set() {
+        let v: Vec<Option<&[u8]>> = vec![
+            Some(b"a"),
+            Some(b"CXCCCCCCCCCCCCC"),
+            None,
+            Some(b"CXCCCCCCCCCCCCC"),
+        ];
+        let values: ArrayRef = Arc::new(BinaryViewArray::from(v));
+
+        let expected: Vec<Option<&[u8]>> =
+            vec![Some(b"a"), Some(b"CXCCCCCCCCCCCCC"), None];
+        let expected: ArrayRef = Arc::new(GenericByteViewArray::from(expected));
+
+        let mut set = ArrowBytesViewSet::new(OutputType::BinaryView);
+        set.insert(&values);
+        assert_eq!(&set.into_state(), &expected);
+    }
+
+    // inserting new strings into the set should increase the reported memory,
+    // but inserting duplicate strings should not
+    #[test]
+    fn test_string_set_memory_usage() {
+        let strings1 = StringViewArray::from(vec![
+            Some("a"),
+            Some("b"),
+            Some("CXCCCCCCCCCCC"), // 13 bytes
+            Some("AAAAAAAA"),      // 8 bytes
+            Some("BBBBBQBBB"),     // 9 bytes
+        ]);
+        let total_strings1_len = strings1
+            .iter()
+            .map(|s| s.map(|s| s.len()).unwrap_or(0))
+            .sum::<usize>();
+        let values1: ArrayRef = Arc::new(StringViewArray::from(strings1));
+
+        // Much larger strings in strings2
+        let strings2 = StringViewArray::from(vec![
+            "FOO".repeat(1000),
+            "BAR larger than 12 bytes.".repeat(100_000),
+            "more unique.".repeat(1000),
+            "more unique2.".repeat(1000),
+            "FOO".repeat(3000),
+        ]);
+        let total_strings2_len = strings2
+            .iter()
+            .map(|s| s.map(|s| s.len()).unwrap_or(0))
+            .sum::<usize>();
+        let values2: ArrayRef = Arc::new(StringViewArray::from(strings2));
+
+        let mut set = ArrowBytesViewSet::new(OutputType::Utf8View);
+        let size_empty = set.size();
+
+        set.insert(&values1);
+        let size_after_values1 = set.size();
+        assert!(size_empty < size_after_values1);
+        assert!(
+            size_after_values1 > total_strings1_len,
+            "expect {size_after_values1} to be more than {total_strings1_len}"
+        );
+        assert!(size_after_values1 < total_strings1_len + total_strings2_len);
+
+        // inserting the same strings should not affect the size
+        set.insert(&values1);
+        assert_eq!(set.size(), size_after_values1);
+        assert_eq!(set.len(), 5);
+
+        // inserting the large strings should increase the reported size
+        set.insert(&values2);
+        let size_after_values2 = set.size();
+        assert!(size_after_values2 > size_after_values1);
+
+        assert_eq!(set.len(), 10);
+    }
+
+    #[derive(Debug, PartialEq, Eq, Default, Clone, Copy)]
+    struct TestPayload {
+        // store the string value to check against input
+        index: usize, // store the index of the string (each new string gets the next sequential input)
+    }
+
+    /// Wraps an [`ArrowBytesViewMap`], validating its invariants
+    struct TestMap {
+        map: ArrowBytesViewMap<TestPayload>,
+        // stores distinct strings seen, in order
+        strings: Vec<Option<String>>,
+        // map strings to index in strings
+        indexes: HashMap<Option<String>, usize>,
+    }
+
+    impl TestMap {
+        /// Creates an empty `TestMap`
+        fn new() -> Self {
+            Self {
+                map: ArrowBytesViewMap::new(OutputType::Utf8View),
+                strings: vec![],
+                indexes: HashMap::new(),
+            }
+        }
+
+        /// Inserts strings into the map, validating that the payloads the map
+        /// reports match the expected indexes
+        fn insert(&mut self, strings: &[Option<&str>]) {
+            let string_array = StringViewArray::from(strings.to_vec());
+            let arr: ArrayRef = Arc::new(string_array);
+
+            let mut next_index = self.indexes.len();
+            let mut actual_new_strings = vec![];
+            let mut actual_seen_indexes = vec![];
+            // update self with new values, keeping track of newly added values
+            for str in strings {
+                let str = str.map(|s| s.to_string());
+                let index = self.indexes.get(&str).cloned().unwrap_or_else(|| {
+                    actual_new_strings.push(str.clone());
+                    let index = self.strings.len();
+                    self.strings.push(str.clone());
+                    self.indexes.insert(str, index);
+                    index
+                });
+                actual_seen_indexes.push(index);
+            }
+
+            // insert the values into the map, recording what we did
+            let mut seen_new_strings = vec![];
+            let mut seen_indexes = vec![];
+            self.map.insert_if_new(
+                &arr,
+                |s| {
+                    let value = s
+                        .map(|s| String::from_utf8(s.to_vec()).expect("Non utf8 string"));
+                    let index = next_index;
+                    next_index += 1;
+                    seen_new_strings.push(value);
+                    TestPayload { index }
+                },
+                |payload| {
+                    seen_indexes.push(payload.index);
+                },
+            );
+
+            assert_eq!(actual_seen_indexes, seen_indexes);
+            assert_eq!(actual_new_strings, seen_new_strings);
+        }
+
+        /// Call `self.map.into_state()`, validating that the strings are in the same
+        /// order as they were inserted
+        fn into_array(self) -> ArrayRef {
+            let Self {
+                map,
+                strings,
+                indexes: _,
+            } = self;
+
+            let arr = map.into_state();
+            let expected: ArrayRef = Arc::new(StringViewArray::from(strings));
+            assert_eq!(&arr, &expected);
+            arr
+        }
+    }
+
+    #[test]
+    fn test_map() {
+        let input = vec![
+            // Note mix of short/long strings
+            Some("A"),
+            Some("bcdefghijklmnop1234567"),
+            Some("X"),
+            Some("Y"),
+            None,
+            Some("qrstuvqxyzhjwya"),
+            Some("✨🔥"),
+            Some("🔥"),
+            Some("🔥🔥🔥🔥🔥🔥"),
+        ];
+
+        let mut test_map = TestMap::new();
+        test_map.insert(&input);
+        test_map.insert(&input); // put it in twice
+        let expected_output: ArrayRef = Arc::new(StringViewArray::from(input));
+        assert_eq!(&test_map.into_array(), &expected_output);
+    }
+}
diff --git a/datafusion/physical-expr-common/src/lib.rs b/datafusion/physical-expr-common/src/lib.rs
index 8d50e0b964e5..f03eedd4cf65 100644
--- a/datafusion/physical-expr-common/src/lib.rs
+++ b/datafusion/physical-expr-common/src/lib.rs
@@ -17,6 +17,7 @@
 
 pub mod aggregate;
 pub mod binary_map;
+pub mod binary_view_map;
 pub mod datum;
 pub mod expressions;
 pub mod physical_expr;
diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs
index 9987e97b38d3..f9362db30196 100644
--- a/datafusion/physical-expr/src/aggregate/min_max.rs
+++ b/datafusion/physical-expr/src/aggregate/min_max.rs
@@ -45,6 +45,7 @@ use arrow_array::types::{
     Decimal128Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
     Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
 };
+use arrow_array::{BinaryViewArray, StringViewArray};
 use datafusion_common::internal_err;
 use datafusion_common::ScalarValue;
 use datafusion_common::{downcast_value, DataFusionError, Result};
@@ -453,6 +454,14 @@ fn min_batch(values: &ArrayRef) -> Result<ScalarValue> {
         DataType::LargeUtf8 => {
             typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, min_string)
         }
+        DataType::Utf8View => {
+            typed_min_max_batch_string!(
+                values,
+                StringViewArray,
+                Utf8View,
+                min_string_view
+            )
+        }
         DataType::Boolean => {
             typed_min_max_batch!(values, BooleanArray, Boolean, min_boolean)
         }
@@ -467,6 +476,14 @@ fn min_batch(values: &ArrayRef) -> Result<ScalarValue> {
                 min_binary
             )
         }
+        DataType::BinaryView => {
+            typed_min_max_batch_binary!(
+                &values,
+                BinaryViewArray,
+                BinaryView,
+                min_binary_view
+            )
+        }
         _ => min_max_batch!(values, min),
     })
 }
@@ -480,12 +497,28 @@ fn max_batch(values: &ArrayRef) -> Result<ScalarValue> {
         DataType::LargeUtf8 => {
             typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, max_string)
         }
+        DataType::Utf8View => {
+            typed_min_max_batch_string!(
+                values,
+                StringViewArray,
+                Utf8View,
+                max_string_view
+            )
+        }
         DataType::Boolean => {
             typed_min_max_batch!(values, BooleanArray, Boolean, max_boolean)
         }
         DataType::Binary => {
             typed_min_max_batch_binary!(&values, BinaryArray, Binary, max_binary)
         }
+        DataType::BinaryView => {
+            typed_min_max_batch_binary!(
+                &values,
+                BinaryViewArray,
+                BinaryView,
+                max_binary_view
+            )
+        }
         DataType::LargeBinary => {
             typed_min_max_batch_binary!(
                 &values,
@@ -629,12 +662,18 @@ macro_rules! min_max {
         (ScalarValue::LargeUtf8(lhs), ScalarValue::LargeUtf8(rhs)) => {
             typed_min_max_string!(lhs, rhs, LargeUtf8, $OP)
         }
+        (ScalarValue::Utf8View(lhs), ScalarValue::Utf8View(rhs)) => {
+            typed_min_max_string!(lhs, rhs, Utf8View, $OP)
+        }
         (ScalarValue::Binary(lhs), ScalarValue::Binary(rhs)) => {
             typed_min_max_string!(lhs, rhs, Binary, $OP)
         }
         (ScalarValue::LargeBinary(lhs), ScalarValue::LargeBinary(rhs)) => {
             typed_min_max_string!(lhs, rhs, LargeBinary, $OP)
         }
+        (ScalarValue::BinaryView(lhs), ScalarValue::BinaryView(rhs)) => {
+            typed_min_max_string!(lhs, rhs, BinaryView, $OP)
+        }
         (ScalarValue::TimestampSecond(lhs, l_tz), ScalarValue::TimestampSecond(rhs, _)) => {
             typed_min_max!(lhs, rhs, TimestampSecond, $OP, l_tz)
         }
diff --git a/datafusion/physical-plan/src/aggregates/group_values/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/bytes_view.rs
new file mode 100644
index 000000000000..1a0cb90a16d4
--- /dev/null
+++ b/datafusion/physical-plan/src/aggregates/group_values/bytes_view.rs
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
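+
+//! [`GroupValuesBytesView`]: a [`GroupValues`] implementation that stores a
+//! single `Utf8View`/`BinaryView` grouping column using [`ArrowBytesViewMap`].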
+
+use crate::aggregates::group_values::GroupValues;
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use datafusion_expr::EmitTo;
+use datafusion_physical_expr::binary_map::OutputType;
+use datafusion_physical_expr_common::binary_view_map::ArrowBytesViewMap;
+
+/// A [`GroupValues`] storing a single column of Utf8View/BinaryView values
+///
+/// This specialization is significantly faster than using the more general
+/// purpose `Row`s format
+pub struct GroupValuesBytesView {
+    /// Map string/binary values to group index
+    map: ArrowBytesViewMap<usize>,
+    /// The total number of groups so far (used to assign group_index)
+    num_groups: usize,
+}
+
+impl GroupValuesBytesView {
+    pub fn new(output_type: OutputType) -> Self {
+        Self {
+            map: ArrowBytesViewMap::new(output_type),
+            num_groups: 0,
+        }
+    }
+}
+
+impl GroupValues for GroupValuesBytesView {
+    fn intern(
+        &mut self,
+        cols: &[ArrayRef],
+        groups: &mut Vec<usize>,
+    ) -> datafusion_common::Result<()> {
+        assert_eq!(cols.len(), 1);
+
+        // look up / add entries in the table
+        let arr = &cols[0];
+
+        groups.clear();
+        self.map.insert_if_new(
+            arr,
+            // called for each new group
+            |_value| {
+                // assign new group index on each insert
+                let group_idx = self.num_groups;
+                self.num_groups += 1;
+                group_idx
+            },
+            // called for each group
+            |group_idx| {
+                groups.push(group_idx);
+            },
+        );
+
+        // ensure we assigned a group for each row
+        assert_eq!(groups.len(), arr.len());
+        Ok(())
+    }
+
+    fn size(&self) -> usize {
+        self.map.size() + std::mem::size_of::<Self>()
+    }
+
+    fn is_empty(&self) -> bool {
+        self.num_groups == 0
+    }
+
+    fn len(&self) -> usize {
+        self.num_groups
+    }
+
+    fn emit(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<ArrayRef>> {
+        // Reset the map to default, and convert it into a single array
+        let map_contents = self.map.take().into_state();
+
+        let group_values = match emit_to {
+            EmitTo::All => {
+                self.num_groups -= map_contents.len();
+                map_contents
+            }
+            EmitTo::First(n) if n == self.len() => {
+                self.num_groups -= map_contents.len();
+                map_contents
+            }
+            EmitTo::First(n) => {
+                // if we only wanted to take the first n, insert the rest back
+                // into the map we could potentially avoid this reallocation, at
+                // the expense of much more complex code.
+                // see https://github.com/apache/datafusion/issues/9195
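+                // That is: the first n groups are sliced off and emitted,
+                // while the remaining group values are re-interned into a
+                // fresh map, so their group indexes are reassigned starting
+                // from 0 (hence the assertion on `group_indexes[0]` below).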
+                let emit_group_values = map_contents.slice(0, n);
+                let remaining_group_values =
+                    map_contents.slice(n, map_contents.len() - n);
+
+                self.num_groups = 0;
+                let mut group_indexes = vec![];
+                self.intern(&[remaining_group_values], &mut group_indexes)?;
+
+                // Verify that the group indexes were assigned in the correct order
+                assert_eq!(0, group_indexes[0]);
+
+                emit_group_values
+            }
+        };
+
+        Ok(vec![group_values])
+    }
+
+    fn clear_shrink(&mut self, _batch: &RecordBatch) {
+        // in theory we could potentially avoid this reallocation and clear the
+        // contents of the maps, but for now we just reset the map from the beginning
+        self.map.take();
+    }
+}
diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
index b5bc923b467d..be7ac934d7bc 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
@@ -18,6 +18,7 @@
 use arrow::record_batch::RecordBatch;
 use arrow_array::{downcast_primitive, ArrayRef};
 use arrow_schema::{DataType, SchemaRef};
+use bytes_view::GroupValuesBytesView;
 use datafusion_common::Result;
 
 pub(crate) mod primitive;
@@ -28,6 +29,7 @@ mod row;
 use row::GroupValuesRows;
 
 mod bytes;
+mod bytes_view;
 use bytes::GroupValuesByes;
 use datafusion_physical_expr::binary_map::OutputType;
 
@@ -67,17 +69,26 @@ pub fn new_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> {
         _ => {}
     }
 
-    if let DataType::Utf8 = d {
-        return Ok(Box::new(GroupValuesByes::<i32>::new(OutputType::Utf8)));
-    }
-    if let DataType::LargeUtf8 = d {
-        return Ok(Box::new(GroupValuesByes::<i64>::new(OutputType::Utf8)));
-    }
-    if let DataType::Binary = d {
-        return Ok(Box::new(GroupValuesByes::<i32>::new(OutputType::Binary)));
-    }
-    if let DataType::LargeBinary = d {
-        return Ok(Box::new(GroupValuesByes::<i64>::new(OutputType::Binary)));
+    match d {
+        DataType::Utf8 => {
+            return Ok(Box::new(GroupValuesByes::<i32>::new(OutputType::Utf8)));
+        }
+        DataType::LargeUtf8 => {
+            return Ok(Box::new(GroupValuesByes::<i64>::new(OutputType::Utf8)));
+        }
+        DataType::Utf8View => {
+            return Ok(Box::new(GroupValuesBytesView::new(OutputType::Utf8View)));
+        }
+        DataType::Binary => {
+            return Ok(Box::new(GroupValuesByes::<i32>::new(OutputType::Binary)));
+        }
+        DataType::LargeBinary => {
+            return Ok(Box::new(GroupValuesByes::<i64>::new(OutputType::Binary)));
+        }
+        DataType::BinaryView => {
+            return Ok(Box::new(GroupValuesBytesView::new(OutputType::BinaryView)));
+        }
+        _ => {}
     }
 }
diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs
index 061c849971b2..038727daa7d8 100644
--- a/datafusion/physical-plan/src/coalesce_batches.rs
+++ b/datafusion/physical-plan/src/coalesce_batches.rs
@@ -23,19 +23,22 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
-use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
-use crate::{
-    DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
-};
-
+use arrow::array::{AsArray, StringViewBuilder};
 use arrow::compute::concat_batches;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
+use arrow_array::{Array, ArrayRef};
+use futures::stream::{Stream, StreamExt};
+
 use datafusion_common::Result;
 use datafusion_execution::TaskContext;
-use futures::stream::{Stream, StreamExt};
+use crate::{
+    DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
+};
+
+use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
 
 /// `CoalesceBatchesExec` combines small batches into larger batches for more
 /// efficient use of vectorized processing by later operators. The operator
@@ -249,6 +252,8 @@ impl CoalesceBatchesStream {
             match input_batch {
                 Poll::Ready(x) => match x {
                     Some(Ok(batch)) => {
+                        let batch = gc_string_view_batch(&batch);
+
                         // Handle fetch limit:
                         if let Some(fetch) = self.fetch {
                             if self.total_rows + batch.num_rows() >= fetch {
@@ -324,13 +329,84 @@ impl RecordBatchStream for CoalesceBatchesStream {
     }
 }
 
+/// Heuristically compact `StringViewArray`s to reduce memory usage, if needed
+///
+/// This function decides when to consolidate the StringView into a new buffer
+/// to reduce memory usage and improve string locality for better performance.
+///
+/// This differs from `StringViewArray::gc` because:
+/// 1. It may not compact the array depending on a heuristic.
+/// 2. It uses a precise block size to reduce the number of buffers to track.
+///
+/// # Heuristic
+///
+/// If the actual buffer size is more than twice the ideal buffer size (the
+/// total bytes of all string values longer than 12 bytes), we compact the
+/// array.
+///
+/// A `StringViewArray` includes pointers to the buffers that hold its
+/// underlying data.
+/// One of the great benefits of `StringViewArray` is that many operations
+/// (e.g., `filter`) can be done without copying the underlying data.
+///
+/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the
+/// `StringViewArray` may only refer to a small portion of the buffer,
+/// significantly increasing memory usage.
+fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
+    let new_columns: Vec<ArrayRef> = batch
+        .columns()
+        .iter()
+        .map(|c| {
+            // Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long.
+            let Some(s) = c.as_string_view_opt() else {
+                return Arc::clone(c);
+            };
+            let ideal_buffer_size: usize = s
+                .views()
+                .iter()
+                .map(|v| {
+                    // the low 32 bits of a view are the value's length; values
+                    // longer than 12 bytes live in a data buffer, shorter ones
+                    // are inlined in the view and need no buffer space
+                    let len = (*v as u32) as usize;
+                    if len > 12 {
+                        len
+                    } else {
+                        0
+                    }
+                })
+                .sum();
+            let actual_buffer_size = s.get_buffer_memory_size();
+
+            // Re-creating the array copies data and can be time consuming.
+            // We only do it if the array is sparse
+            if actual_buffer_size > (ideal_buffer_size * 2) {
+                // We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerates later concat_batches.
+                // See https://github.com/apache/arrow-rs/issues/6094 for more details.
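+                // Worked example of the heuristic (illustrative numbers, not
+                // from the original patch): 1000 views of 20 bytes each give
+                // an ideal_buffer_size of 20_000 bytes; if the views still
+                // point into 1 MiB of original buffers, 1_048_576 > 2 * 20_000,
+                // so the column is rebuilt into a single 20_000 byte buffer.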
+                let mut builder = StringViewBuilder::with_capacity(s.len())
+                    .with_block_size(ideal_buffer_size as u32);
+
+                for v in s.iter() {
+                    builder.append_option(v);
+                }
+
+                let gc_string = builder.finish();
+
+                debug_assert!(gc_string.data_buffers().len() <= 1); // buffer count can be 0 if the `ideal_buffer_size` is 0
+
+                Arc::new(gc_string)
+            } else {
+                Arc::clone(c)
+            }
+        })
+        .collect();
+    RecordBatch::try_new(batch.schema(), new_columns)
+        .expect("Failed to re-create the gc'ed record batch")
+}
+
 #[cfg(test)]
 mod tests {
-    use super::*;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow_array::builder::ArrayBuilder;
+    use arrow_array::{StringViewArray, UInt32Array};
+
     use crate::{memory::MemoryExec, repartition::RepartitionExec, Partitioning};
 
-    use arrow::datatypes::{DataType, Field, Schema};
-    use arrow_array::UInt32Array;
+    use super::*;
 
     #[tokio::test(flavor = "multi_thread")]
     async fn test_concat_batches() -> Result<()> {
@@ -485,4 +561,99 @@ mod tests {
         )
         .unwrap()
     }
+
+    #[test]
+    fn test_gc_string_view_batch_small_no_compact() {
+        // view with only short strings (no buffers) --> no need to compact
+        let array = StringViewTest {
+            rows: 1000,
+            strings: vec![Some("a"), Some("b"), Some("c")],
+        }
+        .build();
+
+        let gc_array = do_gc(array.clone());
+        compare_string_array_values(&array, &gc_array);
+        assert_eq!(array.data_buffers().len(), 0);
+        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
+    }
+
+    #[test]
+    fn test_gc_string_view_batch_large_no_compact() {
+        // view with large strings (has buffers) but full --> no need to compact
+        let array = StringViewTest {
+            rows: 1000,
+            strings: vec![Some("This string is longer than 12 bytes")],
+        }
+        .build();
+
+        let gc_array = do_gc(array.clone());
+        compare_string_array_values(&array, &gc_array);
+        assert_eq!(array.data_buffers().len(), 5);
+        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
+    }
+
+    #[test]
+    fn test_gc_string_view_batch_large_slice_compact() {
+        // view with large strings (has buffers) but only partially used --> should compact
+        let array = StringViewTest {
+            rows: 1000,
+            strings: vec![Some("this string is longer than 12 bytes")],
+        }
+        .build();
+
+        // slice out only 22 of the 1000 rows, so most of the buffer is not used
+        let array = array.slice(11, 22);
+
+        let gc_array = do_gc(array.clone());
+        compare_string_array_values(&array, &gc_array);
+        assert_eq!(array.data_buffers().len(), 5);
+        assert_eq!(gc_array.data_buffers().len(), 1); // compacted into a single buffer
+    }
+
+    /// Compares the values of two string view arrays
+    fn compare_string_array_values(arr1: &StringViewArray, arr2: &StringViewArray) {
+        assert_eq!(arr1.len(), arr2.len());
+        for (s1, s2) in arr1.iter().zip(arr2.iter()) {
+            assert_eq!(s1, s2);
+        }
+    }
+
+    /// runs garbage collection on a string view array
+    /// and ensures the number of rows is the same
+    fn do_gc(array: StringViewArray) -> StringViewArray {
+        let batch =
+            RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap();
+        let gc_batch = gc_string_view_batch(&batch);
+        assert_eq!(batch.num_rows(), gc_batch.num_rows());
+        assert_eq!(batch.schema(), gc_batch.schema());
+        gc_batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringViewArray>()
+            .unwrap()
+            .clone()
+    }
+
+    /// Describes parameters for creating a `StringViewArray`
+    struct StringViewTest {
+        /// The number of rows in the array
+        rows: usize,
+        /// The strings to use in the array (repeated over and over)
+        strings: Vec<Option<&'static str>>,
+    }
+
+    impl StringViewTest {
+ /// Create a `StringViewArray` with the parameters specified in this struct + fn build(self) -> StringViewArray { + let mut builder = StringViewBuilder::with_capacity(100); + loop { + for &v in self.strings.iter() { + builder.append_option(v); + if builder.len() >= self.rows { + return builder.finish(); + } + } + } + } + } } diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 752f2cf76873..c59aaa2d42bb 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -491,6 +491,7 @@ message ParquetOptions { uint64 maximum_buffered_record_batches_per_stream = 25; // default = 2 bool bloom_filter_on_read = 26; // default = true bool bloom_filter_on_write = 27; // default = false + bool schema_force_string_view = 28; // default = false oneof metadata_size_hint_opt { uint64 metadata_size_hint = 4; diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 21db66a12701..45703d8b9fed 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -956,7 +956,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { allow_single_file_parallelism: value.allow_single_file_parallelism, maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as usize, maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize, - + schema_force_string_view: value.schema_force_string_view, }) } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index 4ac6517ed739..23dd5746929d 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -4871,6 +4871,9 @@ impl serde::Serialize for ParquetOptions { if self.bloom_filter_on_write { len += 1; } + if self.schema_force_string_view { + len += 1; + } if self.dictionary_page_size_limit != 0 { len += 1; } @@ -4954,6 +4957,9 @@ impl serde::Serialize for ParquetOptions { if self.bloom_filter_on_write { struct_ser.serialize_field("bloomFilterOnWrite", &self.bloom_filter_on_write)?; } + if self.schema_force_string_view { + struct_ser.serialize_field("schemaForceStringView", &self.schema_force_string_view)?; + } if self.dictionary_page_size_limit != 0 { #[allow(clippy::needless_borrow)] struct_ser.serialize_field("dictionaryPageSizeLimit", ToString::to_string(&self.dictionary_page_size_limit).as_str())?; @@ -5071,6 +5077,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "bloomFilterOnRead", "bloom_filter_on_write", "bloomFilterOnWrite", + "schema_force_string_view", + "schemaForceStringView", "dictionary_page_size_limit", "dictionaryPageSizeLimit", "data_page_row_count_limit", @@ -5112,6 +5120,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { MaximumBufferedRecordBatchesPerStream, BloomFilterOnRead, BloomFilterOnWrite, + SchemaForceStringView, DictionaryPageSizeLimit, DataPageRowCountLimit, MaxRowGroupSize, @@ -5159,6 +5168,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "maximumBufferedRecordBatchesPerStream" | "maximum_buffered_record_batches_per_stream" => Ok(GeneratedField::MaximumBufferedRecordBatchesPerStream), "bloomFilterOnRead" | "bloom_filter_on_read" => Ok(GeneratedField::BloomFilterOnRead), "bloomFilterOnWrite" | "bloom_filter_on_write" => Ok(GeneratedField::BloomFilterOnWrite), + "schemaForceStringView" | 
"schema_force_string_view" => Ok(GeneratedField::SchemaForceStringView), "dictionaryPageSizeLimit" | "dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit), "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit), "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize), @@ -5204,6 +5214,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut maximum_buffered_record_batches_per_stream__ = None; let mut bloom_filter_on_read__ = None; let mut bloom_filter_on_write__ = None; + let mut schema_force_string_view__ = None; let mut dictionary_page_size_limit__ = None; let mut data_page_row_count_limit__ = None; let mut max_row_group_size__ = None; @@ -5305,6 +5316,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } bloom_filter_on_write__ = Some(map_.next_value()?); } + GeneratedField::SchemaForceStringView => { + if schema_force_string_view__.is_some() { + return Err(serde::de::Error::duplicate_field("schemaForceStringView")); + } + schema_force_string_view__ = Some(map_.next_value()?); + } GeneratedField::DictionaryPageSizeLimit => { if dictionary_page_size_limit__.is_some() { return Err(serde::de::Error::duplicate_field("dictionaryPageSizeLimit")); @@ -5405,6 +5422,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { maximum_buffered_record_batches_per_stream: maximum_buffered_record_batches_per_stream__.unwrap_or_default(), bloom_filter_on_read: bloom_filter_on_read__.unwrap_or_default(), bloom_filter_on_write: bloom_filter_on_write__.unwrap_or_default(), + schema_force_string_view: schema_force_string_view__.unwrap_or_default(), dictionary_page_size_limit: dictionary_page_size_limit__.unwrap_or_default(), data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(), max_row_group_size: max_row_group_size__.unwrap_or_default(), diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index bf198a24c811..9bea9be89e1d 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -805,6 +805,9 @@ pub struct ParquetOptions { /// default = false #[prost(bool, tag = "27")] pub bloom_filter_on_write: bool, + /// default = false + #[prost(bool, tag = "28")] + pub schema_force_string_view: bool, #[prost(uint64, tag = "12")] pub dictionary_page_size_limit: u64, #[prost(uint64, tag = "18")] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 24083e8b7276..a61a026089fc 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -827,6 +827,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { allow_single_file_parallelism: value.allow_single_file_parallelism, maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as u64, maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64, + schema_force_string_view: value.schema_force_string_view, }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index b36624e391c2..f48b05e8d3dc 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -809,6 +809,9 @@ pub struct ParquetOptions { /// default = false #[prost(bool, tag = "27")] pub bloom_filter_on_write: bool, + /// default = false + 
#[prost(bool, tag = "28")] + pub schema_force_string_view: bool, #[prost(uint64, tag = "12")] pub dictionary_page_size_limit: u64, #[prost(uint64, tag = "18")] diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs index 520b6b53b32d..66ffeadf8cec 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs @@ -233,6 +233,11 @@ pub fn cell_to_string(col: &ArrayRef, row: usize) -> Result { DataType::Utf8 => { Ok(varchar_to_str(get_row_value!(array::StringArray, col, row))) } + DataType::Utf8View => Ok(varchar_to_str(get_row_value!( + array::StringViewArray, + col, + row + ))), _ => { let f = ArrayFormatter::try_new(col.as_ref(), &DEFAULT_FORMAT_OPTIONS); Ok(f.unwrap().value(row).to_string()) diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index e85159fd137a..fef7bfe82174 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -201,6 +201,7 @@ datafusion.execution.parquet.metadata_size_hint NULL datafusion.execution.parquet.pruning true datafusion.execution.parquet.pushdown_filters false datafusion.execution.parquet.reorder_filters false +datafusion.execution.parquet.schema_force_string_view false datafusion.execution.parquet.skip_metadata true datafusion.execution.parquet.statistics_enabled page datafusion.execution.parquet.write_batch_size 1024 @@ -287,6 +288,7 @@ datafusion.execution.parquet.metadata_size_hint NULL (reading) If specified, the datafusion.execution.parquet.pruning true (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file datafusion.execution.parquet.pushdown_filters false (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". datafusion.execution.parquet.reorder_filters false (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query +datafusion.execution.parquet.schema_force_string_view false (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. 
If NULL, uses default parquet writer setting datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in bytes diff --git a/datafusion/sqllogictest/test_files/string_view.slt b/datafusion/sqllogictest/test_files/string_view.slt index 3ba4e271c2f6..3f9a4793f655 100644 --- a/datafusion/sqllogictest/test_files/string_view.slt +++ b/datafusion/sqllogictest/test_files/string_view.slt @@ -324,3 +324,24 @@ logical_plan statement ok drop table test; + +# coercion from stringview to integer, as input to make_date +query D +select make_date(arrow_cast('2024', 'Utf8View'), arrow_cast('01', 'Utf8View'), arrow_cast('23', 'Utf8View')) +---- +2024-01-23 + +# coercions between stringview and date types +statement ok +create table dates (dt date) as values + (date '2024-01-23'), + (date '2023-11-30'); + +query D +select t.dt from dates t where arrow_cast('2024-01-01', 'Utf8View') < t.dt; +---- +2024-01-23 + + +statement ok +drop table dates; diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 5814d88c7dd8..78d0d7b0239f 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -76,6 +76,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.allow_single_file_parallelism | true | (writing) Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. | | datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | +| datafusion.execution.parquet.schema_force_string_view | false | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | | datafusion.execution.aggregate.scalar_update_factor | 10 | Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. 
| | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |