Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions cpp/src/parquet/column_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,8 @@ TYPED_TEST_SUITE(TestPrimitiveWriter, TestTypes);

// Convenience aliases instantiating the TestPrimitiveWriter fixture for the
// physical types exercised by the non-typed (TEST_F) cases below.
using TestValuesWriterInt32Type = TestPrimitiveWriter<Int32Type>;
using TestValuesWriterInt64Type = TestPrimitiveWriter<Int64Type>;
using TestByteArrayValuesWriter = TestPrimitiveWriter<ByteArrayType>;
using TestFixedLengthByteArrayValuesWriter = TestPrimitiveWriter<FLBAType>;

TYPED_TEST(TestPrimitiveWriter, RequiredPlain) {
this->TestRequiredWithEncoding(Encoding::PLAIN);
Expand Down Expand Up @@ -429,12 +431,16 @@ TEST_F(TestValuesWriterInt64Type, RequiredDeltaBinaryPacked) {
this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
}

/*
TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) {
TEST_F(TestByteArrayValuesWriter, RequiredDeltaLengthByteArray) {
this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY);
}

TYPED_TEST(TestPrimitiveWriter, RequiredDeltaByteArray) {
/*
TYPED_TEST(TestByteArrayValuesWriter, RequiredDeltaByteArray) {
this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);
}

TEST_F(TestFixedLengthByteArrayValuesWriter, RequiredDeltaByteArray) {
this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);
}
*/
Expand Down Expand Up @@ -692,7 +698,6 @@ TEST_F(TestBooleanValuesWriter, AlternateBooleanValues) {

// PARQUET-979
// Prevent writing large MIN, MAX stats
using TestByteArrayValuesWriter = TestPrimitiveWriter<ByteArrayType>;
TEST_F(TestByteArrayValuesWriter, OmitStats) {
int min_len = 1024 * 4;
int max_len = 1024 * 8;
Expand Down
180 changes: 174 additions & 6 deletions cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2572,6 +2572,129 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
// ----------------------------------------------------------------------
// DELTA_LENGTH_BYTE_ARRAY

// ----------------------------------------------------------------------
// DeltaLengthByteArrayEncoder

// Encoder for DELTA_LENGTH_BYTE_ARRAY: the int32 value lengths are encoded
// with DELTA_BINARY_PACKED (length_encoder_) and written first, followed by
// the concatenated value bytes (sink_).
template <typename DType>
class DeltaLengthByteArrayEncoder : public EncoderImpl,
                                    virtual public TypedEncoder<ByteArrayType> {
 public:
  explicit DeltaLengthByteArrayEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
      // Pass the caller-supplied pool through. The previous version assigned
      // `pool = ::arrow::default_memory_pool()` in the init list, silently
      // discarding the pool chosen by the caller.
      : EncoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY, pool),
        sink_(pool),
        length_encoder_(nullptr, pool),
        encoded_size_{0} {}

  std::shared_ptr<Buffer> FlushValues() override;

  // Raw value bytes accumulated so far plus the length encoder's own estimate.
  int64_t EstimatedDataEncodedSize() override {
    return encoded_size_ + length_encoder_.EstimatedDataEncodedSize();
  }

  using TypedEncoder<ByteArrayType>::Put;

  void Put(const ::arrow::Array& values) override;

  void Put(const T* buffer, int num_values) override;

  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
                 int64_t valid_bits_offset) override;

 protected:
  // Appends every non-null value of a (Large)BinaryArray: its length goes to
  // length_encoder_, its bytes to sink_. Fails if a single value is 2GB or
  // larger, since the length must fit in an int32.
  template <typename ArrayType>
  void PutBinaryArray(const ArrayType& array) {
    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>(
        *array.data(),
        [&](::std::string_view view) {
          if (ARROW_PREDICT_FALSE(view.size() > kMaxByteArraySize)) {
            return Status::Invalid("Parquet cannot store strings with size 2GB or more");
          }
          // Keep encoded_size_ in sync so EstimatedDataEncodedSize() also
          // accounts for values added via Put(const ::arrow::Array&); the
          // previous version only updated it in Put(const T*, int).
          const auto len = static_cast<int32_t>(view.length());
          if (AddWithOverflow(encoded_size_, len, &encoded_size_)) {
            return Status::Invalid("excess expansion in DELTA_LENGTH_BYTE_ARRAY");
          }
          length_encoder_.Put({len}, 1);
          PARQUET_THROW_NOT_OK(sink_.Append(view.data(), view.length()));
          return Status::OK();
        },
        []() { return Status::OK(); }));
  }

  ::arrow::BufferBuilder sink_;                    // concatenated value bytes
  DeltaBitPackEncoder<Int32Type> length_encoder_;  // delta-packed lengths
  uint32_t encoded_size_;                          // total value bytes buffered
};

