diff --git a/Cargo.lock b/Cargo.lock index 4e2827580ea52..e3e32d7b1b4f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -225,9 +225,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" -version = "57.0.0" +version = "57.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4df8bb5b0bd64c0b9bc61317fcc480bad0f00e56d3bc32c69a4c8dada4786bae" +checksum = "cb372a7cbcac02a35d3fb7b3fc1f969ec078e871f9bb899bf00a2e1809bec8a3" dependencies = [ "arrow-arith", "arrow-array", @@ -248,9 +248,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "57.0.0" +version = "57.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1a640186d3bd30a24cb42264c2dafb30e236a6f50d510e56d40b708c9582491" +checksum = "0f377dcd19e440174596d83deb49cd724886d91060c07fec4f67014ef9d54049" dependencies = [ "arrow-array", "arrow-buffer", @@ -262,9 +262,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "57.0.0" +version = "57.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "219fe420e6800979744c8393b687afb0252b3f8a89b91027d27887b72aa36d31" +checksum = "a23eaff85a44e9fa914660fb0d0bb00b79c4a3d888b5334adb3ea4330c84f002" dependencies = [ "ahash 0.8.12", "arrow-buffer", @@ -281,9 +281,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "57.0.0" +version = "57.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76885a2697a7edf6b59577f568b456afc94ce0e2edc15b784ce3685b6c3c5c27" +checksum = "a2819d893750cb3380ab31ebdc8c68874dd4429f90fd09180f3c93538bd21626" dependencies = [ "bytes", "half", @@ -293,13 +293,14 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "57.0.0" +version = "57.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c9ebb4c987e6b3b236fb4a14b20b34835abfdd80acead3ccf1f9bf399e1f168" +checksum = "e3d131abb183f80c450d4591dc784f8d7750c50c6e2bc3fcaad148afc8361271" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", + "arrow-ord", "arrow-schema", "arrow-select", "atoi", @@ -314,9 +315,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "57.0.0" +version = "57.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92386159c8d4bce96f8bd396b0642a0d544d471bdc2ef34d631aec80db40a09c" +checksum = "2275877a0e5e7e7c76954669366c2aa1a829e340ab1f612e647507860906fb6b" dependencies = [ "arrow-array", "arrow-cast", @@ -329,9 +330,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "57.0.0" +version = "57.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "727681b95de313b600eddc2a37e736dcb21980a40f640314dcf360e2f36bc89b" +checksum = "05738f3d42cb922b9096f7786f606fcb8669260c2640df8490533bb2fa38c9d3" dependencies = [ "arrow-buffer", "arrow-schema", @@ -342,9 +343,9 @@ dependencies = [ [[package]] name = "arrow-flight" -version = "57.0.0" +version = "57.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f70bb56412a007b0cfc116d15f24dda6adeed9611a213852a004cda20085a3b9" +checksum = "8b5f57c3d39d1b1b7c1376a772ea86a131e7da310aed54ebea9363124bb885e3" dependencies = [ "arrow-arith", "arrow-array", @@ -370,9 +371,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "57.0.0" +version = "57.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da9ba92e3de170295c98a84e5af22e2b037f0c7b32449445e6c493b5fca27f27" +checksum = "3d09446e8076c4b3f235603d9ea7c5494e73d441b01cd61fb33d7254c11964b3" dependencies = [ "arrow-array", "arrow-buffer", @@ -386,9 +387,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "57.0.0" +version = "57.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b969b4a421ae83828591c6bf5450bd52e6d489584142845ad6a861f42fe35df8" +checksum = "371ffd66fa77f71d7628c63f209c9ca5341081051aa32f9c8020feb0def787c0" dependencies = [ "arrow-array", "arrow-buffer", @@ -410,9 +411,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "57.0.0" +version = "57.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "141c05298b21d03e88062317a1f1a73f5ba7b6eb041b350015b1cd6aabc0519b" +checksum = "cbc94fc7adec5d1ba9e8cd1b1e8d6f72423b33fe978bf1f46d970fafab787521" dependencies = [ "arrow-array", "arrow-buffer", @@ -423,9 +424,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "57.0.0" +version = "57.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5f3c06a6abad6164508ed283c7a02151515cef3de4b4ff2cebbcaeb85533db2" +checksum = "169676f317157dc079cc5def6354d16db63d8861d61046d2f3883268ced6f99f" dependencies = [ "arrow-array", "arrow-buffer", @@ -436,9 +437,9 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "57.0.0" +version = "57.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cfa7a03d1eee2a4d061476e1840ad5c9867a544ca6c4c59256496af5d0a8be5" +checksum = "d27609cd7dd45f006abae27995c2729ef6f4b9361cde1ddd019dc31a5aa017e0" dependencies = [ "bitflags 2.9.4", "serde", @@ -448,9 +449,9 @@ dependencies = [ [[package]] name = "arrow-select" -version = "57.0.0" +version = "57.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bafa595babaad59f2455f4957d0f26448fb472722c186739f4fac0823a1bdb47" +checksum = "ae980d021879ea119dd6e2a13912d81e64abed372d53163e804dfe84639d8010" dependencies = [ "ahash 0.8.12", "arrow-array", @@ -462,9 +463,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "57.0.0" +version = "57.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32f46457dbbb99f2650ff3ac23e46a929e0ab81db809b02aa5511c258348bef2" +checksum = "cf35e8ef49dcf0c5f6d175edee6b8af7b45611805333129c541a8b89a0fc0534" dependencies = [ "arrow-array", "arrow-buffer", @@ -2802,7 +2803,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.61.0", + "windows-sys 0.60.2", ] [[package]] @@ -2946,7 +2947,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.0", + "windows-sys 0.52.0", ] [[package]] @@ -2992,7 +2993,7 @@ checksum = "0ce92ff622d6dadf7349484f42c93271a0d49b7cc4d466a936405bacbe10aa78" dependencies = [ "cfg-if", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -3586,7 +3587,7 @@ dependencies = [ "js-sys", "log", "wasm-bindgen", - "windows-core 0.62.0", + "windows-core", ] [[package]] @@ -4069,9 +4070,9 @@ checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" [[package]] name = "lz4_flex" -version = "0.11.5" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a" +checksum = "ab6473172471198271ff72e9379150e9dfd70d8e533e0752a27e515b48dd375e" dependencies = [ "twox-hash", ] @@ -4418,9 +4419,9 @@ dependencies = [ [[package]] name = "parquet" -version = "57.0.0" +version = "57.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a0f31027ef1af7549f7cec603a9a21dce706d3f8d7c2060a68f43c1773be95a" +checksum = "be3e4f6d320dd92bfa7d612e265d7d08bba0a240bab86af3425e1d255a511d89" dependencies = [ "ahash 0.8.12", "arrow-array", @@ -4814,7 +4815,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" dependencies = [ "heck 0.5.0", - "itertools 0.14.0", + "itertools 0.13.0", "log", "multimap", "once_cell", @@ -4834,7 +4835,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.13.0", "proc-macro2", "quote", "syn 2.0.108", @@ -5374,7 +5375,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.61.0", + "windows-sys 0.52.0", ] [[package]] @@ -5910,7 +5911,7 @@ dependencies = [ "cfg-if", "libc", "psm", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -6128,7 +6129,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix", - "windows-sys 0.61.0", + "windows-sys 0.52.0", ] [[package]] @@ -7025,7 +7026,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.0", + "windows-sys 0.52.0", ] [[package]] @@ -7041,7 +7042,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893" dependencies = [ "windows-collections", - "windows-core 0.61.2", + "windows-core", "windows-future", "windows-link 0.1.3", "windows-numerics", @@ -7053,7 +7054,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8" dependencies = [ - "windows-core 0.61.2", + "windows-core", ] [[package]] @@ -7065,21 +7066,8 @@ dependencies = [ "windows-implement", "windows-interface", "windows-link 0.1.3", - "windows-result 0.3.4", - "windows-strings 0.4.2", -] - -[[package]] -name = "windows-core" -version = "0.62.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57fe7168f7de578d2d8a05b07fd61870d2e73b4020e9f49aa00da8471723497c" -dependencies = [ - "windows-implement", - "windows-interface", - "windows-link 0.2.0", - "windows-result 0.4.0", - "windows-strings 0.5.0", + "windows-result", + "windows-strings", ] [[package]] @@ -7088,7 +7076,7 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" dependencies = [ - "windows-core 0.61.2", + "windows-core", "windows-link 0.1.3", "windows-threading", ] @@ -7133,7 +7121,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1" dependencies = [ - "windows-core 0.61.2", + "windows-core", "windows-link 0.1.3", ] @@ -7146,15 +7134,6 @@ dependencies = [ "windows-link 0.1.3", ] -[[package]] -name = "windows-result" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7084dcc306f89883455a206237404d3eaf961e5bd7e0f312f7c91f57eb44167f" -dependencies = [ - "windows-link 0.2.0", -] - [[package]] name = "windows-strings" version = "0.4.2" @@ -7164,15 +7143,6 @@ dependencies = [ "windows-link 0.1.3", ] -[[package]] -name = "windows-strings" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7218c655a553b0bed4426cf54b20d7ba363ef543b52d515b3e48d7fd55318dda" -dependencies = [ - "windows-link 0.2.0", -] - [[package]] name = "windows-sys" version = "0.52.0" diff --git a/Cargo.toml b/Cargo.toml index bcc665e763549..64b2c790a1afc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -91,19 +91,19 @@ ahash = { version = "0.8", default-features = false, features = [ "runtime-rng", ] } apache-avro = { version = "0.21", default-features = false } -arrow = { version = "57.0.0", features = [ +arrow = { version = "57.1.0", features = [ "prettyprint", "chrono-tz", ] } -arrow-buffer = { version = "57.0.0", default-features = false } -arrow-flight = { version = "57.0.0", features = [ +arrow-buffer = { version = "57.1.0", default-features = false } +arrow-flight = { version = "57.1.0", features = [ "flight-sql-experimental", ] } -arrow-ipc = { version = "57.0.0", default-features = false, features = [ +arrow-ipc = { version = "57.1.0", default-features = false, features = [ "lz4", ] } -arrow-ord = { version = "57.0.0", default-features = false } -arrow-schema = { version = "57.0.0", default-features = false } +arrow-ord = { version = "57.1.0", default-features = false } +arrow-schema = { version = "57.1.0", default-features = false } async-trait = "0.1.89" bigdecimal = "0.4.8" bytes = "1.11" @@ -166,7 +166,7 @@ log = "^0.4" num-traits = { version = "0.2" } object_store = { version = "0.12.4", default-features = false } parking_lot = "0.12" -parquet = { version = "57.0.0", default-features = false, features = [ +parquet = { version = "57.1.0", default-features = false, features = [ "arrow", "async", "object_store", diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 09fa8ef15af84..de666fced7e65 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -592,9 +592,9 @@ mod tests { +-----------------------------------+-----------------+---------------------+------+------------------+ | filename | file_size_bytes | metadata_size_bytes | hits | extra | +-----------------------------------+-----------------+---------------------+------+------------------+ - | alltypes_plain.parquet | 1851 | 6957 | 2 | page_index=false | - | alltypes_tiny_pages.parquet | 454233 | 267014 | 2 | page_index=true | - | lz4_raw_compressed_larger.parquet | 380836 | 996 | 2 | page_index=false | + | alltypes_plain.parquet | 1851 | 8882 | 2 | page_index=false | + | alltypes_tiny_pages.parquet | 454233 | 269266 | 2 | page_index=true | + | lz4_raw_compressed_larger.parquet | 380836 | 1347 | 2 | page_index=false | +-----------------------------------+-----------------+---------------------+------+------------------+ "); @@ -623,9 +623,9 @@ mod tests { +-----------------------------------+-----------------+---------------------+------+------------------+ | filename | file_size_bytes | metadata_size_bytes | hits | extra | +-----------------------------------+-----------------+---------------------+------+------------------+ - | alltypes_plain.parquet | 1851 | 6957 | 5 | page_index=false | - | alltypes_tiny_pages.parquet | 454233 | 267014 | 2 | page_index=true | - | lz4_raw_compressed_larger.parquet | 380836 | 996 | 3 | page_index=false | + | alltypes_plain.parquet | 1851 | 8882 | 5 | page_index=false | + | alltypes_tiny_pages.parquet | 454233 | 269266 | 2 | page_index=true | + | lz4_raw_compressed_larger.parquet | 380836 | 1347 | 3 | page_index=false | +-----------------------------------+-----------------+---------------------+------+------------------+ "); diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 9976087d1b4bc..0f4a804ac75bb 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -694,6 +694,12 @@ config_namespace! { /// the filters are applied in the same order as written in the query pub reorder_filters: bool, default = false + /// (reading) Force the use of RowSelections for filter results, when + /// pushdown_filters is enabled. If false, the reader will automatically + /// choose between a RowSelection and a Bitmap based on the number and + /// pattern of selected rows. + pub force_filter_selections: bool, default = false + /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, /// and `Binary/BinaryLarge` with `BinaryView`. pub schema_force_view_types: bool, default = true diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 131041c92070a..8aa0134d09ec8 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -199,6 +199,7 @@ impl ParquetOptions { metadata_size_hint: _, pushdown_filters: _, reorder_filters: _, + force_filter_selections: _, // not used for writer props allow_single_file_parallelism: _, maximum_parallel_row_group_writers: _, maximum_buffered_record_batches_per_stream: _, @@ -461,6 +462,7 @@ mod tests { metadata_size_hint: defaults.metadata_size_hint, pushdown_filters: defaults.pushdown_filters, reorder_filters: defaults.reorder_filters, + force_filter_selections: defaults.force_filter_selections, allow_single_file_parallelism: defaults.allow_single_file_parallelism, maximum_parallel_row_group_writers: defaults .maximum_parallel_row_group_writers, @@ -572,6 +574,7 @@ mod tests { metadata_size_hint: global_options_defaults.metadata_size_hint, pushdown_filters: global_options_defaults.pushdown_filters, reorder_filters: global_options_defaults.reorder_filters, + force_filter_selections: global_options_defaults.force_filter_selections, allow_single_file_parallelism: global_options_defaults .allow_single_file_parallelism, maximum_parallel_row_group_writers: global_options_defaults diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 6f0b201185284..aea530d67f024 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -3109,7 +3109,7 @@ mod tests { assert_contains!( &e, - r#"Error during planning: Can not find compatible types to compare Boolean with [Struct("foo": Boolean), Utf8]"# + r#"Error during planning: Can not find compatible types to compare Boolean with [Struct("foo": non-null Boolean), Utf8]"# ); Ok(()) diff --git a/datafusion/core/tests/dataframe/dataframe_functions.rs b/datafusion/core/tests/dataframe/dataframe_functions.rs index 265862ff9af8a..56cdd78d7051c 100644 --- a/datafusion/core/tests/dataframe/dataframe_functions.rs +++ b/datafusion/core/tests/dataframe/dataframe_functions.rs @@ -313,10 +313,10 @@ async fn test_fn_arrow_typeof() -> Result<()> { +----------------------+ | arrow_typeof(test.l) | +----------------------+ - | List(nullable Int32) | - | List(nullable Int32) | - | List(nullable Int32) | - | List(nullable Int32) | + | List(Int32) | + | List(Int32) | + | List(Int32) | + | List(Int32) | +----------------------+ "); diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs index 966f251613979..1d64669fadd97 100644 --- a/datafusion/core/tests/parquet/filter_pushdown.rs +++ b/datafusion/core/tests/parquet/filter_pushdown.rs @@ -636,6 +636,27 @@ async fn predicate_cache_pushdown_default() -> datafusion_common::Result<()> { config.options_mut().execution.parquet.pushdown_filters = true; let ctx = SessionContext::new_with_config(config); // The cache is on by default, and used when filter pushdown is enabled + PredicateCacheTest { + expected_inner_records: 8, + expected_records: 7, // reads more than necessary from the cache as then another bitmap is applied + } + .run(&ctx) + .await +} + +#[tokio::test] +async fn predicate_cache_pushdown_default_selections_only( +) -> datafusion_common::Result<()> { + let mut config = SessionConfig::new(); + config.options_mut().execution.parquet.pushdown_filters = true; + // forcing filter selections minimizes the number of rows read from the cache + config + .options_mut() + .execution + .parquet + .force_filter_selections = true; + let ctx = SessionContext::new_with_config(config); + // The cache is on by default, and used when filter pushdown is enabled PredicateCacheTest { expected_inner_records: 8, expected_records: 4, diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs index 84899137e50aa..5a51451461edf 100644 --- a/datafusion/core/tests/sql/select.rs +++ b/datafusion/core/tests/sql/select.rs @@ -222,10 +222,10 @@ async fn test_parameter_invalid_types() -> Result<()> { .await; assert_snapshot!(results.unwrap_err().strip_backtrace(), @r" - type_coercion - caused by - Error during planning: Cannot infer common argument type for comparison operation List(nullable Int32) = Int32 - "); + type_coercion + caused by + Error during planning: Cannot infer common argument type for comparison operation List(Int32) = Int32 + "); Ok(()) } diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 3c905d950a962..83235dafdaf86 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -53,7 +53,9 @@ use futures::{ready, Stream, StreamExt, TryStreamExt}; use itertools::Itertools; use log::debug; use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; -use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; +use parquet::arrow::arrow_reader::{ + ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy, +}; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; @@ -87,6 +89,8 @@ pub(super) struct ParquetOpener { pub pushdown_filters: bool, /// Should the filters be reordered to optimize the scan? pub reorder_filters: bool, + /// Should we force the reader to use RowSelections for filtering + pub force_filter_selections: bool, /// Should the page index be read from parquet files, if present, to skip /// data pages pub enable_page_index: bool, @@ -147,6 +151,7 @@ impl FileOpener for ParquetOpener { let partition_fields = self.partition_fields.clone(); let reorder_predicates = self.reorder_filters; let pushdown_filters = self.pushdown_filters; + let force_filter_selections = self.force_filter_selections; let coerce_int96 = self.coerce_int96; let enable_bloom_filter = self.enable_bloom_filter; let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning; @@ -347,6 +352,10 @@ impl FileOpener for ParquetOpener { } }; }; + if force_filter_selections { + builder = + builder.with_row_selection_policy(RowSelectionPolicy::Selectors); + } // Determine which row groups to actually read. The idea is to skip // as many row groups as possible based on the metadata and query @@ -887,6 +896,7 @@ mod test { partition_fields: vec![], pushdown_filters: false, // note that this is false! reorder_filters: false, + force_filter_selections: false, enable_page_index: false, enable_bloom_filter: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), @@ -960,6 +970,7 @@ mod test { ))], pushdown_filters: false, // note that this is false! reorder_filters: false, + force_filter_selections: false, enable_page_index: false, enable_bloom_filter: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), @@ -1049,6 +1060,7 @@ mod test { ))], pushdown_filters: false, // note that this is false! reorder_filters: false, + force_filter_selections: false, enable_page_index: false, enable_bloom_filter: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), @@ -1141,6 +1153,7 @@ mod test { ))], pushdown_filters: true, // note that this is true! reorder_filters: true, + force_filter_selections: false, enable_page_index: false, enable_bloom_filter: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), @@ -1233,6 +1246,7 @@ mod test { ))], pushdown_filters: false, // note that this is false! reorder_filters: false, + force_filter_selections: false, enable_page_index: false, enable_bloom_filter: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), @@ -1383,6 +1397,7 @@ mod test { partition_fields: vec![], pushdown_filters: true, reorder_filters: false, + force_filter_selections: false, enable_page_index: false, enable_bloom_filter: false, schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory), diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 5ed74ecfd98f4..d84ddf5993791 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -410,6 +410,11 @@ impl ParquetSource { self.table_parquet_options.global.reorder_filters } + /// Return the value of [`datafusion_common::config::ParquetOptions::force_filter_selections`] + fn force_filter_selections(&self) -> bool { + self.table_parquet_options.global.force_filter_selections + } + /// If enabled, the reader will read the page index /// This is used to optimize filter pushdown /// via `RowSelector` and `RowFilter` by @@ -595,6 +600,7 @@ impl FileSource for ParquetSource { parquet_file_reader_factory, pushdown_filters: self.pushdown_filters(), reorder_filters: self.reorder_filters(), + force_filter_selections: self.force_filter_selections(), enable_page_index: self.enable_page_index(), enable_bloom_filter: self.bloom_filter_on_read(), enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 4fb0f8553b4ba..a557d3356dba0 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -2465,7 +2465,7 @@ mod test { assert_analyzed_plan_eq!( plan, @r#" - Projection: a = CAST(CAST(a AS Map("key_value": Struct("key": Utf8, "value": nullable Float64), unsorted)) AS Map("entries": Struct("key": Utf8, "value": nullable Float64), unsorted)) + Projection: a = CAST(CAST(a AS Map("key_value": non-null Struct("key": non-null Utf8, "value": Float64), unsorted)) AS Map("entries": non-null Struct("key": non-null Utf8, "value": Float64), unsorted)) EmptyRelation: rows=0 "# ) diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 267953556b166..15c82e948c955 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -519,6 +519,7 @@ message ParquetOptions { bool skip_metadata = 3; // default = true bool pushdown_filters = 5; // default = false bool reorder_filters = 6; // default = false + bool force_filter_selections = 34; // default = false uint64 data_pagesize_limit = 7; // default = 1024 * 1024 uint64 write_batch_size = 8; // default = 1024 string writer_version = 9; // default = "1.0" diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 1a38ed4363d6f..dd6ed284c1b5e 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -943,6 +943,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { .unwrap_or(None), pushdown_filters: value.pushdown_filters, reorder_filters: value.reorder_filters, + force_filter_selections: value.force_filter_selections, data_pagesize_limit: value.data_pagesize_limit as usize, write_batch_size: value.write_batch_size as usize, writer_version: value.writer_version.clone(), diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index e63f345459b8f..66659ad14cbbd 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -5557,6 +5557,9 @@ impl serde::Serialize for ParquetOptions { if self.reorder_filters { len += 1; } + if self.force_filter_selections { + len += 1; + } if self.data_pagesize_limit != 0 { len += 1; } @@ -5651,6 +5654,9 @@ impl serde::Serialize for ParquetOptions { if self.reorder_filters { struct_ser.serialize_field("reorderFilters", &self.reorder_filters)?; } + if self.force_filter_selections { + struct_ser.serialize_field("forceFilterSelections", &self.force_filter_selections)?; + } if self.data_pagesize_limit != 0 { #[allow(clippy::needless_borrow)] #[allow(clippy::needless_borrows_for_generic_args)] @@ -5816,6 +5822,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "pushdownFilters", "reorder_filters", "reorderFilters", + "force_filter_selections", + "forceFilterSelections", "data_pagesize_limit", "dataPagesizeLimit", "write_batch_size", @@ -5875,6 +5883,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { SkipMetadata, PushdownFilters, ReorderFilters, + ForceFilterSelections, DataPagesizeLimit, WriteBatchSize, WriterVersion, @@ -5927,6 +5936,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "skipMetadata" | "skip_metadata" => Ok(GeneratedField::SkipMetadata), "pushdownFilters" | "pushdown_filters" => Ok(GeneratedField::PushdownFilters), "reorderFilters" | "reorder_filters" => Ok(GeneratedField::ReorderFilters), + "forceFilterSelections" | "force_filter_selections" => Ok(GeneratedField::ForceFilterSelections), "dataPagesizeLimit" | "data_pagesize_limit" => Ok(GeneratedField::DataPagesizeLimit), "writeBatchSize" | "write_batch_size" => Ok(GeneratedField::WriteBatchSize), "writerVersion" | "writer_version" => Ok(GeneratedField::WriterVersion), @@ -5977,6 +5987,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut skip_metadata__ = None; let mut pushdown_filters__ = None; let mut reorder_filters__ = None; + let mut force_filter_selections__ = None; let mut data_pagesize_limit__ = None; let mut write_batch_size__ = None; let mut writer_version__ = None; @@ -6035,6 +6046,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } reorder_filters__ = Some(map_.next_value()?); } + GeneratedField::ForceFilterSelections => { + if force_filter_selections__.is_some() { + return Err(serde::de::Error::duplicate_field("forceFilterSelections")); + } + force_filter_selections__ = Some(map_.next_value()?); + } GeneratedField::DataPagesizeLimit => { if data_pagesize_limit__.is_some() { return Err(serde::de::Error::duplicate_field("dataPagesizeLimit")); @@ -6213,6 +6230,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { skip_metadata: skip_metadata__.unwrap_or_default(), pushdown_filters: pushdown_filters__.unwrap_or_default(), reorder_filters: reorder_filters__.unwrap_or_default(), + force_filter_selections: force_filter_selections__.unwrap_or_default(), data_pagesize_limit: data_pagesize_limit__.unwrap_or_default(), write_batch_size: write_batch_size__.unwrap_or_default(), writer_version: writer_version__.unwrap_or_default(), diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index aa7c3d51a9d6d..eaeed5276b241 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -763,6 +763,9 @@ pub struct ParquetOptions { /// default = false #[prost(bool, tag = "6")] pub reorder_filters: bool, + /// default = false + #[prost(bool, tag = "34")] + pub force_filter_selections: bool, /// default = 1024 * 1024 #[prost(uint64, tag = "7")] pub data_pagesize_limit: u64, diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 0152d57832394..7addcde5956cc 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -856,6 +856,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { metadata_size_hint_opt: value.metadata_size_hint.map(|v| protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v as u64)), pushdown_filters: value.pushdown_filters, reorder_filters: value.reorder_filters, + force_filter_selections: value.force_filter_selections, data_pagesize_limit: value.data_pagesize_limit as u64, write_batch_size: value.write_batch_size as u64, writer_version: value.writer_version.clone(), diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index aa7c3d51a9d6d..eaeed5276b241 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -763,6 +763,9 @@ pub struct ParquetOptions { /// default = false #[prost(bool, tag = "6")] pub reorder_filters: bool, + /// default = false + #[prost(bool, tag = "34")] + pub force_filter_selections: bool, /// default = 1024 * 1024 #[prost(uint64, tag = "7")] pub data_pagesize_limit: u64, diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index d32bfb22ffddd..20b3c6bb7aef9 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -375,6 +375,7 @@ mod parquet { }), pushdown_filters: global_options.global.pushdown_filters, reorder_filters: global_options.global.reorder_filters, + force_filter_selections: global_options.global.force_filter_selections, data_pagesize_limit: global_options.global.data_pagesize_limit as u64, write_batch_size: global_options.global.write_batch_size as u64, writer_version: global_options.global.writer_version.clone(), @@ -471,6 +472,7 @@ mod parquet { }), pushdown_filters: proto.pushdown_filters, reorder_filters: proto.reorder_filters, + force_filter_selections: proto.force_filter_selections, data_pagesize_limit: proto.data_pagesize_limit as usize, write_batch_size: proto.write_batch_size as usize, writer_version: proto.writer_version.clone(), diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 77197721e1f14..a8f4bb85a1edf 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -710,13 +710,13 @@ select query TTT select arrow_typeof(column1), arrow_typeof(column2), arrow_typeof(column3) from arrays; ---- -List(nullable List(nullable Int64)) List(nullable Float64) List(nullable Utf8) -List(nullable List(nullable Int64)) List(nullable Float64) List(nullable Utf8) -List(nullable List(nullable Int64)) List(nullable Float64) List(nullable Utf8) -List(nullable List(nullable Int64)) List(nullable Float64) List(nullable Utf8) -List(nullable List(nullable Int64)) List(nullable Float64) List(nullable Utf8) -List(nullable List(nullable Int64)) List(nullable Float64) List(nullable Utf8) -List(nullable List(nullable Int64)) List(nullable Float64) List(nullable Utf8) +List(List(Int64)) List(Float64) List(Utf8) +List(List(Int64)) List(Float64) List(Utf8) +List(List(Int64)) List(Float64) List(Utf8) +List(List(Int64)) List(Float64) List(Utf8) +List(List(Int64)) List(Float64) List(Utf8) +List(List(Int64)) List(Float64) List(Utf8) +List(List(Int64)) List(Float64) List(Utf8) # arrays table query ??? @@ -1182,7 +1182,7 @@ select make_array(make_array(1), arrow_cast(make_array(-1), 'LargeList(Int8)')) query T select arrow_typeof(make_array(make_array(1), arrow_cast(make_array(-1), 'LargeList(Int8)'))); ---- -List(nullable LargeList(nullable Int64)) +List(LargeList(Int64)) query ??? @@ -1978,11 +1978,11 @@ select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)'), 0, ---- [1, 2, 3, 4, 5] [h, e, l, l, o] -# TODO: Enable once arrow_cast supports ListView types. +# TODO: Enable once array_slice supports LargeListView types. # Expected output (once supported): # ---- # [1, 2, 3, 4, 5] [h, e, l, l, o] -query error DataFusion error: Execution error: Unsupported type 'ListView\(Int64\)'. Must be a supported arrow type name such as 'Int32' or 'Timestamp\(ns\)'. Error unknown token: ListView +query error Failed to coerce arguments to satisfy a call to 'array_slice' function: select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'ListView(Int64)'), 0, 6), array_slice(arrow_cast(make_array('h', 'e', 'l', 'l', 'o'), 'ListView(Utf8)'), 0, 5); @@ -2025,14 +2025,15 @@ select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)'), 2, ---- [2, 3, 4, 5] [l, l, o] -# TODO: Enable once arrow_cast supports LargeListView types. +# TODO: Enable once array_slice supports LargeListView types. # Expected output (once supported): # ---- # [2, 3, 4, 5] [l, l, o] -query error DataFusion error: Execution error: Unsupported type 'LargeListView\(Int64\)'. Must be a supported arrow type name such as 'Int32' or 'Timestamp\(ns\)'. Error unknown token: LargeListView +query error Failed to coerce arguments to satisfy a call to 'array_slice' function: select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeListView(Int64)'), 2, 6), array_slice(arrow_cast(make_array('h', 'e', 'l', 'l', 'o'), 'LargeListView(Utf8)'), 3, 7); + # array_slice scalar function #6 (with positive indexes; nested array) query ? select array_slice(make_array(make_array(1, 2, 3, 4, 5), make_array(6, 7, 8, 9, 10)), 1, 1); @@ -3321,7 +3322,7 @@ select array_concat([arrow_cast('1', 'Utf8'), arrow_cast('2', 'Utf8')], [arrow_cast('3', 'Utf8View')]), arrow_typeof(array_concat([arrow_cast('1', 'Utf8'), arrow_cast('2', 'Utf8')], [arrow_cast('3', 'Utf8View')])); ---- -[1, 2, 3] List(nullable Utf8View) +[1, 2, 3] List(Utf8View) # array_concat error query error DataFusion error: Error during planning: Execution error: Function 'array_concat' user-defined coercion failed with "Error during planning: array_concat does not support type Int64" @@ -4614,7 +4615,7 @@ NULL [baz] baz query T SELECT arrow_typeof(make_array(arrow_cast('a', 'Utf8View'), 'b', 'c', 'd')); ---- -List(nullable Utf8View) +List(Utf8View) # expect a,b,c,d. make_array forces all types to be of a common type (see above) query T @@ -7708,8 +7709,8 @@ CREATE EXTERNAL TABLE fixed_size_list_array STORED AS PARQUET LOCATION '../core/ query T select arrow_typeof(f0) from fixed_size_list_array; ---- -FixedSizeList(2 x nullable Int64) -FixedSizeList(2 x nullable Int64) +FixedSizeList(2 x Int64) +FixedSizeList(2 x Int64) query ? select * from fixed_size_list_array; @@ -7738,8 +7739,8 @@ select make_array(arrow_cast(f0, 'List(Int64)')) from fixed_size_list_array query T select arrow_typeof(make_array(arrow_cast(f0, 'List(Int64)'))) from fixed_size_list_array ---- -List(nullable List(nullable Int64)) -List(nullable List(nullable Int64)) +List(List(Int64)) +List(List(Int64)) query ? select make_array(f0) from fixed_size_list_array @@ -7750,8 +7751,8 @@ select make_array(f0) from fixed_size_list_array query T select arrow_typeof(make_array(f0)) from fixed_size_list_array ---- -List(nullable FixedSizeList(2 x nullable Int64)) -List(nullable FixedSizeList(2 x nullable Int64)) +List(FixedSizeList(2 x Int64)) +List(FixedSizeList(2 x Int64)) query ? select array_concat(column1, [7]) from arrays_values_v2; @@ -7798,7 +7799,7 @@ select flatten(arrow_cast(make_array([1], [2, 3], [null], make_array(4, null, 5) arrow_typeof(flatten(arrow_cast(make_array([1], [2, 3], [null], make_array(4, null, 5)), 'FixedSizeList(4, LargeList(Int64))'))), arrow_typeof(flatten(arrow_cast(make_array([[1.1], [2.2]], [[3.3], [4.4]]), 'List(LargeList(FixedSizeList(1, Float64)))'))); ---- -[1, 2, 3, NULL, 4, NULL, 5] [[1.1], [2.2], [3.3], [4.4]] LargeList(nullable Int64) LargeList(nullable FixedSizeList(1 x nullable Float64)) +[1, 2, 3, NULL, 4, NULL, 5] [[1.1], [2.2], [3.3], [4.4]] LargeList(Int64) LargeList(FixedSizeList(1 x Float64)) # flatten with column values query ???? @@ -8338,19 +8339,19 @@ select * from test_create_array_table; query T select arrow_typeof(a) from test_create_array_table; ---- -List(nullable Int32) +List(Int32) query T select arrow_typeof(c) from test_create_array_table; ---- -List(nullable List(nullable Int32)) +List(List(Int32)) # Test casting to array types # issue: https://github.com/apache/datafusion/issues/9440 query ??T select [1,2,3]::int[], [['1']]::int[][], arrow_typeof([]::text[]); ---- -[1, 2, 3] [[1]] List(nullable Utf8View) +[1, 2, 3] [[1]] List(Utf8View) # test empty arrays return length # issue: https://github.com/apache/datafusion/pull/12459 @@ -8370,8 +8371,8 @@ create table fixed_size_col_table (a int[3]) as values ([1,2,3]), ([4,5,6]); query T select arrow_typeof(a) from fixed_size_col_table; ---- -FixedSizeList(3 x nullable Int32) -FixedSizeList(3 x nullable Int32) +FixedSizeList(3 x Int32) +FixedSizeList(3 x Int32) query ? rowsort SELECT DISTINCT a FROM fixed_size_col_table diff --git a/datafusion/sqllogictest/test_files/arrow_typeof.slt b/datafusion/sqllogictest/test_files/arrow_typeof.slt index 5ba62be6873c3..c213f2abf7190 100644 --- a/datafusion/sqllogictest/test_files/arrow_typeof.slt +++ b/datafusion/sqllogictest/test_files/arrow_typeof.slt @@ -357,12 +357,12 @@ select arrow_cast(make_array(1, 2, 3), 'List(Int64)'); query T select arrow_typeof(arrow_cast(make_array(1, 2, 3), 'List(Int64)')); ---- -List(nullable Int64) +List(Int64) query T select arrow_typeof(arrow_cast(arrow_cast(make_array([1, 2, 3]), 'LargeList(LargeList(Int64))'), 'List(List(Int64))')); ---- -List(nullable List(nullable Int64)) +List(List(Int64)) ## LargeList @@ -380,12 +380,12 @@ select arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)'); query T select arrow_typeof(arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)')); ---- -LargeList(nullable Int64) +LargeList(Int64) query T select arrow_typeof(arrow_cast(make_array([1, 2, 3]), 'LargeList(LargeList(Int64))')); ---- -LargeList(nullable LargeList(nullable Int64)) +LargeList(LargeList(Int64)) ## FixedSizeList @@ -417,7 +417,7 @@ select arrow_cast(make_array(1, 2, 3), 'FixedSizeList(3, Int64)'); query T select arrow_typeof(arrow_cast(arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)'), 'FixedSizeList(3, Int64)')); ---- -FixedSizeList(3 x nullable Int64) +FixedSizeList(3 x Int64) query ? select arrow_cast([1, 2, 3], 'FixedSizeList(3, Int64)'); diff --git a/datafusion/sqllogictest/test_files/coalesce.slt b/datafusion/sqllogictest/test_files/coalesce.slt index e34a601851d78..9e5b71b871299 100644 --- a/datafusion/sqllogictest/test_files/coalesce.slt +++ b/datafusion/sqllogictest/test_files/coalesce.slt @@ -199,14 +199,14 @@ select coalesce(array[1, 2], array[3, 4]), arrow_typeof(coalesce(array[1, 2], array[3, 4])); ---- -[1, 2] List(nullable Int64) +[1, 2] List(Int64) query ?T select coalesce(null, array[3, 4]), arrow_typeof(coalesce(array[1, 2], array[3, 4])); ---- -[3, 4] List(nullable Int64) +[3, 4] List(Int64) # coalesce with array query ?T @@ -214,7 +214,7 @@ select coalesce(array[1, 2], array[arrow_cast(3, 'Int32'), arrow_cast(4, 'Int32')]), arrow_typeof(coalesce(array[1, 2], array[arrow_cast(3, 'Int32'), arrow_cast(4, 'Int32')])); ---- -[1, 2] List(nullable Int64) +[1, 2] List(Int64) # test dict(int32, utf8) statement ok diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index e15163cf6ec74..090d6424af1d9 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -244,6 +244,7 @@ datafusion.execution.parquet.dictionary_enabled true datafusion.execution.parquet.dictionary_page_size_limit 1048576 datafusion.execution.parquet.enable_page_index true datafusion.execution.parquet.encoding NULL +datafusion.execution.parquet.force_filter_selections false datafusion.execution.parquet.max_predicate_cache_size NULL datafusion.execution.parquet.max_row_group_size 1048576 datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 @@ -370,6 +371,7 @@ datafusion.execution.parquet.dictionary_enabled true (writing) Sets if dictionar datafusion.execution.parquet.dictionary_page_size_limit 1048576 (writing) Sets best effort maximum dictionary page size, in bytes datafusion.execution.parquet.enable_page_index true (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. datafusion.execution.parquet.encoding NULL (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting +datafusion.execution.parquet.force_filter_selections false (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows. datafusion.execution.parquet.max_predicate_cache_size NULL (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching. datafusion.execution.parquet.max_row_group_size 1048576 (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read. datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. diff --git a/datafusion/sqllogictest/test_files/map.slt b/datafusion/sqllogictest/test_files/map.slt index 21fa72ad86f11..7ea54464d3e99 100644 --- a/datafusion/sqllogictest/test_files/map.slt +++ b/datafusion/sqllogictest/test_files/map.slt @@ -43,8 +43,8 @@ LOCATION '../core/tests/data/parquet_map.parquet'; query TTT describe data; ---- -ints Map("entries": Struct("key": Utf8, "value": Int64), unsorted) NO -strings Map("entries": Struct("key": Utf8, "value": Utf8), unsorted) NO +ints Map("entries": non-null Struct("key": non-null Utf8, "value": non-null Int64), unsorted) NO +strings Map("entries": non-null Struct("key": non-null Utf8, "value": non-null Utf8), unsorted) NO timestamp Utf8View NO query ??T diff --git a/datafusion/sqllogictest/test_files/struct.slt b/datafusion/sqllogictest/test_files/struct.slt index dce5fe036b4ec..0989dd382d518 100644 --- a/datafusion/sqllogictest/test_files/struct.slt +++ b/datafusion/sqllogictest/test_files/struct.slt @@ -53,9 +53,9 @@ select * from struct_values; query TT select arrow_typeof(s1), arrow_typeof(s2) from struct_values; ---- -Struct("c0": nullable Int32) Struct("a": nullable Int32, "b": nullable Utf8View) -Struct("c0": nullable Int32) Struct("a": nullable Int32, "b": nullable Utf8View) -Struct("c0": nullable Int32) Struct("a": nullable Int32, "b": nullable Utf8View) +Struct("c0": Int32) Struct("a": Int32, "b": Utf8View) +Struct("c0": Int32) Struct("a": Int32, "b": Utf8View) +Struct("c0": Int32) Struct("a": Int32, "b": Utf8View) # struct[i] @@ -229,12 +229,12 @@ select named_struct('field_a', 1, 'field_b', 2); query T select arrow_typeof(named_struct('first', 1, 'second', 2, 'third', 3)); ---- -Struct("first": nullable Int64, "second": nullable Int64, "third": nullable Int64) +Struct("first": Int64, "second": Int64, "third": Int64) query T select arrow_typeof({'first': 1, 'second': 2, 'third': 3}); ---- -Struct("first": nullable Int64, "second": nullable Int64, "third": nullable Int64) +Struct("first": Int64, "second": Int64, "third": Int64) # test nested struct literal query ? @@ -413,7 +413,7 @@ create table t(a struct, b struct) as valu query T select arrow_typeof([a, b]) from t; ---- -List(nullable Struct("r": nullable Utf8View, "c": nullable Float32)) +List(Struct("r": Utf8View, "c": Float32)) query ? select [a, b] from t; @@ -464,12 +464,12 @@ select * from t; query T select arrow_typeof(c1) from t; ---- -Struct("r": nullable Utf8View, "b": nullable Int32) +Struct("r": Utf8View, "b": Int32) query T select arrow_typeof(c2) from t; ---- -Struct("r": nullable Utf8View, "b": nullable Float32) +Struct("r": Utf8View, "b": Float32) statement ok drop table t; @@ -486,8 +486,8 @@ select * from t; query T select arrow_typeof(column1) from t; ---- -Struct("r": nullable Utf8, "c": nullable Float64) -Struct("r": nullable Utf8, "c": nullable Float64) +Struct("r": Utf8, "c": Float64) +Struct("r": Utf8, "c": Float64) statement ok drop table t; @@ -519,9 +519,9 @@ select coalesce(s1) from t; query T select arrow_typeof(coalesce(s1, s2)) from t; ---- -Struct("a": nullable Float32, "b": nullable Utf8View) -Struct("a": nullable Float32, "b": nullable Utf8View) -Struct("a": nullable Float32, "b": nullable Utf8View) +Struct("a": Float32, "b": Utf8View) +Struct("a": Float32, "b": Utf8View) +Struct("a": Float32, "b": Utf8View) statement ok drop table t; @@ -546,9 +546,9 @@ select coalesce(s1, s2) from t; query T select arrow_typeof(coalesce(s1, s2)) from t; ---- -Struct("a": nullable Float32, "b": nullable Utf8View) -Struct("a": nullable Float32, "b": nullable Utf8View) -Struct("a": nullable Float32, "b": nullable Utf8View) +Struct("a": Float32, "b": Utf8View) +Struct("a": Float32, "b": Utf8View) +Struct("a": Float32, "b": Utf8View) statement ok drop table t; @@ -583,7 +583,7 @@ create table t(a struct(r varchar, c int), b struct(r varchar, c float)) as valu query T select arrow_typeof([a, b]) from t; ---- -List(nullable Struct("r": nullable Utf8View, "c": nullable Float32)) +List(Struct("r": Utf8View, "c": Float32)) statement ok drop table t; @@ -606,13 +606,13 @@ create table t(a struct(r varchar, c int, g float), b struct(r varchar, c float, query T select arrow_typeof(a) from t; ---- -Struct("r": nullable Utf8View, "c": nullable Int32, "g": nullable Float32) +Struct("r": Utf8View, "c": Int32, "g": Float32) # type of each column should not coerced but perserve as it is query T select arrow_typeof(b) from t; ---- -Struct("r": nullable Utf8View, "c": nullable Float32, "g": nullable Int32) +Struct("r": Utf8View, "c": Float32, "g": Int32) statement ok drop table t; diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index acb2f07e01b21..818c8616954b4 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -5906,7 +5906,7 @@ LIMIT 5 ---- DataFusion error: type_coercion caused by -Error during planning: Cannot infer common argument type for comparison operation Int64 >= List(nullable Null) +Error during planning: Cannot infer common argument type for comparison operation Int64 >= List(Null) @@ -5934,7 +5934,7 @@ logical_plan physical_plan 01)ProjectionExec: expr=[c1@2 as c1, c2@3 as c2, sum(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum1, sum(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as sum2, count(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@6 as count1, array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@7 as array_agg1, array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@8 as array_agg2] 02)--GlobalLimitExec: skip=0, fetch=5 -03)----BoundedWindowAggExec: wdw=[sum(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, sum(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, count(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "count(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable List(nullable Int64) }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable List(nullable Int64) }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +03)----BoundedWindowAggExec: wdw=[sum(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, sum(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, count(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "count(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable List(Int64) }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable List(Int64) }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 04)------SortPreservingMergeExec: [c1@2 ASC NULLS LAST, c2@3 ASC NULLS LAST], fetch=5 05)--------SortExec: TopK(fetch=5), expr=[c1@2 ASC NULLS LAST, c2@3 ASC NULLS LAST], preserve_partitioning=[true] 06)----------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_csv/partition-0.csv], [WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_csv/partition-1.csv], [WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_csv/partition-2.csv], [WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_csv/partition-3.csv]]}, projection=[c2@1 >= 2 as __common_expr_1, c2@1 >= 2 AND c2@1 < 4 AND c1@0 > 0 as __common_expr_2, c1, c2], file_type=csv, has_header=false diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 25c209c5ebe89..4d1a4b87c6550 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -30,6 +30,28 @@ You can see the current [status of the `52.0.0`release here](https://github.com/ The `pyarrow` feature flag has been removed. This feature has been migrated to the `datafusion-python` repository since version `44.0.0`. +### Adaptive filter representation in Parquet filter pushdown + +As of Arrow 57.1.0, DataFusion uses a new adaptive filter strategy when +evaluating pushed down filters for Parquet files. This new strategy improves +performance for certain types of queries where the results of filtering are +more efficiently represented with a bitmask rather than a selection. +See [arrow-rs #5523] for more details. + +This change only applies to the built-in Parquet data source with filter-pushdown enabled ( +which is [not yet the default behavior]). + +You can disable the new behavior by setting the +`datafusion.execution.parquet.force_filter_selections` [configuration setting] to true. + +```sql +> set datafusion.execution.parquet.force_filter_selections = true; +``` + +[arrow-rs #5523]: https://github.com/apache/arrow-rs/issues/5523 +[configuration setting]: https://datafusion.apache.org/user-guide/configs.html +[not yet the default behavior]: https://github.com/apache/datafusion/issues/3463 + ### Statistics handling moved from `FileSource` to `FileScanConfig` Statistics are now managed directly by `FileScanConfig` instead of being delegated to `FileSource` implementations. This simplifies the `FileSource` trait and provides more consistent statistics handling across all file formats. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index c3eda544a1de3..55708de7c140e 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -84,6 +84,7 @@ The following configuration settings are available: | datafusion.execution.parquet.metadata_size_hint | 524288 | (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer Default setting to 512 KiB, which should be sufficient for most parquet files, it can reduce one I/O operation per parquet file. If the metadata is larger than the hint, two reads will still be performed. | | datafusion.execution.parquet.pushdown_filters | false | (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". | | datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | +| datafusion.execution.parquet.force_filter_selections | false | (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows. | | datafusion.execution.parquet.schema_force_view_types | true | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | | datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. | | datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. |