Skip to content
This repository was archived by the owner on May 10, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ set(LIBPARQUET_SRCS
src/parquet/column/writer.cc
src/parquet/column/scanner.cc
src/parquet/column/scan-all.cc
src/parquet/column/statistics.cc

src/parquet/compression/codec.cc
src/parquet/compression/snappy-codec.cc
Expand Down
7 changes: 3 additions & 4 deletions src/parquet/arrow/arrow-reader-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,7 @@ class TestParquetIO : public ::testing::Test {
typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type,
::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType, ::arrow::DoubleType,
::arrow::StringType>
TestTypes;
::arrow::StringType> TestTypes;

TYPED_TEST_CASE(TestParquetIO, TestTypes);

Expand Down Expand Up @@ -477,8 +476,8 @@ class TestPrimitiveParquetIO : public TestParquetIO<TestType> {

typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::UInt32Type, ::arrow::Int32Type,
::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType, ::arrow::DoubleType>
PrimitiveTestTypes;
::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType,
::arrow::DoubleType> PrimitiveTestTypes;

TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes);

Expand Down
2 changes: 2 additions & 0 deletions src/parquet/column/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ install(FILES
scan-all.h
scanner.h
writer.h
statistics.h
DESTINATION include/parquet/column)

ADD_PARQUET_TEST(column-reader-test)
ADD_PARQUET_TEST(column-writer-test)
ADD_PARQUET_TEST(levels-test)
ADD_PARQUET_TEST(properties-test)
ADD_PARQUET_TEST(scanner-test)
ADD_PARQUET_TEST(statistics-test)

ADD_PARQUET_BENCHMARK(column-io-benchmark)
ADD_PARQUET_BENCHMARK(level-benchmark)
2 changes: 1 addition & 1 deletion src/parquet/column/column-io-benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ std::unique_ptr<Int64Writer> BuildWriter(int64_t output_size, OutputStream* dst,
std::unique_ptr<SerializedPageWriter> pager(
new SerializedPageWriter(dst, Compression::UNCOMPRESSED, metadata));
return std::unique_ptr<Int64Writer>(new Int64Writer(
schema, std::move(pager), output_size, Encoding::PLAIN, properties));
metadata, std::move(pager), output_size, Encoding::PLAIN, properties));
}

