Conversation

@wypoon wypoon commented Oct 21, 2022

There is a bug in Parquet vectorized reads reported in #5927.
This bug occurs when reading a Parquet data file (using the BatchDataReader) that is larger than the split size, when there are deletes that need to be applied to the data file. The cause of the bug is that ColumnarBatchReader#setRowGroupInfo is not called with the correct rowPosition, because in ReadConf, generateOffsetToStartPos(Schema) returns null due to an optimization. (When this happens, the startRowPositions array is populated entirely with 0s, so ColumnarBatchReader#setRowGroupInfo gets called with rowPosition 0 even when the rowPosition is that of the second or a subsequent row group. In ColumnarBatchReader, setRowGroupInfo initializes a rowStartPosInBatch field, which is used to determine where in the PositionDeleteIndex to start applying deletes. When rowStartPosInBatch is incorrectly initialized, the positions of the positional deletes are not correctly aligned with the rows in the data file.)
The fix is to ensure that when there are deletes, the Schema has the _pos metadata column in it. Then ReadConf#generateOffsetToStartPos(Schema) will generate the necessary Map that is used to compute the startRowPositions.
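To illustrate the mechanism (class and method names here are hypothetical stand-ins, not Iceberg's actual API): generateOffsetToStartPos effectively builds a map from each row group's file offset to the file-wide position of that group's first row, by accumulating the row counts of the preceding groups. A minimal sketch:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the offset-to-start-position map that
// ReadConf#generateOffsetToStartPos conceptually produces: for each row group,
// the file-wide position of its first row is the sum of the row counts of all
// preceding row groups.
public class StartPosSketch {
  static Map<Long, Long> offsetToStartPos(long[] rowGroupOffsets, long[] rowGroupRowCounts) {
    Map<Long, Long> map = new HashMap<>();
    long startPos = 0;
    for (int i = 0; i < rowGroupOffsets.length; i++) {
      map.put(rowGroupOffsets[i], startPos); // position of this group's first row
      startPos += rowGroupRowCounts[i];      // advance past this group's rows
    }
    return map;
  }

  public static void main(String[] args) {
    // Two row groups of 100 and 97 rows at made-up file offsets 4 and 5000:
    // the second group's first row is at file-wide position 100.
    Map<Long, Long> m = offsetToStartPos(new long[] {4L, 5000L}, new long[] {100L, 97L});
    System.out.println(m.get(4L) + " " + m.get(5000L)); // prints: 0 100
  }
}
```

When this map is null, every entry of startRowPositions falls back to 0, which is exactly the misalignment described above.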

Added a unit test that reproduces the problem without this fix. Without this fix, the test passes for non-vectorized read and fails for vectorized read. With this fix, the test passes for both cases.


wypoon commented Oct 21, 2022

@flyrain I believe you implemented the support for row-level deletes in the vectorized reader. Can you please review this? Also @aokolnychyi @RussellSpitzer @chenjunjiedada.


@chenjunjiedada chenjunjiedada left a comment


+1. Could you please backport this to Spark 3.1 and 3.2 as well?


wypoon commented Oct 21, 2022

Thanks for reviewing, @chenjunjiedada. I do plan to port this to Spark 3.2 once this is approved by a committer. Let me look into 3.1 as well. I also think this bug needs to be fixed in the 1.0.x branch.


@flyrain flyrain left a comment


+1 for the solution. Thanks @wypoon for the fix!
Not a blocker, and we may do it in another PR, but I think we need more test coverage for reads with deletes across multiple Parquet row groups. We could add an option here to generate multi-row-group Parquet files and reuse the class TestSparkReaderDeletes.

I'm a bit confused by this behavior: ReadConf.startRowPositions is valid only if the _pos column exists in the expectedSchema, due to #1716. Are there use cases where _pos is absent and we still need ReadConf.startRowPositions? Looking at the classes VectorizedParquetReader and ParquetReader, which consume ReadConf.startRowPositions, it seems likely the schema doesn't have _pos.
cc @chenjunjiedada @aokolnychyi


wypoon commented Oct 22, 2022

I'm a bit confused by this behavior: ReadConf.startRowPositions is valid only if the _pos column exists in the expectedSchema, due to #1716. Are there use cases where _pos is absent and we still need ReadConf.startRowPositions? Looking at the classes VectorizedParquetReader and ParquetReader, which consume ReadConf.startRowPositions, it seems likely the schema doesn't have _pos.

I too was surprised by the behavior. In my example, before my fix, when the query

select count(*) from default.test_iceberg where e is null

is run after the update, the Schema that is passed to ReadConf#generateOffsetToStartPos(Schema) is

{
  5: e: optional double
}

so it did not have _pos.

@flyrain are you asking if there are other cases where _pos will still be absent after this fix and we need ReadConf#startRowPositions() to return a valid startRowPositions?

@chenjunjiedada

I'm a bit confused by this behavior: ReadConf.startRowPositions is valid only if the _pos column exists in the expectedSchema, due to #1716. Are there use cases where _pos is absent and we still need ReadConf.startRowPositions? Looking at the classes VectorizedParquetReader and ParquetReader, which consume ReadConf.startRowPositions, it seems likely the schema doesn't have _pos. cc @chenjunjiedada @aokolnychyi

The row group start positions are always computed but, right now, are only correct when _pos is projected. That's intended, because we don't want to read the Parquet footer one more time. But since the footer must be read at least once, we should be able to cache some content during the first access, avoid the current optimization logic, and thus simplify the logic that checks for the _pos column.


flyrain commented Oct 22, 2022

@flyrain are you asking if there are other cases where _pos will still be absent after this fix and we need ReadConf#startRowPositions() to return a valid startRowPositions?

Yes, I have concerns about this case. But I guess it is fine, since VectorizedArrowReader::setRowGroupInfo() doesn't consume the rowPosition, which means it still works fine even if rowPosition is off. The problem happens only if we want to rely on the row positions.

Digging a bit more, ReadConf.columnChunkMetadataForRowGroups keeps the row group metadata. I'm wondering if we can calculate the row positions while generating columnChunkMetadataForRowGroups, so that the row positions will always be correct and we don't have to read the metadata twice. What do you think, @chenjunjiedada? I guess that's what you meant by avoiding the optimization.

private List<Map<ColumnPath, ColumnChunkMetaData>> getColumnChunkMetadataForRowGroups() {
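A hedged sketch of the single-pass idea (stand-in types, not Parquet's real classes): while iterating the footer's row groups to collect per-group column chunk metadata, accumulate each group's start position in the same loop, so the footer only needs to be read once.

```java
import java.util.List;

// Hypothetical single-pass sketch: RowGroup stands in for Parquet's
// BlockMetaData; only the row count matters for computing start positions.
public class SinglePassSketch {
  static class RowGroup {
    final long rowCount;
    RowGroup(long rowCount) { this.rowCount = rowCount; }
  }

  // In the same pass that (in the real code) would collect each row group's
  // column chunk metadata, record the file-wide position of its first row.
  static long[] startPositions(List<RowGroup> rowGroups) {
    long[] starts = new long[rowGroups.size()];
    long pos = 0;
    for (int i = 0; i < rowGroups.size(); i++) {
      starts[i] = pos;
      pos += rowGroups.get(i).rowCount;
    }
    return starts;
  }
}
```

With row counts 100, 97, and 50, the start positions come out as 0, 100, and 197, and they would be correct regardless of whether _pos is projected.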

@chenjunjiedada

@flyrain, correct, that's it.


wypoon commented Oct 24, 2022

@flyrain for the test part, I followed your suggestion and added a test in TestSparkReaderDeletes instead (removing the earlier one).

.validateDataFilesExist(posDeletes.second())
.commit();

Assert.assertEquals(193, rowSet(tblName, tbl, "*").size());

@wypoon wypoon Oct 24, 2022


Without the fix, this assertion fails for the vectorized case.
There are 3 deletes applied to the first row group and 4 deletes applied to the second row group. Without the fix, the 3 deletes for the first row group are applied to the second as well (instead of the 4 that should be applied). Thus 6 rows are deleted (instead of 7) and the result is 194 rows, instead of the expected 193.
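The arithmetic can be reproduced with a toy model (a hypothetical helper, not the reader's actual code): a position delete index holds file-wide row positions, and each batch keeps only the rows whose file position (rowStartPosInBatch plus the index within the batch) is not deleted.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Toy model of applying position deletes to a batch: rowStartPosInBatch is the
// file-wide position of the batch's first row, and a row survives if its
// file-wide position is not in the delete set.
public class DeleteAlignmentSketch {
  static int survivors(long rowStartPosInBatch, int batchSize, Set<Long> deletedPositions) {
    int kept = 0;
    for (int i = 0; i < batchSize; i++) {
      if (!deletedPositions.contains(rowStartPosInBatch + i)) {
        kept++;
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    // Hypothetical layout: two 100-row row groups, 3 deletes in the first
    // (positions 1-3) and 4 in the second (positions 101-104).
    Set<Long> deletes = new HashSet<>(Arrays.asList(1L, 2L, 3L, 101L, 102L, 103L, 104L));
    int correct = survivors(0, 100, deletes) + survivors(100, 100, deletes);
    // The bug: the second batch reuses rowStartPosInBatch 0, so the first
    // group's 3 deletes are applied again and the second group's 4 are missed.
    int buggy = survivors(0, 100, deletes) + survivors(0, 100, deletes);
    System.out.println(correct + " " + buggy); // prints: 193 194
  }
}
```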


@flyrain flyrain left a comment


+1 Thanks @wypoon for the fix.

wypoon added a commit to wypoon/iceberg that referenced this pull request Oct 24, 2022
@flyrain flyrain changed the title Spark: Ensure rowStartPosInBatch in ColumnarBatchReader is set correctly Spark3.3: Ensure rowStartPosInBatch in ColumnarBatchReader is set correctly Oct 24, 2022
@flyrain flyrain changed the title Spark3.3: Ensure rowStartPosInBatch in ColumnarBatchReader is set correctly Spark 3.3: Ensure rowStartPosInBatch in ColumnarBatchReader is set correctly Oct 24, 2022
@flyrain flyrain merged commit c8a25d4 into apache:master Oct 24, 2022

flyrain commented Oct 24, 2022

Merged. Thanks @wypoon. Thanks @chenjunjiedada for the review.


flyrain commented Oct 25, 2022

@flyrain, Correct, that is it.

Hi @chenjunjiedada, I filed PR #6056 based on the discussion above. Please take a look. Thanks.
