diff --git a/CMakeLists.txt b/CMakeLists.txt index b43e8239..01ac7d3f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -509,6 +509,7 @@ set(LIBPARQUET_SRCS src/parquet/column/writer.cc src/parquet/column/scanner.cc src/parquet/column/scan-all.cc + src/parquet/column/statistics.cc src/parquet/compression/codec.cc src/parquet/compression/snappy-codec.cc diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index b1f1c52d..11ebee07 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -258,8 +258,7 @@ class TestParquetIO : public ::testing::Test { typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType, ::arrow::DoubleType, - ::arrow::StringType> - TestTypes; + ::arrow::StringType> TestTypes; TYPED_TEST_CASE(TestParquetIO, TestTypes); @@ -477,8 +476,8 @@ class TestPrimitiveParquetIO : public TestParquetIO { typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::UInt32Type, ::arrow::Int32Type, - ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType, ::arrow::DoubleType> - PrimitiveTestTypes; + ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType, + ::arrow::DoubleType> PrimitiveTestTypes; TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes); diff --git a/src/parquet/column/CMakeLists.txt b/src/parquet/column/CMakeLists.txt index dc740215..473c311b 100644 --- a/src/parquet/column/CMakeLists.txt +++ b/src/parquet/column/CMakeLists.txt @@ -24,6 +24,7 @@ install(FILES scan-all.h scanner.h writer.h + statistics.h DESTINATION include/parquet/column) ADD_PARQUET_TEST(column-reader-test) @@ -31,6 +32,7 @@ ADD_PARQUET_TEST(column-writer-test) ADD_PARQUET_TEST(levels-test) ADD_PARQUET_TEST(properties-test) ADD_PARQUET_TEST(scanner-test) +ADD_PARQUET_TEST(statistics-test) ADD_PARQUET_BENCHMARK(column-io-benchmark) ADD_PARQUET_BENCHMARK(level-benchmark) diff --git a/src/parquet/column/column-io-benchmark.cc b/src/parquet/column/column-io-benchmark.cc index bc4afec3..3b62004a 100644 --- a/src/parquet/column/column-io-benchmark.cc +++ b/src/parquet/column/column-io-benchmark.cc @@ -35,7 +35,7 @@ std::unique_ptr BuildWriter(int64_t output_size, OutputStream* dst, std::unique_ptr pager( new SerializedPageWriter(dst, Compression::UNCOMPRESSED, metadata)); return std::unique_ptr(new Int64Writer( - schema, std::move(pager), output_size, Encoding::PLAIN, properties)); + metadata, std::move(pager), output_size, Encoding::PLAIN, properties)); } std::shared_ptr Int64Schema(Repetition::type repetition) { diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc index c843a204..29139c93 100644 --- a/src/parquet/column/column-writer-test.cc +++ b/src/parquet/column/column-writer-test.cc @@ -60,53 +60,62 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { Type::type type_num() { return TestType::type_num; } - void BuildReader() { + void BuildReader(Compression::type compression = Compression::UNCOMPRESSED) { auto buffer = sink_->GetBuffer(); std::unique_ptr source(new InMemoryInputStream(buffer)); std::unique_ptr page_reader( - new SerializedPageReader(std::move(source), Compression::UNCOMPRESSED)); + new SerializedPageReader(std::move(source), compression)); reader_.reset(new TypedColumnReader(this->descr_, std::move(page_reader))); } std::shared_ptr> BuildWriter( - int64_t output_size = SMALL_SIZE, Encoding::type encoding = Encoding::PLAIN) { + int64_t output_size = SMALL_SIZE, + const ColumnProperties& column_properties = ColumnProperties()) { sink_.reset(new InMemoryOutputStream()); metadata_ = ColumnChunkMetaDataBuilder::Make( writer_properties_, this->descr_, reinterpret_cast(&thrift_metadata_)); - std::unique_ptr pager(new SerializedPageWriter( - sink_.get(), Compression::UNCOMPRESSED, metadata_.get())); + std::unique_ptr pager( + new SerializedPageWriter(sink_.get(), column_properties.codec, metadata_.get())); WriterProperties::Builder wp_builder; - if (encoding == Encoding::PLAIN_DICTIONARY || encoding == Encoding::RLE_DICTIONARY) { + if (column_properties.encoding == Encoding::PLAIN_DICTIONARY || + column_properties.encoding == Encoding::RLE_DICTIONARY) { wp_builder.enable_dictionary(); } else { wp_builder.disable_dictionary(); - wp_builder.encoding(encoding); + wp_builder.encoding(column_properties.encoding); } writer_properties_ = wp_builder.build(); std::shared_ptr writer = ColumnWriter::Make( - this->descr_, std::move(pager), output_size, writer_properties_.get()); + metadata_.get(), std::move(pager), output_size, writer_properties_.get()); return std::static_pointer_cast>(writer); } - void ReadColumn() { - BuildReader(); + void ReadColumn(Compression::type compression = Compression::UNCOMPRESSED) { + BuildReader(compression); reader_->ReadBatch(this->values_out_.size(), definition_levels_out_.data(), repetition_levels_out_.data(), this->values_out_ptr_, &values_read_); this->SyncValuesOut(); } void TestRequiredWithEncoding(Encoding::type encoding) { + return TestRequiredWithSettings(encoding, Compression::UNCOMPRESSED, false, false); + } + + void TestRequiredWithSettings(Encoding::type encoding, Compression::type compression, + bool enable_dictionary, bool enable_statistics) { this->GenerateData(SMALL_SIZE); // Test case 1: required and non-repeated, so no definition or repetition levels + ColumnProperties column_properties( + encoding, compression, enable_dictionary, enable_statistics); std::shared_ptr> writer = - this->BuildWriter(SMALL_SIZE, encoding); + this->BuildWriter(SMALL_SIZE, column_properties); writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_); // The behaviour should be independent from the number of Close() calls writer->Close(); writer->Close(); - this->ReadColumn(); + this->ReadColumn(compression); ASSERT_EQ(SMALL_SIZE, this->values_read_); ASSERT_EQ(this->values_, this->values_out_); } @@ -115,8 +124,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { // Metadata accessor must be created lazily. // This is because the ColumnChunkMetaData semantics dictate the metadata object is // complete (no changes to the metadata buffer can be made after instantiation) - auto metadata_accessor = - ColumnChunkMetaData::Make(reinterpret_cast(&thrift_metadata_)); + auto metadata_accessor = ColumnChunkMetaData::Make( + reinterpret_cast(&thrift_metadata_), this->descr_); return metadata_accessor->num_values(); } @@ -124,8 +133,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { // Metadata accessor must be created lazily. // This is because the ColumnChunkMetaData semantics dictate the metadata object is // complete (no changes to the metadata buffer can be made after instantiation) - auto metadata_accessor = - ColumnChunkMetaData::Make(reinterpret_cast(&thrift_metadata_)); + auto metadata_accessor = ColumnChunkMetaData::Make( + reinterpret_cast(&thrift_metadata_), this->descr_); return metadata_accessor->encodings(); } @@ -148,8 +157,7 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { }; typedef ::testing::Types - TestTypes; + BooleanType, ByteArrayType, FLBAType> TestTypes; TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes); @@ -189,6 +197,26 @@ TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) { } */ +TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithSnappyCompression) { + this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY, false, false); +} + +TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompression) { + this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, false); +} + +TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStats) { + this->TestRequiredWithSettings(Encoding::PLAIN, Compression::UNCOMPRESSED, false, true); +} + +TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndSnappyCompression) { + this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY, false, true); +} + +TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndGzipCompression) { + this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, true); +} + TYPED_TEST(TestPrimitiveWriter, Optional) { // Optional and non-repeated, with definition levels // but no repetition levels diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h index 1de60131..d3954803 100644 --- a/src/parquet/column/page.h +++ b/src/parquet/column/page.h @@ -26,6 +26,7 @@ #include #include +#include "parquet/column/statistics.h" #include "parquet/types.h" #include "parquet/util/buffer.h" @@ -62,12 +63,14 @@ class DataPage : public Page { public: DataPage(const std::shared_ptr& buffer, int32_t num_values, Encoding::type encoding, Encoding::type definition_level_encoding, - Encoding::type repetition_level_encoding) + Encoding::type repetition_level_encoding, + const EncodedStatistics& statistics = EncodedStatistics()) : Page(buffer, PageType::DATA_PAGE), num_values_(num_values), encoding_(encoding), definition_level_encoding_(definition_level_encoding), - repetition_level_encoding_(repetition_level_encoding) {} + repetition_level_encoding_(repetition_level_encoding), + statistics_(statistics) {} int32_t num_values() const { return num_values_; } @@ -77,31 +80,24 @@ class DataPage : public Page { Encoding::type definition_level_encoding() const { return definition_level_encoding_; } - // DataPageHeader::statistics::max field, if it was set - const uint8_t* max() const { return reinterpret_cast(max_.c_str()); } - - // DataPageHeader::statistics::min field, if it was set - const uint8_t* min() const { return reinterpret_cast(min_.c_str()); } + const EncodedStatistics& statistics() const { return statistics_; } private: int32_t num_values_; Encoding::type encoding_; Encoding::type definition_level_encoding_; Encoding::type repetition_level_encoding_; - - // So max/min can be populated privately - friend class SerializedPageReader; - std::string max_; - std::string min_; + EncodedStatistics statistics_; }; class CompressedDataPage : public DataPage { public: CompressedDataPage(const std::shared_ptr& buffer, int32_t num_values, Encoding::type encoding, Encoding::type definition_level_encoding, - Encoding::type repetition_level_encoding, int64_t uncompressed_size) + Encoding::type repetition_level_encoding, int64_t uncompressed_size, + const EncodedStatistics& statistics = EncodedStatistics()) : DataPage(buffer, num_values, encoding, definition_level_encoding, - repetition_level_encoding), + repetition_level_encoding, statistics), uncompressed_size_(uncompressed_size) {} int64_t uncompressed_size() const { return uncompressed_size_; } diff --git a/src/parquet/column/properties.h b/src/parquet/column/properties.h index cdd1a12b..ee7e7d68 100644 --- a/src/parquet/column/properties.h +++ b/src/parquet/column/properties.h @@ -82,13 +82,29 @@ static constexpr int64_t DEFAULT_PAGE_SIZE = 1024 * 1024; static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true; static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = DEFAULT_PAGE_SIZE; static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024; +static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true; static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN; static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION = ParquetVersion::PARQUET_1_0; static std::string DEFAULT_CREATED_BY = "Apache parquet-cpp"; static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED; -using ColumnCodecs = std::unordered_map; +class PARQUET_EXPORT ColumnProperties { + public: + ColumnProperties(Encoding::type encoding = DEFAULT_ENCODING, + Compression::type codec = DEFAULT_COMPRESSION_TYPE, + bool dictionary_enabled = DEFAULT_IS_DICTIONARY_ENABLED, + bool statistics_enabled = DEFAULT_ARE_STATISTICS_ENABLED) + : encoding(encoding), + codec(codec), + dictionary_enabled(dictionary_enabled), + statistics_enabled(statistics_enabled) {} + + Encoding::type encoding; + Compression::type codec; + bool dictionary_enabled; + bool statistics_enabled; +}; class PARQUET_EXPORT WriterProperties { public: @@ -96,14 +112,11 @@ class PARQUET_EXPORT WriterProperties { public: Builder() : allocator_(default_allocator()), - dictionary_enabled_default_(DEFAULT_IS_DICTIONARY_ENABLED), dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT), write_batch_size_(DEFAULT_WRITE_BATCH_SIZE), pagesize_(DEFAULT_PAGE_SIZE), version_(DEFAULT_WRITER_VERSION), - created_by_(DEFAULT_CREATED_BY), - default_encoding_(DEFAULT_ENCODING), - default_codec_(DEFAULT_COMPRESSION_TYPE) {} + created_by_(DEFAULT_CREATED_BY) {} virtual ~Builder() {} Builder* allocator(MemoryAllocator* allocator) { @@ -112,12 +125,12 @@ class PARQUET_EXPORT WriterProperties { } Builder* enable_dictionary() { - dictionary_enabled_default_ = true; + default_column_properties_.dictionary_enabled = true; return this; } Builder* disable_dictionary() { - dictionary_enabled_default_ = false; + default_column_properties_.dictionary_enabled = false; return this; } @@ -131,12 +144,12 @@ class PARQUET_EXPORT WriterProperties { } Builder* disable_dictionary(const std::string& path) { - dictionary_enabled_[path] = true; + dictionary_enabled_[path] = false; return this; } Builder* disable_dictionary(const std::shared_ptr& path) { - return this->enable_dictionary(path->ToDotString()); + return this->disable_dictionary(path->ToDotString()); } Builder* dictionary_pagesize_limit(int64_t dictionary_psize_limit) { @@ -175,7 +188,8 @@ class PARQUET_EXPORT WriterProperties { encoding_type == Encoding::RLE_DICTIONARY) { throw ParquetException("Can't use dictionary encoding as fallback encoding"); } - default_encoding_ = encoding_type; + + default_column_properties_.encoding = encoding_type; return this; } @@ -190,6 +204,7 @@ class PARQUET_EXPORT WriterProperties { encoding_type == Encoding::RLE_DICTIONARY) { throw ParquetException("Can't use dictionary encoding as fallback encoding"); } + encodings_[path] = encoding_type; return this; } @@ -206,7 +221,7 @@ class PARQUET_EXPORT WriterProperties { } Builder* compression(Compression::type codec) { - default_codec_ = codec; + default_column_properties_.codec = codec; return this; } @@ -220,40 +235,76 @@ class PARQUET_EXPORT WriterProperties { return this->compression(path->ToDotString(), codec); } + Builder* enable_statistics() { + default_column_properties_.statistics_enabled = true; + return this; + } + + Builder* disable_statistics() { + default_column_properties_.statistics_enabled = false; + return this; + } + + Builder* enable_statistics(const std::string& path) { + statistics_enabled_[path] = true; + return this; + } + + Builder* enable_statistics(const std::shared_ptr& path) { + return this->enable_statistics(path->ToDotString()); + } + + Builder* disable_statistics(const std::string& path) { + statistics_enabled_[path] = false; + return this; + } + + Builder* disable_statistics(const std::shared_ptr& path) { + return this->disable_statistics(path->ToDotString()); + } + std::shared_ptr build() { + std::unordered_map column_properties; + auto get = [&](const std::string& key) -> ColumnProperties& { + auto it = column_properties.find(key); + if (it == column_properties.end()) + return column_properties[key] = default_column_properties_; + else + return it->second; + }; + + for (const auto& item : encodings_) + get(item.first).encoding = item.second; + for (const auto& item : codecs_) + get(item.first).codec = item.second; + for (const auto& item : dictionary_enabled_) + get(item.first).dictionary_enabled = item.second; + for (const auto& item : statistics_enabled_) + get(item.first).statistics_enabled = item.second; + return std::shared_ptr(new WriterProperties(allocator_, - dictionary_enabled_default_, dictionary_enabled_, dictionary_pagesize_limit_, - write_batch_size_, pagesize_, version_, created_by_, default_encoding_, - encodings_, default_codec_, codecs_)); + dictionary_pagesize_limit_, write_batch_size_, pagesize_, version_, created_by_, + default_column_properties_, column_properties)); } private: MemoryAllocator* allocator_; - bool dictionary_enabled_default_; - std::unordered_map dictionary_enabled_; int64_t dictionary_pagesize_limit_; int64_t write_batch_size_; int64_t pagesize_; ParquetVersion::type version_; std::string created_by_; - // Encoding used for each column if not a specialized one is defined as - // part of encodings_ - Encoding::type default_encoding_; + + // Settings used for each column unless overridden in any of the maps below + ColumnProperties default_column_properties_; std::unordered_map encodings_; - // Default compression codec. This will be used for all columns that do - // not have a specific codec set as part of codecs_ - Compression::type default_codec_; - ColumnCodecs codecs_; + std::unordered_map codecs_; + std::unordered_map dictionary_enabled_; + std::unordered_map statistics_enabled_; }; inline MemoryAllocator* allocator() const { return allocator_; } - inline bool dictionary_enabled(const std::shared_ptr& path) const { - auto it = dictionary_enabled_.find(path->ToDotString()); - if (it != dictionary_enabled_.end()) { return it->second; } - return dictionary_enabled_default_; - } - inline int64_t dictionary_pagesize_limit() const { return dictionary_pagesize_limit_; } inline int64_t write_batch_size() const { return write_batch_size_; } @@ -264,12 +315,6 @@ class PARQUET_EXPORT WriterProperties { inline std::string created_by() const { return parquet_created_by_; } - inline Encoding::type encoding(const std::shared_ptr& path) const { - auto it = encodings_.find(path->ToDotString()); - if (it != encodings_.end()) { return it->second; } - return default_encoding_; - } - inline Encoding::type dictionary_index_encoding() const { if (parquet_version_ == ParquetVersion::PARQUET_1_0) { return Encoding::PLAIN_DICTIONARY; @@ -286,45 +331,51 @@ class PARQUET_EXPORT WriterProperties { } } - inline Compression::type compression( + const ColumnProperties& column_properties( const std::shared_ptr& path) const { - auto it = codecs_.find(path->ToDotString()); - if (it != codecs_.end()) return it->second; - return default_codec_; + auto it = column_properties_.find(path->ToDotString()); + if (it != column_properties_.end()) return it->second; + return default_column_properties_; + } + + Encoding::type encoding(const std::shared_ptr& path) const { + return column_properties(path).encoding; + } + + Compression::type compression(const std::shared_ptr& path) const { + return column_properties(path).codec; + } + + bool dictionary_enabled(const std::shared_ptr& path) const { + return column_properties(path).dictionary_enabled; + } + + bool statistics_enabled(const std::shared_ptr& path) const { + return column_properties(path).statistics_enabled; } private: - explicit WriterProperties(MemoryAllocator* allocator, bool dictionary_enabled_default, - std::unordered_map dictionary_enabled, - int64_t dictionary_pagesize, int64_t write_batch_size, int64_t pagesize, - ParquetVersion::type version, const std::string& created_by, - Encoding::type default_encoding, - std::unordered_map encodings, - Compression::type default_codec, const ColumnCodecs& codecs) + explicit WriterProperties(MemoryAllocator* allocator, int64_t dictionary_pagesize_limit, + int64_t write_batch_size, int64_t pagesize, ParquetVersion::type version, + const std::string& created_by, const ColumnProperties& default_column_properties, + const std::unordered_map& column_properties) : allocator_(allocator), - dictionary_enabled_default_(dictionary_enabled_default), - dictionary_enabled_(dictionary_enabled), - dictionary_pagesize_limit_(dictionary_pagesize), + dictionary_pagesize_limit_(dictionary_pagesize_limit), write_batch_size_(write_batch_size), pagesize_(pagesize), parquet_version_(version), parquet_created_by_(created_by), - default_encoding_(default_encoding), - encodings_(encodings), - default_codec_(default_codec), - codecs_(codecs) {} + default_column_properties_(default_column_properties), + column_properties_(column_properties) {} + MemoryAllocator* allocator_; - bool dictionary_enabled_default_; - std::unordered_map dictionary_enabled_; int64_t dictionary_pagesize_limit_; int64_t write_batch_size_; int64_t pagesize_; ParquetVersion::type parquet_version_; std::string parquet_created_by_; - Encoding::type default_encoding_; - std::unordered_map encodings_; - Compression::type default_codec_; - ColumnCodecs codecs_; + ColumnProperties default_column_properties_; + std::unordered_map column_properties_; }; std::shared_ptr PARQUET_EXPORT default_writer_properties(); diff --git a/src/parquet/column/scanner-test.cc b/src/parquet/column/scanner-test.cc index 8eee1912..3ac07dc1 100644 --- a/src/parquet/column/scanner-test.cc +++ b/src/parquet/column/scanner-test.cc @@ -146,8 +146,7 @@ static int num_pages = 20; static int batch_size = 32; typedef ::testing::Types - TestTypes; + ByteArrayType> TestTypes; using TestBooleanFlatScanner = TestFlatScanner; using TestFLBAFlatScanner = TestFlatScanner; diff --git a/src/parquet/column/statistics-test.cc b/src/parquet/column/statistics-test.cc new file mode 100644 index 00000000..d1e1eebd --- /dev/null +++ b/src/parquet/column/statistics-test.cc @@ -0,0 +1,273 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include +#include +#include +#include +#include +#include + +#include "parquet/column/reader.h" +#include "parquet/column/statistics.h" +#include "parquet/column/test-specialization.h" +#include "parquet/column/test-util.h" +#include "parquet/column/writer.h" +#include "parquet/file/reader.h" +#include "parquet/file/writer.h" +#include "parquet/schema/descriptor.h" +#include "parquet/types.h" +#include "parquet/util/input.h" +#include "parquet/util/mem-allocator.h" +#include "parquet/util/output.h" + +namespace parquet { + +using schema::NodePtr; +using schema::PrimitiveNode; +using schema::GroupNode; + +namespace test { + +template +class TestRowGroupStatistics : public PrimitiveTypedTest { + public: + using T = typename TestType::c_type; + using TypedStats = TypedRowGroupStatistics; + + std::vector GetDeepCopy( + const std::vector&); // allocates new memory for FLBA/ByteArray + + T* GetValuesPointer(std::vector&); + void DeepFree(std::vector&); + + void TestMinMaxEncode() { + this->GenerateData(1000); + + TypedStats statistics1(this->schema_.Column(0)); + statistics1.Update(this->values_ptr_, this->values_.size(), 0); + std::string encoded_min = statistics1.EncodeMin(); + std::string encoded_max = statistics1.EncodeMax(); + + TypedStats statistics2( + this->schema_.Column(0), encoded_min, encoded_max, this->values_.size(), 0, 0); + + ASSERT_EQ(encoded_min, statistics2.EncodeMin()); + ASSERT_EQ(encoded_max, statistics2.EncodeMax()); + ASSERT_EQ(statistics1.min(), statistics2.min()); + ASSERT_EQ(statistics1.max(), statistics2.max()); + } + + void TestReset() { + this->GenerateData(1000); + + TypedStats statistics(this->schema_.Column(0)); + statistics.Update(this->values_ptr_, this->values_.size(), 0); + ASSERT_EQ(this->values_.size(), statistics.num_values()); + + statistics.Reset(); + ASSERT_EQ(0, statistics.null_count()); + ASSERT_EQ(0, statistics.num_values()); + ASSERT_EQ("", statistics.EncodeMin()); + ASSERT_EQ("", statistics.EncodeMax()); + } + + void TestMerge() { + int num_null[2]; + random_numbers(2, 42, 0, 100, num_null); + + TypedStats statistics1(this->schema_.Column(0)); + this->GenerateData(1000); + statistics1.Update( + this->values_ptr_, this->values_.size() - num_null[0], num_null[0]); + + TypedStats statistics2(this->schema_.Column(0)); + this->GenerateData(1000); + statistics2.Update( + this->values_ptr_, this->values_.size() - num_null[1], num_null[1]); + + TypedStats total(this->schema_.Column(0)); + total.Merge(statistics1); + total.Merge(statistics2); + + ASSERT_EQ(num_null[0] + num_null[1], total.null_count()); + ASSERT_EQ(this->values_.size() * 2 - num_null[0] - num_null[1], total.num_values()); + ASSERT_EQ(total.min(), std::min(statistics1.min(), statistics2.min())); + ASSERT_EQ(total.max(), std::max(statistics1.max(), statistics2.max())); + } + + void TestFullRoundtrip(int64_t num_values, int64_t null_count) { + this->GenerateData(num_values); + + // compute statistics for the whole batch + TypedStats expected_stats(this->schema_.Column(0)); + expected_stats.Update(this->values_ptr_, num_values - null_count, null_count); + + auto sink = std::make_shared(); + auto gnode = std::static_pointer_cast(this->node_); + std::shared_ptr writer_properties = + WriterProperties::Builder().enable_statistics("column")->build(); + auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties); + auto row_group_writer = file_writer->AppendRowGroup(num_values); + auto column_writer = + static_cast*>(row_group_writer->NextColumn()); + + // simulate the case when data comes from multiple buffers, + // in which case special care is necessary for FLBA/ByteArray types + for (int i = 0; i < 2; i++) { + int batch_num_values = i ? num_values - num_values / 2 : num_values / 2; + int batch_null_count = i ? null_count : 0; + DCHECK(null_count <= num_values); // avoid too much headache + std::vector definition_levels(batch_null_count, 0); + definition_levels.insert( + definition_levels.end(), batch_num_values - batch_null_count, 1); + auto beg = this->values_.begin() + i * num_values / 2; + auto end = beg + batch_num_values; + std::vector batch = GetDeepCopy(std::vector(beg, end)); + T* batch_values_ptr = GetValuesPointer(batch); + column_writer->WriteBatch( + batch_num_values, definition_levels.data(), nullptr, batch_values_ptr); + DeepFree(batch); + } + column_writer->Close(); + row_group_writer->Close(); + file_writer->Close(); + + auto buffer = sink->GetBuffer(); + std::unique_ptr source(new BufferReader(buffer)); + auto file_reader = ParquetFileReader::Open(std::move(source)); + auto rg_reader = file_reader->RowGroup(0); + auto column_chunk = rg_reader->metadata()->ColumnChunk(0); + std::shared_ptr stats = column_chunk->statistics(); + // check values after serialization + deserialization + ASSERT_EQ(null_count, stats->null_count()); + ASSERT_EQ(num_values - null_count, stats->num_values()); + ASSERT_EQ(expected_stats.EncodeMin(), stats->EncodeMin()); + ASSERT_EQ(expected_stats.EncodeMax(), stats->EncodeMax()); + } +}; + +template +typename TestType::c_type* TestRowGroupStatistics::GetValuesPointer( + std::vector& values) { + return values.data(); +} + +template <> +bool* TestRowGroupStatistics::GetValuesPointer(std::vector& values) { + static std::vector bool_buffer; + bool_buffer.clear(); + bool_buffer.resize(values.size()); + std::copy(values.begin(), values.end(), bool_buffer.begin()); + return reinterpret_cast(bool_buffer.data()); +} + +template +typename std::vector +TestRowGroupStatistics::GetDeepCopy( + const std::vector& values) { + return values; +} + +template <> +std::vector TestRowGroupStatistics::GetDeepCopy( + const std::vector& values) { + std::vector copy; + MemoryAllocator* allocator = default_allocator(); + for (const FLBA& flba : values) { + uint8_t* ptr = allocator->Malloc(FLBA_LENGTH); + memcpy(ptr, flba.ptr, FLBA_LENGTH); + copy.emplace_back(ptr); + } + return copy; +} + +template <> +std::vector TestRowGroupStatistics::GetDeepCopy( + const std::vector& values) { + std::vector copy; + MemoryAllocator* allocator = default_allocator(); + for (const ByteArray& ba : values) { + uint8_t* ptr = allocator->Malloc(ba.len); + memcpy(ptr, ba.ptr, ba.len); + copy.emplace_back(ba.len, ptr); + } + return copy; +} + +template +void TestRowGroupStatistics::DeepFree( + std::vector& values) {} + +template <> +void TestRowGroupStatistics::DeepFree(std::vector& values) { + MemoryAllocator* allocator = default_allocator(); + for (FLBA& flba : values) { + auto ptr = const_cast(flba.ptr); + memset(ptr, 0, FLBA_LENGTH); + allocator->Free(ptr, FLBA_LENGTH); + } +} + +template <> +void TestRowGroupStatistics::DeepFree(std::vector& values) { + MemoryAllocator* allocator = default_allocator(); + for (ByteArray& ba : values) { + auto ptr = const_cast(ba.ptr); + memset(ptr, 0, ba.len); + allocator->Free(ptr, ba.len); + } +} + +using TestTypes = ::testing::Types; + +TYPED_TEST_CASE(TestRowGroupStatistics, TestTypes); + +TYPED_TEST(TestRowGroupStatistics, MinMaxEncode) { + this->SetUpSchema(Repetition::REQUIRED); + this->TestMinMaxEncode(); +} + +TYPED_TEST(TestRowGroupStatistics, Reset) { + this->SetUpSchema(Repetition::OPTIONAL); + this->TestReset(); +} + +TYPED_TEST(TestRowGroupStatistics, FullRoundtrip) { + this->SetUpSchema(Repetition::OPTIONAL); + this->TestFullRoundtrip(100, 31); + this->TestFullRoundtrip(1000, 415); + this->TestFullRoundtrip(10000, 926); +} + +template +class TestNumericRowGroupStatistics : public TestRowGroupStatistics {}; + +using NumericTypes = ::testing::Types; + +TYPED_TEST_CASE(TestNumericRowGroupStatistics, NumericTypes); + +TYPED_TEST(TestNumericRowGroupStatistics, Merge) { + this->SetUpSchema(Repetition::OPTIONAL); + this->TestMerge(); +} + +} // namespace test +} // namespace parquet diff --git a/src/parquet/column/statistics.cc b/src/parquet/column/statistics.cc new file mode 100644 index 00000000..65d7df5c --- /dev/null +++ b/src/parquet/column/statistics.cc @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include "parquet/column/statistics.h" +#include "parquet/encodings/plain-encoding.h" +#include "parquet/util/buffer.h" +#include "parquet/util/comparison.h" +#include "parquet/util/output.h" +#include "parquet/exception.h" + +namespace parquet { + +template +TypedRowGroupStatistics::TypedRowGroupStatistics( + const ColumnDescriptor* schema, MemoryAllocator* allocator) + : allocator_(allocator), min_buffer_(0, allocator_), max_buffer_(0, allocator_) { + SetDescr(schema); + Reset(); +} + +template +TypedRowGroupStatistics::TypedRowGroupStatistics(const typename DType::c_type& min, + const typename DType::c_type& max, int64_t num_values, int64_t null_count, + int64_t distinct_count) + : allocator_(default_allocator()), + min_buffer_(0, allocator_), + max_buffer_(0, allocator_) { + IncrementNumValues(num_values); + IncrementNullCount(null_count); + IncrementDistinctCount(distinct_count); + + Copy(min, &min_, min_buffer_); + Copy(max, &max_, max_buffer_); + has_min_max_ = true; +} + +template +TypedRowGroupStatistics::TypedRowGroupStatistics(const ColumnDescriptor* schema, + const std::string& encoded_min, const std::string& encoded_max, int64_t num_values, + int64_t null_count, int64_t distinct_count, MemoryAllocator* allocator) + : allocator_(allocator), min_buffer_(0, allocator_), max_buffer_(0, allocator_) { + IncrementNumValues(num_values); + IncrementNullCount(null_count); + IncrementDistinctCount(distinct_count); + + SetDescr(schema); + + if (!encoded_min.empty()) { PlainDecode(encoded_min, &min_); } + if (!encoded_max.empty()) { PlainDecode(encoded_max, &max_); } + has_min_max_ = !encoded_min.empty() && !encoded_max.empty(); +} + +template +bool TypedRowGroupStatistics::HasMinMax() const { + return has_min_max_; +} + +template +void TypedRowGroupStatistics::Reset() { + ResetCounts(); + has_min_max_ = false; +} + +template +void TypedRowGroupStatistics::Copy(const T& src, T* dst, OwnedMutableBuffer&) { + *dst = src; +} + +template <> +void TypedRowGroupStatistics::Copy( + const FLBA& src, FLBA* dst, OwnedMutableBuffer& buffer) { + if (dst->ptr == src.ptr) return; + uint32_t len = descr_->type_length(); + buffer.Resize(len); + std::memcpy(&buffer[0], src.ptr, len); + *dst = FLBA(buffer.data()); +} + +template <> +void TypedRowGroupStatistics::Copy( + const ByteArray& src, ByteArray* dst, OwnedMutableBuffer& buffer) { + if (dst->ptr == src.ptr) return; + buffer.Resize(src.len); + std::memcpy(&buffer[0], src.ptr, src.len); + *dst = ByteArray(src.len, buffer.data()); +} + +template +void TypedRowGroupStatistics::Update( + const T* values, int64_t num_not_null, int64_t num_null) { + DCHECK(num_not_null >= 0); + DCHECK(num_null >= 0); + + IncrementNullCount(num_null); + IncrementNumValues(num_not_null); + // TODO: support distinct count? + if (num_not_null == 0) return; + + Compare compare(descr_); + auto batch_minmax = std::minmax_element(values, values + num_not_null, compare); + if (!has_min_max_) { + has_min_max_ = true; + Copy(*batch_minmax.first, &min_, min_buffer_); + Copy(*batch_minmax.second, &max_, max_buffer_); + } else { + Copy(std::min(min_, *batch_minmax.first, compare), &min_, min_buffer_); + Copy(std::max(max_, *batch_minmax.second, compare), &max_, max_buffer_); + } +} + +template +void TypedRowGroupStatistics::Merge(const TypedRowGroupStatistics& other) { + this->MergeCounts(other); + + if (!other.HasMinMax()) return; + + if (!has_min_max_) { + Copy(other.min_, &this->min_, min_buffer_); + Copy(other.max_, &this->max_, max_buffer_); + has_min_max_ = true; + return; + } + + Compare compare(descr_); + Copy(std::min(this->min_, other.min_, compare), &this->min_, min_buffer_); + Copy(std::max(this->max_, other.max_, compare), &this->max_, max_buffer_); +} + +template +std::string TypedRowGroupStatistics::EncodeMin() { + std::string s; + if (HasMinMax()) this->PlainEncode(min_, &s); + return s; +} + +template +std::string TypedRowGroupStatistics::EncodeMax() { + std::string s; + if (HasMinMax()) this->PlainEncode(max_, &s); + return s; +} + +template +EncodedStatistics TypedRowGroupStatistics::Encode() { + EncodedStatistics s; + if (HasMinMax()) { + s.set_min(this->EncodeMin()); + s.set_max(this->EncodeMax()); + } + s.set_null_count(this->null_count()); + return s; +} + +template +void TypedRowGroupStatistics::PlainEncode(const T& src, std::string* dst) { + PlainEncoder encoder(descr(), allocator_); + encoder.Put(&src, 1); + auto buffer = encoder.FlushValues(); + auto ptr = reinterpret_cast(buffer->data()); + dst->assign(ptr, buffer->size()); +} + +template +void TypedRowGroupStatistics::PlainDecode(const std::string& src, T* dst) { + PlainDecoder decoder(descr()); + decoder.SetData(1, reinterpret_cast(src.c_str()), src.size()); + decoder.Decode(dst, 1); +} + +template class TypedRowGroupStatistics; +template class TypedRowGroupStatistics; +template class TypedRowGroupStatistics; +template class TypedRowGroupStatistics; +template class TypedRowGroupStatistics; +template class TypedRowGroupStatistics; +template class TypedRowGroupStatistics; +template class TypedRowGroupStatistics; + +} // namespace parquet diff --git a/src/parquet/column/statistics.h b/src/parquet/column/statistics.h new file mode 100644 index 00000000..02e5abdc --- /dev/null +++ b/src/parquet/column/statistics.h @@ -0,0 +1,184 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PARQUET_COLUMN_STATISTICS_H +#define PARQUET_COLUMN_STATISTICS_H + +#include +#include +#include + +#include "parquet/types.h" +#include "parquet/schema/descriptor.h" +#include "parquet/util/buffer.h" +#include "parquet/util/mem-allocator.h" +#include "parquet/util/visibility.h" + +namespace parquet { + +class PARQUET_EXPORT EncodedStatistics { + std::shared_ptr max_, min_; + + public: + EncodedStatistics() + : max_(std::make_shared()), min_(std::make_shared()) {} + + const std::string& max() const { return *max_; } + const std::string& min() const { return *min_; } + + int64_t null_count = 0; + int64_t distinct_count = 0; + + bool has_min = false; + bool has_max = false; + bool has_null_count = false; + bool has_distinct_count = false; + + inline bool is_set() const { + return has_min || has_max || has_null_count || has_distinct_count; + } + + inline EncodedStatistics& set_max(const std::string& value) { + *max_ = value; + has_max = true; + return *this; + } + + inline EncodedStatistics& set_min(const std::string& value) { + *min_ = value; + has_min = true; + return *this; + } + + inline EncodedStatistics& set_null_count(int64_t value) { + null_count = value; + has_null_count = true; + return *this; + } + + inline EncodedStatistics& set_distinct_count(int64_t value) { + distinct_count = value; + has_distinct_count = true; + return *this; + } +}; + +template +class PARQUET_EXPORT TypedRowGroupStatistics; + +class PARQUET_EXPORT RowGroupStatistics + : public std::enable_shared_from_this { + public: + int64_t null_count() const { return statistics_.null_count; } + int64_t distinct_count() const { return statistics_.distinct_count; } + int64_t num_values() const { return num_values_; } + + virtual bool HasMinMax() const = 0; + virtual void Reset() = 0; + + // Plain-encoded minimum value + virtual std::string EncodeMin() = 0; + + // Plain-encoded maximum value + virtual std::string EncodeMax() = 0; + + virtual EncodedStatistics Encode() = 0; + + virtual ~RowGroupStatistics() {} + + Type::type physical_type() const { return descr_->physical_type(); } + + protected: + const ColumnDescriptor* descr() const { return descr_; } + void SetDescr(const ColumnDescriptor* schema) { descr_ = schema; } + + void IncrementNullCount(int64_t n) { statistics_.null_count += n; } + + void IncrementNumValues(int64_t n) { num_values_ += n; } + + void IncrementDistinctCount(int64_t n) { statistics_.distinct_count += n; } + + void MergeCounts(const RowGroupStatistics& other) { + this->statistics_.null_count += other.statistics_.null_count; + this->statistics_.distinct_count += other.statistics_.distinct_count; + this->num_values_ += other.num_values_; + } + + void ResetCounts() { + this->statistics_.null_count = 0; + this->statistics_.distinct_count = 0; + this->num_values_ = 0; + } + + const ColumnDescriptor* descr_ = nullptr; + int64_t num_values_ = 0; + EncodedStatistics statistics_; +}; + +template +class PARQUET_EXPORT TypedRowGroupStatistics : public RowGroupStatistics { + public: + using T = typename DType::c_type; + + TypedRowGroupStatistics( + const ColumnDescriptor* schema, MemoryAllocator* allocator = default_allocator()); + + TypedRowGroupStatistics(const T& min, const T& max, int64_t num_values, + int64_t null_count, int64_t distinct_count); + + TypedRowGroupStatistics(const ColumnDescriptor* schema, const std::string& encoded_min, + const std::string& encoded_max, int64_t num_values, int64_t null_count, + int64_t distinct_count, MemoryAllocator* allocator = default_allocator()); + + bool HasMinMax() const override; + void Reset() override; + void Merge(const TypedRowGroupStatistics& other); + + void Update(const T* values, int64_t num_not_null, int64_t num_null); + + const T& min() const { return min_; } + const T& max() const { return max_; } + + std::string EncodeMin() override; + std::string EncodeMax() override; + EncodedStatistics Encode() override; + + private: + bool has_min_max_ = false; + T min_; + T max_; + MemoryAllocator* allocator_; + + void PlainEncode(const T& src, std::string* dst); + void PlainDecode(const std::string& src, T* dst); + void Copy(const T& src, T* dst, OwnedMutableBuffer& buffer); + + OwnedMutableBuffer min_buffer_, max_buffer_; +}; + +using BoolStatistics = TypedRowGroupStatistics; +using Int32Statistics = TypedRowGroupStatistics; +using Int64Statistics = TypedRowGroupStatistics; +using Int96Statistics = TypedRowGroupStatistics; +using FloatStatistics = TypedRowGroupStatistics; +using DoubleStatistics = TypedRowGroupStatistics; +using ByteArrayStatistics = TypedRowGroupStatistics; +using FLBAStatistics = TypedRowGroupStatistics; + +} // namespace parquet + +#endif // PARQUET_COLUMN_STATISTICS_H diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc index bfbd0c5d..5c10ab99 100644 --- a/src/parquet/column/writer.cc +++ b/src/parquet/column/writer.cc @@ -18,6 +18,7 @@ #include "parquet/column/writer.h" #include "parquet/column/properties.h" +#include "parquet/column/statistics.h" #include "parquet/encodings/dictionary-encoding.h" #include "parquet/encodings/plain-encoding.h" @@ -32,10 +33,11 @@ std::shared_ptr default_writer_properties() { return default_writer_properties; } -ColumnWriter::ColumnWriter(const ColumnDescriptor* descr, +ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata, std::unique_ptr pager, int64_t expected_rows, bool has_dictionary, Encoding::type encoding, const WriterProperties* properties) - : descr_(descr), + : metadata_(metadata), + descr_(metadata->descr()), pager_(std::move(pager)), expected_rows_(expected_rows), has_dictionary_(has_dictionary), @@ -117,9 +119,11 @@ void ColumnWriter::AddDataPage() { uncompressed_ptr += definition_levels->size(); memcpy(uncompressed_ptr, values->data(), values->size()); + EncodedStatistics page_stats = GetPageStatistics(); + ResetPageStatistics(); std::shared_ptr compressed_data = pager_->Compress(uncompressed_data); CompressedDataPage page(compressed_data, num_buffered_values_, encoding_, Encoding::RLE, - Encoding::RLE, uncompressed_size); + Encoding::RLE, uncompressed_size, page_stats); // Write the page to OutputStream eagerly if there is no dictionary or // if dictionary encoding has fallen back to PLAIN @@ -147,6 +151,8 @@ int64_t ColumnWriter::Close() { FlushBufferedDataPages(); + EncodedStatistics chunk_statistics = GetChunkStatistics(); + if (chunk_statistics.is_set()) metadata_->SetStatistics(chunk_statistics); pager_->Close(has_dictionary_, fallback_); } @@ -172,25 +178,30 @@ void ColumnWriter::FlushBufferedDataPages() { // TypedColumnWriter template -TypedColumnWriter::TypedColumnWriter(const ColumnDescriptor* descr, +TypedColumnWriter::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata, std::unique_ptr pager, int64_t expected_rows, Encoding::type encoding, const WriterProperties* properties) - : ColumnWriter(descr, std::move(pager), expected_rows, + : ColumnWriter(metadata, std::move(pager), expected_rows, (encoding == Encoding::PLAIN_DICTIONARY || encoding == Encoding::RLE_DICTIONARY), encoding, properties) { switch (encoding) { case Encoding::PLAIN: - current_encoder_.reset(new PlainEncoder(descr, properties->allocator())); + current_encoder_.reset(new PlainEncoder(descr_, properties->allocator())); break; case Encoding::PLAIN_DICTIONARY: case Encoding::RLE_DICTIONARY: current_encoder_.reset( - new DictEncoder(descr, &pool_, properties->allocator())); + new DictEncoder(descr_, &pool_, properties->allocator())); break; default: ParquetException::NYI("Selected encoding is not supported"); } + + if (properties->statistics_enabled(descr_->path())) { + page_statistics_ = std::unique_ptr(new TypedStats(descr_, allocator_)); + chunk_statistics_ = std::unique_ptr(new TypedStats(descr_, allocator_)); + } } // Only one Dictionary Page is written. @@ -221,12 +232,35 @@ void TypedColumnWriter::WriteDictionaryPage() { total_bytes_written_ += pager_->WriteDictionaryPage(page); } +template +EncodedStatistics TypedColumnWriter::GetPageStatistics() { + EncodedStatistics result; + if (page_statistics_) result = page_statistics_->Encode(); + return result; +} + +template +EncodedStatistics TypedColumnWriter::GetChunkStatistics() { + EncodedStatistics result; + if (chunk_statistics_) result = chunk_statistics_->Encode(); + return result; +} + +template +void TypedColumnWriter::ResetPageStatistics() { + if (chunk_statistics_ != nullptr) { + chunk_statistics_->Merge(*page_statistics_); + page_statistics_->Reset(); + } +} + // ---------------------------------------------------------------------- // Dynamic column writer constructor -std::shared_ptr ColumnWriter::Make(const ColumnDescriptor* descr, +std::shared_ptr ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata, std::unique_ptr pager, int64_t expected_rows, const WriterProperties* properties) { + const ColumnDescriptor* descr = metadata->descr(); Encoding::type encoding = properties->encoding(descr->path()); if (properties->dictionary_enabled(descr->path()) && descr->physical_type() != Type::BOOLEAN) { @@ -235,28 +269,28 @@ std::shared_ptr ColumnWriter::Make(const ColumnDescriptor* descr, switch (descr->physical_type()) { case Type::BOOLEAN: return std::make_shared( - descr, std::move(pager), expected_rows, encoding, properties); + metadata, std::move(pager), expected_rows, encoding, properties); case Type::INT32: return std::make_shared( - descr, std::move(pager), expected_rows, encoding, properties); + metadata, std::move(pager), expected_rows, encoding, properties); case Type::INT64: return std::make_shared( - descr, std::move(pager), expected_rows, encoding, properties); + metadata, std::move(pager), expected_rows, encoding, properties); case Type::INT96: return std::make_shared( - descr, std::move(pager), expected_rows, encoding, properties); + metadata, std::move(pager), expected_rows, encoding, properties); case Type::FLOAT: return std::make_shared( - descr, std::move(pager), expected_rows, encoding, properties); + metadata, std::move(pager), expected_rows, encoding, properties); case Type::DOUBLE: return std::make_shared( - descr, std::move(pager), expected_rows, encoding, properties); + metadata, std::move(pager), expected_rows, encoding, properties); case Type::BYTE_ARRAY: return std::make_shared( - descr, std::move(pager), expected_rows, encoding, properties); + metadata, std::move(pager), expected_rows, encoding, properties); case Type::FIXED_LEN_BYTE_ARRAY: return std::make_shared( - descr, std::move(pager), expected_rows, encoding, properties); + metadata, std::move(pager), expected_rows, encoding, properties); default: ParquetException::NYI("type reader not implemented"); } diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h index 3a54cbb6..ab01818f 100644 --- a/src/parquet/column/writer.h +++ b/src/parquet/column/writer.h @@ -23,6 +23,8 @@ #include "parquet/column/levels.h" #include "parquet/column/page.h" #include "parquet/column/properties.h" +#include "parquet/column/statistics.h" +#include "parquet/file/metadata.h" #include "parquet/encodings/encoder.h" #include "parquet/schema/descriptor.h" #include "parquet/types.h" @@ -36,11 +38,11 @@ namespace parquet { static constexpr int WRITE_BATCH_SIZE = 1000; class PARQUET_EXPORT ColumnWriter { public: - ColumnWriter(const ColumnDescriptor*, std::unique_ptr, + ColumnWriter(ColumnChunkMetaDataBuilder*, std::unique_ptr, int64_t expected_rows, bool has_dictionary, Encoding::type encoding, const WriterProperties* properties); - static std::shared_ptr Make(const ColumnDescriptor*, + static std::shared_ptr Make(ColumnChunkMetaDataBuilder*, std::unique_ptr, int64_t expected_rows, const WriterProperties* properties); @@ -67,6 +69,15 @@ class PARQUET_EXPORT ColumnWriter { virtual void CheckDictionarySizeLimit() = 0; + // Plain-encoded statistics of the current page + virtual EncodedStatistics GetPageStatistics() = 0; + + // Plain-encoded statistics of the whole chunk + virtual EncodedStatistics GetChunkStatistics() = 0; + + // Merges page statistics into chunk statistics, then resets the values + virtual void ResetPageStatistics() = 0; + // Adds Data Pages to an in memory buffer in dictionary encoding mode // Serializes the Data Pages in other encoding modes void AddDataPage(); @@ -86,6 +97,7 @@ class PARQUET_EXPORT ColumnWriter { // Serialize the buffered Data Pages void FlushBufferedDataPages(); + ColumnChunkMetaDataBuilder* metadata_; const ColumnDescriptor* descr_; std::unique_ptr pager_; @@ -140,8 +152,9 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter { public: typedef typename DType::c_type T; - TypedColumnWriter(const ColumnDescriptor* schema, std::unique_ptr pager, - int64_t expected_rows, Encoding::type encoding, const WriterProperties* properties); + TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata, + std::unique_ptr pager, int64_t expected_rows, Encoding::type encoding, + const WriterProperties* properties); // Write a batch of repetition levels, definition levels, and values to the // column. @@ -154,6 +167,9 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter { } void WriteDictionaryPage() override; void CheckDictionarySizeLimit() override; + EncodedStatistics GetPageStatistics() override; + EncodedStatistics GetChunkStatistics() override; + void ResetPageStatistics() override; private: int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels, @@ -164,6 +180,10 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter { // Write values to a temporary buffer before they are encoded into pages void WriteValues(int64_t num_values, const T* values); std::unique_ptr current_encoder_; + + typedef TypedRowGroupStatistics TypedStats; + std::unique_ptr page_statistics_; + std::unique_ptr chunk_statistics_; }; template @@ -202,6 +222,10 @@ inline int64_t TypedColumnWriter::WriteMiniBatch(int64_t num_values, WriteValues(values_to_write, values); + if (page_statistics_ != nullptr) { + page_statistics_->Update(values, values_to_write, num_values - values_to_write); + } + num_buffered_values_ += num_values; num_buffered_encoded_values_ += values_to_write; diff --git a/src/parquet/encodings/encoding-test.cc b/src/parquet/encodings/encoding-test.cc index daa25cbc..6af7c9db 100644 --- a/src/parquet/encodings/encoding-test.cc +++ b/src/parquet/encodings/encoding-test.cc @@ -238,8 +238,7 @@ TYPED_TEST(TestPlainEncoding, BasicRoundTrip) { // Dictionary encoding tests typedef ::testing::Types - DictEncodedTypes; + ByteArrayType, FLBAType> DictEncodedTypes; template class TestDictionaryEncoding : public TestEncodingBase { diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file/file-deserialize-test.cc index adc90ede..8f832dfb 100644 --- a/src/parquet/file/file-deserialize-test.cc +++ b/src/parquet/file/file-deserialize-test.cc @@ -105,12 +105,10 @@ void CheckDataPageHeader(const format::DataPageHeader expected, const Page* page ASSERT_EQ(expected.repetition_level_encoding, data_page->repetition_level_encoding()); if (expected.statistics.__isset.max) { - ASSERT_EQ(0, memcmp(expected.statistics.max.c_str(), data_page->max(), - expected.statistics.max.length())); + ASSERT_EQ(expected.statistics.max, data_page->statistics().max()); } if (expected.statistics.__isset.min) { - ASSERT_EQ(0, memcmp(expected.statistics.min.c_str(), data_page->min(), - expected.statistics.min.length())); + ASSERT_EQ(expected.statistics.min, data_page->statistics().min()); } } diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/file/file-metadata-test.cc index 7bb2ae5c..d3d0edc4 100644 --- a/src/parquet/file/file-metadata-test.cc +++ b/src/parquet/file/file-metadata-test.cc @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include "parquet/column/statistics.h" #include "parquet/file/metadata.h" #include "parquet/schema/descriptor.h" #include "parquet/schema/types.h" @@ -38,20 +39,18 @@ TEST(Metadata, TestBuildAccess) { schema.Init(root); int64_t nrows = 1000; - ColumnStatistics stats_int; - stats_int.null_count = 0; - stats_int.distinct_count = nrows; - std::string int_min = std::string("100"); - std::string int_max = std::string("200"); - stats_int.min = &int_min; - stats_int.max = &int_max; - ColumnStatistics stats_float; - stats_float.null_count = 0; - stats_float.distinct_count = nrows; - std::string float_min = std::string("100.100"); - std::string float_max = std::string("200.200"); - stats_float.min = &float_min; - stats_float.max = &float_max; + int32_t int_min = 100, int_max = 200; + EncodedStatistics stats_int; + stats_int.set_null_count(0) + .set_distinct_count(nrows) + .set_min(std::string(reinterpret_cast(&int_min), 4)) + .set_max(std::string(reinterpret_cast(&int_max), 4)); + EncodedStatistics stats_float; + float float_min = 100.100, float_max = 200.200; + stats_float.set_null_count(0) + .set_distinct_count(nrows) + .set_min(std::string(reinterpret_cast(&float_min), 4)) + .set_max(std::string(reinterpret_cast(&float_max), 4)); auto f_builder = FileMetaDataBuilder::Make(&schema, props); auto rg1_builder = f_builder->AppendRowGroup(nrows / 2); @@ -99,14 +98,14 @@ TEST(Metadata, TestBuildAccess) { auto rg1_column2 = rg1_accessor->ColumnChunk(1); ASSERT_EQ(true, rg1_column1->is_stats_set()); ASSERT_EQ(true, rg1_column2->is_stats_set()); - ASSERT_EQ("100.100", *rg1_column2->statistics().min); - ASSERT_EQ("200.200", *rg1_column2->statistics().max); - ASSERT_EQ("100", *rg1_column1->statistics().min); - ASSERT_EQ("200", *rg1_column1->statistics().max); - ASSERT_EQ(0, rg1_column1->statistics().null_count); - ASSERT_EQ(0, rg1_column2->statistics().null_count); - ASSERT_EQ(nrows, rg1_column1->statistics().distinct_count); - ASSERT_EQ(nrows, rg1_column2->statistics().distinct_count); + ASSERT_EQ(stats_float.min(), rg1_column2->statistics()->EncodeMin()); + ASSERT_EQ(stats_float.max(), rg1_column2->statistics()->EncodeMax()); + ASSERT_EQ(stats_int.min(), rg1_column1->statistics()->EncodeMin()); + ASSERT_EQ(stats_int.max(), rg1_column1->statistics()->EncodeMax()); + ASSERT_EQ(0, rg1_column1->statistics()->null_count()); + ASSERT_EQ(0, rg1_column2->statistics()->null_count()); + ASSERT_EQ(nrows, rg1_column1->statistics()->distinct_count()); + ASSERT_EQ(nrows, rg1_column2->statistics()->distinct_count()); ASSERT_EQ(DEFAULT_COMPRESSION_TYPE, rg1_column1->compression()); ASSERT_EQ(DEFAULT_COMPRESSION_TYPE, rg1_column2->compression()); ASSERT_EQ(nrows / 2, rg1_column1->num_values()); @@ -131,14 +130,14 @@ TEST(Metadata, TestBuildAccess) { auto rg2_column2 = rg2_accessor->ColumnChunk(1); ASSERT_EQ(true, rg2_column1->is_stats_set()); ASSERT_EQ(true, rg2_column2->is_stats_set()); - ASSERT_EQ("100.100", *rg2_column2->statistics().min); - ASSERT_EQ("200.200", *rg2_column2->statistics().max); - ASSERT_EQ("100", *rg2_column1->statistics().min); - ASSERT_EQ("200", *rg2_column1->statistics().max); - ASSERT_EQ(0, rg2_column1->statistics().null_count); - ASSERT_EQ(0, rg2_column2->statistics().null_count); - ASSERT_EQ(nrows, rg2_column1->statistics().distinct_count); - ASSERT_EQ(nrows, rg2_column2->statistics().distinct_count); + ASSERT_EQ(stats_float.min(), rg2_column2->statistics()->EncodeMin()); + ASSERT_EQ(stats_float.max(), rg2_column2->statistics()->EncodeMax()); + ASSERT_EQ(stats_int.min(), rg1_column1->statistics()->EncodeMin()); + ASSERT_EQ(stats_int.max(), rg1_column1->statistics()->EncodeMax()); + ASSERT_EQ(0, rg2_column1->statistics()->null_count()); + ASSERT_EQ(0, rg2_column2->statistics()->null_count()); + ASSERT_EQ(nrows, rg2_column1->statistics()->distinct_count()); + ASSERT_EQ(nrows, rg2_column2->statistics()->distinct_count()); ASSERT_EQ(nrows / 2, rg2_column1->num_values()); ASSERT_EQ(nrows / 2, rg2_column2->num_values()); ASSERT_EQ(DEFAULT_COMPRESSION_TYPE, rg2_column1->compression()); diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file/file-serialize-test.cc index 42a73c9d..5980e555 100644 --- a/src/parquet/file/file-serialize-test.cc +++ b/src/parquet/file/file-serialize-test.cc @@ -106,8 +106,7 @@ class TestSerialize : public PrimitiveTypedTest { }; typedef ::testing::Types - TestTypes; + BooleanType, ByteArrayType, FLBAType> TestTypes; TYPED_TEST_CASE(TestSerialize, TestTypes); diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc index 36098c58..d50c9155 100644 --- a/src/parquet/file/metadata.cc +++ b/src/parquet/file/metadata.cc @@ -17,27 +17,56 @@ #include +#include "parquet/exception.h" #include "parquet/file/metadata.h" #include "parquet/schema/converter.h" #include "parquet/thrift/util.h" namespace parquet { +template +static std::shared_ptr MakeTypedColumnStats( + const format::ColumnMetaData& metadata, const ColumnDescriptor* descr) { + return std::make_shared>(descr, metadata.statistics.min, + metadata.statistics.max, metadata.num_values - metadata.statistics.null_count, + metadata.statistics.null_count, metadata.statistics.distinct_count); +} + +std::shared_ptr MakeColumnStats( + const format::ColumnMetaData& meta_data, const ColumnDescriptor* descr) { + switch (meta_data.type) { + case Type::BOOLEAN: + return MakeTypedColumnStats(meta_data, descr); + case Type::INT32: + return MakeTypedColumnStats(meta_data, descr); + case Type::INT64: + return MakeTypedColumnStats(meta_data, descr); + case Type::INT96: + return MakeTypedColumnStats(meta_data, descr); + case Type::DOUBLE: + return MakeTypedColumnStats(meta_data, descr); + case Type::FLOAT: + return MakeTypedColumnStats(meta_data, descr); + case Type::BYTE_ARRAY: + return MakeTypedColumnStats(meta_data, descr); + case Type::FIXED_LEN_BYTE_ARRAY: + return MakeTypedColumnStats(meta_data, descr); + } + throw ParquetException("Can't decode page statistics for selected column type"); +} + // MetaData Accessor // ColumnChunk metadata class ColumnChunkMetaData::ColumnChunkMetaDataImpl { public: - explicit ColumnChunkMetaDataImpl(const format::ColumnChunk* column) : column_(column) { + explicit ColumnChunkMetaDataImpl( + const format::ColumnChunk* column, const ColumnDescriptor* descr) + : column_(column), descr_(descr) { const format::ColumnMetaData& meta_data = column->meta_data; for (auto encoding : meta_data.encodings) { encodings_.push_back(FromThrift(encoding)); } - if (meta_data.__isset.statistics) { - stats_.null_count = meta_data.statistics.null_count; - stats_.distinct_count = meta_data.statistics.distinct_count; - stats_.max = &meta_data.statistics.max; - stats_.min = &meta_data.statistics.min; - } + if (meta_data.__isset.statistics) { stats_ = MakeColumnStats(meta_data, descr_); } } ~ColumnChunkMetaDataImpl() {} @@ -56,7 +85,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { inline bool is_stats_set() const { return column_->meta_data.__isset.statistics; } - inline const ColumnStatistics& statistics() const { return stats_; } + inline std::shared_ptr statistics() const { return stats_; } inline Compression::type compression() const { return FromThrift(column_->meta_data.codec); @@ -87,18 +116,21 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { } private: - ColumnStatistics stats_; + std::shared_ptr stats_; std::vector encodings_; const format::ColumnChunk* column_; + const ColumnDescriptor* descr_; }; -std::unique_ptr ColumnChunkMetaData::Make(const uint8_t* metadata) { - return std::unique_ptr(new ColumnChunkMetaData(metadata)); +std::unique_ptr ColumnChunkMetaData::Make( + const uint8_t* metadata, const ColumnDescriptor* descr) { + return std::unique_ptr(new ColumnChunkMetaData(metadata, descr)); } -ColumnChunkMetaData::ColumnChunkMetaData(const uint8_t* metadata) +ColumnChunkMetaData::ColumnChunkMetaData( + const uint8_t* metadata, const ColumnDescriptor* descr) : impl_{std::unique_ptr(new ColumnChunkMetaDataImpl( - reinterpret_cast(metadata)))} {} + reinterpret_cast(metadata), descr))} {} ColumnChunkMetaData::~ColumnChunkMetaData() {} // column chunk @@ -123,7 +155,7 @@ std::shared_ptr ColumnChunkMetaData::path_in_schema() const return impl_->path_in_schema(); } -const ColumnStatistics& ColumnChunkMetaData::statistics() const { +std::shared_ptr ColumnChunkMetaData::statistics() const { return impl_->statistics(); } @@ -187,7 +219,7 @@ class RowGroupMetaData::RowGroupMetaDataImpl { throw ParquetException(ss.str()); } return ColumnChunkMetaData::Make( - reinterpret_cast(&row_group_->columns[i])); + reinterpret_cast(&row_group_->columns[i]), schema_->Column(i)); } private: @@ -348,15 +380,18 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { void set_file_path(const std::string& val) { column_chunk_->__set_file_path(val); } // column metadata - void SetStatistics(const ColumnStatistics& val) { + void SetStatistics(const EncodedStatistics& val) { format::Statistics stats; stats.null_count = val.null_count; stats.distinct_count = val.distinct_count; - stats.max = *val.max; - stats.min = *val.min; - - column_chunk_->meta_data.statistics = stats; - column_chunk_->meta_data.__isset.statistics = true; + stats.max = val.max(); + stats.min = val.min(); + stats.__isset.min = val.has_min; + stats.__isset.max = val.has_max; + stats.__isset.null_count = val.has_null_count; + stats.__isset.distinct_count = val.has_distinct_count; + + column_chunk_->meta_data.__set_statistics(stats); } void Finish(int64_t num_values, int64_t dictionary_page_offset, @@ -430,7 +465,7 @@ const ColumnDescriptor* ColumnChunkMetaDataBuilder::descr() const { return impl_->descr(); } -void ColumnChunkMetaDataBuilder::SetStatistics(const ColumnStatistics& result) { +void ColumnChunkMetaDataBuilder::SetStatistics(const EncodedStatistics& result) { impl_->SetStatistics(result); } diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h index 5b8115be..94e6d661 100644 --- a/src/parquet/file/metadata.h +++ b/src/parquet/file/metadata.h @@ -23,6 +23,7 @@ #include #include "parquet/column/properties.h" +#include "parquet/column/statistics.h" #include "parquet/compression/codec.h" #include "parquet/schema/descriptor.h" #include "parquet/types.h" @@ -31,18 +32,11 @@ namespace parquet { -// ColumnStatistics does not own the min/max values -struct ColumnStatistics { - int64_t null_count; - int64_t distinct_count; - const std::string* min; - const std::string* max; -}; - class PARQUET_EXPORT ColumnChunkMetaData { public: // API convenience to get a MetaData accessor - static std::unique_ptr Make(const uint8_t* metadata); + static std::unique_ptr Make( + const uint8_t* metadata, const ColumnDescriptor* descr); ~ColumnChunkMetaData(); @@ -55,7 +49,7 @@ class PARQUET_EXPORT ColumnChunkMetaData { int64_t num_values() const; std::shared_ptr path_in_schema() const; bool is_stats_set() const; - const ColumnStatistics& statistics() const; + std::shared_ptr statistics() const; Compression::type compression() const; const std::vector& encodings() const; int64_t has_dictionary_page() const; @@ -66,7 +60,7 @@ class PARQUET_EXPORT ColumnChunkMetaData { int64_t total_uncompressed_size() const; private: - explicit ColumnChunkMetaData(const uint8_t* metadata); + explicit ColumnChunkMetaData(const uint8_t* metadata, const ColumnDescriptor* descr); // PIMPL Idiom class ColumnChunkMetaDataImpl; std::unique_ptr impl_; @@ -143,8 +137,7 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder { // Used when a dataset is spread across multiple files void set_file_path(const std::string& path); // column metadata - // ownership of min/max is with ColumnChunkMetadata - void SetStatistics(const ColumnStatistics& stats); + void SetStatistics(const EncodedStatistics& stats); // get the column descriptor const ColumnDescriptor* descr() const; // commit the metadata diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc index 8477052b..fa193904 100644 --- a/src/parquet/file/reader-internal.cc +++ b/src/parquet/file/reader-internal.cc @@ -115,15 +115,23 @@ std::shared_ptr SerializedPageReader::NextPage() { } else if (current_page_header_.type == format::PageType::DATA_PAGE) { const format::DataPageHeader& header = current_page_header_.data_page_header; + EncodedStatistics page_statistics; + if (header.__isset.statistics) { + const format::Statistics& stats = header.statistics; + if (stats.__isset.max) { page_statistics.set_max(stats.max); } + if (stats.__isset.min) { page_statistics.set_min(stats.min); } + if (stats.__isset.null_count) { + page_statistics.set_null_count(stats.null_count); + } + if (stats.__isset.distinct_count) { + page_statistics.set_distinct_count(stats.distinct_count); + } + } + auto page = std::make_shared(page_buffer, header.num_values, FromThrift(header.encoding), FromThrift(header.definition_level_encoding), - FromThrift(header.repetition_level_encoding)); + FromThrift(header.repetition_level_encoding), page_statistics); - if (header.__isset.statistics) { - const format::Statistics stats = header.statistics; - if (stats.__isset.max) { page->max_ = stats.max; } - if (stats.__isset.min) { page->min_ = stats.min; } - } return page; } else if (current_page_header_.type == format::PageType::DATA_PAGE_V2) { const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2; @@ -167,6 +175,14 @@ std::unique_ptr SerializedRowGroup::GetColumnPageReader(int i) { std::move(stream), col->compression(), properties_.allocator())); } +template +static std::shared_ptr MakeColumnStats( + const format::ColumnMetaData& metadata, const ColumnDescriptor* descr) { + return std::make_shared>(descr, metadata.statistics.min, + metadata.statistics.max, metadata.num_values, metadata.statistics.null_count, + metadata.statistics.distinct_count); +} + // ---------------------------------------------------------------------- // SerializedFile: Parquet on-disk layout @@ -198,7 +214,6 @@ SerializedFile::~SerializedFile() { std::shared_ptr SerializedFile::GetRowGroup(int i) { std::unique_ptr contents( new SerializedRowGroup(source_.get(), file_metadata_->RowGroup(i), properties_)); - return std::make_shared(std::move(contents)); } diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc index 3672d94e..9c33f8df 100644 --- a/src/parquet/file/reader.cc +++ b/src/parquet/file/reader.cc @@ -160,16 +160,16 @@ void ParquetFileReader::DebugPrint( // Print column metadata for (auto i : selected_columns) { auto column_chunk = group_metadata->ColumnChunk(i); - const ColumnStatistics stats = column_chunk->statistics(); + std::shared_ptr stats = column_chunk->statistics(); const ColumnDescriptor* descr = file_metadata->schema()->Column(i); stream << "Column " << i << std::endl << ", values: " << column_chunk->num_values(); if (column_chunk->is_stats_set()) { - stream << ", null values: " << stats.null_count - << ", distinct values: " << stats.distinct_count << std::endl - << " max: " << FormatStatValue(descr->physical_type(), stats.max->c_str()) - << ", min: " - << FormatStatValue(descr->physical_type(), stats.min->c_str()); + std::string min = stats->EncodeMin(), max = stats->EncodeMax(); + stream << ", null values: " << stats->null_count() + << ", distinct values: " << stats->distinct_count() << std::endl + << " max: " << FormatStatValue(descr->physical_type(), max.c_str()) + << ", min: " << FormatStatValue(descr->physical_type(), min.c_str()); } else { stream << " Statistics Not Set"; } diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h index bc92af3b..89dbbc8c 100644 --- a/src/parquet/file/reader.h +++ b/src/parquet/file/reader.h @@ -27,6 +27,7 @@ #include "parquet/column/page.h" #include "parquet/column/properties.h" +#include "parquet/column/statistics.h" #include "parquet/file/metadata.h" #include "parquet/schema/descriptor.h" #include "parquet/util/visibility.h" diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc index 554e7796..5a7c70ee 100644 --- a/src/parquet/file/writer-internal.cc +++ b/src/parquet/file/writer-internal.cc @@ -46,7 +46,20 @@ SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type compressor_ = Codec::Create(codec); } +static format::Statistics ToThrift(const EncodedStatistics& row_group_statistics) { + format::Statistics statistics; + if (row_group_statistics.has_min) statistics.__set_min(row_group_statistics.min()); + if (row_group_statistics.has_max) statistics.__set_max(row_group_statistics.max()); + if (row_group_statistics.has_null_count) + statistics.__set_null_count(row_group_statistics.null_count); + if (row_group_statistics.has_distinct_count) + statistics.__set_distinct_count(row_group_statistics.distinct_count); + return statistics; +} + void SerializedPageWriter::Close(bool has_dictionary, bool fallback) { + // index_page_offset = 0 since they are not supported + // TODO: Remove default fallback = 'false' when implemented metadata_->Finish(num_values_, dictionary_page_offset_, 0, data_page_offset_, total_compressed_size_, total_uncompressed_size_, has_dictionary, fallback); } @@ -77,7 +90,7 @@ int64_t SerializedPageWriter::WriteDataPage(const CompressedDataPage& page) { ToThrift(page.definition_level_encoding())); data_page_header.__set_repetition_level_encoding( ToThrift(page.repetition_level_encoding())); - // TODO(PARQUET-593) statistics + data_page_header.__set_statistics(ToThrift(page.statistics())); format::PageHeader page_header; page_header.__set_type(format::PageType::DATA_PAGE); @@ -149,7 +162,7 @@ ColumnWriter* RowGroupSerializer::NextColumn() { new SerializedPageWriter(sink_, properties_->compression(column_descr->path()), col_meta, properties_->allocator())); current_column_writer_ = - ColumnWriter::Make(col_meta->descr(), std::move(pager), num_rows_, properties_); + ColumnWriter::Make(col_meta, std::move(pager), num_rows_, properties_); return current_column_writer_.get(); } diff --git a/src/parquet/file/writer.h b/src/parquet/file/writer.h index 422f008b..e82f0166 100644 --- a/src/parquet/file/writer.h +++ b/src/parquet/file/writer.h @@ -42,8 +42,6 @@ class PARQUET_EXPORT RowGroupWriter { virtual int num_columns() const = 0; virtual int64_t num_rows() const = 0; - // TODO: PARQUET-579 - // virtual void WriteRowGroupStatitics(); virtual ColumnWriter* NextColumn() = 0; virtual void Close() = 0; }; @@ -68,9 +66,6 @@ class PARQUET_EXPORT RowGroupWriter { */ int64_t num_rows() const; - // TODO: PARQUET-579 - // virtual void WriteRowGroupStatitics(); - private: // Holds a pointer to an instance of Contents implementation std::unique_ptr contents_; diff --git a/src/parquet/thrift/util.h b/src/parquet/thrift/util.h index a340c6c0..f629689f 100644 --- a/src/parquet/thrift/util.h +++ b/src/parquet/thrift/util.h @@ -82,8 +82,7 @@ inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deseriali boost::shared_ptr tmem_transport( new apache::thrift::transport::TMemoryBuffer(const_cast(buf), *len)); apache::thrift::protocol::TCompactProtocolFactoryT< - apache::thrift::transport::TMemoryBuffer> - tproto_factory; + apache::thrift::transport::TMemoryBuffer> tproto_factory; boost::shared_ptr tproto = tproto_factory.getProtocol(tmem_transport); try { @@ -105,8 +104,7 @@ inline void SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) { boost::shared_ptr mem_buffer( new apache::thrift::transport::TMemoryBuffer(len)); apache::thrift::protocol::TCompactProtocolFactoryT< - apache::thrift::transport::TMemoryBuffer> - tproto_factory; + apache::thrift::transport::TMemoryBuffer> tproto_factory; boost::shared_ptr tproto = tproto_factory.getProtocol(mem_buffer); try { diff --git a/src/parquet/types.h b/src/parquet/types.h index a4285bec..520326b0 100644 --- a/src/parquet/types.h +++ b/src/parquet/types.h @@ -117,7 +117,7 @@ struct PageType { // ---------------------------------------------------------------------- struct ByteArray { - ByteArray() {} + ByteArray() : len(0), ptr(nullptr) {} ByteArray(uint32_t len, const uint8_t* ptr) : len(len), ptr(ptr) {} uint32_t len; const uint8_t* ptr; @@ -132,7 +132,7 @@ struct ByteArray { }; struct FixedLenByteArray { - FixedLenByteArray() {} + FixedLenByteArray() : ptr(nullptr) {} explicit FixedLenByteArray(const uint8_t* ptr) : ptr(ptr) {} const uint8_t* ptr; }; diff --git a/src/parquet/util/CMakeLists.txt b/src/parquet/util/CMakeLists.txt index 52c48113..3a4b1c9c 100644 --- a/src/parquet/util/CMakeLists.txt +++ b/src/parquet/util/CMakeLists.txt @@ -71,6 +71,7 @@ endif() ADD_PARQUET_TEST(bit-util-test) ADD_PARQUET_TEST(buffer-test) +ADD_PARQUET_TEST(comparison-test) ADD_PARQUET_TEST(input-output-test) ADD_PARQUET_TEST(mem-allocator-test) ADD_PARQUET_TEST(mem-pool-test) diff --git a/src/parquet/util/comparison-test.cc b/src/parquet/util/comparison-test.cc new file mode 100644 index 00000000..d2689ff2 --- /dev/null +++ b/src/parquet/util/comparison-test.cc @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include +#include +#include + +#include "parquet/schema/descriptor.h" +#include "parquet/types.h" +#include "parquet/util/comparison.h" + +namespace parquet { + +namespace test { + +using parquet::schema::NodePtr; +using parquet::schema::PrimitiveNode; + +static ByteArray ByteArrayFromString(const std::string& s) { + auto ptr = reinterpret_cast(s.data()); + return ByteArray(s.size(), ptr); +} + +static FLBA FLBAFromString(const std::string& s) { + auto ptr = reinterpret_cast(s.data()); + return FLBA(ptr); +} + +TEST(Comparison, ByteArray) { + NodePtr node = PrimitiveNode::Make("bytearray", Repetition::REQUIRED, Type::BYTE_ARRAY); + ColumnDescriptor descr(node, 0, 0); + Compare less(&descr); + + std::string a = "arrange"; + std::string b = "arrangement"; + auto arr1 = ByteArrayFromString(a); + auto arr2 = ByteArrayFromString(b); + ASSERT_TRUE(less(arr1, arr2)); + + a = u8"braten"; + b = u8"bügeln"; + auto arr3 = ByteArrayFromString(a); + auto arr4 = ByteArrayFromString(b); + // see PARQUET-686 discussion about binary comparison + ASSERT_TRUE(!less(arr3, arr4)); +} + +TEST(Comparison, FLBA) { + std::string a = "Antidisestablishmentarianism"; + std::string b = "Bundesgesundheitsministerium"; + auto arr1 = FLBAFromString(a); + auto arr2 = FLBAFromString(b); + + NodePtr node = PrimitiveNode::Make("FLBA", Repetition::REQUIRED, + Type::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE, a.size()); + ColumnDescriptor descr(node, 0, 0); + Compare less(&descr); + ASSERT_TRUE(less(arr1, arr2)); +} + +TEST(Comparison, Int96) { + parquet::Int96 a{{1, 41, 14}}, b{{1, 41, 42}}; + + NodePtr node = PrimitiveNode::Make("int96", Repetition::REQUIRED, Type::INT96); + ColumnDescriptor descr(node, 0, 0); + Compare less(&descr); + ASSERT_TRUE(less(a, b)); + b.value[2] = 14; + ASSERT_TRUE(!less(a, b) && !less(b, a)); +} + +} // namespace test + +} // namespace parquet diff --git a/src/parquet/util/comparison.h b/src/parquet/util/comparison.h new file mode 100644 index 00000000..9d44e7eb --- /dev/null +++ b/src/parquet/util/comparison.h @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PARQUET_UTIL_COMPARISON_H +#define PARQUET_UTIL_COMPARISON_H + +#include + +#include "parquet/types.h" +#include "parquet/schema/descriptor.h" + +namespace parquet { + +template +struct Compare { + explicit Compare(const ColumnDescriptor* descr) : type_length_(descr->type_length()) {} + + inline bool operator()(const T& a, const T& b) { return a < b; } + + private: + int32_t type_length_; +}; + +template <> +inline bool Compare::operator()(const Int96& a, const Int96& b) { + return std::lexicographical_compare(a.value, a.value + 3, b.value, b.value + 3); +} + +template <> +inline bool Compare::operator()(const ByteArray& a, const ByteArray& b) { + auto aptr = reinterpret_cast(a.ptr); + auto bptr = reinterpret_cast(b.ptr); + return std::lexicographical_compare(aptr, aptr + a.len, bptr, bptr + b.len); +} + +template <> +inline bool Compare::operator()(const FLBA& a, const FLBA& b) { + auto aptr = reinterpret_cast(a.ptr); + auto bptr = reinterpret_cast(b.ptr); + return std::lexicographical_compare( + aptr, aptr + type_length_, bptr, bptr + type_length_); +} + +} // namespace parquet + +#endif // PARQUET_UTIL_COMPARISON_H diff --git a/src/parquet/util/test-common.h b/src/parquet/util/test-common.h index 2327aeb5..edadb533 100644 --- a/src/parquet/util/test-common.h +++ b/src/parquet/util/test-common.h @@ -32,8 +32,7 @@ namespace parquet { namespace test { typedef ::testing::Types - ParquetTypes; + DoubleType, ByteArrayType, FLBAType> ParquetTypes; template static inline void assert_vector_equal(const vector& left, const vector& right) {