Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions parquet/src/arrow/schema/complex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use crate::arrow::schema::extension::add_extension_type;
use crate::arrow::schema::extension::try_add_extension_type;
use crate::arrow::schema::primitive::convert_primitive;
use crate::arrow::{ProjectionMask, PARQUET_FIELD_ID_META_KEY};
use crate::basic::{ConvertedType, Repetition};
Expand Down Expand Up @@ -224,7 +224,7 @@ impl Visitor {
if let Some(mut child) = self.dispatch(parquet_field, child_ctx)? {
// The child type returned may be different from what is encoded in the arrow
// schema in the event of a mismatch or a projection
child_fields.push(convert_field(parquet_field, &mut child, arrow_field));
child_fields.push(convert_field(parquet_field, &mut child, arrow_field)?);
children.push(child);
}
}
Expand Down Expand Up @@ -355,11 +355,11 @@ impl Visitor {
match (maybe_key, maybe_value) {
(Some(mut key), Some(mut value)) => {
let key_field = Arc::new(
convert_field(map_key, &mut key, arrow_key)
convert_field(map_key, &mut key, arrow_key)?
// The key is always non-nullable (#5630)
.with_nullable(false),
);
let value_field = Arc::new(convert_field(map_value, &mut value, arrow_value));
let value_field = Arc::new(convert_field(map_value, &mut value, arrow_value)?);
let field_metadata = match arrow_map {
Some(field) => field.metadata().clone(),
_ => HashMap::default(),
Expand Down Expand Up @@ -497,7 +497,7 @@ impl Visitor {

match self.dispatch(item_type, new_context) {
Ok(Some(mut item)) => {
let item_field = Arc::new(convert_field(item_type, &mut item, arrow_field));
let item_field = Arc::new(convert_field(item_type, &mut item, arrow_field)?);

// Use arrow type as hint for index size
let arrow_type = match context.data_type {
Expand Down Expand Up @@ -549,7 +549,7 @@ fn convert_field(
parquet_type: &Type,
field: &mut ParquetField,
arrow_hint: Option<&Field>,
) -> Field {
) -> Result<Field, ParquetError> {
let name = parquet_type.name();
let data_type = field.arrow_type.clone();
let nullable = field.nullable;
Expand All @@ -567,7 +567,7 @@ fn convert_field(
_ => Field::new(name, data_type, nullable),
};

field.with_metadata(hint.metadata().clone())
Ok(field.with_metadata(hint.metadata().clone()))
}
None => {
let mut ret = Field::new(name, data_type, nullable);
Expand All @@ -580,7 +580,7 @@ fn convert_field(
);
ret.set_metadata(meta);
}
add_extension_type(ret, parquet_type)
try_add_extension_type(ret, parquet_type)
}
}
}
Expand Down
87 changes: 74 additions & 13 deletions parquet/src/arrow/schema/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
//! with the key "ARROW:extension:name".

use crate::basic::LogicalType;
use crate::errors::ParquetError;
use crate::schema::types::Type;
use arrow_schema::extension::ExtensionType;
use arrow_schema::Field;
Expand All @@ -34,23 +35,51 @@ use arrow_schema::Field;
/// Some Parquet logical types, such as Variant, do not map directly to an
/// Arrow DataType, and instead are represented by an Arrow ExtensionType.
/// Extension types are attached to Arrow Fields via metadata.
pub(crate) fn add_extension_type(mut arrow_field: Field, parquet_type: &Type) -> Field {
match parquet_type.get_basic_info().logical_type() {
pub(crate) fn try_add_extension_type(
mut arrow_field: Field,
parquet_type: &Type,
) -> Result<Field, ParquetError> {
let Some(parquet_logical_type) = parquet_type.get_basic_info().logical_type() else {
return Ok(arrow_field);
};
match parquet_logical_type {
#[cfg(feature = "variant_experimental")]
Some(LogicalType::Variant) => {
// try to add the Variant extension type, but if that fails (e.g. because the
// storage type is not supported), just return the field as is
arrow_field
.try_with_extension_type(parquet_variant_compute::VariantType)
.ok();
arrow_field
LogicalType::Variant => {
arrow_field.try_with_extension_type(parquet_variant_compute::VariantType)?;
}
// TODO add other LogicalTypes here
_ => arrow_field,
#[cfg(feature = "arrow_canonical_extension_types")]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The core change of this PR is moving the code that handles extension types from the schema module into the new extension module and putting it behind named functions.

LogicalType::Uuid => {
arrow_field.try_with_extension_type(arrow_schema::extension::Uuid)?;
}
#[cfg(feature = "arrow_canonical_extension_types")]
LogicalType::Json => {
arrow_field.try_with_extension_type(arrow_schema::extension::Json::default())?;
}
_ => {}
};
Ok(arrow_field)
}

/// Reports whether the logical type of `parquet_type` is one that
/// [`try_add_extension_type`] maps to an Arrow extension type.
///
/// Which logical types count depends on the enabled crate features. A field
/// without a logical type never has an extension type.
///
/// Callers use this to size the field metadata hashmap up front.
pub(crate) fn has_extension_type(parquet_type: &Type) -> bool {
    match parquet_type.get_basic_info().logical_type() {
        #[cfg(feature = "variant_experimental")]
        Some(LogicalType::Variant) => true,
        #[cfg(feature = "arrow_canonical_extension_types")]
        Some(LogicalType::Uuid | LogicalType::Json) => true,
        // No logical type, or one that has no extension type mapping
        _ => false,
    }
}

/// Return the Parquet logical type to use for the specified Arrow field, if any.
/// Return the Parquet logical type to use for the specified Arrow Struct field, if any.
#[cfg(feature = "variant_experimental")]
pub(crate) fn logical_type_for_struct(field: &Field) -> Option<LogicalType> {
use parquet_variant_compute::VariantType;
Expand All @@ -67,6 +96,38 @@ pub(crate) fn logical_type_for_struct(field: &Field) -> Option<LogicalType> {
}

#[cfg(not(feature = "variant_experimental"))]
pub(crate) fn logical_type_for_struct(field: &Field) -> Option<LogicalType> {
pub(crate) fn logical_type_for_struct(_field: &Field) -> Option<LogicalType> {
None
}

/// Maps an Arrow fixed size binary field to the Parquet logical type it
/// should be written with, if any.
///
/// Returns `Some(LogicalType::Uuid)` when the canonical `Uuid` extension type
/// can be read from `field`, and `None` otherwise.
#[cfg(feature = "arrow_canonical_extension_types")]
pub(crate) fn logical_type_for_fixed_size_binary(field: &Field) -> Option<LogicalType> {
    use arrow_schema::extension::Uuid;
    // Only the presence of the extension type matters; a failed lookup
    // simply yields no logical type.
    field
        .try_extension_type::<Uuid>()
        .is_ok()
        .then_some(LogicalType::Uuid)
}

/// Variant compiled when the `arrow_canonical_extension_types` feature is
/// disabled: the field is ignored and no logical type is returned.
#[cfg(not(feature = "arrow_canonical_extension_types"))]
pub(crate) fn logical_type_for_fixed_size_binary(_field: &Field) -> Option<LogicalType> {
None
}

/// Maps an Arrow string field (Utf8, LargeUtf8, Utf8View) to the Parquet
/// logical type it should be written with.
///
/// Returns `LogicalType::Json` when the canonical `Json` extension type can
/// be read from `field`, and `LogicalType::String` otherwise. The result is
/// always `Some`.
#[cfg(feature = "arrow_canonical_extension_types")]
pub(crate) fn logical_type_for_string(field: &Field) -> Option<LogicalType> {
    use arrow_schema::extension::Json;
    let logical_type = match field.try_extension_type::<Json>() {
        // The canonical Json extension type is set on this field
        Ok(_) => LogicalType::Json,
        // Not a Json field: treat it as a plain string
        Err(_) => LogicalType::String,
    };
    Some(logical_type)
}

/// Variant compiled when the `arrow_canonical_extension_types` feature is
/// disabled: the field is ignored and the `String` logical type is always
/// returned.
#[cfg(not(feature = "arrow_canonical_extension_types"))]
pub(crate) fn logical_type_for_string(_field: &Field) -> Option<LogicalType> {
Some(LogicalType::String)
}
118 changes: 48 additions & 70 deletions parquet/src/arrow/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ use std::collections::HashMap;
use std::sync::Arc;

use arrow_ipc::writer;
#[cfg(feature = "arrow_canonical_extension_types")]
use arrow_schema::extension::{Json, Uuid};
use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};

use crate::basic::{
Expand All @@ -39,7 +37,10 @@ mod extension;
mod primitive;

use super::PARQUET_FIELD_ID_META_KEY;
use crate::arrow::schema::extension::logical_type_for_struct;
use crate::arrow::schema::extension::{
has_extension_type, logical_type_for_fixed_size_binary, logical_type_for_string,
logical_type_for_struct, try_add_extension_type,
};
use crate::arrow::ProjectionMask;
pub(crate) use complex::{ParquetField, ParquetFieldType};

Expand Down Expand Up @@ -390,31 +391,27 @@ pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field
let field = complex::convert_type(&parquet_column.self_type_ptr())?;
let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);

let basic_info = parquet_column.self_type().get_basic_info();
let mut meta = HashMap::with_capacity(if cfg!(feature = "arrow_canonical_extension_types") {
2
} else {
1
});
let parquet_type = parquet_column.self_type();
let basic_info = parquet_type.get_basic_info();

let mut hash_map_size = 0;
if basic_info.has_id() {
hash_map_size += 1;
}
if has_extension_type(parquet_type) {
hash_map_size += 1;
}
if hash_map_size == 0 {
return Ok(ret);
}
ret.set_metadata(HashMap::with_capacity(hash_map_size));
if basic_info.has_id() {
meta.insert(
ret.metadata_mut().insert(
PARQUET_FIELD_ID_META_KEY.to_string(),
basic_info.id().to_string(),
);
}
#[cfg(feature = "arrow_canonical_extension_types")]
if let Some(logical_type) = basic_info.logical_type() {
match logical_type {
LogicalType::Uuid => ret.try_with_extension_type(Uuid)?,
LogicalType::Json => ret.try_with_extension_type(Json::default())?,
_ => {}
}
}
if !meta.is_empty() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual fix for #7063

The existing code has a very subtle bug: calling `ret.set_metadata` here wipes out any metadata that `try_with_extension_type` had already attached to `ret`.

ret.set_metadata(meta);
}

Ok(ret)
try_add_extension_type(ret, parquet_column.self_type())
}

pub fn decimal_length_from_precision(precision: u8) -> usize {
Expand Down Expand Up @@ -618,16 +615,7 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
.with_repetition(repetition)
.with_id(id)
.with_length(*length)
.with_logical_type(
#[cfg(feature = "arrow_canonical_extension_types")]
// If set, map arrow uuid extension type to parquet uuid logical type.
field
.try_extension_type::<Uuid>()
.ok()
.map(|_| LogicalType::Uuid),
#[cfg(not(feature = "arrow_canonical_extension_types"))]
None,
)
.with_logical_type(logical_type_for_fixed_size_binary(field))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this code is just moved into the extension module

.build()
}
DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
Expand Down Expand Up @@ -664,35 +652,13 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
}
DataType::Utf8 | DataType::LargeUtf8 => {
Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
.with_logical_type({
#[cfg(feature = "arrow_canonical_extension_types")]
{
// Use the Json logical type if the canonical Json
// extension type is set on this field.
field
.try_extension_type::<Json>()
.map_or(Some(LogicalType::String), |_| Some(LogicalType::Json))
}
#[cfg(not(feature = "arrow_canonical_extension_types"))]
Some(LogicalType::String)
})
.with_logical_type(logical_type_for_string(field))
.with_repetition(repetition)
.with_id(id)
.build()
}
DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
.with_logical_type({
#[cfg(feature = "arrow_canonical_extension_types")]
{
// Use the Json logical type if the canonical Json
// extension type is set on this field.
field
.try_extension_type::<Json>()
.map_or(Some(LogicalType::String), |_| Some(LogicalType::Json))
}
#[cfg(not(feature = "arrow_canonical_extension_types"))]
Some(LogicalType::String)
})
.with_logical_type(logical_type_for_string(field))
.with_repetition(repetition)
.with_id(id)
.build(),
Expand Down Expand Up @@ -797,15 +763,14 @@ mod tests {

use std::{collections::HashMap, sync::Arc};

use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};

use crate::arrow::PARQUET_FIELD_ID_META_KEY;
use crate::file::metadata::KeyValue;
use crate::file::reader::FileReader;
use crate::{
arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
schema::{parser::parse_message_type, types::SchemaDescriptor},
};
use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};

#[test]
fn test_flat_primitives() {
Expand Down Expand Up @@ -844,12 +809,26 @@ mod tests {
Field::new("float16", DataType::Float16, true),
Field::new("string", DataType::Utf8, true),
Field::new("string_2", DataType::Utf8, true),
Field::new("json", DataType::Utf8, true),
json_field(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This now actually fails when the canonical extension types feature is enabled, because a JSON Parquet field is now (correctly) annotated with the extension type.

]);

assert_eq!(&arrow_fields, converted_arrow_schema.fields());
}

/// Builds the Arrow [`Field`] expected for a Parquet column annotated with
/// the JSON logical type.
///
/// The field is a nullable `Utf8` column named "json". With the
/// `arrow_canonical_extension_types` feature enabled it also carries the
/// default canonical `Json` extension type.
fn json_field() -> Field {
    let base = Field::new("json", DataType::Utf8, true);
    #[cfg(feature = "arrow_canonical_extension_types")]
    {
        base.with_extension_type(arrow_schema::extension::Json::default())
    }
    #[cfg(not(feature = "arrow_canonical_extension_types"))]
    {
        base
    }
}

#[test]
fn test_decimal_fields() {
let message_type = "
Expand Down Expand Up @@ -2233,6 +2212,7 @@ mod tests {
#[test]
#[cfg(feature = "arrow_canonical_extension_types")]
fn arrow_uuid_to_parquet_uuid() -> Result<()> {
use arrow_schema::extension::Uuid;
let arrow_schema = Schema::new(vec![Field::new(
"uuid",
DataType::FixedSizeBinary(16),
Expand All @@ -2247,16 +2227,16 @@ mod tests {
Some(LogicalType::Uuid)
);

// TODO: roundtrip
// let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
// assert_eq!(arrow_schema.field(0).try_extension_type::<Uuid>()?, Uuid);
let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
assert_eq!(arrow_schema.field(0).try_extension_type::<Uuid>()?, Uuid);

Ok(())
}

#[test]
#[cfg(feature = "arrow_canonical_extension_types")]
fn arrow_json_to_parquet_json() -> Result<()> {
use arrow_schema::extension::Json;
let arrow_schema = Schema::new(vec![
Field::new("json", DataType::Utf8, false).with_extension_type(Json::default())
]);
Expand All @@ -2268,13 +2248,11 @@ mod tests {
Some(LogicalType::Json)
);

// TODO: roundtrip
// https://github.com/apache/arrow-rs/issues/7063
// let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
// assert_eq!(
// arrow_schema.field(0).try_extension_type::<Json>()?,
// Json::default()
// );
let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it works!

assert_eq!(
arrow_schema.field(0).try_extension_type::<Json>()?,
Json::default()
);

Ok(())
}
Expand Down
Loading