-
Notifications
You must be signed in to change notification settings - Fork 4.2k
PARQUET-2188: [parquet-cpp] Add SkipRecords API to RecordReader #14142
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
66b6531
327f42d
c0fac59
2945559
938e494
ba87e4d
921493b
7897740
3c90a0f
1d0c22e
558efb8
84a9fe4
e91ccf5
66e2058
613f0b0
59758ce
57b249e
8f03b39
3357881
ce7c855
1679747
2e27262
7355db5
0223c39
3dd5fa3
9c4d66f
df873c7
7ab5233
312abbc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -1005,7 +1005,7 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatchWithDictionary( | |||||||||||||
|
|
||||||||||||||
| // Read dictionary indices. | ||||||||||||||
| *indices_read = ReadDictionaryIndices(indices_to_read, indices); | ||||||||||||||
| int64_t total_indices = std::max(num_def_levels, *indices_read); | ||||||||||||||
| int64_t total_indices = std::max<int>(num_def_levels, *indices_read); | ||||||||||||||
| // Some callers use a batch size of 0 just to get the dictionary. | ||||||||||||||
| int64_t expected_values = | ||||||||||||||
| std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_); | ||||||||||||||
|
|
@@ -1036,7 +1036,7 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* def | |||||||||||||
| ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels, &values_to_read); | ||||||||||||||
|
|
||||||||||||||
| *values_read = this->ReadValues(values_to_read, values); | ||||||||||||||
| int64_t total_values = std::max(num_def_levels, *values_read); | ||||||||||||||
| int64_t total_values = std::max<int>(num_def_levels, *values_read); | ||||||||||||||
| int64_t expected_values = | ||||||||||||||
| std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_); | ||||||||||||||
| if (total_values == 0 && expected_values > 0) { | ||||||||||||||
|
|
@@ -1218,20 +1218,21 @@ namespace { | |||||||||||||
| constexpr int64_t kMinLevelBatchSize = 1024; | ||||||||||||||
|
|
||||||||||||||
| template <typename DType> | ||||||||||||||
| class TypedRecordReader : public ColumnReaderImplBase<DType>, | ||||||||||||||
| class TypedRecordReader : public TypedColumnReaderImpl<DType>, | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why the change in the base class?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To have access to the ColumnReader's Skip method. |
||||||||||||||
| virtual public RecordReader { | ||||||||||||||
| public: | ||||||||||||||
| using T = typename DType::c_type; | ||||||||||||||
| using BASE = ColumnReaderImplBase<DType>; | ||||||||||||||
| TypedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, MemoryPool* pool) | ||||||||||||||
| : BASE(descr, pool) { | ||||||||||||||
| using BASE = TypedColumnReaderImpl<DType>; | ||||||||||||||
| TypedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, | ||||||||||||||
| MemoryPool* pool) | ||||||||||||||
| // Pager must be set using SetPageReader. | ||||||||||||||
| : BASE(descr, /* pager = */ nullptr, pool) { | ||||||||||||||
| leaf_info_ = leaf_info; | ||||||||||||||
| nullable_values_ = leaf_info.HasNullableValues(); | ||||||||||||||
| at_record_start_ = true; | ||||||||||||||
| records_read_ = 0; | ||||||||||||||
| values_written_ = 0; | ||||||||||||||
| values_capacity_ = 0; | ||||||||||||||
| null_count_ = 0; | ||||||||||||||
| values_capacity_ = 0; | ||||||||||||||
| levels_written_ = 0; | ||||||||||||||
| levels_position_ = 0; | ||||||||||||||
| levels_capacity_ = 0; | ||||||||||||||
|
|
@@ -1245,7 +1246,7 @@ class TypedRecordReader : public ColumnReaderImplBase<DType>, | |||||||||||||
| rep_levels_ = AllocateBuffer(pool); | ||||||||||||||
| Reset(); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| int64_t available_values_current_page() const { | ||||||||||||||
| return this->num_buffered_values_ - this->num_decoded_values_; | ||||||||||||||
| } | ||||||||||||||
|
|
@@ -1268,7 +1269,7 @@ class TypedRecordReader : public ColumnReaderImplBase<DType>, | |||||||||||||
| records_read += ReadRecordData(num_records); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records); | ||||||||||||||
| int64_t level_batch_size = std::max<int>(kMinLevelBatchSize, num_records); | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same comment on type-template param for max
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||||||||||||||
|
|
||||||||||||||
| // If we are in the middle of a record, we continue until reaching the | ||||||||||||||
| // desired number of records or the end of the current record if we've found | ||||||||||||||
|
|
@@ -1328,6 +1329,156 @@ class TypedRecordReader : public ColumnReaderImplBase<DType>, | |||||||||||||
|
|
||||||||||||||
| return records_read; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // Skip records that we have in our buffer. This function is only for | ||||||||||||||
| // non-repeated fields. | ||||||||||||||
| int64_t SkipRecordsInBufferNonRepeated(int64_t num_records) { | ||||||||||||||
| ARROW_DCHECK(this->max_rep_level_ == 0); | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is there ARROW_DCHECK_EQ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||||||||||
| ARROW_DCHECK(this->has_values_to_process()); | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The first DCHECK seems self-explanatory; I'm not sure I understand the second one, though. Just to validate, do you think a DCHECK or throwing here is more appropriate?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I removed the check and added a return statement. |
||||||||||||||
|
|
||||||||||||||
| int64_t remaining_records = levels_written_ - levels_position_; | ||||||||||||||
| int64_t skipped_records = std::min(num_records, remaining_records); | ||||||||||||||
| int64_t start_levels_position = levels_position_; | ||||||||||||||
| // Since there is no repetition, number of levels equals number of records. | ||||||||||||||
| levels_position_ += skipped_records; | ||||||||||||||
| // We skipped the levels by incrementing 'levels_position_'. For values | ||||||||||||||
| // we do not have a buffer, so we need to read them and throw them away. | ||||||||||||||
| // First we need to figure out how many present/not-null values there are. | ||||||||||||||
| std::shared_ptr<::arrow::ResizableBuffer> valid_bits; | ||||||||||||||
| valid_bits = AllocateBuffer(this->pool_); | ||||||||||||||
| PARQUET_THROW_NOT_OK( | ||||||||||||||
| valid_bits->Resize(bit_util::BytesForBits(skipped_records), true)); | ||||||||||||||
| ValidityBitmapInputOutput validity_io; | ||||||||||||||
| validity_io.values_read_upper_bound = skipped_records; | ||||||||||||||
| validity_io.valid_bits = valid_bits->mutable_data(); | ||||||||||||||
| validity_io.valid_bits_offset = 0; | ||||||||||||||
| DefLevelsToBitmap(def_levels() + start_levels_position, | ||||||||||||||
| levels_position_ - start_levels_position, | ||||||||||||||
| this->leaf_info_, &validity_io); | ||||||||||||||
| int64_t values_to_read = validity_io.values_read - validity_io.null_count; | ||||||||||||||
| ReadAndThrowAway(values_to_read); | ||||||||||||||
| // Mark the levels as read in the underlying column reader. | ||||||||||||||
| this->ConsumeBufferedValues(skipped_records); | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it might be worth adding comments to distinguish this operation from ThrowAwayLevels above.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried to clarify things. Let me know if it is still not clear. |
||||||||||||||
| return skipped_records; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // Skip records for repeated fields. Returns number of skipped records. | ||||||||||||||
| // Skip records for repeated fields. Returns number of skipped records. | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: repeated comment.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||||||||||
| int64_t SkipRecordsRepeated(int64_t num_records) { | ||||||||||||||
| ARROW_DCHECK_GT(this->max_rep_level_, 0); | ||||||||||||||
|
|
||||||||||||||
| // For repeated fields, we are technically reading and throwing away the | ||||||||||||||
| // levels and values since we do not know the record boundaries in advance. | ||||||||||||||
| // Keep filling the buffer and skipping until we reach the desired number | ||||||||||||||
| // of records or we run out of values in the column chunk. | ||||||||||||||
| int64_t skipped_records = 0; | ||||||||||||||
| int64_t level_batch_size = std::max<int>(kMinLevelBatchSize, num_records); | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. int64_t for max?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||||||||||||||
| // If 'at_record_start_' is false, but (skip_records == num_records), it | ||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||||||||||
| // means that for the last record that was counted, we have not seen all | ||||||||||||||
| // of its values yet. | ||||||||||||||
| while (!at_record_start_ || skipped_records < num_records) { | ||||||||||||||
| // Is there more data to read in this row group? | ||||||||||||||
| // HasNextInternal() will advance to the next page if necessary. | ||||||||||||||
| if (!this->HasNextInternal()) { | ||||||||||||||
| if (!at_record_start_) { | ||||||||||||||
| // We ended the row group while inside a record that we haven't seen | ||||||||||||||
| // the end of yet. So increment the record count for the last record | ||||||||||||||
| // in the row group | ||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Row groups automatically terminate repeated values?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, however, we are counting the records by seeing the next record (rep_level = 0). So for the very last one, we need to manually increment it. |
||||||||||||||
| ++skipped_records; | ||||||||||||||
| at_record_start_ = true; | ||||||||||||||
| } | ||||||||||||||
| break; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // Read some more levels. | ||||||||||||||
| int64_t batch_size = | ||||||||||||||
| std::min(level_batch_size, available_values_current_page()); | ||||||||||||||
| // No more data in column. This must be an empty page. | ||||||||||||||
| // If we had exhausted the last page, HasNextInternal() must have advanced | ||||||||||||||
| // to the next page. So there must be available values to process. | ||||||||||||||
| if (batch_size == 0) { | ||||||||||||||
| break; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| ReserveLevels(batch_size); | ||||||||||||||
|
|
||||||||||||||
| int16_t* def_levels = this->def_levels() + levels_written_; | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is levels_written_ important here if we are discarding data?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When reading repetition and definition levels, we append them to the buffer that we already have. When we figure out how many of those we need to skip, we shift the values to the left to skip them. Your comment revealed a bug where I was not shifting the values. I fixed it and checked that in the tests.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, that doesn't seem to answer the question? Why bump
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am bumping it here for correctness. At any point in time levels_written_ shows the end of the levels that are in the buffer. So we update it right here after we read a batch of levels. Note that we may not throw away all the levels that we read here. We may only throw away some of them in DelimitAndSkipRecordsInBuffer. When we throw away levels, we will update levels_written_ accordingly. You are bringing up a good point here. We actually can read the values that we want to skip into a separate buffer and throw them away, which will then reduce the amount of shifting that we have to do. It can make the code a bit more complicated though since I need to consume the values from this buffer first, then read into the scratch buffer, and if anything is left transfer it over. I will keep this in mind as an optimization on top of this pull request. |
||||||||||||||
| int16_t* rep_levels = this->rep_levels() + levels_written_; | ||||||||||||||
|
|
||||||||||||||
| int64_t levels_read = 0; | ||||||||||||||
| levels_read = this->ReadDefinitionLevels(batch_size, def_levels); | ||||||||||||||
| if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) { | ||||||||||||||
| throw ParquetException( | ||||||||||||||
| "Number of decoded rep / def levels did not match"); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| levels_written_ += levels_read; | ||||||||||||||
|
|
||||||||||||||
| // Look at the buffered levels, delimit them based on | ||||||||||||||
| // (rep_level == 0), report back how many records are in there, and | ||||||||||||||
| // fill in how many not-null values (def_level == max_def_level_). | ||||||||||||||
| // DelimitRecords updates levels_position_. | ||||||||||||||
| int64_t remaining_records = num_records - skipped_records; | ||||||||||||||
| int64_t start_levels_position = levels_position_; | ||||||||||||||
| int64_t values_seen = 0; | ||||||||||||||
| skipped_records += DelimitRecords(remaining_records, &values_seen); | ||||||||||||||
| this->ConsumeBufferedValues(levels_position_ - start_levels_position); | ||||||||||||||
| ReadAndThrowAway(values_seen); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| return skipped_records; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // Read 'num_values' values and throw them away. | ||||||||||||||
| int64_t ReadAndThrowAway(int64_t num_values) { | ||||||||||||||
| int64_t values_left = num_values; | ||||||||||||||
| int64_t batch_size = 1024; // ReadBatch with a smaller memory footprint | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this be initialized from a constant or config?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||||||||||
| int64_t values_read = 0; | ||||||||||||||
|
|
||||||||||||||
| // This will be enough scratch space to accommodate 16-bit levels or any | ||||||||||||||
| // value type | ||||||||||||||
| int value_size = type_traits<DType::type_num>::value_byte_size; | ||||||||||||||
| std::shared_ptr<ResizableBuffer> scratch = AllocateBuffer( | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i forget how this works for variable length types, is it still sufficient for those?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it should work since the value for the variable length types is technically a length and a pointer. I will add a separate test for them.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In parquet/types.h template <>
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added the test for ByteArray. |
||||||||||||||
| this->pool_, batch_size * std::max<int>(sizeof(int16_t), value_size)); | ||||||||||||||
| do { | ||||||||||||||
| batch_size = std::min<int>(batch_size, values_left); | ||||||||||||||
| values_read = this->ReadValues( | ||||||||||||||
| batch_size, reinterpret_cast<T*>(scratch->mutable_data())); | ||||||||||||||
| values_left -= values_read; | ||||||||||||||
| } while (values_read > 0 && values_left > 0); | ||||||||||||||
| return num_values - values_left; | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should there be a validation here on values_read and num_values?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added a check to check the result once we return from this function. Is that what you meant?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, apparently one call site was updated to check the result. Why not check the result here instead?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| int64_t SkipRecords(int64_t num_records) override { | ||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems that all SKIP operations called below actually read and then discard some records (unless the remaining values of the current page can be skipped). Why not simply call ReadRecords(num_records) internally and reset the buffers?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The idea is to enable Skipping entire pages at a time if the number of records to skip is sufficiently large.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you explain where the "Skipping entire pages at a time" part happens?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To give some context, suppose that we have M records in the column chunk and we want to read only record N from the column. In that case, we can do something like this. SkipRecords(N-1), ReadRecords(1), SkipRecords(M - N). We can do some optimizations here. For example for the skip to the end of the column chunk, we would not need to even look at page headers.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, but are those optimizations done in this PR?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, here is what we have now: Consider a non-repeated field, and that there are 10 pages with 100 values each. SkipRecords(900) will skip "decoding" the first 9 pages. It will still look at the page headers to find out how many values there are per page.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Can you point where this happens?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It happens in TypedColumnReader::Skip.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ahah. I see, thanks. |
||||||||||||||
| // Top level required field. Number of records equals to number of levels, | ||||||||||||||
| // and there is no read-ahead for levels. | ||||||||||||||
| if (this->max_rep_level_ == 0 && this->max_def_level_ == 0) { | ||||||||||||||
| return this->Skip(num_records); | ||||||||||||||
| } | ||||||||||||||
| // Non-repeated optional field. | ||||||||||||||
| int64_t skipped_records = 0; | ||||||||||||||
| if (this->max_rep_level_ == 0) { | ||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||||||||||
| if (this->has_values_to_process()) { | ||||||||||||||
| // First consume whatever is in the buffer. | ||||||||||||||
| skipped_records = SkipRecordsInBufferNonRepeated(num_records); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // If there are more records left, we should have exhausted all the | ||||||||||||||
| // buffer. | ||||||||||||||
| ARROW_DCHECK(!this->has_values_to_process() || | ||||||||||||||
| skipped_records < num_records); | ||||||||||||||
| // For records that we have not buffered, we will use the column | ||||||||||||||
| // reader's Skip to do the remaining Skip. Since the field is not | ||||||||||||||
| // repeated number of levels to skip is the same as number of records | ||||||||||||||
| // to skip. | ||||||||||||||
| skipped_records += this->Skip(num_records - skipped_records); | ||||||||||||||
| } else { | ||||||||||||||
| skipped_records += this->SkipRecordsRepeated(num_records); | ||||||||||||||
| } | ||||||||||||||
| return skipped_records; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| // We may outwardly have the appearance of having exhausted a column chunk | ||||||||||||||
| // when in fact we are in the middle of processing the last batch | ||||||||||||||
|
|
@@ -1357,7 +1508,8 @@ class TypedRecordReader : public ColumnReaderImplBase<DType>, | |||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // Process written repetition/definition levels to reach the end of | ||||||||||||||
| // records. Process no more levels than necessary to delimit the indicated | ||||||||||||||
| // records. Only used for repeated fields. | ||||||||||||||
| // Process no more levels than necessary to delimit the indicated | ||||||||||||||
| // number of logical records. Updates internal state of RecordReader | ||||||||||||||
| // | ||||||||||||||
| // \return Number of records delimited | ||||||||||||||
|
|
@@ -1494,8 +1646,6 @@ class TypedRecordReader : public ColumnReaderImplBase<DType>, | |||||||||||||
| levels_capacity_ = levels_remaining; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| records_read_ = 0; | ||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we rename records_read_ to records_processed_ and update it accordingly? It is useful when we want to check the current position of the reader.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This variable is currently not used anywhere, so I am inclined towards removing it since it will be another member variable to keep updated. We could always add it back later and actually use it for checking the current position of the reader. |
||||||||||||||
|
|
||||||||||||||
| // Call Finish on the binary builders to reset them | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -1530,7 +1680,7 @@ class TypedRecordReader : public ColumnReaderImplBase<DType>, | |||||||||||||
| int64_t ReadRecordData(int64_t num_records) { | ||||||||||||||
| // Conservative upper bound | ||||||||||||||
| const int64_t possible_num_values = | ||||||||||||||
| std::max(num_records, levels_written_ - levels_position_); | ||||||||||||||
| std::max<int>(num_records, levels_written_ - levels_position_); | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. int64_t?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||||||||||
| ReserveValues(possible_num_values); | ||||||||||||||
|
|
||||||||||||||
| const int64_t start_levels_position = levels_position_; | ||||||||||||||
|
|
@@ -1542,7 +1692,7 @@ class TypedRecordReader : public ColumnReaderImplBase<DType>, | |||||||||||||
| } else if (this->max_def_level_ > 0) { | ||||||||||||||
| // No repetition levels, skip delimiting logic. Each level represents a | ||||||||||||||
| // null or not null entry | ||||||||||||||
| records_read = std::min(levels_written_ - levels_position_, num_records); | ||||||||||||||
| records_read = std::min<int>(levels_written_ - levels_position_, num_records); | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. int64_t?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||||||||||||||
|
|
||||||||||||||
| // This is advanced by DelimitRecords, which we skipped | ||||||||||||||
| levels_position_ += records_read; | ||||||||||||||
|
|
||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -164,7 +164,7 @@ class TypedColumnReader : public ColumnReader { | |
| // may be less than the number of repetition and definition levels. With | ||
| // nested data this is almost certainly true. | ||
| // | ||
| // Set def_levels or rep_levels to nullptr if you want to skip reading them. | ||
| // Set def_levels or rep_levels to nullptr if you want to reading them. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change doesn't look right?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
| // This is only safe if you know through some other source that there are no | ||
| // undefined values. | ||
| // | ||
|
|
@@ -279,6 +279,10 @@ class RecordReader { | |
| /// \brief Attempt to read indicated number of records from column chunk | ||
| /// \return number of records read | ||
| virtual int64_t ReadRecords(int64_t num_records) = 0; | ||
|
|
||
| /// \brief Attempt to skip indicated number of records from column chunk | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add a detail here to clarify that this is the number of records (rows) and not values.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
| /// \return number of records skipped | ||
|
pitrou marked this conversation as resolved.
|
||
| virtual int64_t SkipRecords(int64_t num_records) = 0; | ||
|
|
||
| /// \brief Pre-allocate space for data. Results in better flat read performance | ||
| virtual void Reserve(int64_t num_values) = 0; | ||
|
|
@@ -299,7 +303,8 @@ class RecordReader { | |
| /// process | ||
| virtual bool HasMoreData() const = 0; | ||
|
|
||
| /// \brief Advance record reader to the next row group | ||
| /// \brief Advance record reader to the next row group. Must be set before | ||
| /// any records could be read/skipped. | ||
| /// \param[in] reader obtained from RowGroupReader::GetColumnPageReader | ||
| virtual void SetPageReader(std::unique_ptr<PageReader> reader) = 0; | ||
|
|
||
|
|
@@ -326,10 +331,13 @@ class RecordReader { | |
| int64_t levels_position() const { return levels_position_; } | ||
|
|
||
| /// \brief Number of definition / repetition levels that have been written | ||
| /// internally in the reader | ||
| /// internally in the reader. This may be larger than values_written() because | ||
| // for repeated fields, we need to look at the levels in advance to figure out | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: comment style — keep using triple-slash (`///`) comments on the continuation lines (I believe these are special for Doxygen).
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. I changed the comments a bit. |
||
| // the record boundaries. Once we found the boundaries, we know exactly how | ||
| // many values to read. So we do not have buffering for the values. | ||
| int64_t levels_written() const { return levels_written_; } | ||
|
|
||
| /// \brief Number of nulls in the leaf | ||
| /// \brief Number of nulls in the leaf that we have read so far. | ||
| int64_t null_count() const { return null_count_; } | ||
|
|
||
| /// \brief True if the leaf values are nullable | ||
|
|
@@ -339,27 +347,49 @@ class RecordReader { | |
| bool read_dictionary() const { return read_dictionary_; } | ||
|
|
||
| protected: | ||
| // Indicates if we can have nullable values. | ||
| bool nullable_values_; | ||
|
|
||
| bool at_record_start_; | ||
| int64_t records_read_; | ||
|
|
||
| // Stores values. These values are populated based on each ReadRecords call. | ||
| // No extra values are buffered for the next call. SkipRecords will not | ||
| // add any value to this buffer. | ||
| std::shared_ptr<::arrow::ResizableBuffer> values_; | ||
| // In the case of false (BYTE_ARRAY), don't allocate the values buffer | ||
| // (when we directly read into builder classes). | ||
| bool uses_values_; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why was this moved up?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I must have re-ordered it to improve readability. |
||
|
|
||
| // Values that we have read into 'values_' + 'null_count_'. | ||
| int64_t values_written_; | ||
| int64_t values_capacity_; | ||
| int64_t null_count_; | ||
|
|
||
| int64_t levels_written_; | ||
| int64_t levels_position_; | ||
| int64_t levels_capacity_; | ||
|
|
||
| std::shared_ptr<::arrow::ResizableBuffer> values_; | ||
| // In the case of false, don't allocate the values buffer (when we directly read into | ||
| // builder classes). | ||
| bool uses_values_; | ||
|
|
||
| // Each element corresponds to one element in 'values_' and specifies if it | ||
| // is null or not null. | ||
| std::shared_ptr<::arrow::ResizableBuffer> valid_bits_; | ||
|
|
||
| // Buffers for repetition and definition levels. These buffers may have | ||
| // more levels than are actually read. This is because we read levels ahead to | ||
| // figure out record boundaries for repeated fields. 'levels_written_' shows | ||
| // the total number of levels that is in the buffer. 'levels_position_' points | ||
| // to the next level that should be read. ReadRecords and SkipRecords both | ||
| // advance 'levels_written_' and 'levels_position_'. | ||
| // For flat required fields, 'def_levels_' and 'rep_levels_' are not | ||
| // populated. For non-repeated fields 'rep_levels_' is not populated. | ||
| // 'def_levels_' and 'rep_levels_' must be of the same size if present. | ||
| std::shared_ptr<::arrow::ResizableBuffer> def_levels_; | ||
| std::shared_ptr<::arrow::ResizableBuffer> rep_levels_; | ||
|
|
||
| /// \brief Number of definition / repetition levels that have been written | ||
| /// internally in the reader. This may be larger than values_written() because | ||
| // for repeated fields, we need to look at the levels in advance to figure out | ||
| // the record boundaries. Once we found the boundaries, we know exactly how | ||
| // many values to read. So we do not have buffering for the values. | ||
| int64_t levels_written_; | ||
| int64_t levels_position_; | ||
| int64_t levels_capacity_; | ||
|
|
||
| bool read_dictionary_ = false; | ||
| }; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be int64_t for the template parameter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done