From 322c1cfb97fc86e4942f51af56b073b278ecf99c Mon Sep 17 00:00:00 2001
From: Eric Fredine
Date: Thu, 20 Jun 2024 16:32:00 -0700
Subject: [PATCH 1/3] Adds option for providing a schema to the Arrow Parquet Reader.

---
 parquet/src/arrow/arrow_reader/mod.rs | 218 +++++++++++++++++++++++---
 1 file changed, 194 insertions(+), 24 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 070dda97c59a..95012d40e8f6 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -25,25 +25,24 @@ use arrow_array::Array;
 use arrow_array::{RecordBatch, RecordBatchReader};
 use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
 use arrow_select::filter::prep_null_mask_filter;
+pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
+pub use selection::{RowSelection, RowSelector};
 
+pub use crate::arrow::array_reader::RowGroups;
 use crate::arrow::array_reader::{build_array_reader, ArrayReader};
 use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
-use crate::arrow::{FieldLevels, ProjectionMask};
+use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
+use crate::column::page::{PageIterator, PageReader};
 use crate::errors::{ParquetError, Result};
+use crate::file::footer;
 use crate::file::metadata::ParquetMetaData;
+use crate::file::page_index::index_reader;
 use crate::file::reader::{ChunkReader, SerializedPageReader};
 use crate::schema::types::SchemaDescriptor;
 
 mod filter;
 mod selection;
 
-pub use crate::arrow::array_reader::RowGroups;
-use crate::column::page::{PageIterator, PageReader};
-use crate::file::footer;
-use crate::file::page_index::index_reader;
-pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
-pub use selection::{RowSelection, RowSelector};
-
 /// Builder for constructing parquet readers into arrow.
 ///
 /// Most users should use one of the following specializations:
@@ -250,6 +249,8 @@ impl<T> ArrowReaderBuilder<T> {
 pub struct ArrowReaderOptions {
     /// Should the reader strip any user defined metadata from the Arrow schema
     skip_arrow_metadata: bool,
+    /// If provided used as the schema for the file, otherwise the schema is read from the file
+    supplied_schema: Option<SchemaRef>,
     /// If true, attempt to read `OffsetIndex` and `ColumnIndex`
     pub(crate) page_index: bool,
 }
@@ -273,6 +274,23 @@ impl ArrowReaderOptions {
         }
     }
 
+    /// Provide a schema to use when reading the parquet file. If provided it
+    /// takes precedence over the schema inferred from the file or the schema defined
+    /// in the file's metadata. If the schema is not compatible with the file's
+    /// schema an error will be returned when constructing the builder.
+    ///
+    /// This option is only required if you want to cast columns to a different type.
+    /// For example, if you wanted to cast from an Int64 in the Parquet file to a Timestamp
+    /// in the Arrow schema.
+    ///
+    pub fn with_schema(self, schema: SchemaRef) -> Self {
+        Self {
+            supplied_schema: Some(schema),
+            skip_arrow_metadata: true,
+            ..self
+        }
+    }
+
     /// Enable reading [`PageIndex`], if present (defaults to `false`)
     ///
     /// The `PageIndex` can be used to push down predicates to the parquet scan,
@@ -353,22 +371,46 @@ impl ArrowReaderMetadata {
     /// This function does not attempt to load the PageIndex if not present in the metadata.
     /// See [`Self::load`] for more details.
     pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
-        let kv_metadata = match options.skip_arrow_metadata {
-            true => None,
-            false => metadata.file_metadata().key_value_metadata(),
-        };
-
-        let (schema, fields) = parquet_to_arrow_schema_and_fields(
-            metadata.file_metadata().schema_descr(),
-            ProjectionMask::all(),
-            kv_metadata,
-        )?;
+        match options.supplied_schema {
+            Some(supplied_schema) => {
+                let parquet_schema = metadata.file_metadata().schema_descr();
+                let field_levels = parquet_to_arrow_field_levels(
+                    parquet_schema,
+                    ProjectionMask::all(),
+                    Some(supplied_schema.fields()),
+                )?;
+                let inferred_schema = Schema::new(field_levels.fields);
+                if supplied_schema.contains(&inferred_schema) {
+                    Ok(Self {
+                        metadata,
+                        schema: Arc::from(inferred_schema),
+                        fields: field_levels.levels.map(Arc::new),
+                    })
+                } else {
+                    Err(ParquetError::ArrowError(
+                        "supplied schema does not match the parquet schema".into(),
+                    ))
+                }
+            }
+            None => {
+                let kv_metadata = match options.skip_arrow_metadata {
+                    true => None,
+                    false => metadata.file_metadata().key_value_metadata(),
+                };
 
-        Ok(Self {
-            metadata,
-            schema: Arc::new(schema),
-            fields: fields.map(Arc::new),
-        })
+                let (schema, fields) = parquet_to_arrow_schema_and_fields(
+                    metadata.file_metadata().schema_descr(),
+                    ProjectionMask::all(),
+                    kv_metadata,
+                )?;
+
+                Ok(Self {
+                    metadata,
+                    schema: Arc::new(schema),
+                    fields: fields.map(Arc::new),
+                })
+            }
+        }
     }
 
     /// Returns a reference to the [`ParquetMetaData`] for this parquet file