std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) {
Expand Down
64 changes: 46 additions & 18 deletions src/parquet/column/column-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,53 +60,62 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {

Type::type type_num() { return TestType::type_num; }

// Test helper: points reader_ at the bytes currently held by sink_. The sink's buffer is
// wrapped in an InMemoryInputStream, handed to a SerializedPageReader, and a
// TypedColumnReader<TestType> for this->descr_ is built on top of that page reader.
// `compression` (default Compression::UNCOMPRESSED) is forwarded to SerializedPageReader.
// NOTE(review): this text is a rendered diff without +/- markers, so two variants of the
// signature and of the SerializedPageReader construction appear back to back; the
// variant taking `compression` looks like the post-change one -- confirm against the
// real file.
void BuildReader() {
void BuildReader(Compression::type compression = Compression::UNCOMPRESSED) {
auto buffer = sink_->GetBuffer();
std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
std::unique_ptr<SerializedPageReader> page_reader(
new SerializedPageReader(std::move(source), Compression::UNCOMPRESSED));
new SerializedPageReader(std::move(source), compression));
reader_.reset(new TypedColumnReader<TestType>(this->descr_, std::move(page_reader)));
}

// Test helper: resets sink_ to a new InMemoryOutputStream, creates metadata_ via
// ColumnChunkMetaDataBuilder::Make over thrift_metadata_, wraps the sink in a
// SerializedPageWriter, and returns the writer produced by ColumnWriter::Make, cast to
// TypedColumnWriter<TestType>.
// NOTE(review): this text is a rendered diff without +/- markers, so superseded and
// replacement lines sit next to each other (e.g. the `encoding` parameter vs.
// `column_properties`, `this->descr_` vs. `metadata_.get()`); confirm which lines are
// live against the real file.
std::shared_ptr<TypedColumnWriter<TestType>> BuildWriter(
int64_t output_size = SMALL_SIZE, Encoding::type encoding = Encoding::PLAIN) {
int64_t output_size = SMALL_SIZE,
const ColumnProperties& column_properties = ColumnProperties()) {
sink_.reset(new InMemoryOutputStream());
// NOTE(review): metadata_ is built from writer_properties_ before that member is
// reassigned from wp_builder below -- confirm this ordering is intended.
metadata_ = ColumnChunkMetaDataBuilder::Make(
writer_properties_, this->descr_, reinterpret_cast<uint8_t*>(&thrift_metadata_));
std::unique_ptr<SerializedPageWriter> pager(new SerializedPageWriter(
sink_.get(), Compression::UNCOMPRESSED, metadata_.get()));
std::unique_ptr<SerializedPageWriter> pager(
new SerializedPageWriter(sink_.get(), column_properties.codec, metadata_.get()));
WriterProperties::Builder wp_builder;
// A dictionary encoding is requested by enabling the dictionary; any other encoding
// disables the dictionary and is set explicitly on the builder.
if (encoding == Encoding::PLAIN_DICTIONARY || encoding == Encoding::RLE_DICTIONARY) {
if (column_properties.encoding == Encoding::PLAIN_DICTIONARY ||
column_properties.encoding == Encoding::RLE_DICTIONARY) {
wp_builder.enable_dictionary();
} else {
wp_builder.disable_dictionary();
wp_builder.encoding(encoding);
wp_builder.encoding(column_properties.encoding);
}
writer_properties_ = wp_builder.build();
std::shared_ptr<ColumnWriter> writer = ColumnWriter::Make(
this->descr_, std::move(pager), output_size, writer_properties_.get());
metadata_.get(), std::move(pager), output_size, writer_properties_.get());
return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer);
}

// Test helper: rebuilds the reader with BuildReader, then makes a single ReadBatch call
// whose first argument is values_out_.size(), passing the definition/repetition level
// output buffers, values_out_ptr_ and the address of values_read_, and finally calls
// SyncValuesOut().
// NOTE(review): rendered diff without +/- markers -- both the parameterless and the
// `compression` variants of the signature and of the BuildReader call are shown;
// confirm which pair is live against the real file.
void ReadColumn() {
BuildReader();
void ReadColumn(Compression::type compression = Compression::UNCOMPRESSED) {
BuildReader(compression);
reader_->ReadBatch(this->values_out_.size(), definition_levels_out_.data(),
repetition_levels_out_.data(), this->values_out_ptr_, &values_read_);
this->SyncValuesOut();
}

// Convenience wrapper: runs TestRequiredWithSettings for `encoding` with
// Compression::UNCOMPRESSED and both boolean settings passed as false.
void TestRequiredWithEncoding(Encoding::type encoding) {
  const bool enable_dictionary = false;
  const bool enable_statistics = false;
  TestRequiredWithSettings(
      encoding, Compression::UNCOMPRESSED, enable_dictionary, enable_statistics);
}

// Round-trip check for a required, non-repeated column. Generates SMALL_SIZE values,
// writes them in one WriteBatch call with null definition/repetition levels through a
// writer obtained from BuildWriter, closes the writer twice, reads the column back and
// asserts that SMALL_SIZE values were read and that they equal the written values.
// The four arguments are bundled into a ColumnProperties object.
// NOTE(review): rendered diff without +/- markers -- the BuildWriter and ReadColumn
// calls each appear twice (an `encoding` / no-argument form and a `column_properties` /
// `compression` form); confirm which form is live against the real file.
void TestRequiredWithSettings(Encoding::type encoding, Compression::type compression,
bool enable_dictionary, bool enable_statistics) {
this->GenerateData(SMALL_SIZE);

// Test case 1: required and non-repeated, so no definition or repetition levels
ColumnProperties column_properties(
encoding, compression, enable_dictionary, enable_statistics);
std::shared_ptr<TypedColumnWriter<TestType>> writer =
this->BuildWriter(SMALL_SIZE, encoding);
this->BuildWriter(SMALL_SIZE, column_properties);
writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
// The behaviour should be independent from the number of Close() calls
writer->Close();
writer->Close();

this->ReadColumn();
this->ReadColumn(compression);
ASSERT_EQ(SMALL_SIZE, this->values_read_);
ASSERT_EQ(this->values_, this->values_out_);
}
Expand All @@ -115,17 +124,17 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
// Metadata accessor must be created lazily.
// This is because the ColumnChunkMetaData semantics dictate the metadata object is
// complete (no changes to the metadata buffer can be made after instantiation)
auto metadata_accessor =
ColumnChunkMetaData::Make(reinterpret_cast<const uint8_t*>(&thrift_metadata_));
auto metadata_accessor = ColumnChunkMetaData::Make(
reinterpret_cast<const uint8_t*>(&thrift_metadata_), this->descr_);
return metadata_accessor->num_values();
}

// Returns encodings() from a ColumnChunkMetaData accessor built over thrift_metadata_.
// NOTE(review): rendered diff without +/- markers -- the accessor construction appears
// twice (with and without the this->descr_ argument); confirm which is live against the
// real file.
std::vector<Encoding::type> metadata_encodings() {
// Metadata accessor must be created lazily.
// This is because the ColumnChunkMetaData semantics dictate the metadata object is
// complete (no changes to the metadata buffer can be made after instantiation)
auto metadata_accessor =
ColumnChunkMetaData::Make(reinterpret_cast<const uint8_t*>(&thrift_metadata_));
auto metadata_accessor = ColumnChunkMetaData::Make(
reinterpret_cast<const uint8_t*>(&thrift_metadata_), this->descr_);
return metadata_accessor->encodings();
}

Expand All @@ -148,8 +157,7 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
};

typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
BooleanType, ByteArrayType, FLBAType>
TestTypes;
BooleanType, ByteArrayType, FLBAType> TestTypes;

TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);

Expand Down Expand Up @@ -189,6 +197,26 @@ TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) {
}
*/

