From 15fea6d0777c98938c5aa436b3b22a76a36ef488 Mon Sep 17 00:00:00 2001 From: wiedld Date: Wed, 10 Jul 2024 15:40:41 -0700 Subject: [PATCH 01/11] refactor: make more explicit the relationship btwn TableParquetOptions vs ParquetOptions vs WriterProperties --- datafusion/common/src/config.rs | 168 ++++++++++++++++++ .../common/src/file_options/parquet_writer.rs | 146 +-------------- 2 files changed, 174 insertions(+), 140 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 1d2a9589adfc..6c7c95808cbc 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -22,6 +22,22 @@ use std::collections::{BTreeMap, HashMap}; use std::fmt::{self, Display}; use std::str::FromStr; +#[cfg(feature = "parquet")] +use crate::file_options::parquet_writer::{ + parse_compression_string, parse_encoding_string, parse_statistics_string, + parse_version_string, +}; +#[cfg(feature = "parquet")] +use parquet::{ + file::properties::{ + WriterProperties, WriterPropertiesBuilder, DEFAULT_BLOOM_FILTER_FPP, + DEFAULT_BLOOM_FILTER_NDV, DEFAULT_MAX_STATISTICS_SIZE, + DEFAULT_STATISTICS_ENABLED, + }, + format::KeyValue, + schema::types::ColumnPath, +}; + use crate::error::_config_err; use crate::parsers::CompressionTypeVariant; use crate::{DataFusionError, Result}; @@ -454,6 +470,80 @@ config_namespace! { } } +#[cfg(feature = "parquet")] +impl ParquetOptions { + /// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`]. + /// + /// The returned [`WriterPropertiesBuilder`] can then be further modified with additional options + /// applied per column; a customization which is not applicable for [`ParquetOptions`]. + pub fn writer_props_from_global_opts(&self) -> Result { + let ParquetOptions { + data_pagesize_limit, + write_batch_size, + writer_version, + compression, + dictionary_enabled, + dictionary_page_size_limit, + statistics_enabled, + max_statistics_size, + max_row_group_size, + created_by, + column_index_truncate_length, + data_page_row_count_limit, + encoding, + bloom_filter_on_write, + bloom_filter_fpp, + bloom_filter_ndv, + + // not in WriterProperties + enable_page_index: _, + pruning: _, + skip_metadata: _, + metadata_size_hint: _, + pushdown_filters: _, + reorder_filters: _, + allow_single_file_parallelism: _, + maximum_parallel_row_group_writers: _, + maximum_buffered_record_batches_per_stream: _, + bloom_filter_on_read: _, // reads not used for writer props + } = self; + + let mut builder = WriterProperties::builder() + .set_data_page_size_limit(*data_pagesize_limit) + .set_write_batch_size(*write_batch_size) + .set_writer_version(parse_version_string(writer_version.as_str())?) + .set_dictionary_enabled(dictionary_enabled.unwrap_or(false)) + .set_dictionary_page_size_limit(*dictionary_page_size_limit) + .set_statistics_enabled( + statistics_enabled + .as_ref() + .and_then(|s| parse_statistics_string(s).ok()) + .unwrap_or(DEFAULT_STATISTICS_ENABLED), + ) + .set_max_statistics_size( + max_statistics_size.unwrap_or(DEFAULT_MAX_STATISTICS_SIZE), + ) + .set_max_row_group_size(*max_row_group_size) + .set_created_by(created_by.clone()) + .set_column_index_truncate_length(*column_index_truncate_length) + .set_data_page_row_count_limit(*data_page_row_count_limit) + .set_bloom_filter_enabled(*bloom_filter_on_write) + .set_bloom_filter_fpp(bloom_filter_fpp.unwrap_or(DEFAULT_BLOOM_FILTER_FPP)) + .set_bloom_filter_ndv(bloom_filter_ndv.unwrap_or(DEFAULT_BLOOM_FILTER_NDV)); + + // We do not have access to default ColumnProperties set in Arrow. + // Therefore, only overwrite if these settings exist. + if let Some(compression) = compression { + builder = builder.set_compression(parse_compression_string(compression)?); + } + if let Some(encoding) = encoding { + builder = builder.set_encoding(parse_encoding_string(encoding)?); + } + + Ok(builder) + } +} + config_namespace! { /// Options related to aggregate execution /// @@ -1421,6 +1511,84 @@ impl TableParquetOptions { } } +#[cfg(feature = "parquet")] +impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { + type Error = DataFusionError; + + /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`]. + /// + /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column. + fn try_from(table_parquet_options: &TableParquetOptions) -> Result { + // Table options include kv_metadata and col-specific options + let TableParquetOptions { + global, + column_specific_options, + key_value_metadata, + } = table_parquet_options; + + let mut builder = global.writer_props_from_global_opts()?; + + if !key_value_metadata.is_empty() { + builder = builder.set_key_value_metadata(Some( + key_value_metadata + .to_owned() + .drain() + .map(|(key, value)| KeyValue { key, value }) + .collect(), + )); + } + + // Apply column-specific options: + for (column, options) in column_specific_options { + let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect()); + + if let Some(bloom_filter_enabled) = options.bloom_filter_enabled { + builder = builder + .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled); + } + + if let Some(encoding) = &options.encoding { + let parsed_encoding = parse_encoding_string(encoding)?; + builder = builder.set_column_encoding(path.clone(), parsed_encoding); + } + + if let Some(dictionary_enabled) = options.dictionary_enabled { + builder = builder + .set_column_dictionary_enabled(path.clone(), dictionary_enabled); + } + + if let Some(compression) = &options.compression { + let parsed_compression = parse_compression_string(compression)?; + builder = + builder.set_column_compression(path.clone(), parsed_compression); + } + + if let Some(statistics_enabled) = &options.statistics_enabled { + let parsed_value = parse_statistics_string(statistics_enabled)?; + builder = + builder.set_column_statistics_enabled(path.clone(), parsed_value); + } + + if let Some(bloom_filter_fpp) = options.bloom_filter_fpp { + builder = + builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp); + } + + if let Some(bloom_filter_ndv) = options.bloom_filter_ndv { + builder = + builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv); + } + + if let Some(max_statistics_size) = options.max_statistics_size { + builder = + builder.set_column_max_statistics_size(path, max_statistics_size); + } + } + + Ok(builder) + } +} + impl ConfigField for TableParquetOptions { fn visit(&self, v: &mut V, key_prefix: &str, description: &'static str) { self.global.visit(v, key_prefix, description); diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 009164a29e34..d07360148b76 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -17,18 +17,13 @@ //! Options related to how parquet files should be written -use crate::{ - config::{ParquetOptions, TableParquetOptions}, - DataFusionError, Result, -}; +use crate::{config::TableParquetOptions, DataFusionError, Result}; use parquet::{ basic::{BrotliLevel, GzipLevel, ZstdLevel}, - file::{ - metadata::KeyValue, - properties::{EnabledStatistics, WriterProperties, WriterVersion}, + file::properties::{ + EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion, }, - schema::types::ColumnPath, }; /// Options for writing parquet files @@ -52,140 +47,11 @@ impl ParquetWriterOptions { impl TryFrom<&TableParquetOptions> for ParquetWriterOptions { type Error = DataFusionError; - fn try_from(parquet_options: &TableParquetOptions) -> Result { - let ParquetOptions { - data_pagesize_limit, - write_batch_size, - writer_version, - dictionary_page_size_limit, - max_row_group_size, - created_by, - column_index_truncate_length, - data_page_row_count_limit, - bloom_filter_on_write, - encoding, - dictionary_enabled, - compression, - statistics_enabled, - max_statistics_size, - bloom_filter_fpp, - bloom_filter_ndv, - // below is not part of ParquetWriterOptions - enable_page_index: _, - pruning: _, - skip_metadata: _, - metadata_size_hint: _, - pushdown_filters: _, - reorder_filters: _, - allow_single_file_parallelism: _, - maximum_parallel_row_group_writers: _, - maximum_buffered_record_batches_per_stream: _, - bloom_filter_on_read: _, - } = &parquet_options.global; - - let key_value_metadata = if !parquet_options.key_value_metadata.is_empty() { - Some( - parquet_options - .key_value_metadata - .clone() - .drain() - .map(|(key, value)| KeyValue { key, value }) - .collect::>(), - ) - } else { - None - }; - - let mut builder = WriterProperties::builder() - .set_data_page_size_limit(*data_pagesize_limit) - .set_write_batch_size(*write_batch_size) - .set_writer_version(parse_version_string(writer_version.as_str())?) - .set_dictionary_page_size_limit(*dictionary_page_size_limit) - .set_max_row_group_size(*max_row_group_size) - .set_created_by(created_by.clone()) - .set_column_index_truncate_length(*column_index_truncate_length) - .set_data_page_row_count_limit(*data_page_row_count_limit) - .set_bloom_filter_enabled(*bloom_filter_on_write) - .set_key_value_metadata(key_value_metadata); - - if let Some(encoding) = &encoding { - builder = builder.set_encoding(parse_encoding_string(encoding)?); - } - - if let Some(enabled) = dictionary_enabled { - builder = builder.set_dictionary_enabled(*enabled); - } - - if let Some(compression) = &compression { - builder = builder.set_compression(parse_compression_string(compression)?); - } - - if let Some(statistics) = &statistics_enabled { - builder = - builder.set_statistics_enabled(parse_statistics_string(statistics)?); - } - - if let Some(size) = max_statistics_size { - builder = builder.set_max_statistics_size(*size); - } - - if let Some(fpp) = bloom_filter_fpp { - builder = builder.set_bloom_filter_fpp(*fpp); - } - - if let Some(ndv) = bloom_filter_ndv { - builder = builder.set_bloom_filter_ndv(*ndv); - } - - for (column, options) in &parquet_options.column_specific_options { - let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect()); - - if let Some(bloom_filter_enabled) = options.bloom_filter_enabled { - builder = builder - .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled); - } - - if let Some(encoding) = &options.encoding { - let parsed_encoding = parse_encoding_string(encoding)?; - builder = builder.set_column_encoding(path.clone(), parsed_encoding); - } - - if let Some(dictionary_enabled) = options.dictionary_enabled { - builder = builder - .set_column_dictionary_enabled(path.clone(), dictionary_enabled); - } - - if let Some(compression) = &options.compression { - let parsed_compression = parse_compression_string(compression)?; - builder = - builder.set_column_compression(path.clone(), parsed_compression); - } - - if let Some(statistics_enabled) = &options.statistics_enabled { - let parsed_value = parse_statistics_string(statistics_enabled)?; - builder = - builder.set_column_statistics_enabled(path.clone(), parsed_value); - } - - if let Some(bloom_filter_fpp) = options.bloom_filter_fpp { - builder = - builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp); - } - - if let Some(bloom_filter_ndv) = options.bloom_filter_ndv { - builder = - builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv); - } - - if let Some(max_statistics_size) = options.max_statistics_size { - builder = - builder.set_column_max_statistics_size(path, max_statistics_size); - } - } - + fn try_from(parquet_table_options: &TableParquetOptions) -> Result { // ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns) Ok(ParquetWriterOptions { - writer_options: builder.build(), + writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)? + .build(), }) } } From 15a9fec1795320e46e71949f68616fa0428c4e3e Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 12 Jul 2024 12:17:33 -0700 Subject: [PATCH 02/11] test: demonstrate the relationship btwn session configs and writer props --- datafusion/common/src/config.rs | 189 ++++++++++++++++++++++++++++++++ 1 file changed, 189 insertions(+) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 6c7c95808cbc..99ae584c3256 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -2042,4 +2042,193 @@ mod tests { let parsed_metadata = table_config.parquet.key_value_metadata; assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into()))); } + + #[cfg(feature = "parquet")] + mod test_conversion_from_session_to_writer_props { + use super::super::*; + use parquet::{basic::Compression, file::properties::EnabledStatistics}; + + const COL_NAME: &str = "configured"; + + /// Take the column defaults provided in [`ParquetOptions`], and generate a non-default col config. + fn column_options_with_non_defaults( + src_col_defaults: &ParquetOptions, + ) -> ColumnOptions { + ColumnOptions { + compression: Some("zstd(22)".into()), + dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v), + statistics_enabled: Some("page".into()), + max_statistics_size: Some(72), + encoding: Some("RLE".into()), + bloom_filter_enabled: Some(true), + bloom_filter_fpp: Some(0.72), + bloom_filter_ndv: Some(72), + } + } + + fn parquet_options_with_non_defaults() -> ParquetOptions { + let defaults = ParquetOptions::default(); + let writer_version = if defaults.writer_version.eq("1.0") { + "2.0" + } else { + "1.0" + }; + + ParquetOptions { + data_pagesize_limit: 42, + write_batch_size: 42, + writer_version: writer_version.into(), + compression: Some("zstd(22)".into()), + dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)), + dictionary_page_size_limit: 42, + statistics_enabled: Some("chunk".into()), + max_statistics_size: Some(42), + max_row_group_size: 42, + created_by: "wordy".into(), + column_index_truncate_length: Some(42), + data_page_row_count_limit: 42, + encoding: Some("BYTE_STREAM_SPLIT".into()), + bloom_filter_on_write: !defaults.bloom_filter_on_write, + bloom_filter_fpp: Some(0.42), + bloom_filter_ndv: Some(42), + + // not in WriterProperties, but itemizing here to not skip newly added props + enable_page_index: Default::default(), + pruning: Default::default(), + skip_metadata: Default::default(), + metadata_size_hint: Default::default(), + pushdown_filters: Default::default(), + reorder_filters: Default::default(), + allow_single_file_parallelism: Default::default(), + maximum_parallel_row_group_writers: Default::default(), + maximum_buffered_record_batches_per_stream: Default::default(), + bloom_filter_on_read: Default::default(), + } + } + + fn extract_column_options( + props: &WriterProperties, + col: ColumnPath, + ) -> ColumnOptions { + let bloom_filter_default_props = props.bloom_filter_properties(&col); + + ColumnOptions { + bloom_filter_enabled: Some(bloom_filter_default_props.is_some()), + encoding: props.encoding(&col).map(|s| s.to_string()), + dictionary_enabled: Some(props.dictionary_enabled(&col)), + compression: match props.compression(&col) { + Compression::ZSTD(lvl) => { + Some(format!("zstd({})", lvl.compression_level())) + } + _ => None, + }, + statistics_enabled: Some( + match props.statistics_enabled(&col) { + EnabledStatistics::None => "none", + EnabledStatistics::Chunk => "chunk", + EnabledStatistics::Page => "page", + } + .into(), + ), + bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp), + bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv), + max_statistics_size: Some(props.max_statistics_size(&col)), + } + } + + /// For testing only, take a single write's props and convert back into the session config. + /// (use identity to confirm correct.) + fn session_config_from_writer_props( + props: &WriterProperties, + ) -> TableParquetOptions { + let default_col = ColumnPath::from("col doesn't have specific config"); + let default_col_props = extract_column_options(props, default_col); + + let configured_col = ColumnPath::from(COL_NAME); + let configured_col_props = extract_column_options(props, configured_col); + + let key_value_metadata = props + .key_value_metadata() + .map(|pairs| { + HashMap::from_iter( + pairs + .iter() + .cloned() + .map(|KeyValue { key, value }| (key, value)), + ) + }) + .unwrap_or_default(); + + TableParquetOptions { + global: ParquetOptions { + // global options + data_pagesize_limit: props.dictionary_page_size_limit(), + write_batch_size: props.write_batch_size(), + writer_version: format!("{}.0", props.writer_version().as_num()), + dictionary_page_size_limit: props.dictionary_page_size_limit(), + max_row_group_size: props.max_row_group_size(), + created_by: props.created_by().to_string(), + column_index_truncate_length: props.column_index_truncate_length(), + data_page_row_count_limit: props.data_page_row_count_limit(), + + // global options which set the default column props + encoding: default_col_props.encoding, + compression: default_col_props.compression, + dictionary_enabled: default_col_props.dictionary_enabled, + statistics_enabled: default_col_props.statistics_enabled, + max_statistics_size: default_col_props.max_statistics_size, + bloom_filter_on_write: default_col_props + .bloom_filter_enabled + .unwrap_or_default(), + bloom_filter_fpp: default_col_props.bloom_filter_fpp, + bloom_filter_ndv: default_col_props.bloom_filter_ndv, + + // not in WriterProperties + enable_page_index: Default::default(), + pruning: Default::default(), + skip_metadata: Default::default(), + metadata_size_hint: Default::default(), + pushdown_filters: Default::default(), + reorder_filters: Default::default(), + allow_single_file_parallelism: Default::default(), + maximum_parallel_row_group_writers: Default::default(), + maximum_buffered_record_batches_per_stream: Default::default(), + bloom_filter_on_read: Default::default(), + }, + column_specific_options: HashMap::from([( + COL_NAME.into(), + configured_col_props, + )]), + key_value_metadata, + } + } + + #[test] + fn table_parquet_opts_to_writer_props() { + // ParquetOptions, all props set to non-default + let parquet_options = parquet_options_with_non_defaults(); + + // TableParquetOptions, using ParquetOptions for global settings + let key = "foo".to_string(); + let value = Some("bar".into()); + let table_parquet_opts = TableParquetOptions { + global: parquet_options.clone(), + column_specific_options: [( + COL_NAME.into(), + column_options_with_non_defaults(&parquet_options), + )] + .into(), + key_value_metadata: [(key.clone(), value.clone())].into(), + }; + + let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts) + .unwrap() + .build(); + assert_eq!( + table_parquet_opts, + session_config_from_writer_props(&writer_props), + "the writer_props should have the same configuration as the session's TableParquetOptions", + ); + } + } } From f67c1893af9b39bb3c0600321ee08eb0567af911 Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 15 Jul 2024 14:42:32 -0700 Subject: [PATCH 03/11] refactor: move parquet-format specific functionality to the parquet submodule, leaving only the config options in the config module. --- datafusion/common/src/config.rs | 278 +----------------- .../common/src/file_options/parquet_writer.rs | 270 +++++++++++++++++ 2 files changed, 273 insertions(+), 275 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 99ae584c3256..aee1ad983665 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -28,14 +28,9 @@ use crate::file_options::parquet_writer::{ parse_version_string, }; #[cfg(feature = "parquet")] -use parquet::{ - file::properties::{ - WriterProperties, WriterPropertiesBuilder, DEFAULT_BLOOM_FILTER_FPP, - DEFAULT_BLOOM_FILTER_NDV, DEFAULT_MAX_STATISTICS_SIZE, - DEFAULT_STATISTICS_ENABLED, - }, - format::KeyValue, - schema::types::ColumnPath, +use parquet::file::properties::{ + WriterProperties, WriterPropertiesBuilder, DEFAULT_BLOOM_FILTER_FPP, + DEFAULT_BLOOM_FILTER_NDV, DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED, }; use crate::error::_config_err; @@ -1511,84 +1506,6 @@ impl TableParquetOptions { } } -#[cfg(feature = "parquet")] -impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { - type Error = DataFusionError; - - /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`]. - /// - /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column. - fn try_from(table_parquet_options: &TableParquetOptions) -> Result { - // Table options include kv_metadata and col-specific options - let TableParquetOptions { - global, - column_specific_options, - key_value_metadata, - } = table_parquet_options; - - let mut builder = global.writer_props_from_global_opts()?; - - if !key_value_metadata.is_empty() { - builder = builder.set_key_value_metadata(Some( - key_value_metadata - .to_owned() - .drain() - .map(|(key, value)| KeyValue { key, value }) - .collect(), - )); - } - - // Apply column-specific options: - for (column, options) in column_specific_options { - let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect()); - - if let Some(bloom_filter_enabled) = options.bloom_filter_enabled { - builder = builder - .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled); - } - - if let Some(encoding) = &options.encoding { - let parsed_encoding = parse_encoding_string(encoding)?; - builder = builder.set_column_encoding(path.clone(), parsed_encoding); - } - - if let Some(dictionary_enabled) = options.dictionary_enabled { - builder = builder - .set_column_dictionary_enabled(path.clone(), dictionary_enabled); - } - - if let Some(compression) = &options.compression { - let parsed_compression = parse_compression_string(compression)?; - builder = - builder.set_column_compression(path.clone(), parsed_compression); - } - - if let Some(statistics_enabled) = &options.statistics_enabled { - let parsed_value = parse_statistics_string(statistics_enabled)?; - builder = - builder.set_column_statistics_enabled(path.clone(), parsed_value); - } - - if let Some(bloom_filter_fpp) = options.bloom_filter_fpp { - builder = - builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp); - } - - if let Some(bloom_filter_ndv) = options.bloom_filter_ndv { - builder = - builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv); - } - - if let Some(max_statistics_size) = options.max_statistics_size { - builder = - builder.set_column_max_statistics_size(path, max_statistics_size); - } - } - - Ok(builder) - } -} - impl ConfigField for TableParquetOptions { fn visit(&self, v: &mut V, key_prefix: &str, description: &'static str) { self.global.visit(v, key_prefix, description); @@ -2042,193 +1959,4 @@ mod tests { let parsed_metadata = table_config.parquet.key_value_metadata; assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into()))); } - - #[cfg(feature = "parquet")] - mod test_conversion_from_session_to_writer_props { - use super::super::*; - use parquet::{basic::Compression, file::properties::EnabledStatistics}; - - const COL_NAME: &str = "configured"; - - /// Take the column defaults provided in [`ParquetOptions`], and generate a non-default col config. - fn column_options_with_non_defaults( - src_col_defaults: &ParquetOptions, - ) -> ColumnOptions { - ColumnOptions { - compression: Some("zstd(22)".into()), - dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v), - statistics_enabled: Some("page".into()), - max_statistics_size: Some(72), - encoding: Some("RLE".into()), - bloom_filter_enabled: Some(true), - bloom_filter_fpp: Some(0.72), - bloom_filter_ndv: Some(72), - } - } - - fn parquet_options_with_non_defaults() -> ParquetOptions { - let defaults = ParquetOptions::default(); - let writer_version = if defaults.writer_version.eq("1.0") { - "2.0" - } else { - "1.0" - }; - - ParquetOptions { - data_pagesize_limit: 42, - write_batch_size: 42, - writer_version: writer_version.into(), - compression: Some("zstd(22)".into()), - dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)), - dictionary_page_size_limit: 42, - statistics_enabled: Some("chunk".into()), - max_statistics_size: Some(42), - max_row_group_size: 42, - created_by: "wordy".into(), - column_index_truncate_length: Some(42), - data_page_row_count_limit: 42, - encoding: Some("BYTE_STREAM_SPLIT".into()), - bloom_filter_on_write: !defaults.bloom_filter_on_write, - bloom_filter_fpp: Some(0.42), - bloom_filter_ndv: Some(42), - - // not in WriterProperties, but itemizing here to not skip newly added props - enable_page_index: Default::default(), - pruning: Default::default(), - skip_metadata: Default::default(), - metadata_size_hint: Default::default(), - pushdown_filters: Default::default(), - reorder_filters: Default::default(), - allow_single_file_parallelism: Default::default(), - maximum_parallel_row_group_writers: Default::default(), - maximum_buffered_record_batches_per_stream: Default::default(), - bloom_filter_on_read: Default::default(), - } - } - - fn extract_column_options( - props: &WriterProperties, - col: ColumnPath, - ) -> ColumnOptions { - let bloom_filter_default_props = props.bloom_filter_properties(&col); - - ColumnOptions { - bloom_filter_enabled: Some(bloom_filter_default_props.is_some()), - encoding: props.encoding(&col).map(|s| s.to_string()), - dictionary_enabled: Some(props.dictionary_enabled(&col)), - compression: match props.compression(&col) { - Compression::ZSTD(lvl) => { - Some(format!("zstd({})", lvl.compression_level())) - } - _ => None, - }, - statistics_enabled: Some( - match props.statistics_enabled(&col) { - EnabledStatistics::None => "none", - EnabledStatistics::Chunk => "chunk", - EnabledStatistics::Page => "page", - } - .into(), - ), - bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp), - bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv), - max_statistics_size: Some(props.max_statistics_size(&col)), - } - } - - /// For testing only, take a single write's props and convert back into the session config. - /// (use identity to confirm correct.) - fn session_config_from_writer_props( - props: &WriterProperties, - ) -> TableParquetOptions { - let default_col = ColumnPath::from("col doesn't have specific config"); - let default_col_props = extract_column_options(props, default_col); - - let configured_col = ColumnPath::from(COL_NAME); - let configured_col_props = extract_column_options(props, configured_col); - - let key_value_metadata = props - .key_value_metadata() - .map(|pairs| { - HashMap::from_iter( - pairs - .iter() - .cloned() - .map(|KeyValue { key, value }| (key, value)), - ) - }) - .unwrap_or_default(); - - TableParquetOptions { - global: ParquetOptions { - // global options - data_pagesize_limit: props.dictionary_page_size_limit(), - write_batch_size: props.write_batch_size(), - writer_version: format!("{}.0", props.writer_version().as_num()), - dictionary_page_size_limit: props.dictionary_page_size_limit(), - max_row_group_size: props.max_row_group_size(), - created_by: props.created_by().to_string(), - column_index_truncate_length: props.column_index_truncate_length(), - data_page_row_count_limit: props.data_page_row_count_limit(), - - // global options which set the default column props - encoding: default_col_props.encoding, - compression: default_col_props.compression, - dictionary_enabled: default_col_props.dictionary_enabled, - statistics_enabled: default_col_props.statistics_enabled, - max_statistics_size: default_col_props.max_statistics_size, - bloom_filter_on_write: default_col_props - .bloom_filter_enabled - .unwrap_or_default(), - bloom_filter_fpp: default_col_props.bloom_filter_fpp, - bloom_filter_ndv: default_col_props.bloom_filter_ndv, - - // not in WriterProperties - enable_page_index: Default::default(), - pruning: Default::default(), - skip_metadata: Default::default(), - metadata_size_hint: Default::default(), - pushdown_filters: Default::default(), - reorder_filters: Default::default(), - allow_single_file_parallelism: Default::default(), - maximum_parallel_row_group_writers: Default::default(), - maximum_buffered_record_batches_per_stream: Default::default(), - bloom_filter_on_read: Default::default(), - }, - column_specific_options: HashMap::from([( - COL_NAME.into(), - configured_col_props, - )]), - key_value_metadata, - } - } - - #[test] - fn table_parquet_opts_to_writer_props() { - // ParquetOptions, all props set to non-default - let parquet_options = parquet_options_with_non_defaults(); - - // TableParquetOptions, using ParquetOptions for global settings - let key = "foo".to_string(); - let value = Some("bar".into()); - let table_parquet_opts = TableParquetOptions { - global: parquet_options.clone(), - column_specific_options: [( - COL_NAME.into(), - column_options_with_non_defaults(&parquet_options), - )] - .into(), - key_value_metadata: [(key.clone(), value.clone())].into(), - }; - - let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts) - .unwrap() - .build(); - assert_eq!( - table_parquet_opts, - session_config_from_writer_props(&writer_props), - "the writer_props should have the same configuration as the session's TableParquetOptions", - ); - } - } } diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index d07360148b76..a6c5d9fddf58 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -24,6 +24,8 @@ use parquet::{ file::properties::{ EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion, }, + format::KeyValue, + schema::types::ColumnPath, }; /// Options for writing parquet files @@ -56,6 +58,83 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions { } } +impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { + type Error = DataFusionError; + + /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`]. + /// + /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column. + fn try_from(table_parquet_options: &TableParquetOptions) -> Result { + // Table options include kv_metadata and col-specific options + let TableParquetOptions { + global, + column_specific_options, + key_value_metadata, + } = table_parquet_options; + + let mut builder = global.writer_props_from_global_opts()?; + + if !key_value_metadata.is_empty() { + builder = builder.set_key_value_metadata(Some( + key_value_metadata + .to_owned() + .drain() + .map(|(key, value)| KeyValue { key, value }) + .collect(), + )); + } + + // Apply column-specific options: + for (column, options) in column_specific_options { + let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect()); + + if let Some(bloom_filter_enabled) = options.bloom_filter_enabled { + builder = builder + .set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled); + } + + if let Some(encoding) = &options.encoding { + let parsed_encoding = parse_encoding_string(encoding)?; + builder = builder.set_column_encoding(path.clone(), parsed_encoding); + } + + if let Some(dictionary_enabled) = options.dictionary_enabled { + builder = builder + .set_column_dictionary_enabled(path.clone(), dictionary_enabled); + } + + if let Some(compression) = &options.compression { + let parsed_compression = parse_compression_string(compression)?; + builder = + builder.set_column_compression(path.clone(), parsed_compression); + } + + if let Some(statistics_enabled) = &options.statistics_enabled { + let parsed_value = parse_statistics_string(statistics_enabled)?; + builder = + builder.set_column_statistics_enabled(path.clone(), parsed_value); + } + + if let Some(bloom_filter_fpp) = options.bloom_filter_fpp { + builder = + builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp); + } + + if let Some(bloom_filter_ndv) = options.bloom_filter_ndv { + builder = + builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv); + } + + if let Some(max_statistics_size) = options.max_statistics_size { + builder = + builder.set_column_max_statistics_size(path, max_statistics_size); + } + } + + Ok(builder) + } +} + /// Parses datafusion.execution.parquet.encoding String to a parquet::basic::Encoding pub(crate) fn parse_encoding_string( str_setting: &str, @@ -202,3 +281,194 @@ pub(crate) fn parse_statistics_string(str_setting: &str) -> Result ColumnOptions { + ColumnOptions { + compression: Some("zstd(22)".into()), + dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v), + statistics_enabled: Some("page".into()), + max_statistics_size: Some(72), + encoding: Some("RLE".into()), + bloom_filter_enabled: Some(true), + bloom_filter_fpp: Some(0.72), + bloom_filter_ndv: Some(72), + } + } + + fn parquet_options_with_non_defaults() -> ParquetOptions { + let defaults = ParquetOptions::default(); + let writer_version = if defaults.writer_version.eq("1.0") { + "2.0" + } else { + "1.0" + }; + + ParquetOptions { + data_pagesize_limit: 42, + write_batch_size: 42, + writer_version: writer_version.into(), + compression: Some("zstd(22)".into()), + dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)), + dictionary_page_size_limit: 42, + statistics_enabled: Some("chunk".into()), + max_statistics_size: Some(42), + max_row_group_size: 42, + created_by: "wordy".into(), + column_index_truncate_length: Some(42), + data_page_row_count_limit: 42, + encoding: Some("BYTE_STREAM_SPLIT".into()), + bloom_filter_on_write: !defaults.bloom_filter_on_write, + bloom_filter_fpp: Some(0.42), + bloom_filter_ndv: Some(42), + + // not in WriterProperties, but itemizing here to not skip newly added props + enable_page_index: Default::default(), + pruning: Default::default(), + skip_metadata: Default::default(), + metadata_size_hint: Default::default(), + pushdown_filters: Default::default(), + reorder_filters: Default::default(), + allow_single_file_parallelism: Default::default(), + maximum_parallel_row_group_writers: Default::default(), + maximum_buffered_record_batches_per_stream: Default::default(), + bloom_filter_on_read: Default::default(), + } + } + + fn extract_column_options( + props: &WriterProperties, + col: ColumnPath, + ) -> ColumnOptions { + let bloom_filter_default_props = props.bloom_filter_properties(&col); + + ColumnOptions { + bloom_filter_enabled: Some(bloom_filter_default_props.is_some()), + encoding: props.encoding(&col).map(|s| s.to_string()), + dictionary_enabled: Some(props.dictionary_enabled(&col)), + compression: match props.compression(&col) { + Compression::ZSTD(lvl) => { + Some(format!("zstd({})", lvl.compression_level())) + } + _ => None, + }, + statistics_enabled: Some( + match props.statistics_enabled(&col) { + EnabledStatistics::None => "none", + EnabledStatistics::Chunk => "chunk", + EnabledStatistics::Page => "page", + } + .into(), + ), + bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp), + bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv), + max_statistics_size: Some(props.max_statistics_size(&col)), + } + } + + /// For testing only, take a single write's props and convert back into the session config. + /// (use identity to confirm correct.) + fn session_config_from_writer_props(props: &WriterProperties) -> TableParquetOptions { + let default_col = ColumnPath::from("col doesn't have specific config"); + let default_col_props = extract_column_options(props, default_col); + + let configured_col = ColumnPath::from(COL_NAME); + let configured_col_props = extract_column_options(props, configured_col); + + let key_value_metadata = props + .key_value_metadata() + .map(|pairs| { + HashMap::from_iter( + pairs + .iter() + .cloned() + .map(|KeyValue { key, value }| (key, value)), + ) + }) + .unwrap_or_default(); + + TableParquetOptions { + global: ParquetOptions { + // global options + data_pagesize_limit: props.dictionary_page_size_limit(), + write_batch_size: props.write_batch_size(), + writer_version: format!("{}.0", props.writer_version().as_num()), + dictionary_page_size_limit: props.dictionary_page_size_limit(), + max_row_group_size: props.max_row_group_size(), + created_by: props.created_by().to_string(), + column_index_truncate_length: props.column_index_truncate_length(), + data_page_row_count_limit: props.data_page_row_count_limit(), + + // global options which set the default column props + encoding: default_col_props.encoding, + compression: default_col_props.compression, + dictionary_enabled: default_col_props.dictionary_enabled, + statistics_enabled: default_col_props.statistics_enabled, + max_statistics_size: default_col_props.max_statistics_size, + bloom_filter_on_write: default_col_props + .bloom_filter_enabled + .unwrap_or_default(), + bloom_filter_fpp: default_col_props.bloom_filter_fpp, + bloom_filter_ndv: default_col_props.bloom_filter_ndv, + + // not in WriterProperties + enable_page_index: Default::default(), + pruning: Default::default(), + skip_metadata: Default::default(), + metadata_size_hint: Default::default(), + pushdown_filters: Default::default(), + reorder_filters: Default::default(), + allow_single_file_parallelism: Default::default(), + maximum_parallel_row_group_writers: Default::default(), + maximum_buffered_record_batches_per_stream: Default::default(), + bloom_filter_on_read: Default::default(), + }, + column_specific_options: HashMap::from([( + COL_NAME.into(), + configured_col_props, + )]), + key_value_metadata, + } + } + + #[test] + fn table_parquet_opts_to_writer_props() { + // ParquetOptions, all props set to non-default + let parquet_options = parquet_options_with_non_defaults(); + + // TableParquetOptions, using ParquetOptions for global settings + let key = "foo".to_string(); + let value = Some("bar".into()); + let table_parquet_opts = TableParquetOptions { + global: parquet_options.clone(), + column_specific_options: [( + COL_NAME.into(), + column_options_with_non_defaults(&parquet_options), + )] + .into(), + key_value_metadata: [(key.clone(), value.clone())].into(), + }; + + let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts) + .unwrap() + .build(); + assert_eq!( + table_parquet_opts, + session_config_from_writer_props(&writer_props), + "the writer_props should have the same configuration as the session's TableParquetOptions", + ); + } +} From e9f14274ba4fb421ab337c7c7e2eccfe302bcba2 Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 15 Jul 2024 15:27:30 -0700 Subject: [PATCH 04/11] test: update test fixtures to use the ParquetOptions::default --- .../common/src/file_options/parquet_writer.rs | 47 +++++++++++-------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index a6c5d9fddf58..0fda40c67d7d 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -336,16 +336,18 @@ mod tests { bloom_filter_ndv: Some(42), // not in WriterProperties, but itemizing here to not skip newly added props - enable_page_index: Default::default(), - pruning: Default::default(), - skip_metadata: Default::default(), - metadata_size_hint: Default::default(), - pushdown_filters: Default::default(), - reorder_filters: Default::default(), - allow_single_file_parallelism: Default::default(), - maximum_parallel_row_group_writers: Default::default(), - maximum_buffered_record_batches_per_stream: Default::default(), - bloom_filter_on_read: Default::default(), + enable_page_index: defaults.enable_page_index, + pruning: defaults.pruning, + skip_metadata: defaults.skip_metadata, + metadata_size_hint: defaults.metadata_size_hint, + pushdown_filters: defaults.pushdown_filters, + reorder_filters: defaults.reorder_filters, + allow_single_file_parallelism: defaults.allow_single_file_parallelism, + maximum_parallel_row_group_writers: defaults + .maximum_parallel_row_group_writers, + maximum_buffered_record_batches_per_stream: defaults + .maximum_buffered_record_batches_per_stream, + bloom_filter_on_read: defaults.bloom_filter_on_read, } } @@ -400,6 +402,8 @@ mod tests { }) .unwrap_or_default(); + let global_options_defaults = ParquetOptions::default(); + TableParquetOptions { global: ParquetOptions { // global options @@ -425,16 +429,19 @@ mod tests { bloom_filter_ndv: default_col_props.bloom_filter_ndv, // not in WriterProperties - enable_page_index: Default::default(), - pruning: Default::default(), - skip_metadata: Default::default(), - metadata_size_hint: Default::default(), - pushdown_filters: Default::default(), - reorder_filters: Default::default(), - allow_single_file_parallelism: Default::default(), - maximum_parallel_row_group_writers: Default::default(), - maximum_buffered_record_batches_per_stream: Default::default(), - bloom_filter_on_read: Default::default(), + enable_page_index: global_options_defaults.enable_page_index, + pruning: global_options_defaults.pruning, + skip_metadata: global_options_defaults.skip_metadata, + metadata_size_hint: global_options_defaults.metadata_size_hint, + pushdown_filters: global_options_defaults.pushdown_filters, + reorder_filters: global_options_defaults.reorder_filters, + allow_single_file_parallelism: global_options_defaults + .allow_single_file_parallelism, + maximum_parallel_row_group_writers: global_options_defaults + .maximum_parallel_row_group_writers, + maximum_buffered_record_batches_per_stream: global_options_defaults + .maximum_buffered_record_batches_per_stream, + bloom_filter_on_read: global_options_defaults.bloom_filter_on_read, }, column_specific_options: HashMap::from([( COL_NAME.into(), From e693d437368597b98efaacfaf7289cc21d91771c Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 15 Jul 2024 15:44:31 -0700 Subject: [PATCH 05/11] test: update test helper session_config_from_writer_props, to not add column configuration when none exists --- datafusion/common/src/file_options/parquet_writer.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 0fda40c67d7d..374d85b0440b 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -404,6 +404,12 @@ mod tests { let global_options_defaults = ParquetOptions::default(); + let column_specific_options = if configured_col_props.eq(&default_col_props) { + HashMap::default() + } else { + HashMap::from([(COL_NAME.into(), configured_col_props)]) + }; + TableParquetOptions { global: ParquetOptions { // global options @@ -443,10 +449,7 @@ mod tests { .maximum_buffered_record_batches_per_stream, bloom_filter_on_read: global_options_defaults.bloom_filter_on_read, }, - column_specific_options: HashMap::from([( - COL_NAME.into(), - configured_col_props, - )]), + column_specific_options, key_value_metadata, } } From 9c436256c64453702a9f91fe0fa9170a89033421 Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 15 Jul 2024 18:05:54 -0700 Subject: [PATCH 06/11] test(11367): write test to demonstrate issue 11367 --- .../common/src/file_options/parquet_writer.rs | 140 ++++++++++++++++++ 1 file changed, 140 insertions(+) diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 374d85b0440b..4a490db57603 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -481,4 +481,144 @@ mod tests { "the writer_props should have the same configuration as the session's TableParquetOptions", ); } + + #[test] + fn test_defaults_match() { + // ensure the global settings are the same + let default_table_writer_opts = TableParquetOptions::default(); + let default_parquet_opts = ParquetOptions::default(); + assert_eq!( + default_table_writer_opts.global, + default_parquet_opts, + "should have matching defaults for TableParquetOptions.global and ParquetOptions", + ); + + // confirm the TableParquetOptions::default matches the WriterProperties::default, once both converted to WriterProperties + let default_writer_props = WriterProperties::new(); + let from_datafusion_defaults = + WriterPropertiesBuilder::try_from(&default_table_writer_opts) + .unwrap() + .build(); + + // Expected: how the defaults should not match + assert_ne!( + default_writer_props.created_by(), + from_datafusion_defaults.created_by(), + "should have different created_by sources", + ); + assert!( + default_writer_props.created_by().starts_with("parquet-rs version"), + "should indicate that writer_props defaults came from the extern parquet crate", + ); + assert!( + default_table_writer_opts + .global + .created_by + .starts_with("datafusion version"), + "should indicate that table_parquet_opts defaults came from datafusion", + ); + + // TODO: in a followup PR, start the discussion of which of these defaults should be changed, + // and if the change should be in datafusion's default or the extern parquet's default. + // refer to https://github.com/apache/datafusion/issues/11367 + + // TODO: does not match + assert_eq!( + default_writer_props.compression(&"default".into()), + Compression::UNCOMPRESSED, + "extern parquet's default is None" + ); + assert!( + matches!( + from_datafusion_defaults.compression(&"default".into()), + Compression::ZSTD(_) + ), + "datafusion's default is zstd" + ); + + // TODO: does not match + assert_eq!( + default_writer_props.data_page_row_count_limit(), + 20_000, + "extern parquet's default data_page_row_count_limit is 20_000" + ); + assert_eq!( + from_datafusion_defaults.data_page_row_count_limit(), + usize::MAX, + "datafusion's default is usize::MAX" + ); + + // TODO: does not match + assert_eq!( + default_writer_props.column_index_truncate_length(), + Some(64), + "extern parquet's default is 64" + ); + assert_eq!( + from_datafusion_defaults.column_index_truncate_length(), + None, + "datafusion's default is None" + ); + + // TODO: does not match + assert!( + default_writer_props.dictionary_enabled(&"default".into()), + "extern parquet's default is true" + ); + assert!( + !from_datafusion_defaults.dictionary_enabled(&"default".into()), + "datafusion's default is false" + ); + + // TODO: matches once create WriterProps, but only due to parquet's override + assert_eq!( + default_writer_props.statistics_enabled(&"default".into()), + EnabledStatistics::Page, + "extern parquet's default is page" + ); + assert_eq!( + default_table_writer_opts.global.statistics_enabled, None, + "datafusion's has no default" + ); + assert_eq!( + from_datafusion_defaults.statistics_enabled(&"default".into()), + EnabledStatistics::Page, + "should see the extern parquet's default over-riding datafusion's None", + ); + + // TODO: matches once create WriterProps, but only due to parquet's override + assert_eq!( + default_writer_props.max_statistics_size(&"default".into()), + 4096, + "extern parquet's default is 4096" + ); + assert_eq!( + default_table_writer_opts.global.max_statistics_size, None, + "datafusion's has no default" + ); + assert_eq!( + default_writer_props.max_statistics_size(&"default".into()), + 4096, + "should see the extern parquet's default over-riding datafusion's None", + ); + + // TODO: temporarily set the extern parquet's defaults back to the datafusion defaults. + let mut from_extern_parquet = + session_config_from_writer_props(&default_writer_props); + from_extern_parquet.global.compression = Some("zstd(3)".into()); + from_extern_parquet.global.data_page_row_count_limit = usize::MAX; + from_extern_parquet.global.column_index_truncate_length = None; + from_extern_parquet.global.dictionary_enabled = None; + from_extern_parquet.global.statistics_enabled = None; + from_extern_parquet.global.max_statistics_size = None; + + // Expected: the remaining should match + let same_created_by = default_table_writer_opts.global.created_by.clone(); // we expect these to be different + from_extern_parquet.global.created_by = same_created_by; // we expect these to be different + assert_eq!( + default_table_writer_opts, + from_extern_parquet, + "the writer_props should have the same configuration as the session's TableParquetOptions", + ); + } } From ad9f695b69336fa7e1cab8cba7e59587428c2a2e Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 15 Jul 2024 20:37:30 -0700 Subject: [PATCH 07/11] fix: existing sqllogictests require specific ParquetOptions settings to be left as None --- datafusion/common/src/config.rs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index aee1ad983665..2cf78cd72b01 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -29,8 +29,8 @@ use crate::file_options::parquet_writer::{ }; #[cfg(feature = "parquet")] use parquet::file::properties::{ - WriterProperties, WriterPropertiesBuilder, DEFAULT_BLOOM_FILTER_FPP, - DEFAULT_BLOOM_FILTER_NDV, DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED, + WriterProperties, WriterPropertiesBuilder, DEFAULT_MAX_STATISTICS_SIZE, + DEFAULT_STATISTICS_ENABLED, }; use crate::error::_config_err; @@ -507,7 +507,6 @@ impl ParquetOptions { .set_data_page_size_limit(*data_pagesize_limit) .set_write_batch_size(*write_batch_size) .set_writer_version(parse_version_string(writer_version.as_str())?) - .set_dictionary_enabled(dictionary_enabled.unwrap_or(false)) .set_dictionary_page_size_limit(*dictionary_page_size_limit) .set_statistics_enabled( statistics_enabled @@ -522,9 +521,17 @@ impl ParquetOptions { .set_created_by(created_by.clone()) .set_column_index_truncate_length(*column_index_truncate_length) .set_data_page_row_count_limit(*data_page_row_count_limit) - .set_bloom_filter_enabled(*bloom_filter_on_write) - .set_bloom_filter_fpp(bloom_filter_fpp.unwrap_or(DEFAULT_BLOOM_FILTER_FPP)) - .set_bloom_filter_ndv(bloom_filter_ndv.unwrap_or(DEFAULT_BLOOM_FILTER_NDV)); + .set_bloom_filter_enabled(*bloom_filter_on_write); + + if let Some(bloom_filter_fpp) = bloom_filter_fpp { + builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp); + }; + if let Some(bloom_filter_ndv) = bloom_filter_ndv { + builder = builder.set_bloom_filter_ndv(*bloom_filter_ndv); + }; + if let Some(dictionary_enabled) = dictionary_enabled { + builder = builder.set_dictionary_enabled(*dictionary_enabled); + }; // We do not have access to default ColumnProperties set in Arrow. // Therefore, only overwrite if these settings exist. From 4f43390fe6bb3ebec75d98e032c46a1c62c83a55 Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 15 Jul 2024 20:38:25 -0700 Subject: [PATCH 08/11] test(11367): demonstrate how the require bloom filter defaults, (required to avoid test regression), result in different default behavior than parquet crate --- .../common/src/file_options/parquet_writer.rs | 128 +++++++++++++++++- 1 file changed, 124 insertions(+), 4 deletions(-) diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 4a490db57603..4d49f6849a0e 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -282,9 +282,16 @@ pub(crate) fn parse_statistics_string(str_setting: &str) -> Result Date: Tue, 16 Jul 2024 14:03:43 -0700 Subject: [PATCH 09/11] chore: make more reviewable, by pulling tests for issue 11367 into followup PR --- .../common/src/file_options/parquet_writer.rs | 261 +----------------- 1 file changed, 1 insertion(+), 260 deletions(-) diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 4d49f6849a0e..e1de20b1a66f 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -285,13 +285,7 @@ pub(crate) fn parse_statistics_string(str_setting: &str) -> Result Date: Tue, 16 Jul 2024 14:09:21 -0700 Subject: [PATCH 10/11] refactor: move all parquet-associated features into parquet-writer mod --- datafusion/common/src/config.rs | 92 ------------------- .../common/src/file_options/parquet_writer.rs | 86 ++++++++++++++++- 2 files changed, 85 insertions(+), 93 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 2cf78cd72b01..1d2a9589adfc 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -22,17 +22,6 @@ use std::collections::{BTreeMap, HashMap}; use std::fmt::{self, Display}; use std::str::FromStr; -#[cfg(feature = "parquet")] -use crate::file_options::parquet_writer::{ - parse_compression_string, parse_encoding_string, parse_statistics_string, - parse_version_string, -}; -#[cfg(feature = "parquet")] -use parquet::file::properties::{ - WriterProperties, WriterPropertiesBuilder, DEFAULT_MAX_STATISTICS_SIZE, - DEFAULT_STATISTICS_ENABLED, -}; - use crate::error::_config_err; use crate::parsers::CompressionTypeVariant; use crate::{DataFusionError, Result}; @@ -465,87 +454,6 @@ config_namespace! { } } -#[cfg(feature = "parquet")] -impl ParquetOptions { - /// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`]. - /// - /// The returned [`WriterPropertiesBuilder`] can then be further modified with additional options - /// applied per column; a customization which is not applicable for [`ParquetOptions`]. - pub fn writer_props_from_global_opts(&self) -> Result { - let ParquetOptions { - data_pagesize_limit, - write_batch_size, - writer_version, - compression, - dictionary_enabled, - dictionary_page_size_limit, - statistics_enabled, - max_statistics_size, - max_row_group_size, - created_by, - column_index_truncate_length, - data_page_row_count_limit, - encoding, - bloom_filter_on_write, - bloom_filter_fpp, - bloom_filter_ndv, - - // not in WriterProperties - enable_page_index: _, - pruning: _, - skip_metadata: _, - metadata_size_hint: _, - pushdown_filters: _, - reorder_filters: _, - allow_single_file_parallelism: _, - maximum_parallel_row_group_writers: _, - maximum_buffered_record_batches_per_stream: _, - bloom_filter_on_read: _, // reads not used for writer props - } = self; - - let mut builder = WriterProperties::builder() - .set_data_page_size_limit(*data_pagesize_limit) - .set_write_batch_size(*write_batch_size) - .set_writer_version(parse_version_string(writer_version.as_str())?) - .set_dictionary_page_size_limit(*dictionary_page_size_limit) - .set_statistics_enabled( - statistics_enabled - .as_ref() - .and_then(|s| parse_statistics_string(s).ok()) - .unwrap_or(DEFAULT_STATISTICS_ENABLED), - ) - .set_max_statistics_size( - max_statistics_size.unwrap_or(DEFAULT_MAX_STATISTICS_SIZE), - ) - .set_max_row_group_size(*max_row_group_size) - .set_created_by(created_by.clone()) - .set_column_index_truncate_length(*column_index_truncate_length) - .set_data_page_row_count_limit(*data_page_row_count_limit) - .set_bloom_filter_enabled(*bloom_filter_on_write); - - if let Some(bloom_filter_fpp) = bloom_filter_fpp { - builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp); - }; - if let Some(bloom_filter_ndv) = bloom_filter_ndv { - builder = builder.set_bloom_filter_ndv(*bloom_filter_ndv); - }; - if let Some(dictionary_enabled) = dictionary_enabled { - builder = builder.set_dictionary_enabled(*dictionary_enabled); - }; - - // We do not have access to default ColumnProperties set in Arrow. - // Therefore, only overwrite if these settings exist. - if let Some(compression) = compression { - builder = builder.set_compression(parse_compression_string(compression)?); - } - if let Some(encoding) = encoding { - builder = builder.set_encoding(parse_encoding_string(encoding)?); - } - - Ok(builder) - } -} - config_namespace! { /// Options related to aggregate execution /// diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index e1de20b1a66f..beb720a899f4 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -17,12 +17,16 @@ //! Options related to how parquet files should be written -use crate::{config::TableParquetOptions, DataFusionError, Result}; +use crate::{ + config::{ParquetOptions, TableParquetOptions}, + DataFusionError, Result, +}; use parquet::{ basic::{BrotliLevel, GzipLevel, ZstdLevel}, file::properties::{ EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion, + DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED, }, format::KeyValue, schema::types::ColumnPath, @@ -135,6 +139,86 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { } } +impl ParquetOptions { + /// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`]. + /// + /// The returned [`WriterPropertiesBuilder`] can then be further modified with additional options + /// applied per column; a customization which is not applicable for [`ParquetOptions`]. + pub fn writer_props_from_global_opts(&self) -> Result { + let ParquetOptions { + data_pagesize_limit, + write_batch_size, + writer_version, + compression, + dictionary_enabled, + dictionary_page_size_limit, + statistics_enabled, + max_statistics_size, + max_row_group_size, + created_by, + column_index_truncate_length, + data_page_row_count_limit, + encoding, + bloom_filter_on_write, + bloom_filter_fpp, + bloom_filter_ndv, + + // not in WriterProperties + enable_page_index: _, + pruning: _, + skip_metadata: _, + metadata_size_hint: _, + pushdown_filters: _, + reorder_filters: _, + allow_single_file_parallelism: _, + maximum_parallel_row_group_writers: _, + maximum_buffered_record_batches_per_stream: _, + bloom_filter_on_read: _, // reads not used for writer props + } = self; + + let mut builder = WriterProperties::builder() + .set_data_page_size_limit(*data_pagesize_limit) + .set_write_batch_size(*write_batch_size) + .set_writer_version(parse_version_string(writer_version.as_str())?) + .set_dictionary_page_size_limit(*dictionary_page_size_limit) + .set_statistics_enabled( + statistics_enabled + .as_ref() + .and_then(|s| parse_statistics_string(s).ok()) + .unwrap_or(DEFAULT_STATISTICS_ENABLED), + ) + .set_max_statistics_size( + max_statistics_size.unwrap_or(DEFAULT_MAX_STATISTICS_SIZE), + ) + .set_max_row_group_size(*max_row_group_size) + .set_created_by(created_by.clone()) + .set_column_index_truncate_length(*column_index_truncate_length) + .set_data_page_row_count_limit(*data_page_row_count_limit) + .set_bloom_filter_enabled(*bloom_filter_on_write); + + if let Some(bloom_filter_fpp) = bloom_filter_fpp { + builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp); + }; + if let Some(bloom_filter_ndv) = bloom_filter_ndv { + builder = builder.set_bloom_filter_ndv(*bloom_filter_ndv); + }; + if let Some(dictionary_enabled) = dictionary_enabled { + builder = builder.set_dictionary_enabled(*dictionary_enabled); + }; + + // We do not have access to default ColumnProperties set in Arrow. + // Therefore, only overwrite if these settings exist. + if let Some(compression) = compression { + builder = builder.set_compression(parse_compression_string(compression)?); + } + if let Some(encoding) = encoding { + builder = builder.set_encoding(parse_encoding_string(encoding)?); + } + + Ok(builder) + } +} + /// Parses datafusion.execution.parquet.encoding String to a parquet::basic::Encoding pub(crate) fn parse_encoding_string( str_setting: &str, From d60e07e0282b63490f87b9e97c37ed58a478d438 Mon Sep 17 00:00:00 2001 From: wiedld Date: Wed, 17 Jul 2024 10:44:19 -0700 Subject: [PATCH 11/11] chore: better function naming convention --- datafusion/common/src/file_options/parquet_writer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index beb720a899f4..a14cbdecf601 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -76,7 +76,7 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { key_value_metadata, } = table_parquet_options; - let mut builder = global.writer_props_from_global_opts()?; + let mut builder = global.into_writer_properties_builder()?; if !key_value_metadata.is_empty() { builder = builder.set_key_value_metadata(Some( @@ -144,7 +144,7 @@ impl ParquetOptions { /// /// The returned [`WriterPropertiesBuilder`] can then be further modified with additional options /// applied per column; a customization which is not applicable for [`ParquetOptions`]. - pub fn writer_props_from_global_opts(&self) -> Result { + pub fn into_writer_properties_builder(&self) -> Result { let ParquetOptions { data_pagesize_limit, write_batch_size,