Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 10 additions & 16 deletions cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2718,21 +2718,14 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
MemoryPool* pool = ::arrow::default_memory_pool())
: DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
len_decoder_(nullptr, pool),
buffered_length_(AllocateBuffer(pool, 0)),
buffered_data_(AllocateBuffer(pool, 0)) {}
buffered_length_(AllocateBuffer(pool, 0)) {}

// Point this decoder at a new input buffer: `data` is the start of the bytes,
// `len` the byte count, `num_values` the value count passed through to the
// base class.
// NOTE(review): the surrounding text looks like a diff with its +/- markers
// stripped, so the direct `num_values_` assignment and the
// DecoderImpl::SetData call below may be the old and new forms of the same
// line rather than both being live — confirm against the real file.
void SetData(int num_values, const uint8_t* data, int len) override {
num_values_ = num_values;
// Forward the same three arguments to the base-class implementation.
DecoderImpl::SetData(num_values, data, len);
// Build a fresh BitReader over the whole input buffer.
decoder_ = std::make_shared<::arrow::bit_util::BitReader>(data, len);
// Must run after decoder_ is set; presumably fills buffered_length_ from the
// reader (body not visible here — TODO confirm).
DecodeLengths();
}

// Variant of SetData that adopts an existing, shared BitReader instead of
// constructing one from a raw buffer. Unlike SetData, it does not call
// DecoderImpl::SetData.
// NOTE(review): the only call to this method visible in this text sits right
// next to an equivalent SetData(...) call, which suggests this method is being
// removed — confirm before relying on it.
void SetDecoder(int num_values, std::shared_ptr<::arrow::bit_util::BitReader> decoder) {
num_values_ = num_values;
// Share the caller's reader; its current position is kept as-is.
decoder_ = decoder;
// Must run after decoder_ is set; presumably reads the length block from the
// reader's current position (body not visible here — TODO confirm).
DecodeLengths();
}

int Decode(ByteArray* buffer, int max_values) override {
// Decode up to `max_values` strings into an internal buffer
// and reference them into `buffer`.
Expand All @@ -2745,6 +2738,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
int32_t data_size = 0;
const int32_t* length_ptr =
reinterpret_cast<const int32_t*>(buffered_length_->data()) + length_idx_;
int bytes_offset = len_ - decoder_->bytes_left();
for (int i = 0; i < max_values; ++i) {
int32_t len = length_ptr[i];
if (ARROW_PREDICT_FALSE(len < 0)) {
Expand All @@ -2756,13 +2750,10 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
}
}
length_idx_ += max_values;

PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size));
if (decoder_->GetBatch(8, buffered_data_->mutable_data(), data_size) != data_size) {
if (ARROW_PREDICT_FALSE(!decoder_->Advance(8 * static_cast<int64_t>(data_size)))) {
ParquetException::EofException();
}
const uint8_t* data_ptr = buffered_data_->data();

const uint8_t* data_ptr = data_ + bytes_offset;
for (int i = 0; i < max_values; ++i) {
buffer[i].ptr = data_ptr;
data_ptr += buffer[i].len;
Expand Down Expand Up @@ -2850,7 +2841,6 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
int num_valid_values_;
uint32_t length_idx_;
std::shared_ptr<ResizableBuffer> buffered_length_;
std::shared_ptr<ResizableBuffer> buffered_data_;
};

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -3071,8 +3061,12 @@ class DeltaByteArrayDecoder : public DecoderImpl,
prefix_len_offset_ = 0;
num_valid_values_ = num_prefix;

int bytes_left = decoder_->bytes_left();
// If len < bytes_left, prefix_len_decoder.Decode will throw exception.
DCHECK_GE(len, bytes_left);
int suffix_begins = len - bytes_left;
// at this time, the decoder_ will be at the start of the encoded suffix data.
suffix_decoder_.SetDecoder(num_values, decoder_);
suffix_decoder_.SetData(num_values, data + suffix_begins, bytes_left);

// TODO: read corrupted files written with bug(PARQUET-246). last_value_ should be set
// to last_value_in_previous_page_ when decoding a new page(except the first page)
Expand Down