Description
Background / context
This is a continuation of the work done in PR #2660: reading from an Iceberg table as an incremental streaming source in Spark.
To read a stream out of an Iceberg table, the above PR iterates through all of the table's snapshots (the table history) starting from the very first snapshot, collects the list of added files in each snapshot one by one, and hands the rows in those files off to the Spark 3 DSv2 reader. However, this, together with #2752, does not handle the case where the streaming reader encounters a snapshot of type OVERWRITE.
This issue is a request to extend the implementation to support that case.
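For context, a minimal sketch of what such a streaming read looks like on the consumer side, assuming the Spark 3 / DSv2 micro-batch support from PR #2660; the table name and checkpoint path here are illustrative:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("iceberg-streaming-read")
  .getOrCreate()

// Each micro-batch is planned from the data files added by new snapshots,
// replayed in table-history order starting from the first snapshot.
val events = spark.readStream
  .format("iceberg")
  .load("db.events")            // illustrative table name

val query = events.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/iceberg-stream-checkpoint")
  .start()
```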
Implementation/Idea Proposal
What does an Iceberg snapshot of type OVERWRITE do / contain?
To simplify, imagine an Iceberg table that comprises just one data file with 100 rows: [a1, b1].....[a100, b100]. If a writer performs a merge / upsert operation on [a1, b1] to update it to [a1, b1_upserted], then a snapshot of type OVERWRITE is written.
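For example, a row-level MERGE like the sketch below (the table and column names are the illustrative ones used in this description) commits a snapshot whose operation is recorded as an overwrite rather than a plain append:

```scala
// Upsert a single row; for an Iceberg table this is committed as a snapshot
// of operation type OVERWRITE rather than APPEND.
spark.sql("""
  MERGE INTO db.tbl t
  USING (SELECT 'a1' AS a, 'b1_upserted' AS b) u
  ON t.a = u.a
  WHEN MATCHED THEN UPDATE SET t.b = u.b
  WHEN NOT MATCHED THEN INSERT *
""")
```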
Iceberg Table - with spec version 1
In Iceberg format version 1, this merge/upsert operation results in a net-new table snapshot in which that one row is changed to the new value and the containing data file is fully rewritten (copy-on-write).
Now the table snapshots look like:
S1 --> [a1, b1], [a2, b2].....[a100, b100]
S2 --> [a1, b1_upserted], [a2, b2].....[a100, b100]
So, when the streaming reader encounters a snapshot of type OVERWRITE (i.e., S2), streaming all of its added files would re-stream [a2, b2].....[a100, b100] as well: duplicate data!
This is a limitation of Iceberg table spec version 1 and is solved by spec version 2 (where only the new data files and delete files are written as part of the snapshot).
So, the proposal here is to implement a Spark read option that opts into streaming OVERWRITE snapshots, with the known limitation that it can replay a lot of duplicate data for v1 tables.
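A rough sketch of what that opt-in could look like from the reader side, assuming a hypothetical option name (the actual name and semantics would be settled in the implementation PR):

```scala
// "streaming-allow-overwrite-snapshots" is a placeholder option name for this
// proposal, not an existing Iceberg read option.
val stream = spark.readStream
  .format("iceberg")
  // Opt in to replaying the added files of OVERWRITE snapshots, accepting
  // that on a v1 table this can re-stream rows that were merely rewritten.
  .option("streaming-allow-overwrite-snapshots", "true")
  .load("db.tbl")
```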
Iceberg Table - with spec version 2
In Iceberg format version 2, the same merge/upsert operation results in a net-new table snapshot that adds the single updated row in a net-new data file plus a single record in a new delete file (merge-on-read).
S1 --> DataFiles { [a1, b1], [a2, b2].....[a100, b100] }
S2 --> DataFiles { [a1, b1], [a2, b2].....[a100, b100], [a1, b1_upserted] } + DeleteFiles { a=a1 }
In this case, when the streaming reader encounters a snapshot of type OVERWRITE (i.e., S2), the added files correspond exactly to the newly added rows: [a1, b1_upserted]!
This, combined with skipping snapshots of type REPLACE (compactions / rewrites), should avoid the possibility of duplicate data.
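A rough sketch (not the actual MicroBatchStream code) of how the reader could select files per snapshot under this proposal; Snapshot#addedDataFiles(FileIO) and the DataOperations constants are from the Iceberg core API, and their exact availability depends on the Iceberg version:

```scala
import scala.jdk.CollectionConverters._
import org.apache.iceberg.{DataFile, DataOperations, Snapshot, Table}

def filesToStream(table: Table, snapshot: Snapshot): Seq[DataFile] =
  snapshot.operation() match {
    // Compactions only rewrite existing rows; streaming them would duplicate data.
    case DataOperations.REPLACE =>
      Seq.empty
    // For v2 tables the added data files of an OVERWRITE snapshot contain exactly
    // the new row versions, so they are safe to stream; appends are always safe.
    case DataOperations.OVERWRITE | DataOperations.APPEND =>
      snapshot.addedDataFiles(table.io()).asScala.toSeq
    // Other operations (e.g. DELETE) add no data files to stream.
    case _ =>
      Seq.empty
  }
```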