Skip to content
This repository has been archived by the owner on May 10, 2024. It is now read-only.

ARROW-1644: [C++] Initial cut of implementing deserialization of arbitrary nested groups from Parquet to Arrow #462

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,7 @@ add_custom_command(
# Library config

set(LIBPARQUET_SRCS
src/parquet/arrow/deserializer.cc
src/parquet/arrow/reader.cc
src/parquet/arrow/record_reader.cc
src/parquet/arrow/schema.cc
Expand Down
Binary file added data/dremel.parquet
Binary file not shown.
153 changes: 144 additions & 9 deletions src/parquet/arrow/arrow-reader-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "parquet/util/test-common.h"

#include "arrow/api.h"
#include "arrow/pretty_print.h"
#include "arrow/test-util.h"
#include "arrow/type_traits.h"
#include "arrow/util/decimal.h"
Expand Down Expand Up @@ -427,7 +428,8 @@ void CheckSimpleRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group
const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
default_arrow_writer_properties()) {
std::shared_ptr<Table> result;
DoSimpleRoundtrip(table, 1, row_group_size, {}, &result, arrow_properties);
ASSERT_NO_FATAL_FAILURE(
DoSimpleRoundtrip(table, 1, row_group_size, {}, &result, arrow_properties));
ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*table, *result, false));
}

Expand Down Expand Up @@ -508,8 +510,8 @@ class TestParquetIO : public ::testing::Test {
std::shared_ptr<Array> out;

std::unique_ptr<FileReader> reader;
ReaderFromSink(&reader);
ReadSingleColumnFile(std::move(reader), &out);
ASSERT_NO_FATAL_FAILURE(ReaderFromSink(&reader));
ASSERT_NO_FATAL_FAILURE(ReadSingleColumnFile(std::move(reader), &out));

internal::AssertArraysEqual(values, *out);
}
Expand Down Expand Up @@ -554,8 +556,8 @@ class TestParquetIO : public ::testing::Test {
void ReadAndCheckSingleColumnTable(const std::shared_ptr<Array>& values) {
std::shared_ptr<::arrow::Table> out;
std::unique_ptr<FileReader> reader;
ReaderFromSink(&reader);
ReadTableFromFile(std::move(reader), &out);
ASSERT_NO_FATAL_FAILURE(ReaderFromSink(&reader));
ASSERT_NO_FATAL_FAILURE(ReadTableFromFile(std::move(reader), &out));
ASSERT_EQ(1, out->num_columns());
ASSERT_EQ(values->length(), out->num_rows());

Expand Down Expand Up @@ -1043,8 +1045,8 @@ TEST_F(TestNullParquetIO, NullListColumn) {

std::shared_ptr<Table> out;
std::unique_ptr<FileReader> reader;
this->ReaderFromSink(&reader);
this->ReadTableFromFile(std::move(reader), &out);
ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader));
ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out));
ASSERT_EQ(1, out->num_columns());
ASSERT_EQ(offsets.size() - 1, out->num_rows());

Expand Down Expand Up @@ -1132,7 +1134,7 @@ class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
ASSERT_NO_FATAL_FAILURE(MakeTestFile(values, num_chunks, &file_reader));

std::shared_ptr<Table> out;
this->ReadTableFromFile(std::move(file_reader), &out);
ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(file_reader), &out));
ASSERT_EQ(1, out->num_columns());
ASSERT_EQ(SMALL_SIZE, out->num_rows());

Expand Down Expand Up @@ -1244,6 +1246,136 @@ TEST(TestArrowReadWrite, DateTimeTypes) {
ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*table, *result));
}

