From 453ecbd838e8aa81f0f7e538dab492a368131d13 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Fri, 25 Jan 2019 00:19:24 -0600 Subject: [PATCH 1/7] Refactor encoder and decoder classes to facilitate type-level extensibility Refactor DictionaryDecoder to be more easily extensible Add missing file windows hell Do not use __declspec(dllexport) when building parquet_static. does not fix problems though gcc does not like using the DictEncoder name inside EncoderTraits Change-Id: I67b51494da3e76f8e856a672cdfd29c9e257efbe Add Ben's windows fixes Refactor encoders to use virtual interface, with ability to add additional methods for ByteArray, FixedLenByteArray Factor decoder code into encoding.cc Tests passing again Make DictEncoder templated too for consistency Draft direct read into ChunkedBinaryBuilder Finish implementing DecodeArrow methods for ByteArrayType Fix release build Use dynamic_cast in a couple places that MSVC doesn't like Don't export interfaces that we don't need to debug Change-Id: Ida18b93c1ea1e51216ee007b258635fcac6f54b5 Clearer names, comments, fix segfault that appeared when ZSTD was turned off IWYU cleaning Revert changes to schema.cc to fix msvc build --- .../iwyu/mappings/arrow-misc.imp | 2 +- cpp/src/arrow/array/builder_binary.h | 2 + cpp/src/parquet/CMakeLists.txt | 5 + cpp/src/parquet/arrow/arrow-schema-test.cc | 1 + cpp/src/parquet/arrow/reader.cc | 11 +- cpp/src/parquet/arrow/record_reader.cc | 92 +- cpp/src/parquet/arrow/record_reader.h | 1 - cpp/src/parquet/arrow/schema.cc | 14 +- cpp/src/parquet/arrow/schema.h | 5 +- cpp/src/parquet/arrow/writer.cc | 16 +- cpp/src/parquet/arrow/writer.h | 23 +- cpp/src/parquet/bloom_filter.cc | 7 +- cpp/src/parquet/bloom_filter.h | 9 +- cpp/src/parquet/column-io-benchmark.cc | 1 + cpp/src/parquet/column_reader.cc | 35 +- cpp/src/parquet/column_reader.h | 22 +- cpp/src/parquet/column_scanner.cc | 1 - cpp/src/parquet/column_scanner.h | 6 +- cpp/src/parquet/column_writer-test.cc | 2 + cpp/src/parquet/column_writer.cc | 39 +- cpp/src/parquet/column_writer.h | 16 +- cpp/src/parquet/encoding-benchmark.cc | 99 +- cpp/src/parquet/encoding-internal.h | 854 ----------- cpp/src/parquet/encoding-test.cc | 88 +- cpp/src/parquet/encoding.cc | 1279 +++++++++++++++++ cpp/src/parquet/encoding.h | 273 +++- cpp/src/parquet/file-deserialize-test.cc | 1 + cpp/src/parquet/file_reader.cc | 9 +- cpp/src/parquet/file_reader.h | 15 +- cpp/src/parquet/file_writer.cc | 7 +- cpp/src/parquet/file_writer.h | 22 +- cpp/src/parquet/metadata.cc | 11 +- cpp/src/parquet/metadata.h | 19 +- cpp/src/parquet/printer.cc | 15 + cpp/src/parquet/printer.h | 8 +- cpp/src/parquet/reader-test.cc | 1 + cpp/src/parquet/schema.cc | 4 +- cpp/src/parquet/schema.h | 14 +- cpp/src/parquet/statistics.cc | 19 +- cpp/src/parquet/test-util.h | 42 +- cpp/src/parquet/util/visibility.h | 14 +- 41 files changed, 1915 insertions(+), 1189 deletions(-) delete mode 100644 cpp/src/parquet/encoding-internal.h create mode 100644 cpp/src/parquet/encoding.cc diff --git a/cpp/build-support/iwyu/mappings/arrow-misc.imp b/cpp/build-support/iwyu/mappings/arrow-misc.imp index 8bb65e62d98e3..7ff99108c5aff 100644 --- a/cpp/build-support/iwyu/mappings/arrow-misc.imp +++ b/cpp/build-support/iwyu/mappings/arrow-misc.imp @@ -49,7 +49,7 @@ { symbol: ["shared_ptr", private, "", public ] }, { symbol: ["_Node_const_iterator", private, "", public ] }, { symbol: ["unordered_map<>::mapped_type", private, "", public ] }, - { symbol: ["move", private, "", public ] }, + { symbol: ["std::move", private, "", public ] }, { symbol: ["pair", private, "", public ] }, { symbol: ["errno", private, "", public ] }, { symbol: ["posix_memalign", private, "", public ] } diff --git a/cpp/src/arrow/array/builder_binary.h b/cpp/src/arrow/array/builder_binary.h index 324279daa4a6e..abd8387f8094c 100644 --- a/cpp/src/arrow/array/builder_binary.h +++ b/cpp/src/arrow/array/builder_binary.h @@ -281,6 +281,8 @@ class ARROW_EXPORT ChunkedBinaryBuilder { return builder_->AppendNull(); } + Status Reserve(int64_t values) { return builder_->Reserve(values); } + virtual Status Finish(ArrayVector* out); protected: diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index 18cc7e01c91d8..6f4a7c922f10f 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -172,6 +172,7 @@ set(PARQUET_SRCS column_reader.cc column_scanner.cc column_writer.cc + encoding.cc file_reader.cc file_writer.cc metadata.cc @@ -269,6 +270,10 @@ foreach(LIB_TARGET ${PARQUET_LIBRARIES}) endif() endforeach() +if (ARROW_BUILD_STATIC AND WIN32) + target_compile_definitions(parquet_static PUBLIC PARQUET_STATIC) +endif() + add_subdirectory(api) add_subdirectory(arrow) add_subdirectory(util) diff --git a/cpp/src/parquet/arrow/arrow-schema-test.cc b/cpp/src/parquet/arrow/arrow-schema-test.cc index cb2b8508e66a5..73de8b1c456c9 100644 --- a/cpp/src/parquet/arrow/arrow-schema-test.cc +++ b/cpp/src/parquet/arrow/arrow-schema-test.cc @@ -21,6 +21,7 @@ #include "gtest/gtest.h" #include "parquet/arrow/schema.h" +#include "parquet/schema.h" #include "arrow/api.h" #include "arrow/test-util.h" diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 58c703f7fe068..0b60c66f9a2bc 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -21,13 +21,18 @@ #include #include #include -#include -#include #include #include #include -#include "arrow/api.h" +#include "arrow/array.h" +#include "arrow/buffer.h" +#include "arrow/builder.h" +#include "arrow/record_batch.h" +#include "arrow/status.h" +#include "arrow/table.h" +#include "arrow/type.h" +#include "arrow/type_traits.h" #include "arrow/util/bit-util.h" #include "arrow/util/int-util.h" #include "arrow/util/logging.h" diff --git a/cpp/src/parquet/arrow/record_reader.cc b/cpp/src/parquet/arrow/record_reader.cc index 4a988dacdd9aa..5eb1121836acd 100644 --- a/cpp/src/parquet/arrow/record_reader.cc +++ b/cpp/src/parquet/arrow/record_reader.cc @@ -1,4 +1,4 @@ -// licensed to the Apache Software Foundation (ASF) under one +// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file @@ -22,25 +22,20 @@ #include #include #include -#include #include #include +#include "arrow/array.h" #include "arrow/buffer.h" #include "arrow/builder.h" -#include "arrow/memory_pool.h" -#include "arrow/status.h" #include "arrow/type.h" #include "arrow/util/bit-util.h" #include "arrow/util/logging.h" -#include "arrow/util/rle-encoding.h" #include "parquet/column_page.h" #include "parquet/column_reader.h" -#include "parquet/encoding-internal.h" #include "parquet/encoding.h" #include "parquet/exception.h" -#include "parquet/properties.h" #include "parquet/schema.h" #include "parquet/types.h" @@ -51,9 +46,6 @@ namespace internal { namespace BitUtil = ::arrow::BitUtil; -template -class TypedRecordReader; - // PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index // encoding. static bool IsDictionaryIndexEncoding(const Encoding::type& e) { @@ -80,9 +72,12 @@ class RecordReader::RecordReaderImpl { null_count_(0), levels_written_(0), levels_position_(0), - levels_capacity_(0) { + levels_capacity_(0), + uses_values_(!(descr->physical_type() == Type::BYTE_ARRAY)) { nullable_values_ = internal::HasSpacedValues(descr); - values_ = AllocateBuffer(pool); + if (uses_values_) { + values_ = AllocateBuffer(pool); + } valid_bits_ = AllocateBuffer(pool); def_levels_ = AllocateBuffer(pool); rep_levels_ = AllocateBuffer(pool); @@ -210,9 +205,13 @@ class RecordReader::RecordReaderImpl { bool nullable_values() const { return nullable_values_; } std::shared_ptr ReleaseValues() { - auto result = values_; - values_ = AllocateBuffer(pool_); - return result; + if (uses_values_) { + auto result = values_; + values_ = AllocateBuffer(pool_); + return result; + } else { + return nullptr; + } } std::shared_ptr ReleaseIsValid() { @@ -324,7 +323,13 @@ class RecordReader::RecordReaderImpl { } int type_size = GetTypeByteSize(descr_->physical_type()); - PARQUET_THROW_NOT_OK(values_->Resize(new_values_capacity * type_size, false)); + + // XXX(wesm): A hack to avoid memory allocation when reading directly + // into builder classes + if (uses_values_) { + PARQUET_THROW_NOT_OK(values_->Resize(new_values_capacity * type_size, false)); + } + values_capacity_ = new_values_capacity; } if (nullable_values_) { @@ -371,7 +376,9 @@ class RecordReader::RecordReaderImpl { void ResetValues() { if (values_written_ > 0) { // Resize to 0, but do not shrink to fit - PARQUET_THROW_NOT_OK(values_->Resize(0, false)); + if (uses_values_) { + PARQUET_THROW_NOT_OK(values_->Resize(0, false)); + } PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false)); values_written_ = 0; values_capacity_ = 0; @@ -427,6 +434,7 @@ class RecordReader::RecordReaderImpl { int64_t levels_capacity_; std::shared_ptr<::arrow::ResizableBuffer> values_; + bool uses_values_; template T* ValuesHead() { @@ -559,12 +567,12 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl { } private: - typedef Decoder DecoderType; + using DecoderType = typename EncodingTraits::Decoder; // Map of encoding type to the respective decoder object. For example, a // column chunk's data pages may include both dictionary-encoded and // plain-encoded data. - std::unordered_map> decoders_; + std::unordered_map> decoders_; std::unique_ptr builder_; @@ -620,15 +628,9 @@ ::arrow::ArrayVector TypedRecordReader::GetBuilderChunks() { template <> inline void TypedRecordReader::ReadValuesDense(int64_t values_to_read) { - auto values = ValuesHead(); - int64_t num_decoded = - current_decoder_->Decode(values, static_cast(values_to_read)); + int64_t num_decoded = current_decoder_->DecodeArrowNonNull( + static_cast(values_to_read), builder_.get()); DCHECK_EQ(num_decoded, values_to_read); - - for (int64_t i = 0; i < num_decoded; i++) { - PARQUET_THROW_NOT_OK( - builder_->Append(values[i].ptr, static_cast(values[i].len))); - } ResetValues(); } @@ -648,23 +650,10 @@ inline void TypedRecordReader::ReadValuesDense(int64_t values_to_read) template <> inline void TypedRecordReader::ReadValuesSpaced(int64_t values_to_read, int64_t null_count) { - uint8_t* valid_bits = valid_bits_->mutable_data(); - const int64_t valid_bits_offset = values_written_; - auto values = ValuesHead(); - - int64_t num_decoded = current_decoder_->DecodeSpaced( - values, static_cast(values_to_read), static_cast(null_count), valid_bits, - valid_bits_offset); + int64_t num_decoded = current_decoder_->DecodeArrow( + static_cast(values_to_read), static_cast(null_count), + valid_bits_->mutable_data(), values_written_, builder_.get()); DCHECK_EQ(num_decoded, values_to_read); - - for (int64_t i = 0; i < num_decoded; i++) { - if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) { - PARQUET_THROW_NOT_OK( - builder_->Append(values[i].ptr, static_cast(values[i].len))); - } else { - PARQUET_THROW_NOT_OK(builder_->AppendNull()); - } - } ResetValues(); } @@ -705,8 +694,8 @@ inline void TypedRecordReader::ConfigureDictionary(const DictionaryPage* if (page->encoding() == Encoding::PLAIN_DICTIONARY || page->encoding() == Encoding::PLAIN) { - PlainDecoder dictionary(descr_); - dictionary.SetData(page->num_values(), page->data(), page->size()); + auto dictionary = MakeTypedDecoder(Encoding::PLAIN, descr_); + dictionary->SetData(page->num_values(), page->data(), page->size()); // The dictionary is fully decoded during DictionaryDecoder::Init, so the // DictionaryPage buffer is no longer required after this step @@ -714,14 +703,16 @@ inline void TypedRecordReader::ConfigureDictionary(const DictionaryPage* // TODO(wesm): investigate whether this all-or-nothing decoding of the // dictionary makes sense and whether performance can be improved - auto decoder = std::make_shared>(descr_, pool_); - decoder->SetDict(&dictionary); - decoders_[encoding] = decoder; + std::unique_ptr> decoder = MakeDictDecoder(descr_, pool_); + decoder->SetDict(dictionary.get()); + decoders_[encoding] = + std::unique_ptr(dynamic_cast(decoder.release())); } else { ParquetException::NYI("only plain dictionary encoding has been implemented"); } current_decoder_ = decoders_[encoding].get(); + DCHECK(current_decoder_); } template @@ -787,6 +778,7 @@ bool TypedRecordReader::ReadNewPage() { auto it = decoders_.find(static_cast(encoding)); if (it != decoders_.end()) { + DCHECK(it->second.get() != nullptr); if (encoding == Encoding::RLE_DICTIONARY) { DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY); } @@ -794,9 +786,9 @@ bool TypedRecordReader::ReadNewPage() { } else { switch (encoding) { case Encoding::PLAIN: { - std::shared_ptr decoder(new PlainDecoder(descr_)); - decoders_[static_cast(encoding)] = decoder; + auto decoder = MakeTypedDecoder(Encoding::PLAIN, descr_); current_decoder_ = decoder.get(); + decoders_[static_cast(encoding)] = std::move(decoder); break; } case Encoding::RLE_DICTIONARY: diff --git a/cpp/src/parquet/arrow/record_reader.h b/cpp/src/parquet/arrow/record_reader.h index 0f62b744f323a..cc932c2865028 100644 --- a/cpp/src/parquet/arrow/record_reader.h +++ b/cpp/src/parquet/arrow/record_reader.h @@ -24,7 +24,6 @@ #include "arrow/memory_pool.h" -#include "parquet/util/macros.h" #include "parquet/util/memory.h" namespace arrow { diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index fed0e59dfa330..f1ebad0e5667f 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -19,14 +19,20 @@ #include #include +#include #include -#include "parquet/api/schema.h" -#include "parquet/util/schema-util.h" - -#include "arrow/api.h" +#include "arrow/array.h" +#include "arrow/status.h" +#include "arrow/type.h" #include "arrow/util/logging.h" +#include "parquet/arrow/writer.h" +#include "parquet/exception.h" +#include "parquet/properties.h" +#include "parquet/types.h" +#include "parquet/util/schema-util.h" + using arrow::Field; using arrow::Status; diff --git a/cpp/src/parquet/arrow/schema.h b/cpp/src/parquet/arrow/schema.h index 649fe86120a18..0e65ed844eb58 100644 --- a/cpp/src/parquet/arrow/schema.h +++ b/cpp/src/parquet/arrow/schema.h @@ -22,15 +22,14 @@ #include #include -#include "arrow/api.h" - -#include "parquet/arrow/writer.h" #include "parquet/metadata.h" #include "parquet/schema.h" #include "parquet/util/visibility.h" namespace arrow { +class Field; +class Schema; class Status; } // namespace arrow diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc index a5c0a62994b1b..6813880f3b0e3 100644 --- a/cpp/src/parquet/arrow/writer.cc +++ b/cpp/src/parquet/arrow/writer.cc @@ -18,17 +18,29 @@ #include "parquet/arrow/writer.h" #include -#include +#include +#include #include #include -#include "arrow/api.h" +#include "arrow/array.h" +#include "arrow/buffer.h" +#include "arrow/builder.h" #include "arrow/compute/api.h" +#include "arrow/status.h" +#include "arrow/table.h" #include "arrow/util/bit-util.h" +#include "arrow/util/checked_cast.h" #include "arrow/visitor_inline.h" #include "arrow/util/logging.h" + #include "parquet/arrow/schema.h" +#include "parquet/column_writer.h" +#include "parquet/exception.h" +#include "parquet/file_writer.h" +#include "parquet/schema.h" +#include "parquet/util/memory.h" using arrow::Array; using arrow::BinaryArray; diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h index 50cb4cfea7d8d..ab3d7e815cc9a 100644 --- a/cpp/src/parquet/arrow/writer.h +++ b/cpp/src/parquet/arrow/writer.h @@ -18,26 +18,37 @@ #ifndef PARQUET_ARROW_WRITER_H #define PARQUET_ARROW_WRITER_H +#include #include -#include "parquet/api/schema.h" -#include "parquet/api/writer.h" +#include "parquet/properties.h" +#include "parquet/types.h" +#include "parquet/util/visibility.h" -#include "arrow/io/interfaces.h" #include "arrow/type.h" namespace arrow { class Array; +class ChunkedArray; class MemoryPool; -class PrimitiveArray; -class Schema; class Status; -class StringArray; class Table; + +namespace io { + +class OutputStream; + +} // namespace io + } // namespace arrow namespace parquet { + +class FileMetaData; +class OutputStream; +class ParquetFileWriter; + namespace arrow { class PARQUET_EXPORT ArrowWriterProperties { diff --git a/cpp/src/parquet/bloom_filter.cc b/cpp/src/parquet/bloom_filter.cc index 31a33fa782a7b..8f5f695fde71f 100644 --- a/cpp/src/parquet/bloom_filter.cc +++ b/cpp/src/parquet/bloom_filter.cc @@ -15,17 +15,16 @@ // specific language governing permissions and limitations // under the License. -#include -#include #include +#include -#include "arrow/status.h" +#include "arrow/buffer.h" +#include "arrow/memory_pool.h" #include "arrow/util/bit-util.h" #include "arrow/util/logging.h" #include "parquet/bloom_filter.h" #include "parquet/exception.h" #include "parquet/murmur3.h" -#include "parquet/types.h" namespace parquet { constexpr uint32_t BlockSplitBloomFilter::SALT[kBitsSetPerBlock]; diff --git a/cpp/src/parquet/bloom_filter.h b/cpp/src/parquet/bloom_filter.h index 9ba895c0b3329..a586dc2dcced6 100644 --- a/cpp/src/parquet/bloom_filter.h +++ b/cpp/src/parquet/bloom_filter.h @@ -18,19 +18,24 @@ #ifndef PARQUET_BLOOM_FILTER_H #define PARQUET_BLOOM_FILTER_H +#include #include #include #include "arrow/util/bit-util.h" #include "arrow/util/logging.h" -#include "parquet/exception.h" #include "parquet/hasher.h" #include "parquet/types.h" #include "parquet/util/memory.h" #include "parquet/util/visibility.h" +namespace arrow { + +class MemoryPool; + +} // namespace arrow + namespace parquet { -class OutputStream; // A Bloom filter is a compact structure to indicate whether an item is not in a set or // probably in a set. The Bloom filter usually consists of a bit set that represents a diff --git a/cpp/src/parquet/column-io-benchmark.cc b/cpp/src/parquet/column-io-benchmark.cc index bb056c1339b97..3e32b2a837815 100644 --- a/cpp/src/parquet/column-io-benchmark.cc +++ b/cpp/src/parquet/column-io-benchmark.cc @@ -20,6 +20,7 @@ #include "parquet/column_reader.h" #include "parquet/column_writer.h" #include "parquet/file_reader.h" +#include "parquet/metadata.h" #include "parquet/thrift.h" #include "parquet/util/memory.h" diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 7fbf9babd71fa..113d50a40aada 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -17,20 +17,22 @@ #include "parquet/column_reader.h" -#include #include +#include +#include #include -#include -#include -#include -#include -#include -#include +#include "arrow/buffer.h" +#include "arrow/util/bit-stream-utils.h" +#include "arrow/util/bit-util.h" +#include "arrow/util/compression.h" +#include "arrow/util/logging.h" +#include "arrow/util/rle-encoding.h" #include "parquet/column_page.h" -#include "parquet/encoding-internal.h" +#include "parquet/encoding.h" #include "parquet/properties.h" +#include "parquet/statistics.h" #include "parquet/thrift.h" using arrow::MemoryPool; @@ -290,18 +292,17 @@ void TypedColumnReader::ConfigureDictionary(const DictionaryPage* page) { if (page->encoding() == Encoding::PLAIN_DICTIONARY || page->encoding() == Encoding::PLAIN) { - PlainDecoder dictionary(descr_); - dictionary.SetData(page->num_values(), page->data(), page->size()); + auto dictionary = MakeTypedDecoder(Encoding::PLAIN, descr_); + dictionary->SetData(page->num_values(), page->data(), page->size()); - // The dictionary is fully decoded during DictionaryDecoder::Init, so the + // The dictionary is fully decoded during SetData, so the // DictionaryPage buffer is no longer required after this step // // TODO(wesm): investigate whether this all-or-nothing decoding of the // dictionary makes sense and whether performance can be improved - - auto decoder = std::make_shared>(descr_, pool_); - decoder->SetDict(&dictionary); - decoders_[encoding] = decoder; + auto decoder = MakeDictDecoder(descr_, pool_); + decoder->SetDict(dictionary.get()); + decoders_[encoding] = std::move(decoder); } else { ParquetException::NYI("only plain dictionary encoding has been implemented"); } @@ -385,9 +386,9 @@ bool TypedColumnReader::ReadNewPage() { } else { switch (encoding) { case Encoding::PLAIN: { - std::shared_ptr decoder(new PlainDecoder(descr_)); - decoders_[static_cast(encoding)] = decoder; + auto decoder = MakeTypedDecoder(Encoding::PLAIN, descr_); current_decoder_ = decoder.get(); + decoders_[static_cast(encoding)] = std::move(decoder); break; } case Encoding::RLE_DICTIONARY: diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index 42bf900c97932..19513c210d327 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -19,26 +19,20 @@ #define PARQUET_COLUMN_READER_H #include -#include #include -#include -#include #include #include #include -#include -#include -#include -#include -#include +#include "arrow/buffer.h" +#include "arrow/memory_pool.h" +#include "arrow/util/bit-util.h" +#include "arrow/util/macros.h" -#include "parquet/column_page.h" #include "parquet/encoding.h" #include "parquet/exception.h" #include "parquet/schema.h" #include "parquet/types.h" -#include "parquet/util/macros.h" #include "parquet/util/memory.h" #include "parquet/util/visibility.h" @@ -56,6 +50,9 @@ class RleDecoder; namespace parquet { +class DictionaryPage; +class Page; + // 16 MB is the default maximum page header size static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024; @@ -290,7 +287,7 @@ class PARQUET_TEMPLATE_CLASS_EXPORT TypedColumnReader : public ColumnReader { int64_t Skip(int64_t num_rows_to_skip); private: - typedef Decoder DecoderType; + using DecoderType = TypedDecoder; // Advance to the next data page bool ReadNewPage() override; @@ -312,10 +309,9 @@ class PARQUET_TEMPLATE_CLASS_EXPORT TypedColumnReader : public ColumnReader { // Map of encoding type to the respective decoder object. For example, a // column chunk's data pages may include both dictionary-encoded and // plain-encoded data. - std::unordered_map> decoders_; + std::unordered_map> decoders_; void ConfigureDictionary(const DictionaryPage* page); - DecoderType* current_decoder_; }; diff --git a/cpp/src/parquet/column_scanner.cc b/cpp/src/parquet/column_scanner.cc index 51c87732959fb..8011318a78c9a 100644 --- a/cpp/src/parquet/column_scanner.cc +++ b/cpp/src/parquet/column_scanner.cc @@ -21,7 +21,6 @@ #include #include "parquet/column_reader.h" -#include "parquet/util/memory.h" using arrow::MemoryPool; diff --git a/cpp/src/parquet/column_scanner.h b/cpp/src/parquet/column_scanner.h index f23c86173cb32..cb0da2c9e18f1 100644 --- a/cpp/src/parquet/column_scanner.h +++ b/cpp/src/parquet/column_scanner.h @@ -25,11 +25,13 @@ #include #include +#include "arrow/buffer.h" +#include "arrow/memory_pool.h" + #include "parquet/column_reader.h" #include "parquet/exception.h" #include "parquet/schema.h" #include "parquet/types.h" -#include "parquet/util/macros.h" #include "parquet/util/memory.h" #include "parquet/util/visibility.h" @@ -87,7 +89,7 @@ class PARQUET_EXPORT Scanner { }; template -class PARQUET_EXPORT TypedScanner : public Scanner { +class PARQUET_TEMPLATE_CLASS_EXPORT TypedScanner : public Scanner { public: typedef typename DType::c_type T; diff --git a/cpp/src/parquet/column_writer-test.cc b/cpp/src/parquet/column_writer-test.cc index a7671dbcdc777..1f034b622719a 100644 --- a/cpp/src/parquet/column_writer-test.cc +++ b/cpp/src/parquet/column_writer-test.cc @@ -21,6 +21,8 @@ #include "parquet/column_reader.h" #include "parquet/column_writer.h" +#include "parquet/metadata.h" +#include "parquet/properties.h" #include "parquet/test-specialization.h" #include "parquet/test-util.h" #include "parquet/thrift.h" diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 6996757b172d7..0919a3f1d7a65 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -17,23 +17,32 @@ #include "parquet/column_writer.h" +#include #include #include #include +#include "arrow/status.h" +#include "arrow/util/bit-stream-utils.h" #include "arrow/util/bit-util.h" +#include "arrow/util/checked_cast.h" #include "arrow/util/compression.h" #include "arrow/util/logging.h" #include "arrow/util/rle-encoding.h" -#include "parquet/encoding-internal.h" +#include "parquet/metadata.h" #include "parquet/properties.h" #include "parquet/statistics.h" #include "parquet/thrift.h" +#include "parquet/types.h" #include "parquet/util/memory.h" namespace parquet { +namespace BitUtil = ::arrow::BitUtil; + +using ::arrow::internal::checked_cast; + using BitWriter = ::arrow::BitUtil::BitWriter; using RleEncoder = ::arrow::util::RleEncoder; @@ -538,13 +547,8 @@ TypedColumnWriter::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata, Encoding::type encoding, const WriterProperties* properties) : ColumnWriter(metadata, std::move(pager), use_dictionary, encoding, properties) { - if (use_dictionary) { - current_encoder_.reset(new DictEncoder(descr_, properties->memory_pool())); - } else if (encoding == Encoding::PLAIN) { - current_encoder_.reset(new PlainEncoder(descr_, properties->memory_pool())); - } else { - ParquetException::NYI("Selected encoding is not supported"); - } + current_encoder_ = MakeEncoder(Type::type_num, encoding, use_dictionary, descr_, + properties->memory_pool()); if (properties->statistics_enabled(descr_->path()) && (SortOrder::UNKNOWN != descr_->sort_order())) { @@ -557,21 +561,27 @@ TypedColumnWriter::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata, // Fallback to PLAIN if dictionary page limit is reached. template void TypedColumnWriter::CheckDictionarySizeLimit() { - auto dict_encoder = static_cast*>(current_encoder_.get()); + // We have to dynamic cast here because TypedEncoder as some compilers + // don't want to cast through virtual inheritance + auto dict_encoder = dynamic_cast*>(current_encoder_.get()); if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize_limit()) { WriteDictionaryPage(); // Serialize the buffered Dictionary Indicies FlushBufferedDataPages(); fallback_ = true; // Only PLAIN encoding is supported for fallback in V1 - current_encoder_.reset(new PlainEncoder(descr_, properties_->memory_pool())); + current_encoder_ = MakeEncoder(Type::type_num, Encoding::PLAIN, false, descr_, + properties_->memory_pool()); encoding_ = Encoding::PLAIN; } } template void TypedColumnWriter::WriteDictionaryPage() { - auto dict_encoder = static_cast*>(current_encoder_.get()); + // We have to dynamic cast here because TypedEncoder as some compilers + // don't want to cast through virtual inheritance + auto dict_encoder = dynamic_cast*>(current_encoder_.get()); + DCHECK(dict_encoder); std::shared_ptr buffer = AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size()); dict_encoder->WriteDict(buffer->mutable_data()); @@ -836,7 +846,8 @@ void TypedColumnWriter::WriteBatchSpaced( template void TypedColumnWriter::WriteValues(int64_t num_values, const T* values) { - current_encoder_->Put(values, static_cast(num_values)); + dynamic_cast(current_encoder_.get()) + ->Put(values, static_cast(num_values)); } template @@ -844,8 +855,8 @@ void TypedColumnWriter::WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values) { - current_encoder_->PutSpaced(values, static_cast(num_values), valid_bits, - valid_bits_offset); + dynamic_cast(current_encoder_.get()) + ->PutSpaced(values, static_cast(num_values), valid_bits, valid_bits_offset); } template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter; diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h index 71e01f8442c4f..254bf0dd02e50 100644 --- a/cpp/src/parquet/column_writer.h +++ b/cpp/src/parquet/column_writer.h @@ -17,17 +17,18 @@ #pragma once +#include #include #include +#include "arrow/memory_pool.h" + #include "parquet/column_page.h" #include "parquet/encoding.h" -#include "parquet/metadata.h" -#include "parquet/properties.h" +#include "parquet/exception.h" #include "parquet/schema.h" #include "parquet/statistics.h" #include "parquet/types.h" -#include "parquet/util/macros.h" #include "parquet/util/memory.h" #include "parquet/util/visibility.h" @@ -45,6 +46,9 @@ class RleEncoder; namespace parquet { +class ColumnChunkMetaDataBuilder; +class WriterProperties; + class PARQUET_EXPORT LevelEncoder { public: LevelEncoder(); @@ -297,13 +301,13 @@ class PARQUET_TEMPLATE_CLASS_EXPORT TypedColumnWriter : public ColumnWriter { int64_t valid_bits_offset, const T* values, int64_t* num_spaced_written); - typedef Encoder EncoderType; - // Write values to a temporary buffer before they are encoded into pages void WriteValues(int64_t num_values, const T* values); void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values); - std::unique_ptr current_encoder_; + + using ValueEncoderType = typename EncodingTraits::Encoder; + std::unique_ptr current_encoder_; typedef TypedRowGroupStatistics TypedStats; std::unique_ptr page_statistics_; diff --git a/cpp/src/parquet/encoding-benchmark.cc b/cpp/src/parquet/encoding-benchmark.cc index 48183b4f9755d..8031aeb7ce168 100644 --- a/cpp/src/parquet/encoding-benchmark.cc +++ b/cpp/src/parquet/encoding-benchmark.cc @@ -17,7 +17,8 @@ #include "benchmark/benchmark.h" -#include "parquet/encoding-internal.h" +#include "parquet/encoding.h" +#include "parquet/schema.h" #include "parquet/util/memory.h" using arrow::default_memory_pool; @@ -27,39 +28,39 @@ namespace parquet { using schema::PrimitiveNode; -namespace benchmark { - std::shared_ptr Int64Schema(Repetition::type repetition) { auto node = PrimitiveNode::Make("int64", repetition, Type::INT64); return std::make_shared(node, repetition != Repetition::REQUIRED, repetition == Repetition::REPEATED); } -static void BM_PlainEncodingBoolean(::benchmark::State& state) { +static void BM_PlainEncodingBoolean(benchmark::State& state) { std::vector values(state.range(0), true); - PlainEncoder encoder(nullptr); + auto encoder = MakeEncoder(Type::BOOLEAN, Encoding::PLAIN); + auto typed_encoder = dynamic_cast(encoder.get()); while (state.KeepRunning()) { - encoder.Put(values, static_cast(values.size())); - encoder.FlushValues(); + typed_encoder->Put(values, static_cast(values.size())); + typed_encoder->FlushValues(); } state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(bool)); } BENCHMARK(BM_PlainEncodingBoolean)->Range(1024, 65536); -static void BM_PlainDecodingBoolean(::benchmark::State& state) { +static void BM_PlainDecodingBoolean(benchmark::State& state) { std::vector values(state.range(0), true); bool* output = new bool[state.range(0)]; - PlainEncoder encoder(nullptr); - encoder.Put(values, static_cast(values.size())); - std::shared_ptr buf = encoder.FlushValues(); + auto encoder = MakeEncoder(Type::BOOLEAN, Encoding::PLAIN); + auto typed_encoder = dynamic_cast(encoder.get()); + typed_encoder->Put(values, static_cast(values.size())); + std::shared_ptr buf = encoder->FlushValues(); while (state.KeepRunning()) { - PlainDecoder decoder(nullptr); - decoder.SetData(static_cast(values.size()), buf->data(), - static_cast(buf->size())); - decoder.Decode(output, static_cast(values.size())); + auto decoder = MakeTypedDecoder(Encoding::PLAIN); + decoder->SetData(static_cast(values.size()), buf->data(), + static_cast(buf->size())); + decoder->Decode(output, static_cast(values.size())); } state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(bool)); @@ -68,30 +69,29 @@ static void BM_PlainDecodingBoolean(::benchmark::State& state) { BENCHMARK(BM_PlainDecodingBoolean)->Range(1024, 65536); -static void BM_PlainEncodingInt64(::benchmark::State& state) { +static void BM_PlainEncodingInt64(benchmark::State& state) { std::vector values(state.range(0), 64); - PlainEncoder encoder(nullptr); - + auto encoder = MakeTypedEncoder(Encoding::PLAIN); while (state.KeepRunning()) { - encoder.Put(values.data(), static_cast(values.size())); - encoder.FlushValues(); + encoder->Put(values.data(), static_cast(values.size())); + encoder->FlushValues(); } state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(int64_t)); } BENCHMARK(BM_PlainEncodingInt64)->Range(1024, 65536); -static void BM_PlainDecodingInt64(::benchmark::State& state) { +static void BM_PlainDecodingInt64(benchmark::State& state) { std::vector values(state.range(0), 64); - PlainEncoder encoder(nullptr); - encoder.Put(values.data(), static_cast(values.size())); - std::shared_ptr buf = encoder.FlushValues(); + auto encoder = MakeTypedEncoder(Encoding::PLAIN); + encoder->Put(values.data(), static_cast(values.size())); + std::shared_ptr buf = encoder->FlushValues(); while (state.KeepRunning()) { - PlainDecoder decoder(nullptr); - decoder.SetData(static_cast(values.size()), buf->data(), - static_cast(buf->size())); - decoder.Decode(values.data(), static_cast(values.size())); + auto decoder = MakeTypedDecoder(Encoding::PLAIN); + decoder->SetData(static_cast(values.size()), buf->data(), + static_cast(buf->size())); + decoder->Decode(values.data(), static_cast(values.size())); } state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(int64_t)); } @@ -100,44 +100,47 @@ BENCHMARK(BM_PlainDecodingInt64)->Range(1024, 65536); template static void DecodeDict(std::vector& values, - ::benchmark::State& state) { + benchmark::State& state) { typedef typename Type::c_type T; int num_values = static_cast(values.size()); MemoryPool* allocator = default_memory_pool(); std::shared_ptr descr = Int64Schema(Repetition::REQUIRED); - DictEncoder encoder(descr.get(), allocator); - for (int i = 0; i < num_values; ++i) { - encoder.Put(values[i]); - } + auto base_encoder = + MakeEncoder(Type::type_num, Encoding::PLAIN, true, descr.get(), allocator); + auto encoder = + dynamic_cast::Encoder*>(base_encoder.get()); + auto dict_traits = dynamic_cast*>(base_encoder.get()); + encoder->Put(values.data(), num_values); std::shared_ptr dict_buffer = - AllocateBuffer(allocator, encoder.dict_encoded_size()); + AllocateBuffer(allocator, dict_traits->dict_encoded_size()); std::shared_ptr indices = - AllocateBuffer(allocator, encoder.EstimatedDataEncodedSize()); + AllocateBuffer(allocator, encoder->EstimatedDataEncodedSize()); - encoder.WriteDict(dict_buffer->mutable_data()); - int actual_bytes = - encoder.WriteIndices(indices->mutable_data(), static_cast(indices->size())); + dict_traits->WriteDict(dict_buffer->mutable_data()); + int actual_bytes = dict_traits->WriteIndices(indices->mutable_data(), + static_cast(indices->size())); PARQUET_THROW_NOT_OK(indices->Resize(actual_bytes)); while (state.KeepRunning()) { - PlainDecoder dict_decoder(descr.get()); - dict_decoder.SetData(encoder.num_entries(), dict_buffer->data(), - static_cast(dict_buffer->size())); - DictionaryDecoder decoder(descr.get()); - decoder.SetDict(&dict_decoder); - decoder.SetData(num_values, indices->data(), static_cast(indices->size())); - decoder.Decode(values.data(), num_values); + auto dict_decoder = MakeTypedDecoder(Encoding::PLAIN, descr.get()); + dict_decoder->SetData(dict_traits->num_entries(), dict_buffer->data(), + static_cast(dict_buffer->size())); + + auto decoder = MakeDictDecoder(descr.get()); + decoder->SetDict(dict_decoder.get()); + decoder->SetData(num_values, indices->data(), static_cast(indices->size())); + decoder->Decode(values.data(), num_values); } state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(T)); } -static void BM_DictDecodingInt64_repeats(::benchmark::State& state) { +static void BM_DictDecodingInt64_repeats(benchmark::State& state) { typedef Int64Type Type; typedef typename Type::c_type T; @@ -147,7 +150,7 @@ static void BM_DictDecodingInt64_repeats(::benchmark::State& state) { BENCHMARK(BM_DictDecodingInt64_repeats)->Range(1024, 65536); -static void BM_DictDecodingInt64_literals(::benchmark::State& state) { +static void BM_DictDecodingInt64_literals(benchmark::State& state) { typedef Int64Type Type; typedef typename Type::c_type T; @@ -160,6 +163,4 @@ static void BM_DictDecodingInt64_literals(::benchmark::State& state) { BENCHMARK(BM_DictDecodingInt64_literals)->Range(1024, 65536); -} // namespace benchmark - } // namespace parquet diff --git a/cpp/src/parquet/encoding-internal.h b/cpp/src/parquet/encoding-internal.h deleted file mode 100644 index 8611855383ce8..0000000000000 --- a/cpp/src/parquet/encoding-internal.h +++ /dev/null @@ -1,854 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef PARQUET_ENCODING_INTERNAL_H -#define PARQUET_ENCODING_INTERNAL_H - -#include -#include -#include -#include -#include -#include - -#include "arrow/util/bit-stream-utils.h" -#include "arrow/util/bit-util.h" -#include "arrow/util/hashing.h" -#include "arrow/util/macros.h" -#include "arrow/util/rle-encoding.h" - -#include "parquet/encoding.h" -#include "parquet/exception.h" -#include "parquet/schema.h" -#include "parquet/types.h" -#include "parquet/util/memory.h" - -namespace parquet { - -namespace BitUtil = ::arrow::BitUtil; - -class ColumnDescriptor; - -// ---------------------------------------------------------------------- -// Encoding::PLAIN decoder implementation - -template -class PlainDecoder : public Decoder { - public: - typedef typename DType::c_type T; - using Decoder::num_values_; - - explicit PlainDecoder(const ColumnDescriptor* descr) - : Decoder(descr, Encoding::PLAIN), data_(nullptr), len_(0) { - if (descr_ && descr_->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) { - type_length_ = descr_->type_length(); - } else { - type_length_ = -1; - } - } - - virtual void SetData(int num_values, const uint8_t* data, int len) { - num_values_ = num_values; - data_ = data; - len_ = len; - } - - virtual int Decode(T* buffer, int max_values); - - private: - using Decoder::descr_; - const uint8_t* data_; - int len_; - int type_length_; -}; - -// Decode routine templated on C++ type rather than type enum -template -inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values, - int type_length, T* out) { - int bytes_to_decode = num_values * static_cast(sizeof(T)); - if (data_size < bytes_to_decode) { - ParquetException::EofException(); - } - // If bytes_to_decode == 0, data could be null - if (bytes_to_decode > 0) { - memcpy(out, data, bytes_to_decode); - } - return bytes_to_decode; -} - -// Template specialization for BYTE_ARRAY. The written values do not own their -// own data. -template <> -inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values, - int type_length, ByteArray* out) { - int bytes_decoded = 0; - int increment; - for (int i = 0; i < num_values; ++i) { - uint32_t len = out[i].len = *reinterpret_cast(data); - increment = static_cast(sizeof(uint32_t) + len); - if (data_size < increment) ParquetException::EofException(); - out[i].ptr = data + sizeof(uint32_t); - data += increment; - data_size -= increment; - bytes_decoded += increment; - } - return bytes_decoded; -} - -// Template specialization for FIXED_LEN_BYTE_ARRAY. The written values do not -// own their own data. -template <> -inline int DecodePlain(const uint8_t* data, int64_t data_size, - int num_values, int type_length, - FixedLenByteArray* out) { - int bytes_to_decode = type_length * num_values; - if (data_size < bytes_to_decode) { - ParquetException::EofException(); - } - for (int i = 0; i < num_values; ++i) { - out[i].ptr = data; - data += type_length; - data_size -= type_length; - } - return bytes_to_decode; -} - -template -inline int PlainDecoder::Decode(T* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - int bytes_consumed = DecodePlain(data_, len_, max_values, type_length_, buffer); - data_ += bytes_consumed; - len_ -= bytes_consumed; - num_values_ -= max_values; - return max_values; -} - -template <> -class PlainDecoder : public Decoder { - public: - explicit PlainDecoder(const ColumnDescriptor* descr) - : Decoder(descr, Encoding::PLAIN) {} - - virtual void SetData(int num_values, const uint8_t* data, int len) { - num_values_ = num_values; - bit_reader_ = BitUtil::BitReader(data, len); - } - - // Two flavors of bool decoding - int Decode(uint8_t* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - bool val; - ::arrow::internal::BitmapWriter bit_writer(buffer, 0, max_values); - for (int i = 0; i < max_values; ++i) { - if (!bit_reader_.GetValue(1, &val)) { - ParquetException::EofException(); - } - if (val) { - bit_writer.Set(); - } - bit_writer.Next(); - } - bit_writer.Finish(); - num_values_ -= max_values; - return max_values; - } - - virtual int Decode(bool* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - if (bit_reader_.GetBatch(1, buffer, max_values) != max_values) { - ParquetException::EofException(); - } - num_values_ -= max_values; - return max_values; - } - - private: - BitUtil::BitReader bit_reader_; -}; - -// ---------------------------------------------------------------------- -// Encoding::PLAIN encoder implementation - -template -class PlainEncoder : public Encoder { - public: - typedef typename DType::c_type T; - - explicit PlainEncoder(const ColumnDescriptor* descr, - ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) - : Encoder(descr, Encoding::PLAIN, pool) { - values_sink_.reset(new InMemoryOutputStream(pool)); - } - - int64_t EstimatedDataEncodedSize() override { return values_sink_->Tell(); } - - std::shared_ptr FlushValues() override; - void Put(const T* src, int num_values) override; - - protected: - std::unique_ptr values_sink_; -}; - -template <> -class PlainEncoder : public Encoder { - public: - explicit PlainEncoder(const ColumnDescriptor* descr, - ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) - : Encoder(descr, Encoding::PLAIN, pool), - bits_available_(kInMemoryDefaultCapacity * 8), - bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)), - values_sink_(new InMemoryOutputStream(pool)) { - bit_writer_.reset(new BitUtil::BitWriter(bits_buffer_->mutable_data(), - static_cast(bits_buffer_->size()))); - } - - int64_t EstimatedDataEncodedSize() override { - return values_sink_->Tell() + bit_writer_->bytes_written(); - } - - std::shared_ptr FlushValues() override { - if (bits_available_ > 0) { - bit_writer_->Flush(); - values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); - bit_writer_->Clear(); - bits_available_ = static_cast(bits_buffer_->size()) * 8; - } - - std::shared_ptr buffer = values_sink_->GetBuffer(); - values_sink_.reset(new InMemoryOutputStream(this->pool_)); - return buffer; - } - -#define PLAINDECODER_BOOLEAN_PUT(input_type, function_attributes) \ - void Put(input_type src, int num_values) function_attributes { \ - int bit_offset = 0; \ - if (bits_available_ > 0) { \ - int bits_to_write = std::min(bits_available_, num_values); \ - for (int i = 0; i < bits_to_write; i++) { \ - bit_writer_->PutValue(src[i], 1); \ - } \ - bits_available_ -= bits_to_write; \ - bit_offset = bits_to_write; \ - \ - if (bits_available_ == 0) { \ - bit_writer_->Flush(); \ - values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); \ - bit_writer_->Clear(); \ - } \ - } \ - \ - int bits_remaining = num_values - bit_offset; \ - while (bit_offset < num_values) { \ - bits_available_ = static_cast(bits_buffer_->size()) * 8; \ - \ - int bits_to_write = std::min(bits_available_, bits_remaining); \ - for (int i = bit_offset; i < bit_offset + bits_to_write; i++) { \ - bit_writer_->PutValue(src[i], 1); \ - } \ - bit_offset += bits_to_write; \ - bits_available_ -= bits_to_write; \ - bits_remaining -= bits_to_write; \ - \ - if (bits_available_ == 0) { \ - bit_writer_->Flush(); \ - values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); \ - bit_writer_->Clear(); \ - } \ - } \ - } - - PLAINDECODER_BOOLEAN_PUT(const bool*, override) - PLAINDECODER_BOOLEAN_PUT(const std::vector&, ) - - protected: - int bits_available_; - std::unique_ptr bit_writer_; - std::shared_ptr bits_buffer_; - std::unique_ptr values_sink_; -}; - -template -inline std::shared_ptr PlainEncoder::FlushValues() { - std::shared_ptr buffer = values_sink_->GetBuffer(); - values_sink_.reset(new InMemoryOutputStream(this->pool_)); - return buffer; -} - -template -inline void PlainEncoder::Put(const T* buffer, int num_values) { - values_sink_->Write(reinterpret_cast(buffer), num_values * sizeof(T)); -} - -template <> -inline void PlainEncoder::Put(const ByteArray* src, int num_values) { - for (int i = 0; i < num_values; ++i) { - // Write the result to the output stream - values_sink_->Write(reinterpret_cast(&src[i].len), sizeof(uint32_t)); - if (src[i].len > 0) { - DCHECK(nullptr != src[i].ptr) << "Value ptr cannot be NULL"; - } - values_sink_->Write(reinterpret_cast(src[i].ptr), src[i].len); - } -} - -template <> -inline void PlainEncoder::Put(const FixedLenByteArray* src, int num_values) { - for (int i = 0; i < num_values; ++i) { - // Write the result to the output stream - if (descr_->type_length() > 0) { - DCHECK(nullptr != src[i].ptr) << "Value ptr cannot be NULL"; - } - values_sink_->Write(reinterpret_cast(src[i].ptr), - descr_->type_length()); - } -} - -// ---------------------------------------------------------------------- -// Dictionary encoding and decoding - -template -class DictionaryDecoder : public Decoder { - public: - typedef typename Type::c_type T; - - // Initializes the dictionary with values from 'dictionary'. The data in - // dictionary is not guaranteed to persist in memory after this call so the - // dictionary decoder needs to copy the data out if necessary. - explicit DictionaryDecoder(const ColumnDescriptor* descr, - ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) - : Decoder(descr, Encoding::RLE_DICTIONARY), - dictionary_(0, pool), - byte_array_data_(AllocateBuffer(pool, 0)) {} - - // Perform type-specific initiatialization - void SetDict(Decoder* dictionary); - - void SetData(int num_values, const uint8_t* data, int len) override { - num_values_ = num_values; - if (len == 0) return; - uint8_t bit_width = *data; - ++data; - --len; - idx_decoder_ = ::arrow::util::RleDecoder(data, len, bit_width); - } - - int Decode(T* buffer, int max_values) override { - max_values = std::min(max_values, num_values_); - int decoded_values = - idx_decoder_.GetBatchWithDict(dictionary_.data(), buffer, max_values); - if (decoded_values != max_values) { - ParquetException::EofException(); - } - num_values_ -= max_values; - return max_values; - } - - int DecodeSpaced(T* buffer, int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset) override { - int decoded_values = - idx_decoder_.GetBatchWithDictSpaced(dictionary_.data(), buffer, num_values, - null_count, valid_bits, valid_bits_offset); - if (decoded_values != num_values) { - ParquetException::EofException(); - } - return decoded_values; - } - - private: - using Decoder::num_values_; - - // Only one is set. - Vector dictionary_; - - // Data that contains the byte array data (byte_array_dictionary_ just has the - // pointers). - std::shared_ptr byte_array_data_; - - ::arrow::util::RleDecoder idx_decoder_; -}; - -template -inline void DictionaryDecoder::SetDict(Decoder* dictionary) { - int num_dictionary_values = dictionary->values_left(); - dictionary_.Resize(num_dictionary_values); - dictionary->Decode(dictionary_.data(), num_dictionary_values); -} - -template <> -inline void DictionaryDecoder::SetDict(Decoder* dictionary) { - ParquetException::NYI("Dictionary encoding is not implemented for boolean values"); -} - -template <> -inline void DictionaryDecoder::SetDict( - Decoder* dictionary) { - int num_dictionary_values = dictionary->values_left(); - dictionary_.Resize(num_dictionary_values); - dictionary->Decode(&dictionary_[0], num_dictionary_values); - - int total_size = 0; - for (int i = 0; i < num_dictionary_values; ++i) { - total_size += dictionary_[i].len; - } - if (total_size > 0) { - PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, false)); - } - - int offset = 0; - uint8_t* bytes_data = byte_array_data_->mutable_data(); - for (int i = 0; i < num_dictionary_values; ++i) { - memcpy(bytes_data + offset, dictionary_[i].ptr, dictionary_[i].len); - dictionary_[i].ptr = bytes_data + offset; - offset += dictionary_[i].len; - } -} - -template <> -inline void DictionaryDecoder::SetDict(Decoder* dictionary) { - int num_dictionary_values = dictionary->values_left(); - dictionary_.Resize(num_dictionary_values); - dictionary->Decode(&dictionary_[0], num_dictionary_values); - - int fixed_len = descr_->type_length(); - int total_size = num_dictionary_values * fixed_len; - - PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, false)); - uint8_t* bytes_data = byte_array_data_->mutable_data(); - for (int32_t i = 0, offset = 0; i < num_dictionary_values; ++i, offset += fixed_len) { - memcpy(bytes_data + offset, dictionary_[i].ptr, fixed_len); - dictionary_[i].ptr = bytes_data + offset; - } -} - -// ---------------------------------------------------------------------- -// Dictionary encoder - -template -struct DictEncoderTraits { - using c_type = typename DType::c_type; - using MemoTableType = ::arrow::internal::ScalarMemoTable; -}; - -template <> -struct DictEncoderTraits { - using MemoTableType = ::arrow::internal::BinaryMemoTable; -}; - -template <> -struct DictEncoderTraits { - using MemoTableType = ::arrow::internal::BinaryMemoTable; -}; - -// Initially 1024 elements -static constexpr int32_t INITIAL_HASH_TABLE_SIZE = 1 << 10; - -/// See the dictionary encoding section of https://github.com/Parquet/parquet-format. -/// The encoding supports streaming encoding. Values are encoded as they are added while -/// the dictionary is being constructed. At any time, the buffered values can be -/// written out with the current dictionary size. More values can then be added to -/// the encoder, including new dictionary entries. -template -class DictEncoder : public Encoder { - using MemoTableType = typename DictEncoderTraits::MemoTableType; - - public: - typedef typename DType::c_type T; - - explicit DictEncoder(const ColumnDescriptor* desc, - ::arrow::MemoryPool* allocator = ::arrow::default_memory_pool()) - : Encoder(desc, Encoding::PLAIN_DICTIONARY, allocator), - allocator_(allocator), - dict_encoded_size_(0), - type_length_(desc->type_length()), - memo_table_(INITIAL_HASH_TABLE_SIZE) {} - - ~DictEncoder() override { DCHECK(buffered_indices_.empty()); } - - void set_type_length(int type_length) { type_length_ = type_length; } - - /// Returns a conservative estimate of the number of bytes needed to encode the buffered - /// indices. Used to size the buffer passed to WriteIndices(). - int64_t EstimatedDataEncodedSize() override { - // Note: because of the way RleEncoder::CheckBufferFull() is called, we have to - // reserve - // an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be used - // but not reserving them would cause the encoder to fail. - return 1 + - ::arrow::util::RleEncoder::MaxBufferSize( - bit_width(), static_cast(buffered_indices_.size())) + - ::arrow::util::RleEncoder::MinBufferSize(bit_width()); - } - - /// The minimum bit width required to encode the currently buffered indices. - int bit_width() const { - if (ARROW_PREDICT_FALSE(num_entries() == 0)) return 0; - if (ARROW_PREDICT_FALSE(num_entries() == 1)) return 1; - return BitUtil::Log2(num_entries()); - } - - /// Writes out any buffered indices to buffer preceded by the bit width of this data. - /// Returns the number of bytes written. - /// If the supplied buffer is not big enough, returns -1. - /// buffer must be preallocated with buffer_len bytes. Use EstimatedDataEncodedSize() - /// to size buffer. - int WriteIndices(uint8_t* buffer, int buffer_len); - - int dict_encoded_size() { return dict_encoded_size_; } - - /// Encode value. Note that this does not actually write any data, just - /// buffers the value's index to be written later. - inline void Put(const T& value); - void Put(const T* values, int num_values) override; - - std::shared_ptr FlushValues() override { - std::shared_ptr buffer = - AllocateBuffer(this->allocator_, EstimatedDataEncodedSize()); - int result_size = WriteIndices(buffer->mutable_data(), - static_cast(EstimatedDataEncodedSize())); - PARQUET_THROW_NOT_OK(buffer->Resize(result_size, false)); - return std::move(buffer); - } - - void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, - int64_t valid_bits_offset) override { - ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset, - num_values); - for (int32_t i = 0; i < num_values; i++) { - if (valid_bits_reader.IsSet()) { - Put(src[i]); - } - valid_bits_reader.Next(); - } - } - - /// Writes out the encoded dictionary to buffer. buffer must be preallocated to - /// dict_encoded_size() bytes. - void WriteDict(uint8_t* buffer); - - /// The number of entries in the dictionary. - int num_entries() const { return memo_table_.size(); } - - private: - /// Clears all the indices (but leaves the dictionary). - void ClearIndices() { buffered_indices_.clear(); } - - ::arrow::MemoryPool* allocator_; - - /// Indices that have not yet be written out by WriteIndices(). - std::vector buffered_indices_; - - /// The number of bytes needed to encode the dictionary. - int dict_encoded_size_; - - /// Size of each encoded dictionary value. -1 for variable-length types. - int type_length_; - - MemoTableType memo_table_; -}; - -template -void DictEncoder::Put(const T* src, int num_values) { - for (int32_t i = 0; i < num_values; i++) { - Put(src[i]); - } -} - -template -inline void DictEncoder::Put(const T& v) { - // Put() implementation for primitive types - auto on_found = [](int32_t memo_index) {}; - auto on_not_found = [this](int32_t memo_index) { - dict_encoded_size_ += static_cast(sizeof(T)); - }; - - auto memo_index = memo_table_.GetOrInsert(v, on_found, on_not_found); - buffered_indices_.push_back(memo_index); -} - -template <> -inline void DictEncoder::Put(const ByteArray& v) { - static const uint8_t empty[] = {0}; - - auto on_found = [](int32_t memo_index) {}; - auto on_not_found = [&](int32_t memo_index) { - dict_encoded_size_ += static_cast(v.len + sizeof(uint32_t)); - }; - - DCHECK(v.ptr != nullptr || v.len == 0); - const void* ptr = (v.ptr != nullptr) ? v.ptr : empty; - auto memo_index = - memo_table_.GetOrInsert(ptr, static_cast(v.len), on_found, on_not_found); - buffered_indices_.push_back(memo_index); -} - -template <> -inline void DictEncoder::Put(const FixedLenByteArray& v) { - static const uint8_t empty[] = {0}; - - auto on_found = [](int32_t memo_index) {}; - auto on_not_found = [this](int32_t memo_index) { dict_encoded_size_ += type_length_; }; - - DCHECK(v.ptr != nullptr || type_length_ == 0); - const void* ptr = (v.ptr != nullptr) ? v.ptr : empty; - auto memo_index = memo_table_.GetOrInsert(ptr, type_length_, on_found, on_not_found); - buffered_indices_.push_back(memo_index); -} - -template -inline void DictEncoder::WriteDict(uint8_t* buffer) { - // For primitive types, only a memcpy - DCHECK_EQ(static_cast(dict_encoded_size_), sizeof(T) * memo_table_.size()); - memo_table_.CopyValues(0 /* start_pos */, reinterpret_cast(buffer)); -} - -// ByteArray and FLBA already have the dictionary encoded in their data heaps -template <> -inline void DictEncoder::WriteDict(uint8_t* buffer) { - memo_table_.VisitValues(0, [&](const ::arrow::util::string_view& v) { - uint32_t len = static_cast(v.length()); - memcpy(buffer, &len, sizeof(uint32_t)); - buffer += sizeof(uint32_t); - memcpy(buffer, v.data(), v.length()); - buffer += v.length(); - }); -} - -template <> -inline void DictEncoder::WriteDict(uint8_t* buffer) { - memo_table_.VisitValues(0, [&](const ::arrow::util::string_view& v) { - DCHECK_EQ(v.length(), static_cast(type_length_)); - memcpy(buffer, v.data(), type_length_); - buffer += type_length_; - }); -} - -template -inline int DictEncoder::WriteIndices(uint8_t* buffer, int buffer_len) { - // Write bit width in first byte - *buffer = static_cast(bit_width()); - ++buffer; - --buffer_len; - - ::arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width()); - for (int index : buffered_indices_) { - if (!encoder.Put(index)) return -1; - } - encoder.Flush(); - - ClearIndices(); - return 1 + encoder.len(); -} - -// ---------------------------------------------------------------------- -// DeltaBitPackDecoder - -template -class DeltaBitPackDecoder : public Decoder { - public: - typedef typename DType::c_type T; - - explicit DeltaBitPackDecoder(const ColumnDescriptor* descr, - ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) - : Decoder(descr, Encoding::DELTA_BINARY_PACKED), pool_(pool) { - if (DType::type_num != Type::INT32 && DType::type_num != Type::INT64) { - throw ParquetException("Delta bit pack encoding should only be for integer data."); - } - } - - virtual void SetData(int num_values, const uint8_t* data, int len) { - num_values_ = num_values; - decoder_ = BitUtil::BitReader(data, len); - values_current_block_ = 0; - values_current_mini_block_ = 0; - } - - virtual int Decode(T* buffer, int max_values) { - return GetInternal(buffer, max_values); - } - - private: - using Decoder::num_values_; - - void InitBlock() { - int32_t block_size; - if (!decoder_.GetVlqInt(&block_size)) ParquetException::EofException(); - if (!decoder_.GetVlqInt(&num_mini_blocks_)) ParquetException::EofException(); - if (!decoder_.GetVlqInt(&values_current_block_)) { - ParquetException::EofException(); - } - if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException(); - - delta_bit_widths_ = AllocateBuffer(pool_, num_mini_blocks_); - uint8_t* bit_width_data = delta_bit_widths_->mutable_data(); - - if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException(); - for (int i = 0; i < num_mini_blocks_; ++i) { - if (!decoder_.GetAligned(1, bit_width_data + i)) { - ParquetException::EofException(); - } - } - values_per_mini_block_ = block_size / num_mini_blocks_; - mini_block_idx_ = 0; - delta_bit_width_ = bit_width_data[0]; - values_current_mini_block_ = values_per_mini_block_; - } - - template - int GetInternal(T* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - const uint8_t* bit_width_data = delta_bit_widths_->data(); - for (int i = 0; i < max_values; ++i) { - if (ARROW_PREDICT_FALSE(values_current_mini_block_ == 0)) { - ++mini_block_idx_; - if (mini_block_idx_ < static_cast(delta_bit_widths_->size())) { - delta_bit_width_ = bit_width_data[mini_block_idx_]; - values_current_mini_block_ = values_per_mini_block_; - } else { - InitBlock(); - buffer[i] = last_value_; - continue; - } - } - - // TODO: the key to this algorithm is to decode the entire miniblock at once. - int64_t delta; - if (!decoder_.GetValue(delta_bit_width_, &delta)) ParquetException::EofException(); - delta += min_delta_; - last_value_ += static_cast(delta); - buffer[i] = last_value_; - --values_current_mini_block_; - } - num_values_ -= max_values; - return max_values; - } - - ::arrow::MemoryPool* pool_; - BitUtil::BitReader decoder_; - int32_t values_current_block_; - int32_t num_mini_blocks_; - uint64_t values_per_mini_block_; - uint64_t values_current_mini_block_; - - int32_t min_delta_; - size_t mini_block_idx_; - std::shared_ptr delta_bit_widths_; - int delta_bit_width_; - - int32_t last_value_; -}; - -// ---------------------------------------------------------------------- -// DELTA_LENGTH_BYTE_ARRAY - -class DeltaLengthByteArrayDecoder : public Decoder { - public: - explicit DeltaLengthByteArrayDecoder( - const ColumnDescriptor* descr, - ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) - : Decoder(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY), - len_decoder_(nullptr, pool) {} - - virtual void SetData(int num_values, const uint8_t* data, int len) { - num_values_ = num_values; - if (len == 0) return; - int total_lengths_len = *reinterpret_cast(data); - data += 4; - len_decoder_.SetData(num_values, data, total_lengths_len); - data_ = data + total_lengths_len; - len_ = len - 4 - total_lengths_len; - } - - virtual int Decode(ByteArray* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - std::vector lengths(max_values); - len_decoder_.Decode(lengths.data(), max_values); - for (int i = 0; i < max_values; ++i) { - buffer[i].len = lengths[i]; - buffer[i].ptr = data_; - data_ += lengths[i]; - len_ -= lengths[i]; - } - num_values_ -= max_values; - return max_values; - } - - private: - using Decoder::num_values_; - DeltaBitPackDecoder len_decoder_; - const uint8_t* data_; - int len_; -}; - -// ---------------------------------------------------------------------- -// DELTA_BYTE_ARRAY - -class DeltaByteArrayDecoder : public Decoder { - public: - explicit DeltaByteArrayDecoder( - const ColumnDescriptor* descr, - ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) - : Decoder(descr, Encoding::DELTA_BYTE_ARRAY), - prefix_len_decoder_(nullptr, pool), - suffix_decoder_(nullptr, pool), - last_value_(0, nullptr) {} - - virtual void SetData(int num_values, const uint8_t* data, int len) { - num_values_ = num_values; - if (len == 0) return; - int prefix_len_length = *reinterpret_cast(data); - data += 4; - len -= 4; - prefix_len_decoder_.SetData(num_values, data, prefix_len_length); - data += prefix_len_length; - len -= prefix_len_length; - suffix_decoder_.SetData(num_values, data, len); - } - - // TODO: this doesn't work and requires memory management. We need to allocate - // new strings to store the results. - virtual int Decode(ByteArray* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - for (int i = 0; i < max_values; ++i) { - int prefix_len = 0; - prefix_len_decoder_.Decode(&prefix_len, 1); - ByteArray suffix = {0, nullptr}; - suffix_decoder_.Decode(&suffix, 1); - buffer[i].len = prefix_len + suffix.len; - - uint8_t* result = reinterpret_cast(malloc(buffer[i].len)); - memcpy(result, last_value_.ptr, prefix_len); - memcpy(result + prefix_len, suffix.ptr, suffix.len); - - buffer[i].ptr = result; - last_value_ = buffer[i]; - } - num_values_ -= max_values; - return max_values; - } - - private: - using Decoder::num_values_; - - DeltaBitPackDecoder prefix_len_decoder_; - DeltaLengthByteArrayDecoder suffix_decoder_; - ByteArray last_value_; -}; - -} // namespace parquet - -#endif // PARQUET_ENCODING_INTERNAL_H diff --git a/cpp/src/parquet/encoding-test.cc b/cpp/src/parquet/encoding-test.cc index 90ceb7828b139..28d98126ec84a 100644 --- a/cpp/src/parquet/encoding-test.cc +++ b/cpp/src/parquet/encoding-test.cc @@ -24,7 +24,7 @@ #include "arrow/util/bit-util.h" -#include "parquet/encoding-internal.h" +#include "parquet/encoding.h" #include "parquet/schema.h" #include "parquet/types.h" #include "parquet/util/memory.h" @@ -43,29 +43,31 @@ namespace test { TEST(VectorBooleanTest, TestEncodeDecode) { // PARQUET-454 int nvalues = 10000; - int nbytes = static_cast(BitUtil::BytesForBits(nvalues)); + int nbytes = static_cast(::arrow::BitUtil::BytesForBits(nvalues)); // seed the prng so failure is deterministic vector draws = flip_coins_seed(nvalues, 0.5, 0); - PlainEncoder encoder(nullptr); - PlainDecoder decoder(nullptr); + std::unique_ptr encoder = + MakeTypedEncoder(Encoding::PLAIN); + encoder->Put(draws, nvalues); - encoder.Put(draws, nvalues); + std::unique_ptr decoder = + MakeTypedDecoder(Encoding::PLAIN); - std::shared_ptr encode_buffer = encoder.FlushValues(); + std::shared_ptr encode_buffer = encoder->FlushValues(); ASSERT_EQ(nbytes, encode_buffer->size()); vector decode_buffer(nbytes); const uint8_t* decode_data = &decode_buffer[0]; - decoder.SetData(nvalues, encode_buffer->data(), - static_cast(encode_buffer->size())); - int values_decoded = decoder.Decode(&decode_buffer[0], nvalues); + decoder->SetData(nvalues, encode_buffer->data(), + static_cast(encode_buffer->size())); + int values_decoded = decoder->Decode(&decode_buffer[0], nvalues); ASSERT_EQ(nvalues, values_decoded); for (int i = 0; i < nvalues; ++i) { - ASSERT_EQ(draws[i], BitUtil::GetBit(decode_data, i)) << i; + ASSERT_EQ(draws[i], ::arrow::BitUtil::GetBit(decode_data, i)) << i; } } @@ -214,14 +216,14 @@ class TestPlainEncoding : public TestEncodingBase { static constexpr int TYPE = Type::type_num; virtual void CheckRoundtrip() { - PlainEncoder encoder(descr_.get()); - PlainDecoder decoder(descr_.get()); - encoder.Put(draws_, num_values_); - encode_buffer_ = encoder.FlushValues(); - - decoder.SetData(num_values_, encode_buffer_->data(), - static_cast(encode_buffer_->size())); - int values_decoded = decoder.Decode(decode_buf_, num_values_); + auto encoder = MakeTypedEncoder(Encoding::PLAIN, false, descr_.get()); + auto decoder = MakeTypedDecoder(Encoding::PLAIN, descr_.get()); + encoder->Put(draws_, num_values_); + encode_buffer_ = encoder->FlushValues(); + + decoder->SetData(num_values_, encode_buffer_->data(), + static_cast(encode_buffer_->size())); + int values_decoded = decoder->Decode(decode_buf_, num_values_); ASSERT_EQ(num_values_, values_decoded); ASSERT_NO_FATAL_FAILURE(VerifyResults(decode_buf_, draws_, num_values_)); } @@ -250,29 +252,38 @@ class TestDictionaryEncoding : public TestEncodingBase { static constexpr int TYPE = Type::type_num; void CheckRoundtrip() { - std::vector valid_bits(BitUtil::BytesForBits(num_values_) + 1, 255); - DictEncoder encoder(descr_.get()); + std::vector valid_bits(::arrow::BitUtil::BytesForBits(num_values_) + 1, 255); - ASSERT_NO_THROW(encoder.Put(draws_, num_values_)); - dict_buffer_ = AllocateBuffer(default_memory_pool(), encoder.dict_encoded_size()); - encoder.WriteDict(dict_buffer_->mutable_data()); - std::shared_ptr indices = encoder.FlushValues(); + auto base_encoder = MakeEncoder(Type::type_num, Encoding::PLAIN, true, descr_.get()); + auto encoder = + dynamic_cast::Encoder*>(base_encoder.get()); + auto dict_traits = dynamic_cast*>(base_encoder.get()); + + ASSERT_NO_THROW(encoder->Put(draws_, num_values_)); + dict_buffer_ = + AllocateBuffer(default_memory_pool(), dict_traits->dict_encoded_size()); + dict_traits->WriteDict(dict_buffer_->mutable_data()); + std::shared_ptr indices = encoder->FlushValues(); + + auto base_spaced_encoder = + MakeEncoder(Type::type_num, Encoding::PLAIN, true, descr_.get()); + auto spaced_encoder = + dynamic_cast::Encoder*>(base_spaced_encoder.get()); - DictEncoder spaced_encoder(descr_.get()); // PutSpaced should lead to the same results - ASSERT_NO_THROW(spaced_encoder.PutSpaced(draws_, num_values_, valid_bits.data(), 0)); - std::shared_ptr indices_from_spaced = spaced_encoder.FlushValues(); + ASSERT_NO_THROW(spaced_encoder->PutSpaced(draws_, num_values_, valid_bits.data(), 0)); + std::shared_ptr indices_from_spaced = spaced_encoder->FlushValues(); ASSERT_TRUE(indices_from_spaced->Equals(*indices)); - PlainDecoder dict_decoder(descr_.get()); - dict_decoder.SetData(encoder.num_entries(), dict_buffer_->data(), - static_cast(dict_buffer_->size())); + auto dict_decoder = MakeTypedDecoder(Encoding::PLAIN, descr_.get()); + dict_decoder->SetData(dict_traits->num_entries(), dict_buffer_->data(), + static_cast(dict_buffer_->size())); - DictionaryDecoder decoder(descr_.get()); - decoder.SetDict(&dict_decoder); + auto decoder = MakeDictDecoder(descr_.get()); + decoder->SetDict(dict_decoder.get()); - decoder.SetData(num_values_, indices->data(), static_cast(indices->size())); - int values_decoded = decoder.Decode(decode_buf_, num_values_); + decoder->SetData(num_values_, indices->data(), static_cast(indices->size())); + int values_decoded = decoder->Decode(decode_buf_, num_values_); ASSERT_EQ(num_values_, values_decoded); // TODO(wesm): The DictionaryDecoder must stay alive because the decoded @@ -281,9 +292,9 @@ class TestDictionaryEncoding : public TestEncodingBase { ASSERT_NO_FATAL_FAILURE(VerifyResults(decode_buf_, draws_, num_values_)); // Also test spaced decoding - decoder.SetData(num_values_, indices->data(), static_cast(indices->size())); + decoder->SetData(num_values_, indices->data(), static_cast(indices->size())); values_decoded = - decoder.DecodeSpaced(decode_buf_, num_values_, 0, valid_bits.data(), 0); + decoder->DecodeSpaced(decode_buf_, num_values_, 0, valid_bits.data(), 0); ASSERT_EQ(num_values_, values_decoded); ASSERT_NO_FATAL_FAILURE(VerifyResults(decode_buf_, draws_, num_values_)); } @@ -300,10 +311,7 @@ TYPED_TEST(TestDictionaryEncoding, BasicRoundTrip) { } TEST(TestDictionaryEncoding, CannotDictDecodeBoolean) { - PlainDecoder dict_decoder(nullptr); - DictionaryDecoder decoder(nullptr); - - ASSERT_THROW(decoder.SetDict(&dict_decoder), ParquetException); + ASSERT_THROW(MakeDictDecoder(nullptr), ParquetException); } } // namespace test diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc new file mode 100644 index 0000000000000..3fd3ceca4c5e2 --- /dev/null +++ b/cpp/src/parquet/encoding.cc @@ -0,0 +1,1279 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/encoding.h" + +#include +#include +#include +#include +#include + +#include "arrow/builder.h" +#include "arrow/status.h" +#include "arrow/util/bit-stream-utils.h" +#include "arrow/util/bit-util.h" +#include "arrow/util/hashing.h" +#include "arrow/util/logging.h" +#include "arrow/util/macros.h" +#include "arrow/util/rle-encoding.h" +#include "arrow/util/string_view.h" + +#include "parquet/exception.h" +#include "parquet/schema.h" +#include "parquet/types.h" +#include "parquet/util/memory.h" + +namespace parquet { + +namespace BitUtil = ::arrow::BitUtil; + +class EncoderImpl : virtual public Encoder { + public: + EncoderImpl(const ColumnDescriptor* descr, Encoding::type encoding, + ::arrow::MemoryPool* pool) + : descr_(descr), + encoding_(encoding), + pool_(pool), + type_length_(descr ? descr->type_length() : -1) {} + + Encoding::type encoding() const override { return encoding_; } + + ::arrow::MemoryPool* memory_pool() const override { return pool_; } + + protected: + // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY + const ColumnDescriptor* descr_; + const Encoding::type encoding_; + ::arrow::MemoryPool* pool_; + + /// Type length from descr + int type_length_; +}; + +// ---------------------------------------------------------------------- +// Plain encoder implementation + +template +class PlainEncoder : public EncoderImpl, virtual public TypedEncoder { + public: + using T = typename DType::c_type; + + explicit PlainEncoder(const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); + + int64_t EstimatedDataEncodedSize() override; + std::shared_ptr FlushValues() override; + + void Put(const T* buffer, int num_values) override; + + protected: + std::unique_ptr values_sink_; +}; + +template +PlainEncoder::PlainEncoder(const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool) + : EncoderImpl(descr, Encoding::PLAIN, pool) { + values_sink_.reset(new InMemoryOutputStream(pool)); +} +template +int64_t PlainEncoder::EstimatedDataEncodedSize() { + return values_sink_->Tell(); +} + +template +std::shared_ptr PlainEncoder::FlushValues() { + std::shared_ptr buffer = values_sink_->GetBuffer(); + values_sink_.reset(new InMemoryOutputStream(this->pool_)); + return buffer; +} + +template +void PlainEncoder::Put(const T* buffer, int num_values) { + values_sink_->Write(reinterpret_cast(buffer), num_values * sizeof(T)); +} + +template <> +inline void PlainEncoder::Put(const ByteArray* src, int num_values) { + for (int i = 0; i < num_values; ++i) { + // Write the result to the output stream + values_sink_->Write(reinterpret_cast(&src[i].len), sizeof(uint32_t)); + if (src[i].len > 0) { + DCHECK(nullptr != src[i].ptr) << "Value ptr cannot be NULL"; + } + values_sink_->Write(reinterpret_cast(src[i].ptr), src[i].len); + } +} + +template <> +inline void PlainEncoder::Put(const FixedLenByteArray* src, int num_values) { + for (int i = 0; i < num_values; ++i) { + // Write the result to the output stream + if (descr_->type_length() > 0) { + DCHECK(nullptr != src[i].ptr) << "Value ptr cannot be NULL"; + } + values_sink_->Write(reinterpret_cast(src[i].ptr), + descr_->type_length()); + } +} + +class PlainByteArrayEncoder : public PlainEncoder, + virtual public ByteArrayEncoder { + public: + using BASE = PlainEncoder; + using BASE::PlainEncoder; +}; + +class PlainFLBAEncoder : public PlainEncoder, virtual public FLBAEncoder { + public: + using BASE = PlainEncoder; + using BASE::PlainEncoder; +}; + +class PlainBooleanEncoder : public EncoderImpl, + virtual public TypedEncoder, + virtual public BooleanEncoder { + public: + explicit PlainBooleanEncoder( + const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); + + int64_t EstimatedDataEncodedSize() override; + std::shared_ptr FlushValues() override; + + void Put(const bool* src, int num_values) override; + void Put(const std::vector& src, int num_values) override; + + private: + int bits_available_; + std::unique_ptr<::arrow::BitUtil::BitWriter> bit_writer_; + std::shared_ptr bits_buffer_; + std::unique_ptr values_sink_; + + template + void PutImpl(const SequenceType& src, int num_values); +}; + +template +void PlainBooleanEncoder::PutImpl(const SequenceType& src, int num_values) { + int bit_offset = 0; + if (bits_available_ > 0) { + int bits_to_write = std::min(bits_available_, num_values); + for (int i = 0; i < bits_to_write; i++) { + bit_writer_->PutValue(src[i], 1); + } + bits_available_ -= bits_to_write; + bit_offset = bits_to_write; + + if (bits_available_ == 0) { + bit_writer_->Flush(); + values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); + bit_writer_->Clear(); + } + } + + int bits_remaining = num_values - bit_offset; + while (bit_offset < num_values) { + bits_available_ = static_cast(bits_buffer_->size()) * 8; + + int bits_to_write = std::min(bits_available_, bits_remaining); + for (int i = bit_offset; i < bit_offset + bits_to_write; i++) { + bit_writer_->PutValue(src[i], 1); + } + bit_offset += bits_to_write; + bits_available_ -= bits_to_write; + bits_remaining -= bits_to_write; + + if (bits_available_ == 0) { + bit_writer_->Flush(); + values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); + bit_writer_->Clear(); + } + } +} + +PlainBooleanEncoder::PlainBooleanEncoder(const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool) + : EncoderImpl(descr, Encoding::PLAIN, pool), + bits_available_(kInMemoryDefaultCapacity * 8), + bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)), + values_sink_(new InMemoryOutputStream(pool)) { + bit_writer_.reset(new BitUtil::BitWriter(bits_buffer_->mutable_data(), + static_cast(bits_buffer_->size()))); +} + +int64_t PlainBooleanEncoder::EstimatedDataEncodedSize() { + return values_sink_->Tell() + bit_writer_->bytes_written(); +} + +std::shared_ptr PlainBooleanEncoder::FlushValues() { + if (bits_available_ > 0) { + bit_writer_->Flush(); + values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); + bit_writer_->Clear(); + bits_available_ = static_cast(bits_buffer_->size()) * 8; + } + + std::shared_ptr buffer = values_sink_->GetBuffer(); + values_sink_.reset(new InMemoryOutputStream(this->pool_)); + return buffer; +} + +void PlainBooleanEncoder::Put(const bool* src, int num_values) { + PutImpl(src, num_values); +} + +void PlainBooleanEncoder::Put(const std::vector& src, int num_values) { + PutImpl(src, num_values); +} + +// ---------------------------------------------------------------------- +// DictEncoder implementations + +template +struct DictEncoderTraits { + using c_type = typename DType::c_type; + using MemoTableType = ::arrow::internal::ScalarMemoTable; +}; + +template <> +struct DictEncoderTraits { + using MemoTableType = ::arrow::internal::BinaryMemoTable; +}; + +template <> +struct DictEncoderTraits { + using MemoTableType = ::arrow::internal::BinaryMemoTable; +}; + +/// See the dictionary encoding section of https://github.com/Parquet/parquet-format. +/// The encoding supports streaming encoding. Values are encoded as they are added while +/// the dictionary is being constructed. At any time, the buffered values can be +/// written out with the current dictionary size. More values can then be added to +/// the encoder, including new dictionary entries. +template +class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder { + using MemoTableType = typename DictEncoderTraits::MemoTableType; + + public: + typedef typename DType::c_type T; + + explicit DictEncoderImpl( + const ColumnDescriptor* desc, + ::arrow::MemoryPool* allocator = ::arrow::default_memory_pool()); + + ~DictEncoderImpl() override { DCHECK(buffered_indices_.empty()); } + + int dict_encoded_size() override { return dict_encoded_size_; } + + int WriteIndices(uint8_t* buffer, int buffer_len) override { + // Write bit width in first byte + *buffer = static_cast(bit_width()); + ++buffer; + --buffer_len; + + ::arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width()); + for (int index : buffered_indices_) { + if (!encoder.Put(index)) return -1; + } + encoder.Flush(); + + ClearIndices(); + return 1 + encoder.len(); + } + + void set_type_length(int type_length) { this->type_length_ = type_length; } + + /// Returns a conservative estimate of the number of bytes needed to encode the buffered + /// indices. Used to size the buffer passed to WriteIndices(). + int64_t EstimatedDataEncodedSize() override; + + /// The minimum bit width required to encode the currently buffered indices. + int bit_width() const override; + + /// Encode value. Note that this does not actually write any data, just + /// buffers the value's index to be written later. + inline void Put(const T& value); + void Put(const T* values, int num_values) override; + + std::shared_ptr FlushValues() override; + + void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, + int64_t valid_bits_offset) override; + + /// Writes out the encoded dictionary to buffer. buffer must be preallocated to + /// dict_encoded_size() bytes. + void WriteDict(uint8_t* buffer) override; + + /// The number of entries in the dictionary. + int num_entries() const override { return memo_table_.size(); } + + private: + /// Clears all the indices (but leaves the dictionary). + void ClearIndices() { buffered_indices_.clear(); } + + /// Indices that have not yet be written out by WriteIndices(). + std::vector buffered_indices_; + + /// The number of bytes needed to encode the dictionary. + int dict_encoded_size_; + + MemoTableType memo_table_; +}; + +// Initially 1024 elements +static constexpr int32_t INITIAL_HASH_TABLE_SIZE = 1 << 10; + +template +DictEncoderImpl::DictEncoderImpl(const ColumnDescriptor* desc, + ::arrow::MemoryPool* pool) + : EncoderImpl(desc, Encoding::PLAIN_DICTIONARY, pool), + dict_encoded_size_(0), + memo_table_(INITIAL_HASH_TABLE_SIZE) {} + +template +int64_t DictEncoderImpl::EstimatedDataEncodedSize() { + // Note: because of the way RleEncoder::CheckBufferFull() is called, we have to + // reserve + // an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be used + // but not reserving them would cause the encoder to fail. + return 1 + + ::arrow::util::RleEncoder::MaxBufferSize( + bit_width(), static_cast(buffered_indices_.size())) + + ::arrow::util::RleEncoder::MinBufferSize(bit_width()); +} + +template +int DictEncoderImpl::bit_width() const { + if (ARROW_PREDICT_FALSE(num_entries() == 0)) return 0; + if (ARROW_PREDICT_FALSE(num_entries() == 1)) return 1; + return BitUtil::Log2(num_entries()); +} + +template +std::shared_ptr DictEncoderImpl::FlushValues() { + std::shared_ptr buffer = + AllocateBuffer(this->pool_, EstimatedDataEncodedSize()); + int result_size = + WriteIndices(buffer->mutable_data(), static_cast(EstimatedDataEncodedSize())); + PARQUET_THROW_NOT_OK(buffer->Resize(result_size, false)); + return buffer; +} + +template +void DictEncoderImpl::Put(const T* src, int num_values) { + for (int32_t i = 0; i < num_values; i++) { + Put(src[i]); + } +} + +template +void DictEncoderImpl::PutSpaced(const T* src, int num_values, + const uint8_t* valid_bits, + int64_t valid_bits_offset) { + ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset, + num_values); + for (int32_t i = 0; i < num_values; i++) { + if (valid_bits_reader.IsSet()) { + Put(src[i]); + } + valid_bits_reader.Next(); + } +} + +template +void DictEncoderImpl::WriteDict(uint8_t* buffer) { + // For primitive types, only a memcpy + DCHECK_EQ(static_cast(dict_encoded_size_), sizeof(T) * memo_table_.size()); + memo_table_.CopyValues(0 /* start_pos */, reinterpret_cast(buffer)); +} + +// ByteArray and FLBA already have the dictionary encoded in their data heaps +template <> +void DictEncoderImpl::WriteDict(uint8_t* buffer) { + memo_table_.VisitValues(0, [&](const ::arrow::util::string_view& v) { + uint32_t len = static_cast(v.length()); + memcpy(buffer, &len, sizeof(uint32_t)); + buffer += sizeof(uint32_t); + memcpy(buffer, v.data(), v.length()); + buffer += v.length(); + }); +} + +template <> +void DictEncoderImpl::WriteDict(uint8_t* buffer) { + memo_table_.VisitValues(0, [&](const ::arrow::util::string_view& v) { + DCHECK_EQ(v.length(), static_cast(type_length_)); + memcpy(buffer, v.data(), type_length_); + buffer += type_length_; + }); +} + +template +inline void DictEncoderImpl::Put(const T& v) { + // Put() implementation for primitive types + auto on_found = [](int32_t memo_index) {}; + auto on_not_found = [this](int32_t memo_index) { + dict_encoded_size_ += static_cast(sizeof(T)); + }; + + auto memo_index = memo_table_.GetOrInsert(v, on_found, on_not_found); + buffered_indices_.push_back(memo_index); +} + +template <> +inline void DictEncoderImpl::Put(const ByteArray& v) { + static const uint8_t empty[] = {0}; + + auto on_found = [](int32_t memo_index) {}; + auto on_not_found = [&](int32_t memo_index) { + dict_encoded_size_ += static_cast(v.len + sizeof(uint32_t)); + }; + + DCHECK(v.ptr != nullptr || v.len == 0); + const void* ptr = (v.ptr != nullptr) ? v.ptr : empty; + auto memo_index = + memo_table_.GetOrInsert(ptr, static_cast(v.len), on_found, on_not_found); + buffered_indices_.push_back(memo_index); +} + +template <> +inline void DictEncoderImpl::Put(const FixedLenByteArray& v) { + static const uint8_t empty[] = {0}; + + auto on_found = [](int32_t memo_index) {}; + auto on_not_found = [this](int32_t memo_index) { dict_encoded_size_ += type_length_; }; + + DCHECK(v.ptr != nullptr || type_length_ == 0); + const void* ptr = (v.ptr != nullptr) ? v.ptr : empty; + auto memo_index = memo_table_.GetOrInsert(ptr, type_length_, on_found, on_not_found); + buffered_indices_.push_back(memo_index); +} + +class DictByteArrayEncoder : public DictEncoderImpl, + virtual public ByteArrayEncoder { + public: + using BASE = DictEncoderImpl; + using BASE::DictEncoderImpl; +}; + +class DictFLBAEncoder : public DictEncoderImpl, virtual public FLBAEncoder { + public: + using BASE = DictEncoderImpl; + using BASE::DictEncoderImpl; +}; + +// ---------------------------------------------------------------------- +// Encoder and decoder factory functions + +std::unique_ptr MakeEncoder(Type::type type_num, Encoding::type encoding, + bool use_dictionary, const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool) { + if (use_dictionary) { + switch (type_num) { + case Type::INT32: + return std::unique_ptr(new DictEncoderImpl(descr, pool)); + case Type::INT64: + return std::unique_ptr(new DictEncoderImpl(descr, pool)); + case Type::INT96: + return std::unique_ptr(new DictEncoderImpl(descr, pool)); + case Type::FLOAT: + return std::unique_ptr(new DictEncoderImpl(descr, pool)); + case Type::DOUBLE: + return std::unique_ptr(new DictEncoderImpl(descr, pool)); + case Type::BYTE_ARRAY: + return std::unique_ptr(new DictByteArrayEncoder(descr, pool)); + case Type::FIXED_LEN_BYTE_ARRAY: + return std::unique_ptr(new DictFLBAEncoder(descr, pool)); + default: + DCHECK(false) << "Encoder not implemented"; + break; + } + } else if (encoding == Encoding::PLAIN) { + switch (type_num) { + case Type::BOOLEAN: + return std::unique_ptr(new PlainBooleanEncoder(descr, pool)); + case Type::INT32: + return std::unique_ptr(new PlainEncoder(descr, pool)); + case Type::INT64: + return std::unique_ptr(new PlainEncoder(descr, pool)); + case Type::INT96: + return std::unique_ptr(new PlainEncoder(descr, pool)); + case Type::FLOAT: + return std::unique_ptr(new PlainEncoder(descr, pool)); + case Type::DOUBLE: + return std::unique_ptr(new PlainEncoder(descr, pool)); + case Type::BYTE_ARRAY: + return std::unique_ptr(new PlainByteArrayEncoder(descr, pool)); + case Type::FIXED_LEN_BYTE_ARRAY: + return std::unique_ptr(new PlainFLBAEncoder(descr, pool)); + default: + DCHECK(false) << "Encoder not implemented"; + break; + } + } else { + ParquetException::NYI("Selected encoding is not supported"); + } + DCHECK(false) << "Should not be able to reach this code"; + return nullptr; +} + +class DecoderImpl : virtual public Decoder { + public: + void SetData(int num_values, const uint8_t* data, int len) override { + num_values_ = num_values; + data_ = data; + len_ = len; + } + + int values_left() const override { return num_values_; } + Encoding::type encoding() const override { return encoding_; } + + protected: + explicit DecoderImpl(const ColumnDescriptor* descr, Encoding::type encoding) + : descr_(descr), encoding_(encoding), num_values_(0), data_(NULLPTR), len_(0) {} + + // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY + const ColumnDescriptor* descr_; + + const Encoding::type encoding_; + int num_values_; + const uint8_t* data_; + int len_; + int type_length_; +}; + +template +class PlainDecoder : public DecoderImpl, virtual public TypedDecoder { + public: + using T = typename DType::c_type; + explicit PlainDecoder(const ColumnDescriptor* descr); + + int Decode(T* buffer, int max_values) override; +}; + +template +PlainDecoder::PlainDecoder(const ColumnDescriptor* descr) + : DecoderImpl(descr, Encoding::PLAIN) { + if (descr_ && descr_->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) { + type_length_ = descr_->type_length(); + } else { + type_length_ = -1; + } +} + +// Decode routine templated on C++ type rather than type enum +template +inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values, + int type_length, T* out) { + int bytes_to_decode = num_values * static_cast(sizeof(T)); + if (data_size < bytes_to_decode) { + ParquetException::EofException(); + } + // If bytes_to_decode == 0, data could be null + if (bytes_to_decode > 0) { + memcpy(out, data, bytes_to_decode); + } + return bytes_to_decode; +} + +// Template specialization for BYTE_ARRAY. The written values do not own their +// own data. +template <> +inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values, + int type_length, ByteArray* out) { + int bytes_decoded = 0; + int increment; + for (int i = 0; i < num_values; ++i) { + uint32_t len = out[i].len = *reinterpret_cast(data); + increment = static_cast(sizeof(uint32_t) + len); + if (data_size < increment) ParquetException::EofException(); + out[i].ptr = data + sizeof(uint32_t); + data += increment; + data_size -= increment; + bytes_decoded += increment; + } + return bytes_decoded; +} + +// Template specialization for FIXED_LEN_BYTE_ARRAY. The written values do not +// own their own data. +template <> +inline int DecodePlain(const uint8_t* data, int64_t data_size, + int num_values, int type_length, + FixedLenByteArray* out) { + int bytes_to_decode = type_length * num_values; + if (data_size < bytes_to_decode) { + ParquetException::EofException(); + } + for (int i = 0; i < num_values; ++i) { + out[i].ptr = data; + data += type_length; + data_size -= type_length; + } + return bytes_to_decode; +} + +template +int PlainDecoder::Decode(T* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + int bytes_consumed = DecodePlain(data_, len_, max_values, type_length_, buffer); + data_ += bytes_consumed; + len_ -= bytes_consumed; + num_values_ -= max_values; + return max_values; +} + +class PlainBooleanDecoder : public DecoderImpl, + virtual public TypedDecoder, + virtual public BooleanDecoder { + public: + explicit PlainBooleanDecoder(const ColumnDescriptor* descr); + void SetData(int num_values, const uint8_t* data, int len) override; + + // Two flavors of bool decoding + int Decode(uint8_t* buffer, int max_values) override; + int Decode(bool* buffer, int max_values) override; + + private: + std::unique_ptr<::arrow::BitUtil::BitReader> bit_reader_; +}; + +PlainBooleanDecoder::PlainBooleanDecoder(const ColumnDescriptor* descr) + : DecoderImpl(descr, Encoding::PLAIN) {} + +void PlainBooleanDecoder::SetData(int num_values, const uint8_t* data, int len) { + num_values_ = num_values; + bit_reader_.reset(new BitUtil::BitReader(data, len)); +} + +int PlainBooleanDecoder::Decode(uint8_t* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + bool val; + ::arrow::internal::BitmapWriter bit_writer(buffer, 0, max_values); + for (int i = 0; i < max_values; ++i) { + if (!bit_reader_->GetValue(1, &val)) { + ParquetException::EofException(); + } + if (val) { + bit_writer.Set(); + } + bit_writer.Next(); + } + bit_writer.Finish(); + num_values_ -= max_values; + return max_values; +} + +int PlainBooleanDecoder::Decode(bool* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + if (bit_reader_->GetBatch(1, buffer, max_values) != max_values) { + ParquetException::EofException(); + } + num_values_ -= max_values; + return max_values; +} + +class PlainByteArrayDecoder : public PlainDecoder, + virtual public ByteArrayDecoder { + public: + using Base = PlainDecoder; + using Base::DecodeSpaced; + using Base::PlainDecoder; + + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + ::arrow::internal::ChunkedBinaryBuilder* out) override { + int result = 0; + PARQUET_THROW_NOT_OK( + DecodeArrow(num_values, null_count, valid_bits, valid_bits_offset, out, &result)); + return result; + } + + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + ::arrow::BinaryDictionaryBuilder* out) override { + int result = 0; + PARQUET_THROW_NOT_OK( + DecodeArrow(num_values, null_count, valid_bits, valid_bits_offset, out, &result)); + return result; + } + + int DecodeArrowNonNull(int num_values, + ::arrow::internal::ChunkedBinaryBuilder* out) override { + int result = 0; + PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, out, &result)); + return result; + } + + private: + template + ::arrow::Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, BuilderType* out, + int* values_decoded) { + num_values = std::min(num_values, num_values_); + + ARROW_RETURN_NOT_OK(out->Reserve(num_values)); + + ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values); + int increment; + int i = 0; + const uint8_t* data = data_; + int64_t data_size = len_; + int bytes_decoded = 0; + while (i < num_values) { + if (bit_reader.IsSet()) { + uint32_t len = *reinterpret_cast(data); + increment = static_cast(sizeof(uint32_t) + len); + if (data_size < increment) { + ParquetException::EofException(); + } + ARROW_RETURN_NOT_OK(out->Append(data + sizeof(uint32_t), len)); + data += increment; + data_size -= increment; + bytes_decoded += increment; + ++i; + } else { + ARROW_RETURN_NOT_OK(out->AppendNull()); + } + bit_reader.Next(); + } + + data_ += bytes_decoded; + len_ -= bytes_decoded; + num_values_ -= num_values; + *values_decoded = num_values; + return ::arrow::Status::OK(); + } + + ::arrow::Status DecodeArrowNonNull(int num_values, + ::arrow::internal::ChunkedBinaryBuilder* out, + int* values_decoded) { + num_values = std::min(num_values, num_values_); + ARROW_RETURN_NOT_OK(out->Reserve(num_values)); + int i = 0; + const uint8_t* data = data_; + int64_t data_size = len_; + int bytes_decoded = 0; + while (i < num_values) { + uint32_t len = *reinterpret_cast(data); + int increment = static_cast(sizeof(uint32_t) + len); + if (data_size < increment) ParquetException::EofException(); + ARROW_RETURN_NOT_OK(out->Append(data + sizeof(uint32_t), len)); + data += increment; + data_size -= increment; + bytes_decoded += increment; + } + + data_ += bytes_decoded; + len_ -= bytes_decoded; + num_values_ -= num_values; + *values_decoded = num_values; + return ::arrow::Status::OK(); + } +}; + +class PlainFLBADecoder : public PlainDecoder, virtual public FLBADecoder { + public: + using Base = PlainDecoder; + using Base::PlainDecoder; +}; + +// ---------------------------------------------------------------------- +// Dictionary encoding and decoding + +template +class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder { + public: + typedef typename Type::c_type T; + + // Initializes the dictionary with values from 'dictionary'. The data in + // dictionary is not guaranteed to persist in memory after this call so the + // dictionary decoder needs to copy the data out if necessary. + explicit DictDecoderImpl(const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) + : DecoderImpl(descr, Encoding::RLE_DICTIONARY), + dictionary_(0, pool), + byte_array_data_(AllocateBuffer(pool, 0)) {} + + // Perform type-specific initiatialization + void SetDict(TypedDecoder* dictionary) override; + + void SetData(int num_values, const uint8_t* data, int len) override { + num_values_ = num_values; + if (len == 0) return; + uint8_t bit_width = *data; + ++data; + --len; + idx_decoder_ = ::arrow::util::RleDecoder(data, len, bit_width); + } + + int Decode(T* buffer, int max_values) override { + max_values = std::min(max_values, num_values_); + int decoded_values = + idx_decoder_.GetBatchWithDict(dictionary_.data(), buffer, max_values); + if (decoded_values != max_values) { + ParquetException::EofException(); + } + num_values_ -= max_values; + return max_values; + } + + int DecodeSpaced(T* buffer, int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset) override { + int decoded_values = + idx_decoder_.GetBatchWithDictSpaced(dictionary_.data(), buffer, num_values, + null_count, valid_bits, valid_bits_offset); + if (decoded_values != num_values) { + ParquetException::EofException(); + } + return decoded_values; + } + + protected: + // Only one is set. + Vector dictionary_; + + // Data that contains the byte array data (byte_array_dictionary_ just has the + // pointers). + std::shared_ptr byte_array_data_; + + ::arrow::util::RleDecoder idx_decoder_; +}; + +template +inline void DictDecoderImpl::SetDict(TypedDecoder* dictionary) { + int num_dictionary_values = dictionary->values_left(); + dictionary_.Resize(num_dictionary_values); + dictionary->Decode(dictionary_.data(), num_dictionary_values); +} + +template <> +inline void DictDecoderImpl::SetDict(TypedDecoder* dictionary) { + ParquetException::NYI("Dictionary encoding is not implemented for boolean values"); +} + +template <> +inline void DictDecoderImpl::SetDict( + TypedDecoder* dictionary) { + int num_dictionary_values = dictionary->values_left(); + dictionary_.Resize(num_dictionary_values); + dictionary->Decode(&dictionary_[0], num_dictionary_values); + + int total_size = 0; + for (int i = 0; i < num_dictionary_values; ++i) { + total_size += dictionary_[i].len; + } + if (total_size > 0) { + PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, false)); + } + + int offset = 0; + uint8_t* bytes_data = byte_array_data_->mutable_data(); + for (int i = 0; i < num_dictionary_values; ++i) { + memcpy(bytes_data + offset, dictionary_[i].ptr, dictionary_[i].len); + dictionary_[i].ptr = bytes_data + offset; + offset += dictionary_[i].len; + } +} + +template <> +inline void DictDecoderImpl::SetDict(TypedDecoder* dictionary) { + int num_dictionary_values = dictionary->values_left(); + dictionary_.Resize(num_dictionary_values); + dictionary->Decode(&dictionary_[0], num_dictionary_values); + + int fixed_len = descr_->type_length(); + int total_size = num_dictionary_values * fixed_len; + + PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, false)); + uint8_t* bytes_data = byte_array_data_->mutable_data(); + for (int32_t i = 0, offset = 0; i < num_dictionary_values; ++i, offset += fixed_len) { + memcpy(bytes_data + offset, dictionary_[i].ptr, fixed_len); + dictionary_[i].ptr = bytes_data + offset; + } +} + +class DictByteArrayDecoder : public DictDecoderImpl, + virtual public ByteArrayDecoder { + public: + using BASE = DictDecoderImpl; + using BASE::DictDecoderImpl; + + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + ::arrow::internal::ChunkedBinaryBuilder* out) override { + int result = 0; + PARQUET_THROW_NOT_OK( + DecodeArrow(num_values, null_count, valid_bits, valid_bits_offset, out, &result)); + return result; + } + + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + ::arrow::BinaryDictionaryBuilder* out) override { + int result = 0; + PARQUET_THROW_NOT_OK( + DecodeArrow(num_values, null_count, valid_bits, valid_bits_offset, out, &result)); + return result; + } + + int DecodeArrowNonNull(int num_values, + ::arrow::internal::ChunkedBinaryBuilder* out) override { + int result = 0; + PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, out, &result)); + return result; + } + + private: + template + ::arrow::Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, BuilderType* builder, + int* out_num_values) { + constexpr int32_t buffer_size = 1024; + int32_t indices_buffer[buffer_size]; + + ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values); + + int values_decoded = 0; + while (values_decoded < num_values) { + bool is_valid = bit_reader.IsSet(); + bit_reader.Next(); + + if (is_valid) { + int32_t batch_size = + std::min(buffer_size, num_values - values_decoded - null_count); + int num_indices = idx_decoder_.GetBatch(indices_buffer, batch_size); + + int i = 0; + while (true) { + // Consume all indices + if (is_valid) { + const auto& val = dictionary_[indices_buffer[i]]; + ARROW_RETURN_NOT_OK(builder->Append(val.ptr, val.len)); + ++i; + } else { + ARROW_RETURN_NOT_OK(builder->AppendNull()); + --null_count; + } + ++values_decoded; + if (i == num_indices) { + // Do not advance the bit_reader if we have fulfilled the decode + // request + break; + } + is_valid = bit_reader.IsSet(); + bit_reader.Next(); + } + } else { + ARROW_RETURN_NOT_OK(builder->AppendNull()); + --null_count; + ++values_decoded; + } + } + if (values_decoded != num_values) { + return ::arrow::Status::IOError("Expected to dictionary-decode ", num_values, + " but only able to decode ", values_decoded); + } + *out_num_values = values_decoded; + return ::arrow::Status::OK(); + } + + template + ::arrow::Status DecodeArrowNonNull(int num_values, BuilderType* builder, + int* out_num_values) { + constexpr int32_t buffer_size = 2048; + int32_t indices_buffer[buffer_size]; + int values_decoded = 0; + while (values_decoded < num_values) { + int num_indices = idx_decoder_.GetBatch(indices_buffer, buffer_size); + if (num_indices == 0) break; + for (int i = 0; i < num_indices; ++i) { + const auto& val = dictionary_[indices_buffer[i]]; + PARQUET_THROW_NOT_OK(builder->Append(val.ptr, val.len)); + } + values_decoded += num_indices; + } + if (values_decoded != num_values) { + ParquetException::EofException(); + } + *out_num_values = values_decoded; + return ::arrow::Status::OK(); + } +}; + +class DictFLBADecoder : public DictDecoderImpl, virtual public FLBADecoder { + public: + using BASE = DictDecoderImpl; + using BASE::DictDecoderImpl; +}; + +// ---------------------------------------------------------------------- +// DeltaBitPackDecoder + +template +class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder { + public: + typedef typename DType::c_type T; + + explicit DeltaBitPackDecoder(const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) + : DecoderImpl(descr, Encoding::DELTA_BINARY_PACKED), pool_(pool) { + if (DType::type_num != Type::INT32 && DType::type_num != Type::INT64) { + throw ParquetException("Delta bit pack encoding should only be for integer data."); + } + } + + virtual void SetData(int num_values, const uint8_t* data, int len) { + this->num_values_ = num_values; + decoder_ = ::arrow::BitUtil::BitReader(data, len); + values_current_block_ = 0; + values_current_mini_block_ = 0; + } + + virtual int Decode(T* buffer, int max_values) { + return GetInternal(buffer, max_values); + } + + private: + void InitBlock() { + int32_t block_size; + if (!decoder_.GetVlqInt(&block_size)) ParquetException::EofException(); + if (!decoder_.GetVlqInt(&num_mini_blocks_)) ParquetException::EofException(); + if (!decoder_.GetVlqInt(&values_current_block_)) { + ParquetException::EofException(); + } + if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException(); + + delta_bit_widths_ = AllocateBuffer(pool_, num_mini_blocks_); + uint8_t* bit_width_data = delta_bit_widths_->mutable_data(); + + if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException(); + for (int i = 0; i < num_mini_blocks_; ++i) { + if (!decoder_.GetAligned(1, bit_width_data + i)) { + ParquetException::EofException(); + } + } + values_per_mini_block_ = block_size / num_mini_blocks_; + mini_block_idx_ = 0; + delta_bit_width_ = bit_width_data[0]; + values_current_mini_block_ = values_per_mini_block_; + } + + template + int GetInternal(T* buffer, int max_values) { + max_values = std::min(max_values, this->num_values_); + const uint8_t* bit_width_data = delta_bit_widths_->data(); + for (int i = 0; i < max_values; ++i) { + if (ARROW_PREDICT_FALSE(values_current_mini_block_ == 0)) { + ++mini_block_idx_; + if (mini_block_idx_ < static_cast(delta_bit_widths_->size())) { + delta_bit_width_ = bit_width_data[mini_block_idx_]; + values_current_mini_block_ = values_per_mini_block_; + } else { + InitBlock(); + buffer[i] = last_value_; + continue; + } + } + + // TODO: the key to this algorithm is to decode the entire miniblock at once. + int64_t delta; + if (!decoder_.GetValue(delta_bit_width_, &delta)) ParquetException::EofException(); + delta += min_delta_; + last_value_ += static_cast(delta); + buffer[i] = last_value_; + --values_current_mini_block_; + } + this->num_values_ -= max_values; + return max_values; + } + + ::arrow::MemoryPool* pool_; + ::arrow::BitUtil::BitReader decoder_; + int32_t values_current_block_; + int32_t num_mini_blocks_; + uint64_t values_per_mini_block_; + uint64_t values_current_mini_block_; + + int32_t min_delta_; + size_t mini_block_idx_; + std::shared_ptr delta_bit_widths_; + int delta_bit_width_; + + int32_t last_value_; +}; + +// ---------------------------------------------------------------------- +// DELTA_LENGTH_BYTE_ARRAY + +class DeltaLengthByteArrayDecoder : public DecoderImpl, + virtual public TypedDecoder { + public: + explicit DeltaLengthByteArrayDecoder( + const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) + : DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY), + len_decoder_(nullptr, pool) {} + + virtual void SetData(int num_values, const uint8_t* data, int len) { + num_values_ = num_values; + if (len == 0) return; + int total_lengths_len = *reinterpret_cast(data); + data += 4; + this->len_decoder_.SetData(num_values, data, total_lengths_len); + data_ = data + total_lengths_len; + this->len_ = len - 4 - total_lengths_len; + } + + virtual int Decode(ByteArray* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + std::vector lengths(max_values); + len_decoder_.Decode(lengths.data(), max_values); + for (int i = 0; i < max_values; ++i) { + buffer[i].len = lengths[i]; + buffer[i].ptr = data_; + this->data_ += lengths[i]; + this->len_ -= lengths[i]; + } + this->num_values_ -= max_values; + return max_values; + } + + private: + DeltaBitPackDecoder len_decoder_; +}; + +// ---------------------------------------------------------------------- +// DELTA_BYTE_ARRAY + +class DeltaByteArrayDecoder : public DecoderImpl, + virtual public TypedDecoder { + public: + explicit DeltaByteArrayDecoder( + const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) + : DecoderImpl(descr, Encoding::DELTA_BYTE_ARRAY), + prefix_len_decoder_(nullptr, pool), + suffix_decoder_(nullptr, pool), + last_value_(0, nullptr) {} + + virtual void SetData(int num_values, const uint8_t* data, int len) { + num_values_ = num_values; + if (len == 0) return; + int prefix_len_length = *reinterpret_cast(data); + data += 4; + len -= 4; + prefix_len_decoder_.SetData(num_values, data, prefix_len_length); + data += prefix_len_length; + len -= prefix_len_length; + suffix_decoder_.SetData(num_values, data, len); + } + + // TODO: this doesn't work and requires memory management. We need to allocate + // new strings to store the results. + virtual int Decode(ByteArray* buffer, int max_values) { + max_values = std::min(max_values, this->num_values_); + for (int i = 0; i < max_values; ++i) { + int prefix_len = 0; + prefix_len_decoder_.Decode(&prefix_len, 1); + ByteArray suffix = {0, nullptr}; + suffix_decoder_.Decode(&suffix, 1); + buffer[i].len = prefix_len + suffix.len; + + uint8_t* result = reinterpret_cast(malloc(buffer[i].len)); + memcpy(result, last_value_.ptr, prefix_len); + memcpy(result + prefix_len, suffix.ptr, suffix.len); + + buffer[i].ptr = result; + last_value_ = buffer[i]; + } + this->num_values_ -= max_values; + return max_values; + } + + private: + DeltaBitPackDecoder prefix_len_decoder_; + DeltaLengthByteArrayDecoder suffix_decoder_; + ByteArray last_value_; +}; + +// ---------------------------------------------------------------------- + +std::unique_ptr MakeDecoder(Type::type type_num, Encoding::type encoding, + const ColumnDescriptor* descr) { + if (encoding == Encoding::PLAIN) { + switch (type_num) { + case Type::BOOLEAN: + return std::unique_ptr(new PlainBooleanDecoder(descr)); + case Type::INT32: + return std::unique_ptr(new PlainDecoder(descr)); + case Type::INT64: + return std::unique_ptr(new PlainDecoder(descr)); + case Type::INT96: + return std::unique_ptr(new PlainDecoder(descr)); + case Type::FLOAT: + return std::unique_ptr(new PlainDecoder(descr)); + case Type::DOUBLE: + return std::unique_ptr(new PlainDecoder(descr)); + case Type::BYTE_ARRAY: + return std::unique_ptr(new PlainByteArrayDecoder(descr)); + case Type::FIXED_LEN_BYTE_ARRAY: + return std::unique_ptr(new PlainFLBADecoder(descr)); + default: + break; + } + } else { + ParquetException::NYI("Selected encoding is not supported"); + } + DCHECK(false) << "Should not be able to reach this code"; + return nullptr; +} + +namespace detail { + +std::unique_ptr MakeDictDecoder(Type::type type_num, + const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool) { + switch (type_num) { + case Type::BOOLEAN: + ParquetException::NYI("Dictionary encoding not implemented for boolean type"); + case Type::INT32: + return std::unique_ptr(new DictDecoderImpl(descr, pool)); + case Type::INT64: + return std::unique_ptr(new DictDecoderImpl(descr, pool)); + case Type::INT96: + return std::unique_ptr(new DictDecoderImpl(descr, pool)); + case Type::FLOAT: + return std::unique_ptr(new DictDecoderImpl(descr, pool)); + case Type::DOUBLE: + return std::unique_ptr(new DictDecoderImpl(descr, pool)); + case Type::BYTE_ARRAY: + return std::unique_ptr(new DictByteArrayDecoder(descr, pool)); + case Type::FIXED_LEN_BYTE_ARRAY: + return std::unique_ptr(new DictFLBADecoder(descr, pool)); + default: + break; + } + DCHECK(false) << "Should not be able to reach this code"; + return nullptr; +} + +} // namespace detail + +} // namespace parquet diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h index ceda7369dd2f1..5eb5016544eda 100644 --- a/cpp/src/parquet/encoding.h +++ b/cpp/src/parquet/encoding.h @@ -15,50 +15,66 @@ // specific language governing permissions and limitations // under the License. -#ifndef PARQUET_ENCODING_H -#define PARQUET_ENCODING_H +#pragma once #include +#include #include -#include +#include -#include "arrow/status.h" +#include "arrow/buffer.h" +#include "arrow/memory_pool.h" #include "arrow/util/bit-util.h" +#include "arrow/util/macros.h" #include "parquet/exception.h" -#include "parquet/schema.h" #include "parquet/types.h" #include "parquet/util/memory.h" +#include "parquet/util/visibility.h" + +namespace arrow { + +class BinaryDictionaryBuilder; + +namespace internal { + +class ChunkedBinaryBuilder; + +} // namespace internal +} // namespace arrow namespace parquet { class ColumnDescriptor; +// Untyped base for all encoders +class Encoder { + public: + virtual ~Encoder() = default; + + virtual int64_t EstimatedDataEncodedSize() = 0; + virtual std::shared_ptr FlushValues() = 0; + virtual Encoding::type encoding() const = 0; + + virtual ::arrow::MemoryPool* memory_pool() const = 0; +}; + // Base class for value encoders. Since encoders may or not have state (e.g., // dictionary encoding) we use a class instance to maintain any state. // // TODO(wesm): Encode interface API is temporary template -class Encoder { +class TypedEncoder : virtual public Encoder { public: typedef typename DType::c_type T; - virtual ~Encoder() {} - - virtual int64_t EstimatedDataEncodedSize() = 0; - virtual std::shared_ptr FlushValues() = 0; virtual void Put(const T* src, int num_values) = 0; + virtual void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, int64_t valid_bits_offset) { std::shared_ptr buffer; - auto status = - ::arrow::AllocateResizableBuffer(pool_, num_values * sizeof(T), &buffer); - if (!status.ok()) { - std::ostringstream ss; - ss << "AllocateResizableBuffer failed in Encoder.PutSpaced in " << __FILE__ - << ", on line " << __LINE__; - throw ParquetException(ss.str()); - } + PARQUET_THROW_NOT_OK(::arrow::AllocateResizableBuffer( + this->memory_pool(), num_values * sizeof(T), &buffer)); int32_t num_valid_values = 0; ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset, num_values); @@ -71,32 +87,53 @@ class Encoder { } Put(data, num_valid_values); } +}; + +// Base class for dictionary encoders +template +class DictEncoder : virtual public TypedEncoder { + public: + /// Writes out any buffered indices to buffer preceded by the bit width of this data. + /// Returns the number of bytes written. + /// If the supplied buffer is not big enough, returns -1. + /// buffer must be preallocated with buffer_len bytes. Use EstimatedDataEncodedSize() + /// to size buffer. + virtual int WriteIndices(uint8_t* buffer, int buffer_len) = 0; + + virtual int dict_encoded_size() = 0; + // virtual int dict_encoded_size() { return dict_encoded_size_; } - Encoding::type encoding() const { return encoding_; } + virtual int bit_width() const = 0; - protected: - explicit Encoder(const ColumnDescriptor* descr, Encoding::type encoding, - ::arrow::MemoryPool* pool) - : descr_(descr), encoding_(encoding), pool_(pool) {} + /// Writes out the encoded dictionary to buffer. buffer must be preallocated to + /// dict_encoded_size() bytes. + virtual void WriteDict(uint8_t* buffer) = 0; - // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY - const ColumnDescriptor* descr_; - const Encoding::type encoding_; - ::arrow::MemoryPool* pool_; + virtual int num_entries() const = 0; }; -// The Decoder template is parameterized on parquet::DataType subclasses -template +// ---------------------------------------------------------------------- +// Value decoding + class Decoder { public: - typedef typename DType::c_type T; - - virtual ~Decoder() {} + virtual ~Decoder() = default; // Sets the data for a new page. This will be called multiple times on the same // decoder and should reset all internal state. virtual void SetData(int num_values, const uint8_t* data, int len) = 0; + // Returns the number of values left (for the last call to SetData()). This is + // the number of values left in this page. + virtual int values_left() const = 0; + virtual Encoding::type encoding() const = 0; +}; + +template +class TypedDecoder : virtual public Decoder { + public: + using T = typename DType::c_type; + // Subclasses should override the ones they support. In each of these functions, // the decoder would decode put to 'max_values', storing the result in 'buffer'. // The function returns the number of values decoded, which should be max_values @@ -130,24 +167,170 @@ class Decoder { } return num_values; } +}; - // Returns the number of values left (for the last call to SetData()). This is - // the number of values left in this page. - int values_left() const { return num_values_; } +template +class DictDecoder : virtual public TypedDecoder { + public: + virtual void SetDict(TypedDecoder* dictionary) = 0; +}; + +// ---------------------------------------------------------------------- +// TypedEncoder specializations, traits, and factory functions - Encoding::type encoding() const { return encoding_; } +class BooleanEncoder : virtual public TypedEncoder { + public: + using TypedEncoder::Put; + virtual void Put(const std::vector& src, int num_values) = 0; +}; + +using Int32Encoder = TypedEncoder; +using Int64Encoder = TypedEncoder; +using Int96Encoder = TypedEncoder; +using FloatEncoder = TypedEncoder; +using DoubleEncoder = TypedEncoder; +class ByteArrayEncoder : virtual public TypedEncoder {}; +class FLBAEncoder : virtual public TypedEncoder {}; - protected: - explicit Decoder(const ColumnDescriptor* descr, Encoding::type encoding) - : descr_(descr), encoding_(encoding), num_values_(0) {} +class BooleanDecoder : virtual public TypedDecoder { + public: + using TypedDecoder::Decode; + virtual int Decode(uint8_t* buffer, int max_values) = 0; +}; - // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY - const ColumnDescriptor* descr_; +using Int32Decoder = TypedDecoder; +using Int64Decoder = TypedDecoder; +using Int96Decoder = TypedDecoder; +using FloatDecoder = TypedDecoder; +using DoubleDecoder = TypedDecoder; - const Encoding::type encoding_; - int num_values_; +class ByteArrayDecoder : virtual public TypedDecoder { + public: + using TypedDecoder::DecodeSpaced; + virtual int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + ::arrow::internal::ChunkedBinaryBuilder* builder) = 0; + + virtual int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + ::arrow::BinaryDictionaryBuilder* builder) = 0; + + virtual int DecodeArrowNonNull(int num_values, + ::arrow::internal::ChunkedBinaryBuilder* builder) = 0; + + // virtual int DecodeArrowDict(int num_values, int null_count, + // const uint8_t* valid_bits, int64_t valid_bits_offset, + // ::arrow::BinaryDictionaryBuilder* builder) = 0; }; -} // namespace parquet +class FLBADecoder : virtual public TypedDecoder { + public: + using TypedDecoder::DecodeSpaced; + // virtual int DecodeArrow(int num_values, int null_count, + // const uint8_t* valid_bits, int64_t valid_bits_offset, + // ::arrow::FixedSizeBinaryBuilder* builder) = 0; + + // virtual int DecodeArrowDict(int num_values, int null_count, + // const uint8_t* valid_bits, int64_t valid_bits_offset, + // ::arrow::BinaryDictionaryBuilder* builder) = 0; +}; + +template +struct EncodingTraits {}; + +template <> +struct EncodingTraits { + using Encoder = BooleanEncoder; + using Decoder = BooleanDecoder; +}; + +template <> +struct EncodingTraits { + using Encoder = Int32Encoder; + using Decoder = Int32Decoder; +}; + +template <> +struct EncodingTraits { + using Encoder = Int64Encoder; + using Decoder = Int64Decoder; +}; + +template <> +struct EncodingTraits { + using Encoder = Int96Encoder; + using Decoder = Int96Decoder; +}; + +template <> +struct EncodingTraits { + using Encoder = FloatEncoder; + using Decoder = FloatDecoder; +}; -#endif // PARQUET_ENCODING_H +template <> +struct EncodingTraits { + using Encoder = DoubleEncoder; + using Decoder = DoubleDecoder; +}; + +template <> +struct EncodingTraits { + using Encoder = ByteArrayEncoder; + using Decoder = ByteArrayDecoder; +}; + +template <> +struct EncodingTraits { + using Encoder = FLBAEncoder; + using Decoder = FLBADecoder; +}; + +PARQUET_EXPORT +std::unique_ptr MakeEncoder( + Type::type type_num, Encoding::type encoding, bool use_dictionary = false, + const ColumnDescriptor* descr = NULLPTR, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); + +template +std::unique_ptr::Encoder> MakeTypedEncoder( + Encoding::type encoding, bool use_dictionary = false, + const ColumnDescriptor* descr = NULLPTR, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) { + using OutType = typename EncodingTraits::Encoder; + std::unique_ptr base = + MakeEncoder(DType::type_num, encoding, use_dictionary, descr, pool); + return std::unique_ptr(dynamic_cast(base.release())); +} + +PARQUET_EXPORT +std::unique_ptr MakeDecoder(Type::type type_num, Encoding::type encoding, + const ColumnDescriptor* descr = NULLPTR); + +namespace detail { + +PARQUET_EXPORT +std::unique_ptr MakeDictDecoder(Type::type type_num, + const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool); + +} // namespace detail + +template +std::unique_ptr> MakeDictDecoder( + const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) { + using OutType = DictDecoder; + auto decoder = detail::MakeDictDecoder(DType::type_num, descr, pool); + return std::unique_ptr(dynamic_cast(decoder.release())); +} + +template +std::unique_ptr::Decoder> MakeTypedDecoder( + Encoding::type encoding, const ColumnDescriptor* descr = NULLPTR) { + using OutType = typename EncodingTraits::Decoder; + std::unique_ptr base = MakeDecoder(DType::type_num, encoding, descr); + return std::unique_ptr(dynamic_cast(base.release())); +} + +} // namespace parquet diff --git a/cpp/src/parquet/file-deserialize-test.cc b/cpp/src/parquet/file-deserialize-test.cc index 4db338b4bcb54..e62968e5d5dc9 100644 --- a/cpp/src/parquet/file-deserialize-test.cc +++ b/cpp/src/parquet/file-deserialize-test.cc @@ -22,6 +22,7 @@ #include #include "parquet/column_page.h" +#include "parquet/column_reader.h" #include "parquet/exception.h" #include "parquet/file_reader.h" #include "parquet/thrift.h" diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index 5be1a86234f3b..0f8e35904c606 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -19,23 +19,22 @@ #include #include -#include +#include #include -#include #include #include -#include +#include "arrow/buffer.h" #include "arrow/io/file.h" +#include "arrow/status.h" #include "arrow/util/logging.h" -#include "parquet/column_page.h" #include "parquet/column_reader.h" #include "parquet/column_scanner.h" #include "parquet/exception.h" #include "parquet/metadata.h" #include "parquet/properties.h" -#include "parquet/thrift.h" +#include "parquet/schema.h" #include "parquet/types.h" #include "parquet/util/memory.h" diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h index 4730305c93131..03992300c8c84 100644 --- a/cpp/src/parquet/file_reader.h +++ b/cpp/src/parquet/file_reader.h @@ -19,24 +19,23 @@ #define PARQUET_FILE_READER_H #include -#include -#include #include #include #include -#include "parquet/column_reader.h" -#include "parquet/metadata.h" +#include "arrow/io/interfaces.h" +#include "arrow/util/macros.h" + #include "parquet/properties.h" -#include "parquet/schema.h" -#include "parquet/statistics.h" -#include "parquet/util/macros.h" -#include "parquet/util/memory.h" #include "parquet/util/visibility.h" namespace parquet { class ColumnReader; +class FileMetaData; +class PageReader; +class RandomAccessSource; +class RowGroupMetaData; class PARQUET_EXPORT RowGroupReader { public: diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc index 01fa112fe37ef..51f0cb43b7eea 100644 --- a/cpp/src/parquet/file_writer.cc +++ b/cpp/src/parquet/file_writer.cc @@ -21,15 +21,12 @@ #include #include "parquet/column_writer.h" -#include "parquet/schema-internal.h" #include "parquet/schema.h" -#include "parquet/thrift.h" #include "parquet/util/memory.h" using arrow::MemoryPool; using parquet::schema::GroupNode; -using parquet::schema::SchemaFlattener; namespace parquet { @@ -251,6 +248,9 @@ class FileSerializer : public ParquetFileWriter::Contents { void Close() override { if (is_open_) { + // If any functions here raise an exception, we set is_open_ to be false + // so that this does not get called again (possibly causing segfault) + is_open_ = false; if (row_group_writer_) { num_rows_ += row_group_writer_->num_rows(); row_group_writer_->Close(); @@ -262,7 +262,6 @@ class FileSerializer : public ParquetFileWriter::Contents { WriteFileMetaData(*metadata, sink_.get()); sink_->Close(); - is_open_ = false; } } diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h index 82703f82dc899..860500f3bfe14 100644 --- a/cpp/src/parquet/file_writer.h +++ b/cpp/src/parquet/file_writer.h @@ -20,25 +20,31 @@ #include #include +#include +#include "arrow/util/macros.h" + +#include "parquet/exception.h" #include "parquet/metadata.h" #include "parquet/properties.h" #include "parquet/schema.h" -#include "parquet/util/macros.h" -#include "parquet/util/memory.h" #include "parquet/util/visibility.h" -namespace parquet { +namespace arrow { + +class MemoryPool; + +namespace io { -class ColumnWriter; -class PageWriter; class OutputStream; -namespace schema { +} // namespace io +} // namespace arrow -class GroupNode; +namespace parquet { -} // namespace schema +class ColumnWriter; +class OutputStream; class PARQUET_EXPORT RowGroupWriter { public: diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc index 1641afb869036..93c2073e898ba 100644 --- a/cpp/src/parquet/metadata.cc +++ b/cpp/src/parquet/metadata.cc @@ -16,22 +16,25 @@ // under the License. #include +#include #include #include -#include + +#include "arrow/util/logging.h" #include "parquet/exception.h" #include "parquet/metadata.h" #include "parquet/schema-internal.h" #include "parquet/schema.h" +#include "parquet/statistics.h" #include "parquet/thrift.h" -#include "parquet/util/memory.h" -#include -#include +#include // IWYU pragma: keep namespace parquet { +class OutputStream; + const ApplicationVersion& ApplicationVersion::PARQUET_251_FIXED_VERSION() { static ApplicationVersion version("parquet-mr", 1, 8, 0); return version; diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h index 317145826ddb3..4ccf14be1fdd5 100644 --- a/cpp/src/parquet/metadata.h +++ b/cpp/src/parquet/metadata.h @@ -18,23 +18,32 @@ #ifndef PARQUET_FILE_METADATA_H #define PARQUET_FILE_METADATA_H +#include #include -#include #include #include #include "arrow/util/key_value_metadata.h" +#include "arrow/util/macros.h" #include "parquet/properties.h" -#include "parquet/schema.h" -#include "parquet/statistics.h" #include "parquet/types.h" -#include "parquet/util/macros.h" -#include "parquet/util/memory.h" #include "parquet/util/visibility.h" namespace parquet { +class ColumnDescriptor; +class EncodedStatistics; +class OutputStream; +class RowGroupStatistics; +class SchemaDescriptor; + +namespace schema { + +class ColumnPath; + +} // namespace schema + using KeyValueMetadata = ::arrow::KeyValueMetadata; class PARQUET_EXPORT ApplicationVersion { diff --git a/cpp/src/parquet/printer.cc b/cpp/src/parquet/printer.cc index 5be8d9d96467c..61d669bcb34d6 100644 --- a/cpp/src/parquet/printer.cc +++ b/cpp/src/parquet/printer.cc @@ -17,15 +17,30 @@ #include "parquet/printer.h" +#include +#include +#include +#include #include #include +#include "arrow/util/key_value_metadata.h" + #include "parquet/column_scanner.h" +#include "parquet/exception.h" +#include "parquet/file_reader.h" +#include "parquet/metadata.h" +#include "parquet/schema.h" +#include "parquet/statistics.h" +#include "parquet/types.h" using std::string; using std::vector; namespace parquet { + +class ColumnReader; + // ---------------------------------------------------------------------- // ParquetFilePrinter::DebugPrint diff --git a/cpp/src/parquet/printer.h b/cpp/src/parquet/printer.h index 1113c3fecd25b..4591e7abad058 100644 --- a/cpp/src/parquet/printer.h +++ b/cpp/src/parquet/printer.h @@ -18,17 +18,15 @@ #ifndef PARQUET_FILE_PRINTER_H #define PARQUET_FILE_PRINTER_H -#include #include #include -#include -#include -#include -#include "parquet/file_reader.h" +#include "parquet/util/visibility.h" namespace parquet { +class ParquetFileReader; + class PARQUET_EXPORT ParquetFilePrinter { private: ParquetFileReader* fileReader; diff --git a/cpp/src/parquet/reader-test.cc b/cpp/src/parquet/reader-test.cc index d628f4727c160..a0536b56a89ca 100644 --- a/cpp/src/parquet/reader-test.cc +++ b/cpp/src/parquet/reader-test.cc @@ -28,6 +28,7 @@ #include "parquet/column_reader.h" #include "parquet/column_scanner.h" #include "parquet/file_reader.h" +#include "parquet/metadata.h" #include "parquet/printer.h" #include "parquet/util/memory.h" #include "parquet/util/test-common.h" diff --git a/cpp/src/parquet/schema.cc b/cpp/src/parquet/schema.cc index da004344f2016..431f30773b96d 100644 --- a/cpp/src/parquet/schema.cc +++ b/cpp/src/parquet/schema.cc @@ -19,11 +19,13 @@ #include "parquet/schema-internal.h" #include +#include #include -#include #include #include +#include "arrow/util/logging.h" + #include "parquet/exception.h" #include "parquet/thrift.h" diff --git a/cpp/src/parquet/schema.h b/cpp/src/parquet/schema.h index add2f6dbab013..76920c0e93b57 100644 --- a/cpp/src/parquet/schema.h +++ b/cpp/src/parquet/schema.h @@ -28,6 +28,8 @@ #include #include +#include "arrow/util/macros.h" + #include "parquet/types.h" #include "parquet/util/macros.h" #include "parquet/util/visibility.h" @@ -144,9 +146,7 @@ class PARQUET_EXPORT Node { const std::shared_ptr path() const; - // ToParquet returns an opaque void* to avoid exporting - // parquet::SchemaElement into the public API - virtual void ToParquet(void* opaque_element) const = 0; + virtual void ToParquet(void* element) const = 0; // Node::Visitor abstract class for walking schemas with the visitor pattern class Visitor { @@ -193,8 +193,6 @@ typedef std::vector NodeVector; // parameters) class PARQUET_EXPORT PrimitiveNode : public Node { public: - // FromParquet accepts an opaque void* to avoid exporting - // parquet::SchemaElement into the public API static std::unique_ptr FromParquet(const void* opaque_element, int id); static inline NodePtr Make(const std::string& name, Repetition::type repetition, @@ -217,7 +215,7 @@ class PARQUET_EXPORT PrimitiveNode : public Node { const DecimalMetadata& decimal_metadata() const { return decimal_metadata_; } - void ToParquet(void* opaque_element) const override; + void ToParquet(void* element) const override; void Visit(Visitor* visitor) override; void VisitConst(ConstVisitor* visitor) const override; @@ -250,8 +248,6 @@ class PARQUET_EXPORT PrimitiveNode : public Node { class PARQUET_EXPORT GroupNode : public Node { public: - // Like PrimitiveNode, GroupNode::FromParquet accepts an opaque void* to avoid exporting - // parquet::SchemaElement into the public API static std::unique_ptr FromParquet(const void* opaque_element, int id, const NodeVector& fields); @@ -273,7 +269,7 @@ class PARQUET_EXPORT GroupNode : public Node { int field_count() const { return static_cast(fields_.size()); } - void ToParquet(void* opaque_element) const override; + void ToParquet(void* element) const override; void Visit(Visitor* visitor) override; void VisitConst(ConstVisitor* visitor) const override; diff --git a/cpp/src/parquet/statistics.cc b/cpp/src/parquet/statistics.cc index ed4e8d05592e4..4cb2bfd92131d 100644 --- a/cpp/src/parquet/statistics.cc +++ b/cpp/src/parquet/statistics.cc @@ -16,10 +16,13 @@ // under the License. #include +#include #include #include -#include "parquet/encoding-internal.h" +#include "arrow/util/logging.h" + +#include "parquet/encoding.h" #include "parquet/exception.h" #include "parquet/statistics.h" #include "parquet/util/memory.h" @@ -296,19 +299,19 @@ EncodedStatistics TypedRowGroupStatistics::Encode() { template void TypedRowGroupStatistics::PlainEncode(const T& src, std::string* dst) { - PlainEncoder encoder(descr(), pool_); - encoder.Put(&src, 1); - auto buffer = encoder.FlushValues(); + auto encoder = MakeTypedEncoder(Encoding::PLAIN, false, descr(), pool_); + encoder->Put(&src, 1); + auto buffer = encoder->FlushValues(); auto ptr = reinterpret_cast(buffer->data()); dst->assign(ptr, buffer->size()); } template void TypedRowGroupStatistics::PlainDecode(const std::string& src, T* dst) { - PlainDecoder decoder(descr()); - decoder.SetData(1, reinterpret_cast(src.c_str()), - static_cast(src.size())); - decoder.Decode(dst, 1); + auto decoder = MakeTypedDecoder(Encoding::PLAIN, descr()); + decoder->SetData(1, reinterpret_cast(src.c_str()), + static_cast(src.size())); + decoder->Decode(dst, 1); } template <> diff --git a/cpp/src/parquet/test-util.h b/cpp/src/parquet/test-util.h index 647d80460d5d3..ed7c7bb901621 100644 --- a/cpp/src/parquet/test-util.h +++ b/cpp/src/parquet/test-util.h @@ -33,7 +33,7 @@ #include "parquet/column_page.h" #include "parquet/column_reader.h" #include "parquet/column_writer.h" -#include "parquet/encoding-internal.h" +#include "parquet/encoding.h" #include "parquet/util/memory.h" #include "parquet/util/test-common.h" @@ -50,6 +50,15 @@ bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) { namespace test { +template +std::shared_ptr EncodeValues(Encoding::type encoding, bool use_dictionary, + const Sequence& values, int length, + const ColumnDescriptor* descr) { + auto encoder = MakeTypedEncoder(encoding, use_dictionary, descr); + encoder->Put(values, length); + return encoder->FlushValues(); +} + template static void InitValues(int num_values, vector& values, vector& buffer) { random_numbers(num_values, 0, std::numeric_limits::min(), @@ -133,9 +142,8 @@ class DataPageBuilder { void AppendValues(const ColumnDescriptor* d, const vector& values, Encoding::type encoding = Encoding::PLAIN) { - PlainEncoder encoder(d); - encoder.Put(&values[0], static_cast(values.size())); - std::shared_ptr values_sink = encoder.FlushValues(); + std::shared_ptr values_sink = EncodeValues( + encoding, false, values.data(), static_cast(values.size()), d); sink_->Write(values_sink->data(), values_sink->size()); num_values_ = std::max(static_cast(values.size()), num_values_); @@ -195,9 +203,11 @@ void DataPageBuilder::AppendValues(const ColumnDescriptor* d, if (encoding != Encoding::PLAIN) { ParquetException::NYI("only plain encoding currently implemented"); } - PlainEncoder encoder(d); - encoder.Put(values, static_cast(values.size())); - std::shared_ptr buffer = encoder.FlushValues(); + + auto encoder = MakeTypedEncoder(Encoding::PLAIN, false, d); + dynamic_cast(encoder.get()) + ->Put(values, static_cast(values.size())); + std::shared_ptr buffer = encoder->FlushValues(); sink_->Write(buffer->data(), buffer->size()); num_values_ = std::max(static_cast(values.size()), num_values_); @@ -243,11 +253,14 @@ class DictionaryPageBuilder { public: typedef typename TYPE::c_type TC; static constexpr int TN = TYPE::type_num; + using SpecializedEncoder = typename EncodingTraits::Encoder; // This class writes data and metadata to the passed inputs explicit DictionaryPageBuilder(const ColumnDescriptor* d) : num_dict_values_(0), have_values_(false) { - encoder_.reset(new DictEncoder(d)); + auto encoder = MakeTypedEncoder(Encoding::PLAIN, true, d); + dict_traits_ = dynamic_cast*>(encoder.get()); + encoder_.reset(dynamic_cast(encoder.release())); } ~DictionaryPageBuilder() {} @@ -256,22 +269,23 @@ class DictionaryPageBuilder { int num_values = static_cast(values.size()); // Dictionary encoding encoder_->Put(values.data(), num_values); - num_dict_values_ = encoder_->num_entries(); + num_dict_values_ = dict_traits_->num_entries(); have_values_ = true; return encoder_->FlushValues(); } shared_ptr WriteDict() { - std::shared_ptr dict_buffer = - AllocateBuffer(::arrow::default_memory_pool(), encoder_->dict_encoded_size()); - encoder_->WriteDict(dict_buffer->mutable_data()); - return std::move(dict_buffer); + std::shared_ptr dict_buffer = + AllocateBuffer(::arrow::default_memory_pool(), dict_traits_->dict_encoded_size()); + dict_traits_->WriteDict(dict_buffer->mutable_data()); + return dict_buffer; } int32_t num_values() const { return num_dict_values_; } private: - shared_ptr> encoder_; + DictEncoder* dict_traits_; + std::unique_ptr encoder_; int32_t num_dict_values_; bool have_values_; }; diff --git a/cpp/src/parquet/util/visibility.h b/cpp/src/parquet/util/visibility.h index 929d3b22c8851..d731bad6ae47f 100644 --- a/cpp/src/parquet/util/visibility.h +++ b/cpp/src/parquet/util/visibility.h @@ -19,7 +19,8 @@ #define PARQUET_UTIL_VISIBILITY_H #if defined(_WIN32) || defined(__CYGWIN__) -#ifdef _MSC_VER + +#if defined(_MSC_VER) #pragma warning(push) // Disable warning for STL types usage in DLL interface // https://web.archive.org/web/20130317015847/http://connect.microsoft.com/VisualStudio/feedback/details/696593/vc-10-vs-2010-basic-string-exports @@ -30,9 +31,20 @@ #pragma warning(disable : 4005) // Disable extern before exported template warnings #pragma warning(disable : 4910) +#else +#pragma GCC diagnostic ignored "-Wattributes" #endif + +#ifdef PARQUET_STATIC +#define PARQUET_EXPORT +#elif defined(PARQUET_EXPORTING) #define PARQUET_EXPORT __declspec(dllexport) +#else +#define PARQUET_EXPORT __declspec(dllimport) +#endif + #define PARQUET_NO_EXPORT + #else // Not Windows #ifndef PARQUET_EXPORT #define PARQUET_EXPORT __attribute__((visibility("default"))) From daac8a686f1be83c05cec95e813c9d701235cd6a Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 27 Jan 2019 16:36:50 -0600 Subject: [PATCH 2/7] Delete a couple commented-out methods, add code comments about unimplemented DecodeArrowNonNull method for DictionaryBuilder Change-Id: Id75eed26fd057aaf3f7ba6d8664c240a33a6cbd7 --- cpp/src/parquet/encoding.h | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h index 5eb5016544eda..37ebb13f008ec 100644 --- a/cpp/src/parquet/encoding.h +++ b/cpp/src/parquet/encoding.h @@ -218,21 +218,20 @@ class ByteArrayDecoder : virtual public TypedDecoder { virtual int DecodeArrowNonNull(int num_values, ::arrow::internal::ChunkedBinaryBuilder* builder) = 0; - // virtual int DecodeArrowDict(int num_values, int null_count, - // const uint8_t* valid_bits, int64_t valid_bits_offset, - // ::arrow::BinaryDictionaryBuilder* builder) = 0; + // TODO(wesm): Implement this method as part of ARROW-3325 + // See also ARROW-3772, ARROW-3769 + // virtual int DecodeArrowNonNull(int num_values, + // ::arrow::internal::BinaryDictionaryBuilder* builder) = 0; }; class FLBADecoder : virtual public TypedDecoder { public: using TypedDecoder::DecodeSpaced; - // virtual int DecodeArrow(int num_values, int null_count, - // const uint8_t* valid_bits, int64_t valid_bits_offset, - // ::arrow::FixedSizeBinaryBuilder* builder) = 0; - // virtual int DecodeArrowDict(int num_values, int null_count, - // const uint8_t* valid_bits, int64_t valid_bits_offset, - // ::arrow::BinaryDictionaryBuilder* builder) = 0; + // TODO(wesm): As possible follow-up to PARQUET-1508, we should examine if + // there is value in adding specialized read methods for + // FIXED_LEN_BYTE_ARRAY. If only Decimal data can occur with this data type + // then perhaps not }; template From 06e2c231598b93f04c28d5f267a48582cb9e9d20 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 27 Jan 2019 16:38:02 -0600 Subject: [PATCH 3/7] lint Change-Id: Icd4fdd9c9f6358a76e25d0eeb994773a707e6e3b --- cpp/src/parquet/encoding.h | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h index 37ebb13f008ec..046296cdb1445 100644 --- a/cpp/src/parquet/encoding.h +++ b/cpp/src/parquet/encoding.h @@ -215,13 +215,10 @@ class ByteArrayDecoder : virtual public TypedDecoder { int64_t valid_bits_offset, ::arrow::BinaryDictionaryBuilder* builder) = 0; + // TODO(wesm): Implement DecodeArrowNonNull as part of ARROW-3325 + // See also ARROW-3772, ARROW-3769 virtual int DecodeArrowNonNull(int num_values, ::arrow::internal::ChunkedBinaryBuilder* builder) = 0; - - // TODO(wesm): Implement this method as part of ARROW-3325 - // See also ARROW-3772, ARROW-3769 - // virtual int DecodeArrowNonNull(int num_values, - // ::arrow::internal::BinaryDictionaryBuilder* builder) = 0; }; class FLBADecoder : virtual public TypedDecoder { From c4fcf7479a0aaaf180a5b4d2d47c90689a764d20 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 27 Jan 2019 17:25:51 -0600 Subject: [PATCH 4/7] verbose makefile --- ci/cpp-msvc-build-main.bat | 1 + cpp/src/parquet/file_reader.h | 1 + 2 files changed, 2 insertions(+) diff --git a/ci/cpp-msvc-build-main.bat b/ci/cpp-msvc-build-main.bat index d9d7e548dd79d..0de05029481b7 100644 --- a/ci/cpp-msvc-build-main.bat +++ b/ci/cpp-msvc-build-main.bat @@ -44,6 +44,7 @@ mkdir cpp\build pushd cpp\build cmake -G "%GENERATOR%" %CMAKE_ARGS% ^ + -DCMAKE_VERBOSE_MAKEFILE=ON ^ -DCMAKE_INSTALL_PREFIX=%CONDA_PREFIX%\Library ^ -DARROW_BOOST_USE_SHARED=OFF ^ -DCMAKE_BUILD_TYPE=%CONFIGURATION% ^ diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h index 03992300c8c84..2d1cc9221f377 100644 --- a/cpp/src/parquet/file_reader.h +++ b/cpp/src/parquet/file_reader.h @@ -26,6 +26,7 @@ #include "arrow/io/interfaces.h" #include "arrow/util/macros.h" +#include "parquet/metadata.h" // IWYU pragma:: keep #include "parquet/properties.h" #include "parquet/util/visibility.h" From 4bafc554729072fa6843843c7bfb8214b158642d Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 27 Jan 2019 17:45:40 -0600 Subject: [PATCH 5/7] Fix public compile definition on windows --- ci/cpp-msvc-build-main.bat | 2 +- cpp/src/parquet/CMakeLists.txt | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/ci/cpp-msvc-build-main.bat b/ci/cpp-msvc-build-main.bat index 0de05029481b7..c36c6bd5c53d9 100644 --- a/ci/cpp-msvc-build-main.bat +++ b/ci/cpp-msvc-build-main.bat @@ -44,7 +44,7 @@ mkdir cpp\build pushd cpp\build cmake -G "%GENERATOR%" %CMAKE_ARGS% ^ - -DCMAKE_VERBOSE_MAKEFILE=ON ^ + -DCMAKE_VERBOSE_MAKEFILE=OFF ^ -DCMAKE_INSTALL_PREFIX=%CONDA_PREFIX%\Library ^ -DARROW_BOOST_USE_SHARED=OFF ^ -DCMAKE_BUILD_TYPE=%CONFIGURATION% ^ diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index 6f4a7c922f10f..e3294bdee4dbb 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -270,7 +270,10 @@ foreach(LIB_TARGET ${PARQUET_LIBRARIES}) endif() endforeach() -if (ARROW_BUILD_STATIC AND WIN32) +# We always build the Parquet static libraries (see PARQUET-1420) so we add the +# PARQUET_STATIC public compile definition if we are building the unit tests OR +# if we are building the static library +if (WIN32 AND (NOT NO_TESTS OR ARROW_BUILD_STATIC)) target_compile_definitions(parquet_static PUBLIC PARQUET_STATIC) endif() From f3fadcbe3aed7f989ce6454e010b21542adeafa2 Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Tue, 29 Jan 2019 10:24:50 -0600 Subject: [PATCH 6/7] Update cpp/src/parquet/arrow/record_reader.cc Co-Authored-By: wesm --- cpp/src/parquet/arrow/record_reader.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/parquet/arrow/record_reader.cc b/cpp/src/parquet/arrow/record_reader.cc index 5eb1121836acd..24706d59c0f67 100644 --- a/cpp/src/parquet/arrow/record_reader.cc +++ b/cpp/src/parquet/arrow/record_reader.cc @@ -434,6 +434,7 @@ class RecordReader::RecordReaderImpl { int64_t levels_capacity_; std::shared_ptr<::arrow::ResizableBuffer> values_; + // In the case of false, don't allocate the values buffer (when we directly read into builder classes). bool uses_values_; template From df1bfc0169a18c8cd26056db7254ff8c3df53160 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Tue, 29 Jan 2019 10:26:57 -0600 Subject: [PATCH 7/7] lint Change-Id: Ic7faef17bc54e81d4975eece5d2f7d316d87f03d --- cpp/src/parquet/arrow/record_reader.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/parquet/arrow/record_reader.cc b/cpp/src/parquet/arrow/record_reader.cc index 24706d59c0f67..39945afc78298 100644 --- a/cpp/src/parquet/arrow/record_reader.cc +++ b/cpp/src/parquet/arrow/record_reader.cc @@ -434,7 +434,8 @@ class RecordReader::RecordReaderImpl { int64_t levels_capacity_; std::shared_ptr<::arrow::ResizableBuffer> values_; - // In the case of false, don't allocate the values buffer (when we directly read into builder classes). + // In the case of false, don't allocate the values buffer (when we directly read into + // builder classes). bool uses_values_; template