// Required column round trip: Encoding::PLAIN with Compression::SNAPPY; dictionary and
// statistics flags are both false.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithSnappyCompression) {
  const bool enable_dictionary = false;
  const bool enable_statistics = false;
  this->TestRequiredWithSettings(
      Encoding::PLAIN, Compression::SNAPPY, enable_dictionary, enable_statistics);
}

// Required column round trip: Encoding::PLAIN with Compression::GZIP; dictionary and
// statistics flags are both false.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompression) {
  const bool enable_dictionary = false;
  const bool enable_statistics = false;
  this->TestRequiredWithSettings(
      Encoding::PLAIN, Compression::GZIP, enable_dictionary, enable_statistics);
}

// Required column round trip: Encoding::PLAIN with Compression::UNCOMPRESSED; the
// statistics flag is true and the dictionary flag is false.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStats) {
  const bool enable_dictionary = false;
  const bool enable_statistics = true;
  this->TestRequiredWithSettings(
      Encoding::PLAIN, Compression::UNCOMPRESSED, enable_dictionary, enable_statistics);
}

// Required column round trip: Encoding::PLAIN with Compression::SNAPPY; the statistics
// flag is true and the dictionary flag is false.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndSnappyCompression) {
  const bool enable_dictionary = false;
  const bool enable_statistics = true;
  this->TestRequiredWithSettings(
      Encoding::PLAIN, Compression::SNAPPY, enable_dictionary, enable_statistics);
}

// Required column round trip: Encoding::PLAIN with Compression::GZIP; the statistics
// flag is true and the dictionary flag is false.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndGzipCompression) {
  const bool enable_dictionary = false;
  const bool enable_statistics = true;
  this->TestRequiredWithSettings(
      Encoding::PLAIN, Compression::GZIP, enable_dictionary, enable_statistics);
}

TYPED_TEST(TestPrimitiveWriter, Optional) {
// Optional and non-repeated, with definition levels
// but no repetition levels
Expand Down
24 changes: 10 additions & 14 deletions src/parquet/column/page.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <memory>
#include <string>

#include "parquet/column/statistics.h"
#include "parquet/types.h"
#include "parquet/util/buffer.h"

Expand Down Expand Up @@ -62,12 +63,14 @@ class DataPage : public Page {
public:
// Constructs a page of type PageType::DATA_PAGE over `buffer`, recording the value
// count and the value / definition-level / repetition-level encodings. `statistics`
// defaults to a default-constructed EncodedStatistics and is copied into statistics_.
// NOTE(review): rendered diff without +/- markers -- the end of the parameter list and
// the end of the initializer list each appear twice (without and with `statistics`);
// confirm which is live against the real file.
DataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, Encoding::type definition_level_encoding,
Encoding::type repetition_level_encoding)
Encoding::type repetition_level_encoding,
const EncodedStatistics& statistics = EncodedStatistics())
: Page(buffer, PageType::DATA_PAGE),
num_values_(num_values),
encoding_(encoding),
definition_level_encoding_(definition_level_encoding),
repetition_level_encoding_(repetition_level_encoding) {}
repetition_level_encoding_(repetition_level_encoding),
statistics_(statistics) {}

int32_t num_values() const { return num_values_; }

Expand All @@ -77,31 +80,24 @@ class DataPage : public Page {

Encoding::type definition_level_encoding() const { return definition_level_encoding_; }

// DataPageHeader::statistics::max field, if it was set
const uint8_t* max() const { return reinterpret_cast<const uint8_t*>(max_.c_str()); }

// DataPageHeader::statistics::min field, if it was set
const uint8_t* min() const { return reinterpret_cast<const uint8_t*>(min_.c_str()); }
const EncodedStatistics& statistics() const { return statistics_; }

private:
int32_t num_values_;
Encoding::type encoding_;
Encoding::type definition_level_encoding_;
Encoding::type repetition_level_encoding_;

// So max/min can be populated privately
friend class SerializedPageReader;
std::string max_;
std::string min_;
EncodedStatistics statistics_;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid copying a lot of data, `statistics_` would probably be better stored as a `std::shared_ptr<EncodedStatistics>` than by value.

};

class CompressedDataPage : public DataPage {
public:
// Constructs a compressed data page: forwards the buffer, value count and encodings to
// the DataPage constructor and stores `uncompressed_size` in uncompressed_size_.
// `statistics` defaults to a default-constructed EncodedStatistics.
// NOTE(review): rendered diff without +/- markers -- the end of the parameter list and
// the end of the DataPage base-initializer call each appear twice (without and with
// `statistics`); confirm which is live against the real file.
CompressedDataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, Encoding::type definition_level_encoding,
Encoding::type repetition_level_encoding, int64_t uncompressed_size)
Encoding::type repetition_level_encoding, int64_t uncompressed_size,
const EncodedStatistics& statistics = EncodedStatistics())
: DataPage(buffer, num_values, encoding, definition_level_encoding,
repetition_level_encoding),
repetition_level_encoding, statistics),
uncompressed_size_(uncompressed_size) {}

int64_t uncompressed_size() const { return uncompressed_size_; }
Expand Down
Loading