diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 537c4a26412..5b886e3e17e 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -2469,7 +2469,60 @@ TEST(ArrowReadWrite, ListOfStructOfList1) { CheckSimpleRoundtrip(table, 2); } -TEST(ArrowReadWrite, DISABLED_ListOfStructOfList2) { +TEST(ArrowReadWrite, Map) { + using ::arrow::field; + using ::arrow::map; + + auto type = map(::arrow::int16(), ::arrow::utf8()); + + const char* json = R"([ + [[1, "a"], [2, "b"]], + [[3, "c"]], + [], + null, + [[4, "d"], [5, "e"], [6, "f"]] + ])"; + auto array = ::arrow::ArrayFromJSON(type, json); + auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array}); + auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build(); + CheckSimpleRoundtrip(table, 2, props_store_schema); +} + +TEST(ArrowReadWrite, LargeList) { + using ::arrow::field; + using ::arrow::large_list; + using ::arrow::struct_; + + auto type = large_list(::arrow::int16()); + + const char* json = R"([ + [1, 2, 3], + [4, 5, 6], + [7, 8, 9]])"; + auto array = ::arrow::ArrayFromJSON(type, json); + auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array}); + auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build(); + CheckSimpleRoundtrip(table, 2, props_store_schema); +} + +TEST(ArrowReadWrite, FixedSizeList) { + using ::arrow::field; + using ::arrow::fixed_size_list; + using ::arrow::struct_; + + auto type = fixed_size_list(::arrow::int16(), /*size=*/3); + + const char* json = R"([ + [1, 2, 3], + [4, 5, 6], + [7, 8, 9]])"; + auto array = ::arrow::ArrayFromJSON(type, json); + auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array}); + auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build(); + CheckSimpleRoundtrip(table, 2, props_store_schema); +} + +TEST(ArrowReadWrite, ListOfStructOfList2) { using ::arrow::field; using ::arrow::list; using ::arrow::struct_; @@ -2484,7 +2537,7 @@ TEST(ArrowReadWrite, DISABLED_ListOfStructOfList2) { [{"a": 123, "b": [1, 2, 3]}], null, [], - [{"a": 456}, {"a": 789, "b": [null]}, {"a": 876, "b": [4, 5, 6]}]])"; + [{"a": 456, "b": []}, {"a": 789, "b": [null]}, {"a": 876, "b": [4, 5, 6]}]])"; auto array = ::arrow::ArrayFromJSON(type, json); auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array}); CheckSimpleRoundtrip(table, 2); diff --git a/cpp/src/parquet/arrow/arrow_schema_test.cc b/cpp/src/parquet/arrow/arrow_schema_test.cc index 260e66a1fcf..c32811961ed 100644 --- a/cpp/src/parquet/arrow/arrow_schema_test.cc +++ b/cpp/src/parquet/arrow/arrow_schema_test.cc @@ -346,6 +346,44 @@ TEST_F(TestConvertParquetSchema, ParquetFlatDecimals) { ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema)); } +TEST_F(TestConvertParquetSchema, ParquetMaps) { + std::vector parquet_fields; + std::vector> arrow_fields; + + // MAP encoding example taken from parquet-format/LogicalTypes.md + + // Two column map. + { + auto key = PrimitiveNode::Make("key", Repetition::REQUIRED, ParquetType::BYTE_ARRAY, + ConvertedType::UTF8); + auto value = PrimitiveNode::Make("value", Repetition::OPTIONAL, + ParquetType::BYTE_ARRAY, ConvertedType::UTF8); + + auto list = GroupNode::Make("key_value", Repetition::REPEATED, {key, value}); + parquet_fields.push_back( + GroupNode::Make("my_map", Repetition::REQUIRED, {list}, LogicalType::Map())); + auto arrow_value = ::arrow::field("string", UTF8, /*nullable=*/true); + auto arrow_map = ::arrow::map(/*key=*/UTF8, arrow_value); + arrow_fields.push_back(::arrow::field("my_map", arrow_map, false)); + } + // Single column map (i.e. set) gets converted to list of struct. + { + auto key = PrimitiveNode::Make("key", Repetition::REQUIRED, ParquetType::BYTE_ARRAY, + ConvertedType::UTF8); + + auto list = GroupNode::Make("key_value", Repetition::REPEATED, {key}); + parquet_fields.push_back( + GroupNode::Make("my_set", Repetition::REQUIRED, {list}, LogicalType::Map())); + auto arrow_list = ::arrow::list({::arrow::field("key", UTF8, /*nullable=*/false)}); + arrow_fields.push_back(::arrow::field("my_set", arrow_list, false)); + } + + auto arrow_schema = ::arrow::schema(arrow_fields); + ASSERT_OK(ConvertSchema(parquet_fields)); + + ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema)); +} + TEST_F(TestConvertParquetSchema, ParquetLists) { std::vector parquet_fields; std::vector> arrow_fields; @@ -1217,6 +1255,54 @@ TEST_F(TestLevels, TestPrimitive) { /*ancestor_list_def_level*/ 1})); // primitive field } +TEST_F(TestLevels, TestMaps) { + // Two column map. + auto key = PrimitiveNode::Make("key", Repetition::REQUIRED, ParquetType::BYTE_ARRAY, + ConvertedType::UTF8); + auto value = PrimitiveNode::Make("value", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, + ConvertedType::UTF8); + + auto list = GroupNode::Make("key_value", Repetition::REPEATED, {key, value}); + SetParquetSchema( + GroupNode::Make("my_map", Repetition::OPTIONAL, {list}, LogicalType::Map())); + ASSERT_OK_AND_ASSIGN(std::deque levels, + RootToTreeLeafLevels(*manifest_, /*column_number=*/0)); + EXPECT_THAT( + levels, + ElementsAre(LevelInfo{/*null_slot_usage=*/1, /*def_level=*/2, /*rep_level=*/1, + /*ancestor_list_def_level*/ 0}, + LevelInfo{/*null_slot_usage=*/1, /*def_level=*/2, /*rep_level=*/1, + /*ancestor_list_def_level*/ 2}, + LevelInfo{/*null_slot_usage=*/1, /*def_level=*/2, /*rep_level=*/1, + /*ancestor_list_def_level*/ 2})); + + ASSERT_OK_AND_ASSIGN(levels, RootToTreeLeafLevels(*manifest_, /*column_number=*/1)); + EXPECT_THAT( + levels, + ElementsAre(LevelInfo{/*null_slot_usage=*/1, /*def_level=*/2, /*rep_level=*/1, + /*ancestor_list_def_level*/ 0}, + LevelInfo{/*null_slot_usage=*/1, /*def_level=*/2, /*rep_level=*/1, + /*ancestor_list_def_level*/ 2}, + LevelInfo{/*null_slot_usage=*/1, /*def_level=*/3, /*rep_level=*/1, + /*ancestor_list_def_level*/ 2})); + + // single column map. + key = PrimitiveNode::Make("key", Repetition::REQUIRED, ParquetType::BYTE_ARRAY, + ConvertedType::UTF8); + + list = GroupNode::Make("key_value", Repetition::REPEATED, {key}); + SetParquetSchema( + GroupNode::Make("my_set", Repetition::REQUIRED, {list}, LogicalType::Map())); + + ASSERT_OK_AND_ASSIGN(levels, RootToTreeLeafLevels(*manifest_, /*column_number=*/0)); + EXPECT_THAT( + levels, + ElementsAre(LevelInfo{/*null_slot_usage=*/1, /*def_level=*/1, /*rep_level=*/1, + /*ancestor_list_def_level*/ 0}, + LevelInfo{/*null_slot_usage=*/1, /*def_level=*/1, /*rep_level=*/1, + /*ancestor_list_def_level*/ 1})); +} + TEST_F(TestLevels, TestSimpleGroups) { // Arrow schema: struct(child: struct(inner: boolean not null)) SetParquetSchema(GroupNode::Make( diff --git a/cpp/src/parquet/arrow/path_internal.cc b/cpp/src/parquet/arrow/path_internal.cc index 0f570b7e0ca..daa05a81c79 100644 --- a/cpp/src/parquet/arrow/path_internal.cc +++ b/cpp/src/parquet/arrow/path_internal.cc @@ -815,13 +815,17 @@ class PathBuilder { Status Visit(const ::arrow::FixedSizeListArray& array) { MaybeAddNullable(array); int32_t list_size = array.list_type()->list_size(); - if (list_size == 0) { - info_.max_def_level++; - } + // Technically we could encode fixed size lists with two level encodings + // but since we always use 3 level encoding we increment def levels as + // well. + info_.max_def_level++; info_.max_rep_level++; info_.path.push_back(FixedSizeListNode(FixedSizedRangeSelector{list_size}, info_.max_rep_level, info_.max_def_level)); nullable_in_parent_ = array.list_type()->value_field()->nullable(); + if (array.offset() > 0) { + return VisitInline(*array.values()->Slice(array.value_offset(0))); + } return VisitInline(*array.values()); } diff --git a/cpp/src/parquet/arrow/path_internal_test.cc b/cpp/src/parquet/arrow/path_internal_test.cc index f122a08f668..065e9866e0b 100644 --- a/cpp/src/parquet/arrow/path_internal_test.cc +++ b/cpp/src/parquet/arrow/path_internal_test.cc @@ -461,6 +461,26 @@ TEST_F(MultipathLevelBuilderTest, TestStruct) { /*def_levels=*/std::vector({2, 2, 0})); } +TEST_F(MultipathLevelBuilderTest, TestFixedSizeListNullableElements) { + auto entries = field("Entries", ::arrow::int64()); + auto list_type = fixed_size_list(entries, 2); + auto array = ::arrow::ArrayFromJSON(list_type, R"([null, [2, 3], [4, 5], null])"); + + ASSERT_OK( + MultipathLevelBuilder::Write(*array, /*nullable=*/true, &context_, callback_)); + + ASSERT_THAT(results_, SizeIs(1)); + results_[0].CheckLevels(/*def_levels=*/std::vector{0, 3, 3, 3, 3, 0}, + /*rep_levels=*/std::vector{0, 0, 1, 0, 1, 0}); + + // Null slots take up space in a fixed size list (they can in variable size + // lists as well) but the actual written values are only the "middle" elements + // in this case. + ASSERT_THAT(results_[0].post_list_elements, SizeIs(1)); + EXPECT_THAT(results_[0].post_list_elements[0].start, Eq(2)); + EXPECT_THAT(results_[0].post_list_elements[0].end, Eq(6)); +} + TEST_F(MultipathLevelBuilderTest, TestFixedSizeList) { auto entries = field("Entries", ::arrow::int64(), /*nullable=*/false); auto list_type = fixed_size_list(entries, 2); @@ -470,7 +490,7 @@ TEST_F(MultipathLevelBuilderTest, TestFixedSizeList) { MultipathLevelBuilder::Write(*array, /*nullable=*/true, &context_, callback_)); ASSERT_THAT(results_, SizeIs(1)); - results_[0].CheckLevels(/*def_levels=*/std::vector{0, 1, 1, 1, 1, 0}, + results_[0].CheckLevels(/*def_levels=*/std::vector{0, 2, 2, 2, 2, 0}, /*rep_levels=*/std::vector{0, 0, 1, 0, 1, 0}); // Null slots take up space in a fixed size list (they can in variable size diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index af437437c19..76fe7e68ffd 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -538,6 +538,12 @@ class ListReader : public ColumnReaderImpl { return item_reader_->LoadBatch(number_of_records); } + virtual ::arrow::Result> AssembleArray( + std::shared_ptr data) { + std::shared_ptr result = ::arrow::MakeArray(data); + return std::make_shared(result); + } + Status BuildArray(int64_t length_upper_bound, std::shared_ptr* out) override { const int16_t* def_levels; @@ -545,6 +551,7 @@ class ListReader : public ColumnReaderImpl { int64_t num_levels; RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels)); RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels)); + std::shared_ptr validity_buffer; ::parquet::internal::ValidityBitmapInputOutput validity_io; validity_io.values_read_upper_bound = length_upper_bound; @@ -576,6 +583,7 @@ class ListReader : public ColumnReaderImpl { if (validity_buffer != nullptr) { RETURN_NOT_OK( validity_buffer->Resize(BitUtil::BytesForBits(validity_io.values_read))); + validity_buffer->ZeroPadding(); } ARROW_ASSIGN_OR_RAISE(std::shared_ptr item_chunk, ChunksToSingle(**out)); @@ -586,8 +594,7 @@ class ListReader : public ColumnReaderImpl { /*length=*/validity_io.values_read, std::move(buffers), std::vector>{item_chunk}, validity_io.null_count); - std::shared_ptr result = ::arrow::MakeArray(data); - *out = std::make_shared(result); + ARROW_ASSIGN_OR_RAISE(*out, AssembleArray(std::move(data))); return Status::OK(); } @@ -600,6 +607,32 @@ class ListReader : public ColumnReaderImpl { std::unique_ptr item_reader_; }; +class PARQUET_NO_EXPORT FixedSizeListReader : public ListReader { + public: + FixedSizeListReader(std::shared_ptr ctx, std::shared_ptr field, + ::parquet::internal::LevelInfo level_info, + std::unique_ptr child_reader) + : ListReader(std::move(ctx), std::move(field), level_info, + std::move(child_reader)) {} + ::arrow::Result> AssembleArray( + std::shared_ptr data) final { + DCHECK_EQ(data->buffers.size(), 2); + DCHECK_EQ(field()->type()->id(), ::arrow::Type::FIXED_SIZE_LIST); + const auto& type = checked_cast<::arrow::FixedSizeListType&>(*field()->type()); + const int32_t* offsets = reinterpret_cast(data->buffers[1]->data()); + for (int x = 1; x <= data->length; x++) { + int32_t size = offsets[x] - offsets[x - 1]; + if (size != type.list_size()) { + return Status::Invalid("Expected all lists to be of size=", type.list_size(), + " but index ", x, " had size=", size); + } + } + data->buffers.resize(1); + std::shared_ptr result = ::arrow::MakeArray(data); + return std::make_shared(result); + } +}; + class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl { public: explicit StructReader(std::shared_ptr ctx, @@ -714,6 +747,7 @@ Status StructReader::BuildArray(int64_t length_upper_bound, // Ensure all values are initialized. if (null_bitmap) { RETURN_NOT_OK(null_bitmap->Resize(BitUtil::BytesForBits(validity_io.values_read))); + null_bitmap->ZeroPadding(); } END_PARQUET_CATCH_EXCEPTIONS @@ -770,7 +804,10 @@ Status GetReader(const SchemaField& field, const std::shared_ptr& arrow_f std::unique_ptr input( ctx->iterator_factory(field.column_index, ctx->reader)); out->reset(new LeafReader(ctx, arrow_field, std::move(input), field.level_info)); - } else if (type_id == ::arrow::Type::LIST) { + } else if (type_id == ::arrow::Type::LIST || type_id == ::arrow::Type::MAP || + type_id == ::arrow::Type::FIXED_SIZE_LIST || + type_id == ::arrow::Type::LARGE_LIST) { + auto list_field = arrow_field; auto child = &field.children[0]; std::unique_ptr child_reader; RETURN_NOT_OK(GetReader(*child, ctx, &child_reader)); @@ -778,8 +815,25 @@ Status GetReader(const SchemaField& field, const std::shared_ptr& arrow_f *out = nullptr; return Status::OK(); } - out->reset(new ListReader(ctx, arrow_field, field.level_info, - std::move(child_reader))); + if (type_id == ::arrow::Type::LIST || + type_id == ::arrow::Type::MAP) { // Map can be reconstructed as list of structs. + if (type_id == ::arrow::Type::MAP && + child_reader->field()->type()->num_fields() != 2) { + // This case applies if either key or value is filtered. + list_field = list_field->WithType(::arrow::list(child_reader->field())); + } + out->reset(new ListReader(ctx, list_field, field.level_info, + std::move(child_reader))); + } else if (type_id == ::arrow::Type::LARGE_LIST) { + out->reset(new ListReader(ctx, list_field, field.level_info, + std::move(child_reader))); + + } else if (type_id == ::arrow::Type::FIXED_SIZE_LIST) { + out->reset(new FixedSizeListReader(ctx, list_field, field.level_info, + std::move(child_reader))); + } else { + return Status::UnknownError("Unknown list type: ", field.field->ToString()); + } } else if (type_id == ::arrow::Type::STRUCT) { std::vector> child_fields; std::vector> child_readers; diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 6babe9bc7cf..505fa5822ae 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -467,6 +467,90 @@ Status GroupToStruct(const GroupNode& node, LevelInfo current_levels, return Status::OK(); } +Status ListToSchemaField(const GroupNode& group, LevelInfo current_levels, + SchemaTreeContext* ctx, const SchemaField* parent, + SchemaField* out); + +Status MapToSchemaField(const GroupNode& group, LevelInfo current_levels, + SchemaTreeContext* ctx, const SchemaField* parent, + SchemaField* out) { + if (group.field_count() != 1) { + return Status::Invalid("MAP-annotated groups must have a single child."); + } + if (group.is_repeated()) { + return Status::Invalid("MAP-annotated groups must not be repeated."); + } + + const Node& key_value_node = *group.field(0); + + if (!key_value_node.is_repeated()) { + return Status::Invalid( + "Non-repeated key value in a MAP-annotated group are not supported."); + } + + if (!key_value_node.is_group()) { + return Status::Invalid("Key-value node must be a group."); + } + + const GroupNode& key_value = checked_cast(key_value_node); + if (key_value.field_count() != 1 && key_value.field_count() != 2) { + return Status::Invalid("Key-value map node must have 1 or 2 child elements. Found: ", + key_value.field_count()); + } + const Node& key_node = *key_value.field(0); + if (!key_node.is_required()) { + return Status::Invalid("Map keys must be annotated as required."); + } + // Arrow doesn't support 1 column maps (i.e. Sets). The options are to either + // make the values column nullable, or process the map as a list. We choose the latter + // as it is simpler. + if (key_value.field_count() == 1) { + return ListToSchemaField(group, current_levels, ctx, parent, out); + } + + current_levels.Increment(group); + int16_t repeated_ancestor_def_level = current_levels.IncrementRepeated(); + + out->children.resize(1); + SchemaField* key_value_field = &out->children[0]; + + key_value_field->children.resize(2); + SchemaField* key_field = &key_value_field->children[0]; + SchemaField* value_field = &key_value_field->children[1]; + + ctx->LinkParent(out, parent); + ctx->LinkParent(key_value_field, out); + ctx->LinkParent(key_field, key_value_field); + ctx->LinkParent(value_field, key_value_field); + + // required/optional group name=whatever { + // repeated group name=key_values{ + // required TYPE key; + // required/optional TYPE value; + // } + // } + // + + RETURN_NOT_OK(NodeToSchemaField(*key_value.field(0), current_levels, ctx, + key_value_field, key_field)); + RETURN_NOT_OK(NodeToSchemaField(*key_value.field(1), current_levels, ctx, + key_value_field, value_field)); + + key_value_field->field = ::arrow::field( + group.name(), ::arrow::struct_({key_field->field, value_field->field}), + /*nullable=*/false, FieldIdMetadata(key_value.field_id())); + key_value_field->level_info = current_levels; + + out->field = ::arrow::field(group.name(), + ::arrow::map(key_field->field->type(), value_field->field), + group.is_optional(), FieldIdMetadata(group.field_id())); + out->level_info = current_levels; + // At this point current levels contains the def level for this list, + // we need to reset to the prior parent. + out->level_info.repeated_ancestor_def_level = repeated_ancestor_def_level; + return Status::OK(); +} + Status ListToSchemaField(const GroupNode& group, LevelInfo current_levels, SchemaTreeContext* ctx, const SchemaField* parent, SchemaField* out) { @@ -555,6 +639,8 @@ Status GroupToSchemaField(const GroupNode& node, LevelInfo current_levels, SchemaField* out) { if (node.logical_type()->is_list()) { return ListToSchemaField(node, current_levels, ctx, parent, out); + } else if (node.logical_type()->is_map()) { + return MapToSchemaField(node, current_levels, ctx, parent, out); } std::shared_ptr type; if (node.is_repeated()) { @@ -697,13 +783,26 @@ std::function(FieldVector)> GetNestedFactory( } break; case ::arrow::Type::LIST: - // TODO also allow LARGE_LIST and FIXED_SIZE_LIST if (origin_type.id() == ::arrow::Type::LIST) { return [](FieldVector fields) { DCHECK_EQ(fields.size(), 1); return ::arrow::list(std::move(fields[0])); }; } + if (origin_type.id() == ::arrow::Type::LARGE_LIST) { + return [](FieldVector fields) { + DCHECK_EQ(fields.size(), 1); + return ::arrow::large_list(std::move(fields[0])); + }; + } + if (origin_type.id() == ::arrow::Type::FIXED_SIZE_LIST) { + const auto list_size = + checked_cast(origin_type).list_size(); + return [list_size](FieldVector fields) { + DCHECK_EQ(fields.size(), 1); + return ::arrow::fixed_size_list(std::move(fields[0]), list_size); + }; + } break; default: break; @@ -723,7 +822,12 @@ Result ApplyOriginalStorageMetadata(const Field& origin_field, if (num_children > 0 && origin_type->num_fields() == num_children) { DCHECK_EQ(static_cast(inferred->children.size()), num_children); const auto factory = GetNestedFactory(*origin_type, *inferred_type); + ARROW_LOG(INFO) << "Nested type: origin = " << origin_type->ToString() + << ", inferred = " << inferred_type->ToString(); if (factory) { + // The type may be modified (e.g. LargeList) while the children stay the same + modified |= origin_type->id() != inferred_type->id(); + // Apply original metadata recursively to children for (int i = 0; i < inferred_type->num_fields(); ++i) { ARROW_ASSIGN_OR_RAISE( diff --git a/python/pyarrow/tests/test_extension_type.py b/python/pyarrow/tests/test_extension_type.py index e6c7da1721e..1ef70e79696 100644 --- a/python/pyarrow/tests/test_extension_type.py +++ b/python/pyarrow/tests/test_extension_type.py @@ -560,6 +560,17 @@ def test_parquet_nested_extension(tmpdir): assert table.column(0).type == list_array.type assert table == orig_table + # Large list of extensions + list_array = pa.LargeListArray.from_arrays([0, 1, None, 3], ext_array) + + orig_table = pa.table({'lists': list_array}) + filename = tmpdir / 'list_of_ext.parquet' + pq.write_table(orig_table, filename) + + table = pq.read_table(filename) + assert table.column(0).type == list_array.type + assert table == orig_table + @pytest.mark.parquet def test_parquet_extension_nested_in_extension(tmpdir):