Conversation

@kbendick (Contributor) commented Oct 9, 2021

This PR adds

  1. Explicit handling of DataOperations.OVERWRITE snapshots when the Spark read option "streaming-skip-delete-snapshots" is set to true (allowing them to be skipped).
  2. More explicit handling of all possible DataOperations, plus a catch-all clause for unrecognized / unimplemented snapshot operation types.
  3. A unit test showing the new behavior when streaming-skip-delete-snapshots is set to true (to pair with the existing unit test for the default case, where the flag is false).

This closes #3265

Detailed explanation & a motivating use case

I've recently started investigating what would be needed for a true streaming CDC source, possibly to turn into a proposal for updating the streaming sources to handle deletions etc. (which could arguably apply to the Flink streaming source as well). One of my colleagues at Apple may take it over, or I can collaborate with them on it later depending on my upcoming workload. Happy to share findings with anybody (though I'd need access to some of the notes I wrote previously, as I've moved on from Apple).

The Spark MicroBatch streaming source can currently only produce a "correct" stream from snapshots that do not mutate or delete any existing rows.

This means that it can presently handle two types of snapshots:

  • DataOperations.APPEND: New data files are added to the table.
  • DataOperations.REPLACE: Files are replaced, without changing the row level data in the table (e.g. data file rewrites).

Users can already choose to skip "delete" type snapshots via the read option streaming-skip-delete-snapshots, which simply skips a snapshot that potentially contains deletes. This PR updates the streaming source so that the option also allows skipping snapshots that contain any kind of row-level data mutation.
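For reference, opting in looks something like this from the Spark side (a minimal sketch; the table identifier is just an example):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().getOrCreate();

// Read an Iceberg table as a micro-batch stream, skipping delete-type
// snapshots instead of failing on them. "db.events" is illustrative.
Dataset<Row> stream = spark.readStream()
    .format("iceberg")
    .option("streaming-skip-delete-snapshots", "true")
    .load("db.events");
```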

OVERWRITE type snapshots are a form of delete (as well as insertion) and so one can argue that they should be skippable if users choose to skip deletes.

Consider a user who simply wants to use Spark to get an Iceberg source stream of a table they treat as append-only (which is more or less what the source supports now), but whose data is then reprocessed as part of a batch maintenance operation, for example to change its compression codec or file format. There would be no easy way to skip these out-of-band, one-time OVERWRITE operations committed to the table (which are more akin to REPLACE operations if only the compression or file format changes, in that no row-level data is actually modified). Changing a table's compression for past data is not at all unheard of.

However, OVERWRITE snapshots can add data (as opposed to snapshots with a true DELETE operation). While it might be possible to pick up only the added data files and still skip the deletes in a limited set of cases, given that users are already skipping deletes, it seems fair to let them skip mixed appends and deletes as well.
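For illustration, picking up just the added files of an OVERWRITE snapshot might look like the sketch below (Snapshot.addedFiles() is the Iceberg API for listing a commit's added data files; whether dropping the paired deletes is actually correct depends entirely on the table's history):

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Snapshot;

// Collect only the files an OVERWRITE commit added, ignoring the files
// (and therefore the rows) it deleted. Safe only in limited cases.
static List<DataFile> addedFilesOnly(Snapshot snapshot) {
  List<DataFile> added = new ArrayList<>();
  snapshot.addedFiles().forEach(added::add);
  return added;
}
```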

We could also introduce a separate read option for skipping OVERWRITE snapshots, giving users more granular control so they aren't forced to skip processing of important table updates.

I'd personally rather the time spent implementing anything more complex than an additional flag go toward refactoring the Spark MicroBatch stream to truly produce CDC data (something I have begun investigating an API for, as well as researching existing APIs in similar systems, given that Spark doesn't support it natively). But I'm very much open to discussion on this.

When we refactor the Spark streaming source to handle deletions, we will of course be sure to handle commits that both delete and add data at the same time, which is why I think this more explicit addition is good enough for now.

If we don't want to take this relatively simple approach, then at the very least a test should be added documenting the intended behavior when "streaming-skip-delete-snapshots" is true, as there's already a test showing that OVERWRITE snapshots will fail an Iceberg Spark streaming source when the option is unset or set to false (its default value).

github-actions bot added the spark label Oct 9, 2021
Comment on lines -217 to -219

```java
Preconditions.checkState(
    op.equals(DataOperations.DELETE) || op.equals(DataOperations.APPEND) || op.equals(DataOperations.REPLACE),
    "Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), snapshot.snapshotId());
```
@kbendick (Contributor, Author) Oct 9, 2021

Currently, the code handles skipDeletes via the first Preconditions.checkState assertion. Then, if that passes, it checks again that the operation is one of the insert-only operations OR delete (presumably because, if the snapshot operation were DataOperations.DELETE, the skipDelete flag must already have been true for the first assertion to pass).

This was a little confusing to me at first, seeing the operation checked twice, once in the negative and once in the positive.

The second assertion here also arguably serves as a way of detecting potentially unhandled DataOperations types. Given that, I chose to use a switch statement to be more explicit about what is and is not allowed, with the added benefit of a default case to catch any unhandled DataOperations (should new ones be added later, for example).
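For concreteness, the switch-based handling looks roughly like this (a sketch only, not necessarily the exact code in this PR; skipDelete is the boolean derived from the read option):

```java
import java.util.Locale;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

private boolean shouldProcess(Snapshot snapshot) {
  String op = snapshot.operation();
  switch (op) {
    case DataOperations.APPEND:
      // New data files only: always safe to stream.
      return true;
    case DataOperations.REPLACE:
      // File rewrites (e.g. compaction): no row-level changes to emit.
      return false;
    case DataOperations.DELETE:
    case DataOperations.OVERWRITE:
      // Only legal when the user opted into skipping delete-type snapshots.
      Preconditions.checkState(skipDelete,
          "Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), snapshot.snapshotId());
      return false;
    default:
      // Catch-all for operation types added to DataOperations later.
      throw new IllegalStateException(String.format(
          "Cannot process unknown snapshot operation: %s", op.toLowerCase(Locale.ROOT)));
  }
}
```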

If we want to go back to using Preconditions, I'm more than happy to do that. Since this code is arguably on the hot path for the driver during scheduling, we could collapse the two calls into one (they share the same message anyway) and reorder the checks to reduce string comparisons (by checking skipDelete first, then DataOperations.APPEND, which should be the most common case).
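Something like this collapsed form, for example (again a sketch, not what's in the PR):

```java
// One combined check: the cheap skipDelete boolean short-circuits first,
// then the common APPEND case, before the remaining string comparisons.
Preconditions.checkState(
    (skipDelete && op.equals(DataOperations.DELETE))
        || op.equals(DataOperations.APPEND)
        || op.equals(DataOperations.REPLACE),
    "Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), snapshot.snapshotId());
```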

Let me know if anybody has issue with the use of the switch statement.

@kbendick (Contributor, Author) commented Oct 9, 2021

I see now that there are several open issues and a few open PRs related to this, so I'm probably going to close this PR in favor of only simplifying the shouldProcess preconditions check.

I did some research into CDC recently in other systems but I'm starting a new position soon so I'm not sure what the priority will be yet for where I spend my time. I will know more next week, but am happy to discuss with others if I don't have cycles to work on the Spark streaming source.


Linked issue (may be closed by merging this PR): Spark streaming source should respect "streaming-skip-delete-snapshots" option for OVERWRITE snapshots (#3265)