-
Notifications
You must be signed in to change notification settings - Fork 3k
Spark 3.3: Output the net changes across snapshots in CDC #7326
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
978c396
ccdcde9
dd95e5f
79efbaf
cec8dbf
10292fc
516040f
058eb7d
1bcff45
95fa7a6
2b3c5de
5bab4e1
414285c
72617a6
7b20c7c
15d8944
bca5bba
ed90942
ac94c5d
7a5607a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -81,15 +81,13 @@ public Row next() { | |
| // either a cached record which is not an UPDATE or the next record in the iterator. | ||
| Row currentRow = currentRow(); | ||
|
|
||
| if (currentRow.getString(changeTypeIndex()).equals(DELETE) && rowIterator().hasNext()) { | ||
| if (changeType(currentRow).equals(DELETE) && rowIterator().hasNext()) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: can we reverse the order of the equals calls, to reduce the chance of an NPE? (and likewise for the other places in the method)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That makes sense, but it sacrifices readability. I will add a null check here instead: we will fail fast if the value is null. |
||
| Row nextRow = rowIterator().next(); | ||
| cachedRow = nextRow; | ||
|
|
||
| if (sameLogicalRow(currentRow, nextRow)) { | ||
| String nextRowChangeType = nextRow.getString(changeTypeIndex()); | ||
|
|
||
| Preconditions.checkState( | ||
| nextRowChangeType.equals(INSERT), | ||
| changeType(nextRow).equals(INSERT), | ||
| "Cannot compute updates because there are multiple rows with the same identifier" | ||
| + " fields([%s]). Please make sure the rows are unique.", | ||
| String.join(",", identifierFields)); | ||
|
|
@@ -118,7 +116,7 @@ private Row modify(Row row, int valueIndex, Object value) { | |
| } | ||
|
|
||
| private boolean cachedUpdateRecord() { | ||
| return cachedRow != null && cachedRow.getString(changeTypeIndex()).equals(UPDATE_AFTER); | ||
| return cachedRow != null && changeType(cachedRow).equals(UPDATE_AFTER); | ||
| } | ||
|
|
||
| private Row currentRow() { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,8 @@ | |
| package org.apache.iceberg.spark; | ||
|
|
||
| import java.util.Iterator; | ||
| import java.util.Set; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Sets; | ||
| import org.apache.spark.sql.Row; | ||
| import org.apache.spark.sql.types.StructType; | ||
|
|
||
|
|
@@ -55,7 +57,7 @@ class RemoveCarryoverIterator extends ChangelogIterator { | |
|
|
||
| RemoveCarryoverIterator(Iterator<Row> rowIterator, StructType rowType) { | ||
| super(rowIterator, rowType); | ||
| this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow(rowType.size()); | ||
| this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow(); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -88,7 +90,7 @@ public Row next() { | |
| } | ||
|
|
||
| // If the current row is a delete row, drain all identical delete rows | ||
| if (currentRow.getString(changeTypeIndex()).equals(DELETE) && rowIterator().hasNext()) { | ||
| if (changeType(currentRow).equals(DELETE) && rowIterator().hasNext()) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same comment about reversing the order of the equals calls (I guess it's from a previous change, but worth fixing while we are here). |
||
| cachedDeletedRow = currentRow; | ||
| deletedRowCount = 1; | ||
|
|
||
|
|
@@ -98,8 +100,8 @@ public Row next() { | |
| // row is the same record | ||
| while (nextRow != null | ||
| && cachedDeletedRow != null | ||
| && isSameRecord(cachedDeletedRow, nextRow)) { | ||
| if (nextRow.getString(changeTypeIndex()).equals(INSERT)) { | ||
| && isSameRecord(cachedDeletedRow, nextRow, indicesToIdentifySameRow)) { | ||
| if (changeType(nextRow).equals(INSERT)) { | ||
| deletedRowCount--; | ||
| if (deletedRowCount == 0) { | ||
| cachedDeletedRow = null; | ||
|
|
@@ -139,25 +141,8 @@ private boolean hasCachedDeleteRow() { | |
| return cachedDeletedRow != null; | ||
| } | ||
|
|
||
| private int[] generateIndicesToIdentifySameRow(int columnSize) { | ||
| int[] indices = new int[columnSize - 1]; | ||
| for (int i = 0; i < indices.length; i++) { | ||
| if (i < changeTypeIndex()) { | ||
| indices[i] = i; | ||
| } else { | ||
| indices[i] = i + 1; | ||
| } | ||
| } | ||
| return indices; | ||
| } | ||
|
|
||
| private boolean isSameRecord(Row currentRow, Row nextRow) { | ||
| for (int idx : indicesToIdentifySameRow) { | ||
| if (isDifferentValue(currentRow, nextRow, idx)) { | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| return true; | ||
| private int[] generateIndicesToIdentifySameRow() { | ||
| Set<Integer> metadataColumnIndices = Sets.newHashSet(changeTypeIndex()); | ||
| return generateIndicesToIdentifySameRow(rowType().size(), metadataColumnIndices); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.