From c1b5e7abb92ceb65120850ed0e46ba56d851065a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 22 Sep 2025 16:20:17 -0400 Subject: [PATCH 1/3] Fix parquet logical type annotations and consolidate extension type handling --- parquet/src/arrow/schema/complex.rs | 16 ++-- parquet/src/arrow/schema/extension.rs | 85 ++++++++++++++++--- parquet/src/arrow/schema/mod.rs | 117 +++++++++++--------------- 3 files changed, 127 insertions(+), 91 deletions(-) diff --git a/parquet/src/arrow/schema/complex.rs b/parquet/src/arrow/schema/complex.rs index ecc80a65904a..bf4835994ac3 100644 --- a/parquet/src/arrow/schema/complex.rs +++ b/parquet/src/arrow/schema/complex.rs @@ -18,7 +18,7 @@ use std::collections::HashMap; use std::sync::Arc; -use crate::arrow::schema::extension::add_extension_type; +use crate::arrow::schema::extension::try_add_extension_type; use crate::arrow::schema::primitive::convert_primitive; use crate::arrow::{ProjectionMask, PARQUET_FIELD_ID_META_KEY}; use crate::basic::{ConvertedType, Repetition}; @@ -224,7 +224,7 @@ impl Visitor { if let Some(mut child) = self.dispatch(parquet_field, child_ctx)? { // The child type returned may be different from what is encoded in the arrow // schema in the event of a mismatch or a projection - child_fields.push(convert_field(parquet_field, &mut child, arrow_field)); + child_fields.push(convert_field(parquet_field, &mut child, arrow_field)?); children.push(child); } } @@ -355,11 +355,11 @@ impl Visitor { match (maybe_key, maybe_value) { (Some(mut key), Some(mut value)) => { let key_field = Arc::new( - convert_field(map_key, &mut key, arrow_key) + convert_field(map_key, &mut key, arrow_key)? // The key is always non-nullable (#5630) .with_nullable(false), ); - let value_field = Arc::new(convert_field(map_value, &mut value, arrow_value)); + let value_field = Arc::new(convert_field(map_value, &mut value, arrow_value)?); let field_metadata = match arrow_map { Some(field) => field.metadata().clone(), _ => HashMap::default(), @@ -497,7 +497,7 @@ impl Visitor { match self.dispatch(item_type, new_context) { Ok(Some(mut item)) => { - let item_field = Arc::new(convert_field(item_type, &mut item, arrow_field)); + let item_field = Arc::new(convert_field(item_type, &mut item, arrow_field)?); // Use arrow type as hint for index size let arrow_type = match context.data_type { @@ -549,7 +549,7 @@ fn convert_field( parquet_type: &Type, field: &mut ParquetField, arrow_hint: Option<&Field>, -) -> Field { +) -> Result { let name = parquet_type.name(); let data_type = field.arrow_type.clone(); let nullable = field.nullable; @@ -567,7 +567,7 @@ fn convert_field( _ => Field::new(name, data_type, nullable), }; - field.with_metadata(hint.metadata().clone()) + Ok(field.with_metadata(hint.metadata().clone())) } None => { let mut ret = Field::new(name, data_type, nullable); @@ -580,7 +580,7 @@ fn convert_field( ); ret.set_metadata(meta); } - add_extension_type(ret, parquet_type) + try_add_extension_type(ret, parquet_type) } } } diff --git a/parquet/src/arrow/schema/extension.rs b/parquet/src/arrow/schema/extension.rs index 752b9a5ced87..f2d16948a5fc 100644 --- a/parquet/src/arrow/schema/extension.rs +++ b/parquet/src/arrow/schema/extension.rs @@ -24,6 +24,7 @@ //! with the key "ARROW:extension:name". use crate::basic::LogicalType; +use crate::errors::ParquetError; use crate::schema::types::Type; use arrow_schema::extension::ExtensionType; use arrow_schema::Field; @@ -34,23 +35,49 @@ use arrow_schema::Field; /// Some Parquet logical types, such as Variant, do not map directly to an /// Arrow DataType, and instead are represented by an Arrow ExtensionType. /// Extension types are attached to Arrow Fields via metadata. -pub(crate) fn add_extension_type(mut arrow_field: Field, parquet_type: &Type) -> Field { - match parquet_type.get_basic_info().logical_type() { +pub(crate) fn try_add_extension_type( + mut arrow_field: Field, + parquet_type: &Type, +) -> Result { + let Some(parquet_logical_type) = parquet_type.get_basic_info().logical_type() else { + return Ok(arrow_field); + }; + match parquet_logical_type { #[cfg(feature = "variant_experimental")] - Some(LogicalType::Variant) => { - // try to add the Variant extension type, but if that fails (e.g. because the - // storage type is not supported), just return the field as is - arrow_field - .try_with_extension_type(parquet_variant_compute::VariantType) - .ok(); - arrow_field + LogicalType::Variant => { + arrow_field.try_with_extension_type(parquet_variant_compute::VariantType)?; } - // TODO add other LogicalTypes here - _ => arrow_field, + #[cfg(feature = "arrow_canonical_extension_types")] + LogicalType::Uuid => { + arrow_field.try_with_extension_type(arrow_schema::extension::Uuid)?; + } + #[cfg(feature = "arrow_canonical_extension_types")] + LogicalType::Json => { + arrow_field.try_with_extension_type(arrow_schema::extension::Json::default())?; + } + _ => {} + }; + Ok(arrow_field) +} + +/// Returns true if [`try_add_extension_type`] would add an extension type +/// to the specified Parquet field. (used to preallocate the metadata) +pub(crate) fn has_extension_type(parquet_type: &Type) -> bool { + let Some(parquet_logical_type) = parquet_type.get_basic_info().logical_type() else { + return false; + }; + match parquet_logical_type { + #[cfg(feature = "variant_experimental")] + LogicalType::Variant => true, + #[cfg(feature = "arrow_canonical_extension_types")] + LogicalType::Uuid => true, + #[cfg(feature = "arrow_canonical_extension_types")] + LogicalType::Json => true, + _ => false, } } -/// Return the Parquet logical type to use for the specified Arrow field, if any. +/// Return the Parquet logical type to use for the specified Arrow Struct field, if any. #[cfg(feature = "variant_experimental")] pub(crate) fn logical_type_for_struct(field: &Field) -> Option { use parquet_variant_compute::VariantType; @@ -67,6 +94,38 @@ pub(crate) fn logical_type_for_struct(field: &Field) -> Option { } #[cfg(not(feature = "variant_experimental"))] -pub(crate) fn logical_type_for_struct(field: &Field) -> Option { +pub(crate) fn logical_type_for_struct(_field: &Field) -> Option { None } + +/// Return the Parquet logical type to use for the specified Arrow fixed size binary field, if any. +#[cfg(feature = "arrow_canonical_extension_types")] +pub(crate) fn logical_type_for_fixed_size_binary(field: &Field) -> Option { + use arrow_schema::extension::Uuid; + // If set, map arrow uuid extension type to parquet uuid logical type. + field + .try_extension_type::() + .ok() + .map(|_| LogicalType::Uuid) +} + +#[cfg(not(feature = "arrow_canonical_extension_types"))] +pub(crate) fn logical_type_for_fixed_size_binary(_field: &Field) -> Option { + None +} + +/// Return the Parquet logical type to use for the specified Arrow string field (Utf8, LargeUtf8) if any +#[cfg(feature = "arrow_canonical_extension_types")] +pub(crate) fn logical_type_for_string(field: &Field) -> Option { + use arrow_schema::extension::Json; + // Use the Json logical type if the canonical Json + // extension type is set on this field. + field + .try_extension_type::() + .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json)) +} + +#[cfg(not(feature = "arrow_canonical_extension_types"))] +pub(crate) fn logical_type_for_string(_field: &Field) -> Option { + Some(LogicalType::String) +} diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index 9d1098d86ca6..0763c88c9129 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -23,8 +23,6 @@ use std::collections::HashMap; use std::sync::Arc; use arrow_ipc::writer; -#[cfg(feature = "arrow_canonical_extension_types")] -use arrow_schema::extension::{Json, Uuid}; use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; use crate::basic::{ @@ -39,7 +37,10 @@ mod extension; mod primitive; use super::PARQUET_FIELD_ID_META_KEY; -use crate::arrow::schema::extension::logical_type_for_struct; +use crate::arrow::schema::extension::{ + has_extension_type, logical_type_for_fixed_size_binary, logical_type_for_string, + logical_type_for_struct, try_add_extension_type, +}; use crate::arrow::ProjectionMask; pub(crate) use complex::{ParquetField, ParquetFieldType}; @@ -390,31 +391,27 @@ pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result ret.try_with_extension_type(Uuid)?, - LogicalType::Json => ret.try_with_extension_type(Json::default())?, - _ => {} - } - } - if !meta.is_empty() { - ret.set_metadata(meta); - } - - Ok(ret) + try_add_extension_type(ret, parquet_column.self_type()) } pub fn decimal_length_from_precision(precision: u8) -> usize { @@ -618,16 +615,7 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result { .with_repetition(repetition) .with_id(id) .with_length(*length) - .with_logical_type( - #[cfg(feature = "arrow_canonical_extension_types")] - // If set, map arrow uuid extension type to parquet uuid logical type. - field - .try_extension_type::() - .ok() - .map(|_| LogicalType::Uuid), - #[cfg(not(feature = "arrow_canonical_extension_types"))] - None, - ) + .with_logical_type(logical_type_for_fixed_size_binary(field)) .build() } DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY) @@ -664,35 +652,13 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result { } DataType::Utf8 | DataType::LargeUtf8 => { Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY) - .with_logical_type({ - #[cfg(feature = "arrow_canonical_extension_types")] - { - // Use the Json logical type if the canonical Json - // extension type is set on this field. - field - .try_extension_type::() - .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json)) - } - #[cfg(not(feature = "arrow_canonical_extension_types"))] - Some(LogicalType::String) - }) + .with_logical_type(logical_type_for_string(field)) .with_repetition(repetition) .with_id(id) .build() } DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY) - .with_logical_type({ - #[cfg(feature = "arrow_canonical_extension_types")] - { - // Use the Json logical type if the canonical Json - // extension type is set on this field. - field - .try_extension_type::() - .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json)) - } - #[cfg(not(feature = "arrow_canonical_extension_types"))] - Some(LogicalType::String) - }) + .with_logical_type(logical_type_for_string(field)) .with_repetition(repetition) .with_id(id) .build(), @@ -797,8 +763,6 @@ mod tests { use std::{collections::HashMap, sync::Arc}; - use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit}; - use crate::arrow::PARQUET_FIELD_ID_META_KEY; use crate::file::metadata::KeyValue; use crate::file::reader::FileReader; @@ -806,6 +770,7 @@ mod tests { arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter}, schema::{parser::parse_message_type, types::SchemaDescriptor}, }; + use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit}; #[test] fn test_flat_primitives() { @@ -844,12 +809,25 @@ mod tests { Field::new("float16", DataType::Float16, true), Field::new("string", DataType::Utf8, true), Field::new("string_2", DataType::Utf8, true), - Field::new("json", DataType::Utf8, true), + json_field(), ]); assert_eq!(&arrow_fields, converted_arrow_schema.fields()); } + /// Return the expected Field for a Parquet column annotated with + /// the JSON logical type. + fn json_field() -> Field { + #[cfg(feature = "arrow_canonical_extension_types")] + { + Field::new("json", DataType::Utf8, true).with_extension_type(arrow_schema::extension::Json::default()) + } + #[cfg(not(feature = "arrow_canonical_extension_types"))] + { + Field::new("json", DataType::Utf8, true) + } + } + #[test] fn test_decimal_fields() { let message_type = " @@ -2233,6 +2211,7 @@ mod tests { #[test] #[cfg(feature = "arrow_canonical_extension_types")] fn arrow_uuid_to_parquet_uuid() -> Result<()> { + use arrow_schema::extension::Uuid; let arrow_schema = Schema::new(vec![Field::new( "uuid", DataType::FixedSizeBinary(16), @@ -2247,9 +2226,8 @@ mod tests { Some(LogicalType::Uuid) ); - // TODO: roundtrip - // let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?; - // assert_eq!(arrow_schema.field(0).try_extension_type::()?, Uuid); + let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?; + assert_eq!(arrow_schema.field(0).try_extension_type::()?, Uuid); Ok(()) } @@ -2257,6 +2235,7 @@ mod tests { #[test] #[cfg(feature = "arrow_canonical_extension_types")] fn arrow_json_to_parquet_json() -> Result<()> { + use arrow_schema::extension::Json; let arrow_schema = Schema::new(vec![ Field::new("json", DataType::Utf8, false).with_extension_type(Json::default()) ]); @@ -2268,13 +2247,11 @@ mod tests { Some(LogicalType::Json) ); - // TODO: roundtrip - // https://github.com/apache/arrow-rs/issues/7063 - // let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?; - // assert_eq!( - // arrow_schema.field(0).try_extension_type::()?, - // Json::default() - // ); + let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?; + assert_eq!( + arrow_schema.field(0).try_extension_type::()?, + Json::default() + ); Ok(()) } From 58d5f600fc330051894b46cb1c667a0933d6e912 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 26 Sep 2025 12:52:24 -0400 Subject: [PATCH 2/3] Comments --- parquet/src/arrow/schema/extension.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/schema/extension.rs b/parquet/src/arrow/schema/extension.rs index f2d16948a5fc..c6d04328aca8 100644 --- a/parquet/src/arrow/schema/extension.rs +++ b/parquet/src/arrow/schema/extension.rs @@ -61,7 +61,9 @@ pub(crate) fn try_add_extension_type( } /// Returns true if [`try_add_extension_type`] would add an extension type -/// to the specified Parquet field. (used to preallocate the metadata) +/// to the specified Parquet field. +/// +/// This is used to preallocate the metadata hashmap size pub(crate) fn has_extension_type(parquet_type: &Type) -> bool { let Some(parquet_logical_type) = parquet_type.get_basic_info().logical_type() else { return false; From 4065cdd6785d721df589e3e4b7d6dd1e67eb481b Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 26 Sep 2025 13:29:09 -0400 Subject: [PATCH 3/3] fmt --- parquet/src/arrow/schema/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index 0763c88c9129..8e689f5eb66e 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -820,7 +820,8 @@ mod tests { fn json_field() -> Field { #[cfg(feature = "arrow_canonical_extension_types")] { - Field::new("json", DataType::Utf8, true).with_extension_type(arrow_schema::extension::Json::default()) + Field::new("json", DataType::Utf8, true) + .with_extension_type(arrow_schema::extension::Json::default()) } #[cfg(not(feature = "arrow_canonical_extension_types"))] {