diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h
index 2f70c286503..dc9b41793cf 100644
--- a/cpp/src/arrow/util/bit_stream_utils.h
+++ b/cpp/src/arrow/util/bit_stream_utils.h
@@ -203,9 +203,10 @@ class BitReader {
 };
 
 inline bool BitWriter::PutValue(uint64_t v, int num_bits) {
-  // TODO: revisit this limit if necessary (can be raised to 64 by fixing some edge cases)
-  DCHECK_LE(num_bits, 32);
-  DCHECK_EQ(v >> num_bits, 0) << "v = " << v << ", num_bits = " << num_bits;
+  DCHECK_LE(num_bits, 64);
+  if (num_bits < 64) {
+    DCHECK_EQ(v >> num_bits, 0) << "v = " << v << ", num_bits = " << num_bits;
+  }
 
   if (ARROW_PREDICT_FALSE(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8))
     return false;
@@ -220,7 +221,8 @@ inline bool BitWriter::PutValue(uint64_t v, int num_bits) {
     buffered_values_ = 0;
     byte_offset_ += 8;
     bit_offset_ -= 64;
-    buffered_values_ = v >> (num_bits - bit_offset_);
+    buffered_values_ =
+        (num_bits - bit_offset_ == 64) ? 0 : (v >> (num_bits - bit_offset_));
   }
   DCHECK_LT(bit_offset_, 64);
   return true;
diff --git a/cpp/src/arrow/util/rle_encoding_test.cc b/cpp/src/arrow/util/rle_encoding_test.cc
index 52f355daf21..01d1ffd767f 100644
--- a/cpp/src/arrow/util/rle_encoding_test.cc
+++ b/cpp/src/arrow/util/rle_encoding_test.cc
@@ -173,6 +173,40 @@ TEST(BitArray, TestMixed) {
   }
 }
 
+// Writes up to 'num_vals' values with width 'bit_width' and reads them back.
+static void TestPutValue(int bit_width, uint64_t num_vals) {
+  // The largest value representable in `bit_width` bits.
+  const uint64_t max = std::numeric_limits<uint64_t>::max() >> (64 - bit_width);
+  num_vals = std::min(num_vals, max);
+  int len = static_cast<int>(bit_util::BytesForBits(bit_width * num_vals));
+  EXPECT_GT(len, 0);
+
+  std::vector<uint8_t> buffer(len);
+  bit_util::BitWriter writer(buffer.data(), len);
+  for (uint64_t i = max - num_vals; i < max; i++) {
+    bool result = writer.PutValue(i, bit_width);
+    EXPECT_TRUE(result);
+  }
+  writer.Flush();
+  EXPECT_EQ(writer.bytes_written(), len);
+
+  bit_util::BitReader reader(buffer.data(), len);
+  for (uint64_t i = max - num_vals; i < max; i++) {
+    int64_t val = 0;
+    bool result = reader.GetValue(bit_width, &val);
+    EXPECT_TRUE(result);
+    EXPECT_EQ(val, i);
+  }
+  EXPECT_EQ(reader.bytes_left(), 0);
+}
+
+TEST(BitUtil, RoundTripIntValues) {
+  for (int width = 1; width < 64; width++) {
+    TestPutValue(width, 1);
+    TestPutValue(width, 1024);
+  }
+}
+
 // Validates encoding of values by encoding and decoding them.  If
 // expected_encoding != NULL, also validates that the encoded buffer is
 // exactly 'expected_encoding'.
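A note on the edge case handled in the BitWriter change above: when `num_bits` is 64 and the new `bit_offset_` lands on 0, the old expression `v >> (num_bits - bit_offset_)` would shift a 64-bit value by 64, which is undefined behavior in C++, so the patch substitutes 0 in that case. A minimal standalone sketch of the same guard (the helper name is illustrative, not Arrow API):

    #include <cstdint>

    // Returns the bits of 'v' that remain above the lowest 'consumed' bits.
    // A shift count of 64 (or more) would be undefined behavior, so it is
    // treated as "nothing remains" instead.
    inline uint64_t RemainingHighBits(uint64_t v, int consumed) {
      return consumed >= 64 ? 0 : (v >> consumed);
    }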
diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc
index 2cd21628b3f..0da78264832 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -400,7 +400,8 @@ typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType
 
+using TestValuesWriterInt32Type = TestPrimitiveWriter<Int32Type>;
+using TestValuesWriterInt64Type = TestPrimitiveWriter<Int64Type>;
 
 TYPED_TEST(TestPrimitiveWriter, RequiredPlain) {
   this->TestRequiredWithEncoding(Encoding::PLAIN);
 }
@@ -418,11 +419,17 @@ TYPED_TEST(TestPrimitiveWriter, RequiredRLE) {
 TYPED_TEST(TestPrimitiveWriter, RequiredBitPacked) {
   this->TestRequiredWithEncoding(Encoding::BIT_PACKED);
 }
+*/
+
+TEST_F(TestValuesWriterInt32Type, RequiredDeltaBinaryPacked) {
+  this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
+}
 
-TYPED_TEST(TestPrimitiveWriter, RequiredDeltaBinaryPacked) {
+TEST_F(TestValuesWriterInt64Type, RequiredDeltaBinaryPacked) {
   this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
 }
 
+/*
 TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) {
   this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY);
 }
@@ -430,11 +437,11 @@ TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) {
 TYPED_TEST(TestPrimitiveWriter, RequiredDeltaByteArray) {
   this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);
 }
+*/
 
 TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) {
   this->TestRequiredWithEncoding(Encoding::RLE_DICTIONARY);
 }
-*/
 
 TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStats) {
   this->TestRequiredWithSettings(Encoding::PLAIN, Compression::UNCOMPRESSED, false, true,
@@ -647,7 +654,7 @@ TEST(TestWriter, NullValuesBuffer) {
 
 // PARQUET-719
 // Test case for NULL values
-TEST_F(TestNullValuesWriter, OptionalNullValueChunk) {
+TEST_F(TestValuesWriterInt32Type, OptionalNullValueChunk) {
   this->SetUpSchema(Repetition::OPTIONAL);
 
   this->GenerateData(LARGE_SIZE);
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 44f762d7113..4923870e9e6 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -2060,6 +2060,285 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+/// DeltaBitPackEncoder is an encoder for the Delta Binary Packing format
+/// as per the parquet spec. See:
+/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// The encoding consists of a header followed by blocks of delta-encoded, binary-packed
+/// values.
+///
+/// Format
+///   [header] [block 1] [block 2] ... [block N]
+///
+/// Header
+///   [block size] [number of mini blocks per block] [total value count] [first value]
+///
+/// Block
+///   [min delta] [list of bit widths of the mini blocks] [mini blocks]
+///
+/// The encoder sets aside bytes at the start of the internal buffer where the header
+/// will be written, and writes the header only when FlushValues is called, just before
+/// returning the buffer.
+///
+/// To encode a block, we:
+///
+/// 1. Compute the differences between consecutive elements. For the first element in
+/// the block, use the last element in the previous block or, in the case of the first
+/// block, use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int, followed by
+/// the bit widths of the mini blocks and the delta values (minus the min delta), bit
+/// packed per mini block.
+///
+/// Supports only INT32 and INT64.
+
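To make steps 1-3 above concrete, here is a toy walk-through of one block holding a single mini block. The arithmetic is done in plain int64_t for readability; the real encoder uses unsigned types so that overflow is well defined. This is an illustration, not code from this patch:

    #include <algorithm>
    #include <cassert>
    #include <cstdint>
    #include <vector>

    int main() {
      const std::vector<int64_t> values = {7, 5, 3, 1, 2};
      // Step 1: the first value (7) goes into the header; the rest become deltas.
      std::vector<int64_t> deltas;
      for (size_t i = 1; i < values.size(); ++i) {
        deltas.push_back(values[i] - values[i - 1]);  // {-2, -2, -2, 1}
      }
      // Step 2: frame of reference (minimum delta), subtracted from every delta.
      const int64_t min_delta = *std::min_element(deltas.begin(), deltas.end());  // -2
      for (int64_t& d : deltas) d -= min_delta;  // {0, 0, 0, 3}
      // Step 3: the largest adjusted delta is 3, so this mini block is packed at
      // 2 bits per value, and min_delta (-2) is stored once, zigzag ULEB128 encoded.
      assert(min_delta == -2);
      assert(*std::max_element(deltas.begin(), deltas.end()) == 3);
      return 0;
    }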
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+  // Maximum possible header size
+  static constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+  static constexpr uint32_t kValuesPerBlock = 128;
+  static constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+ public:
+  using T = typename DType::c_type;
+  using UT = std::make_unsigned_t<T>;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(
+            AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 128 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be a multiple of 128, but it's " +
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a mini block must be a multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+    if (values_per_block % mini_blocks_per_block != 0) {
+      throw ParquetException(
+          "the number of values per block % number of mini blocks per block must be 0, "
+          "but it's " +
+          std::to_string(values_per_block % mini_blocks_per_block));
+    }
+    // Reserve enough space at the beginning of the buffer for the largest possible
+    // header.
+    PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  UT first_value_{0};
+  UT current_value_{0};
+  ArrowPoolVector<UT> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    UT value = static_cast<UT>(src[idx]);
+    // Calculate deltas. The possible overflow is handled by the use of unsigned
+    // integers, which makes the subtraction well defined and correct even in case of
+    // overflow. Encoded integers will wrap back around on decoding.
+    // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+    deltas_[values_current_block_] = value - current_value_;
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
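The overflow comment in Put() relies on two's-complement modular arithmetic: deltas are computed, and later re-added by the decoder, in the unsigned domain, so even a difference that would overflow the signed type round-trips exactly. A self-contained illustration of the idea (not the actual encoder/decoder code paths):

    #include <cassert>
    #include <cstdint>

    int main() {
      // The signed difference INT64_MAX - INT64_MIN overflows int64_t, but computed
      // in uint64_t the delta simply wraps, and adding it back (again in uint64_t)
      // recovers the original value exactly.
      const int64_t prev = INT64_MIN;
      const int64_t next = INT64_MAX;
      const uint64_t delta = static_cast<uint64_t>(next) - static_cast<uint64_t>(prev);
      const int64_t decoded =
          static_cast<int64_t>(static_cast<uint64_t>(prev) + delta);
      assert(decoded == next);
      return 0;
    }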
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const UT min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(static_cast<T>(min_delta));
+
+  // The call to GetNextBytePtr reserves mini_blocks_per_block_ bytes of space in which
+  // to write the bit widths of the mini blocks as they become known during encoding.
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  const uint32_t num_miniblocks =
+      static_cast<uint32_t>(std::ceil(static_cast<double>(values_current_block_) /
+                                      static_cast<double>(values_per_mini_block_)));
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const UT max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block);
+
+    // The minimum number of bits required to write any of the values in this mini
+    // block of the deltas_ vector. See the overflow comment above.
+    const auto bit_width = bit_width_data[i] =
+        bit_util::NumRequiredBits(max_delta - min_delta);
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      // See the overflow comment above.
+      const UT value = deltas_[j] - min_delta;
+      bit_writer_.PutValue(value, bit_width);
+    }
+    // If there are not enough values to fill the last mini block, pad it with zeroes
+    // so that its length is the number of values in a full mini block multiplied by
+    // the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) {
+      bit_writer_.PutValue(0, bit_width);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+
+  // If, in the last block, fewer than mini_blocks_per_block_ mini blocks are needed to
+  // store the values, the bytes holding the bit widths of the unneeded mini blocks are
+  // still present. Their value should be zero, but readers must accept arbitrary
+  // values as well.
+  for (uint32_t i = num_miniblocks; i < mini_blocks_per_block_; i++) {
+    bit_width_data[i] = 0;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+}
+
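FlushBlock() packs each mini block at the smallest width that can hold its largest min-subtracted delta (the bit_util::NumRequiredBits call above). A portable sketch of that computation with a small worked example (the helper name here is hypothetical, not the Arrow utility):

    #include <cassert>
    #include <cstdint>

    // Number of bits needed to represent x in binary; 0 for x == 0.
    inline int BitsRequired(uint64_t x) {
      int bits = 0;
      while (x != 0) {
        ++bits;
        x >>= 1;
      }
      return bits;
    }

    int main() {
      // Deltas {3, 7, 4} with min_delta 3 become {0, 4, 1}. The largest adjusted
      // delta is 4, so every value in this mini block is packed with 3 bits.
      assert(BitsRequired(4) == 3);
      assert(BitsRequired(0) == 0);
      return 0;
    }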
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/true));
+
+  uint8_t header_buffer_[kMaxPageHeaderWriterSize] = {};
+  bit_util::BitWriter header_writer(header_buffer_, sizeof(header_buffer_));
+  if (!header_writer.PutVlqInt(values_per_block_) ||
+      !header_writer.PutVlqInt(mini_blocks_per_block_) ||
+      !header_writer.PutVlqInt(total_value_count_) ||
+      !header_writer.PutZigZagVlqInt(static_cast<T>(first_value_))) {
+    throw ParquetException("header writing error");
+  }
+  header_writer.Flush();
+
+  // We reserved enough space at the beginning of the buffer for the largest possible
+  // header, and the data was written immediately after it. We now write the header
+  // immediately before the end of the reserved space.
+  const size_t offset_bytes = kMaxPageHeaderWriterSize - header_writer.bytes_written();
+  std::memcpy(buffer->mutable_data() + offset_bytes, header_buffer_,
+              header_writer.bytes_written());
+
+  // Excess bytes at the beginning are sliced off and ignored.
+  return SliceBuffer(buffer, offset_bytes);
+}
+
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
+  const ::arrow::ArrayData& data = *values.data();
+  if (values.type_id() != ::arrow::Type::INT32) {
+    throw ParquetException("Expected Int32Array, got ", values.type()->ToString());
+  }
+  if (data.length > std::numeric_limits<int32_t>::max()) {
+    throw ParquetException("Array cannot be longer than ",
+                           std::numeric_limits<int32_t>::max());
+  }
+
+  if (values.null_count() == 0) {
+    Put(data.GetValues<int32_t>(1), static_cast<int>(data.length));
+  } else {
+    PutSpaced(data.GetValues<int32_t>(1), static_cast<int>(data.length),
+              data.GetValues<uint8_t>(0, 0), data.offset);
+  }
+}
+
+template <>
+void DeltaBitPackEncoder<Int64Type>::Put(const ::arrow::Array& values) {
+  const ::arrow::ArrayData& data = *values.data();
+  if (values.type_id() != ::arrow::Type::INT64) {
+    throw ParquetException("Expected Int64Array, got ", values.type()->ToString());
+  }
+  if (data.length > std::numeric_limits<int32_t>::max()) {
+    throw ParquetException("Array cannot be longer than ",
+                           std::numeric_limits<int32_t>::max());
+  }
+  if (values.null_count() == 0) {
+    Put(data.GetValues<int64_t>(1), static_cast<int>(data.length));
+  } else {
+    PutSpaced(data.GetValues<int64_t>(1), static_cast<int>(data.length),
+              data.GetValues<uint8_t>(0, 0), data.offset);
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::PutSpaced(const T* src, int num_values,
+                                           const uint8_t* valid_bits,
+                                           int64_t valid_bits_offset) {
+  if (valid_bits != NULLPTR) {
+    PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
+                                                                 this->memory_pool()));
+    T* data = reinterpret_cast<T*>(buffer->mutable_data());
+    int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+        src, num_values, valid_bits, valid_bits_offset, data);
+    Put(data, num_valid_values);
+  } else {
+    Put(src, num_values);
+  }
+}
+
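The header fields written in FlushValues() and the per-block min delta are ULEB128 varints, with signed quantities zigzag-mapped first; this is what the BitWriter's PutVlqInt/PutZigZagVlqInt calls above do. A standalone sketch of the two primitives (illustrative, not the Arrow implementation):

    #include <cstdint>
    #include <vector>

    // ZigZag maps signed values to unsigned ones so that small magnitudes stay small:
    // 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, 2 -> 4, ...
    inline uint64_t ZigZagEncode(int64_t v) {
      return (static_cast<uint64_t>(v) << 1) ^ static_cast<uint64_t>(v >> 63);
    }

    // ULEB128 emits an unsigned integer 7 bits at a time, low group first, setting
    // the continuation (high) bit on every byte except the last.
    inline void WriteUleb128(uint64_t v, std::vector<uint8_t>* out) {
      while (v >= 0x80) {
        out->push_back(static_cast<uint8_t>(v & 0x7F) | 0x80);
        v >>= 7;
      }
      out->push_back(static_cast<uint8_t>(v));
    }

With the defaults above (block size 128, 4 mini blocks per block), a stream of five values whose first value is -2 gets the header bytes 80 01 (block size), 04, 05, 03 (zigzag of -2), which FlushValues() copies into the space reserved at the front of the buffer.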
 // ----------------------------------------------------------------------
 // DeltaBitPackDecoder
 
@@ -2067,6 +2346,7 @@ template <typename DType>
 class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
  public:
   typedef typename DType::c_type T;
+  using UT = std::make_unsigned_t<T>;
 
   explicit DeltaBitPackDecoder(const ColumnDescriptor* descr,
                                MemoryPool* pool = ::arrow::default_memory_pool())
@@ -2141,6 +2421,11 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
-        T delta = static_cast<T>(min_delta_) + static_cast<T>(buffer[i + j]);
-        buffer[i + j] = static_cast<T>(delta + static_cast<T>(last_value_));
+        // Addition of min_delta, the packed value and last_value should be treated as
+        // unsigned addition. Overflow is as expected.
+        buffer[i + j] = static_cast<UT>(min_delta_) + static_cast<UT>(buffer[i + j]) +
+                        static_cast<UT>(last_value_);
         last_value_ = buffer[i + j];
       }
       values_current_mini_block_ -= values_decode;
@@ -2760,6 +3044,17 @@ std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encodin
       throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
       break;
     }
+  } else if (encoding == Encoding::DELTA_BINARY_PACKED) {
+    switch (type_num) {
+      case Type::INT32:
+        return std::unique_ptr<Encoder>(new DeltaBitPackEncoder<Int32Type>(descr, pool));
+      case Type::INT64:
+        return std::unique_ptr<Encoder>(new DeltaBitPackEncoder<Int64Type>(descr, pool));
+      default:
+        throw ParquetException(
+            "DELTA_BINARY_PACKED encoder only supports INT32 and INT64");
+        break;
+    }
   } else {
     ParquetException::NYI("Selected encoding is not supported");
   }
@@ -2807,7 +3102,8 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encodin
       case Type::INT64:
         return std::make_unique<DeltaBitPackDecoder<Int64Type>>(descr);
       default:
-        throw ParquetException("DELTA_BINARY_PACKED only supports INT32 and INT64");
+        throw ParquetException(
+            "DELTA_BINARY_PACKED decoder only supports INT32 and INT64");
         break;
     }
   } else if (encoding == Encoding::DELTA_BYTE_ARRAY) {
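The factory changes above make the new encoder reachable through the same MakeTypedEncoder/MakeTypedDecoder helpers used by the tests below. A sketch of a round trip through those helpers (assuming the parquet/encoding.h factory signatures as used in the tests; not code from this patch):

    #include <cassert>
    #include <cstdint>
    #include <vector>

    #include "parquet/encoding.h"

    void DeltaBinaryPackedRoundTrip() {
      using parquet::Encoding;
      using parquet::Int32Type;

      std::vector<int32_t> values = {1, -5, 7, 7, 0};
      auto encoder = parquet::MakeTypedEncoder<Int32Type>(Encoding::DELTA_BINARY_PACKED);
      encoder->Put(values.data(), static_cast<int>(values.size()));
      auto encoded = encoder->FlushValues();

      auto decoder = parquet::MakeTypedDecoder<Int32Type>(Encoding::DELTA_BINARY_PACKED);
      decoder->SetData(static_cast<int>(values.size()), encoded->data(),
                       static_cast<int>(encoded->size()));
      std::vector<int32_t> decoded(values.size());
      int n = decoder->Decode(decoded.data(), static_cast<int>(decoded.size()));
      assert(n == static_cast<int>(values.size()));
      assert(decoded == values);
    }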
diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc
index 7d42e3e8ce3..f0a5f32c413 100644
--- a/cpp/src/parquet/encoding_test.cc
+++ b/cpp/src/parquet/encoding_test.cc
@@ -124,6 +124,12 @@ void GenerateData(int num_values, T* out, std::vector<uint8_t>* heap) {
                  std::numeric_limits<T>::max(), out);
 }
 
+template <typename T>
+void GenerateBoundData(int num_values, T* out, T min, T max, std::vector<uint8_t>* heap) {
+  // seed the prng so failure is deterministic
+  random_numbers(num_values, 0, min, max, out);
+}
+
 template <>
 void GenerateData<bool>(int num_values, bool* out, std::vector<uint8_t>* heap) {
   // seed the prng so failure is deterministic
@@ -1276,5 +1282,131 @@ TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) {
   ASSERT_THROW(MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT), ParquetException);
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BINARY_PACKED encode/decode tests.
+
+template <typename Type>
+class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  void InitBoundData(int nvalues, int repeats, c_type half_range) {
+    num_values_ = nvalues * repeats;
+    input_bytes_.resize(num_values_ * sizeof(c_type));
+    output_bytes_.resize(num_values_ * sizeof(c_type));
+    draws_ = reinterpret_cast<c_type*>(input_bytes_.data());
+    decode_buf_ = reinterpret_cast<c_type*>(output_bytes_.data());
+    GenerateBoundData(nvalues, draws_, -half_range, half_range, &data_buffer_);
+
+    // add some repeated values
+    for (int j = 1; j < repeats; ++j) {
+      for (int i = 0; i < nvalues; ++i) {
+        draws_[nvalues * j + i] = draws_[i];
+      }
+    }
+  }
+
+  void ExecuteBound(int nvalues, int repeats, c_type half_range) {
+    InitBoundData(nvalues, repeats, half_range);
+    CheckRoundtrip();
+  }
+
+  void ExecuteSpacedBound(int nvalues, int repeats, int64_t valid_bits_offset,
+                          double null_probability, c_type half_range) {
+    InitBoundData(nvalues, repeats, half_range);
+
+    int64_t size = num_values_ + valid_bits_offset;
+    auto rand = ::arrow::random::RandomArrayGenerator(1923);
+    const auto array = rand.UInt8(size, 0, 100, null_probability);
+    const auto valid_bits = array->null_bitmap_data();
+    CheckRoundtripSpaced(valid_bits, valid_bits_offset);
+  }
+
+  void CheckRoundtrip() override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults(decode_buf_, draws_, num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits,
+                            int64_t valid_bits_offset) override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced(decode_buf_, draws_, num_values_,
+                                                valid_bits, valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+  std::vector<uint8_t> input_bytes_;
+  std::vector<uint8_t> output_bytes_;
+};
+
+using TestDeltaBitPackEncodingTypes = ::testing::Types<Int32Type, Int64Type>;
+TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes);
+
+TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) {
+  using T = typename TypeParam::c_type;
+  int values_per_block = 128;
+  int values_per_mini_block = 32;
+
+  // Size a multiple of the mini block size
+  ASSERT_NO_FATAL_FAILURE(this->Execute(values_per_mini_block * 10, 10));
+  // Size a multiple of the block size
+  ASSERT_NO_FATAL_FAILURE(this->Execute(values_per_block * 10, 10));
+  // Size a multiple of neither the mini block nor the block size
+  ASSERT_NO_FATAL_FAILURE(
+      this->Execute((values_per_mini_block * values_per_block) + 1, 10));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64,
+      /*null_probability*/ 0.1));
+
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(2000, 2000, 0));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpacedBound(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64,
+      /*null_probability*/ 0.1,
+      /*half_range*/ 0));
+
+  const int max_bitwidth = sizeof(T) * 8;
+  std::vector<int> bitwidths = {
+      1, 2, 3, 5, 8, 11, 16, max_bitwidth - 8, max_bitwidth - 1, max_bitwidth};
+  for (int bitwidth : bitwidths) {
+    T half_range =
+        std::numeric_limits<T>::max() >> static_cast<uint32_t>(max_bitwidth - bitwidth);
+
+    ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(25000, 200, half_range));
+    ASSERT_NO_FATAL_FAILURE(this->ExecuteSpacedBound(
+        /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64,
+        /*null_probability*/ 0.1,
+        /*half_range*/ half_range));
+  }
+}
+
 }  // namespace test
 }  // namespace parquet
diff --git a/docs/source/cpp/parquet.rst b/docs/source/cpp/parquet.rst
index 23a9657fd41..edc42d54cff 100644
--- a/docs/source/cpp/parquet.rst
+++ b/docs/source/cpp/parquet.rst
@@ -398,7 +398,7 @@ Encodings
 +--------------------------+----------+----------+---------+
 | BYTE_STREAM_SPLIT        | ✓        | ✓        |         |
 +--------------------------+----------+----------+---------+
-| DELTA_BINARY_PACKED      | ✓        |          |         |
+| DELTA_BINARY_PACKED      | ✓        | ✓        |         |
 +--------------------------+----------+----------+---------+
 | DELTA_BYTE_ARRAY         | ✓        |          |         |
 +--------------------------+----------+----------+---------+
diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py
index 004bbd8d77f..b2809e0f910 100644
--- a/python/pyarrow/tests/parquet/test_basic.py
+++ b/python/pyarrow/tests/parquet/test_basic.py
@@ -405,6 +405,13 @@ def test_column_encoding(use_legacy_dataset):
                      column_encoding="PLAIN",
                      use_legacy_dataset=use_legacy_dataset)
 
+    # Check "DELTA_BINARY_PACKED" for integer columns.
+    _check_roundtrip(mixed_table, expected=mixed_table,
+                     use_dictionary=False,
+                     column_encoding={'a': "PLAIN",
+                                      'b': "DELTA_BINARY_PACKED"},
+                     use_legacy_dataset=use_legacy_dataset)
+
     # Try to pass "BYTE_STREAM_SPLIT" column encoding for integer column 'b'.
     # This should throw an error as it is only supports FLOAT and DOUBLE.
     with pytest.raises(IOError,
@@ -415,14 +422,12 @@ def test_column_encoding(use_legacy_dataset):
                          column_encoding={'b': "BYTE_STREAM_SPLIT"},
                          use_legacy_dataset=use_legacy_dataset)
 
-    # Try to pass "DELTA_BINARY_PACKED".
-    # This should throw an error as it is only supported for reading.
-    with pytest.raises(IOError,
-                       match="Not yet implemented: Selected encoding is"
-                             " not supported."):
+    # Try to use "DELTA_BINARY_PACKED" encoding on a float column.
+    # This should throw an error as only integer columns are supported.
+    with pytest.raises(OSError):
         _check_roundtrip(mixed_table, expected=mixed_table,
                          use_dictionary=False,
-                         column_encoding={'b': "DELTA_BINARY_PACKED"},
+                         column_encoding={'a': "DELTA_BINARY_PACKED"},
                          use_legacy_dataset=use_legacy_dataset)
 
     # Try to pass "RLE_DICTIONARY".
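Mirroring the pyarrow test above from the C++ side: a sketch of enabling the new encoding for one column through WriterProperties. It assumes the usual parquet::arrow::WriteTable entry point and an existing ::arrow::Table with an integer column named "b"; it is not part of this patch.

    #include <memory>
    #include <string>

    #include "arrow/io/file.h"
    #include "arrow/memory_pool.h"
    #include "arrow/result.h"
    #include "arrow/table.h"
    #include "parquet/arrow/writer.h"
    #include "parquet/properties.h"

    ::arrow::Status WriteWithDeltaBinaryPacked(
        const std::shared_ptr<::arrow::Table>& table, const std::string& path) {
      // Column encodings only take effect when dictionary encoding is disabled,
      // matching the use_dictionary=False setting in the Python test above.
      std::shared_ptr<parquet::WriterProperties> props =
          parquet::WriterProperties::Builder()
              .disable_dictionary()
              ->encoding("b", parquet::Encoding::DELTA_BINARY_PACKED)
              ->build();
      ARROW_ASSIGN_OR_RAISE(auto outfile, ::arrow::io::FileOutputStream::Open(path));
      return parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(), outfile,
                                        /*chunk_size=*/1024 * 1024, props);
    }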