// Routes an Arrow binary-like array to the templated PutBinaryArray helper,
// dispatching on 32-bit vs. 64-bit offset width.
template <typename DType>
void DeltaLengthByteArrayEncoder<DType>::Put(const ::arrow::Array& values) {
  AssertBaseBinary(values);
  if (!::arrow::is_binary_like(values.type_id())) {
    PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
  } else {
    PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
  }
}

// Encodes num_values ByteArray entries: lengths are fed to the delta length
// encoder in fixed-size batches, then the raw bytes are copied into sink_.
template <typename DType>
void DeltaLengthByteArrayEncoder<DType>::Put(const T* src, int num_values) {
  if (num_values == 0) {
    return;
  }

  // Stage lengths through a small stack buffer, accumulating the total
  // payload size (overflow-checked) as we go.
  constexpr int kBatchSize = 256;
  std::array<int32_t, kBatchSize> lengths;
  const T* cursor = src;
  int remaining = num_values;
  while (remaining > 0) {
    const int this_batch = std::min(kBatchSize, remaining);
    for (int j = 0; j < this_batch; ++j) {
      const int32_t len = cursor[j].len;
      if (AddWithOverflow(encoded_size_, len, &encoded_size_)) {
        throw ParquetException("excess expansion in DELTA_LENGTH_BYTE_ARRAY");
      }
      lengths[j] = len;
    }
    length_encoder_.Put(lengths.data(), this_batch);
    cursor += this_batch;
    remaining -= this_batch;
  }

  // Now copy every value's bytes into the sink in one reserved pass.
  PARQUET_THROW_NOT_OK(sink_.Reserve(encoded_size_));
  for (int i = 0; i < num_values; ++i) {
    sink_.UnsafeAppend(src[i].ptr, src[i].len);
  }
}

// Encodes a possibly-nulled run: null slots (per valid_bits) are dropped and
// only the valid values are passed on to Put().
template <typename DType>
void DeltaLengthByteArrayEncoder<DType>::PutSpaced(const T* src, int num_values,
                                                   const uint8_t* valid_bits,
                                                   int64_t valid_bits_offset) {
  // No bitmap means every slot is valid; forward unchanged.
  if (valid_bits == NULLPTR) {
    Put(src, num_values);
    return;
  }
  // Compact the valid entries into a scratch buffer, then encode densely.
  PARQUET_ASSIGN_OR_THROW(
      auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T), this->memory_pool()));
  T* compacted = reinterpret_cast<T*>(buffer->mutable_data());
  const int num_valid = ::arrow::util::internal::SpacedCompress<T>(
      src, num_values, valid_bits, valid_bits_offset, compacted);
  Put(compacted, num_valid);
}

// Produces one DELTA_LENGTH_BYTE_ARRAY page buffer: the delta-bit-packed
// lengths block followed by the concatenated value bytes. Resets the encoder
// state so it can be reused for the next page.
template <typename DType>
std::shared_ptr<Buffer> DeltaLengthByteArrayEncoder<DType>::FlushValues() {
  std::shared_ptr<Buffer> encoded_lengths = length_encoder_.FlushValues();

  // Detach the accumulated value bytes so sink_ can be reused to assemble
  // the final (lengths + data) buffer.
  std::shared_ptr<Buffer> data;
  PARQUET_THROW_NOT_OK(sink_.Finish(&data));
  sink_.Reset();

  PARQUET_THROW_NOT_OK(sink_.Resize(encoded_lengths->size() + data->size()));
  PARQUET_THROW_NOT_OK(sink_.Append(encoded_lengths->data(), encoded_lengths->size()));
  PARQUET_THROW_NOT_OK(sink_.Append(data->data(), data->size()));

  std::shared_ptr<Buffer> buffer;
  PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, /*shrink_to_fit=*/true));
  encoded_size_ = 0;
  return buffer;
}

