Refactor: more clearly delineate between TableParquetOptions and ParquetWriterOptions #11444

Merged: 11 commits, Jul 17, 2024. (Changes shown are from 2 commits.)
357 changes: 357 additions & 0 deletions datafusion/common/src/config.rs
@@ -22,6 +22,22 @@ use std::collections::{BTreeMap, HashMap};
use std::fmt::{self, Display};
use std::str::FromStr;

#[cfg(feature = "parquet")]
use crate::file_options::parquet_writer::{
parse_compression_string, parse_encoding_string, parse_statistics_string,
parse_version_string,
};
#[cfg(feature = "parquet")]
use parquet::{
file::properties::{
WriterProperties, WriterPropertiesBuilder, DEFAULT_BLOOM_FILTER_FPP,
DEFAULT_BLOOM_FILTER_NDV, DEFAULT_MAX_STATISTICS_SIZE,
DEFAULT_STATISTICS_ENABLED,
},
format::KeyValue,
schema::types::ColumnPath,
};

use crate::error::_config_err;
use crate::parsers::CompressionTypeVariant;
use crate::{DataFusionError, Result};
@@ -454,6 +470,80 @@ config_namespace! {
}
}

#[cfg(feature = "parquet")]
impl ParquetOptions {
/// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`].
///
    /// The returned [`WriterPropertiesBuilder`] can then be further modified with
    /// per-column options, a customization that is not expressible in [`ParquetOptions`].
pub fn writer_props_from_global_opts(&self) -> Result<WriterPropertiesBuilder> {
let ParquetOptions {
data_pagesize_limit,
write_batch_size,
writer_version,
compression,
dictionary_enabled,
dictionary_page_size_limit,
statistics_enabled,
max_statistics_size,
max_row_group_size,
created_by,
column_index_truncate_length,
data_page_row_count_limit,
encoding,
bloom_filter_on_write,
bloom_filter_fpp,
bloom_filter_ndv,

// not in WriterProperties
enable_page_index: _,
pruning: _,
skip_metadata: _,
metadata_size_hint: _,
pushdown_filters: _,
reorder_filters: _,
allow_single_file_parallelism: _,
maximum_parallel_row_group_writers: _,
maximum_buffered_record_batches_per_stream: _,
bloom_filter_on_read: _, // reads not used for writer props
} = self;

let mut builder = WriterProperties::builder()
.set_data_page_size_limit(*data_pagesize_limit)
.set_write_batch_size(*write_batch_size)
.set_writer_version(parse_version_string(writer_version.as_str())?)
.set_dictionary_enabled(dictionary_enabled.unwrap_or(false))
.set_dictionary_page_size_limit(*dictionary_page_size_limit)
.set_statistics_enabled(
statistics_enabled
.as_ref()
.and_then(|s| parse_statistics_string(s).ok())
.unwrap_or(DEFAULT_STATISTICS_ENABLED),
)
.set_max_statistics_size(
max_statistics_size.unwrap_or(DEFAULT_MAX_STATISTICS_SIZE),
)
.set_max_row_group_size(*max_row_group_size)
.set_created_by(created_by.clone())
.set_column_index_truncate_length(*column_index_truncate_length)
.set_data_page_row_count_limit(*data_page_row_count_limit)
.set_bloom_filter_enabled(*bloom_filter_on_write)
.set_bloom_filter_fpp(bloom_filter_fpp.unwrap_or(DEFAULT_BLOOM_FILTER_FPP))
.set_bloom_filter_ndv(bloom_filter_ndv.unwrap_or(DEFAULT_BLOOM_FILTER_NDV));

// We do not have access to default ColumnProperties set in Arrow.
// Therefore, only overwrite if these settings exist.
if let Some(compression) = compression {
builder = builder.set_compression(parse_compression_string(compression)?);
}
if let Some(encoding) = encoding {
builder = builder.set_encoding(parse_encoding_string(encoding)?);
}

Ok(builder)
}
}
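
The tail of `writer_props_from_global_opts` uses an "apply only if configured" builder pattern: options that are `None` are never written to the builder, so parquet's own defaults survive. A minimal self-contained sketch of that pattern (the `Props`/`PropsBuilder` types and `build_props` helper below are hypothetical stand-ins, not DataFusion or parquet APIs):

```rust
// Hypothetical stand-ins for parquet's WriterProperties / WriterPropertiesBuilder,
// used only to illustrate the conditional-override pattern above.
#[derive(Debug, Default, PartialEq)]
struct Props {
    compression: String, // "" stands for "builder default left in place"
    encoding: String,
}

#[derive(Default)]
struct PropsBuilder(Props);

impl PropsBuilder {
    fn set_compression(mut self, c: &str) -> Self {
        self.0.compression = c.to_string();
        self
    }
    fn set_encoding(mut self, e: &str) -> Self {
        self.0.encoding = e.to_string();
        self
    }
    fn build(self) -> Props {
        self.0
    }
}

// Only overwrite the builder's defaults when the session option was actually set.
fn build_props(compression: Option<&str>, encoding: Option<&str>) -> Props {
    let mut builder = PropsBuilder::default();
    if let Some(c) = compression {
        builder = builder.set_compression(c);
    }
    if let Some(e) = encoding {
        builder = builder.set_encoding(e);
    }
    builder.build()
}

fn main() {
    let props = build_props(Some("zstd(3)"), None);
    assert_eq!(props.compression, "zstd(3)");
    assert_eq!(props.encoding, ""); // encoding untouched: default kept
    println!("{props:?}");
}
```

This is why the comment above notes that compression and encoding are only overwritten "if these settings exist": unconditionally setting them would clobber the column defaults parquet chooses on its own.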

config_namespace! {
/// Options related to aggregate execution
///
@@ -1421,6 +1511,84 @@ impl TableParquetOptions {
}
}

#[cfg(feature = "parquet")]
impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
type Error = DataFusionError;

/// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`].
///
/// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column.
fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> {
// Table options include kv_metadata and col-specific options
let TableParquetOptions {
global,
column_specific_options,
key_value_metadata,
} = table_parquet_options;

let mut builder = global.writer_props_from_global_opts()?;

if !key_value_metadata.is_empty() {
builder = builder.set_key_value_metadata(Some(
key_value_metadata
.to_owned()
.drain()
.map(|(key, value)| KeyValue { key, value })
.collect(),
));
}

// Apply column-specific options:
for (column, options) in column_specific_options {
let path = ColumnPath::new(column.split('.').map(|s| s.to_owned()).collect());

if let Some(bloom_filter_enabled) = options.bloom_filter_enabled {
builder = builder
.set_column_bloom_filter_enabled(path.clone(), bloom_filter_enabled);
}

if let Some(encoding) = &options.encoding {
let parsed_encoding = parse_encoding_string(encoding)?;
builder = builder.set_column_encoding(path.clone(), parsed_encoding);
}

if let Some(dictionary_enabled) = options.dictionary_enabled {
builder = builder
.set_column_dictionary_enabled(path.clone(), dictionary_enabled);
}

if let Some(compression) = &options.compression {
let parsed_compression = parse_compression_string(compression)?;
builder =
builder.set_column_compression(path.clone(), parsed_compression);
}

if let Some(statistics_enabled) = &options.statistics_enabled {
let parsed_value = parse_statistics_string(statistics_enabled)?;
builder =
builder.set_column_statistics_enabled(path.clone(), parsed_value);
}

if let Some(bloom_filter_fpp) = options.bloom_filter_fpp {
builder =
builder.set_column_bloom_filter_fpp(path.clone(), bloom_filter_fpp);
}

if let Some(bloom_filter_ndv) = options.bloom_filter_ndv {
builder =
builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv);
}

if let Some(max_statistics_size) = options.max_statistics_size {
builder =
builder.set_column_max_statistics_size(path, max_statistics_size);
}
}

Ok(builder)
}
}
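
Two small conversions in the `TryFrom` above are worth isolating: a dotted column name is split into path parts (parquet's `ColumnPath` is the real type), and the kv metadata map is turned into key/value pairs. A self-contained sketch with hypothetical stand-ins for both (the `KeyValue` struct here mirrors, but is not, parquet's type):

```rust
use std::collections::HashMap;

// Stand-in for parquet's KeyValue, for illustration only.
#[derive(Debug, PartialEq)]
struct KeyValue {
    key: String,
    value: Option<String>,
}

// Mirrors the `column.split('.')` call used to build a ColumnPath above:
// "outer.inner.leaf" addresses a leaf inside nested structs.
fn column_path_parts(column: &str) -> Vec<String> {
    column.split('.').map(|s| s.to_owned()).collect()
}

// Mirrors the kv_metadata -> KeyValue mapping above.
fn to_key_values(metadata: &HashMap<String, Option<String>>) -> Vec<KeyValue> {
    metadata
        .iter()
        .map(|(key, value)| KeyValue {
            key: key.clone(),
            value: value.clone(),
        })
        .collect()
}

fn main() {
    assert_eq!(
        column_path_parts("outer_struct.inner.leaf"),
        vec!["outer_struct", "inner", "leaf"]
    );

    let metadata = HashMap::from([("origin".to_string(), Some("datafusion".to_string()))]);
    let kvs = to_key_values(&metadata);
    assert_eq!(kvs[0].key, "origin");
    println!("{kvs:?}");
}
```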

impl ConfigField for TableParquetOptions {
fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
self.global.visit(v, key_prefix, description);
@@ -1874,4 +2042,193 @@ mod tests {
let parsed_metadata = table_config.parquet.key_value_metadata;
assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
}

#[cfg(feature = "parquet")]
mod test_conversion_from_session_to_writer_props {
use super::super::*;
use parquet::{basic::Compression, file::properties::EnabledStatistics};

const COL_NAME: &str = "configured";

/// Take the column defaults provided in [`ParquetOptions`], and generate a non-default col config.
fn column_options_with_non_defaults(
src_col_defaults: &ParquetOptions,
) -> ColumnOptions {
ColumnOptions {
compression: Some("zstd(22)".into()),
dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v),
statistics_enabled: Some("page".into()),
max_statistics_size: Some(72),
encoding: Some("RLE".into()),
bloom_filter_enabled: Some(true),
bloom_filter_fpp: Some(0.72),
bloom_filter_ndv: Some(72),
}
}

fn parquet_options_with_non_defaults() -> ParquetOptions {
let defaults = ParquetOptions::default();
let writer_version = if defaults.writer_version.eq("1.0") {
"2.0"
} else {
"1.0"
};

ParquetOptions {
data_pagesize_limit: 42,
write_batch_size: 42,
writer_version: writer_version.into(),
compression: Some("zstd(22)".into()),
dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)),
dictionary_page_size_limit: 42,
statistics_enabled: Some("chunk".into()),
max_statistics_size: Some(42),
max_row_group_size: 42,
created_by: "wordy".into(),
column_index_truncate_length: Some(42),
data_page_row_count_limit: 42,
encoding: Some("BYTE_STREAM_SPLIT".into()),
bloom_filter_on_write: !defaults.bloom_filter_on_write,
bloom_filter_fpp: Some(0.42),
bloom_filter_ndv: Some(42),

// not in WriterProperties, but itemizing here to not skip newly added props
enable_page_index: Default::default(),
pruning: Default::default(),
skip_metadata: Default::default(),
metadata_size_hint: Default::default(),
pushdown_filters: Default::default(),
reorder_filters: Default::default(),
allow_single_file_parallelism: Default::default(),
maximum_parallel_row_group_writers: Default::default(),
maximum_buffered_record_batches_per_stream: Default::default(),
bloom_filter_on_read: Default::default(),
}
}

fn extract_column_options(
props: &WriterProperties,
col: ColumnPath,
) -> ColumnOptions {
let bloom_filter_default_props = props.bloom_filter_properties(&col);

ColumnOptions {
bloom_filter_enabled: Some(bloom_filter_default_props.is_some()),
encoding: props.encoding(&col).map(|s| s.to_string()),
dictionary_enabled: Some(props.dictionary_enabled(&col)),
compression: match props.compression(&col) {
Compression::ZSTD(lvl) => {
Some(format!("zstd({})", lvl.compression_level()))
}
_ => None,
},
statistics_enabled: Some(
match props.statistics_enabled(&col) {
EnabledStatistics::None => "none",
EnabledStatistics::Chunk => "chunk",
EnabledStatistics::Page => "page",
}
.into(),
),
bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp),
bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv),
max_statistics_size: Some(props.max_statistics_size(&col)),
}
}

        /// For testing only: take a single write's props and convert back into the
        /// session config. (The round trip is used as an identity check.)
fn session_config_from_writer_props(
props: &WriterProperties,
) -> TableParquetOptions {
let default_col = ColumnPath::from("col doesn't have specific config");
let default_col_props = extract_column_options(props, default_col);

let configured_col = ColumnPath::from(COL_NAME);
let configured_col_props = extract_column_options(props, configured_col);

let key_value_metadata = props
.key_value_metadata()
.map(|pairs| {
HashMap::from_iter(
pairs
.iter()
.cloned()
.map(|KeyValue { key, value }| (key, value)),
)
})
.unwrap_or_default();

TableParquetOptions {
global: ParquetOptions {
// global options
                    data_pagesize_limit: props.data_page_size_limit(),
write_batch_size: props.write_batch_size(),
writer_version: format!("{}.0", props.writer_version().as_num()),
dictionary_page_size_limit: props.dictionary_page_size_limit(),
max_row_group_size: props.max_row_group_size(),
created_by: props.created_by().to_string(),
column_index_truncate_length: props.column_index_truncate_length(),
data_page_row_count_limit: props.data_page_row_count_limit(),

// global options which set the default column props
encoding: default_col_props.encoding,
compression: default_col_props.compression,
dictionary_enabled: default_col_props.dictionary_enabled,
statistics_enabled: default_col_props.statistics_enabled,
max_statistics_size: default_col_props.max_statistics_size,
bloom_filter_on_write: default_col_props
.bloom_filter_enabled
.unwrap_or_default(),
bloom_filter_fpp: default_col_props.bloom_filter_fpp,
bloom_filter_ndv: default_col_props.bloom_filter_ndv,

// not in WriterProperties
enable_page_index: Default::default(),
pruning: Default::default(),
skip_metadata: Default::default(),
metadata_size_hint: Default::default(),
pushdown_filters: Default::default(),
reorder_filters: Default::default(),
allow_single_file_parallelism: Default::default(),
maximum_parallel_row_group_writers: Default::default(),
maximum_buffered_record_batches_per_stream: Default::default(),
bloom_filter_on_read: Default::default(),
},
column_specific_options: HashMap::from([(
COL_NAME.into(),
configured_col_props,
)]),
key_value_metadata,
}
}

        // Review discussion on this test:
        //
        // Contributor: Maybe we could also add a test here, as described in #11367 --
        // one that takes a default ParquetOptions, creates the ArrowWriterOptions, and
        // then ensures the fields are the same as ArrowWriterOptions::default().
        //
        // @wiedld (author, Jul 16, 2024): Added the requested test in this commit, but
        // did the equivalency assertion a bit differently. Parquet's WriterProperties
        // and its builder do not implement Eq or PartialEq. Rather than asserting each
        // prop independently via its getter, I elected to (a) explicitly define the
        // props that do not match, and then (b) use the same identity function
        // (session_config_from_writer_props) for the assertion.
        //
        // @wiedld (Jul 16, 2024): I had to make a slight change to avoid a test
        // regression in the sqllogictests. That change also moves us further from
        // matching default settings between the external parquet crate and DataFusion.
        // I added more tests to demonstrate this divergence.
        //
        // @wiedld: In the follow-up PR, I plan to propose (and open for discussion)
        // which defaults should be changed in DataFusion vs. parquet.
        //
        // @wiedld (Jul 16, 2024): Per request, moving these into a follow-up PR, to
        // keep this one reviewable.
        #[test]
fn table_parquet_opts_to_writer_props() {
// ParquetOptions, all props set to non-default
let parquet_options = parquet_options_with_non_defaults();

// TableParquetOptions, using ParquetOptions for global settings
let key = "foo".to_string();
let value = Some("bar".into());
let table_parquet_opts = TableParquetOptions {
global: parquet_options.clone(),
column_specific_options: [(
COL_NAME.into(),
column_options_with_non_defaults(&parquet_options),
)]
.into(),
key_value_metadata: [(key.clone(), value.clone())].into(),
};

let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts)
.unwrap()
.build();
assert_eq!(
table_parquet_opts,
session_config_from_writer_props(&writer_props),
"the writer_props should have the same configuration as the session's TableParquetOptions",
);
}
}
}
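
The assertion at the end of `table_parquet_opts_to_writer_props` is a round-trip identity check: convert the options into the "foreign" representation and back, then compare against the original. A minimal self-contained sketch of that testing pattern, with a hypothetical `Opts`/`Props` pair in place of `TableParquetOptions` and `WriterProperties`:

```rust
// Round-trip identity check: to_props and from_props are hypothetical
// stand-ins for the TableParquetOptions <-> WriterProperties conversions.
#[derive(Clone, Debug, PartialEq)]
struct Opts {
    max_row_group_size: usize,
    created_by: String,
}

struct Props {
    max_row_group_size: usize,
    created_by: String,
}

fn to_props(o: &Opts) -> Props {
    Props {
        max_row_group_size: o.max_row_group_size,
        created_by: o.created_by.clone(),
    }
}

fn from_props(p: &Props) -> Opts {
    Opts {
        max_row_group_size: p.max_row_group_size,
        created_by: p.created_by.clone(),
    }
}

fn main() {
    let opts = Opts {
        max_row_group_size: 42,
        created_by: "wordy".into(),
    };
    // If every field survives the conversion, the round trip is the identity.
    let round_tripped = from_props(&to_props(&opts));
    assert_eq!(opts, round_tripped, "round trip should be the identity");
    println!("ok");
}
```

This sidesteps the problem the author describes above: when the foreign type (here `WriterProperties`) implements neither `Eq` nor `PartialEq`, equality can still be asserted on the round-tripped session type.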