// Reads the sample "dremel.parquet" file from the test data directory into an
// Arrow table and compares it against a table built by hand with nested
// list/struct builders.
//
// The hand-built table holds two records:
//   Row 1: DocId 10; forward links {20, 40, 60}, no backward links; three
//          names: (languages {en-us/us, en/null}, url http://A),
//          (no languages, url http://B), (language en-gb/gb, null url).
//   Row 2: DocId 20; backward links {10, 30}, forward links {80}; one name
//          (no languages, url http://C).
//
// Every builder call returns a Status; each one is checked with ASSERT_OK so
// that a failure while building the expected data is reported at the failing
// call instead of showing up later as a table mismatch.
TEST(TestArrowReadWrite, DremelExampleTest) {
  const std::string path = std::string(test::get_data_dir()) + "/dremel.parquet";
  auto parquet_reader = ParquetFileReader::OpenFile(path);
  parquet::arrow::FileReader reader(::arrow::default_memory_pool(),
                                    std::move(parquet_reader));
  std::shared_ptr<::arrow::Table> out;
  ASSERT_OK(reader.ReadTable(&out));

  // Dremel paper uses int64, but the sample file
  // stored them as int32
  auto forward = std::make_shared<::arrow::Int32Builder>();
  auto backward = std::make_shared<::arrow::Int32Builder>();
  auto docId = std::make_shared<::arrow::Int32Builder>();
  auto country = std::make_shared<::arrow::StringBuilder>();
  auto code = std::make_shared<::arrow::StringBuilder>();
  auto url = std::make_shared<::arrow::StringBuilder>();

  // Language: struct<Code: utf8 (non-nullable), Country: utf8>, wrapped in a list.
  std::vector<std::shared_ptr<::arrow::Field>> language_fields;
  language_fields.emplace_back(
      std::make_shared<::arrow::Field>("Code", ::arrow::utf8(), false));
  language_fields.emplace_back(
      std::make_shared<::arrow::Field>("Country", ::arrow::utf8()));

  auto language_type = std::make_shared<::arrow::StructType>(language_fields);
  std::vector<std::shared_ptr<::arrow::ArrayBuilder>> language_builders = {code, country};
  auto language = std::make_shared<::arrow::StructBuilder>(
      language_type, ::arrow::default_memory_pool(), std::move(language_builders));
  auto language_list =
      std::make_shared<::arrow::ListBuilder>(::arrow::default_memory_pool(), language);

  // Links: struct of two int32 lists.
  auto backward_list =
      std::make_shared<::arrow::ListBuilder>(::arrow::default_memory_pool(), backward);
  auto forward_list =
      std::make_shared<::arrow::ListBuilder>(::arrow::default_memory_pool(), forward);
  std::vector<std::shared_ptr<::arrow::Field>> links_fields;
  links_fields.emplace_back(
      std::make_shared<::arrow::Field>("Backward_Outer", backward_list->type(), false));
  links_fields.emplace_back(
      std::make_shared<::arrow::Field>("Forward_Outer", forward_list->type(), false));
  auto links_type = std::make_shared<::arrow::StructType>(links_fields);
  // NOTE(review): the fields above are declared as (Backward_Outer,
  // Forward_Outer) while the child builders are passed as (forward_list,
  // backward_list). If struct children are matched to fields by position, the
  // forward values end up under the "Backward_Outer" name. Confirm against the
  // column order in dremel.parquet and swap one of the two if necessary.
  ::arrow::StructBuilder links(links_type, ::arrow::default_memory_pool(),
                               {forward_list, backward_list});

  // Name: list of struct<Language_Outer: list<Language>, Url: utf8>.
  std::vector<std::shared_ptr<::arrow::Field>> name_fields;
  name_fields.emplace_back(
      std::make_shared<::arrow::Field>("Language_Outer", language_list->type(), false));
  name_fields.emplace_back(std::make_shared<::arrow::Field>("Url", url->type()));

  auto name_type = std::make_shared<::arrow::StructType>(name_fields);
  std::vector<std::shared_ptr<::arrow::ArrayBuilder>> name_builders = {language_list,
                                                                       url};
  auto name = std::make_shared<::arrow::StructBuilder>(
      name_type, ::arrow::default_memory_pool(), std::move(name_builders));
  ::arrow::ListBuilder name_list(::arrow::default_memory_pool(), name);

  // Row 1
  ASSERT_OK(docId->Append(10));
  ASSERT_OK(links.Append());
  ASSERT_OK(name_list.Append());

  // 3 forward links
  ASSERT_OK(forward_list->Append());
  ASSERT_OK(forward->Append(20));
  ASSERT_OK(forward->Append(40));
  ASSERT_OK(forward->Append(60));
  // No backward links
  ASSERT_OK(backward_list->Append());

  // First name
  ASSERT_OK(name->Append());
  ASSERT_OK(language_list->Append());
  ASSERT_OK(language->Append());
  ASSERT_OK(code->Append("en-us"));
  ASSERT_OK(country->Append("us"));

  ASSERT_OK(language->Append());
  ASSERT_OK(code->Append("en"));
  ASSERT_OK(country->AppendNull());

  ASSERT_OK(url->Append("http://A"));

  // Second name
  ASSERT_OK(name->Append());
  ASSERT_OK(language_list->Append());
  ASSERT_OK(url->Append("http://B"));

  // Third name
  ASSERT_OK(name->Append());
  ASSERT_OK(language_list->Append());
  ASSERT_OK(language->Append());
  ASSERT_OK(code->Append("en-gb"));
  ASSERT_OK(country->Append("gb"));
  ASSERT_OK(url->AppendNull());

  // Row 2
  ASSERT_OK(docId->Append(20));
  ASSERT_OK(links.Append());
  ASSERT_OK(name_list.Append());

  ASSERT_OK(backward_list->Append());
  ASSERT_OK(backward->Append(10));
  ASSERT_OK(backward->Append(30));
  ASSERT_OK(forward_list->Append());
  ASSERT_OK(forward->Append(80));

  ASSERT_OK(name->Append());
  ASSERT_OK(language_list->Append());
  ASSERT_OK(url->Append("http://C"));

  std::shared_ptr<::arrow::Array> docIdArray, nameArray, linksArray;

  ASSERT_OK(docId->Finish(&docIdArray));
  ASSERT_OK(name_list.Finish(&nameArray));
  ASSERT_OK(links.Finish(&linksArray));

  // Top-level schema: DocId (non-nullable), Links (nullable), Name (non-nullable).
  std::vector<std::shared_ptr<::arrow::Field>> fields;
  fields.emplace_back(
      std::make_shared<::arrow::Field>("DocId", docIdArray->type(), false));
  fields.emplace_back(std::make_shared<::arrow::Field>("Links", linksArray->type()));
  fields.emplace_back(std::make_shared<::arrow::Field>("Name", nameArray->type(), false));

  auto schema = std::make_shared<::arrow::Schema>(fields);

  auto expected = ::arrow::Table::Make(schema, {docIdArray, linksArray, nameArray});

  ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*expected, *out));
}

