Allow adding user defined metadata to ParquetSink #10224

Merged 6 commits on Apr 26, 2024
73 changes: 71 additions & 2 deletions datafusion/common/src/config.rs
@@ -1364,12 +1364,31 @@ impl TableOptions {

/// Options that control how Parquet files are read, including global options
/// that apply to all columns and optional column-specific overrides
///
/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions).
/// Properties not included in [`TableParquetOptions`] may not be configurable via the
/// external API (e.g. sorting_columns).
#[derive(Clone, Default, Debug, PartialEq)]
pub struct TableParquetOptions {
/// Global Parquet options that propagate to all columns.
pub global: ParquetOptions,
/// Column-specific options. Default usage is `parquet.XX::column`.
pub column_specific_options: HashMap<String, ColumnOptions>,
/// Additional file-level metadata to include. Inserted into the key_value_metadata
/// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
///
/// Multiple entries are permitted:
/// ```sql
/// OPTIONS (
/// 'format.metadata::key1' '',
/// 'format.metadata::key2' 'value',
/// 'format.metadata::key3' 'value has spaces',
/// 'format.metadata::key4' 'value has special chars :: :',
/// 'format.metadata::key_dupe' 'original will be overwritten',
/// 'format.metadata::key_dupe' 'final'
/// )
/// ```
pub key_value_metadata: HashMap<String, Option<String>>,
}

impl ConfigField for TableParquetOptions {
@@ -1380,8 +1399,24 @@ impl ConfigField for TableParquetOptions {
}

fn set(&mut self, key: &str, value: &str) -> Result<()> {
    // Determine if the key is a global, metadata, or column-specific setting
    if key.starts_with("metadata::") {
        let k = match key.split("::").collect::<Vec<_>>()[..] {
            [_meta] | [_meta, ""] => return Err(DataFusionError::Configuration(
                "Invalid metadata key provided, missing key in metadata::<key>"
                    .to_string(),
            )),
            [_meta, k] => k.into(),
            _ => {
                return Err(DataFusionError::Configuration(format!(
                    "Invalid metadata key provided, found too many '::' in \"{key}\""
                )))
            }
        };
        self.key_value_metadata.insert(k, Some(value.into()));
        Ok(())
    } else if key.contains("::") {
        self.column_specific_options.set(key, value)
    } else {
        self.global.set(key, value)
@@ -1773,4 +1808,38 @@ mod tests {
.iter()
.any(|item| item.key == "format.bloom_filter_enabled::col1"))
}

#[cfg(feature = "parquet")]
#[test]
fn parquet_table_options_config_metadata_entry() {
let mut table_config = TableOptions::new();
table_config.set_file_format(FileType::PARQUET);
table_config.set("format.metadata::key1", "").unwrap();
table_config.set("format.metadata::key2", "value2").unwrap();
table_config
.set("format.metadata::key3", "value with spaces ")
.unwrap();
table_config
.set("format.metadata::key4", "value with special chars :: :")
.unwrap();

let parsed_metadata = table_config.parquet.key_value_metadata.clone();
assert_eq!(parsed_metadata.get("should not exist1"), None);
assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
assert_eq!(
parsed_metadata.get("key3"),
Some(&Some("value with spaces ".into()))
);
assert_eq!(
parsed_metadata.get("key4"),
Some(&Some("value with special chars :: :".into()))
);

// duplicate keys are overwritten
table_config.set("format.metadata::key_dupe", "A").unwrap();
table_config.set("format.metadata::key_dupe", "B").unwrap();
let parsed_metadata = table_config.parquet.key_value_metadata;
assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
}
}
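The test above exercises the full path from string config keys to the parsed `key_value_metadata` map. As a standalone illustration, here is a minimal sketch of the same flow; the import paths are assumptions and may differ by DataFusion version:

```rust
// Sketch only: sets user-defined Parquet footer metadata through the
// TableOptions API shown in the diff above.
use datafusion_common::config::TableOptions;
use datafusion_common::{FileType, Result}; // assumed re-export locations

fn demo() -> Result<()> {
    let mut opts = TableOptions::new();
    opts.set_file_format(FileType::PARQUET);
    // Each `format.metadata::<key>` entry becomes one key/value pair in
    // the written file's footer (FileMetaData::key_value_metadata).
    opts.set("format.metadata::owner", "analytics-team")?;
    opts.set("format.metadata::ingest-date", "2024-04-26")?;
    assert_eq!(
        opts.parquet.key_value_metadata.get("owner"),
        Some(&Some("analytics-team".into()))
    );
    Ok(())
}
```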
4 changes: 4 additions & 0 deletions datafusion/common/src/file_options/mod.rs
@@ -124,6 +124,10 @@ mod tests {
123
);

// properties which remain as default on WriterProperties
assert_eq!(properties.key_value_metadata(), None);
assert_eq!(properties.sorting_columns(), None);

Ok(())
}

104 changes: 73 additions & 31 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -17,11 +17,17 @@

//! Options related to how parquet files should be written

use crate::{
    config::{ParquetOptions, TableParquetOptions},
    DataFusionError, Result,
};

use parquet::{
    basic::{BrotliLevel, GzipLevel, ZstdLevel},
    file::{
        metadata::KeyValue,
        properties::{EnabledStatistics, WriterProperties, WriterVersion},
    },
    schema::types::ColumnPath,
};

@@ -47,53 +53,87 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
type Error = DataFusionError;

fn try_from(parquet_options: &TableParquetOptions) -> Result<Self> {
    let ParquetOptions {
        data_pagesize_limit,
        write_batch_size,
        writer_version,
        dictionary_page_size_limit,
        max_row_group_size,
        created_by,
        column_index_truncate_length,
        data_page_row_count_limit,
        bloom_filter_enabled,
        encoding,
        dictionary_enabled,
        compression,
        statistics_enabled,
        max_statistics_size,
        bloom_filter_fpp,
        bloom_filter_ndv,
        // below is not part of ParquetWriterOptions
        enable_page_index: _,
        pruning: _,
        skip_metadata: _,
        metadata_size_hint: _,
        pushdown_filters: _,
        reorder_filters: _,
        allow_single_file_parallelism: _,
        maximum_parallel_row_group_writers: _,
        maximum_buffered_record_batches_per_stream: _,
    } = &parquet_options.global;

    let key_value_metadata = if !parquet_options.key_value_metadata.is_empty() {
        Some(
            parquet_options
                .key_value_metadata
                .clone()
                .drain()
                .map(|(key, value)| KeyValue { key, value })
                .collect::<Vec<_>>(),
        )
    } else {
        None
    };

    let mut builder = WriterProperties::builder()
        .set_data_page_size_limit(*data_pagesize_limit)
        .set_write_batch_size(*write_batch_size)
        .set_writer_version(parse_version_string(writer_version.as_str())?)
        .set_dictionary_page_size_limit(*dictionary_page_size_limit)
        .set_max_row_group_size(*max_row_group_size)
        .set_created_by(created_by.clone())
        .set_column_index_truncate_length(*column_index_truncate_length)
        .set_data_page_row_count_limit(*data_page_row_count_limit)
        .set_bloom_filter_enabled(*bloom_filter_enabled)
        .set_key_value_metadata(key_value_metadata);

    if let Some(encoding) = &encoding {
        builder = builder.set_encoding(parse_encoding_string(encoding)?);
    }

    if let Some(enabled) = dictionary_enabled {
        builder = builder.set_dictionary_enabled(*enabled);
    }

    if let Some(compression) = &compression {
        builder = builder.set_compression(parse_compression_string(compression)?);
    }

    if let Some(statistics) = &statistics_enabled {
        builder =
            builder.set_statistics_enabled(parse_statistics_string(statistics)?);
    }

    if let Some(size) = max_statistics_size {
        builder = builder.set_max_statistics_size(*size);
    }

    if let Some(fpp) = bloom_filter_fpp {
        builder = builder.set_bloom_filter_fpp(*fpp);
    }

    if let Some(ndv) = bloom_filter_ndv {
        builder = builder.set_bloom_filter_ndv(*ndv);
    }

    for (column, options) in &parquet_options.column_specific_options {
@@ -141,6 +181,8 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
builder.set_column_max_statistics_size(path, max_statistics_size);
}
}

// ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns)
Ok(ParquetWriterOptions {
writer_options: builder.build(),
})
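The conversion above is the crux of the change: the `HashMap<String, Option<String>>` from the config layer is drained into parquet's `KeyValue` pairs and handed to the `WriterProperties` builder. A self-contained sketch of that shape, using only the parquet crate (crate version assumed compatible with this PR):

```rust
use std::collections::HashMap;

use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterProperties;

fn main() {
    let metadata: HashMap<String, Option<String>> = HashMap::from([
        ("origin".to_string(), Some("datafusion".to_string())),
        ("flag-only-key".to_string(), None), // the value side is optional
    ]);

    // Mirror of the try_from logic: every map entry becomes one KeyValue
    // in the footer; an empty map maps to None (no metadata written).
    let kvs = (!metadata.is_empty()).then(|| {
        metadata
            .into_iter()
            .map(|(key, value)| KeyValue { key, value })
            .collect::<Vec<_>>()
    });

    let props = WriterProperties::builder()
        .set_key_value_metadata(kvs)
        .build();
    assert_eq!(props.key_value_metadata().map(|kv| kv.len()), Some(2));
}
```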
29 changes: 26 additions & 3 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -1129,7 +1129,7 @@ mod tests {
};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::file::metadata::{KeyValue, ParquetColumnIndex, ParquetOffsetIndex};
use parquet::file::page_index::index::Index;
use tokio::fs::File;

@@ -1857,7 +1857,13 @@
};
let parquet_sink = Arc::new(ParquetSink::new(
file_sink_config,
TableParquetOptions {
key_value_metadata: std::collections::HashMap::from([
("my-data".to_string(), Some("stuff".to_string())),
("my-data-bool-key".to_string(), None),
]),
..Default::default()
},
));

// create data
@@ -1891,7 +1897,10 @@
let (
path,
FileMetaData {
num_rows,
schema,
key_value_metadata,
..
},
) = written.take(1).next().unwrap();
let path_parts = path.parts().collect::<Vec<_>>();
@@ -1907,6 +1916,20 @@
"output file metadata should contain col b"
);

let mut key_value_metadata = key_value_metadata.unwrap();
key_value_metadata.sort_by(|a, b| a.key.cmp(&b.key));
let expected_metadata = vec![
KeyValue {
key: "my-data".to_string(),
value: Some("stuff".to_string()),
},
KeyValue {
key: "my-data-bool-key".to_string(),
value: None,
},
];
assert_eq!(key_value_metadata, expected_metadata);

Ok(())
}

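For completeness, the metadata asserted above can also be verified directly against a written file with parquet's reader API; a minimal sketch (the file path is hypothetical):

```rust
use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical path: any file produced by ParquetSink with
    // key_value_metadata configured.
    let file = std::fs::File::open("/tmp/output.parquet")?;
    let reader = SerializedFileReader::new(file)?;

    // Returns Option<&Vec<KeyValue>>; None when no user-defined (or
    // arrow-schema) entries were written into the footer.
    if let Some(kvs) = reader.metadata().file_metadata().key_value_metadata() {
        for kv in kvs {
            println!("{} = {:?}", kv.key, kv.value);
        }
    }
    Ok(())
}
```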
1 change: 1 addition & 0 deletions datafusion/proto/src/physical_plan/from_proto.rs
@@ -969,6 +969,7 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
.unwrap()
.unwrap(),
column_specific_options,
key_value_metadata: Default::default(),
})
}
}
64 changes: 63 additions & 1 deletion datafusion/sqllogictest/test_files/copy.slt
@@ -283,11 +283,73 @@
'format.statistics_enabled::col2' none,
'format.max_statistics_size' 123,
'format.bloom_filter_fpp' 0.001,
'format.bloom_filter_ndv' 100,
'format.metadata::key' 'value'
)
----
2

# valid vs invalid metadata

# accepts map with a single entry
statement ok
COPY source_table
TO 'test_files/scratch/copy/table_with_metadata/'
STORED AS PARQUET
OPTIONS (
'format.metadata::key' 'value'
)

# accepts multiple entries (on different keys)
statement ok
COPY source_table
TO 'test_files/scratch/copy/table_with_metadata/'
STORED AS PARQUET
OPTIONS (
'format.metadata::key1' '',
'format.metadata::key2' 'value',
'format.metadata::key3' 'value with spaces',
'format.metadata::key4' 'value with special chars :: :'
)

# accepts multiple entries with the same key (will overwrite)
statement ok
COPY source_table
TO 'test_files/scratch/copy/table_with_metadata/'
STORED AS PARQUET
OPTIONS (
'format.metadata::key1' 'value',
'format.metadata::key1' 'value'
)
# Review thread on lines +315 to +323:
# Contributor (author): This overwriting is a common feature to all of the
# config OPTIONS(...). If we want to change, then I recommend a followup ticket.
# Contributor: I agree there is no need to change it in this PR.

# errors if key is missing
statement error DataFusion error: Invalid or Unsupported Configuration: Invalid metadata key provided, missing key in metadata::<key>
COPY source_table
TO 'test_files/scratch/copy/table_with_metadata/'
STORED AS PARQUET
OPTIONS (
'format.metadata::' 'value'
)

# errors if key contains internal '::'
statement error DataFusion error: Invalid or Unsupported Configuration: Invalid metadata key provided, found too many '::' in "metadata::key::extra"
COPY source_table
TO 'test_files/scratch/copy/table_with_metadata/'
STORED AS PARQUET
OPTIONS (
'format.metadata::key::extra' 'value'
)

# errors for invalid property (not stating `format.metadata`)
statement error DataFusion error: Invalid or Unsupported Configuration: Config value "wrong-metadata" not found on ColumnOptions
COPY source_table
TO 'test_files/scratch/copy/table_with_metadata/'
STORED AS PARQUET
OPTIONS (
'format.wrong-metadata::key' 'value'
)


# validate multiple parquet file output with all options set
statement ok
CREATE EXTERNAL TABLE validate_parquet_with_options STORED AS PARQUET LOCATION 'test_files/scratch/copy/table_with_options/';