diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 9246096c95c..8e1177b2950 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -852,12 +852,23 @@ class ColumnReaderImplBase {
     current_encoding_ = encoding;
     current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
                               static_cast<int>(data_size));
+    if (!hasSet_uses_opt_) {
+      if (current_encoding_ == Encoding::PLAIN_DICTIONARY ||
+          current_encoding_ == Encoding::PLAIN ||
+          current_encoding_ == Encoding::RLE_DICTIONARY) {
+        uses_opt_ = true;
+      }
+      hasSet_uses_opt_ = true;
+    }
   }

   int64_t available_values_current_page() const {
     return num_buffered_values_ - num_decoded_values_;
   }

+  bool hasSet_uses_opt_ = false;
+  bool uses_opt_ = false;
+
   const ColumnDescriptor* descr_;
   const int16_t max_def_level_;
   const int16_t max_rep_level_;
@@ -1594,6 +1605,8 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
     }
   }

+  std::shared_ptr<ResizableBuffer> ReleaseOffsets() override { return nullptr; }
+
   std::shared_ptr<ResizableBuffer> ReleaseIsValid() override {
     if (leaf_info_.HasNullableValues()) {
       auto result = valid_bits_;
@@ -1697,7 +1710,7 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
     }
   }

-  void ReserveValues(int64_t extra_values) {
+  void ReserveValues(int64_t extra_values) override {
     const int64_t new_values_capacity =
         UpdateCapacity(values_capacity_, values_written_, extra_values);
     if (new_values_capacity > values_capacity_) {
@@ -1959,6 +1972,138 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
   typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;
 };

+class ByteArrayChunkedOptRecordReader : public TypedRecordReader<ByteArrayType>,
+                                        virtual public BinaryRecordReader {
+ public:
+  ByteArrayChunkedOptRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
+                                  ::arrow::MemoryPool* pool)
+      : TypedRecordReader<ByteArrayType>(descr, leaf_info, pool) {
+    DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
+    accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool));
+    values_ = AllocateBuffer(pool);
+    offset_ = AllocateBuffer(pool);
+  }
+
+  ::arrow::ArrayVector GetBuilderChunks() override {
+    if (uses_opt_) {
+      std::vector<std::shared_ptr<Buffer>> buffers = {ReleaseIsValid(), ReleaseOffsets(),
+                                                      ReleaseValues()};
+      auto data = std::make_shared<::arrow::ArrayData>(
+          ::arrow::binary(), values_written(), buffers, null_count());
+
+      auto chunks = ::arrow::ArrayVector({::arrow::MakeArray(data)});
+      return chunks;
+    } else {
+      ::arrow::ArrayVector result = accumulator_.chunks;
+      if (result.size() == 0 || accumulator_.builder->length() > 0) {
+        std::shared_ptr<::arrow::Array> last_chunk;
+        PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk));
+        result.push_back(std::move(last_chunk));
+      }
+      accumulator_.chunks = {};
+      return result;
+    }
+  }
+
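+  // Read paths: with uses_opt_ set, the decoder's DecodeArrowZeroCopy writes
+  // offsets and value bytes directly into offset_ / values_; otherwise values
+  // go through the BinaryBuilder accumulator as in ByteArrayChunkedRecordReader.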
+  void ReadValuesDense(int64_t values_to_read) override {
+    if (uses_opt_) {
+      int64_t num_decoded = this->current_decoder_->DecodeArrowZeroCopy(
+          static_cast<int>(values_to_read), 0, NULLPTR,
+          (reinterpret_cast<int32_t*>(offset_->mutable_data()) + values_written_),
+          values_, 0, &binary_length_);
+      DCHECK_EQ(num_decoded, values_to_read);
+    } else {
+      int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull(
+          static_cast<int>(values_to_read), &accumulator_);
+      CheckNumberDecoded(num_decoded, values_to_read);
+      ResetValues();
+    }
+  }
+
+  void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
+    if (uses_opt_) {
+      int64_t num_decoded = this->current_decoder_->DecodeArrowZeroCopy(
+          static_cast<int>(values_to_read), static_cast<int>(null_count),
+          valid_bits_->mutable_data(),
+          (reinterpret_cast<int32_t*>(offset_->mutable_data()) + values_written_),
+          values_, values_written_, &binary_length_);
+      DCHECK_EQ(num_decoded, values_to_read - null_count);
+    } else {
+      int64_t num_decoded = this->current_decoder_->DecodeArrow(
+          static_cast<int>(values_to_read), static_cast<int>(null_count),
+          valid_bits_->mutable_data(), values_written_, &accumulator_);
+      CheckNumberDecoded(num_decoded, values_to_read - null_count);
+      ResetValues();
+    }
+  }
+
+  void ReserveValues(int64_t extra_values) override {
+    const int64_t new_values_capacity =
+        UpdateCapacity(values_capacity_, values_written_, extra_values);
+    if (new_values_capacity > values_capacity_) {
+      PARQUET_THROW_NOT_OK(
+          values_->Resize(new_values_capacity * binary_per_row_length_, false));
+      PARQUET_THROW_NOT_OK(offset_->Resize((new_values_capacity + 1) * 4, false));
+
+      auto offset = reinterpret_cast<int32_t*>(offset_->mutable_data());
+      offset[0] = 0;
+
+      values_capacity_ = new_values_capacity;
+    }
+    if (leaf_info_.HasNullableValues()) {
+      int64_t valid_bytes_new = bit_util::BytesForBits(values_capacity_);
+      if (valid_bits_->size() < valid_bytes_new) {
+        int64_t valid_bytes_old = bit_util::BytesForBits(values_written_);
+        PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false));
+        // Avoid valgrind warnings
+        memset(valid_bits_->mutable_data() + valid_bytes_old, 0,
+               valid_bytes_new - valid_bytes_old);
+      }
+    }
+  }
+
+  std::shared_ptr<ResizableBuffer> ReleaseValues() override {
+    auto result = values_;
+    values_ = AllocateBuffer(this->pool_);
+    values_capacity_ = 0;
+    return result;
+  }
+
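+  // Transfers offset_ to the caller. On the first release this derives
+  // binary_per_row_length_ (average value bytes per row, rounded up) from the
+  // decoded offsets, so subsequent ReserveValues() calls can size values_
+  // up front instead of growing it during decoding.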
+  std::shared_ptr<ResizableBuffer> ReleaseOffsets() override {
+    auto result = offset_;
+    if (ARROW_PREDICT_FALSE(!hasCal_average_len_)) {
+      auto offsetArr = reinterpret_cast<int32_t*>(offset_->mutable_data());
+      const auto first_offset = offsetArr[0];
+      const auto last_offset = offsetArr[values_written_];
+      int64_t binary_length = last_offset - first_offset;
+      binary_per_row_length_ = binary_length / values_written_ + 1;
+      hasCal_average_len_ = true;
+    }
+    offset_ = AllocateBuffer(this->pool_);
+    binary_length_ = 0;
+    return result;
+  }
+
+  void ResetValues() {
+    if (values_written_ > 0) {
+      // Resize to 0, but do not shrink to fit
+      PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false));
+      PARQUET_THROW_NOT_OK(offset_->Resize(0, false));
+      PARQUET_THROW_NOT_OK(values_->Resize(0, false));
+
+      values_written_ = 0;
+      values_capacity_ = 0;
+      null_count_ = 0;
+      binary_length_ = 0;
+    }
+  }
+
+ private:
+  // Helper data structure for accumulating builder chunks
+  typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;
+
+  int32_t binary_length_ = 0;
+
+  std::shared_ptr<::arrow::ResizableBuffer> offset_;
+};
+
 class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>,
                                         virtual public DictionaryRecordReader {
  public:
@@ -2056,8 +2201,10 @@ std::shared_ptr<RecordReader> MakeByteArrayRecordReader(const ColumnDescriptor*
                                                         bool read_dictionary) {
   if (read_dictionary) {
     return std::make_shared<ByteArrayDictionaryRecordReader>(descr, leaf_info, pool);
-  } else {
+  } else if (descr->logical_type()->is_decimal()) {
     return std::make_shared<ByteArrayChunkedRecordReader>(descr, leaf_info, pool);
+  } else {
+    return std::make_shared<ByteArrayChunkedOptRecordReader>(descr, leaf_info, pool);
   }
 }
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index b06266de38d..c5509d3d4a5 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -55,6 +55,8 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
 // 16 KB is the default expected page header size
 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;

+static constexpr int32_t kDefaultBinaryPerRowSize = 20;
+
 class PARQUET_EXPORT LevelDecoder {
  public:
   LevelDecoder();
@@ -291,6 +293,8 @@ class PARQUET_EXPORT RecordReader {
   /// \brief Pre-allocate space for data. Results in better flat read performance
   virtual void Reserve(int64_t num_values) = 0;

+  virtual void ReserveValues(int64_t capacity) {}
+
   /// \brief Clear consumed values and repetition/definition levels as the
   /// result of calling ReadRecords
   virtual void Reset() = 0;
@@ -299,6 +303,8 @@ class PARQUET_EXPORT RecordReader {
   /// allocated in subsequent ReadRecords calls
   virtual std::shared_ptr<ResizableBuffer> ReleaseValues() = 0;

+  virtual std::shared_ptr<ResizableBuffer> ReleaseOffsets() = 0;
+
   /// \brief Transfer filled validity bitmap buffer to caller. A new one will
   /// be allocated in subsequent ReadRecords calls
   virtual std::shared_ptr<ResizableBuffer> ReleaseIsValid() = 0;
@@ -370,6 +376,9 @@ class PARQUET_EXPORT RecordReader {
   int64_t values_capacity_;
   int64_t null_count_;

+  bool hasCal_average_len_ = false;
+  int64_t binary_per_row_length_ = kDefaultBinaryPerRowSize;
+
   /// \brief Each bit corresponds to one element in 'values_' and specifies if it
   /// is null or not null.
   std::shared_ptr<::arrow::ResizableBuffer> valid_bits_;
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index b9472d72aeb..6822bef5d8e 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -1360,6 +1360,17 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
     return result;
   }

+  int DecodeArrowZeroCopy(int num_values, int null_count, const uint8_t* valid_bits,
+                          int32_t* offset,
+                          std::shared_ptr<::arrow::ResizableBuffer>& values,
+                          int64_t valid_bits_offset, int32_t* binary_length) override {
+    int result = 0;
+    PARQUET_THROW_NOT_OK(DecodeArrowDenseZeroCopy(num_values, null_count, valid_bits,
+                                                  offset, values, valid_bits_offset,
+                                                  &result, binary_length));
+    return result;
+  }
+
  private:
   Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
                           int64_t valid_bits_offset,
@@ -1412,6 +1423,57 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
     return Status::OK();
   }

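+  // Zero-copy counterpart of DecodeArrowDense: reads each 4-byte length
+  // prefix, copies the raw value bytes to the caller's buffer, and extends
+  // the offsets array; null slots repeat the previous offset.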
+  Status DecodeArrowDenseZeroCopy(int num_values, int null_count,
+                                  const uint8_t* valid_bits, int32_t* offset,
+                                  std::shared_ptr<::arrow::ResizableBuffer>& values,
+                                  int64_t valid_bits_offset, int* out_values_decoded,
+                                  int32_t* binary_length) {
+    int values_decoded = 0;
+    auto dst_value = values->mutable_data() + (*binary_length);
+    int64_t capacity = values->size();
+    if (ARROW_PREDICT_FALSE((len_ + *binary_length) >= capacity)) {
+      PARQUET_THROW_NOT_OK(values->Resize(len_ + *binary_length, false));
+      dst_value = values->mutable_data() + (*binary_length);
+    }
+
+    int i = 0;
+    RETURN_NOT_OK(VisitNullBitmapInline(
+        valid_bits, valid_bits_offset, num_values, null_count,
+        [&]() {
+          if (ARROW_PREDICT_FALSE(len_ < 4)) {
+            ParquetException::EofException();
+          }
+          auto value_len = ::arrow::util::SafeLoadAs<int32_t>(data_);
+          if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4)) {
+            return Status::Invalid("Invalid or corrupted value_len '", value_len, "'");
+          }
+          auto increment = value_len + 4;
+          if (ARROW_PREDICT_FALSE(len_ < increment)) {
+            ParquetException::EofException();
+          }
+
+          (*binary_length) += value_len;
+          offset[i + 1] = offset[i] + value_len;
+          memcpy(dst_value, data_ + 4, value_len);
+          dst_value = dst_value + value_len;
+
+          data_ += increment;
+          len_ -= increment;
+          ++values_decoded;
+          ++i;
+          return Status::OK();
+        },
+        [&]() {
+          offset[i + 1] = offset[i];
+          ++i;
+          return Status::OK();
+        }));
+
+    num_values_ -= values_decoded;
+    *out_values_decoded = values_decoded;
+    return Status::OK();
+  }
+
   template <typename BuilderType>
   Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                      int64_t valid_bits_offset, BuilderType* builder,
@@ -1878,6 +1940,22 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
     return result;
   }

+  int DecodeArrowZeroCopy(int num_values, int null_count, const uint8_t* valid_bits,
+                          int32_t* offset,
+                          std::shared_ptr<::arrow::ResizableBuffer>& values,
+                          int64_t valid_bits_offset, int32_t* binary_length) override {
+    int result = 0;
+    if (null_count == 0) {
+      PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull_opt(num_values, offset, values,
+                                                       &result, binary_length));
+    } else {
+      PARQUET_THROW_NOT_OK(DecodeArrowDenseZeroCopy(num_values, null_count, valid_bits,
+                                                    offset, values, valid_bits_offset,
+                                                    &result, binary_length));
+    }
+    return result;
+  }
+
  private:
   Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
                           int64_t valid_bits_offset,
@@ -1948,6 +2026,79 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
     return Status::OK();
   }

+  Status DecodeArrowDenseZeroCopy(int num_values, int null_count,
+                                  const uint8_t* valid_bits, int32_t* offset,
+                                  std::shared_ptr<::arrow::ResizableBuffer>& values,
+                                  int64_t valid_bits_offset, int* out_num_values,
+                                  int32_t* binary_length) {
+    constexpr int32_t kBufferSize = 1024;
+    int32_t indices[kBufferSize];
+    auto dst_value = values->mutable_data() + (*binary_length);
+
+    ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset,
+                                               num_values);
+
+    auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
+    int values_decoded = 0;
+    int num_appended = 0;
+    uint64_t capacity = values->size();
+    while (num_appended < num_values) {
+      bool is_valid = bit_reader.IsSet();
+      bit_reader.Next();
+
+      if (is_valid) {
+        int32_t batch_size =
+            std::min<int32_t>(kBufferSize, num_values - num_appended - null_count);
+        int num_indices = idx_decoder_.GetBatch(indices, batch_size);
+
+        if (ARROW_PREDICT_FALSE(num_indices < 1)) {
+          return Status::Invalid("Invalid number of indices '", num_indices, "'");
+        }
+
+        int i = 0;
+        while (true) {
+          // Consume all indices
+          if (is_valid) {
+            auto idx = indices[i];
+            // RETURN_NOT_OK(IndexInBounds(idx));
+            const auto& val = dict_values[idx];
+            auto value_len = val.len;
+            uint64_t value_offset = offset[num_appended + 1] =
+                offset[num_appended] + value_len;
+
+            if (ARROW_PREDICT_FALSE(value_offset >= capacity)) {
+              capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len);
+              PARQUET_THROW_NOT_OK(values->Resize(capacity, false));
+              dst_value = values->mutable_data() + (*binary_length);
+            }
+            (*binary_length) += value_len;
+            memcpy(dst_value, val.ptr, static_cast<size_t>(value_len));
+            dst_value = dst_value + value_len;
+
+            ++i;
+            ++values_decoded;
+          } else {
+            offset[num_appended + 1] = offset[num_appended];
+            --null_count;
+          }
+          ++num_appended;
+          if (i == num_indices) {
+            // Do not advance the bit_reader if we have fulfilled the decode
+            // request
+            break;
+          }
+          is_valid = bit_reader.IsSet();
+          bit_reader.Next();
+        }
+      } else {
+        offset[num_appended + 1] = offset[num_appended];
+        --null_count;
+        ++num_appended;
+      }
+    }
+    *out_num_values = values_decoded;
+    return Status::OK();
+  }
+
   Status DecodeArrowDenseNonNull(int num_values,
                                  typename EncodingTraits<ByteArrayType>::Accumulator* out,
                                  int* out_num_values) {
@@ -1977,6 +2128,48 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
     return Status::OK();
   }

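+  // Null-free fast path: decodes dictionary indices in batches of kBufferSize
+  // and copies the referenced dictionary entries straight into the caller's
+  // value buffer, growing it by at least 50% when the next value would
+  // overflow the current capacity.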
+  Status DecodeArrowDenseNonNull_opt(int num_values, int32_t* offset,
+                                     std::shared_ptr<::arrow::ResizableBuffer>& values,
+                                     int* out_num_values, int32_t* binary_length) {
+    constexpr int32_t kBufferSize = 2048;
+    int32_t indices[kBufferSize];
+    int values_decoded = 0;
+    uint64_t capacity = values->size();
+
+    // ArrowBinaryHelper helper(out);
+    auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
+
+    auto dst_value = values->mutable_data() + (*binary_length);
+    int num_appended = 0;
+
+    while (values_decoded < num_values) {
+      int32_t batch_size = std::min<int32_t>(kBufferSize, num_values - values_decoded);
+      int num_indices = idx_decoder_.GetBatch(indices, batch_size);
+      if (num_indices == 0) ParquetException::EofException();
+      for (int i = 0; i < num_indices; ++i) {
+        auto idx = indices[i];
+        // RETURN_NOT_OK(IndexInBounds(idx));
+        const auto& val = dict_values[idx];
+        auto value_len = val.len;
+        uint64_t value_offset = offset[num_appended + 1] =
+            offset[num_appended] + value_len;
+        if (ARROW_PREDICT_FALSE(value_offset >= capacity)) {
+          capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len);
+          PARQUET_THROW_NOT_OK(values->Resize(capacity, false));
+          dst_value = values->mutable_data() + (*binary_length);
+        }
+        (*binary_length) += value_len;
+        memcpy(dst_value, val.ptr, static_cast<size_t>(value_len));
+        dst_value = dst_value + value_len;
+
+        num_appended++;
+      }
+      values_decoded += num_indices;
+    }
+    *out_num_values = values_decoded;
+    return Status::OK();
+  }
+
   template <typename BuilderType>
   Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                      int64_t valid_bits_offset, BuilderType* builder,
diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h
index 374a02cf491..7bb0decb5d4 100644
--- a/cpp/src/parquet/encoding.h
+++ b/cpp/src/parquet/encoding.h
@@ -317,6 +317,13 @@ class TypedDecoder : virtual public Decoder {
                           int64_t valid_bits_offset,
                           typename EncodingTraits<DType>::Accumulator* out) = 0;

+  /// \brief Decode into caller-provided offset and value buffers, bypassing
+  /// the builder-based accumulator. Returns the number of non-null values
+  /// decoded; the default implementation decodes nothing.
+  virtual int DecodeArrowZeroCopy(int num_values, int null_count,
+                                  const uint8_t* valid_bits, int32_t* offset,
+                                  std::shared_ptr<::arrow::ResizableBuffer>& values,
+                                  int64_t valid_bits_offset, int32_t* binary_length) {
+    return 0;
+  }
+
   /// \brief Decode into an ArrayBuilder or other accumulator ignoring nulls
   ///
   /// \return number of values decoded
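For context: on the opt path, GetBuilderChunks() above assembles the result as a
single BinaryArray built directly from the three released buffers. The sketch
below is not part of the patch; it is a minimal illustration, using only the
public Arrow C++ API (AllocateBuffer, ArrayData::Make, MakeArray) with
hand-built buffer contents, of the {validity, offsets, values} layout that
ReleaseIsValid()/ReleaseOffsets()/ReleaseValues() are expected to produce.

#include <arrow/api.h>

#include <cstring>

// Builds the BinaryArray ["ab", "c"] from a {validity, offsets, values} triple.
arrow::Result<std::shared_ptr<arrow::Array>> MakeBinaryFromParts() {
  // values: the concatenated bytes of all non-null entries.
  ARROW_ASSIGN_OR_RAISE(auto values, arrow::AllocateBuffer(3));
  std::memcpy(values->mutable_data(), "abc", 3);

  // offsets: length + 1 int32 entries; offsets[i+1] - offsets[i] is the byte
  // length of value i, which is the invariant DecodeArrowZeroCopy maintains.
  const int32_t raw_offsets[] = {0, 2, 3};
  ARROW_ASSIGN_OR_RAISE(auto offsets, arrow::AllocateBuffer(sizeof(raw_offsets)));
  std::memcpy(offsets->mutable_data(), raw_offsets, sizeof(raw_offsets));

  // No nulls here, so the validity buffer may be left null.
  auto data = arrow::ArrayData::Make(arrow::binary(), /*length=*/2,
                                     {nullptr, std::move(offsets), std::move(values)},
                                     /*null_count=*/0);
  return arrow::MakeArray(data);
}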