
Conversation

@bezdomniy (Contributor)

- Added an "append" snapshot operation to org.apache.iceberg.flink.sink.dynamic.DynamicIcebergSink.
- Used the same logic as currently in org.apache.iceberg.flink.sink.FlinkSink.
- Tested with org.apache.iceberg.flink.source.IcebergSource using StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT; this resolves the issue described in #14526.

@github-actions github-actions bot added the flink label Nov 11, 2025
```java
        Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
      }
    }
    String description = "append";
```
Contributor:

nit: newline after the block. See: https://iceberg.apache.org/contribute/#block-spacing

```java
        branch,
        appendFiles,
        summary,
        description,
```
Contributor:

Can we use the string value here? We don't reuse the description variable anywhere else.

```java
    Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
    Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
    if (summary.deleteFilesCount() == 0) {
      // To be compatible with iceberg format V1.
```
Contributor:

Could we change this comment to correctly describe why we do this?

@pvary (Contributor) commented Nov 11, 2025

Please add test cases for the new feature to ensure the fix isn’t accidentally reverted in the future.

Also, implement the changes only in the latest Flink version (Flink 2.1). This will speed up the review process since you won’t need to ensure every requested change is merged across all versions. We’ll create a separate backport PR later to apply the agreed changes to older Flink versions.

@mxm (Contributor) left a comment

Thank you for the PR @bezdomniy! Initially, we only change code in the latest Flink version (2.1). After merging the changes, we backport to the older versions. Could you remove the 1.20 and 2.0 changes?

I've left some suggestions inline.

```java
    // the position delete files that are being committed.
    Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
    Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
    if (summary.deleteFilesCount() == 0) {
```
Contributor:

I'm not sure about the granularity of this value, as every pending result could contain or not contain deletes. Probably best to check the WriteResults directly.
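
A minimal, self-contained sketch of what checking the WriteResults directly per checkpoint could look like. The `WriteResult` record below is a simplified stand-in for org.apache.iceberg.io.WriteResult, not the real class:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AppendOnlyCheck {

  // Simplified stand-in for org.apache.iceberg.io.WriteResult.
  record WriteResult(String[] dataFiles, String[] deleteFiles) {}

  // A checkpoint is append-only iff none of its write results carry delete files.
  static boolean isAppendOnly(List<WriteResult> writeResults) {
    return writeResults.stream().allMatch(r -> r.deleteFiles().length == 0);
  }

  public static void main(String[] args) {
    Map<Long, List<WriteResult>> pendingResults = new TreeMap<>();
    pendingResults.put(1L, List.of(new WriteResult(new String[] {"d1"}, new String[0])));
    pendingResults.put(2L, List.of(new WriteResult(new String[] {"d2"}, new String[] {"del1"})));

    // Decide per checkpoint, not across all pending results.
    pendingResults.forEach(
        (checkpointId, results) ->
            System.out.println(checkpointId + " appendOnly=" + isAppendOnly(results)));
  }
}
```

The point of the per-checkpoint decision is that an aggregate delete-file count spanning several checkpoints can hide an append-only checkpoint sitting next to an overwrite checkpoint.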

Contributor:

The deleteFilesCount should be correct.

This is how it is calculated:

And internally:

```java
public void addAll(NavigableMap<Long, List<WriteResult>> pendingResults) {
  pendingResults.values().forEach(writeResults -> writeResults.forEach(this::addWriteResult));
}

private void addWriteResult(WriteResult writeResult) {
  dataFilesCount.addAndGet(writeResult.dataFiles().length);
  Arrays.stream(writeResult.dataFiles())
      .forEach(
          dataFile -> {
            dataFilesRecordCount.addAndGet(dataFile.recordCount());
            dataFilesByteCount.addAndGet(dataFile.fileSizeInBytes());
          });

  deleteFilesCount.addAndGet(writeResult.deleteFiles().length);
  Arrays.stream(writeResult.deleteFiles())
      .forEach(
          deleteFile -> {
            deleteFilesRecordCount.addAndGet(deleteFile.recordCount());
            long deleteBytes = ScanTaskUtil.contentSizeInBytes(deleteFile);
            deleteFilesByteCount.addAndGet(deleteBytes);
          });
}
```

@mxm (Contributor) Nov 11, 2025

The value is correct but we commit at checkpoint level and the delete file count is done across checkpoints. Strictly speaking, there could be both append-only checkpoints and overwrite checkpoints as part of pendingResults.

Contributor:

Interesting decision.
In IcebergSink we commit multiple checkpoints together in a single commit, if we happen to accumulate multiple of them.

Contributor:

When we fixed #14182, we decided not to do that. In fact, you proposed not to do that: #14182 (comment) 🤓

IMHO this kind of optimization is a bit premature. In practice it is rare to even have multiple pending checkpoints.

Contributor:

Also, as I mentioned in the comment, this could cause issues if "replacePartition" is used.

Contributor:

So we should commit them 1-by-1

```java
      String operatorId) {
    for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
      long checkpointId = e.getKey();
      List<WriteResult> writeResults = e.getValue();
```
Contributor:

Can we keep the loop structure? We will need it for both types of snapshots. This should work:

```java
for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
  long checkpointId = e.getKey();
  List<WriteResult> writeResults = e.getValue();

  boolean appendOnly = true;
  for (WriteResult writeResult : writeResults) {
    if (writeResult.deleteFiles().length > 0) {
      appendOnly = false;
      break;
    }
  }

  final SnapshotUpdate<?> snapshotUpdate;
  if (appendOnly) {
    AppendFiles appendFiles = table.newAppend().scanManifestsWith(workerPool);
    for (WriteResult result : writeResults) {
      Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
    }
    snapshotUpdate = appendFiles;
  } else {
    RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
    for (WriteResult result : writeResults) {
      Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
      Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
    }
    snapshotUpdate = rowDelta;
  }

  commitOperation(
      table,
      branch,
      snapshotUpdate,
      summary,
      appendOnly ? "append" : "rowDelta",
      newFlinkJobId,
      operatorId,
      checkpointId);
}
```

@bezdomniy (Contributor, Author):

This looks good. The only thing left is the checkState for result.referencedDataFiles().length == 0 (which exists in IcebergSink), which I will add to the loop that checks whether the commit is appendOnly.
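
A minimal sketch of that combined check, with a simplified `WriteResult` stand-in and a local `checkState` in place of Iceberg's relocated Guava Preconditions (both are assumptions for illustration, not the real classes):

```java
import java.util.List;

public class AppendOnlyGuard {

  // Simplified stand-in for org.apache.iceberg.io.WriteResult.
  record WriteResult(String[] dataFiles, String[] deleteFiles, String[] referencedDataFiles) {}

  // Local replacement for Preconditions.checkState.
  static void checkState(boolean condition, String message) {
    if (!condition) {
      throw new IllegalStateException(message);
    }
  }

  // Append-only iff no write result carries delete files; in that case the
  // referenced data files must also be empty, mirroring the guard in IcebergSink.
  static boolean isAppendOnly(List<WriteResult> writeResults) {
    boolean appendOnly = true;
    for (WriteResult result : writeResults) {
      if (result.deleteFiles().length > 0) {
        appendOnly = false;
        break;
      }
      checkState(
          result.referencedDataFiles().length == 0,
          "Should have no referenced data files for append-only commits");
    }
    return appendOnly;
  }

  public static void main(String[] args) {
    String[] none = new String[0];
    // prints true: data files only
    System.out.println(isAppendOnly(List.of(new WriteResult(new String[] {"d1"}, none, none))));
    // prints false: delete files present
    System.out.println(
        isAppendOnly(List.of(new WriteResult(new String[] {"d1"}, new String[] {"x"}, none))));
  }
}
```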

```java
    if (summary.deleteFilesCount() == 0) {
      // Use append snapshot operation where possible
      AppendFiles appendFiles = table.newAppend().scanManifestsWith(workerPool);
      for (List<WriteResult> resultList : pendingResults.values()) {
```
Contributor:

Can we move this loop up one level, as in https://github.com/apache/iceberg/pull/14559/files#r2513719871? This avoids repeating it.

@mxm (Contributor) commented Nov 18, 2025

Now that #14581 has been merged, we can simplify this PR, because RowDelta now automatically creates APPEND commits in the absence of deletes. All we need is one test case for DynamicCommitter to verify that we write append commits when there are no deletes. This is just a safety measure in case the core code were to change.

Comment on lines +654 to +656

```java
Snapshot snapshot = Iterables.getFirst(table.snapshots(), null);
assertThat(snapshot.operation()).isEqualTo("append");
```
Contributor:

Looks like we already have that test case. @bezdomniy Could you revert the change in main, only keeping this test?

@bezdomniy (Contributor, Author):

sounds good - done.

@mxm (Contributor) left a comment

Awesome 😎 LGTM

@pvary pvary merged commit cdf4723 into apache:main Nov 18, 2025
20 checks passed
@pvary (Contributor) commented Nov 18, 2025

Merged to main.
Thanks @bezdomniy for highlighting the issue and providing the first fix. Thanks @mxm for coming up with the core fix and implementing it.

@bezdomniy: Could you please create the backport for the test to the other Flink branches?
This command can help (`g` is presumably a shell alias for `git`):

```sh
g diff HEAD^ flink/v2.1 | sed "s/v2.1/v2.0/g" > /tmp/patch; g apply -3 -p1 /tmp/patch
```

Please describe in the new PR if the backport is clean, or you have to do manual changes.
