@@ -274,26 +274,25 @@ private void replacePartitions(
       CommitSummary summary,
       String newFlinkJobId,
       String operatorId) {
-    for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
-      // We don't commit the merged result into a single transaction because for the sequential
-      // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied
-      // to data files from txn1. Committing the merged one will lead to the incorrect delete
-      // semantic.
-      for (WriteResult result : e.getValue()) {
-        ReplacePartitions dynamicOverwrite =
-            table.newReplacePartitions().scanManifestsWith(workerPool);
+    // Iceberg tables are unsorted. So the order of the append data does not matter.
+    // Hence, we commit everything in one snapshot.
+    ReplacePartitions dynamicOverwrite = table.newReplacePartitions().scanManifestsWith(workerPool);
+
+    for (List<WriteResult> writeResults : pendingResults.values()) {
+      for (WriteResult result : writeResults) {
         Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
-        commitOperation(
-            table,
-            branch,
-            dynamicOverwrite,
-            summary,
-            "dynamic partition overwrite",
-            newFlinkJobId,
-            operatorId,
-            e.getKey());
       }
     }
+
+    commitOperation(
+        table,
+        branch,
+        dynamicOverwrite,
+        summary,
+        "dynamic partition overwrite",
+        newFlinkJobId,
+        operatorId,
+        pendingResults.lastKey());
Comment on lines +281 to +295
Contributor

Should this be:

for (List<WriteResult> writeResults : pendingResults.values()) {
  ReplacePartitions dynamicOverwrite = table.newReplacePartitions().scanManifestsWith(workerPool);
  for (WriteResult result : writeResults) {
    Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
  }

  commitOperation(
      table,
      branch,
      dynamicOverwrite,
      summary,
      "dynamic partition overwrite",
      newFlinkJobId,
      operatorId,
      pendingResults.lastKey());
}

Contributor

We still need to commit the checkpoints one by one. What if the replace happened for the same partition? With the proposed method we will have duplicated data.
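
To make the concern concrete, here is a minimal sketch (not from the PR; the table handle and the two DataFile names are made up) of how dynamic partition overwrite behaves when two checkpoints write data files into the same partition:

import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

class ReplacePartitionsSemanticsSketch {

  // Both data files are assumed to belong to the same partition, e.g. written by two
  // consecutive Flink checkpoints.
  static void commitOneByOne(Table table, DataFile fromCheckpoint1, DataFile fromCheckpoint2) {
    // Snapshot 1 replaces the partition with fromCheckpoint1.
    table.newReplacePartitions().addFile(fromCheckpoint1).commit();
    // Snapshot 2 replaces the same partition again: fromCheckpoint1 is dropped and only
    // fromCheckpoint2 survives ("last checkpoint wins").
    table.newReplacePartitions().addFile(fromCheckpoint2).commit();
  }

  static void commitMerged(Table table, DataFile fromCheckpoint1, DataFile fromCheckpoint2) {
    // A single snapshot replaces the partition with both files at once, so the rows from
    // checkpoint 1 are kept next to those from checkpoint 2, i.e. duplicated data compared
    // to the one-by-one behaviour above.
    table.newReplacePartitions().addFile(fromCheckpoint1).addFile(fromCheckpoint2).commit();
  }
}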

Contributor Author

It could be, but IMHO correctness isn't affected. This is a leftover from the previous commit, where we would still combine as many WriteResults as possible into a single table snapshot. Since replace partitions is append-only, I figured we could keep this optimization. However, for the sake of consistency with non-replacing writes, we could also go with your suggestion.

Contributor

If I understand correctly, overwrite mode should only be enabled in batch jobs, as it is very hard to make any claims about the results in a streaming job.

The current implementation is also problematic: we could have multiple data files for a given partition, and then they will replace each other, so only the last one wins 😢

Also, if checkpointing is turned on, we will have the same issue as mentioned above, just with a somewhat larger amount of data. The 2nd checkpoint might delete data from the 1st checkpoint, because it replaces the same partition.

So this means that replace partitions only works if checkpointing is turned off (or you are lucky 😄).

So essentially, it doesn't matter which solution we choose 😄

Contributor Author

That's probably a topic for another day, as both the IcebergSink and the older FlinkSink have this issue. The implementation in this PR is consistent, though, with how the IcebergSink works; see

for (WriteResult result : pendingResults.values()) {

Either way, as you said, this feature is questionable, to phrase it mildly.

Contributor

As discussed offline: we can keep your proposed solution here, and we could file another PR to throw an exception or log an error if checkpointing is on and the overwrite flag is used.
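
For that follow-up, a guard could look roughly like the sketch below (purely illustrative: the class, the method, and where it would hook into the sink builder are assumptions, not part of this PR):

import org.apache.flink.streaming.api.datastream.DataStream;

class OverwriteModeValidation {

  // Hypothetical check: fail fast when dynamic partition overwrite is combined with
  // checkpointing, since every checkpoint would replace the partitions committed by the
  // previous one. Logging a warning instead of throwing is the softer alternative
  // mentioned above.
  static void validateOverwriteMode(DataStream<?> input, boolean overwriteMode) {
    boolean checkpointingEnabled =
        input.getExecutionEnvironment().getCheckpointConfig().isCheckpointingEnabled();
    if (overwriteMode && checkpointingEnabled) {
      throw new IllegalStateException(
          "Dynamic partition overwrite should not be used with checkpointing enabled, "
              + "because each checkpoint replaces the partitions written by the previous one.");
    }
  }
}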

}

private void commitDeltaTxn(
@@ -304,25 +303,29 @@ private void commitDeltaTxn(
       String newFlinkJobId,
       String operatorId) {
     for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
-      // We don't commit the merged result into a single transaction because for the sequential
-      // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied
-      // to data files from txn1. Committing the merged one will lead to the incorrect delete
-      // semantic.
-      for (WriteResult result : e.getValue()) {
+      long checkpointId = e.getKey();
+      List<WriteResult> writeResults = e.getValue();
+
+      RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
+      for (WriteResult result : writeResults) {
         // Row delta validations are not needed for streaming changes that write equality deletes.
         // Equality deletes are applied to data in all previous sequence numbers, so retries may
         // push deletes further in the future, but do not affect correctness. Position deletes
         // committed to the table in this path are used only to delete rows from data files that are
         // being added in this commit. There is no way for data files added along with the delete
         // files to be concurrently removed, so there is no need to validate the files referenced by
         // the position delete files that are being committed.
-        RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);

         Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
         Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
-        commitOperation(
-            table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, e.getKey());
       }

+      // Every Flink checkpoint contains a set of independent changes which can be committed
+      // together. While it is technically feasible to combine append-only data across checkpoints,
+      // for the sake of simplicity, we do not implement this (premature) optimization. Multiple
+      // pending checkpoints here are very rare to occur, i.e. only with very short checkpoint
+      // intervals or when concurrent checkpointing is enabled.
+      commitOperation(
+          table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId);
     }
   }
