Core: add delete row reader #2320
Conversation
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;

public class DeleteRowReader extends RowDataReader {
To be more precise, this reader finds all rows that have been deleted by equality deletes. How about using EqualityDeleteRowReader as the class name?
Agreed.
  public DeleteRowReader(CombinedScanTask task, Schema schema, Schema expectedSchema, String nameMapping,
                         FileIO io, EncryptionManager encryptionManager, boolean caseSensitive) {
    super(task, schema, schema, nameMapping, io, encryptionManager,
        caseSensitive);
Nit: no need to start a new line here.
    return matches.matchEqDeletes(open(task, requiredSchema, idToConstant)).iterator();
  }

  protected class SparkDeleteMatcher extends DeleteFilter<InternalRow> {
Could we actually share the same DeleteFilter for the Spark engine? Then we wouldn't have to introduce the same SparkDeleteFilter for both RowDataReader and DeleteRowReader.
Correct.
  @Test
  public void testReadDeleteRow() throws IOException {
    String tableName = "testDeleteRowRead";
I think we need a unit test covering the case where there are multiple versions of the equality delete schema, to verify that the predicates concatenated by OR work as expected.
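For illustration, here is a self-contained sketch of the OR semantics such a test would pin down (plain Java, outside the Iceberg test harness; the row type, delete sets, and values are made up): a row counts as deleted if any equality delete schema matches it.

```java
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class OrDeletePredicatesExample {
  static class Row {
    final long id;
    final String data;

    Row(long id, String data) {
      this.id = id;
      this.data = data;
    }
  }

  public static void main(String[] args) {
    // Delete sets produced by two different equality delete schemas.
    Set<Long> deletedIds = Set.of(29L, 89L);      // delete schema keyed on (id)
    Set<String> deletedData = Set.of("a", "d");   // delete schema keyed on (data)

    Predicate<Row> deletedById = row -> deletedIds.contains(row.id);
    Predicate<Row> deletedByData = row -> deletedData.contains(row.data);

    // OR-concatenate: a row is deleted if any delete schema matches it.
    Predicate<Row> isDeleted = Stream.of(deletedById, deletedByData)
        .reduce(Predicate::or)
        .orElse(row -> false);

    System.out.println(isDeleted.test(new Row(29L, "x")));  // true, matches the id schema
    System.out.println(isDeleted.test(new Row(1L, "d")));   // true, matches the data schema
    System.out.println(isDeleted.test(new Row(1L, "x")));   // false, matches neither
  }
}
```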
  }

  @Test
  public void testReadDeleteRow() throws IOException {
Another point: I think we'd better move these newly added unit tests into the abstract DeleteReadTests as much as possible. Then we don't have to introduce the same tests again for other engines.
Makes sense to me. I plan to add an abstract function for reading delete rows that will be implemented in the engine-specific test classes. Since the engine-specific read logic isn't ready yet, I think we can refactor once it is.
Force-pushed from 3bf38c8 to e41e4ba
openinx left a comment
LGTM, left a minor comment.
  public EqualityDeleteRowReader(CombinedScanTask task, Schema schema, Schema expectedSchema, String nameMapping,
                                 FileIO io, EncryptionManager encryptionManager, boolean caseSensitive) {
    super(task, schema, schema, nameMapping, io, encryptionManager, caseSensitive);
    this.tableSchema = schema;
Nit: this tableSchema is actually defined in its parent class RowDataReader, right? We usually introduce a protected tableSchema() method in RowDataReader to access the schema, rather than defining an extra private member in the subclass.
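A minimal sketch of that accessor pattern (class bodies and constructor signatures are heavily simplified here, not the actual Iceberg classes):

```java
import org.apache.iceberg.Schema;

class RowDataReader {
  private final Schema tableSchema;

  RowDataReader(Schema tableSchema) {
    this.tableSchema = tableSchema;
  }

  // Subclasses read the schema through this accessor instead of keeping a duplicate field.
  protected Schema tableSchema() {
    return tableSchema;
  }
}

class EqualityDeleteRowReader extends RowDataReader {
  EqualityDeleteRowReader(Schema schema) {
    super(schema);
    // No extra `this.tableSchema = schema` here; call tableSchema() wherever the schema is needed.
  }
}
```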
Got this merged. Thanks @chenjunjiedada for contributing!
    return isInDeleteSets;
  }

  public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
Does this method need to be public? I'm surprised that it is, given that applyEqDeletes is not.
I also think it would be good to have a better name for it, if it does need to be public. This is really just applying a filter, so I think something like keepDeletedRows would be more descriptive.
  }

  private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
  private List<Predicate<T>> applyEqDeletes() {
Looks like this wasn't renamed. It doesn't apply equality deletes, so I'd rather use a more descriptive name, like buildEqDeletePredicates.
    filteredRecords = Deletes.filter(filteredRecords,
        record -> projectRow.wrap(asStructLike(record)), deleteSet);
    Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
    isInDeleteSets.add(isInDeleteSet);
Why doesn't this return a single predicate, isDeleted? Both findEqualityDeleteRows and applyEqDeletes end up producing a Predicate that determines whether a row is deleted. The only difference is that deletedRows and remainingRows are negations of each other. But those methods could just as easily use isDeleted and negate in shouldKeep.
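A minimal sketch of that suggestion, assuming a hypothetical eqDeletePredicates() helper standing in for whatever collects the per-schema delete predicates, and Iceberg's CloseableIterable.filter as the filtering helper (this is not the PR's actual code):

```java
// One predicate per equality delete schema, OR-ed into a single isDeleted test.
private Predicate<T> isDeleted() {
  return eqDeletePredicates().stream()
      .reduce(Predicate::or)
      .orElse(record -> false);   // no equality deletes: nothing is deleted
}

// The two entry points then differ only by negating the same predicate in shouldKeep.
private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
  return CloseableIterable.filter(records, isDeleted().negate());   // keep live rows
}

public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
  return CloseableIterable.filter(records, isDeleted());            // keep deleted rows
}
```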
  private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
    // Predicate to test whether a row should be visible to user after applying equality deletions.
    Predicate<T> remainingRows = applyEqDeletes().stream()
If there are no delete predicates, then this should return the original iterable.
        return deletedRows.test(item);
      }
    };

    return deletedRowsFilter.filter(records);
If there are no delete predicates, this should return CloseableIterable.empty.
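Extending the sketch above, the two empty-case short-circuits from the last two comments could look roughly like this (again assuming the hypothetical eqDeletePredicates() helper):

```java
private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
  if (eqDeletePredicates().isEmpty()) {
    return records;   // no equality deletes: every row survives, return the input unchanged
  }
  return CloseableIterable.filter(records, isDeleted().negate());
}

public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
  if (eqDeletePredicates().isEmpty()) {
    return CloseableIterable.empty();   // no equality deletes: there are no deleted rows
  }
  return CloseableIterable.filter(records, isDeleted());
}
```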
    return set;
  }

  protected StructLikeSet rowSetWitIds(int... idsToRetain) {
Typo: should be rowSetWithIds
@chenjunjiedada and @openinx, thanks for making progress on this! I wanted to catch up on what's happening in this area, so I went ahead and did a round of review as well. I think there are a few minor things to improve in a follow-up. I'm also wondering why this is only focused on equality deletes. Why not return all deleted rows? Is that because we only want equality deletes where this is used?
Thanks @rdblue! I will address your comments in a follow-up PR. The reason for reading deleted rows only from equality deletes is that we want to handle equality deletes and position deletes separately, since the filtering logic and cost differ between them. That way we can choose the proper rewrite actions when streaming CDC data. I'm also working on a position delete rewrite action that clusters the position deletes inside a partition, which will include a position delete row reader. Does this make sense to you? These two actions are minor compactions, and @openinx has a PR that removes all deleted rows, which I think is major compaction.
This adds a row reader to read matched deleted rows on the Spark side. The next step is to implement the reader in other engines.