151 changes: 149 additions & 2 deletions cpp/src/parquet/column_reader.cc
@@ -852,12 +852,23 @@ class ColumnReaderImplBase {
current_encoding_ = encoding;
current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
static_cast<int>(data_size));
if (!hasSet_uses_opt_) {
Contributor: Stick to snake_case for variables.

Contributor: Don't mix camel and snake case: has_set_uses_opt.

if (current_encoding_ == Encoding::PLAIN_DICTIONARY ||
current_encoding_ == Encoding::PLAIN ||
current_encoding_ == Encoding::RLE_DICTIONARY) {
Comment on lines +856 to +858
Contributor: Are all these cases covered by UT?

Contributor Author: Yes. This patch comes from a customer's case: they want to boost string scan performance, so the patch only targets the common Parquet encodings (PLAIN, RLE_DICTIONARY, PLAIN_DICTIONARY). Other encodings skip this optimization. The existing UTs cover all encoding cases.

Member: Just curious: why are the other encodings not supported?

uses_opt_ = true;
}
hasSet_uses_opt_ = true;
}
}

int64_t available_values_current_page() const {
return num_buffered_values_ - num_decoded_values_;
}

bool hasSet_uses_opt_ = false;
bool uses_opt_ = false;
Comment on lines +869 to +870
Contributor: Are these two flags really necessary? A trivial helper function looks better to me.

Contributor Author: These two flags exist for
if (current_encoding_ == Encoding::PLAIN_DICTIONARY || current_encoding_ == Encoding::PLAIN || current_encoding_ == Encoding::RLE_DICTIONARY)
They just avoid doing that comparison every time.

Contributor: Try to keep it simple; I don't think there's any performance consideration here. E.g., define a helper function:
bool UsesOpt() const { return current_encoding_ == xxx || ...; }

Member: Could you create a separate function for that, as Yibo suggested? If you do measure a meaningful performance difference, could you share your results then?

In addition, could you add a comment explaining why the optimization is only applicable to those three encodings?

Member: Hmm, also please find a more descriptive name than "uses optimization". (Which optimization?)
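A minimal sketch of the suggested helper, assuming the fast path is gated on exactly the three encodings discussed above. The class and function names are illustrative, not from the PR:

#include "parquet/types.h"  // for parquet::Encoding

// Illustrative only: a flag-free helper in the spirit of the review
// suggestion, replacing the hasSet_uses_opt_ / uses_opt_ pair.
class EncodingGate {
 public:
  explicit EncodingGate(parquet::Encoding::type encoding)
      : current_encoding_(encoding) {}

  // True only for the encodings the patch optimizes.
  bool UsesOptimizedPath() const {
    return current_encoding_ == parquet::Encoding::PLAIN ||
           current_encoding_ == parquet::Encoding::PLAIN_DICTIONARY ||
           current_encoding_ == parquet::Encoding::RLE_DICTIONARY;
  }

 private:
  parquet::Encoding::type current_encoding_;
};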


const ColumnDescriptor* descr_;
const int16_t max_def_level_;
const int16_t max_rep_level_;
@@ -1594,6 +1605,8 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
}
}

std::shared_ptr<ResizableBuffer> ReleaseOffsets() override { return nullptr; }
Member:
Suggested change:
- std::shared_ptr<ResizableBuffer> ReleaseOffsets() override { return nullptr; }
+ virtual std::shared_ptr<ResizableBuffer> ReleaseOffsets() { return nullptr; }


std::shared_ptr<ResizableBuffer> ReleaseIsValid() override {
if (leaf_info_.HasNullableValues()) {
auto result = valid_bits_;
@@ -1697,7 +1710,7 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
}
}

void ReserveValues(int64_t extra_values) {
void ReserveValues(int64_t extra_values) override {
Member:
Suggested change:
- void ReserveValues(int64_t extra_values) override {
+ virtual void ReserveValues(int64_t extra_values) {

const int64_t new_values_capacity =
UpdateCapacity(values_capacity_, values_written_, extra_values);
if (new_values_capacity > values_capacity_) {
@@ -1959,6 +1972,138 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;
};

class ByteArrayChunkedOptRecordReader : public TypedRecordReader<ByteArrayType>,
virtual public BinaryRecordReader {
public:
ByteArrayChunkedOptRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
::arrow::MemoryPool* pool)
: TypedRecordReader<ByteArrayType>(descr, leaf_info, pool) {
DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool));
values_ = AllocateBuffer(pool);
offset_ = AllocateBuffer(pool);
}

::arrow::ArrayVector GetBuilderChunks() override {
if (uses_opt_) {
std::vector<std::shared_ptr<Buffer>> buffers = {ReleaseIsValid(), ReleaseOffsets(),
ReleaseValues()};
auto data = std::make_shared<::arrow::ArrayData>(
::arrow::binary(), values_written(), buffers, null_count());
Member: Is ::arrow::binary() correct? Will it cause a type mismatch if the actual type is ::arrow::utf8()? For example, when checking equality with a StringArray, type identity may be broken if one side is binary and the other is utf8.
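A standalone sketch of that concern, assuming Arrow's C++ builder API: Array::Equals compares type identity before values, so binary and utf8 arrays with identical contents compare unequal.

#include <arrow/api.h>
#include <iostream>

int main() {
  // Two arrays with identical contents but different logical types.
  arrow::BinaryBuilder bin_builder;
  std::shared_ptr<arrow::Array> bin_arr;
  (void)bin_builder.Append("abc");
  (void)bin_builder.Finish(&bin_arr);  // type: binary

  arrow::StringBuilder str_builder;
  std::shared_ptr<arrow::Array> str_arr;
  (void)str_builder.Append("abc");
  (void)str_builder.Finish(&str_arr);  // type: utf8

  // Prints "false": the type identity differs even though the bytes match.
  std::cout << std::boolalpha << bin_arr->Equals(str_arr) << std::endl;
  return 0;
}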


auto chunks = ::arrow::ArrayVector({::arrow::MakeArray(data)});
return chunks;
} else {
::arrow::ArrayVector result = accumulator_.chunks;
if (result.size() == 0 || accumulator_.builder->length() > 0) {
std::shared_ptr<::arrow::Array> last_chunk;
PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk));
result.push_back(std::move(last_chunk));
}
accumulator_.chunks = {};
return result;
Comment on lines +1996 to +2004
Contributor: Duplicates ByteArrayChunkedRecordReader? This doesn't look right.

Contributor Author (zhixingheyi-tian, Dec 19, 2022): The optimized RecordReader implementation is ByteArrayChunkedOptRecordReader:
https://github.com/zhixingheyi-tian/arrow/blob/8d611c673d3ed23abfa32f50da3cc4c4e4feb807/cpp/src/parquet/column_reader.cc#L2203-L2206
It only applies to the Binary/String/LargeBinary/LargeString types.

Contributor: That probably means it's not good to create a new class for this optimized reader. That said, I don't have deep knowledge of the Parquet code and lack the bandwidth to investigate right now; someone else might comment.

Member: I agree, we shouldn't create another class. I don't see any reason this can't be used for the Decimal case.

Member: I've found locally that if I merge the implementations, the unit tests pass. Could you please merge them in the PR?

Contributor Author: @wjones127 Did you merge the two classes, ByteArrayChunkedRecordReader and ByteArrayChunkedOptRecordReader, and did all unit tests pass?

Member: Yes, those are the two classes I merged.

Member: Here is the changeset that allows parquet-arrow-test and parquet-arrow-internals-test to pass: zhixingheyi-tian#2. Could you incorporate those changes?

Member: +1 for not adding a separate class; this would be difficult to maintain as more optimizations are added. It would be better to add an option so that users can manually turn the feature off if something goes wrong with it.

}
}
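A hypothetical shape for the opt-out suggested above. None of these names exist in the PR or in Arrow; they only illustrate the idea of a manual kill switch:

// Hypothetical option; the struct and field names are illustrative only.
struct ByteArrayReaderOptions {
  // When false, fall back to the classic ByteArrayChunkedRecordReader even
  // for non-decimal BYTE_ARRAY columns.
  bool enable_opt_record_reader = true;
};

// Hypothetical dispatch in MakeByteArrayRecordReader:
//   if (read_dictionary) {
//     return std::make_shared<ByteArrayDictionaryRecordReader>(descr, leaf_info, pool);
//   } else if (!options.enable_opt_record_reader ||
//              descr->logical_type()->is_decimal()) {
//     return std::make_shared<ByteArrayChunkedRecordReader>(descr, leaf_info, pool);
//   } else {
//     return std::make_shared<ByteArrayChunkedOptRecordReader>(descr, leaf_info, pool);
//   }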

void ReadValuesDense(int64_t values_to_read) override {
if (uses_opt_) {
int64_t num_decoded = this->current_decoder_->DecodeArrowZeroCopy(
static_cast<int>(values_to_read), 0, NULLPTR,
(reinterpret_cast<int32_t*>(offset_->mutable_data()) + values_written_),
values_, 0, &binary_length_);
DCHECK_EQ(num_decoded, values_to_read);
} else {
int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull(
static_cast<int>(values_to_read), &accumulator_);
CheckNumberDecoded(num_decoded, values_to_read);
ResetValues();
}
}

void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
if (uses_opt_) {
int64_t num_decoded = this->current_decoder_->DecodeArrowZeroCopy(
static_cast<int>(values_to_read), static_cast<int>(null_count),
valid_bits_->mutable_data(),
(reinterpret_cast<int32_t*>(offset_->mutable_data()) + values_written_),
values_, values_written_, &binary_length_);
DCHECK_EQ(num_decoded, values_to_read - null_count);
} else {
int64_t num_decoded = this->current_decoder_->DecodeArrow(
static_cast<int>(values_to_read), static_cast<int>(null_count),
valid_bits_->mutable_data(), values_written_, &accumulator_);
CheckNumberDecoded(num_decoded, values_to_read - null_count);
ResetValues();
}
}

void ReserveValues(int64_t extra_values) override {
const int64_t new_values_capacity =
UpdateCapacity(values_capacity_, values_written_, extra_values);
if (new_values_capacity > values_capacity_) {
PARQUET_THROW_NOT_OK(
values_->Resize(new_values_capacity * binary_per_row_length_, false));
Comment on lines +2044 to +2045
Member: If we make this an option:
Suggested change:
- PARQUET_THROW_NOT_OK(
-     values_->Resize(new_values_capacity * binary_per_row_length_, false));
+ int64_t per_row_length = binary_per_row_length_.value_or(kDefaultBinaryPerRowSize);
+ PARQUET_THROW_NOT_OK(
+     values_->Resize(new_values_capacity * per_row_length, false));

PARQUET_THROW_NOT_OK(offset_->Resize((new_values_capacity + 1) * 4, false));

auto offset = reinterpret_cast<int32_t*>(offset_->mutable_data());
offset[0] = 0;

values_capacity_ = new_values_capacity;
}
if (leaf_info_.HasNullableValues()) {
int64_t valid_bytes_new = bit_util::BytesForBits(values_capacity_);
if (valid_bits_->size() < valid_bytes_new) {
int64_t valid_bytes_old = bit_util::BytesForBits(values_written_);
PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false));
// Avoid valgrind warnings
memset(valid_bits_->mutable_data() + valid_bytes_old, 0,
valid_bytes_new - valid_bytes_old);
}
}
}
std::shared_ptr<ResizableBuffer> ReleaseValues() override {
auto result = values_;
values_ = AllocateBuffer(this->pool_);
values_capacity_ = 0;
return result;
}
std::shared_ptr<ResizableBuffer> ReleaseOffsets() override {
auto result = offset_;
if (ARROW_PREDICT_FALSE(!hasCal_average_len_)) {
Member: If we make it optional:
Suggested change:
- if (ARROW_PREDICT_FALSE(!hasCal_average_len_)) {
+ if (ARROW_PREDICT_FALSE(!binary_per_row_length_.has_value())) {

auto offsetArr = reinterpret_cast<int32_t*>(offset_->mutable_data());
const auto first_offset = offsetArr[0];
const auto last_offset = offsetArr[values_written_];
int64_t binary_length = last_offset - first_offset;
binary_per_row_length_ = binary_length / values_written_ + 1;
hasCal_average_len_ = true;
}
offset_ = AllocateBuffer(this->pool_);
binary_length_ = 0;
return result;
}
void ResetValues() {
if (values_written_ > 0) {
// Resize to 0, but do not shrink to fit
PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false));
Member: I ran the unit tests (parquet-arrow-test) with a debugger and found this branch was never hit. Does that seem right? Could you add a test that validates this branch?

Contributor Author: This just follows the existing code:

void ResetValues() {
  if (values_written_ > 0) {
    // Resize to 0, but do not shrink to fit
    if (uses_values_) {
      PARQUET_THROW_NOT_OK(values_->Resize(0, /*shrink_to_fit=*/false));
    }

PARQUET_THROW_NOT_OK(offset_->Resize(0, false));
PARQUET_THROW_NOT_OK(values_->Resize(0, false));

values_written_ = 0;
values_capacity_ = 0;
null_count_ = 0;
binary_length_ = 0;
}
}

private:
// Helper data structure for accumulating builder chunks
typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;

int32_t binary_length_ = 0;

std::shared_ptr<::arrow::ResizableBuffer> offset_;
};

class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>,
virtual public DictionaryRecordReader {
public:
@@ -2056,8 +2201,10 @@ std::shared_ptr<RecordReader> MakeByteArrayRecordReader(const ColumnDescriptor*
bool read_dictionary) {
if (read_dictionary) {
return std::make_shared<ByteArrayDictionaryRecordReader>(descr, leaf_info, pool);
} else {
} else if (descr->logical_type()->is_decimal()) {
return std::make_shared<ByteArrayChunkedRecordReader>(descr, leaf_info, pool);
} else {
return std::make_shared<ByteArrayChunkedOptRecordReader>(descr, leaf_info, pool);
}
}

9 changes: 9 additions & 0 deletions cpp/src/parquet/column_reader.h
@@ -55,6 +55,8 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
// 16 KB is the default expected page header size
static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;

static constexpr int32_t kDefaultBinaryPerRowSize = 20;
Member: Since this corresponds to binary_per_row_length_, could we make the names match? I'm thinking "bytes per row" is the best description here:
Suggested change:
- static constexpr int32_t kDefaultBinaryPerRowSize = 20;
+ static constexpr int32_t kDefaultBinaryBytesPerRow = 20;
(Also change binary_per_row_length_ to binary_bytes_per_row_.)

Member: It would be better to add a comment here.


class PARQUET_EXPORT LevelDecoder {
public:
LevelDecoder();
@@ -291,6 +293,8 @@ class PARQUET_EXPORT RecordReader {
/// \brief Pre-allocate space for data. Results in better flat read performance
virtual void Reserve(int64_t num_values) = 0;

virtual void ReserveValues(int64_t capacity) {}
Contributor: A new interface? Are these added data members and functions necessary on this base class? I suppose they are only for the new reader implementation.

Contributor Author: Previously this was a TypedRecordReader-internal method:

void ReserveValues(int64_t extra_values) {
  const int64_t new_values_capacity =
      UpdateCapacity(values_capacity_, values_written_, extra_values);
  if (new_values_capacity > values_capacity_) {
    // XXX(wesm): A hack to avoid memory allocation when reading directly
    // into builder classes
    if (uses_values_) {
      PARQUET_THROW_NOT_OK(values_->Resize(bytes_for_values(new_values_capacity),
                                           /*shrink_to_fit=*/false));
    }
    values_capacity_ = new_values_capacity;
  }
  if (leaf_info_.HasNullableValues()) {
    int64_t valid_bytes_new = bit_util::BytesForBits(values_capacity_);
    if (valid_bits_->size() < valid_bytes_new) {
      int64_t valid_bytes_old = bit_util::BytesForBits(values_written_);
      PARQUET_THROW_NOT_OK(
          valid_bits_->Resize(valid_bytes_new, /*shrink_to_fit=*/false));
      // Avoid valgrind warnings
      memset(valid_bits_->mutable_data() + valid_bytes_old, 0,
             valid_bytes_new - valid_bytes_old);
    }
  }
}

ByteArrayChunkedOptRecordReader extends TypedRecordReader, so I extracted this as a public interface.

Contributor: Isn't it just a helper function specific to the implementation?

Member: Since these come from TypedRecordReader, which is private, could you mark its methods as virtual instead?
Suggested change (deletes this line):
- virtual void ReserveValues(int64_t capacity) {}

Member: Is it better to make it pure virtual? In addition, it would help to add a comment for this public function.
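For reference, a compact sketch of the interface shapes discussed in this thread (illustrative only, not PR code):

#include <cstdint>

// 1. Current PR: public virtual with a default no-op body.
class ReaderWithDefaultNoOp {
 public:
  virtual ~ReaderWithDefaultNoOp() = default;
  virtual void ReserveValues(int64_t /*capacity*/) {}
};

// 2. Alternative raised above: public but pure virtual, so every concrete
//    reader must implement it.
class ReaderPureVirtual {
 public:
  virtual ~ReaderPureVirtual() = default;
  virtual void ReserveValues(int64_t capacity) = 0;
};

// 3. The suggested change instead keeps ReserveValues off the public
//    interface entirely and marks the private TypedRecordReader method
//    virtual.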


/// \brief Clear consumed values and repetition/definition levels as the
/// result of calling ReadRecords
virtual void Reset() = 0;
@@ -299,6 +303,8 @@
/// allocated in subsequent ReadRecords calls
virtual std::shared_ptr<ResizableBuffer> ReleaseValues() = 0;

virtual std::shared_ptr<ResizableBuffer> ReleaseOffsets() = 0;

Comment on lines +306 to +307
Member:
Suggested change (deletes this line):
- virtual std::shared_ptr<ResizableBuffer> ReleaseOffsets() = 0;

Member: Same here for a comment.

/// \brief Transfer filled validity bitmap buffer to caller. A new one will
/// be allocated in subsequent ReadRecords calls
virtual std::shared_ptr<ResizableBuffer> ReleaseIsValid() = 0;
@@ -370,6 +376,9 @@ class PARQUET_EXPORT RecordReader {
int64_t values_capacity_;
int64_t null_count_;

bool hasCal_average_len_ = false;
int64_t binary_per_row_length_ = kDefaultBinaryPerRowSize;
Comment on lines +379 to +380
Member: First, I think this would be clearer as a std::optional rather than a boolean on the side. Second, please document the purpose of these fields in the header file.
Suggested change:
- bool hasCal_average_len_ = false;
- int64_t binary_per_row_length_ = kDefaultBinaryPerRowSize;
+ /// \brief Typical size of a single binary value, used for pre-allocating the
+ /// value buffer.
+ ///
+ /// Before this is set, kDefaultBinaryPerRowSize is used. After the first
+ /// batch of values, this is set to the size of the values buffer divided by
+ /// the number of values.
+ std::optional<int64_t> binary_per_row_length_ = std::nullopt;
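A worked example of the per-row estimate documented above, using assumed figures (a standalone sketch, not library code):

#include <cstdint>
#include <iostream>

int main() {
  // Assume the first batch decoded 1000 strings spanning 5399 bytes,
  // i.e. offsets[0] == 0 and offsets[1000] == 5399.
  const int64_t values_written = 1000;
  const int32_t first_offset = 0;
  const int32_t last_offset = 5399;
  const int64_t binary_length = last_offset - first_offset;
  // Same arithmetic as ReleaseOffsets(): integer division, plus one.
  const int64_t binary_per_row_length = binary_length / values_written + 1;
  std::cout << binary_per_row_length << std::endl;  // prints 6
  // Later ReserveValues(n) calls then grow the values buffer to roughly
  // n * 6 bytes, instead of the default 20 bytes per row.
  return 0;
}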


/// \brief Each bit corresponds to one element in 'values_' and specifies if it
/// is null or not null.
std::shared_ptr<::arrow::ResizableBuffer> valid_bits_;