
Conversation

@flyrain
Contributor

@flyrain flyrain commented Apr 11, 2023

It is common for users to want the net changes across multiple snapshots without caring about what happened in between. This PR provides an option to output net changes.
cc @aokolnychyi @szehon-ho @RussellSpitzer
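
For illustration only, a minimal usage sketch in Java. It assumes the new flag surfaces as a net_changes argument on the create_changelog_view procedure and that the resulting view keeps a default <table>_changes name; exact parameter and view names may differ from what is finally merged.

  import org.apache.spark.sql.SparkSession;

  public class NetChangesExample {
    public static void main(String[] args) {
      SparkSession spark = SparkSession.builder()
          .appName("net-changes-example")
          .master("local[*]")
          .getOrCreate();

      // Hypothetical invocation: build a changelog view over a snapshot range and
      // request net changes only.
      spark.sql(
          "CALL spark_catalog.system.create_changelog_view("
              + " table => 'db.tbl',"
              + " options => map('start-snapshot-id', '1', 'end-snapshot-id', '4'),"
              + " net_changes => true)");

      // Changes that cancel out across the range are dropped; the view name is
      // assumed to default to <table>_changes.
      spark.sql("SELECT * FROM tbl_changes").show();
    }
  }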

@flyrain flyrain requested a review from aokolnychyi April 11, 2023 20:43
@github-actions github-actions bot added the spark label Apr 11, 2023
@anigos

anigos commented Apr 12, 2023

More importantly, users avoid additional compute to determine the intended change for the key(s). That logic can be compute-intensive for bigger dimension or fact tables. Thanks @flyrain for prioritising it.

COMPUTE_UPDATES_PARAM,
REMOVE_CARRYOVERS_PARAM,
IDENTIFIER_COLUMNS_PARAM,
NET_CHANGES,
Member

@RussellSpitzer RussellSpitzer Apr 12, 2023


Is netChanges incompatible with "remove_carryovers"? I believe the way it is set up in this PR, net changes means remove carryovers must be true.

Contributor Author


Here is a list of the options to generate a changelog before this PR:

  1. Keeping carry-over rows and not computing pre/post images. I doubt how useful this is, since users can directly query the change table with select * from table.changes.
  2. Removing carry-over rows, but still no pre/post images. This is the default behavior.
  3. Removing carry-over rows and computing pre/post images.

In this PR, we add a fourth option:

  4. Removing carry-over rows, computing pre/post images, and removing intermediate changes across multiple snapshots.

Does that make sense?

Contributor

@aokolnychyi aokolnychyi Apr 13, 2023


Option 1 can serve as a temporary workaround, as there is no way to pass options in SQL, so we can't configure snapshot ranges when using changes directly.

Russell does bring up a valid point that we can't set compute_updates or remove_carryovers to false if the new flag is true. Let me think.

Contributor


Maybe I read it wrong. Do we take net_changes into account only when computing updates?

Contributor Author

@flyrain flyrain Apr 13, 2023


I think it is nice to put net_changes under compute_updates, even though we could do net changes without computing updates. By combining them, we expose fewer options to users for a better UX. Users won't be left thinking, "Wow, there are so many knobs; which one or which combination should I use?"

I'm open to suggestions, though. Let me know how useful net changes would be without compute updates; we can provide that option if we have a strong reason.

Member


Would remove_carryover be better as a three-way enum: 'keep', 'remove' (per snapshot), 'remove_net'? Right now there are 3 knobs, so 8 combos vs. 6. Or did I misunderstand?

Contributor Author


Here are the combinations:

  1. no remove_carryovers
  2. remove_carryovers, single-snapshot
  3. remove_carryovers, net change/cross-snapshot
  4. compute_update with remove_carryovers, single-snapshot
  5. compute_update with remove_carryovers, net change/cross-snapshot

1, 2, and 4 have been released; 3 is this PR; 5 is TBD.

Member


@flyrain yeah, so my suggestion is to make RemoveCarryOverMode a three-way enum and deprecate the RemoveCarryOver boolean.

Currently, with three boolean configs, the user gets 8 possibilities. With a three-way enum (none, net-changes, per-snapshot), the user gets 6 possibilities. The extra one is compute_update with no remove_carryover, which we can validate against.

wdyt?
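
For reference, a rough Java sketch of the suggested three-way mode. It is purely hypothetical; as noted further down, the final change deprecated the boolean instead of introducing this enum, and the names here are illustrative.

  // Hypothetical enum, not part of the merged change; names are illustrative.
  enum RemoveCarryoverMode {
    NONE,          // keep carryover rows
    PER_SNAPSHOT,  // remove carryovers within each snapshot
    NET_CHANGES    // remove carryovers across the whole snapshot range
  }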

Contributor Author


Sure, let's discuss this offline a bit.

Contributor Author

@flyrain flyrain Jun 27, 2023


Deprecated the remove_carryovers option per the discussion; the procedure will always remove carryovers. With the deprecation, we now have 4 options:

  1. remove_carryovers, single-snapshot
  2. remove_carryovers, net change/cross-snapshot
  3. compute_update with remove_carryovers, single-snapshot
  4. compute_update with remove_carryovers, net change/cross-snapshot

@aokolnychyi
Contributor

Let me take a look today.

@flyrain flyrain force-pushed the cdc-across-snapshot branch from 675f38f to dd95e5f on May 19, 2023 00:13
* is 1-to-1 mapping to snapshot id.
* </ul>
*/
public class RemoveNetCarryoverIterator extends RemoveCarryoverIterator {
Contributor Author

@flyrain flyrain May 19, 2023


We've got this new iterator that can do everything RemoveCarryoverIterator can and more. I'm thinking we could merge them. It'd use more memory since it caches rows. If the memory increase isn't an issue, this could simplify our code. Thoughts?

Member


Why do we need to use the list here? Don't we only need to know the first row and the last?

Contributor Author


I looked a bit more. We should be able to achieve the same without a list. Let me post a new commit soon.

Contributor Author


I used the list since we can cache either a delete row or an insert row. But looking a bit more, it is either-or at any time: we will never have cached rows with mixed inserts/deletes, since we remove them as a pair in that case. For example, the cached rows could be

(1, 'a', delete)
(1, 'a', delete)
(1, 'a', delete)

or

(1, 'a', insert)
(1, 'a', insert)

But it will never be

(1, 'a', delete)
(1, 'a', insert)

With that, a cached row count is good enough. I removed the list in the new commit. This new iterator can do more than its parent at the same cost, so I think it is a good idea to keep just one. Thoughts?
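
As a standalone illustration of that invariant (not the Iceberg iterator itself; the record type and method names below are made up), pairing off opposite change types within a run of identical records leaves an all-delete or all-insert remainder, so a single cached row plus a count is enough:

  import java.util.ArrayList;
  import java.util.List;

  public class NetChangeCountSketch {
    // simplified stand-in for a changelog row: identity columns plus a change type
    record Change(int id, String value, String type) {}

    // collapses a run of identical records (already sorted together) into its net changes
    static List<Change> netChanges(List<Change> sortedRun) {
      Change cached = null;
      int cachedCount = 0;
      for (Change row : sortedRun) {
        if (cachedCount == 0) {
          cached = row;          // start a new cache
          cachedCount = 1;
        } else if (!row.type().equals(cached.type())) {
          cachedCount--;         // opposite change type cancels one cached row
        } else {
          cachedCount++;         // same change type: another potential net change
        }
      }
      List<Change> result = new ArrayList<>();
      for (int i = 0; i < cachedCount; i++) {
        result.add(cached);
      }
      return result;
    }

    public static void main(String[] args) {
      // deleted in snapshot 1, deleted again in snapshot 2, re-inserted in snapshot 3:
      // one delete and one insert cancel, leaving a single net DELETE
      List<Change> run = List.of(
          new Change(1, "a", "DELETE"),
          new Change(1, "a", "DELETE"),
          new Change(1, "a", "INSERT"));
      System.out.println(netChanges(run)); // [Change[id=1, value=a, type=DELETE]]
    }
  }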

Member


I think one iterator is fine

Contributor Author


Let me merge them.

Contributor Author


The code becomes entangled when I merge them; there are too many if-else clauses. We're better off leaving it as is.

@flyrain
Contributor Author

flyrain commented May 19, 2023

Thanks @RussellSpitzer @amogh-jahagirdar @anigos @aokolnychyi for the review. This is ready for another look.

@flyrain
Contributor Author

flyrain commented Jun 5, 2023

@RussellSpitzer , thanks a lot for the review. Ready for another look.

@flyrain
Contributor Author

flyrain commented Jun 15, 2023

cc @szehon-ho

}

private int[] generateIndicesToIdentifySameRow() {
int changeOrdinalIndex = rowType().fieldIndex(MetadataColumns.CHANGE_ORDINAL.name());
Member


I'm a bit lost on why this method is not the same as in RemoveCarryoverIterator. Are 'changeOrdinalIndex' and 'snapshotIdIndex' not in the rows of the other iterator?

Contributor Author

@flyrain flyrain Jun 22, 2023


The indices generated here are used to compare whether two records are the same. In RemoveCarryoverIterator, we consider two records different if their changeOrdinalIndex and/or snapshotIdIndex are not the same, while in RemoveNetCarryoverIterator we may consider two records the same even if these two columns differ. We need the snapshot boundary in RemoveCarryoverIterator since it works within a single snapshot only. For example, we cannot merge the following two rows in RemoveCarryoverIterator, while we can in RemoveNetCarryoverIterator:

(1, 'a', insert, 'snapshot-1')
(1, 'a', delete, 'snapshot-2')

Member


Can't we share the code by doing the same thing and making a set to identify the metadata column indices?

  private int[] generateIndicesToIdentifySameRow() {
    Set<Integer> metadataColumnIndices = Sets.newHashSet(
        rowType().fieldIndex(MetadataColumns.CHANGE_ORDINAL.name()),
        rowType().fieldIndex(MetadataColumns.COMMIT_SNAPSHOT_ID.name()),
        changeTypeIndex());
    return generateIndicesToIdentifySameRow(metadataColumnIndices);
  }

  private int[] generateIndicesToIdentifySameRow(Set<Integer> metadataColumnIndices) {
    int[] indices = new int[rowType().size() - metadataColumnIndices.size()];
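    // note: this indexing works because the metadata columns sit at the end of the
    // row schema, so the first (rowType().size() - metadataColumnIndices.size())
    // positions are all data columns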

    for (int i = 0, j = 0; i < indices.length; i++) {
      if (!metadataColumnIndices.contains(i)) {
        indices[j] = i;
        j++;
      }
    }
    return indices;
  }

From RemoveCarryoverIterator, the set would be only changeTypeIndex? Let me know if I missed something.

Contributor Author


Extracted the method out. Thanks for the suggestion.

@szehon-ho
Member

Also one more comment: we should make this change on 3.4 as well (the latest Spark version).

@flyrain
Contributor Author

flyrain commented Jun 22, 2023

Also one more comment: we should make this change on 3.4 as well (the latest Spark version).

Definitely, will add it to 3.4 right after this is merged.

@github-actions github-actions bot added the build label Jun 22, 2023
@flyrain
Contributor Author

flyrain commented Jun 22, 2023

Thanks a lot for the review, @szehon-ho! Resolved the comments, and it's ready for another look.

systemProp.defaultHiveVersions=2
systemProp.knownHiveVersions=2,3
systemProp.defaultSparkVersions=3.4
systemProp.defaultSparkVersions=3.4,3.3
Member


Unnecessary change.

Contributor Author


Will remove it.


nextRow = null;
} else {
// two rows with same change types means potential net changes
nextRow = null;
Member


I mean, let's move it out of the if/else. Even IntelliJ suggests "Common part can be extracted from 'if'".

return currentRow;
}

Row nextRow = rowIterator().next();
Member


Maybe my suggestion was not clear; here is what I mean:


  @Override
  public Row next() {
    // if there are cached rows, return one of them from the beginning
    if (cachedRowCount > 0) {
      cachedRowCount--;
      return cachedRow;
    }

    this.cachedRow = getCurrentRow();

    // return it directly if the current row is the last row
    if (!rowIterator().hasNext()) {
      return cachedRow;
    }

    this.cachedNextRow = rowIterator().next();
    cachedRowCount = 1;

    // pull rows from the iterator until two consecutive rows are different
    while (isSameRecord(cachedRow, cachedNextRow)) {
      if (oppositeChangeType(cachedRow, cachedNextRow)) {
        // two rows with opposite change types means no net changes, remove both
        cachedRowCount--;
      } else {
        // two rows with the same change type means potential net changes; count another cached row
        cachedRowCount++;
      }

      // stop pulling rows if the cache is empty or there are no more rows
      if (cachedRowCount <= 0 || !rowIterator().hasNext()) {
        this.cachedNextRow = null;
        break;
      }

      this.cachedNextRow = rowIterator().next();
    }

    return null;
  }

I think it will work to remove 'currentRow', but I am not sure whether I am being too aggressive in removing 'nextRow'. Please check if I made a mistake.


private boolean isSameRecord(Row currentRow, Row nextRow) {
for (int idx : indicesToIdentifySameRow) {
protected boolean isSameRecord(Row currentRow, Row nextRow) {
Member


Code-wise, I think it is a bit harder to read when RemoveNetCarryoverIterator extends RemoveCarryoverIterator. I was thinking we could have an extra base class instead; then it's easier to see which methods differ and which are shared. Not sure what you think. Right now it's a bit trickier to see that.
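
Roughly, the shape being suggested (illustrative only, with simplified types; not the Iceberg classes):

  import java.util.Iterator;

  // Both carryover iterators would extend a shared base rather than one another.
  // String stands in for Spark's Row to keep the sketch self-contained.
  abstract class BaseCarryoverIterator implements Iterator<String> {
    private final Iterator<String> rows;

    protected BaseCarryoverIterator(Iterator<String> rows) {
      this.rows = rows;
    }

    protected Iterator<String> rowIterator() {
      return rows;
    }

    // each subclass defines what "same record" means for its own scope
    protected abstract boolean isSameRecord(String left, String right);
  }

  // class RemoveCarryoverIterator extends BaseCarryoverIterator { ... }
  // class RemoveNetCarryoverIterator extends BaseCarryoverIterator { ... }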


@flyrain
Contributor Author

flyrain commented Jun 27, 2023

The test failure is unrelated. It failed in Spark 3.4, which is not touched by this PR:

TestStructuredStreamingRead3 > [catalogName = testhive, implementation = org.apache.iceberg.spark.SparkCatalog, config = {type=hive, default-namespace=default}] > testReadingStreamFromFutureTimetsamp[catalogName = testhive, implementation = org.apache.iceberg.spark.SparkCatalog, config = {type=hive, default-namespace=default}] FAILED
    org.opentest4j.AssertionFailedError: 
    Expecting value to be true but was false
        at [email protected]/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at [email protected]/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

@flyrain
Contributor Author

flyrain commented Jun 27, 2023

Thanks a lot for the review, @szehon-ho. Ready for another look.

Member

@szehon-ho szehon-ho left a comment


Hey looks good, had a few style nits

Row currentRow = currentRow();

if (currentRow.getString(changeTypeIndex()).equals(DELETE) && rowIterator().hasNext()) {
if (changeType(currentRow).equals(DELETE) && rowIterator().hasNext()) {
Member


Nit: can we reverse the order of equals to reduce the chance of an NPE? (And for the other places in the method.)

Contributor Author


That makes sense, but it sacrifices readability. I will add a null check here instead; we are going to fail if it is null.

  protected String changeType(Row row) {
    String changeType = row.getString(changeTypeIndex());
    Preconditions.checkNotNull(changeType, "Change type should not be null");
    return changeType;
  }

RemoveCarryoverIterator(Iterator<Row> rowIterator, StructType rowType) {
super(rowIterator, rowType);
this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow(rowType.size());
this.rowType = rowType;
Member


Nit: if we pass the variable to the superclass, can we just store it in the superclass and get it via a super method? (I think it's cleaner if a subclass only has fields that only it knows about.)

Also, it looks like both subclasses do this.
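
A minimal sketch of the nit (not the actual Iceberg code): the base class owns the field and exposes a protected accessor, so subclasses don't need their own copy.

  import org.apache.spark.sql.types.StructType;

  abstract class BaseIteratorSketch {
    private final StructType rowType;

    protected BaseIteratorSketch(StructType rowType) {
      this.rowType = rowType;
    }

    // subclasses read the row type through this accessor instead of storing it again
    protected StructType rowType() {
      return rowType;
    }
  }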


// If the current row is a delete row, drain all identical delete rows
if (currentRow.getString(changeTypeIndex()).equals(DELETE) && rowIterator().hasNext()) {
if (changeType(currentRow).equals(DELETE) && rowIterator().hasNext()) {
Member


Same comment about changing the order (I guess it's from a previous change, but while we are here).

private final int[] indicesToIdentifySameRow;
private final StructType rowType;

private Row cachedNextRow = null;
Member


Nit: rely on the Java default.

@flyrain flyrain merged commit 1d88e80 into apache:master Jun 29, 2023
@flyrain
Contributor Author

flyrain commented Jun 29, 2023

Thanks @anigos @aokolnychyi @RussellSpitzer @szehon-ho for the review!