TEST(TestArrowReadWrite, CoerceTimestamps) {
using ::arrow::ArrayFromVector;
using ::arrow::field;
Expand Down Expand Up @@ -1445,8 +1577,8 @@ TEST(TestArrowReadWrite, CoerceTimestampsAndSupportDeprecatedInt96) {
using ::arrow::Schema;
using ::arrow::Table;
using ::arrow::TimeUnit;
using ::arrow::TimestampType;
using ::arrow::TimestampBuilder;
using ::arrow::TimestampType;
using ::arrow::default_memory_pool;

auto timestamp_type = std::make_shared<TimestampType>(TimeUnit::NANO);
Expand Down Expand Up @@ -2161,6 +2293,7 @@ TEST_F(TestNestedSchemaRead, ReadIntoTableFull) {

std::shared_ptr<Table> table;
ASSERT_OK_NO_THROW(reader_->ReadTable(&table));

ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
ASSERT_EQ(table->num_columns(), 2);
ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 2);
Expand Down Expand Up @@ -2228,9 +2361,11 @@ TEST_F(TestNestedSchemaRead, ReadTablePartial) {
}

// Originally checked that reading a file whose nested groups are REPEATED
// fails with NotImplemented.
// NOTE(review): the whole body is compiled out with `#if 0`, so this test
// currently passes without exercising anything. Presumably it was disabled
// because this change adds support for nested groups, making the
// NotImplemented expectation obsolete -- confirm, then either replace it with
// a positive test or remove it.
TEST_F(TestNestedSchemaRead, StructAndListTogetherUnsupported) {
#if 0
ASSERT_NO_FATAL_FAILURE(CreateSimpleNestedParquet(Repetition::REPEATED));
std::shared_ptr<Table> table;
ASSERT_RAISES(NotImplemented, reader_->ReadTable(&table));
#endif
}

TEST_P(TestNestedSchemaRead, DeepNestedSchemaRead) {
Expand Down
11 changes: 6 additions & 5 deletions src/parquet/arrow/arrow-schema-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ class TestConvertParquetSchema : public ::testing::Test {
for (int i = 0; i < expected_schema->num_fields(); ++i) {
auto lhs = result_schema_->field(i);
auto rhs = expected_schema->field(i);
EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString()
<< " != " << rhs->ToString();
EXPECT_TRUE(lhs->Equals(rhs))
<< i << " " << lhs->ToString() << " != " << rhs->ToString();
}
}

Expand Down Expand Up @@ -607,9 +607,10 @@ TEST_F(TestConvertParquetSchema, ParquetRepeatedNestedSchema) {
auto inner_group_type = std::make_shared<::arrow::StructType>(inner_group_fields);
auto outer_group_fields = {
std::make_shared<Field>("leaf2", INT32, true),
std::make_shared<Field>("innerGroup", ::arrow::list(std::make_shared<Field>(
"innerGroup", inner_group_type, false)),
false)};
std::make_shared<Field>(
"innerGroup",
::arrow::list(std::make_shared<Field>("innerGroup", inner_group_type, false)),
false)};
auto outer_group_type = std::make_shared<::arrow::StructType>(outer_group_fields);

arrow_fields.push_back(std::make_shared<Field>("leaf1", INT32, true));
Expand Down
Loading