Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 50 additions & 20 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <vector>

#include "arrow/array.h"
#include "arrow/array/array_binary.h"
#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_dict.h"
#include "arrow/array/builder_primitive.h"
Expand Down Expand Up @@ -2040,23 +2041,29 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
LevelInfo leaf_info_;
};

class FLBARecordReader : public TypedRecordReader<FLBAType>,
virtual public BinaryRecordReader {
class FLBARecordReader final : public TypedRecordReader<FLBAType>,
virtual public BinaryRecordReader {
public:
FLBARecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
::arrow::MemoryPool* pool, bool read_dense_for_nullable)
: TypedRecordReader<FLBAType>(descr, leaf_info, pool, read_dense_for_nullable),
builder_(nullptr) {
byte_width_(descr_->type_length()),
empty_(byte_width_, 0),
type_(::arrow::fixed_size_binary(byte_width_)),
null_bitmap_builder_(pool),
data_builder_(pool) {
ARROW_DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY);
int byte_width = descr_->type_length();
std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width);
builder_ = std::make_unique<::arrow::FixedSizeBinaryBuilder>(type, this->pool_);
}

::arrow::ArrayVector GetBuilderChunks() override {
std::shared_ptr<::arrow::Array> chunk;
PARQUET_THROW_NOT_OK(builder_->Finish(&chunk));
return ::arrow::ArrayVector({chunk});
const int64_t null_count = null_bitmap_builder_.false_count();
const int64_t length = null_bitmap_builder_.length();
ARROW_DCHECK_EQ(length * byte_width_, data_builder_.length());
PARQUET_ASSIGN_OR_THROW(auto data_buffer, data_builder_.Finish());
PARQUET_ASSIGN_OR_THROW(auto null_bitmap, null_bitmap_builder_.Finish());
auto chunk = std::make_shared<::arrow::FixedSizeBinaryArray>(
type_, length, data_buffer, null_bitmap, null_count);
return ::arrow::ArrayVector({std::move(chunk)});
}

void ReadValuesDense(int64_t values_to_read) override {
Expand All @@ -2065,9 +2072,9 @@ class FLBARecordReader : public TypedRecordReader<FLBAType>,
this->current_decoder_->Decode(values, static_cast<int>(values_to_read));
CheckNumberDecoded(num_decoded, values_to_read);

for (int64_t i = 0; i < num_decoded; i++) {
PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
}
PARQUET_THROW_NOT_OK(null_bitmap_builder_.Reserve(num_decoded));
PARQUET_THROW_NOT_OK(data_builder_.Reserve(num_decoded * byte_width_));
UnsafeAppendDense(values, num_decoded);
ResetValues();
}

Expand All @@ -2081,22 +2088,45 @@ class FLBARecordReader : public TypedRecordReader<FLBAType>,
valid_bits, valid_bits_offset);
ARROW_DCHECK_EQ(num_decoded, values_to_read);

PARQUET_THROW_NOT_OK(null_bitmap_builder_.Reserve(num_decoded));
PARQUET_THROW_NOT_OK(data_builder_.Reserve(num_decoded * byte_width_));
if (null_count == 0) {
UnsafeAppendDense(values, num_decoded);
} else {
UnsafeAppendSpaced(values, num_decoded, valid_bits, valid_bits_offset);
}
ResetValues();
}

void UnsafeAppendDense(const FLBA* values, int64_t num_decoded) {
null_bitmap_builder_.UnsafeAppend(num_decoded, /*value=*/true);
for (int64_t i = 0; i < num_decoded; i++) {
data_builder_.UnsafeAppend(values[i].ptr, byte_width_);
}
}

void UnsafeAppendSpaced(const FLBA* values, int64_t num_decoded,
const uint8_t* valid_bits, int64_t valid_bits_offset) {
null_bitmap_builder_.UnsafeAppend(valid_bits, valid_bits_offset, num_decoded);
for (int64_t i = 0; i < num_decoded; i++) {
if (::arrow::bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
data_builder_.UnsafeAppend(values[i].ptr, byte_width_);
} else {
PARQUET_THROW_NOT_OK(builder_->AppendNull());
data_builder_.UnsafeAppend(empty_.data(), byte_width_);
}
}
ResetValues();
}

private:
std::unique_ptr<::arrow::FixedSizeBinaryBuilder> builder_;
const int byte_width_;
const std::vector<uint8_t> empty_;
std::shared_ptr<::arrow::DataType> type_;
::arrow::TypedBufferBuilder<bool> null_bitmap_builder_;
::arrow::BufferBuilder data_builder_;
};

class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
virtual public BinaryRecordReader {
class ByteArrayChunkedRecordReader final : public TypedRecordReader<ByteArrayType>,
virtual public BinaryRecordReader {
public:
ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
::arrow::MemoryPool* pool, bool read_dense_for_nullable)
Expand Down Expand Up @@ -2137,8 +2167,8 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;
};

class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>,
virtual public DictionaryRecordReader {
class ByteArrayDictionaryRecordReader final : public TypedRecordReader<ByteArrayType>,
virtual public DictionaryRecordReader {
public:
ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
::arrow::MemoryPool* pool, bool read_dense_for_nullable)
Expand Down