Skip to content

Commit

Permalink
Fix parquet statistics for ListingTable and Utf8View with `schema_for…
Browse files Browse the repository at this point in the history
…ce_string_view`, rename config option to `schema_force_view_types` (#12232)

* chore: move schema_force_string_view upwards to be listed with other reading props

* refactor(12123): have file schema be merged on view types with table schema

* test(12123): test for with, and without schema_force_string_view

* test(12123): demonstrate current upstream failure when reading page stats

* chore(12123): update config.md

* chore: cleanup

* chore(12123): temporarily remove test until next arrow release

* chore(12123): rename all variables to force_view_types

* refactor(12123): make interface ParquetFormat::with_force_view_types public

* chore(12123): rename helper method which coerces the schema (not merging fields)

* chore(12123): add dosc to ParquetFormat to clarify exactly how the view types are used

* test(12123): cleanup tests to be more explicit with ForceViews enum

* test(12123): update tests to pass now that latest arrow-rs release is in

* fix: use proper naming on benchmark
  • Loading branch information
wiedld committed Sep 10, 2024
1 parent 8d2b240 commit 3ece7a7
Show file tree
Hide file tree
Showing 21 changed files with 318 additions and 95 deletions.
2 changes: 1 addition & 1 deletion benchmarks/src/clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl RunOpt {
.options_mut()
.execution
.parquet
.schema_force_string_view = self.common.string_view;
.schema_force_view_types = self.common.force_view_types;

let ctx = SessionContext::new_with_config(config);
self.register_hits(&ctx).await?;
Expand Down
6 changes: 3 additions & 3 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl RunOpt {
.options_mut()
.execution
.parquet
.schema_force_string_view = self.common.string_view;
.schema_force_view_types = self.common.force_view_types;
let ctx = SessionContext::new_with_config(config);

// register tables
Expand Down Expand Up @@ -345,7 +345,7 @@ mod tests {
partitions: Some(2),
batch_size: 8192,
debug: false,
string_view: false,
force_view_types: false,
};
let opt = RunOpt {
query: Some(query),
Expand Down Expand Up @@ -379,7 +379,7 @@ mod tests {
partitions: Some(2),
batch_size: 8192,
debug: false,
string_view: false,
force_view_types: false,
};
let opt = RunOpt {
query: Some(query),
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/util/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct CommonOpt {
/// If true, will use StringView/BinaryViewArray instead of String/BinaryArray
/// when reading ParquetFiles
#[structopt(long)]
pub string_view: bool,
pub force_view_types: bool,
}

impl CommonOpt {
Expand Down
8 changes: 4 additions & 4 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,10 @@ config_namespace! {
/// the filters are applied in the same order as written in the query
pub reorder_filters: bool, default = false

/// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
/// and `Binary/BinaryLarge` with `BinaryView`.
pub schema_force_view_types: bool, default = false

// The following options affect writing to parquet files
// and map to parquet::file::properties::WriterProperties

Expand Down Expand Up @@ -483,10 +487,6 @@ config_namespace! {
/// writing out already in-memory data, such as from a cached
/// data frame.
pub maximum_buffered_record_batches_per_stream: usize, default = 2

/// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
/// and `Binary/BinaryLarge` with `BinaryView`.
pub schema_force_string_view: bool, default = false
}
}

Expand Down
7 changes: 3 additions & 4 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl ParquetOptions {
maximum_parallel_row_group_writers: _,
maximum_buffered_record_batches_per_stream: _,
bloom_filter_on_read: _, // reads not used for writer props
schema_force_string_view: _,
schema_force_view_types: _,
} = self;

let mut builder = WriterProperties::builder()
Expand Down Expand Up @@ -441,7 +441,7 @@ mod tests {
maximum_buffered_record_batches_per_stream: defaults
.maximum_buffered_record_batches_per_stream,
bloom_filter_on_read: defaults.bloom_filter_on_read,
schema_force_string_view: defaults.schema_force_string_view,
schema_force_view_types: defaults.schema_force_view_types,
}
}

Expand Down Expand Up @@ -542,8 +542,7 @@ mod tests {
maximum_buffered_record_batches_per_stream: global_options_defaults
.maximum_buffered_record_batches_per_stream,
bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
schema_force_string_view: global_options_defaults
.schema_force_string_view,
schema_force_view_types: global_options_defaults.schema_force_view_types,
},
column_specific_options,
key_value_metadata,
Expand Down
45 changes: 45 additions & 0 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,51 @@ pub fn transform_schema_to_view(schema: &Schema) -> Schema {
Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}

/// Coerces the file schema if the table schema uses a view type.
pub(crate) fn coerce_file_schema_to_view_type(
table_schema: &Schema,
file_schema: &Schema,
) -> Option<Schema> {
let mut transform = false;
let table_fields: HashMap<_, _> = table_schema
.fields
.iter()
.map(|f| {
let dt = f.data_type();
if dt.equals_datatype(&DataType::Utf8View) {
transform = true;
}
(f.name(), dt)
})
.collect();
if !transform {
return None;
}

let transformed_fields: Vec<Arc<Field>> = file_schema
.fields
.iter()
.map(
|field| match (table_fields.get(field.name()), field.data_type()) {
(Some(DataType::Utf8View), DataType::Utf8)
| (Some(DataType::Utf8View), DataType::LargeUtf8) => Arc::new(
Field::new(field.name(), DataType::Utf8View, field.is_nullable()),
),
(Some(DataType::BinaryView), DataType::Binary)
| (Some(DataType::BinaryView), DataType::LargeBinary) => Arc::new(
Field::new(field.name(), DataType::BinaryView, field.is_nullable()),
),
_ => field.clone(),
},
)
.collect();

Some(Schema::new_with_metadata(
transformed_fields,
file_schema.metadata.clone(),
))
}

#[cfg(test)]
pub(crate) mod test_util {
use std::ops::Range;
Expand Down
Loading

0 comments on commit 3ece7a7

Please sign in to comment.