Merged

Changes from all commits (29 commits):
a365b61  add uuid as opt in  (paleolimbot, Mar 20, 2025)
4cee020  convert in other direction  (paleolimbot, Mar 20, 2025)
3d1a4ec  one more convert change  (paleolimbot, Mar 20, 2025)
dc696cf  enable arrow_extensions_enabled in pyarrow  (paleolimbot, Mar 24, 2025)
300f5eb  Add Python test  (paleolimbot, Mar 24, 2025)
af8227c  Add C++ test  (paleolimbot, Mar 24, 2025)
bd9c9a9  format  (paleolimbot, Mar 24, 2025)
f187586  fix comment that was lying  (paleolimbot, Mar 24, 2025)
d11fcd8  clean up schema.cc  (paleolimbot, Mar 24, 2025)
f7a965e  fix python lint  (paleolimbot, Mar 24, 2025)
b4d0730  apply improvement to arrow_extensions_enabled  (paleolimbot, Mar 31, 2025)
2193fd4  Check physical length before inferring Arrow UUID type  (paleolimbot, Mar 31, 2025)
a81d5c1  fix test comments in arrow_schema_test.cc  (paleolimbot, Mar 31, 2025)
62f92b3  Attempt to reduce duplication in extension type restoration  (paleolimbot, Mar 31, 2025)
d1b37e8  remove unused header  (paleolimbot, Apr 1, 2025)
4bd607b  support nested extension components  (paleolimbot, Apr 1, 2025)
7529947  remove unneeded branch  (paleolimbot, Apr 1, 2025)
807a48f  Merge branch 'main' into parquet-uuid  (paleolimbot, Apr 4, 2025)
ced4d6b  Merge branch 'main' into parquet-uuid  (paleolimbot, Apr 10, 2025)
d2d98f2  fix merge  (paleolimbot, Apr 10, 2025)
f3894b8  Update cpp/src/parquet/arrow/arrow_schema_test.cc  (paleolimbot, Apr 18, 2025)
a101c8b  Update cpp/src/parquet/arrow/arrow_schema_test.cc  (paleolimbot, Apr 18, 2025)
4c66a75  Update cpp/src/parquet/arrow/schema_internal.cc  (paleolimbot, Apr 18, 2025)
234475e  Update cpp/src/parquet/arrow/schema.cc  (paleolimbot, Apr 18, 2025)
8e8d5ee  Update cpp/src/parquet/arrow/arrow_schema_test.cc  (paleolimbot, Apr 18, 2025)
26afc40  Update cpp/src/parquet/arrow/arrow_schema_test.cc  (paleolimbot, Apr 18, 2025)
4b63ec7  clang-format  (paleolimbot, Apr 18, 2025)
b48f7f5  move supported storage type test to UuidType, move common code out of…  (paleolimbot, Apr 18, 2025)
555ed75  maybe simplify it all again  (paleolimbot, Apr 18, 2025)
2 changes: 2 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.cc
@@ -132,6 +132,8 @@ parquet::ArrowReaderProperties MakeArrowReaderProperties(
arrow_properties.set_io_context(
parquet_scan_options.arrow_reader_properties->io_context());
arrow_properties.set_use_threads(options.use_threads);
arrow_properties.set_arrow_extensions_enabled(
parquet_scan_options.arrow_reader_properties->get_arrow_extensions_enabled());
return arrow_properties;
}

6 changes: 5 additions & 1 deletion cpp/src/arrow/extension/uuid.cc
@@ -40,7 +40,7 @@ Result<std::shared_ptr<DataType>> UuidType::Deserialize(
if (!serialized.empty()) {
return Status::Invalid("Unexpected serialized metadata: '", serialized, "'");
}
if (!storage_type->Equals(*fixed_size_binary(16))) {
if (!IsSupportedStorageType(storage_type)) {
return Status::Invalid("Invalid storage type for UuidType: ",
storage_type->ToString());
}
@@ -55,4 +55,8 @@ std::string UuidType::ToString(bool show_metadata) const {

std::shared_ptr<DataType> uuid() { return std::make_shared<UuidType>(); }

bool UuidType::IsSupportedStorageType(const std::shared_ptr<DataType>& storage_type) {
return storage_type->Equals(*fixed_size_binary(16));
}

} // namespace arrow::extension
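
The IsSupportedStorageType() helper above pins arrow.uuid storage to fixed_size_binary(16). As a minimal Python sketch of the constraint it enforces, assuming a pyarrow build recent enough to expose the canonical pa.uuid() factory (18.0 or later):

import uuid
import pyarrow as pa

# Valid: arrow.uuid storage must be fixed_size_binary(16).
storage = pa.array([uuid.uuid4().bytes for _ in range(3)], type=pa.binary(16))
arr = pa.ExtensionArray.from_storage(pa.uuid(), storage)
print(arr.type)  # extension<arrow.uuid>

# Invalid: variable-width binary is rejected as a storage type.
bad_storage = pa.array([uuid.uuid4().bytes], type=pa.binary())
try:
    pa.ExtensionArray.from_storage(pa.uuid(), bad_storage)
except TypeError:
    print("variable-width binary is not a supported storage type")
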
2 changes: 2 additions & 0 deletions cpp/src/arrow/extension/uuid.h
@@ -53,6 +53,8 @@ class ARROW_EXPORT UuidType : public ExtensionType {

/// \brief Create a UuidType instance
static Result<std::shared_ptr<DataType>> Make() { return std::make_shared<UuidType>(); }

static bool IsSupportedStorageType(const std::shared_ptr<DataType>& storage_type);
};

/// \brief Return a UuidType instance.
65 changes: 64 additions & 1 deletion cpp/src/parquet/arrow/arrow_schema_test.cc
@@ -33,6 +33,7 @@

#include "arrow/array.h"
#include "arrow/extension/json.h"
#include "arrow/extension/uuid.h"
#include "arrow/ipc/writer.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/type.h"
@@ -945,7 +946,7 @@ TEST_F(TestConvertParquetSchema, ParquetVariant) {
}
}

TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) {
TEST_F(TestConvertParquetSchema, ParquetSchemaArrowJsonExtension) {
std::vector<NodePtr> parquet_fields;
parquet_fields.push_back(PrimitiveNode::Make(
"json_1", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::JSON));
@@ -1027,6 +1028,68 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) {
}
}

TEST_F(TestConvertParquetSchema, ParquetSchemaArrowUuidExtension) {
std::vector<NodePtr> parquet_fields;
parquet_fields.push_back(PrimitiveNode::Make("uuid", Repetition::OPTIONAL,
LogicalType::UUID(),
ParquetType::FIXED_LEN_BYTE_ARRAY, 16));

{
// Parquet file does not contain Arrow schema.
// By default, field should be treated as fixed_size_binary(16) in Arrow.
auto arrow_schema =
::arrow::schema({::arrow::field("uuid", ::arrow::fixed_size_binary(16))});
std::shared_ptr<KeyValueMetadata> metadata{};
ASSERT_OK(ConvertSchema(parquet_fields, metadata));
CheckFlatSchema(arrow_schema);
}

{
// Parquet file does not contain Arrow schema.
// If Arrow extensions are enabled, field will be interpreted as uuid()
// extension field.
ArrowReaderProperties props;
props.set_arrow_extensions_enabled(true);
auto arrow_schema =
::arrow::schema({::arrow::field("uuid", ::arrow::extension::uuid())});
std::shared_ptr<KeyValueMetadata> metadata{};
ASSERT_OK(ConvertSchema(parquet_fields, metadata, props));
CheckFlatSchema(arrow_schema);
}

{
// Parquet file contains Arrow schema.
// uuid will be interpreted as uuid() field even though extensions are not enabled.
ArrowReaderProperties props;
props.set_arrow_extensions_enabled(false);
std::shared_ptr<KeyValueMetadata> field_metadata =
::arrow::key_value_metadata({"foo", "bar"}, {"biz", "baz"});
auto arrow_schema = ::arrow::schema({::arrow::field(
"uuid", ::arrow::extension::uuid(), /*nullable=*/true, field_metadata)});

std::shared_ptr<KeyValueMetadata> metadata;
ASSERT_OK(ArrowSchemaToParquetMetadata(arrow_schema, metadata));
ASSERT_OK(ConvertSchema(parquet_fields, metadata, props));
CheckFlatSchema(arrow_schema, true /* check_metadata */);
}

{
// Parquet file contains Arrow schema.
// uuid will be interpreted as uuid() field also when extensions *are* enabled
ArrowReaderProperties props;
props.set_arrow_extensions_enabled(true);
std::shared_ptr<KeyValueMetadata> field_metadata =
::arrow::key_value_metadata({"foo", "bar"}, {"biz", "baz"});
auto arrow_schema = ::arrow::schema({::arrow::field(
"uuid", ::arrow::extension::uuid(), /*nullable=*/true, field_metadata)});

std::shared_ptr<KeyValueMetadata> metadata;
ASSERT_OK(ArrowSchemaToParquetMetadata(arrow_schema, metadata));
ASSERT_OK(ConvertSchema(parquet_fields, metadata, props));
CheckFlatSchema(arrow_schema, true /* check_metadata */);
}
}

class TestConvertArrowSchema : public ::testing::Test {
public:
virtual void SetUp() {}
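
The four cases the test above walks through (Arrow schema present or absent, extensions enabled or disabled) can also be exercised from Python. A hedged sketch of the "file contains an Arrow schema" case, with an illustrative file name: because pq.write_table() embeds ARROW:schema metadata by default, the extension type is restored on read even without opting in to extensions.

import uuid
import pyarrow as pa
import pyarrow.parquet as pq

storage = pa.array([uuid.uuid4().bytes for _ in range(4)], type=pa.binary(16))
table = pa.table({"uuid": pa.ExtensionArray.from_storage(pa.uuid(), storage)})

# The serialized Arrow schema is stored in the file metadata by default, so
# the uuid() type round-trips with no reader-side configuration.
pq.write_table(table, "uuid.parquet")
assert pq.read_table("uuid.parquet").schema.field("uuid").type == pa.uuid()
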
107 changes: 58 additions & 49 deletions cpp/src/parquet/arrow/schema.cc
@@ -22,6 +22,7 @@
#include <vector>

#include "arrow/extension/json.h"
#include "arrow/extension/uuid.h"
#include "arrow/extension_type.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/api.h"
@@ -454,12 +455,18 @@ Status FieldToNode(const std::string& name, const std::shared_ptr<Field>& field,
type = ParquetType::BYTE_ARRAY;
logical_type = LogicalType::JSON();
break;
} else if (ext_type->extension_name() == std::string("arrow.uuid")) {
type = ParquetType::FIXED_LEN_BYTE_ARRAY;
logical_type = LogicalType::UUID();
length = 16;
break;
} else if (ext_type->extension_name() == std::string("parquet.variant")) {
auto variant_type = std::static_pointer_cast<VariantExtensionType>(field->type());

return VariantToNode(variant_type, name, field->nullable(), field_id, properties,
arrow_properties, out);
}

std::shared_ptr<::arrow::Field> storage_field = ::arrow::field(
name, ext_type->storage_type(), field->nullable(), field->metadata());
return FieldToNode(name, storage_field, properties, arrow_properties, out);
@@ -1052,60 +1059,62 @@ Result<bool> ApplyOriginalMetadata(const Field& origin_field, SchemaField* infer
bool modified = false;

auto& origin_type = origin_field.type();
const auto& inferred_type = inferred->field->type();

// The origin was an extension type. This occurs when the ARROW:extension:name field
// was present when the schema was written and that extension is registered when
// the schema is read.
if (origin_type->id() == ::arrow::Type::EXTENSION) {
const auto& ex_type = checked_cast<const ::arrow::ExtensionType&>(*origin_type);
if (inferred_type->id() != ::arrow::Type::EXTENSION &&
ex_type.extension_name() == std::string("arrow.json") &&
::arrow::extension::JsonExtensionType::IsSupportedStorageType(
inferred_type->id())) {
// Schema mismatch.
//
// Arrow extensions are DISABLED in Parquet.
// origin_type is ::arrow::extension::json()
// inferred_type is ::arrow::utf8()
//
// Origin type is restored as Arrow should be considered the source of truth.
inferred->field = inferred->field->WithType(origin_type);
RETURN_NOT_OK(ApplyOriginalStorageMetadata(origin_field, inferred));
} else if (inferred_type->id() == ::arrow::Type::EXTENSION &&
ex_type.extension_name() == std::string("arrow.json")) {
// Potential schema mismatch.
//
// Arrow extensions are ENABLED in Parquet.
// origin_type is arrow::extension::json(...)
// inferred_type is arrow::extension::json(arrow::utf8())
auto origin_storage_field = origin_field.WithType(ex_type.storage_type());

// Apply metadata recursively to storage type
RETURN_NOT_OK(ApplyOriginalStorageMetadata(*origin_storage_field, inferred));
inferred->field = inferred->field->WithType(origin_type);
} else if (inferred_type->id() == ::arrow::Type::EXTENSION &&
ex_type.extension_name() == std::string("parquet.variant")) {
// Potential schema mismatch.
//
// Arrow extensions are ENABLED in Parquet.
// origin_type is parquet::arrow::variant(...)
// inferred_type is
// parquet::arrow::variant(struct(arrow::binary(),arrow::binary()))
auto origin_storage_field = origin_field.WithType(ex_type.storage_type());

// Apply metadata recursively to storage type
RETURN_NOT_OK(ApplyOriginalStorageMetadata(*origin_storage_field, inferred));
inferred->field = inferred->field->WithType(origin_type);
const auto& origin_extension_type =
checked_cast<const ::arrow::ExtensionType&>(*origin_type);

// (Recursively) Apply the original storage metadata from the original storage field
// This applies extension types to child elements, if any.
auto origin_storage_field =
origin_field.WithType(origin_extension_type.storage_type());
RETURN_NOT_OK(ApplyOriginalStorageMetadata(*origin_storage_field, inferred));

// Use the inferred type after child updates for below checks to see if
// we can restore an extension type on the output.
const auto& inferred_type = inferred->field->type();

// Whether or not the inferred type is also an extension type. This can occur when
// arrow_extensions_enabled is true in the ArrowReaderProperties. Extension types
// are not currently inferred for any other reason.
bool arrow_extension_inferred = inferred_type->id() == ::arrow::Type::EXTENSION;

// Check if the inferred storage type is compatible with the extension type
// we're hoping to apply. We assume that if an extension type was inferred
// that it was constructed with a valid storage type. Otherwise, we check with
// extension types that we know about for valid storage, falling back to
// storage type equality for extension types that we don't know about.
std::string origin_extension_name = origin_extension_type.extension_name();
bool extension_supports_inferred_storage;

if (origin_extension_name == "arrow.json") {
extension_supports_inferred_storage =
arrow_extension_inferred ||
::arrow::extension::JsonExtensionType::IsSupportedStorageType(
inferred_type->id());
} else if (origin_extension_name == "arrow.uuid") {
extension_supports_inferred_storage =
arrow_extension_inferred ||
::arrow::extension::UuidType::IsSupportedStorageType(inferred_type);
} else if (origin_extension_name == "parquet.variant") {
extension_supports_inferred_storage =
arrow_extension_inferred ||
VariantExtensionType::IsSupportedStorageType(inferred_type);
} else {
auto origin_storage_field = origin_field.WithType(ex_type.storage_type());

// Apply metadata recursively to storage type
RETURN_NOT_OK(ApplyOriginalStorageMetadata(*origin_storage_field, inferred));
extension_supports_inferred_storage =
origin_extension_type.storage_type()->Equals(*inferred_type);

Review comment (Member): Similar to the above comment, do we need to check this? It will go to the else branch at line 1108 and the same check is performed there too.
}

// Restore extension type, if the storage type is the same as inferred
// from the Parquet type
if (ex_type.storage_type()->Equals(*inferred->field->type())) {
inferred->field = inferred->field->WithType(origin_type);
}
// If the origin extension of the metadata we are about to apply supports
// the Arrow storage type we would otherwise return, we restore the extension
// type to the output.
if (extension_supports_inferred_storage) {
inferred->field = inferred->field->WithType(origin_type);
}

modified = true;
} else {
ARROW_ASSIGN_OR_RAISE(modified, ApplyOriginalStorageMetadata(origin_field, inferred));
16 changes: 11 additions & 5 deletions cpp/src/parquet/arrow/schema_internal.cc
@@ -18,6 +18,7 @@
#include "parquet/arrow/schema_internal.h"

#include "arrow/extension/json.h"
#include "arrow/extension/uuid.h"
#include "arrow/type.h"

#include "parquet/properties.h"
@@ -134,20 +135,25 @@ Result<std::shared_ptr<ArrowType>> FromByteArray(
}
}

Result<std::shared_ptr<ArrowType>> FromFLBA(const LogicalType& logical_type,
int32_t physical_length) {
Result<std::shared_ptr<ArrowType>> FromFLBA(
const LogicalType& logical_type, int32_t physical_length,
const ArrowReaderProperties& reader_properties) {
switch (logical_type.type()) {
case LogicalType::Type::DECIMAL:
return MakeArrowDecimal(logical_type);
case LogicalType::Type::FLOAT16:
return ::arrow::float16();
case LogicalType::Type::NONE:
case LogicalType::Type::INTERVAL:
return ::arrow::fixed_size_binary(physical_length);
case LogicalType::Type::UUID:
if (physical_length == 16 && reader_properties.get_arrow_extensions_enabled()) {
return ::arrow::extension::uuid();
Review comment (Member): Do we need to check physical_length here?

}

return ::arrow::fixed_size_binary(physical_length);
default:
return Status::NotImplemented("Unhandled logical logical_type ",
logical_type.ToString(),
return Status::NotImplemented("Unhandled logical_type ", logical_type.ToString(),
" for fixed-length binary array");
}
}
@@ -216,7 +222,7 @@ Result<std::shared_ptr<ArrowType>> GetArrowType(
case ParquetType::BYTE_ARRAY:
return FromByteArray(logical_type, reader_properties);
case ParquetType::FIXED_LEN_BYTE_ARRAY:
return FromFLBA(logical_type, type_length);
return FromFLBA(logical_type, type_length, reader_properties);
default: {
// PARQUET-1565: This can occur if the file is corrupt
return Status::IOError("Invalid physical column type: ",
26 changes: 21 additions & 5 deletions python/pyarrow/_dataset_parquet.pyx
@@ -703,7 +703,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
cache_options : pyarrow.CacheOptions, default None
Cache options used when pre_buffer is enabled. The default values should
be good for most use cases. You may want to adjust these for example if
you have exceptionally high latency to the file system.
thrift_string_size_limit : int, default None
If not None, override the maximum total string size allocated
when decoding Thrift structures. The default limit should be
@@ -720,6 +720,10 @@
Parquet file.
page_checksum_verification : bool, default False
If True, verify the page checksum for each page read from the file.
arrow_extensions_enabled : bool, default False
If True, read Parquet logical types as Arrow extension types where possible
(e.g., read JSON as the canonical `arrow.json` extension type, or UUID as
the canonical `arrow.uuid` extension type).
"""

# Avoid mistakenly creating attributes
@@ -733,7 +737,8 @@
thrift_container_size_limit=None,
decryption_config=None,
decryption_properties=None,
bint page_checksum_verification=False):
bint page_checksum_verification=False,
bint arrow_extensions_enabled=False):
self.init(shared_ptr[CFragmentScanOptions](
new CParquetFragmentScanOptions()))
self.use_buffered_stream = use_buffered_stream
Expand All @@ -752,6 +757,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
if decryption_properties is not None:
self.decryption_properties = decryption_properties
self.page_checksum_verification = page_checksum_verification
self.arrow_extensions_enabled = arrow_extensions_enabled

cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
FragmentScanOptions.init(self, sp)
@@ -868,6 +874,14 @@
def page_checksum_verification(self, bint page_checksum_verification):
self.reader_properties().set_page_checksum_verification(page_checksum_verification)

@property
def arrow_extensions_enabled(self):
return self.arrow_reader_properties().get_arrow_extensions_enabled()

@arrow_extensions_enabled.setter
def arrow_extensions_enabled(self, bint arrow_extensions_enabled):
self.arrow_reader_properties().set_arrow_extensions_enabled(arrow_extensions_enabled)

def equals(self, ParquetFragmentScanOptions other):
"""
Parameters
@@ -881,11 +895,12 @@
attrs = (
self.use_buffered_stream, self.buffer_size, self.pre_buffer, self.cache_options,
self.thrift_string_size_limit, self.thrift_container_size_limit,
self.page_checksum_verification)
self.page_checksum_verification, self.arrow_extensions_enabled)
other_attrs = (
other.use_buffered_stream, other.buffer_size, other.pre_buffer, other.cache_options,
other.thrift_string_size_limit,
other.thrift_container_size_limit, other.page_checksum_verification)
other.thrift_container_size_limit, other.page_checksum_verification,
other.arrow_extensions_enabled)
return attrs == other_attrs

@staticmethod
@@ -902,7 +917,8 @@
cache_options=self.cache_options,
thrift_string_size_limit=self.thrift_string_size_limit,
thrift_container_size_limit=self.thrift_container_size_limit,
page_checksum_verification=self.page_checksum_verification
page_checksum_verification=self.page_checksum_verification,
arrow_extensions_enabled=self.arrow_extensions_enabled
)
return ParquetFragmentScanOptions._reconstruct, (kwargs,)

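
A hedged usage sketch of the new option, again with illustrative file names: writing with store_schema=False drops the ARROW:schema metadata, so the reader must infer types from the Parquet schema alone, and the UUID column only comes back as arrow.uuid when arrow_extensions_enabled=True is passed through the fragment scan options.

import uuid
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

storage = pa.array([uuid.uuid4().bytes for _ in range(4)], type=pa.binary(16))
table = pa.table({"uuid": pa.ExtensionArray.from_storage(pa.uuid(), storage)})
pq.write_table(table, "uuid_noschema.parquet", store_schema=False)

# Default: the UUID logical type is inferred as fixed_size_binary(16).
assert ds.dataset("uuid_noschema.parquet").schema.field("uuid").type == pa.binary(16)

# Opted in: the same column is inferred as the canonical arrow.uuid extension type.
fmt = ds.ParquetFileFormat(
    default_fragment_scan_options=ds.ParquetFragmentScanOptions(
        arrow_extensions_enabled=True))
assert ds.dataset("uuid_noschema.parquet", format=fmt).schema.field("uuid").type == pa.uuid()
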
2 changes: 2 additions & 0 deletions python/pyarrow/_parquet.pxd
@@ -404,6 +404,8 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
CCacheOptions cache_options() const
void set_coerce_int96_timestamp_unit(TimeUnit unit)
TimeUnit coerce_int96_timestamp_unit() const
void set_arrow_extensions_enabled(c_bool extensions_enabled)
c_bool get_arrow_extensions_enabled() const

ArrowReaderProperties default_arrow_reader_properties()
