From df7b91ac273acb2f9beafc1125f0f58f5e3e4865 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Sun, 9 Oct 2022 09:11:44 +0800 Subject: [PATCH 01/15] Add code for binary/string optimization --- cpp/src/parquet/arrow/reader_internal.cc | 21 ++- cpp/src/parquet/column_reader.cc | 89 +++++++++- cpp/src/parquet/column_reader.h | 11 ++ cpp/src/parquet/encoding.cc | 203 +++++++++++++++++++++++ cpp/src/parquet/encoding.h | 8 + 5 files changed, 321 insertions(+), 11 deletions(-) diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index e428c206bfc..7b669b843e3 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -357,6 +357,17 @@ std::shared_ptr TransferZeroCopy(RecordReader* reader, return ::arrow::MakeArray(data); } +std::shared_ptr TransferBinaryZeroCopy(RecordReader* reader, + const std::shared_ptr& type) { + std::vector> buffers = {reader->ReleaseIsValid(), + reader->ReleaseOffsets(), + reader->ReleaseValues()}; + auto data = std::make_shared<::arrow::ArrayData>(type, reader->values_written(), + buffers, reader->null_count()); + // std::cout << "::arrow::MakeArray(data)->ToString():" << ::arrow::MakeArray(data)->ToString() << std::endl; + return ::arrow::MakeArray(data); +} + Status TransferBool(RecordReader* reader, bool nullable, MemoryPool* pool, Datum* out) { int64_t length = reader->values_written(); @@ -765,13 +776,17 @@ Status TransferColumnData(RecordReader* reader, const std::shared_ptr& va case ::arrow::Type::DATE64: RETURN_NOT_OK(TransferDate64(reader, pool, value_field, &result)); break; - case ::arrow::Type::FIXED_SIZE_BINARY: + case ::arrow::Type::FIXED_SIZE_BINARY: { + RETURN_NOT_OK(TransferBinary(reader, pool, value_field, &chunked_result)); + result = chunked_result; + } break; case ::arrow::Type::BINARY: case ::arrow::Type::STRING: case ::arrow::Type::LARGE_BINARY: case ::arrow::Type::LARGE_STRING: { - RETURN_NOT_OK(TransferBinary(reader, pool, value_field, &chunked_result)); - result = chunked_result; + result = TransferBinaryZeroCopy(reader, value_field->type()); + // RETURN_NOT_OK(TransferBinary(reader, pool, value_field, &chunked_result)); + // result = chunked_result; } break; case ::arrow::Type::DECIMAL128: { switch (descr->physical_type()) { diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index d2944add163..ca6c38d791d 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1697,6 +1697,8 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, : TypedRecordReader(descr, leaf_info, pool) { DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool)); + values_ = AllocateBuffer(pool); + offset_ = AllocateBuffer(pool); } ::arrow::ArrayVector GetBuilderChunks() override { @@ -1711,23 +1713,94 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, } void ReadValuesDense(int64_t values_to_read) override { - int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull( - static_cast(values_to_read), &accumulator_); - CheckNumberDecoded(num_decoded, values_to_read); - ResetValues(); + int64_t num_decoded = this->current_decoder_->DecodeArrow_opt( + static_cast(values_to_read), 0, + NULLPTR, (reinterpret_cast(offset_->mutable_data()) + values_written_), + values_, 0, &bianry_length_); + DCHECK_EQ(num_decoded, values_to_read); } void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { - int64_t num_decoded = 
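+    // The _opt decoder writes offsets and raw bytes straight into offset_ and
+    // values_ instead of appending through the BinaryBuilder; for null slots it
+    // repeats the previous offset, so every row still owns an offset entry.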
this->current_decoder_->DecodeArrow( + int64_t num_decoded = this->current_decoder_->DecodeArrow_opt( static_cast(values_to_read), static_cast(null_count), - valid_bits_->mutable_data(), values_written_, &accumulator_); - CheckNumberDecoded(num_decoded, values_to_read - null_count); - ResetValues(); + valid_bits_->mutable_data(), (reinterpret_cast(offset_->mutable_data()) + values_written_), + values_, values_written_, &bianry_length_); + DCHECK_EQ(num_decoded, values_to_read - null_count); + } + + void ReserveValues(int64_t extra_values) { + const int64_t new_values_capacity = + UpdateCapacity(values_capacity_, values_written_, extra_values); + if (new_values_capacity > values_capacity_) { + PARQUET_THROW_NOT_OK( + values_->Resize(new_values_capacity * binary_per_row_length_, false)); + PARQUET_THROW_NOT_OK( + offset_->Resize((new_values_capacity+1) * 4, false)); + + auto offset = reinterpret_cast(offset_->mutable_data()); + offset[0] = 0; + + values_capacity_ = new_values_capacity; + } + if (leaf_info_.HasNullableValues()) { + int64_t valid_bytes_new = bit_util::BytesForBits(values_capacity_); + if (valid_bits_->size() < valid_bytes_new) { + int64_t valid_bytes_old = bit_util::BytesForBits(values_written_); + PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false)); + + // Avoid valgrind warnings + memset(valid_bits_->mutable_data() + valid_bytes_old, 0, + valid_bytes_new - valid_bytes_old); + } + } + } + + std::shared_ptr ReleaseValues() override { + auto result = values_; + values_ = AllocateBuffer(this->pool_); + values_capacity_ = 0; + return result; + } + + std::shared_ptr ReleaseOffsets() { + auto result = offset_; + + if (ARROW_PREDICT_FALSE(!hasCal_average_len_)) { + auto offsetArr = reinterpret_cast(offset_->mutable_data()); + const auto first_offset = offsetArr[0]; + const auto last_offset = offsetArr[values_written_]; + int64_t binary_length = last_offset - first_offset; + binary_per_row_length_ = binary_length / values_written_ + 1; + // std::cout << "binary_per_row_length_:" << binary_per_row_length_ << std::endl; + hasCal_average_len_ = true; + } + + offset_ = AllocateBuffer(this->pool_); + bianry_length_ = 0; + return result; + } + + void ResetValues() { + if (values_written_ > 0) { + // Resize to 0, but do not shrink to fit + PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false)); + PARQUET_THROW_NOT_OK(offset_->Resize(0, false)); + PARQUET_THROW_NOT_OK(values_->Resize(0, false)); + + values_written_ = 0; + values_capacity_ = 0; + null_count_ = 0; + bianry_length_ = 0; + } } private: // Helper data structure for accumulating builder chunks typename EncodingTraits::Accumulator accumulator_; + + int32_t bianry_length_ = 0; + + std::shared_ptr<::arrow::ResizableBuffer> offset_; }; class ByteArrayDictionaryRecordReader : public TypedRecordReader, diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index 4bebdba7e95..c5ce2c8cd14 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -55,6 +55,8 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024; // 16 KB is the default expected page header size static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024; +static constexpr int32_t kDefaultBinaryPerRowSzie = 20; + class PARQUET_EXPORT LevelDecoder { public: LevelDecoder(); @@ -283,6 +285,8 @@ class RecordReader { /// \brief Pre-allocate space for data. 
Results in better flat read performance virtual void Reserve(int64_t num_values) = 0; + virtual void ReserveValues(int64_t capacity) {} + /// \brief Clear consumed values and repetition/definition levels as the /// result of calling ReadRecords virtual void Reset() = 0; @@ -291,6 +295,10 @@ class RecordReader { /// allocated in subsequent ReadRecords calls virtual std::shared_ptr ReleaseValues() = 0; + virtual std::shared_ptr ReleaseOffsets() { + return nullptr; + } + /// \brief Transfer filled validity bitmap buffer to caller. A new one will /// be allocated in subsequent ReadRecords calls virtual std::shared_ptr ReleaseIsValid() = 0; @@ -352,6 +360,9 @@ class RecordReader { int64_t levels_position_; int64_t levels_capacity_; + bool hasCal_average_len_ = false; + int64_t binary_per_row_length_ = kDefaultBinaryPerRowSzie; + std::shared_ptr<::arrow::ResizableBuffer> values_; // In the case of false, don't allocate the values buffer (when we directly read into // builder classes). diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 8c0e9d98e12..39dfd95ac11 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -1340,6 +1340,19 @@ class PlainByteArrayDecoder : public PlainDecoder, return result; } + int DecodeArrow_opt(int num_values, int null_count, const uint8_t* valid_bits, + int32_t* offset, + std::shared_ptr<::arrow::ResizableBuffer> & values, + int64_t valid_bits_offset, + int32_t* bianry_length) { + int result = 0; + PARQUET_THROW_NOT_OK(DecodeArrowDense_opt(num_values, null_count, valid_bits, + offset, values, + valid_bits_offset, &result, bianry_length)); + + return result; + } + private: Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, @@ -1392,6 +1405,58 @@ class PlainByteArrayDecoder : public PlainDecoder, return Status::OK(); } + Status DecodeArrowDense_opt(int num_values, int null_count, const uint8_t* valid_bits, + int32_t* offset, + std::shared_ptr<::arrow::ResizableBuffer> & values, + int64_t valid_bits_offset, + int* out_values_decoded, + int32_t* bianry_length) { + int values_decoded = 0; + auto dst_value = values->mutable_data() + (*bianry_length); + int capacity = values->size(); + if (ARROW_PREDICT_FALSE((len_ + *bianry_length) >= capacity)) { + values->Resize(len_ + *bianry_length); + dst_value = values->mutable_data() + (*bianry_length); + } + + int i = 0; + RETURN_NOT_OK(VisitNullBitmapInline( + valid_bits, valid_bits_offset, num_values, null_count, + [&]() { + if (ARROW_PREDICT_FALSE(len_ < 4)) { + ParquetException::EofException(); + } + auto value_len = ::arrow::util::SafeLoadAs(data_); + if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4)) { + return Status::Invalid("Invalid or corrupted value_len '", value_len, "'"); + } + auto increment = value_len + 4; + if (ARROW_PREDICT_FALSE(len_ < increment)) { + ParquetException::EofException(); + } + + (*bianry_length) += value_len; + offset[i+1] = offset[i] + value_len; + memcpy(dst_value, data_ + 4, value_len); + dst_value = dst_value + value_len; + + data_ += increment; + len_ -= increment; + ++values_decoded; + ++i; + return Status::OK(); + }, + [&]() { + offset[i+1] = offset[i]; + ++i; + return Status::OK(); + })); + + num_values_ -= values_decoded; + *out_values_decoded = values_decoded; + return Status::OK(); + } + template Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, BuilderType* builder, @@ -1858,6 +1923,25 @@ class 
DictByteArrayDecoderImpl : public DictDecoderImpl, return result; } + int DecodeArrow_opt(int num_values, int null_count, const uint8_t* valid_bits, + int32_t* offset, + std::shared_ptr<::arrow::ResizableBuffer> & values, + int64_t valid_bits_offset, + int32_t* bianry_length) { + int result = 0; + if (null_count == 0) { + PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull_opt(num_values, + offset, values, &result, bianry_length)); + } else { + PARQUET_THROW_NOT_OK(DecodeArrowDense_opt(num_values, null_count, valid_bits, + offset, values, + valid_bits_offset, &result, bianry_length)); + + } + + return result; + } + private: Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, @@ -1928,6 +2012,81 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, return Status::OK(); } + Status DecodeArrowDense_opt(int num_values, int null_count, const uint8_t* valid_bits, + int32_t* offset, + std::shared_ptr<::arrow::ResizableBuffer> & values, + int64_t valid_bits_offset, + int* out_num_values, + int32_t* bianry_length) { + constexpr int32_t kBufferSize = 1024; + int32_t indices[kBufferSize]; + auto dst_value = values->mutable_data() + (*bianry_length); + + ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values); + + auto dict_values = reinterpret_cast(dictionary_->data()); + int values_decoded = 0; + int num_appended = 0; + uint64_t capacity = values->size(); + while (num_appended < num_values) { + bool is_valid = bit_reader.IsSet(); + bit_reader.Next(); + + if (is_valid) { + int32_t batch_size = + std::min(kBufferSize, num_values - num_appended - null_count); + int num_indices = idx_decoder_.GetBatch(indices, batch_size); + + if (ARROW_PREDICT_FALSE(num_indices < 1)) { + return Status::Invalid("Invalid number of indices '", num_indices, "'"); + } + + int i = 0; + while (true) { + // Consume all indices + if (is_valid) { + auto idx = indices[i]; + // RETURN_NOT_OK(IndexInBounds(idx)); + const auto& val = dict_values[idx]; + auto value_len = val.len; + auto value_offset= offset[num_appended+1] = offset[num_appended] + value_len; + + if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { + capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len); + values->Resize(capacity); + dst_value = values->mutable_data() + (*bianry_length); + } + (*bianry_length) += value_len; + memcpy(dst_value, val.ptr, static_cast(value_len)); + dst_value = dst_value + value_len; + + + ++i; + ++values_decoded; + } else { + offset[num_appended+1] = offset[num_appended]; + --null_count; + } + ++num_appended; + if (i == num_indices) { + // Do not advance the bit_reader if we have fulfilled the decode + // request + break; + } + is_valid = bit_reader.IsSet(); + bit_reader.Next(); + } + } else { + offset[num_appended+1] = offset[num_appended]; + --null_count; + ++num_appended; + } + } + *out_num_values = values_decoded; + return Status::OK(); + } + + Status DecodeArrowDenseNonNull(int num_values, typename EncodingTraits::Accumulator* out, int* out_num_values) { @@ -1957,6 +2116,50 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, return Status::OK(); } + Status DecodeArrowDenseNonNull_opt(int num_values, + int32_t* offset, + std::shared_ptr<::arrow::ResizableBuffer> & values, + int* out_num_values, + int32_t* bianry_length) { + + constexpr int32_t kBufferSize = 2048; + int32_t indices[kBufferSize]; + int values_decoded = 0; + uint64_t capacity = values->size(); + + // ArrowBinaryHelper helper(out); + auto dict_values = 
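+    // dictionary_ holds the decoded dictionary page as a flat array of
+    // ByteArray{len, ptr} entries; each index pulled from idx_decoder_ below
+    // selects one entry, whose bytes are memcpy'd into the value buffer.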
reinterpret_cast(dictionary_->data()); + + auto dst_value = values->mutable_data() + (*bianry_length); + int num_appended = 0; + + while (values_decoded < num_values) { + int32_t batch_size = std::min(kBufferSize, num_values - values_decoded); + int num_indices = idx_decoder_.GetBatch(indices, batch_size); + if (num_indices == 0) ParquetException::EofException(); + for (int i = 0; i < num_indices; ++i) { + auto idx = indices[i]; + // RETURN_NOT_OK(IndexInBounds(idx)); + const auto& val = dict_values[idx]; + auto value_len = val.len; + auto value_offset= offset[num_appended+1] = offset[num_appended] + value_len; + if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { + capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len); + values->Resize(capacity); + dst_value = values->mutable_data() + (*bianry_length); + } + (*bianry_length) += value_len; + memcpy(dst_value, val.ptr, static_cast(value_len)); + dst_value = dst_value + value_len; + + num_appended++; + } + values_decoded += num_indices; + } + *out_num_values = values_decoded; + return Status::OK(); + } + template Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, BuilderType* builder, diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h index c32da99793d..dde52f4d66b 100644 --- a/cpp/src/parquet/encoding.h +++ b/cpp/src/parquet/encoding.h @@ -317,6 +317,14 @@ class TypedDecoder : virtual public Decoder { int64_t valid_bits_offset, typename EncodingTraits::Accumulator* out) = 0; + virtual int DecodeArrow_opt(int num_values, int null_count, const uint8_t* valid_bits, + int32_t* offset, + std::shared_ptr<::arrow::ResizableBuffer> & values, + int64_t valid_bits_offset, + int32_t* bianry_length) { + return 0; + } + /// \brief Decode into an ArrayBuilder or other accumulator ignoring nulls /// /// \return number of values decoded From 6c746d0e06b4217bec048a0d85d58acd78b297c2 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Mon, 10 Oct 2022 22:24:28 +0800 Subject: [PATCH 02/15] Fix compile warning --- cpp/src/parquet/column_reader.cc | 6 +++--- cpp/src/parquet/encoding.cc | 14 +++++++------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index ca6c38d791d..4a2940bfa56 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1450,7 +1450,7 @@ class TypedRecordReader : public ColumnReaderImplBase, } } - void ReserveValues(int64_t extra_values) { + void ReserveValues(int64_t extra_values) override { const int64_t new_values_capacity = UpdateCapacity(values_capacity_, values_written_, extra_values); if (new_values_capacity > values_capacity_) { @@ -1728,7 +1728,7 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, DCHECK_EQ(num_decoded, values_to_read - null_count); } - void ReserveValues(int64_t extra_values) { + void ReserveValues(int64_t extra_values) override { const int64_t new_values_capacity = UpdateCapacity(values_capacity_, values_written_, extra_values); if (new_values_capacity > values_capacity_) { @@ -1762,7 +1762,7 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, return result; } - std::shared_ptr ReleaseOffsets() { + std::shared_ptr ReleaseOffsets() override { auto result = offset_; if (ARROW_PREDICT_FALSE(!hasCal_average_len_)) { diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 39dfd95ac11..9b6273f736c 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc 
@@ -1344,7 +1344,7 @@ class PlainByteArrayDecoder : public PlainDecoder, int32_t* offset, std::shared_ptr<::arrow::ResizableBuffer> & values, int64_t valid_bits_offset, - int32_t* bianry_length) { + int32_t* bianry_length) override { int result = 0; PARQUET_THROW_NOT_OK(DecodeArrowDense_opt(num_values, null_count, valid_bits, offset, values, @@ -1415,7 +1415,7 @@ class PlainByteArrayDecoder : public PlainDecoder, auto dst_value = values->mutable_data() + (*bianry_length); int capacity = values->size(); if (ARROW_PREDICT_FALSE((len_ + *bianry_length) >= capacity)) { - values->Resize(len_ + *bianry_length); + PARQUET_THROW_NOT_OK(values->Resize(len_ + *bianry_length, false)); dst_value = values->mutable_data() + (*bianry_length); } @@ -1927,7 +1927,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, int32_t* offset, std::shared_ptr<::arrow::ResizableBuffer> & values, int64_t valid_bits_offset, - int32_t* bianry_length) { + int32_t* bianry_length) override { int result = 0; if (null_count == 0) { PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull_opt(num_values, @@ -2049,11 +2049,11 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, // RETURN_NOT_OK(IndexInBounds(idx)); const auto& val = dict_values[idx]; auto value_len = val.len; - auto value_offset= offset[num_appended+1] = offset[num_appended] + value_len; + uint64_t value_offset= offset[num_appended+1] = offset[num_appended] + value_len; if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len); - values->Resize(capacity); + PARQUET_THROW_NOT_OK(values->Resize(capacity, false)); dst_value = values->mutable_data() + (*bianry_length); } (*bianry_length) += value_len; @@ -2142,10 +2142,10 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, // RETURN_NOT_OK(IndexInBounds(idx)); const auto& val = dict_values[idx]; auto value_len = val.len; - auto value_offset= offset[num_appended+1] = offset[num_appended] + value_len; + uint64_t value_offset= offset[num_appended+1] = offset[num_appended] + value_len; if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len); - values->Resize(capacity); + PARQUET_THROW_NOT_OK(values->Resize(capacity, false)); dst_value = values->mutable_data() + (*bianry_length); } (*bianry_length) += value_len; From 1a3cf43d8f73c609516c84c80e86dcc08503913f Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Tue, 6 Dec 2022 17:22:59 +0800 Subject: [PATCH 03/15] Fix LargeString/LargeBinary UTs --- cpp/src/parquet/arrow/reader_internal.cc | 47 ++++++++++++++++++++++-- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index 7b669b843e3..d6fc07990a5 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -57,6 +57,20 @@ #include "parquet/types.h" #include "parquet/windows_fixup.h" // for OPTIONAL + #include + #include +#include + +std::string getClearName(const char* name) +{ + int status = -1; + char* clear_name = abi::__cxa_demangle(name, NULL, NULL, &status); + const char* demangle_name = (status==0) ? 
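+    // abi::__cxa_demangle returns a malloc'd buffer when status == 0; the
+    // free() below releases it, and on failure the mangled name is kept.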
clear_name : name; + std::string ret_val(demangle_name); + free(clear_name); + return ret_val; +} + using arrow::Array; using arrow::BooleanArray; using arrow::ChunkedArray; @@ -368,6 +382,29 @@ std::shared_ptr TransferBinaryZeroCopy(RecordReader* reader, return ::arrow::MakeArray(data); } +Status TransferLargeBinaryZeroCopy(RecordReader* reader, MemoryPool* pool, + const std::shared_ptr& type, + std::shared_ptr* out) { + std::vector> buffers = {reader->ReleaseIsValid(), + reader->ReleaseOffsets(), + reader->ReleaseValues()}; + auto data = std::make_shared<::arrow::ArrayData>(::arrow::binary(), reader->values_written(), + buffers, reader->null_count()); + // std::cout << "::arrow::MakeArray(data)->ToString():" << ::arrow::MakeArray(data)->ToString() << std::endl; + auto array = ::arrow::MakeArray(data); + ::arrow::compute::ExecContext ctx(pool); + ::arrow::compute::CastOptions cast_options; + cast_options.allow_invalid_utf8 = true; // avoid spending time validating UTF8 data + + ARROW_ASSIGN_OR_RAISE( + array, + ::arrow::compute::Cast(*array, type, cast_options, &ctx)); + + + *out = std::make_shared(std::move(::arrow::ArrayVector({array})), type); + return Status::OK(); +} + Status TransferBool(RecordReader* reader, bool nullable, MemoryPool* pool, Datum* out) { int64_t length = reader->values_written(); @@ -781,13 +818,17 @@ Status TransferColumnData(RecordReader* reader, const std::shared_ptr& va result = chunked_result; } break; case ::arrow::Type::BINARY: - case ::arrow::Type::STRING: - case ::arrow::Type::LARGE_BINARY: - case ::arrow::Type::LARGE_STRING: { + case ::arrow::Type::STRING: { result = TransferBinaryZeroCopy(reader, value_field->type()); // RETURN_NOT_OK(TransferBinary(reader, pool, value_field, &chunked_result)); // result = chunked_result; } break; + case ::arrow::Type::LARGE_BINARY: + case ::arrow::Type::LARGE_STRING: { + // result = TransferLargeBinaryZeroCopy(reader, pool, value_field->type()); + RETURN_NOT_OK(TransferLargeBinaryZeroCopy(reader, pool, value_field->type(), &chunked_result)); + result = chunked_result; + } break; case ::arrow::Type::DECIMAL128: { switch (descr->physical_type()) { case ::parquet::Type::INT32: { From 5c8a2531442173269f48081dec9f8d36bd1e783f Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Tue, 6 Dec 2022 18:19:46 +0800 Subject: [PATCH 04/15] Fix Decimal UTs --- cpp/src/parquet/column_reader.cc | 46 +++++++++++++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 4a2940bfa56..1eea2e20517 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1697,6 +1697,47 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, : TypedRecordReader(descr, leaf_info, pool) { DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool)); + } + + ::arrow::ArrayVector GetBuilderChunks() override { + ::arrow::ArrayVector result = accumulator_.chunks; + if (result.size() == 0 || accumulator_.builder->length() > 0) { + std::shared_ptr<::arrow::Array> last_chunk; + PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk)); + result.push_back(std::move(last_chunk)); + } + accumulator_.chunks = {}; + return result; + } + + void ReadValuesDense(int64_t values_to_read) override { + int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull( + static_cast(values_to_read), &accumulator_); + CheckNumberDecoded(num_decoded, values_to_read); + ResetValues(); + 
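+    // This builder-based reader is kept for decimal BYTE_ARRAY columns;
+    // MakeByteArrayRecordReader below routes every other non-dictionary
+    // BYTE_ARRAY column to ByteArrayChunkedOptRecordReader.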
} + + void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { + int64_t num_decoded = this->current_decoder_->DecodeArrow( + static_cast(values_to_read), static_cast(null_count), + valid_bits_->mutable_data(), values_written_, &accumulator_); + CheckNumberDecoded(num_decoded, values_to_read - null_count); + ResetValues(); + } + + private: + // Helper data structure for accumulating builder chunks + typename EncodingTraits::Accumulator accumulator_; +}; + +class ByteArrayChunkedOptRecordReader : public TypedRecordReader, + virtual public BinaryRecordReader { + public: + ByteArrayChunkedOptRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, + ::arrow::MemoryPool* pool) + : TypedRecordReader(descr, leaf_info, pool) { + DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); + accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool)); values_ = AllocateBuffer(pool); offset_ = AllocateBuffer(pool); } @@ -1900,9 +1941,12 @@ std::shared_ptr MakeByteArrayRecordReader(const ColumnDescriptor* bool read_dictionary) { if (read_dictionary) { return std::make_shared(descr, leaf_info, pool); - } else { + } else if(descr->logical_type()->is_decimal()) { return std::make_shared(descr, leaf_info, pool); } + else { + return std::make_shared(descr, leaf_info, pool); + } } } // namespace From 3f34ecdbcb9f755abf08981365c6ee5a30eb0bb6 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Wed, 7 Dec 2022 00:30:45 +0800 Subject: [PATCH 05/15] Fix NestedRequiredOuterOptional UTs --- cpp/src/parquet/arrow/reader_internal.cc | 49 ++++++++++-------------- 1 file changed, 20 insertions(+), 29 deletions(-) diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index d6fc07990a5..74aaf788cc7 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -57,20 +57,6 @@ #include "parquet/types.h" #include "parquet/windows_fixup.h" // for OPTIONAL - #include - #include -#include - -std::string getClearName(const char* name) -{ - int status = -1; - char* clear_name = abi::__cxa_demangle(name, NULL, NULL, &status); - const char* demangle_name = (status==0) ? 
clear_name : name; - std::string ret_val(demangle_name); - free(clear_name); - return ret_val; -} - using arrow::Array; using arrow::BooleanArray; using arrow::ChunkedArray; @@ -371,26 +357,30 @@ std::shared_ptr TransferZeroCopy(RecordReader* reader, return ::arrow::MakeArray(data); } -std::shared_ptr TransferBinaryZeroCopy(RecordReader* reader, - const std::shared_ptr& type) { +Status TransferBinaryZeroCopy(RecordReader* reader, + const std::shared_ptr& logical_type_field, + std::shared_ptr* out) { std::vector> buffers = {reader->ReleaseIsValid(), reader->ReleaseOffsets(), reader->ReleaseValues()}; - auto data = std::make_shared<::arrow::ArrayData>(type, reader->values_written(), + auto data = std::make_shared<::arrow::ArrayData>(logical_type_field->type(), reader->values_written(), buffers, reader->null_count()); - // std::cout << "::arrow::MakeArray(data)->ToString():" << ::arrow::MakeArray(data)->ToString() << std::endl; - return ::arrow::MakeArray(data); + auto chunks = ::arrow::ArrayVector({::arrow::MakeArray(data)}); + if (!logical_type_field->nullable()) { + ReconstructChunksWithoutNulls(&chunks); + } + *out = std::make_shared(std::move(chunks), logical_type_field->type()); + return Status::OK(); } Status TransferLargeBinaryZeroCopy(RecordReader* reader, MemoryPool* pool, - const std::shared_ptr& type, + const std::shared_ptr& logical_type_field, std::shared_ptr* out) { std::vector> buffers = {reader->ReleaseIsValid(), reader->ReleaseOffsets(), reader->ReleaseValues()}; auto data = std::make_shared<::arrow::ArrayData>(::arrow::binary(), reader->values_written(), buffers, reader->null_count()); - // std::cout << "::arrow::MakeArray(data)->ToString():" << ::arrow::MakeArray(data)->ToString() << std::endl; auto array = ::arrow::MakeArray(data); ::arrow::compute::ExecContext ctx(pool); ::arrow::compute::CastOptions cast_options; @@ -398,10 +388,13 @@ Status TransferLargeBinaryZeroCopy(RecordReader* reader, MemoryPool* pool, ARROW_ASSIGN_OR_RAISE( array, - ::arrow::compute::Cast(*array, type, cast_options, &ctx)); + ::arrow::compute::Cast(*array, logical_type_field->type(), cast_options, &ctx)); - - *out = std::make_shared(std::move(::arrow::ArrayVector({array})), type); + auto chunks = ::arrow::ArrayVector({array}); + if (!logical_type_field->nullable()) { + ReconstructChunksWithoutNulls(&chunks); + } + *out = std::make_shared(std::move(chunks)), logical_type_field->type(); return Status::OK(); } @@ -819,14 +812,12 @@ Status TransferColumnData(RecordReader* reader, const std::shared_ptr& va } break; case ::arrow::Type::BINARY: case ::arrow::Type::STRING: { - result = TransferBinaryZeroCopy(reader, value_field->type()); - // RETURN_NOT_OK(TransferBinary(reader, pool, value_field, &chunked_result)); - // result = chunked_result; + RETURN_NOT_OK(TransferBinaryZeroCopy(reader, value_field, &chunked_result)); + result = chunked_result; } break; case ::arrow::Type::LARGE_BINARY: case ::arrow::Type::LARGE_STRING: { - // result = TransferLargeBinaryZeroCopy(reader, pool, value_field->type()); - RETURN_NOT_OK(TransferLargeBinaryZeroCopy(reader, pool, value_field->type(), &chunked_result)); + RETURN_NOT_OK(TransferLargeBinaryZeroCopy(reader, pool, value_field, &chunked_result)); result = chunked_result; } break; case ::arrow::Type::DECIMAL128: { From 390323c10015e0ebe3aa4a275d2d3eb864510516 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Wed, 7 Dec 2022 01:14:04 +0800 Subject: [PATCH 06/15] remove testing dir change --- testing | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff 
--git a/testing b/testing
index 5bab2f264a2..ecab1162cbe 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 5bab2f264a23f5af68f69ea93d24ef1e8e77fc88
+Subproject commit ecab1162cbec872e17d949ecc86181670aee045c

From 3294aceede8e5e36dd3b353f8dd8d03d2ffdc106 Mon Sep 17 00:00:00 2001
From: zhixingheyi-tian
Date: Wed, 14 Dec 2022 01:30:02 +0800
Subject: [PATCH 07/15] Fix RecordReaderByteArrayTest.SkipByteArray

---
 cpp/src/parquet/column_reader.cc | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index ee43ebba907..a5a83b6a9f9 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -1958,14 +1958,14 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader<ByteArrayType>,
   }

   ::arrow::ArrayVector GetBuilderChunks() override {
-    ::arrow::ArrayVector result = accumulator_.chunks;
-    if (result.size() == 0 || accumulator_.builder->length() > 0) {
-      std::shared_ptr<::arrow::Array> last_chunk;
-      PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk));
-      result.push_back(std::move(last_chunk));
-    }
-    accumulator_.chunks = {};
-    return result;
+    std::vector<std::shared_ptr<Buffer>> buffers = {ReleaseIsValid(),
+                                                    ReleaseOffsets(),
+                                                    ReleaseValues()};
+    auto data = std::make_shared<::arrow::ArrayData>(::arrow::binary(), values_written(),
+                                                     buffers, null_count());
+
+    auto chunks = ::arrow::ArrayVector({::arrow::MakeArray(data)});
+    return chunks;
   }

   void ReadValuesDense(int64_t values_to_read) override {

From f703c5a62749f568ea17a50dd77c85fd56509c5f Mon Sep 17 00:00:00 2001
From: zhixingheyi-tian
Date: Wed, 14 Dec 2022 04:33:55 +0800
Subject: [PATCH 08/15] Format cpp code

---
 cpp/src/parquet/arrow/reader_internal.cc | 35 ++++++-----
 cpp/src/parquet/column_reader.cc         | 74 +++++++++++-----------
 cpp/src/parquet/column_reader.h          |  4 +-
 cpp/src/parquet/encoding.cc              | 78 +++++++++++-------------
 cpp/src/parquet/encoding.h               | 13 ++--
 5 files changed, 94 insertions(+), 110 deletions(-)

diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index 74aaf788cc7..902cf11d394 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -358,13 +358,13 @@ std::shared_ptr<Array> TransferZeroCopy(RecordReader* reader,
 }

 Status TransferBinaryZeroCopy(RecordReader* reader,
-                              const std::shared_ptr<Field>& logical_type_field,
-                              std::shared_ptr<ChunkedArray>* out) {
-  std::vector<std::shared_ptr<Buffer>> buffers = {reader->ReleaseIsValid(),
-                                                  reader->ReleaseOffsets(),
-                                                  reader->ReleaseValues()};
-  auto data = std::make_shared<::arrow::ArrayData>(logical_type_field->type(), reader->values_written(),
-                                                   buffers, reader->null_count());
+                              const std::shared_ptr<Field>& logical_type_field,
+                              std::shared_ptr<ChunkedArray>* out) {
+  std::vector<std::shared_ptr<Buffer>> buffers = {
+      reader->ReleaseIsValid(), reader->ReleaseOffsets(), reader->ReleaseValues()};
+  auto data = std::make_shared<::arrow::ArrayData>(logical_type_field->type(),
+                                                   reader->values_written(), buffers,
+                                                   reader->null_count());
   auto chunks = ::arrow::ArrayVector({::arrow::MakeArray(data)});
   if (!logical_type_field->nullable()) {
     ReconstructChunksWithoutNulls(&chunks);
@@ -374,21 +374,19 @@ Status TransferBinaryZeroCopy(RecordReader* reader,
 }

 Status TransferLargeBinaryZeroCopy(RecordReader* reader, MemoryPool* pool,
-                                   const std::shared_ptr<Field>& logical_type_field,
-                                   std::shared_ptr<ChunkedArray>* out) {
-  std::vector<std::shared_ptr<Buffer>> buffers = {reader->ReleaseIsValid(),
-                                                  reader->ReleaseOffsets(),
-                                                  reader->ReleaseValues()};
-  auto data = std::make_shared<::arrow::ArrayData>(::arrow::binary(),
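  // Zero-copy assembly: the three buffers follow Arrow's variable-size binary
  // layout -- validity bitmap, int32 offsets, then the value bytes. For the
  // values "ab", null, "cde" the offsets are {0, 2, 2, 5} and the data buffer
  // holds "abcde"; a null row is simply a zero-length slot.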
reader->values_written(), - buffers, reader->null_count()); + const std::shared_ptr& logical_type_field, + std::shared_ptr* out) { + std::vector> buffers = { + reader->ReleaseIsValid(), reader->ReleaseOffsets(), reader->ReleaseValues()}; + auto data = std::make_shared<::arrow::ArrayData>( + ::arrow::binary(), reader->values_written(), buffers, reader->null_count()); auto array = ::arrow::MakeArray(data); ::arrow::compute::ExecContext ctx(pool); ::arrow::compute::CastOptions cast_options; cast_options.allow_invalid_utf8 = true; // avoid spending time validating UTF8 data - ARROW_ASSIGN_OR_RAISE( - array, - ::arrow::compute::Cast(*array, logical_type_field->type(), cast_options, &ctx)); + ARROW_ASSIGN_OR_RAISE(array, ::arrow::compute::Cast(*array, logical_type_field->type(), + cast_options, &ctx)); auto chunks = ::arrow::ArrayVector({array}); if (!logical_type_field->nullable()) { @@ -817,7 +815,8 @@ Status TransferColumnData(RecordReader* reader, const std::shared_ptr& va } break; case ::arrow::Type::LARGE_BINARY: case ::arrow::Type::LARGE_STRING: { - RETURN_NOT_OK(TransferLargeBinaryZeroCopy(reader, pool, value_field, &chunked_result)); + RETURN_NOT_OK( + TransferLargeBinaryZeroCopy(reader, pool, value_field, &chunked_result)); result = chunked_result; } break; case ::arrow::Type::DECIMAL128: { diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index a5a83b6a9f9..d3a7ac99f33 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1946,10 +1946,10 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, }; class ByteArrayChunkedOptRecordReader : public TypedRecordReader, - virtual public BinaryRecordReader { + virtual public BinaryRecordReader { public: ByteArrayChunkedOptRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, - ::arrow::MemoryPool* pool) + ::arrow::MemoryPool* pool) : TypedRecordReader(descr, leaf_info, pool) { DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool)); @@ -1958,11 +1958,10 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader, } ::arrow::ArrayVector GetBuilderChunks() override { - std::vector> buffers = {ReleaseIsValid(), - ReleaseOffsets(), - ReleaseValues()}; + std::vector> buffers = {ReleaseIsValid(), ReleaseOffsets(), + ReleaseValues()}; auto data = std::make_shared<::arrow::ArrayData>(::arrow::binary(), values_written(), - buffers, null_count()); + buffers, null_count()); auto chunks = ::arrow::ArrayVector({::arrow::MakeArray(data)}); return chunks; @@ -1970,17 +1969,18 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader, void ReadValuesDense(int64_t values_to_read) override { int64_t num_decoded = this->current_decoder_->DecodeArrow_opt( - static_cast(values_to_read), 0, - NULLPTR, (reinterpret_cast(offset_->mutable_data()) + values_written_), - values_, 0, &bianry_length_); + static_cast(values_to_read), 0, NULLPTR, + (reinterpret_cast(offset_->mutable_data()) + values_written_), values_, + 0, &bianry_length_); DCHECK_EQ(num_decoded, values_to_read); } void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { int64_t num_decoded = this->current_decoder_->DecodeArrow_opt( static_cast(values_to_read), static_cast(null_count), - valid_bits_->mutable_data(), (reinterpret_cast(offset_->mutable_data()) + values_written_), - values_, values_written_, &bianry_length_); + valid_bits_->mutable_data(), + (reinterpret_cast(offset_->mutable_data()) + values_written_), values_, + 
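+        // values_written_ doubles as the bitmap read offset and as the index
+        // of the next offset slot, while bianry_length_ carries the running
+        // byte length of the value buffer across page reads.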
values_written_, &bianry_length_); DCHECK_EQ(num_decoded, values_to_read - null_count); } @@ -1990,12 +1990,11 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader, if (new_values_capacity > values_capacity_) { PARQUET_THROW_NOT_OK( values_->Resize(new_values_capacity * binary_per_row_length_, false)); - PARQUET_THROW_NOT_OK( - offset_->Resize((new_values_capacity+1) * 4, false)); + PARQUET_THROW_NOT_OK(offset_->Resize((new_values_capacity + 1) * 4, false)); - auto offset = reinterpret_cast(offset_->mutable_data()); + auto offset = reinterpret_cast(offset_->mutable_data()); offset[0] = 0; - + values_capacity_ = new_values_capacity; } if (leaf_info_.HasNullableValues()) { @@ -2011,29 +2010,29 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader, } } - std::shared_ptr ReleaseValues() override { - auto result = values_; - values_ = AllocateBuffer(this->pool_); - values_capacity_ = 0; - return result; + std::shared_ptr ReleaseValues() override { + auto result = values_; + values_ = AllocateBuffer(this->pool_); + values_capacity_ = 0; + return result; } std::shared_ptr ReleaseOffsets() override { - auto result = offset_; - - if (ARROW_PREDICT_FALSE(!hasCal_average_len_)) { - auto offsetArr = reinterpret_cast(offset_->mutable_data()); - const auto first_offset = offsetArr[0]; - const auto last_offset = offsetArr[values_written_]; - int64_t binary_length = last_offset - first_offset; - binary_per_row_length_ = binary_length / values_written_ + 1; - // std::cout << "binary_per_row_length_:" << binary_per_row_length_ << std::endl; - hasCal_average_len_ = true; - } - - offset_ = AllocateBuffer(this->pool_); - bianry_length_ = 0; - return result; + auto result = offset_; + + if (ARROW_PREDICT_FALSE(!hasCal_average_len_)) { + auto offsetArr = reinterpret_cast(offset_->mutable_data()); + const auto first_offset = offsetArr[0]; + const auto last_offset = offsetArr[values_written_]; + int64_t binary_length = last_offset - first_offset; + binary_per_row_length_ = binary_length / values_written_ + 1; + // std::cout << "binary_per_row_length_:" << binary_per_row_length_ << std::endl; + hasCal_average_len_ = true; + } + + offset_ = AllocateBuffer(this->pool_); + bianry_length_ = 0; + return result; } void ResetValues() { @@ -2156,10 +2155,9 @@ std::shared_ptr MakeByteArrayRecordReader(const ColumnDescriptor* bool read_dictionary) { if (read_dictionary) { return std::make_shared(descr, leaf_info, pool); - } else if(descr->logical_type()->is_decimal()) { + } else if (descr->logical_type()->is_decimal()) { return std::make_shared(descr, leaf_info, pool); - } - else { + } else { return std::make_shared(descr, leaf_info, pool); } } diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index ea68fddaa68..1e752d4b57c 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -303,9 +303,7 @@ class PARQUET_EXPORT RecordReader { /// allocated in subsequent ReadRecords calls virtual std::shared_ptr ReleaseValues() = 0; - virtual std::shared_ptr ReleaseOffsets() { - return nullptr; - } + virtual std::shared_ptr ReleaseOffsets() { return nullptr; } /// \brief Transfer filled validity bitmap buffer to caller. 
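 /// (1-bits mark non-null rows, per Arrow's validity bitmap convention.)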
A new one will /// be allocated in subsequent ReadRecords calls diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 0897ac2b555..563f6834d67 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -1361,14 +1361,12 @@ class PlainByteArrayDecoder : public PlainDecoder, } int DecodeArrow_opt(int num_values, int null_count, const uint8_t* valid_bits, - int32_t* offset, - std::shared_ptr<::arrow::ResizableBuffer> & values, - int64_t valid_bits_offset, - int32_t* bianry_length) override { + int32_t* offset, std::shared_ptr<::arrow::ResizableBuffer>& values, + int64_t valid_bits_offset, int32_t* bianry_length) override { int result = 0; - PARQUET_THROW_NOT_OK(DecodeArrowDense_opt(num_values, null_count, valid_bits, - offset, values, - valid_bits_offset, &result, bianry_length)); + PARQUET_THROW_NOT_OK(DecodeArrowDense_opt(num_values, null_count, valid_bits, offset, + values, valid_bits_offset, &result, + bianry_length)); return result; } @@ -1426,15 +1424,14 @@ class PlainByteArrayDecoder : public PlainDecoder, } Status DecodeArrowDense_opt(int num_values, int null_count, const uint8_t* valid_bits, - int32_t* offset, - std::shared_ptr<::arrow::ResizableBuffer> & values, - int64_t valid_bits_offset, - int* out_values_decoded, - int32_t* bianry_length) { + int32_t* offset, + std::shared_ptr<::arrow::ResizableBuffer>& values, + int64_t valid_bits_offset, int* out_values_decoded, + int32_t* bianry_length) { int values_decoded = 0; auto dst_value = values->mutable_data() + (*bianry_length); int capacity = values->size(); - if (ARROW_PREDICT_FALSE((len_ + *bianry_length) >= capacity)) { + if (ARROW_PREDICT_FALSE((len_ + *bianry_length) >= capacity)) { PARQUET_THROW_NOT_OK(values->Resize(len_ + *bianry_length, false)); dst_value = values->mutable_data() + (*bianry_length); } @@ -1456,7 +1453,7 @@ class PlainByteArrayDecoder : public PlainDecoder, } (*bianry_length) += value_len; - offset[i+1] = offset[i] + value_len; + offset[i + 1] = offset[i] + value_len; memcpy(dst_value, data_ + 4, value_len); dst_value = dst_value + value_len; @@ -1467,7 +1464,7 @@ class PlainByteArrayDecoder : public PlainDecoder, return Status::OK(); }, [&]() { - offset[i+1] = offset[i]; + offset[i + 1] = offset[i]; ++i; return Status::OK(); })); @@ -1944,19 +1941,16 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, } int DecodeArrow_opt(int num_values, int null_count, const uint8_t* valid_bits, - int32_t* offset, - std::shared_ptr<::arrow::ResizableBuffer> & values, - int64_t valid_bits_offset, - int32_t* bianry_length) override { + int32_t* offset, std::shared_ptr<::arrow::ResizableBuffer>& values, + int64_t valid_bits_offset, int32_t* bianry_length) override { int result = 0; if (null_count == 0) { - PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull_opt(num_values, - offset, values, &result, bianry_length)); + PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull_opt(num_values, offset, values, + &result, bianry_length)); } else { PARQUET_THROW_NOT_OK(DecodeArrowDense_opt(num_values, null_count, valid_bits, - offset, values, - valid_bits_offset, &result, bianry_length)); - + offset, values, valid_bits_offset, + &result, bianry_length)); } return result; @@ -2033,11 +2027,10 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, } Status DecodeArrowDense_opt(int num_values, int null_count, const uint8_t* valid_bits, - int32_t* offset, - std::shared_ptr<::arrow::ResizableBuffer> & values, - int64_t valid_bits_offset, - int* out_num_values, - int32_t* bianry_length) { + int32_t* 
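  // 'offset' points at the next free slot of the offset buffer: the callee
  // reads offset[0] as the running total and writes offset[i + 1] for each
  // decoded or null row.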
offset, + std::shared_ptr<::arrow::ResizableBuffer>& values, + int64_t valid_bits_offset, int* out_num_values, + int32_t* bianry_length) { constexpr int32_t kBufferSize = 1024; int32_t indices[kBufferSize]; auto dst_value = values->mutable_data() + (*bianry_length); @@ -2067,10 +2060,11 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, if (is_valid) { auto idx = indices[i]; // RETURN_NOT_OK(IndexInBounds(idx)); - const auto& val = dict_values[idx]; + const auto& val = dict_values[idx]; auto value_len = val.len; - uint64_t value_offset= offset[num_appended+1] = offset[num_appended] + value_len; - + uint64_t value_offset = offset[num_appended + 1] = + offset[num_appended] + value_len; + if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len); PARQUET_THROW_NOT_OK(values->Resize(capacity, false)); @@ -2080,11 +2074,10 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, memcpy(dst_value, val.ptr, static_cast(value_len)); dst_value = dst_value + value_len; - ++i; ++values_decoded; } else { - offset[num_appended+1] = offset[num_appended]; + offset[num_appended + 1] = offset[num_appended]; --null_count; } ++num_appended; @@ -2097,16 +2090,15 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, bit_reader.Next(); } } else { - offset[num_appended+1] = offset[num_appended]; + offset[num_appended + 1] = offset[num_appended]; --null_count; ++num_appended; } } *out_num_values = values_decoded; - return Status::OK(); + return Status::OK(); } - Status DecodeArrowDenseNonNull(int num_values, typename EncodingTraits::Accumulator* out, int* out_num_values) { @@ -2136,12 +2128,9 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, return Status::OK(); } - Status DecodeArrowDenseNonNull_opt(int num_values, - int32_t* offset, - std::shared_ptr<::arrow::ResizableBuffer> & values, - int* out_num_values, - int32_t* bianry_length) { - + Status DecodeArrowDenseNonNull_opt(int num_values, int32_t* offset, + std::shared_ptr<::arrow::ResizableBuffer>& values, + int* out_num_values, int32_t* bianry_length) { constexpr int32_t kBufferSize = 2048; int32_t indices[kBufferSize]; int values_decoded = 0; @@ -2162,7 +2151,8 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, // RETURN_NOT_OK(IndexInBounds(idx)); const auto& val = dict_values[idx]; auto value_len = val.len; - uint64_t value_offset= offset[num_appended+1] = offset[num_appended] + value_len; + uint64_t value_offset = offset[num_appended + 1] = + offset[num_appended] + value_len; if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len); PARQUET_THROW_NOT_OK(values->Resize(capacity, false)); diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h index 3c1fbda1fd8..692bb392114 100644 --- a/cpp/src/parquet/encoding.h +++ b/cpp/src/parquet/encoding.h @@ -318,13 +318,12 @@ class TypedDecoder : virtual public Decoder { typename EncodingTraits::Accumulator* out) = 0; virtual int DecodeArrow_opt(int num_values, int null_count, const uint8_t* valid_bits, - int32_t* offset, - std::shared_ptr<::arrow::ResizableBuffer> & values, - int64_t valid_bits_offset, - int32_t* bianry_length) { - return 0; - } - + int32_t* offset, + std::shared_ptr<::arrow::ResizableBuffer>& values, + int64_t valid_bits_offset, int32_t* bianry_length) { + return 0; + } + /// \brief Decode into an ArrayBuilder or other accumulator ignoring nulls /// /// \return number of values decoded From 
c4312a77561a1bc6f768065f95b6da91f343003c Mon Sep 17 00:00:00 2001
From: zhixingheyi-tian
Date: Wed, 14 Dec 2022 18:13:42 +0800
Subject: [PATCH 09/15] Fix nullptr

---
 cpp/src/parquet/column_reader.cc | 2 ++
 cpp/src/parquet/column_reader.h  | 2 +-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 8cd33049169..91a8a620f51 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -1592,6 +1592,8 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
     }
   }

+  std::shared_ptr<Buffer> ReleaseOffsets() override { return nullptr; }
+
   std::shared_ptr<Buffer> ReleaseIsValid() override {
     if (leaf_info_.HasNullableValues()) {
       auto result = valid_bits_;

diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index 1e752d4b57c..0bdec7aa560 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -303,7 +303,7 @@ class PARQUET_EXPORT RecordReader {
   /// allocated in subsequent ReadRecords calls
   virtual std::shared_ptr<Buffer> ReleaseValues() = 0;

-  virtual std::shared_ptr<Buffer> ReleaseOffsets() { return nullptr; }
+  virtual std::shared_ptr<Buffer> ReleaseOffsets() = 0;

   /// \brief Transfer filled validity bitmap buffer to caller. A new one will
   /// be allocated in subsequent ReadRecords calls

From 9104f0e0c79a13325189606ea452b3afc1d0745e Mon Sep 17 00:00:00 2001
From: zhixingheyi-tian
Date: Fri, 16 Dec 2022 05:57:22 +0800
Subject: [PATCH 10/15] Fix delta encoding

---
 cpp/src/parquet/arrow/reader_internal.cc | 52 +-------------
 cpp/src/parquet/column_reader.cc         | 92 +++++++++++++++++-------
 2 files changed, 68 insertions(+), 76 deletions(-)

diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index 902cf11d394..e428c206bfc 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -357,45 +357,6 @@ std::shared_ptr<Array> TransferZeroCopy(RecordReader* reader,
   return ::arrow::MakeArray(data);
 }

-Status TransferBinaryZeroCopy(RecordReader* reader,
-                              const std::shared_ptr<Field>& logical_type_field,
-                              std::shared_ptr<ChunkedArray>* out) {
-  std::vector<std::shared_ptr<Buffer>> buffers = {
-      reader->ReleaseIsValid(), reader->ReleaseOffsets(), reader->ReleaseValues()};
-  auto data = std::make_shared<::arrow::ArrayData>(logical_type_field->type(),
-                                                   reader->values_written(), buffers,
-                                                   reader->null_count());
-  auto chunks = ::arrow::ArrayVector({::arrow::MakeArray(data)});
-  if (!logical_type_field->nullable()) {
-    ReconstructChunksWithoutNulls(&chunks);
-  }
-  *out = std::make_shared<ChunkedArray>(std::move(chunks), logical_type_field->type());
-  return Status::OK();
-}
-
-Status TransferLargeBinaryZeroCopy(RecordReader* reader, MemoryPool* pool,
-                                   const std::shared_ptr<Field>& logical_type_field,
-                                   std::shared_ptr<ChunkedArray>* out) {
-  std::vector<std::shared_ptr<Buffer>> buffers = {
-      reader->ReleaseIsValid(), reader->ReleaseOffsets(), reader->ReleaseValues()};
-  auto data = std::make_shared<::arrow::ArrayData>(
-      ::arrow::binary(), reader->values_written(), buffers, reader->null_count());
-  auto array = ::arrow::MakeArray(data);
-  ::arrow::compute::ExecContext ctx(pool);
-  ::arrow::compute::CastOptions cast_options;
-  cast_options.allow_invalid_utf8 = true;  // avoid spending time validating UTF8 data
-
-  ARROW_ASSIGN_OR_RAISE(array, ::arrow::compute::Cast(*array, logical_type_field->type(),
-                                                      cast_options, &ctx));
-
-  auto chunks = ::arrow::ArrayVector({array});
-  if (!logical_type_field->nullable()) {
-    ReconstructChunksWithoutNulls(&chunks);
-  }
-  *out = std::make_shared<ChunkedArray>(std::move(chunks)),
logical_type_field->type(); - return Status::OK(); -} - Status TransferBool(RecordReader* reader, bool nullable, MemoryPool* pool, Datum* out) { int64_t length = reader->values_written(); @@ -804,19 +765,12 @@ Status TransferColumnData(RecordReader* reader, const std::shared_ptr& va case ::arrow::Type::DATE64: RETURN_NOT_OK(TransferDate64(reader, pool, value_field, &result)); break; - case ::arrow::Type::FIXED_SIZE_BINARY: { - RETURN_NOT_OK(TransferBinary(reader, pool, value_field, &chunked_result)); - result = chunked_result; - } break; + case ::arrow::Type::FIXED_SIZE_BINARY: case ::arrow::Type::BINARY: - case ::arrow::Type::STRING: { - RETURN_NOT_OK(TransferBinaryZeroCopy(reader, value_field, &chunked_result)); - result = chunked_result; - } break; + case ::arrow::Type::STRING: case ::arrow::Type::LARGE_BINARY: case ::arrow::Type::LARGE_STRING: { - RETURN_NOT_OK( - TransferLargeBinaryZeroCopy(reader, pool, value_field, &chunked_result)); + RETURN_NOT_OK(TransferBinary(reader, pool, value_field, &chunked_result)); result = chunked_result; } break; case ::arrow::Type::DECIMAL128: { diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 91a8a620f51..c1117170c2e 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1969,45 +1969,80 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader, accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool)); values_ = AllocateBuffer(pool); offset_ = AllocateBuffer(pool); + if (current_encoding_ == Encoding::PLAIN_DICTIONARY || + current_encoding_ == Encoding::PLAIN) { + uses_opt_ = true; + } } ::arrow::ArrayVector GetBuilderChunks() override { - std::vector> buffers = {ReleaseIsValid(), ReleaseOffsets(), - ReleaseValues()}; - auto data = std::make_shared<::arrow::ArrayData>(::arrow::binary(), values_written(), - buffers, null_count()); - - auto chunks = ::arrow::ArrayVector({::arrow::MakeArray(data)}); - return chunks; + if (uses_opt_) { + std::vector> buffers = {ReleaseIsValid(), ReleaseOffsets(), + ReleaseValues()}; + auto data = std::make_shared<::arrow::ArrayData>( + ::arrow::binary(), values_written(), buffers, null_count()); + + auto chunks = ::arrow::ArrayVector({::arrow::MakeArray(data)}); + return chunks; + } else { + ::arrow::ArrayVector result = accumulator_.chunks; + if (result.size() == 0 || accumulator_.builder->length() > 0) { + std::shared_ptr<::arrow::Array> last_chunk; + PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk)); + result.push_back(std::move(last_chunk)); + } + accumulator_.chunks = {}; + return result; + } } void ReadValuesDense(int64_t values_to_read) override { - int64_t num_decoded = this->current_decoder_->DecodeArrow_opt( - static_cast(values_to_read), 0, NULLPTR, - (reinterpret_cast(offset_->mutable_data()) + values_written_), values_, - 0, &bianry_length_); - DCHECK_EQ(num_decoded, values_to_read); + if (uses_opt_) { + std::cout << "ReadValuesDense:current_encoding_:" << current_encoding_ << std::endl; + int64_t num_decoded = this->current_decoder_->DecodeArrow_opt( + static_cast(values_to_read), 0, NULLPTR, + (reinterpret_cast(offset_->mutable_data()) + values_written_), + values_, 0, &bianry_length_); + DCHECK_EQ(num_decoded, values_to_read); + } else { + int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull( + static_cast(values_to_read), &accumulator_); + CheckNumberDecoded(num_decoded, values_to_read); + ResetValues(); + } } void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { - int64_t 
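+      // uses_opt_ is only set for PLAIN and PLAIN_DICTIONARY pages (see the
+      // constructor above); DELTA-encoded pages take the builder-based
+      // fallback in the else branch instead.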
num_decoded = this->current_decoder_->DecodeArrow_opt( - static_cast(values_to_read), static_cast(null_count), - valid_bits_->mutable_data(), - (reinterpret_cast(offset_->mutable_data()) + values_written_), values_, - values_written_, &bianry_length_); - DCHECK_EQ(num_decoded, values_to_read - null_count); + if (uses_opt_) { + std::cout << "ReadValuesSpaced:current_encoding_:" << current_encoding_ + << std::endl; + int64_t num_decoded = this->current_decoder_->DecodeArrow_opt( + static_cast(values_to_read), static_cast(null_count), + valid_bits_->mutable_data(), + (reinterpret_cast(offset_->mutable_data()) + values_written_), + values_, values_written_, &bianry_length_); + DCHECK_EQ(num_decoded, values_to_read - null_count); + } else { + int64_t num_decoded = this->current_decoder_->DecodeArrow( + static_cast(values_to_read), static_cast(null_count), + valid_bits_->mutable_data(), values_written_, &accumulator_); + CheckNumberDecoded(num_decoded, values_to_read - null_count); + ResetValues(); + } } void ReserveValues(int64_t extra_values) override { const int64_t new_values_capacity = UpdateCapacity(values_capacity_, values_written_, extra_values); if (new_values_capacity > values_capacity_) { - PARQUET_THROW_NOT_OK( - values_->Resize(new_values_capacity * binary_per_row_length_, false)); - PARQUET_THROW_NOT_OK(offset_->Resize((new_values_capacity + 1) * 4, false)); + if (uses_opt_) { + PARQUET_THROW_NOT_OK( + values_->Resize(new_values_capacity * binary_per_row_length_, false)); + PARQUET_THROW_NOT_OK(offset_->Resize((new_values_capacity + 1) * 4, false)); - auto offset = reinterpret_cast(offset_->mutable_data()); - offset[0] = 0; + auto offset = reinterpret_cast(offset_->mutable_data()); + offset[0] = 0; + } values_capacity_ = new_values_capacity; } @@ -2052,14 +2087,15 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader, void ResetValues() { if (values_written_ > 0) { // Resize to 0, but do not shrink to fit + if (uses_opt_) { + PARQUET_THROW_NOT_OK(offset_->Resize(0, false)); + PARQUET_THROW_NOT_OK(values_->Resize(0, false)); + bianry_length_ = 0; + } PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false)); - PARQUET_THROW_NOT_OK(offset_->Resize(0, false)); - PARQUET_THROW_NOT_OK(values_->Resize(0, false)); - values_written_ = 0; values_capacity_ = 0; null_count_ = 0; - bianry_length_ = 0; } } @@ -2069,6 +2105,8 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader, int32_t bianry_length_ = 0; + bool uses_opt_ = false; + std::shared_ptr<::arrow::ResizableBuffer> offset_; }; From 3a80c9dc4c66b169c36a7fd21a461dcaed9dc800 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Fri, 16 Dec 2022 06:05:58 +0800 Subject: [PATCH 11/15] clean cout --- cpp/src/parquet/column_reader.cc | 3 --- 1 file changed, 3 deletions(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index c1117170c2e..1f685ea462e 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1998,7 +1998,6 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader, void ReadValuesDense(int64_t values_to_read) override { if (uses_opt_) { - std::cout << "ReadValuesDense:current_encoding_:" << current_encoding_ << std::endl; int64_t num_decoded = this->current_decoder_->DecodeArrow_opt( static_cast(values_to_read), 0, NULLPTR, (reinterpret_cast(offset_->mutable_data()) + values_written_), @@ -2014,8 +2013,6 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader, void ReadValuesSpaced(int64_t values_to_read, int64_t 
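   // null_count counts the null slots within values_to_read, so a successful
   // decode yields values_to_read - null_count materialized values.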
From 50ec96469e57934bf6565c40f515c39a9c5f5391 Mon Sep 17 00:00:00 2001
From: zhixingheyi-tian
Date: Fri, 16 Dec 2022 16:51:23 +0800
Subject: [PATCH 12/15] fix conversion

---
 cpp/src/parquet/encoding.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 563f6834d67..df78f838b55 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -1430,7 +1430,7 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
                               int32_t* bianry_length) {
     int values_decoded = 0;
     auto dst_value = values->mutable_data() + (*bianry_length);
-    int capacity = values->size();
+    int64_t capacity = values->size();
     if (ARROW_PREDICT_FALSE((len_ + *bianry_length) >= capacity)) {
       PARQUET_THROW_NOT_OK(values->Resize(len_ + *bianry_length, false));
       dst_value = values->mutable_data() + (*bianry_length);
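This one-line change is a real bug fix: ::arrow::Buffer::size() returns int64_t, and storing it in a plain int truncates once the accumulated values buffer crosses 2^31 - 1 bytes, so the capacity guard that decides whether to Resize stops being trustworthy. A self-contained illustration of the narrowing; the sizes are made up:

#include <cstdint>
#include <iostream>

int main() {
  int64_t buffer_size = 3LL * 1024 * 1024 * 1024;  // a 3 GiB values buffer
  int narrowed = static_cast<int>(buffer_size);    // implementation-defined wrap
  std::cout << "int64_t capacity: " << buffer_size << "\n"
            << "int capacity:     " << narrowed << "\n";
  // Prints 3221225472 vs -1073741824 on typical platforms: the guard
  // `len_ + *bianry_length >= capacity` then compares against garbage,
  // and the int arithmetic on its left side can itself overflow.
  return 0;
}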
From 5512db88448bced2c672de20ddafd51e128240ab Mon Sep 17 00:00:00 2001
From: zhixingheyi-tian
Date: Sat, 17 Dec 2022 04:23:17 +0800
Subject: [PATCH 13/15] Turn on opt

---
 cpp/src/parquet/column_reader.cc | 44 +++++++++++++++-----------------
 1 file changed, 20 insertions(+), 24 deletions(-)

diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 1f685ea462e..4f13a7043d6 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -850,12 +850,23 @@ class ColumnReaderImplBase {
     current_encoding_ = encoding;
     current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
                               static_cast<int>(data_size));
+    if (!hasSet_uses_opt_) {
+      if (current_encoding_ == Encoding::PLAIN_DICTIONARY ||
+          current_encoding_ == Encoding::PLAIN ||
+          current_encoding_ == Encoding::RLE_DICTIONARY) {
+        uses_opt_ = true;
+      }
+      hasSet_uses_opt_ = true;
+    }
   }
 
   int64_t available_values_current_page() const {
     return num_buffered_values_ - num_decoded_values_;
   }
 
+  bool hasSet_uses_opt_ = false;
+  bool uses_opt_ = false;
+
   const ColumnDescriptor* descr_;
   const int16_t max_def_level_;
   const int16_t max_rep_level_;
@@ -1969,10 +1980,6 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader<ByteArrayType>,
     accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool));
     values_ = AllocateBuffer(pool);
     offset_ = AllocateBuffer(pool);
-    if (current_encoding_ == Encoding::PLAIN_DICTIONARY ||
-        current_encoding_ == Encoding::PLAIN) {
-      uses_opt_ = true;
-    }
   }
 
   ::arrow::ArrayVector GetBuilderChunks() override {
@@ -2032,14 +2039,12 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader<ByteArrayType>,
     const int64_t new_values_capacity =
         UpdateCapacity(values_capacity_, values_written_, extra_values);
     if (new_values_capacity > values_capacity_) {
-      if (uses_opt_) {
-        PARQUET_THROW_NOT_OK(
-            values_->Resize(new_values_capacity * binary_per_row_length_, false));
-        PARQUET_THROW_NOT_OK(offset_->Resize((new_values_capacity + 1) * 4, false));
+      PARQUET_THROW_NOT_OK(
+          values_->Resize(new_values_capacity * binary_per_row_length_, false));
+      PARQUET_THROW_NOT_OK(offset_->Resize((new_values_capacity + 1) * 4, false));
 
-        auto offset = reinterpret_cast<int32_t*>(offset_->mutable_data());
-        offset[0] = 0;
-      }
+      auto offset = reinterpret_cast<int32_t*>(offset_->mutable_data());
+      offset[0] = 0;
 
       values_capacity_ = new_values_capacity;
     }
@@ -2048,24 +2053,20 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader<ByteArrayType>,
       if (valid_bits_->size() < valid_bytes_new) {
         int64_t valid_bytes_old = bit_util::BytesForBits(values_written_);
         PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false));
-
         // Avoid valgrind warnings
         memset(valid_bits_->mutable_data() + valid_bytes_old, 0,
                valid_bytes_new - valid_bytes_old);
       }
     }
   }
-
   std::shared_ptr<Buffer> ReleaseValues() override {
     auto result = values_;
    values_ = AllocateBuffer(this->pool_);
     values_capacity_ = 0;
     return result;
   }
-
   std::shared_ptr<Buffer> ReleaseOffsets() override {
     auto result = offset_;
-
     if (ARROW_PREDICT_FALSE(!hasCal_average_len_)) {
       auto offsetArr = reinterpret_cast<int32_t*>(offset_->mutable_data());
       const auto first_offset = offsetArr[0];
       const auto last_offset = offsetArr[values_written_];
       int64_t binary_length = last_offset - first_offset;
       binary_per_row_length_ = binary_length / values_written_ + 1;
       // std::cout << "binary_per_row_length_:" << binary_per_row_length_ << std::endl;
       hasCal_average_len_ = true;
     }
-
     offset_ = AllocateBuffer(this->pool_);
     bianry_length_ = 0;
     return result;
   }
-
   void ResetValues() {
     if (values_written_ > 0) {
       // Resize to 0, but do not shrink to fit
-      if (uses_opt_) {
-        PARQUET_THROW_NOT_OK(offset_->Resize(0, false));
-        PARQUET_THROW_NOT_OK(values_->Resize(0, false));
-        bianry_length_ = 0;
-      }
       PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false));
+      PARQUET_THROW_NOT_OK(offset_->Resize(0, false));
+      PARQUET_THROW_NOT_OK(values_->Resize(0, false));
+
       values_written_ = 0;
       values_capacity_ = 0;
       null_count_ = 0;
+      bianry_length_ = 0;
     }
   }
 
@@ -2102,8 +2100,6 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader<ByteArrayType>,
 
   int32_t bianry_length_ = 0;
 
-  bool uses_opt_ = false;
-
   std::shared_ptr<::arrow::ResizableBuffer> offset_;
 };
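"Turn on opt" fixes where the decision is made: the reader's constructor runs before any page has been read, so current_encoding_ is not yet meaningful there; SetData() runs when each data page arrives, so the check belongs there, latched on the first page only. Condensed into a standalone sketch of that first-page latch; the enum mirrors parquet::Encoding and the struct is illustrative:

#include <cstdint>

enum class Encoding { PLAIN, PLAIN_DICTIONARY, RLE_DICTIONARY, DELTA_BYTE_ARRAY };

struct OptLatch {
  bool has_set_uses_opt = false;
  bool uses_opt = false;

  // Called once per data page; only the first page decides.
  void OnNewDataPage(Encoding encoding) {
    if (!has_set_uses_opt) {
      uses_opt = encoding == Encoding::PLAIN ||
                 encoding == Encoding::PLAIN_DICTIONARY ||
                 encoding == Encoding::RLE_DICTIONARY;
      has_set_uses_opt = true;  // later pages never flip the decision
    }
  }
};

The latch is deliberately sticky, which assumes the remaining pages of the column chunk stay within the same encoding family as the first; a chunk that switched to an unsupported encoding mid-stream would still be routed down the opt path.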
From 19e2ba3b4bf6aca344cfa179f194163d6c3ef089 Mon Sep 17 00:00:00 2001
From: zhixingheyi-tian
Date: Mon, 19 Dec 2022 18:46:24 +0800
Subject: [PATCH 14/15] Refine

---
 cpp/src/parquet/column_reader.cc |  5 ++---
 cpp/src/parquet/encoding.cc      | 12 ++++++------
 cpp/src/parquet/encoding.h       |  2 +-
 3 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 4f13a7043d6..1325e211c1e 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -2005,7 +2005,7 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader<ByteArrayType>,
   void ReadValuesDense(int64_t values_to_read) override {
     if (uses_opt_) {
-      int64_t num_decoded = this->current_decoder_->DecodeArrow_opt(
+      int64_t num_decoded = this->current_decoder_->DecodeArrowZeroCopy(
           static_cast<int>(values_to_read), 0, NULLPTR,
           (reinterpret_cast<int32_t*>(offset_->mutable_data()) + values_written_),
           values_, 0, &bianry_length_);
@@ -2020,7 +2020,7 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader<ByteArrayType>,
 
   void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
     if (uses_opt_) {
-      int64_t num_decoded = this->current_decoder_->DecodeArrow_opt(
+      int64_t num_decoded = this->current_decoder_->DecodeArrowZeroCopy(
           static_cast<int>(values_to_read), static_cast<int>(null_count),
           valid_bits_->mutable_data(),
           (reinterpret_cast<int32_t*>(offset_->mutable_data()) + values_written_),
@@ -2073,7 +2073,6 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader<ByteArrayType>,
       const auto last_offset = offsetArr[values_written_];
       int64_t binary_length = last_offset - first_offset;
       binary_per_row_length_ = binary_length / values_written_ + 1;
-      // std::cout << "binary_per_row_length_:" << binary_per_row_length_ << std::endl;
       hasCal_average_len_ = true;
     }
     offset_ = AllocateBuffer(this->pool_);
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 30baa5ec18b..3d3b01c9fdd 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -1360,11 +1360,11 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
     return result;
   }
 
-  int DecodeArrow_opt(int num_values, int null_count, const uint8_t* valid_bits,
+  int DecodeArrowZeroCopy(int num_values, int null_count, const uint8_t* valid_bits,
                       int32_t* offset, std::shared_ptr<::arrow::ResizableBuffer>& values,
                       int64_t valid_bits_offset, int32_t* bianry_length) override {
     int result = 0;
-    PARQUET_THROW_NOT_OK(DecodeArrowDense_opt(num_values, null_count, valid_bits, offset,
+    PARQUET_THROW_NOT_OK(DecodeArrowDenseZeroCopy(num_values, null_count, valid_bits, offset,
                                               values, valid_bits_offset, &result,
                                               bianry_length));
@@ -1423,7 +1423,7 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
     return Status::OK();
   }
 
-  Status DecodeArrowDense_opt(int num_values, int null_count, const uint8_t* valid_bits,
+  Status DecodeArrowDenseZeroCopy(int num_values, int null_count, const uint8_t* valid_bits,
                               int32_t* offset,
                               std::shared_ptr<::arrow::ResizableBuffer>& values,
                               int64_t valid_bits_offset, int* out_values_decoded,
@@ -1940,7 +1940,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
     return result;
   }
 
-  int DecodeArrow_opt(int num_values, int null_count, const uint8_t* valid_bits,
+  int DecodeArrowZeroCopy(int num_values, int null_count, const uint8_t* valid_bits,
                       int32_t* offset, std::shared_ptr<::arrow::ResizableBuffer>& values,
                       int64_t valid_bits_offset, int32_t* bianry_length) override {
     int result = 0;
@@ -1948,7 +1948,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
       PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull_opt(num_values, offset, values,
                                                        &result, bianry_length));
     } else {
-      PARQUET_THROW_NOT_OK(DecodeArrowDense_opt(num_values, null_count, valid_bits,
+      PARQUET_THROW_NOT_OK(DecodeArrowDenseZeroCopy(num_values, null_count, valid_bits,
                                                 offset, values, valid_bits_offset,
                                                 &result, bianry_length));
     }
@@ -2026,7 +2026,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
     return Status::OK();
   }
 
-  Status DecodeArrowDense_opt(int num_values, int null_count, const uint8_t* valid_bits,
+  Status DecodeArrowDenseZeroCopy(int num_values, int null_count, const uint8_t* valid_bits,
                               int32_t* offset,
                               std::shared_ptr<::arrow::ResizableBuffer>& values,
                               int64_t valid_bits_offset, int* out_num_values,
diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h
index 692bb392114..7bb0decb5d4 100644
--- a/cpp/src/parquet/encoding.h
+++ b/cpp/src/parquet/encoding.h
@@ -317,7 +317,7 @@ class TypedDecoder : virtual public Decoder {
                           int64_t valid_bits_offset,
                           typename EncodingTraits<DType>::Accumulator* out) = 0;
 
-  virtual int DecodeArrow_opt(int num_values, int null_count, const uint8_t* valid_bits,
+  virtual int DecodeArrowZeroCopy(int num_values, int null_count, const uint8_t* valid_bits,
                               int32_t* offset,
                               std::shared_ptr<::arrow::ResizableBuffer>& values,
                               int64_t valid_bits_offset, int32_t* bianry_length) {
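Beyond the rename, patch 14 settles the shape of the API: DecodeArrowZeroCopy is a non-pure virtual on TypedDecoder, so only the byte-array decoders override it and every other decoder inherits a default. The hunk above cuts off before the default body; a common choice, assumed here, is to fail loudly, since callers only reach the method behind the uses_opt_ guard. A simplified sketch with stand-in types, not the real parquet::TypedDecoder:

#include <cstdint>
#include <memory>
#include <stdexcept>
#include <vector>

using ResizableBuffer = std::vector<uint8_t>;  // stand-in for ::arrow::ResizableBuffer

class TypedDecoder {
 public:
  virtual ~TypedDecoder() = default;

  // Non-pure virtual with a failing default: decoders that support the
  // zero-copy path override it; the rest keep compiling unchanged.
  virtual int DecodeArrowZeroCopy(int num_values, int null_count,
                                  const uint8_t* valid_bits, int32_t* offset,
                                  std::shared_ptr<ResizableBuffer>& values,
                                  int64_t valid_bits_offset, int32_t* binary_length) {
    throw std::runtime_error("DecodeArrowZeroCopy not supported by this decoder");
  }
};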
From 6abd87979d8e97e83445f758eca80fc02eac27b8 Mon Sep 17 00:00:00 2001
From: zhixingheyi-tian
Date: Fri, 13 Jan 2023 00:55:48 +0800
Subject: [PATCH 15/15] Correct typo

---
 cpp/src/parquet/column_reader.cc | 10 +++++-----
 cpp/src/parquet/column_reader.h  |  4 ++--
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 080f3f739c5..8e1177b2950 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -2010,7 +2010,7 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader<ByteArrayType>,
       int64_t num_decoded = this->current_decoder_->DecodeArrowZeroCopy(
           static_cast<int>(values_to_read), 0, NULLPTR,
           (reinterpret_cast<int32_t*>(offset_->mutable_data()) + values_written_),
-          values_, 0, &bianry_length_);
+          values_, 0, &binary_length_);
       DCHECK_EQ(num_decoded, values_to_read);
     } else {
       int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull(
@@ -2026,7 +2026,7 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader<ByteArrayType>,
           static_cast<int>(values_to_read), static_cast<int>(null_count),
           valid_bits_->mutable_data(),
           (reinterpret_cast<int32_t*>(offset_->mutable_data()) + values_written_),
-          values_, values_written_, &bianry_length_);
+          values_, values_written_, &binary_length_);
       DCHECK_EQ(num_decoded, values_to_read - null_count);
     } else {
       int64_t num_decoded = this->current_decoder_->DecodeArrow(
@@ -2078,7 +2078,7 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader<ByteArrayType>,
       hasCal_average_len_ = true;
     }
     offset_ = AllocateBuffer(this->pool_);
-    bianry_length_ = 0;
+    binary_length_ = 0;
     return result;
   }
   void ResetValues() {
@@ -2091,7 +2091,7 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader<ByteArrayType>,
       values_written_ = 0;
       values_capacity_ = 0;
       null_count_ = 0;
-      bianry_length_ = 0;
+      binary_length_ = 0;
     }
   }
 
@@ -2099,7 +2099,7 @@ class ByteArrayChunkedOptRecordReader : public TypedRecordReader<ByteArrayType>,
   // Helper data structure for accumulating builder chunks
   typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;
 
-  int32_t bianry_length_ = 0;
+  int32_t binary_length_ = 0;
 
   std::shared_ptr<::arrow::ResizableBuffer> offset_;
 };
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index 0bdec7aa560..c5509d3d4a5 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -55,7 +55,7 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
 // 16 KB is the default expected page header size
 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
 
-static constexpr int32_t kDefaultBinaryPerRowSzie = 20;
+static constexpr int32_t kDefaultBinaryPerRowSize = 20;
 
 class PARQUET_EXPORT LevelDecoder {
  public:
@@ -377,7 +377,7 @@ class PARQUET_EXPORT RecordReader {
   int64_t null_count_;
 
   bool hasCal_average_len_ = false;
-  int64_t binary_per_row_length_ = kDefaultBinaryPerRowSzie;
+  int64_t binary_per_row_length_ = kDefaultBinaryPerRowSize;
 
   /// \brief Each bit corresponds to one element in 'values_' and specifies if it
   /// is null or not null.
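A closing note on the sizing heuristic these corrected names describe: binary_per_row_length_ starts at kDefaultBinaryPerRowSize (20 bytes per value) and is recalibrated exactly once, in ReleaseOffsets(), from the first released batch's average value length rounded up; from then on ReserveValues() pre-sizes the values buffer as the row capacity times that average. A standalone sketch of the arithmetic; the constant and the offset layout mirror the patch, but the function itself is illustrative:

#include <cstdint>

constexpr int32_t kDefaultBinaryPerRowSize = 20;

// offsets[values_written] - offsets[0] is the total payload seen so far;
// dividing by the value count and adding 1 rounds the average up and
// keeps it nonzero for subsequent reservations.
int64_t EstimateValuesBytes(const int32_t* offsets, int64_t values_written,
                            int64_t new_capacity, bool* has_average,
                            int64_t* per_row_length) {
  if (!*has_average && values_written > 0) {
    int64_t total = offsets[values_written] - offsets[0];
    *per_row_length = total / values_written + 1;
    *has_average = true;
  }
  int64_t per_row = *has_average ? *per_row_length : kDefaultBinaryPerRowSize;
  return new_capacity * per_row;
}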