// ----------------------------------------------------------------------
// DeltaLengthByteArrayDecoder

class DeltaLengthByteArrayDecoder : public DecoderImpl,
virtual public TypedDecoder<ByteArrayType> {
public:
Expand Down Expand Up @@ -2636,13 +2759,17 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
// Decodes into an Arrow binary accumulator. Delegates to DecodeArrowDense;
// returns the number of non-null values decoded.
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                int64_t valid_bits_offset,
                typename EncodingTraits<ByteArrayType>::Accumulator* out) override {
  int result = 0;
  PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
                                        valid_bits_offset, out, &result));
  return result;
}

// Dictionary-building decode is not implemented for this encoding.
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                int64_t valid_bits_offset,
                typename EncodingTraits<ByteArrayType>::DictAccumulator* out) override {
  ParquetException::NYI(
      "DecodeArrow of DictAccumulator for DeltaLengthByteArrayDecoder");
}

private:
Expand All @@ -2664,6 +2791,44 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
num_valid_values_ = num_length;
}

// Decodes up to `num_values` slots (of which `null_count` are null per
// `valid_bits`) into the Arrow accumulator `out`. On success,
// `*out_num_values` holds the number of non-null values decoded.
Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
                        int64_t valid_bits_offset,
                        typename EncodingTraits<ByteArrayType>::Accumulator* out,
                        int* out_num_values) {
  ArrowBinaryHelper helper(out);

  // Bulk-decode all non-null values up front; nulls are interleaved below
  // while walking the validity bitmap.
  std::vector<ByteArray> values(num_values - null_count);
  const int num_valid_values = Decode(values.data(), num_values - null_count);
  if (ARROW_PREDICT_FALSE(num_values - null_count != num_valid_values)) {
    throw ParquetException("Expected to decode ", num_values - null_count,
                           " values, but decoded ", num_valid_values, " values.");
  }

  auto values_ptr = values.data();
  int value_idx = 0;

  RETURN_NOT_OK(VisitNullBitmapInline(
      valid_bits, valid_bits_offset, num_values, null_count,
      [&]() {
        // Valid slot: append the next decoded value, rolling over to a new
        // output chunk when the current one cannot fit it.
        const auto& val = values_ptr[value_idx];
        if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
          RETURN_NOT_OK(helper.PushChunk());
        }
        RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
        ++value_idx;
        return Status::OK();
      },
      [&]() {
        // Null slot: append a null and track it so we can verify the count.
        RETURN_NOT_OK(helper.AppendNull());
        --null_count;
        return Status::OK();
      }));

  // Every null reported by the bitmap must have been consumed.
  DCHECK_EQ(null_count, 0);
  *out_num_values = num_valid_values;
  return Status::OK();
}

std::shared_ptr<::arrow::bit_util::BitReader> decoder_;
DeltaBitPackDecoder<Int32Type> len_decoder_;
int num_valid_values_;
Expand Down Expand Up @@ -3075,7 +3240,6 @@ std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encodin
return std::make_unique<ByteStreamSplitEncoder<DoubleType>>(descr, pool);
default:
throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
break;
}
} else if (encoding == Encoding::DELTA_BINARY_PACKED) {
switch (type_num) {
Expand All @@ -3086,7 +3250,13 @@ std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encodin
default:
throw ParquetException(
"DELTA_BINARY_PACKED encoder only supports INT32 and INT64");
break;
}
} else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) {
switch (type_num) {
case Type::BYTE_ARRAY:
return std::make_unique<DeltaLengthByteArrayEncoder<ByteArrayType>>(descr, pool);
default:
throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY");
}
} else {
ParquetException::NYI("Selected encoding is not supported");
Expand Down Expand Up @@ -3126,7 +3296,6 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encodin
return std::make_unique<ByteStreamSplitDecoder<DoubleType>>(descr);
default:
throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
break;
}
} else if (encoding == Encoding::DELTA_BINARY_PACKED) {
switch (type_num) {
Expand All @@ -3137,7 +3306,6 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encodin
default:
throw ParquetException(
"DELTA_BINARY_PACKED decoder only supports INT32 and INT64");
break;
}
} else if (encoding == Encoding::DELTA_BYTE_ARRAY) {
if (type_num == Type::BYTE_ARRAY) {
Expand Down
Loading