diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index d2944add163..78d7ceabb20 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -60,7 +61,13 @@ using arrow::internal::MultiplyWithOverflow; namespace bit_util = arrow::bit_util; namespace parquet { + namespace { + +// The minimum number of repetition/definition levels to decode at a time, for +// better vectorized performance when doing many smaller record reads +constexpr int64_t kMinLevelBatchSize = 1024; + inline bool HasSpacedValues(const ColumnDescriptor* descr) { if (descr->max_repetition_level() > 0) { // repeated+flat case @@ -396,7 +403,8 @@ std::shared_ptr SerializedPageReader::NextPage() { // Decrypt it if we need to if (crypto_ctx_.data_decryptor != nullptr) { PARQUET_THROW_NOT_OK(decryption_buffer_->Resize( - compressed_len - crypto_ctx_.data_decryptor->CiphertextSizeDelta(), false)); + compressed_len - crypto_ctx_.data_decryptor->CiphertextSizeDelta(), + /*shrink_to_fit=*/false)); compressed_len = crypto_ctx_.data_decryptor->Decrypt( page_buffer->data(), compressed_len, decryption_buffer_->mutable_data()); @@ -498,7 +506,8 @@ std::shared_ptr SerializedPageReader::DecompressIfNeeded( // Grow the uncompressed buffer if we need to. if (uncompressed_len > static_cast(decompression_buffer_->size())) { - PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false)); + PARQUET_THROW_NOT_OK( + decompression_buffer_->Resize(uncompressed_len, /*shrink_to_fit=*/false)); } if (levels_byte_len > 0) { @@ -1011,7 +1020,7 @@ int64_t TypedColumnReaderImpl::ReadBatchWithDictionary( // Read dictionary indices. 
*indices_read = ReadDictionaryIndices(indices_to_read, indices); - int64_t total_indices = std::max(num_def_levels, *indices_read); + int64_t total_indices = std::max(num_def_levels, *indices_read); // Some callers use a batch size of 0 just to get the dictionary. int64_t expected_values = std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_); @@ -1042,7 +1051,7 @@ int64_t TypedColumnReaderImpl::ReadBatch(int64_t batch_size, int16_t* def ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels, &values_to_read); *values_read = this->ReadValues(values_to_read, values); - int64_t total_values = std::max(num_def_levels, *values_read); + int64_t total_values = std::max(num_def_levels, *values_read); int64_t expected_values = std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_); if (total_values == 0 && expected_values > 0) { @@ -1150,7 +1159,8 @@ int64_t TypedColumnReaderImpl::Skip(int64_t num_values_to_skip) { } else { // We need to read this Page // Jump to the right offset in the Page - int64_t batch_size = 1024; // ReadBatch with a smaller memory footprint + int64_t batch_size = + kMinLevelBatchSize; // ReadBatch with a smaller memory footprint int64_t values_read = 0; // This will be enough scratch space to accommodate 16-bit levels or any @@ -1217,27 +1227,24 @@ std::shared_ptr ColumnReader::Make(const ColumnDescriptor* descr, // RecordReader namespace internal { -namespace { -// The minimum number of repetition/definition levels to decode at a time, for -// better vectorized performance when doing many smaller record reads -constexpr int64_t kMinLevelBatchSize = 1024; +namespace { template -class TypedRecordReader : public ColumnReaderImplBase, +class TypedRecordReader : public TypedColumnReaderImpl, virtual public RecordReader { public: using T = typename DType::c_type; - using BASE = ColumnReaderImplBase; + using BASE = TypedColumnReaderImpl; TypedRecordReader(const ColumnDescriptor* descr, LevelInfo 
leaf_info, MemoryPool* pool) - : BASE(descr, pool) { + // Pager must be set using SetPageReader. + : BASE(descr, /* pager = */ nullptr, pool) { leaf_info_ = leaf_info; nullable_values_ = leaf_info.HasNullableValues(); at_record_start_ = true; - records_read_ = 0; values_written_ = 0; - values_capacity_ = 0; null_count_ = 0; + values_capacity_ = 0; levels_written_ = 0; levels_position_ = 0; levels_capacity_ = 0; @@ -1274,7 +1281,7 @@ class TypedRecordReader : public ColumnReaderImplBase, records_read += ReadRecordData(num_records); } - int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records); + int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records); // If we are in the middle of a record, we continue until reaching the // desired number of records or the end of the current record if we've found @@ -1335,6 +1342,223 @@ class TypedRecordReader : public ColumnReaderImplBase, return records_read; } + // Throw away levels from start_levels_position to levels_position_. + // Will update levels_position_, levels_written_, and levels_capacity_ + // accordingly and move the levels to left to fill in the gap. + // It will resize the buffer without releasing the memory allocation. 
+ void ThrowAwayLevels(int64_t start_levels_position) { + ARROW_DCHECK_LE(levels_position_, levels_written_); + ARROW_DCHECK_LE(start_levels_position, levels_position_); + ARROW_DCHECK_GT(this->max_def_level_, 0); + ARROW_DCHECK_NE(def_levels_, nullptr); + + int64_t gap = levels_position_ - start_levels_position; + if (gap == 0) return; + + int64_t levels_remaining = levels_written_ - gap; + + auto left_shift = [=](::arrow::ResizableBuffer* buffer) { + int16_t* data = reinterpret_cast(buffer->mutable_data()); + std::copy(data + levels_position_, data + levels_written_, + data + start_levels_position); + PARQUET_THROW_NOT_OK(buffer->Resize(levels_remaining * sizeof(int16_t), + /*shrink_to_fit=*/false)); + }; + + left_shift(def_levels_.get()); + + if (this->max_rep_level_ > 0) { + ARROW_DCHECK_NE(rep_levels_, nullptr); + left_shift(rep_levels_.get()); + } + + levels_written_ -= gap; + levels_position_ -= gap; + levels_capacity_ -= gap; + } + + // Skip records that we have in our buffer. This function is only for + // non-repeated fields. + int64_t SkipRecordsInBufferNonRepeated(int64_t num_records) { + ARROW_DCHECK_EQ(this->max_rep_level_, 0); + if (!this->has_values_to_process() || num_records == 0) return 0; + + int64_t remaining_records = levels_written_ - levels_position_; + int64_t skipped_records = std::min(num_records, remaining_records); + int64_t start_levels_position = levels_position_; + // Since there is no repetition, number of levels equals number of records. + levels_position_ += skipped_records; + + // We skipped the levels by incrementing 'levels_position_'. For values + // we do not have a buffer, so we need to read them and throw them away. + // First we need to figure out how many present/not-null values there are. 
+ std::shared_ptr<::arrow::ResizableBuffer> valid_bits; + valid_bits = AllocateBuffer(this->pool_); + PARQUET_THROW_NOT_OK(valid_bits->Resize(bit_util::BytesForBits(skipped_records), + /*shrink_to_fit=*/true)); + ValidityBitmapInputOutput validity_io; + validity_io.values_read_upper_bound = skipped_records; + validity_io.valid_bits = valid_bits->mutable_data(); + validity_io.valid_bits_offset = 0; + DefLevelsToBitmap(def_levels() + start_levels_position, skipped_records, + this->leaf_info_, &validity_io); + int64_t values_to_read = validity_io.values_read - validity_io.null_count; + + // Now that we have figured out number of values to read, we do not need + // these levels anymore. We will remove these values from the buffer. + // This requires shifting the levels in the buffer to left. So this will + // update levels_position_ and levels_written_. + ThrowAwayLevels(start_levels_position); + // For values, we do not have them in buffer, so we will read them and + // throw them away. + ReadAndThrowAwayValues(values_to_read); + + // Mark the levels as read in the underlying column reader. + this->ConsumeBufferedValues(skipped_records); + + return skipped_records; + } + + // Attempts to skip num_records from the buffer. Will throw away levels + // and corresponding values for the records it skipped and consumes them from the + // underlying decoder. Will advance levels_position_ and update + // at_record_start_. + // Returns how many records were skipped. + int64_t DelimitAndSkipRecordsInBuffer(int64_t num_records) { + if (num_records == 0) return 0; + // Look at the buffered levels, delimit them based on + // (rep_level == 0), report back how many records are in there, and + // fill in how many not-null values (def_level == max_def_level_). + // DelimitRecords updates levels_position_. 
+ int64_t start_levels_position = levels_position_; + int64_t values_seen = 0; + int64_t skipped_records = DelimitRecords(num_records, &values_seen); + ReadAndThrowAwayValues(values_seen); + // Mark those levels and values as consumed in the underlying page. + // This must be done before we throw away levels since it updates + // levels_position_ and levels_written_. + this->ConsumeBufferedValues(levels_position_ - start_levels_position); + // Updated levels_position_ and levels_written_. + ThrowAwayLevels(start_levels_position); + return skipped_records; + } + + // Skip records for repeated fields. For repeated fields, we are technically + // reading and throwing away the levels and values since we do not know the record + // boundaries in advance. Keep filling the buffer and skipping until we reach the + // desired number of records or we run out of values in the column chunk. + // Returns number of skipped records. + int64_t SkipRecordsRepeated(int64_t num_records) { + ARROW_DCHECK_GT(this->max_rep_level_, 0); + int64_t skipped_records = 0; + + // First consume what is in the buffer. + if (levels_position_ < levels_written_) { + // This updates at_record_start_. + skipped_records = DelimitAndSkipRecordsInBuffer(num_records); + } + + int64_t level_batch_size = + std::max(kMinLevelBatchSize, num_records - skipped_records); + + // If 'at_record_start_' is false, but (skipped_records == num_records), it + // means that for the last record that was counted, we have not seen all + // of its values yet. + while (!at_record_start_ || skipped_records < num_records) { + // Is there more data to read in this row group? + // HasNextInternal() will advance to the next page if necessary. + if (!this->HasNextInternal()) { + if (!at_record_start_) { + // We ended the row group while inside a record that we haven't seen + // the end of yet. 
So increment the record count for the last record + // in the row group + ++skipped_records; + at_record_start_ = true; + } + break; + } + + // Read some more levels. + int64_t batch_size = std::min(level_batch_size, available_values_current_page()); + // No more data in column. This must be an empty page. + // If we had exhausted the last page, HasNextInternal() must have advanced + // to the next page. So there must be available values to process. + if (batch_size == 0) { + break; + } + + // For skip we will read the levels and append them to the end + // of the def_levels and rep_levels just like for read. + ReserveLevels(batch_size); + + int16_t* def_levels = this->def_levels() + levels_written_; + int16_t* rep_levels = this->rep_levels() + levels_written_; + + int64_t levels_read = 0; + levels_read = this->ReadDefinitionLevels(batch_size, def_levels); + if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) { + throw ParquetException("Number of decoded rep / def levels did not match"); + } + + levels_written_ += levels_read; + int64_t remaining_records = num_records - skipped_records; + // This updates at_record_start_. + skipped_records += DelimitAndSkipRecordsInBuffer(remaining_records); + } + + return skipped_records; + } + + // Read 'num_values' values and throw them away. + // Throws an error if it could not read 'num_values'. 
+ void ReadAndThrowAwayValues(int64_t num_values) { + int64_t values_left = num_values; + int64_t batch_size = kMinLevelBatchSize; // ReadBatch with a smaller memory footprint + int64_t values_read = 0; + + // This will be enough scratch space to accommodate 16-bit levels or any + // value type + int value_size = type_traits::value_byte_size; + std::shared_ptr scratch = AllocateBuffer( + this->pool_, batch_size * std::max(sizeof(int16_t), value_size)); + do { + batch_size = std::min(batch_size, values_left); + values_read = + this->ReadValues(batch_size, reinterpret_cast(scratch->mutable_data())); + values_left -= values_read; + } while (values_read > 0 && values_left > 0); + if (values_left > 0) { + std::stringstream ss; + ss << "Could not read and throw away " << num_values << " values"; + throw ParquetException(ss.str()); + } + } + + int64_t SkipRecords(int64_t num_records) override { + // Top level required field. Number of records equals the number of levels, + // and there is no read-ahead for levels. + if (this->max_rep_level_ == 0 && this->max_def_level_ == 0) { + return this->Skip(num_records); + } + int64_t skipped_records = 0; + if (this->max_rep_level_ == 0) { + // Non-repeated optional field. + // First consume whatever is in the buffer. + skipped_records = SkipRecordsInBufferNonRepeated(num_records); + + ARROW_DCHECK_LE(skipped_records, num_records); + + // For records that we have not buffered, we will use the column + // reader's Skip to do the remaining Skip. Since the field is not + // repeated, number of levels to skip is the same as number of records + // to skip. 
+ skipped_records += this->Skip(num_records - skipped_records); + } else { + skipped_records += this->SkipRecordsRepeated(num_records); + } + return skipped_records; + } + // We may outwardly have the appearance of having exhausted a column chunk // when in fact we are in the middle of processing the last batch bool has_values_to_process() const { return levels_position_ < levels_written_; } @@ -1342,7 +1566,8 @@ class TypedRecordReader : public ColumnReaderImplBase, std::shared_ptr ReleaseValues() override { if (uses_values_) { auto result = values_; - PARQUET_THROW_NOT_OK(result->Resize(bytes_for_values(values_written_), true)); + PARQUET_THROW_NOT_OK( + result->Resize(bytes_for_values(values_written_), /*shrink_to_fit=*/true)); values_ = AllocateBuffer(this->pool_); values_capacity_ = 0; return result; @@ -1354,7 +1579,8 @@ class TypedRecordReader : public ColumnReaderImplBase, std::shared_ptr ReleaseIsValid() override { if (leaf_info_.HasNullableValues()) { auto result = valid_bits_; - PARQUET_THROW_NOT_OK(result->Resize(bit_util::BytesForBits(values_written_), true)); + PARQUET_THROW_NOT_OK(result->Resize(bit_util::BytesForBits(values_written_), + /*shrink_to_fit=*/true)); valid_bits_ = AllocateBuffer(this->pool_); return result; } else { @@ -1363,7 +1589,8 @@ class TypedRecordReader : public ColumnReaderImplBase, } // Process written repetition/definition levels to reach the end of - // records. Process no more levels than necessary to delimit the indicated + // records. Only used for repeated fields. + // Process no more levels than necessary to delimit the indicated // number of logical records. 
Updates internal state of RecordReader // // \return Number of records delimited @@ -1441,9 +1668,11 @@ class TypedRecordReader : public ColumnReaderImplBase, if (MultiplyWithOverflow(new_levels_capacity, kItemSize, &capacity_in_bytes)) { throw ParquetException("Allocation size too large (corrupt file?)"); } - PARQUET_THROW_NOT_OK(def_levels_->Resize(capacity_in_bytes, false)); + PARQUET_THROW_NOT_OK( + def_levels_->Resize(capacity_in_bytes, /*shrink_to_fit=*/false)); if (this->max_rep_level_ > 0) { - PARQUET_THROW_NOT_OK(rep_levels_->Resize(capacity_in_bytes, false)); + PARQUET_THROW_NOT_OK( + rep_levels_->Resize(capacity_in_bytes, /*shrink_to_fit=*/false)); } levels_capacity_ = new_levels_capacity; } @@ -1457,8 +1686,8 @@ class TypedRecordReader : public ColumnReaderImplBase, // XXX(wesm): A hack to avoid memory allocation when reading directly // into builder classes if (uses_values_) { - PARQUET_THROW_NOT_OK( - values_->Resize(bytes_for_values(new_values_capacity), false)); + PARQUET_THROW_NOT_OK(values_->Resize(bytes_for_values(new_values_capacity), + /*shrink_to_fit=*/false)); } values_capacity_ = new_values_capacity; } @@ -1466,7 +1695,8 @@ class TypedRecordReader : public ColumnReaderImplBase, int64_t valid_bytes_new = bit_util::BytesForBits(values_capacity_); if (valid_bits_->size() < valid_bytes_new) { int64_t valid_bytes_old = bit_util::BytesForBits(values_written_); - PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false)); + PARQUET_THROW_NOT_OK( + valid_bits_->Resize(valid_bytes_new, /*shrink_to_fit=*/false)); // Avoid valgrind warnings memset(valid_bits_->mutable_data() + valid_bytes_old, 0, @@ -1479,29 +1709,10 @@ class TypedRecordReader : public ColumnReaderImplBase, ResetValues(); if (levels_written_ > 0) { - const int64_t levels_remaining = levels_written_ - levels_position_; - // Shift remaining levels to beginning of buffer and trim to only the number - // of decoded levels remaining - int16_t* def_data = def_levels(); - int16_t* 
rep_data = rep_levels(); - - std::copy(def_data + levels_position_, def_data + levels_written_, def_data); - PARQUET_THROW_NOT_OK( - def_levels_->Resize(levels_remaining * sizeof(int16_t), false)); - - if (this->max_rep_level_ > 0) { - std::copy(rep_data + levels_position_, rep_data + levels_written_, rep_data); - PARQUET_THROW_NOT_OK( - rep_levels_->Resize(levels_remaining * sizeof(int16_t), false)); - } - - levels_written_ -= levels_position_; - levels_position_ = 0; - levels_capacity_ = levels_remaining; + // Throw away levels from 0 to levels_position_. + ThrowAwayLevels(0); } - records_read_ = 0; - // Call Finish on the binary builders to reset them } @@ -1536,7 +1747,7 @@ class TypedRecordReader : public ColumnReaderImplBase, int64_t ReadRecordData(int64_t num_records) { // Conservative upper bound const int64_t possible_num_values = - std::max(num_records, levels_written_ - levels_position_); + std::max(num_records, levels_written_ - levels_position_); ReserveValues(possible_num_values); const int64_t start_levels_position = levels_position_; @@ -1548,7 +1759,7 @@ class TypedRecordReader : public ColumnReaderImplBase, } else if (this->max_def_level_ > 0) { // No repetition levels, skip delimiting logic. 
Each level represents a // null or not null entry - records_read = std::min(levels_written_ - levels_position_, num_records); + records_read = std::min(levels_written_ - levels_position_, num_records); // This is advanced by DelimitRecords, which we skipped levels_position_ += records_read; @@ -1618,9 +1829,9 @@ class TypedRecordReader : public ColumnReaderImplBase, if (values_written_ > 0) { // Resize to 0, but do not shrink to fit if (uses_values_) { - PARQUET_THROW_NOT_OK(values_->Resize(0, false)); + PARQUET_THROW_NOT_OK(values_->Resize(0, /*shrink_to_fit=*/false)); } - PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false)); + PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, /*shrink_to_fit=*/false)); values_written_ = 0; values_capacity_ = 0; null_count_ = 0; diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index 4bebdba7e95..b06266de38d 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -267,7 +267,7 @@ namespace internal { /// /// \note API EXPERIMENTAL /// \since 1.3.0 -class RecordReader { +class PARQUET_EXPORT RecordReader { public: static std::shared_ptr Make( const ColumnDescriptor* descr, LevelInfo leaf_info, @@ -277,9 +277,17 @@ class RecordReader { virtual ~RecordReader() = default; /// \brief Attempt to read indicated number of records from column chunk + /// Note that for repeated fields, a record may have more than one value + /// and all of them are read. /// \return number of records read virtual int64_t ReadRecords(int64_t num_records) = 0; + /// \brief Attempt to skip indicated number of records from column chunk. + /// Note that for repeated fields, a record may have more than one value + /// and all of them are skipped. + /// \return number of records skipped + virtual int64_t SkipRecords(int64_t num_records) = 0; + /// \brief Pre-allocate space for data. 
Results in better flat read performance virtual void Reserve(int64_t num_values) = 0; @@ -299,7 +307,8 @@ class RecordReader { /// process virtual bool HasMoreData() const = 0; - /// \brief Advance record reader to the next row group + /// \brief Advance record reader to the next row group. Must be set before + /// any records could be read/skipped. /// \param[in] reader obtained from RowGroupReader::GetColumnPageReader virtual void SetPageReader(std::unique_ptr reader) = 0; @@ -319,6 +328,7 @@ class RecordReader { uint8_t* values() const { return values_->mutable_data(); } /// \brief Number of values written including nulls (if any) + /// There is no read-ahead/buffering for values. int64_t values_written() const { return values_written_; } /// \brief Number of definition / repetition levels (from those that have @@ -326,10 +336,12 @@ class RecordReader { int64_t levels_position() const { return levels_position_; } /// \brief Number of definition / repetition levels that have been written - /// internally in the reader + /// internally in the reader. This may be larger than values_written() because + /// for repeated fields we need to look at the levels in advance to figure out + /// the record boundaries. int64_t levels_written() const { return levels_written_; } - /// \brief Number of nulls in the leaf + /// \brief Number of nulls in the leaf that we have read so far. int64_t null_count() const { return null_count_; } /// \brief True if the leaf values are nullable @@ -339,28 +351,49 @@ class RecordReader { bool read_dictionary() const { return read_dictionary_; } protected: + /// \brief Indicates if we can have nullable values. bool nullable_values_; bool at_record_start_; int64_t records_read_; + /// \brief Stores values. These values are populated based on each ReadRecords + /// call. No extra values are buffered for the next call. SkipRecords will not + /// add any value to this buffer. 
+ std::shared_ptr<::arrow::ResizableBuffer> values_; + /// \brief False for BYTE_ARRAY, in which case we don't allocate the values + /// buffer and we directly read into builder classes. + bool uses_values_; + + /// \brief Values that we have read into 'values_' + 'null_count_'. int64_t values_written_; int64_t values_capacity_; int64_t null_count_; - int64_t levels_written_; - int64_t levels_position_; - int64_t levels_capacity_; - - std::shared_ptr<::arrow::ResizableBuffer> values_; - // In the case of false, don't allocate the values buffer (when we directly read into - // builder classes). - bool uses_values_; - + /// \brief Each bit corresponds to one element in 'values_' and specifies if it + /// is null or not null. std::shared_ptr<::arrow::ResizableBuffer> valid_bits_; + + /// \brief Buffer for definition levels. May contain more levels than + /// is actually read. This is because we read levels ahead to + /// figure out record boundaries for repeated fields. + /// For flat required fields, 'def_levels_' and 'rep_levels_' are not + /// populated. For non-repeated fields 'rep_levels_' is not populated. + /// 'def_levels_' and 'rep_levels_' must be of the same size if present. std::shared_ptr<::arrow::ResizableBuffer> def_levels_; + /// \brief Buffer for repetition levels. Only populated for repeated + /// fields. std::shared_ptr<::arrow::ResizableBuffer> rep_levels_; + /// \brief Number of definition / repetition levels that have been written + /// internally in the reader. This may be larger than values_written() since + /// for repeated fields we need to look at the levels in advance to figure out + /// the record boundaries. + int64_t levels_written_; + /// \brief Position of the next level that should be consumed. 
+ int64_t levels_position_; + int64_t levels_capacity_; + bool read_dictionary_ = false; }; diff --git a/cpp/src/parquet/column_reader_test.cc b/cpp/src/parquet/column_reader_test.cc index b2f947eea46..6916ebe8914 100644 --- a/cpp/src/parquet/column_reader_test.cc +++ b/cpp/src/parquet/column_reader_test.cc @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include #include #include @@ -26,6 +27,7 @@ #include #include +#include "arrow/array/array_binary.h" #include "arrow/util/macros.h" #include "parquet/column_page.h" #include "parquet/column_reader.h" @@ -35,7 +37,10 @@ namespace parquet { +using parquet::Repetition; +using parquet::internal::BinaryRecordReader; using schema::NodePtr; +using testing::ElementsAre; namespace test { @@ -74,9 +79,8 @@ static inline bool vector_equal_with_def_levels(const std::vector& left, class TestPrimitiveReader : public ::testing::Test { public: void InitReader(const ColumnDescriptor* d) { - std::unique_ptr pager_; - pager_.reset(new test::MockPageReader(pages_)); - reader_ = ColumnReader::Make(d, std::move(pager_)); + auto pager = std::make_unique(pages_); + reader_ = ColumnReader::Make(d, std::move(pager)); } void CheckResults() { @@ -631,5 +635,572 @@ TEST_F(TestPrimitiveReader, TestNonDictionaryEncodedPagesWithExposeEncoding) { pages_.clear(); } +class RecordReaderTest : public ::testing::Test { + public: + const int32_t kNullValue = -1; + + void Init(int32_t max_def_level, int32_t max_rep_level, Repetition::type repetition) { + level_info_.def_level = max_def_level; + level_info_.rep_level = max_rep_level; + repetition_type_ = repetition; + + NodePtr type = schema::Int32("b", repetition); + descr_ = std::make_unique(type, level_info_.def_level, + level_info_.rep_level); + + record_reader_ = internal::RecordReader::Make(descr_.get(), level_info_); + } + + void CheckReadValues(std::vector expected_values, + std::vector expected_defs, + std::vector expected_reps) { + const auto 
read_values = reinterpret_cast(record_reader_->values()); + std::vector read_vals(read_values, + read_values + record_reader_->values_written()); + ASSERT_EQ(read_vals.size(), expected_values.size()); + for (size_t i = 0; i < expected_values.size(); ++i) { + if (expected_values[i] != kNullValue) { + ASSERT_EQ(expected_values[i], read_values[i]); + } + } + + if (repetition_type_ != Repetition::REQUIRED) { + std::vector read_defs( + record_reader_->def_levels(), + record_reader_->def_levels() + record_reader_->levels_position()); + ASSERT_TRUE(vector_equal(expected_defs, read_defs)); + } + + if (repetition_type_ == Repetition::REPEATED) { + std::vector read_reps( + record_reader_->rep_levels(), + record_reader_->rep_levels() + record_reader_->levels_position()); + ASSERT_TRUE(vector_equal(expected_reps, read_reps)); + } + } + + void CheckState(int64_t values_written, int64_t null_count, int64_t levels_written, + int64_t levels_position) { + ASSERT_EQ(record_reader_->values_written(), values_written); + ASSERT_EQ(record_reader_->null_count(), null_count); + ASSERT_EQ(record_reader_->levels_written(), levels_written); + ASSERT_EQ(record_reader_->levels_position(), levels_position); + } + + protected: + std::shared_ptr record_reader_; + std::unique_ptr descr_; + internal::LevelInfo level_info_; + Repetition::type repetition_type_; +}; + +// Tests reading a repeated field using the RecordReader. 
+TEST_F(RecordReaderTest, BasicReadRepeatedField) { + Init(/*max_def_level=*/1, /*max_rep_level=*/1, Repetition::REPEATED); + + // Records look like: {[10], null, [20, 20], null, [30, 30, 30], null} + std::vector> pages; + std::vector values = {10, 20, 20, 30, 30, 30}; + std::vector def_levels = {1, 0, 1, 1, 0, 1, 1, 1, 0}; + std::vector rep_levels = {0, 0, 0, 1, 0, 0, 1, 1, 0}; + + std::shared_ptr page = MakeDataPage( + descr_.get(), values, /*num_values=*/static_cast(def_levels.size()), + Encoding::PLAIN, + /*indices=*/{}, + /*indices_size=*/0, def_levels, level_info_.def_level, rep_levels, + level_info_.rep_level); + pages.push_back(std::move(page)); + auto pager = std::make_unique(pages); + record_reader_->SetPageReader(std::move(pager)); + + // Read [10], null + int64_t records_read = record_reader_->ReadRecords(/*num_records=*/2); + ASSERT_EQ(records_read, 2); + CheckState(/*values_written=*/2, /*null_count=*/1, /*levels_written=*/9, + /*levels_position=*/2); + CheckReadValues(/*expected_values=*/{10, kNullValue}, /*expected_defs=*/{1, 0}, + /*expected_reps=*/{0, 0}); + record_reader_->Reset(); + CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/7, + /*levels_position=*/0); + // Read [20, 20], null, [30, 30, 30] + records_read = record_reader_->ReadRecords(/*num_records=*/3); + ASSERT_EQ(records_read, 3); + CheckState(/*values_written=*/6, /*null_count=*/1, /*levels_written=*/7, + /*levels_position=*/6); + CheckReadValues(/*expected_values=*/{20, 20, kNullValue, 30, 30, 30}, + /*expected_defs=*/{1, 1, 0, 1, 1, 1}, + /*expected_reps=*/{0, 1, 0, 0, 1, 1}); + record_reader_->Reset(); + CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/1, + /*levels_position=*/0); + // Read the last null value and read past the end. 
+ records_read = record_reader_->ReadRecords(/*num_records=*/3); + ASSERT_EQ(records_read, 1); + CheckState(/*values_written=*/1, /*null_count=*/1, /*levels_written=*/1, + /*levels_position=*/1); + CheckReadValues(/*expected_values=*/{kNullValue}, + /*expected_defs=*/{0}, + /*expected_reps=*/{0}); + record_reader_->Reset(); + CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0, + /*levels_position=*/0); +} + +// Test that we can skip required top level field. +TEST_F(RecordReaderTest, SkipRequiredTopLevel) { + Init(/*max_def_level=*/0, /*max_rep_level=*/0, Repetition::REQUIRED); + + std::vector> pages; + std::vector values = {10, 20, 20, 30, 30, 30}; + std::shared_ptr page = MakeDataPage( + descr_.get(), values, /*num_values=*/static_cast(values.size()), + Encoding::PLAIN, + /*indices=*/{}, + /*indices_size=*/0, /*def_levels=*/{}, level_info_.def_level, + /*rep_levels=*/{}, level_info_.rep_level); + pages.push_back(std::move(page)); + auto pager = std::make_unique(pages); + record_reader_->SetPageReader(std::move(pager)); + + int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/3); + ASSERT_EQ(records_skipped, 3); + CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0, + /*levels_position=*/0); + + int64_t records_read = record_reader_->ReadRecords(/*num_records=*/2); + ASSERT_EQ(records_read, 2); + CheckState(/*values_written=*/2, /*null_count=*/0, /*levels_written=*/0, + /*levels_position=*/0); + CheckReadValues(/*expected_values=*/{30, 30}, /*expected_defs=*/{}, + /*expected_reps=*/{}); + record_reader_->Reset(); + CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0, + /*levels_position=*/0); +} + +// Skip an optional field. Intentionally included some null values. 
+TEST_F(RecordReaderTest, SkipOptional) { + Init(/*max_def_level=*/1, /*max_rep_level=*/0, Repetition::OPTIONAL); + + // Records look like {null, 10, 20, null, 30, 40, 50, 60} + std::vector> pages; + std::vector values = {10, 20, 30, 40, 50, 60}; + std::vector def_levels = {0, 1, 1, 0, 1, 1, 1, 1}; + + std::shared_ptr page = MakeDataPage( + descr_.get(), values, /*num_values=*/static_cast(values.size()), + Encoding::PLAIN, + /*indices=*/{}, + /*indices_size=*/0, def_levels, level_info_.def_level, + /*rep_levels=*/{}, level_info_.rep_level); + pages.push_back(std::move(page)); + auto pager = std::make_unique(pages); + record_reader_->SetPageReader(std::move(pager)); + + { + // Skip {null, 10} + // This also tests when we start with a Skip. + int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/2); + ASSERT_EQ(records_skipped, 2); + CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0, + /*levels_position=*/0); + } + + { + // Read 3 records: {20, null, 30} + int64_t records_read = record_reader_->ReadRecords(/*num_records=*/3); + + ASSERT_EQ(records_read, 3); + // values_written() includes null values. + // We had skipped 2 of the levels above. So there are only 6 left in total to + // read, and we read 3 of them here. + CheckState(/*values_written=*/3, /*null_count=*/1, /*levels_written=*/6, + /*levels_position=*/3); + + // ReadRecords for optional fields uses ReadValuesSpaced, so there is a + // placeholder for null. + CheckReadValues(/*expected_values=*/{20, kNullValue, 30}, /*expected_defs=*/{1, 0, 1}, + /*expected_reps=*/{}); + } + + { + // Skip {40, 50}. + int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/2); + ASSERT_EQ(records_skipped, 2); + CheckState(/*values_written=*/3, /*null_count=*/1, /*levels_written=*/4, + /*levels_position=*/3); + CheckReadValues(/*expected_values=*/{20, kNullValue, 30}, /*expected_defs=*/{1, 0, 1}, + /*expected_reps=*/{}); + // Reset after a Skip. 
+ record_reader_->Reset(); + CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/1, + /*levels_position=*/0); + } + + { + // Read to the end of the column. Read {60} + // This test checks that ReadAndThrowAwayValues works, since if it + // does not we would read the wrong values. + int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1); + + ASSERT_EQ(records_read, 1); + CheckState(/*values_written=*/1, /*null_count=*/0, /*levels_written=*/1, + /*levels_position=*/1); + CheckReadValues(/*expected_values=*/{60}, + /*expected_defs=*/{1}, + /*expected_reps=*/{}); + } + + // We have exhausted all the records. + ASSERT_EQ(record_reader_->ReadRecords(/*num_records=*/3), 0); + ASSERT_EQ(record_reader_->SkipRecords(/*num_records=*/3), 0); +} + +// Test skipping for repeated fields. +TEST_F(RecordReaderTest, SkipRepeated) { + Init(/*max_def_level=*/1, /*max_rep_level=*/1, Repetition::REPEATED); + + // Records look like {null, [20, 20, 20], null, [30, 30], [40]} + std::vector> pages; + std::vector values = {20, 20, 20, 30, 30, 40}; + std::vector def_levels = {0, 1, 1, 1, 0, 1, 1, 1}; + std::vector rep_levels = {0, 0, 1, 1, 0, 0, 1, 0}; + + std::shared_ptr page = MakeDataPage( + descr_.get(), values, /*num_values=*/static_cast(values.size()), + Encoding::PLAIN, + /*indices=*/{}, + /*indices_size=*/0, def_levels, level_info_.def_level, rep_levels, + level_info_.rep_level); + pages.push_back(std::move(page)); + auto pager = std::make_unique(pages); + record_reader_->SetPageReader(std::move(pager)); + + { + // This should skip the first null record. + int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/1); + ASSERT_EQ(records_skipped, 1); + ASSERT_EQ(record_reader_->values_written(), 0); + ASSERT_EQ(record_reader_->null_count(), 0); + // For repeated fields, we need to read the levels to find the record + // boundaries and skip. So some levels are read, however, the skipped + // level should not be there after the skip. 
That's why levels_position() + // is 0. + CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/7, + /*levels_position=*/0); + CheckReadValues(/*expected_values=*/{}, + /*expected_defs=*/{}, + /*expected_reps=*/{}); + } + + { + // Read [20, 20, 20] + int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1); + ASSERT_EQ(records_read, 1); + CheckState(/*values_written=*/3, /*null_count=*/0, /*levels_written=*/7, + /*levels_position=*/3); + CheckReadValues(/*expected_values=*/{20, 20, 20}, + /*expected_defs=*/{1, 1, 1}, + /*expected_reps=*/{0, 1, 1}); + } + + { + // Skip the null record and also skip [30, 30] + int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/2); + ASSERT_EQ(records_skipped, 2); + // We remove the skipped levels from the buffer. + CheckState(/*values_written=*/3, /*null_count=*/0, /*levels_written=*/4, + /*levels_position=*/3); + CheckReadValues(/*expected_values=*/{20, 20, 20}, + /*expected_defs=*/{1, 1, 1}, + /*expected_reps=*/{0, 1, 1}); + } + + { + // Read [40] + int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1); + ASSERT_EQ(records_read, 1); + CheckState(/*values_written=*/4, /*null_count=*/0, /*levels_written=*/4, + /*levels_position=*/4); + CheckReadValues(/*expected_values=*/{20, 20, 20, 40}, + /*expected_defs=*/{1, 1, 1, 1}, + /*expected_reps=*/{0, 1, 1, 0}); + } +} + +// Tests that for repeated fields, we first consume what is in the buffer +// before reading more levels. 
+TEST_F(RecordReaderTest, SkipRepeatedConsumeBufferFirst) { + Init(/*max_def_level=*/1, /*max_rep_level=*/1, Repetition::REPEATED); + + std::vector> pages; + std::vector values(2048, 10); + std::vector def_levels(2048, 1); + std::vector rep_levels(2048, 0); + + std::shared_ptr page = MakeDataPage( + descr_.get(), values, /*num_values=*/static_cast(values.size()), + Encoding::PLAIN, + /*indices=*/{}, + /*indices_size=*/0, def_levels, level_info_.def_level, rep_levels, + level_info_.rep_level); + pages.push_back(std::move(page)); + auto pager = std::make_unique(pages); + record_reader_->SetPageReader(std::move(pager)); + { + // Read 1000 records. We will read 1024 levels because that is the minimum + // number of levels to read. + int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1000); + ASSERT_EQ(records_read, 1000); + CheckState(/*values_written=*/1000, /*null_count=*/0, /*levels_written=*/1024, + /*levels_position=*/1000); + std::vector expected_values(1000, 10); + std::vector expected_def_levels(1000, 1); + std::vector expected_rep_levels(1000, 0); + CheckReadValues(expected_values, expected_def_levels, expected_rep_levels); + // Reset removes the already consumed values and levels. + record_reader_->Reset(); + } + + { // Skip 12 records. Since we already have 24 in the buffer, we should not be + // reading any more levels into the buffer, we will just consume 12 of it. + int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/12); + ASSERT_EQ(records_skipped, 12); + CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/12, + /*levels_position=*/0); + // Everything is empty because we reset the reader before this skip. + CheckReadValues(/*expected_values=*/{}, /*expected_def_levels=*/{}, + /*expected_rep_levels=*/{}); + } +} + +// Test reading when one record spans multiple pages for a repeated field. 
+TEST_F(RecordReaderTest, ReadPartialRecord) { + Init(/*max_def_level=*/1, /*max_rep_level=*/1, Repetition::REPEATED); + + std::vector> pages; + + // Page 1: {[10], [20, 20, 20 ... } continues to next page. + { + std::shared_ptr page = MakeDataPage( + descr_.get(), /*values=*/{10, 20, 20, 20}, /*num_values=*/4, Encoding::PLAIN, + /*indices=*/{}, + /*indices_size=*/0, /*def_levels=*/{1, 1, 1, 1}, level_info_.def_level, + /*rep_levels=*/{0, 0, 1, 1}, level_info_.rep_level); + pages.push_back(std::move(page)); + } + + // Page 2: {... 20, 20, ...} continues from previous page and to next page. + { + std::shared_ptr page = MakeDataPage( + descr_.get(), /*values=*/{20, 20}, /*num_values=*/2, Encoding::PLAIN, + /*indices=*/{}, + /*indices_size=*/0, /*def_levels=*/{1, 1}, level_info_.def_level, + /*rep_levels=*/{1, 1}, level_info_.rep_level); + pages.push_back(std::move(page)); + } + + // Page 3: { ... 20], [30]} continues from previous page. + { + std::shared_ptr page = MakeDataPage( + descr_.get(), /*values=*/{20, 30}, /*num_values=*/2, Encoding::PLAIN, + /*indices=*/{}, + /*indices_size=*/0, /*def_levels=*/{1, 1}, level_info_.def_level, + /*rep_levels=*/{1, 0}, level_info_.rep_level); + pages.push_back(std::move(page)); + } + + auto pager = std::make_unique(pages); + record_reader_->SetPageReader(std::move(pager)); + + { + // Read [10] + int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1); + ASSERT_EQ(records_read, 1); + CheckState(/*values_written=*/1, /*null_count=*/0, /*levels_written=*/4, + /*levels_position=*/1); + CheckReadValues(/*expected_values=*/{10}, + /*expected_defs=*/{1}, + /*expected_reps=*/{0}); + } + + { + // Read [20, 20, 20, 20, 20, 20] that spans multiple pages. 
+ int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1); + ASSERT_EQ(records_read, 1); + CheckState(/*values_written=*/7, /*null_count=*/0, /*levels_written=*/8, + /*levels_position=*/7); + CheckReadValues(/*expected_values=*/{10, 20, 20, 20, 20, 20, 20}, + /*expected_defs=*/{1, 1, 1, 1, 1, 1, 1}, + /*expected_reps=*/{0, 0, 1, 1, 1, 1, 1}); + } + + { + // Read [30] + int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1); + ASSERT_EQ(records_read, 1); + CheckState(/*values_written=*/8, /*null_count=*/0, /*levels_written=*/8, + /*levels_position=*/8); + CheckReadValues(/*expected_values=*/{10, 20, 20, 20, 20, 20, 20, 30}, + /*expected_defs=*/{1, 1, 1, 1, 1, 1, 1, 1}, + /*expected_reps=*/{0, 0, 1, 1, 1, 1, 1, 0}); + } +} + +// Test skipping for repeated fields for the case when one record spans multiple +// pages. +TEST_F(RecordReaderTest, SkipPartialRecord) { + Init(/*max_def_level=*/1, /*max_rep_level=*/1, Repetition::REPEATED); + + std::vector> pages; + + // Page 1: {[10], [20, 20, 20 ... } continues to next page. + { + std::shared_ptr page = MakeDataPage( + descr_.get(), /*values=*/{10, 20, 20, 20}, /*num_values=*/4, Encoding::PLAIN, + /*indices=*/{}, + /*indices_size=*/0, /*def_levels=*/{1, 1, 1, 1}, level_info_.def_level, + /*rep_levels=*/{0, 0, 1, 1}, level_info_.rep_level); + pages.push_back(std::move(page)); + } + + // Page 2: {... 20, 20, ...} continues from previous page and to next page. + { + std::shared_ptr page = MakeDataPage( + descr_.get(), /*values=*/{20, 20}, /*num_values=*/2, Encoding::PLAIN, + /*indices=*/{}, + /*indices_size=*/0, /*def_levels=*/{1, 1}, level_info_.def_level, + /*rep_levels=*/{1, 1}, level_info_.rep_level); + pages.push_back(std::move(page)); + } + + // Page 3: { ... 20], [30]} continues from previous page. 
+ { + std::shared_ptr page = MakeDataPage( + descr_.get(), /*values=*/{20, 30}, /*num_values=*/2, Encoding::PLAIN, + /*indices=*/{}, + /*indices_size=*/0, /*def_levels=*/{1, 1}, level_info_.def_level, + /*rep_levels=*/{1, 0}, level_info_.rep_level); + pages.push_back(std::move(page)); + } + + auto pager = std::make_unique(pages); + record_reader_->SetPageReader(std::move(pager)); + + { + // Read [10] + int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1); + ASSERT_EQ(records_read, 1); + // There are 4 levels in the first page. + CheckState(/*values_written=*/1, /*null_count=*/0, /*levels_written=*/4, + /*levels_position=*/1); + CheckReadValues(/*expected_values=*/{10}, + /*expected_defs=*/{1}, + /*expected_reps=*/{0}); + } + + { + // Skip the record that goes across pages. + int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/1); + ASSERT_EQ(records_skipped, 1); + CheckState(/*values_written=*/1, /*null_count=*/0, /*levels_written=*/2, + /*levels_position=*/1); + CheckReadValues(/*expected_values=*/{10}, + /*expected_defs=*/{1}, + /*expected_reps=*/{0}); + } + + { + // Read [30] + int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1); + + ASSERT_EQ(records_read, 1); + CheckState(/*values_written=*/2, /*null_count=*/0, /*levels_written=*/2, + /*levels_position=*/2); + CheckReadValues(/*expected_values=*/{10, 30}, + /*expected_defs=*/{1, 1}, + /*expected_reps=*/{0, 0}); + } +} + +// Test that SkipRecords works on ByteArrays. Specifically, this is testing +// ReadAndThrowAwayValues for ByteArrays. +TEST(RecordReaderByteArrayTest, SkipByteArray) { + internal::LevelInfo level_info; + level_info.def_level = 1; + level_info.rep_level = 0; + + // Must use REPEATED to exercise ReadAndThrowAwayValues for ByteArrays. It + // does not do any buffering for Optional or Required fields as it calls + // ResetValues after every read. 
+ // NOTE(review): the node is nevertheless declared OPTIONAL with rep_level 0, + // which contradicts the sentence above -- confirm which one is intended. 
+ NodePtr type = schema::ByteArray("b", Repetition::OPTIONAL); + const ColumnDescriptor descr(type, level_info.def_level, level_info.rep_level); + + std::vector> pages; + int levels_per_page = 90; + int num_pages = 1; + + std::vector def_levels; + std::vector rep_levels; + std::vector values; + std::vector buffer; + + MakePages(&descr, num_pages, levels_per_page, def_levels, rep_levels, + values, buffer, pages, Encoding::PLAIN); + + auto pager = std::make_unique(pages); + + std::shared_ptr record_reader = + internal::RecordReader::Make(&descr, level_info); + record_reader->SetPageReader(std::move(pager)); + + // Read one-third of the page. + ASSERT_EQ(record_reader->ReadRecords(/*num_records=*/30), 30); + + // Skip 30 records. + ASSERT_EQ(record_reader->SkipRecords(/*num_records=*/30), 30); + + // Read 60 more records. Only 30 will be read, since we read 30 and skipped 30, + // so only 30 are left. + ASSERT_EQ(record_reader->ReadRecords(/*num_records=*/60), 30); + + auto binary_reader = dynamic_cast(record_reader.get()); + ASSERT_NE(binary_reader, nullptr); + // Chunks are reset after this call. + ::arrow::ArrayVector array_vector = binary_reader->GetBuilderChunks(); + ASSERT_EQ(array_vector.size(), 1); + auto binary_array = dynamic_cast<::arrow::BinaryArray*>(array_vector[0].get()); + ASSERT_NE(binary_array, nullptr); + ASSERT_EQ(binary_array->length(), 60); + + // Our values above are not spaced, however, the RecordReader will + // read spaced for nullable values. + // Create spaced expected values. + std::vector expected_values; + size_t values_index = 0; + for (int i = 0; i < 90; ++i) { + if (def_levels[i] == 0) { + expected_values.emplace_back(); + continue; + } + expected_values.emplace_back(reinterpret_cast(values[values_index].ptr), + values[values_index].len); + ++values_index; + } + + // Check that the expected values match the actual values. 
+ for (size_t i = 0; i < 30; ++i) { + ASSERT_EQ(expected_values[i].compare(binary_array->GetView(i)), 0); + ASSERT_EQ(def_levels[i] == 0, binary_array->IsNull(i)); + } + // Repeat for the second range that we read: page records [60, 90) are + // compared against array indices [30, 60). + for (size_t i = 60; i < 90; ++i) { + ASSERT_EQ(expected_values[i].compare(binary_array->GetView(i - 30)), 0); + ASSERT_EQ(def_levels[i] == 0, binary_array->IsNull(i - 30)); + } +} + } // namespace test } // namespace parquet