@@ -842,7 +884,7 @@ mod tests {
     use arrow_array::*;
     use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
     use arrow_data::ArrayDataBuilder;
-    use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, Fields, Schema};
+    use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef};
     use arrow_select::concat::concat_batches;
 
     use crate::arrow::arrow_reader::{
@@ -2307,10 +2349,12 @@ mod tests {
     fn test_invalid_utf8_string_array() {
         test_invalid_utf8_string_array_inner::<i32>();
     }
+
     #[test]
     fn test_invalid_utf8_large_string_array() {
         test_invalid_utf8_string_array_inner::<i64>();
     }
+
     fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
         let cases = [
             (
@@ -2620,6 +2664,132 @@ mod tests {
         assert_eq!(reader.schema(), schema_without_metadata);
     }
 
+    fn gen_parquet_file<T>(values: &[T::T], file: File) -> Result<crate::format::FileMetaData>
+    where
+        T: DataType,
+    {
+        let len = match T::get_physical_type() {
+            crate::basic::Type::INT96 => 12,
+            _ => -1,
+        };
+
+        let fields = vec![Arc::new(
+            Type::primitive_type_builder("leaf", T::get_physical_type())
+                .with_repetition(Repetition::REQUIRED)
+                .with_converted_type(ConvertedType::NONE)
+                .with_length(len)
+                .build()
+                .unwrap(),
+        )];
+
+        let schema = Arc::new(
+            Type::group_type_builder("test_schema")
+                .with_fields(fields)
+                .build()
+                .unwrap(),
+        );
+        let mut writer = SerializedFileWriter::new(file, schema, Default::default())?;
+
+        let mut row_group_writer = writer.next_row_group()?;
+        let mut column_writer = row_group_writer.next_column()?.unwrap();
+
+        column_writer.typed::<T>().write_batch(values, None, None)?;
+
+        column_writer.close()?;
+        row_group_writer.close()?;
+        writer.close()
+    }
+
+    fn get_builder_with_schema(
+        file: File,
+        schema: SchemaRef,
+    ) -> Result<ParquetRecordBatchReaderBuilder<File>> {
+        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
+        ParquetRecordBatchReaderBuilder::try_new_with_options(
+            file.try_clone().unwrap(),
+            options_with_schema,
+        )
+    }
+
+    #[test]
+    fn test_with_schema_int64_to_timestamp() {
+        let values: Vec<i64> = vec![0];
+        let file = tempfile().unwrap();
+        gen_parquet_file::<Int64Type>(&values, file.try_clone().unwrap()).unwrap();
+        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
+            "leaf",
+            ArrowDataType::Timestamp(
+                arrow::datatypes::TimeUnit::Nanosecond,
+                Some("+01:00".into()),
+            ),
+            false,
+        )]));
+        let builder =
+            get_builder_with_schema(file.try_clone().unwrap(), supplied_schema.clone()).unwrap();
+
+        let mut arrow_reader = builder.build().unwrap();
+
+        assert_eq!(arrow_reader.schema(), supplied_schema);
+
+        let batch = arrow_reader.next().unwrap().unwrap();
+        assert_eq!(batch.num_columns(), 1);
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(
+            batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<TimestampNanosecondArray>()
+                .unwrap()
+                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
+                .map(|v| v.to_string())
+                .unwrap(),
+            "1970-01-01 01:00:00 +01:00"
+        );
+    }
+
+    #[test]
+    fn test_schema_too_many_columns() {
+        let values: Vec<i64> = vec![0];
+        let file = tempfile().unwrap();
+        gen_parquet_file::<Int64Type>(&values, file.try_clone().unwrap()).unwrap();
+        let supplied_schema = Arc::new(Schema::new(vec![
+            Field::new("leaf", ArrowDataType::Int64, false),
+            Field::new("extra", ArrowDataType::Int32, true),
+        ]));
+        let options_with_schema = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
+        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
+            file.try_clone().unwrap(),
+            options_with_schema,
+        );
+
+        assert_eq!(
+            builder.err().unwrap().to_string(),
+            "Arrow: incompatible arrow schema, expected 1 struct fields got 2"
+        );
+    }
+
+    #[test]
+    fn test_schema_incompatible_column() {
+        let values: Vec<i64> = vec![0];
+        let file = tempfile().unwrap();
+        gen_parquet_file::<Int64Type>(&values, file.try_clone().unwrap()).unwrap();
+        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
+            "leaf",
+            ArrowDataType::Int32,
+            false,
+        )]));
+        let options_with_schema = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
+        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
+            file.try_clone().unwrap(),
+            options_with_schema,
+        );
+
+        assert_eq!(
+            builder.err().unwrap().to_string(),
+            "Arrow: supplied schema does not match the parquet schema"
+        );
+    }
+
     #[test]
     fn test_empty_projection() {
         let testdata = arrow::util::test_util::parquet_test_data();

From 3b4b2a1f594e33585110f8be9335a60c22c5a28f Mon Sep 17 00:00:00 2001
From: Eric Fredine
Date: Fri, 28 Jun 2024 16:07:54 -0700
Subject: [PATCH 2/3] Adds more complete tests.

Adds a more detailed error message for incompatible columns.
Adds nested fields to test_with_schema.
Adds test for incompatible nested field.
Updates documentation.
---
 parquet/src/arrow/arrow_reader/mod.rs | 393 +++++++++++++++++++-------
 1 file changed, 285 insertions(+), 108 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 95012d40e8f6..82d992f4a84d 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -283,6 +283,9 @@ impl ArrowReaderOptions {
     /// For example, if you wanted to cast from an Int64 in the Parquet file to a Timestamp
     /// in the Arrow schema.
     ///
+    /// The supplied schema must have the same number of columns as the parquet schema and
+    /// the column names need to be the same.
+    ///
     pub fn with_schema(self, schema: SchemaRef) -> Self {
         Self {
             supplied_schema: Some(schema),
@@ -372,26 +375,7 @@ impl ArrowReaderMetadata {
     /// See [`Self::load`] for more details.
     pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
         match options.supplied_schema {
-            Some(supplied_schema) => {
-                let parquet_schema = metadata.file_metadata().schema_descr();
-                let field_levels = parquet_to_arrow_field_levels(
-                    parquet_schema,
-                    ProjectionMask::all(),
-                    Some(supplied_schema.fields()),
-                )?;
-                let inferred_schema = Schema::new(field_levels.fields);
-                if supplied_schema.contains(&inferred_schema) {
-                    Ok(Self {
-                        metadata,
-                        schema: Arc::from(inferred_schema),
-                        fields: field_levels.levels.map(Arc::new),
-                    })
-                } else {
-                    Err(ParquetError::ArrowError(
-                        "supplied schema does not match the parquet schema".into(),
-                    ))
-                }
-            }
+            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
             None => {
                 let kv_metadata = match options.skip_arrow_metadata {
                     true => None,
@@ -413,6 +397,56 @@ impl ArrowReaderMetadata {
         }
     }
 
+    fn with_supplied_schema(
+        metadata: Arc<ParquetMetaData>,
+        supplied_schema: SchemaRef,
+    ) -> Result<Self> {
+        let parquet_schema = metadata.file_metadata().schema_descr();
+        let field_levels = parquet_to_arrow_field_levels(
+            parquet_schema,
+            ProjectionMask::all(),
+            Some(supplied_schema.fields()),
+        )?;
+        let fields = field_levels.fields;
+        let inferred_len = fields.len();
+        let supplied_len = supplied_schema.fields().len();
+        // Ensure the supplied schema has the same number of columns as the parquet schema.
+        // parquet_to_arrow_field_levels is expected to throw an error if the schemas have
+        // different lengths, but we check here to be safe.
+        if inferred_len != supplied_len {
+            Err(arrow_err!(format!(
+                "incompatible arrow schema, expected {} columns received {}",
+                inferred_len, supplied_len
+            )))
+        } else {
+            let diff_fields: Vec<_> = supplied_schema
+                .fields()
+                .iter()
+                .zip(fields.iter())
+                .filter_map(|(field1, field2)| {
+                    if field1 != field2 {
+                        Some(field1.name().clone())
+                    } else {
+                        None
+                    }
+                })
+                .collect();
+
+            if !diff_fields.is_empty() {
+                Err(ParquetError::ArrowError(format!(
+                    "incompatible arrow schema, the following fields could not be cast: [{}]",
+                    diff_fields.join(", ")
+                )))
+            } else {
+                Ok(Self {
+                    metadata,
+                    schema: supplied_schema,
+                    fields: field_levels.levels.map(Arc::new),
+                })
+            }
+        }
+    }
+
     /// Returns a reference to the [`ParquetMetaData`] for this parquet file
     pub fn metadata(&self) -> &Arc<ParquetMetaData> {
         &self.metadata
@@ -2664,129 +2698,272 @@ mod tests {
         assert_eq!(reader.schema(), schema_without_metadata);
     }
 
-    fn gen_parquet_file<T>(values: &[T::T], file: File) -> Result<crate::format::FileMetaData>
+    fn write_parquet_from_iter<I, F>(value: I) -> File
     where
-        T: DataType,
+        I: IntoIterator<Item = (F, ArrayRef)>,
+        F: AsRef<str>,
     {
-        let len = match T::get_physical_type() {
-            crate::basic::Type::INT96 => 12,
-            _ => -1,
-        };
+        let batch = RecordBatch::try_from_iter(value).unwrap();
+        let file = tempfile().unwrap();
+        let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+        file
+    }
 
-        let fields = vec![Arc::new(
-            Type::primitive_type_builder("leaf", T::get_physical_type())
-                .with_repetition(Repetition::REQUIRED)
-                .with_converted_type(ConvertedType::NONE)
-                .with_length(len)
-                .build()
-                .unwrap(),
-        )];
+    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
+    where
+        I: IntoIterator<Item = (F, ArrayRef)>,
+        F: AsRef<str>,
+    {
+        let file = write_parquet_from_iter(value);
+        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
+        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
+            file.try_clone().unwrap(),
+            options_with_schema,
+        );
+        assert_eq!(builder.err().unwrap().to_string(), expected_error);
+    }
 
-        let schema = Arc::new(
-            Type::group_type_builder("test_schema")
-                .with_fields(fields)
-                .build()
-                .unwrap(),
+    #[test]
+    fn test_schema_too_few_columns() {
+        run_schema_test_with_error(
+            vec![
+                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
+                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
+            ],
+            Arc::new(Schema::new(vec![Field::new(
+                "int64",
+                ArrowDataType::Int64,
+                false,
+            )])),
+            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
         );
-        let mut writer = SerializedFileWriter::new(file, schema, Default::default())?;
+    }
 
-        let mut row_group_writer = writer.next_row_group()?;
-        let mut column_writer = row_group_writer.next_column()?.unwrap();
+    #[test]
+    fn test_schema_too_many_columns() {
+        run_schema_test_with_error(
+            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
+            Arc::new(Schema::new(vec![
+                Field::new("int64", ArrowDataType::Int64, false),
+                Field::new("int32", ArrowDataType::Int32, false),
+            ])),
+            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
+        );
+    }
 
-        column_writer.typed::<T>().write_batch(values, None, None)?;
+    #[test]
+    fn test_schema_mismatched_column_names() {
+        run_schema_test_with_error(
+            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
+            Arc::new(Schema::new(vec![Field::new(
+                "other",
+                ArrowDataType::Int64,
+                false,
+            )])),
+            "Arrow: incompatible arrow schema, expected field named int64 got other",
+        );
+    }
 
-        column_writer.close()?;
-        row_group_writer.close()?;
-        writer.close()
+    #[test]
+    fn test_schema_incompatible_columns() {
+        run_schema_test_with_error(
+            vec![
+                (
+                    "col1_invalid",
+                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
+                ),
+                (
+                    "col2_valid",
+                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
+                ),
+                (
+                    "col3_invalid",
+                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
+                ),
+            ],
+            Arc::new(Schema::new(vec![
+                Field::new("col1_invalid", ArrowDataType::Int32, false),
+                Field::new("col2_valid", ArrowDataType::Int32, false),
+                Field::new("col3_invalid", ArrowDataType::Int32, false),
+            ])),
+            "Arrow: incompatible arrow schema, the following fields could not be cast: [col1_invalid, col3_invalid]",
+        );
     }
 
-    fn get_builder_with_schema(
-        file: File,
-        schema: SchemaRef,
-    ) -> Result<ParquetRecordBatchReaderBuilder<File>> {
-        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
-        ParquetRecordBatchReaderBuilder::try_new_with_options(
-            file.try_clone().unwrap(),
-            options_with_schema,
+    #[test]
+    fn test_one_incompatible_nested_column() {
+        let nested_fields = Fields::from(vec![
+            Field::new("nested1_valid", ArrowDataType::Utf8, false),
+            Field::new("nested1_invalid", ArrowDataType::Int64, false),
+        ]);
+        let nested = StructArray::try_new(
+            nested_fields,
+            vec![
+                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
+                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
+            ],
+            None,
         )
+        .expect("struct array");
+        let supplied_nested_fields = Fields::from(vec![
+            Field::new("nested1_valid", ArrowDataType::Utf8, false),
+            Field::new("nested1_invalid", ArrowDataType::Int32, false),
+        ]);
+        run_schema_test_with_error(
+            vec![
+                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
+                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
+                ("nested", Arc::new(nested) as ArrayRef),
+            ],
+            Arc::new(Schema::new(vec![
+                Field::new("col1", ArrowDataType::Int64, false),
+                Field::new("col2", ArrowDataType::Int32, false),
+                Field::new(
+                    "nested",
+                    ArrowDataType::Struct(supplied_nested_fields),
+                    false,
+                ),
+            ])),
+            "Arrow: incompatible arrow schema, the following fields could not be cast: [nested]",
+        );
+    }
 
     #[test]
-    fn test_with_schema_int64_to_timestamp() {
-        let values: Vec<i64> = vec![0];
-        let file = tempfile().unwrap();
-        gen_parquet_file::<Int64Type>(&values, file.try_clone().unwrap()).unwrap();
-        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
-            "leaf",
-            ArrowDataType::Timestamp(
-                arrow::datatypes::TimeUnit::Nanosecond,
-                Some("+01:00".into()),
+    fn test_with_schema() {
+        let nested_fields = Fields::from(vec![
+            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
+            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
+        ]);
+
+        let nested_arrays: Vec<ArrayRef> = vec![
+            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
+            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
+        ];
+
+        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
+
+        let file = write_parquet_from_iter(vec![
+            (
+                "int32_to_ts_second",
+                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
             ),
-            false,
-        )]));
-        let builder =
-            get_builder_with_schema(file.try_clone().unwrap(), supplied_schema.clone()).unwrap();
+            (
+                "date32_to_date64",
+                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
+            ),
+            ("nested", Arc::new(nested) as ArrayRef),
+        ]);
 
-        let mut arrow_reader = builder.build().unwrap();
+        let supplied_nested_fields = Fields::from(vec![
+            Field::new(
+                "utf8_to_dict",
+                ArrowDataType::Dictionary(
+                    Box::new(ArrowDataType::Int32),
+                    Box::new(ArrowDataType::Utf8),
+                ),
+                false,
+            ),
+            Field::new(
+                "int64_to_ts_nano",
+                ArrowDataType::Timestamp(
+                    arrow::datatypes::TimeUnit::Nanosecond,
+                    Some("+10:00".into()),
+                ),
+                false,
+            ),
+        ]);
 
-        assert_eq!(arrow_reader.schema(), supplied_schema);
+        let supplied_schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "int32_to_ts_second",
+                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
+                false,
+            ),
+            Field::new("date32_to_date64", ArrowDataType::Date64, false),
+            Field::new(
+                "nested",
+                ArrowDataType::Struct(supplied_nested_fields),
+                false,
+            ),
+        ]));
+        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
+        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
+            file.try_clone().unwrap(),
+            options,
+        )
+        .expect("reader builder with schema")
+        .build()
+        .expect("reader with schema");
+
+        assert_eq!(arrow_reader.schema(), supplied_schema);
 
         let batch = arrow_reader.next().unwrap().unwrap();
-        assert_eq!(batch.num_columns(), 1);
-        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 3);
+        assert_eq!(batch.num_rows(), 4);
         assert_eq!(
             batch
                 .column(0)
                 .as_any()
-                .downcast_ref::<TimestampNanosecondArray>()
-                .unwrap()
+                .downcast_ref::<TimestampSecondArray>()
+                .expect("downcast to timestamp second")
                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
                 .map(|v| v.to_string())
-                .unwrap(),
+                .expect("value as datetime"),
             "1970-01-01 01:00:00 +01:00"
         );
-    }
-
-    #[test]
-    fn test_schema_too_many_columns() {
-        let values: Vec<i64> = vec![0];
-        let file = tempfile().unwrap();
-        gen_parquet_file::<Int64Type>(&values, file.try_clone().unwrap()).unwrap();
-        let supplied_schema = Arc::new(Schema::new(vec![
-            Field::new("leaf", ArrowDataType::Int64, false),
-            Field::new("extra", ArrowDataType::Int32, true),
-        ]));
-        let options_with_schema = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
-        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
-            file.try_clone().unwrap(),
-            options_with_schema,
+        assert_eq!(
+            batch
+                .column(1)
+                .as_any()
+                .downcast_ref::<Date64Array>()
+                .expect("downcast to date64")
+                .value_as_date(0)
+                .map(|v| v.to_string())
+                .expect("value as date"),
+            "1970-01-01"
         );
+        let nested = batch
+            .column(2)
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .expect("downcast to struct");
+
+        let nested_dict = nested
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32DictionaryArray>()
+            .expect("downcast to dictionary");
 
         assert_eq!(
-            builder.err().unwrap().to_string(),
-            "Arrow: incompatible arrow schema, expected 1 struct fields got 2"
+            nested_dict
+                .values()
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .expect("downcast to string")
+                .iter()
+                .collect::<Vec<_>>(),
+            vec![Some("a"), Some("b")]
         );
-    }
 
-    #[test]
-    fn test_schema_incompatible_column() {
-        let values: Vec<i64> = vec![0];
-        let file = tempfile().unwrap();
-        gen_parquet_file::<Int64Type>(&values, file.try_clone().unwrap()).unwrap();
-        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
-            "leaf",
-            ArrowDataType::Int32,
-            false,
-        )]));
-        let options_with_schema = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
-        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
-            file.try_clone().unwrap(),
-            options_with_schema,
+        assert_eq!(
+            nested_dict.keys().iter().collect::<Vec<_>>(),
+            vec![Some(0), Some(0), Some(0), Some(1)]
         );
 
         assert_eq!(
-            builder.err().unwrap().to_string(),
-            "Arrow: supplied schema does not match the parquet schema"
+            nested
+                .column(1)
+                .as_any()
+                .downcast_ref::<TimestampNanosecondArray>()
+                .expect("downcast to timestamp nanosecond")
+                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
+                .map(|v| v.to_string())
+                .expect("value as datetime"),
+            "1970-01-01 10:00:00.000000001 +10:00"
         );
     }
 

