Add Iceberg equality delete file reader #16871
apurva-meta wants to merge 3 commits into facebookincubator:main
Summary:
- Add explicit equality deletes NYI branch in prepareSplit()
- Improve VELOX_NYI error messages with descriptive content type info
- Fix FILE handle leaks in IcebergReadTest by extracting getTestFileSize() helper
- Minor doc comment formatting improvements in IcebergSplitReader.h

Differential Revision: D97530140
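One way to read the FILE handle leak fix is sketched below. This is only an illustration, not the code from the diff: the name getTestFileSize() comes from the summary, but its signature, the FileCloser deleter, and the -1 error convention are assumptions made for this example. The idea is that wrapping the handle in a std::unique_ptr closes it on every return path.

```cpp
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <memory>
#include <string>

// Deleter so that std::unique_ptr closes the FILE* on every exit path.
struct FileCloser {
  void operator()(std::FILE* file) const {
    if (file != nullptr) {
      std::fclose(file);
    }
  }
};

// Hypothetical helper (signature assumed): returns the file size in bytes,
// or -1 if the file cannot be opened or measured.
int64_t getTestFileSize(const std::string& path) {
  assert(!path.empty());
  std::unique_ptr<std::FILE, FileCloser> file(std::fopen(path.c_str(), "rb"));
  if (!file) {
    return -1;
  }
  if (std::fseek(file.get(), 0, SEEK_END) != 0) {
    return -1; // The handle is still closed by FileCloser here.
  }
  // ftell() returns -1 on failure, which matches the error convention above.
  return static_cast<int64_t>(std::ftell(file.get()));
}
```

A test can then call getTestFileSize(path) wherever it previously opened the file by hand, so no call site owns a raw FILE*.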
@apurva-meta has exported this pull request. If you are a Meta employee, you can view the originating Diff in D97530141.
0a911ca to cb6324c
cb6324c to 8249f53
a03eb81 to 83d5a3e
83d5a3e to 243eef6
Summary: Add deletion vector (DV) reader to the Velox Iceberg connector, enabling Iceberg V3 spec support for row-level deletes. Iceberg V3 replaces positional delete files with deletion vectors: compact roaring bitmaps stored as blobs inside Puffin files. Compared to V2 positional delete files, DVs are more compact and avoid sorted merge of multiple delete files at read time.

Changes:
- IcebergDeleteFile.h: Add FileContent::kDeletionVector enum value
- DeletionVectorReader.h/cpp: New reader that loads a Puffin blob, deserializes the roaring bitmap portable format (array, bitset, and run containers), and sets bits in the deleteBitmap for the current batch range. No CRoaring dependency; the parser is self-contained.
- IcebergSplitReader.h: Add deletionVectorReaders_ member, include header
- IcebergSplitReader.cpp: Route kDeletionVector in prepareSplit(), apply DVs alongside positional deletes in next()
- BUCK: Add DeletionVectorReader library target

Differential Revision: D97530142
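The "sets bits in the deleteBitmap for the current batch range" step can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the DeletionVectorReader from the diff: a sorted std::vector of file-level row positions stands in for the decoded roaring bitmap, and applyDeletedPositions() and its parameters are hypothetical names. Puffin and roaring parsing are left out entirely.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Marks the rows of the current batch that appear in the deletion vector.
// `deletedPositions` stands in for a decoded roaring bitmap: file-level row
// positions, sorted ascending. `deleteBitmap` is bit-packed, and bit i
// corresponds to file row batchStart + i.
void applyDeletedPositions(
    const std::vector<uint64_t>& deletedPositions,
    uint64_t batchStart,
    uint64_t batchSize,
    std::vector<uint8_t>& deleteBitmap) {
  assert(std::is_sorted(deletedPositions.begin(), deletedPositions.end()));
  // Grow the bitmap if needed. Bits that are already set (for example by
  // positional deletes) are preserved, since resize() only fills new bytes.
  if (deleteBitmap.size() < (batchSize + 7) / 8) {
    deleteBitmap.resize((batchSize + 7) / 8, 0);
  }
  const uint64_t batchEnd = batchStart + batchSize;
  // Skip directly to the first deleted position inside this batch.
  auto it = std::lower_bound(
      deletedPositions.begin(), deletedPositions.end(), batchStart);
  for (; it != deletedPositions.end() && *it < batchEnd; ++it) {
    const uint64_t row = *it - batchStart;
    deleteBitmap[row / 8] |= static_cast<uint8_t>(1u << (row % 8));
  }
}
```

Because each batch only needs a range lookup into one already-sorted set, there is no merge across multiple delete files, which is the read-time advantage the summary attributes to DVs.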
std::vector<TypePtr> equalityColumnTypes_;

/// Column indices in the delete file output vector.
std::vector<column_index_t> deleteColumnIndices_;
How do you plan to handle equality deletes on columns of complex types, such as ROW(ROW(a))?
/// @param connectorId Connector identifier.
EqualityDeleteFileReader(
    const IcebergDeleteFile& deleteFile,
    const std::vector<std::string>& equalityColumnNames,
Could you use IcebergMetadataColumn as the equality column specifier? This would help with schema evolution and interface consistency.
// Use a separate bitmap for equality deletes to track which rows to
// remove from the output.
BufferPtr eqDeleteBitmap = AlignedBuffer::allocate<bool>(
    numRows, connectorQueryCtx_->memoryPool());
bits::nbytes(numRows)
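The comment appears to point at sizing the bitmap in bytes for a bit-packed layout, where numRows bits need ceil(numRows / 8) bytes. The standalone sketch below only illustrates that arithmetic. It does not call into Velox, and nbytesForBits(), setBit(), and isBitSet() are hypothetical names made up for this example.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Number of bytes needed to hold numBits bits, rounded up to whole bytes.
constexpr uint64_t nbytesForBits(uint64_t numBits) {
  return (numBits + 7) / 8;
}

// Sets bit `index` in a bit-packed buffer.
inline void setBit(std::vector<uint8_t>& bits, uint64_t index) {
  assert(index / 8 < bits.size());
  bits[index / 8] |= static_cast<uint8_t>(1u << (index % 8));
}

// Returns true if bit `index` is set in a bit-packed buffer.
inline bool isBitSet(const std::vector<uint8_t>& bits, uint64_t index) {
  assert(index / 8 < bits.size());
  return (bits[index / 8] >> (index % 8)) & 1u;
}
```

For example, a batch of 10 rows needs nbytesForBits(10) bytes, that is 2, rather than 10 one-byte flags.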
uint64_t splitOffset_;
std::list<std::unique_ptr<PositionalDeleteFileReader>>
    positionalDeleteFileReaders_;
BufferPtr deleteBitmap_;

/// Readers for Iceberg V3 deletion vectors (Puffin-encoded roaring bitmaps).
std::list<std::unique_ptr<DeletionVectorReader>> deletionVectorReaders_;
std::list -> std::vector
/// Hashes a single value from a vector at the given index.
/// Handles lazy vectors via loadedVector(). Returns 0 for null values.
uint64_t hashValue(const VectorPtr& vectorPtr, vector_size_t index) {
Can we use VectorHasher?
// Use a separate bitmap for equality deletes to track which rows to
// remove from the output.
BufferPtr eqDeleteBitmap = AlignedBuffer::allocate<bool>(
Maybe we can optimize the lifetime of eqDeleteBitmap across all the inputs later.
auto numRows = outputRowVector->size();

// Use a separate bitmap for equality deletes to track which rows to
// remove from the output.
Looks like the usage is similar to SelectivityVector
243eef6 to 5c04444
Summary: Implements Iceberg equality delete support for the Velox Iceberg connector. Equality delete files contain rows with values for one or more columns (identified by equalityFieldIds). A base data row is deleted if its values match ALL specified columns of ANY row in the delete file.

The implementation:
- Adds EqualityDeleteFileReader that eagerly reads the entire delete file and builds an in-memory hash multimap of delete key tuples during construction.
- Wires EqualityDeleteFileReader into IcebergSplitReader::prepareSplit() to resolve equalityFieldIds to column names/types from the table schema, and into IcebergSplitReader::next() to apply post-read equality delete filtering with row compaction.
- Handles lazy vectors from file readers via loadedVector() before accessing values for hashing and comparison.
- Supports BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, REAL, DOUBLE, VARCHAR, VARBINARY, and TIMESTAMP column types.

Differential Revision: D97530141
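The matching rule (ALL equality columns of ANY delete row) and the post-read compaction can be sketched as below. This is a minimal standalone illustration, not the EqualityDeleteFileReader from the diff. It assumes int64-only columns and row-major data, and it uses a std::unordered_set of key tuples where the summary describes a hash multimap. All names (Row, Key, KeyHash, extractKey, applyEqualityDeletes) are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <unordered_set>
#include <utility>
#include <vector>

using Row = std::vector<int64_t>; // One value per table column.
using Key = std::vector<int64_t>; // Values of the equality columns only.

// Combines the per-column hashes of a key tuple into a single hash.
struct KeyHash {
  size_t operator()(const Key& key) const {
    size_t seed = key.size();
    for (int64_t value : key) {
      seed ^= std::hash<int64_t>{}(value) +
          static_cast<size_t>(0x9e3779b97f4a7c15ULL) + (seed << 6) +
          (seed >> 2);
    }
    return seed;
  }
};

using DeleteKeySet = std::unordered_set<Key, KeyHash>;

// Projects a data row onto the equality columns.
Key extractKey(const Row& row, const std::vector<size_t>& equalityColumns) {
  Key key;
  key.reserve(equalityColumns.size());
  for (size_t column : equalityColumns) {
    assert(column < row.size());
    key.push_back(row[column]);
  }
  return key;
}

// Removes every row whose equality-column values match a delete key, then
// compacts the survivors to the front. Returns the number of rows kept.
size_t applyEqualityDeletes(
    std::vector<Row>& rows,
    const std::vector<size_t>& equalityColumns,
    const DeleteKeySet& deleteKeys) {
  size_t kept = 0;
  for (size_t i = 0; i < rows.size(); ++i) {
    if (deleteKeys.count(extractKey(rows[i], equalityColumns)) == 0) {
      if (kept != i) {
        rows[kept] = std::move(rows[i]);
      }
      ++kept;
    }
  }
  rows.resize(kept);
  return kept;
}
```

With equality columns {0, 1} and delete keys {1, 10} and {3, 10}, a data row starting with (1, 20) survives: it matches the first column of a delete row but not the second, and a delete requires all equality columns to match.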
5c04444 to 7de3176
@apurva-meta Thanks for this PR. Can we move the changes related to deletion vector reader into a separate PR?
Summary:
Implements Iceberg equality delete support for the Velox Iceberg connector.
Equality delete files contain rows with values for one or more columns
(identified by equalityFieldIds). A base data row is deleted if its values
match ALL specified columns of ANY row in the delete file.
The implementation:
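- Adds EqualityDeleteFileReader that eagerly reads the entire delete file
  and builds an in-memory hash multimap of delete key tuples during
  construction.
- Wires EqualityDeleteFileReader into IcebergSplitReader::prepareSplit()
  to resolve equalityFieldIds to column names/types from the table schema,
  and into IcebergSplitReader::next() to apply post-read equality delete
  filtering with row compaction.
- Handles lazy vectors from file readers via loadedVector() before
  accessing values for hashing and comparison.
- Supports BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, REAL, DOUBLE,
  VARCHAR, VARBINARY, and TIMESTAMP column types.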
Differential Revision: D97530141