From 7cf6fdba00e66d5c595f047c1c1f7406c9a7d401 Mon Sep 17 00:00:00 2001
From: Eric Fredine
Date: Sat, 29 Jun 2024 15:11:40 -0700
Subject: [PATCH 3/3] Add an example showing how to use the with_schema option.

---
 parquet/src/arrow/arrow_reader/mod.rs | 34 +++++++++++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 82d992f4a84d..b38092dbc937 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -286,6 +286,40 @@ impl ArrowReaderOptions {
     /// The supplied schema must have the same number of columns as the parquet schema and
     /// the column names need to be the same.
     ///
+    /// # Example
+    /// ```
+    /// use std::io::Bytes;
+    /// use std::sync::Arc;
+    /// use tempfile::tempfile;
+    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+    /// use arrow_schema::{DataType, Field, Schema, TimeUnit};
+    /// use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
+    /// use parquet::arrow::ArrowWriter;
+    ///
+    /// // Write data - schema is inferred from the data to be Int32
+    /// let file = tempfile().unwrap();
+    /// let batch = RecordBatch::try_from_iter(vec![
+    ///     ("col_1", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
+    /// ]).unwrap();
+    /// let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), None).unwrap();
+    /// writer.write(&batch).unwrap();
+    /// writer.close().unwrap();
+    ///
+    /// // Read the file back.
+    /// // Supply a schema that interprets the Int32 column as a Timestamp.
+    /// let supplied_schema = Arc::new(Schema::new(vec![
+    ///     Field::new("col_1", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
+    /// ]));
+    /// let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
+    /// let mut builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
+    ///     file.try_clone().unwrap(),
+    ///     options
+    /// ).expect("Error if the schema is not compatible with the parquet file schema.");
+    ///
+    /// // Create the reader and read the data using the supplied schema.
+    /// let mut reader = builder.build().unwrap();
+    /// let _batch = reader.next().unwrap().unwrap();
+    /// ```
     pub fn with_schema(self, schema: SchemaRef) -> Self {
         Self {
             supplied_schema: Some(schema),