@@ -274,26 +274,25 @@ private void replacePartitions(
       CommitSummary summary,
       String newFlinkJobId,
       String operatorId) {
-    for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
-      // We don't commit the merged result into a single transaction because for the sequential
-      // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied
-      // to data files from txn1. Committing the merged one will lead to the incorrect delete
-      // semantic.
-      for (WriteResult result : e.getValue()) {
-        ReplacePartitions dynamicOverwrite =
-            table.newReplacePartitions().scanManifestsWith(workerPool);
+    // Iceberg tables are unsorted. So the order of the append data does not matter.
+    // Hence, we commit everything in one snapshot.
+    ReplacePartitions dynamicOverwrite = table.newReplacePartitions().scanManifestsWith(workerPool);
+
+    for (List<WriteResult> writeResults : pendingResults.values()) {
+      for (WriteResult result : writeResults) {
         Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
-        commitOperation(
-            table,
-            branch,
-            dynamicOverwrite,
-            summary,
-            "dynamic partition overwrite",
-            newFlinkJobId,
-            operatorId,
-            e.getKey());
       }
     }
+
+    commitOperation(
+        table,
+        branch,
+        dynamicOverwrite,
+        summary,
+        "dynamic partition overwrite",
+        newFlinkJobId,
+        operatorId,
+        pendingResults.lastKey());
   }
 
   private void commitDeltaTxn(
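
With this change the overwrite path builds one ReplacePartitions operation from all pending results and commits a single snapshot, rather than one snapshot per WriteResult. The following is a condensed, standalone sketch of that pattern, not code from this PR: the class and method names are illustrative, and it assumes an already-loaded Table, a worker-pool ExecutorService, and pending results grouped by checkpoint id.

import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ExecutorService;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.WriteResult;

// Illustrative sketch only; names are not from the PR.
class OverwriteSketch {
  // Fold the data files of every pending checkpoint into one dynamic-overwrite snapshot.
  static void overwriteOnce(
      Table table, ExecutorService workerPool, NavigableMap<Long, List<WriteResult>> pending) {
    ReplacePartitions overwrite = table.newReplacePartitions().scanManifestsWith(workerPool);
    for (List<WriteResult> results : pending.values()) {
      for (WriteResult result : results) {
        for (DataFile file : result.dataFiles()) {
          overwrite.addFile(file);
        }
      }
    }
    // A single commit covers everything up to the newest pending checkpoint.
    overwrite.commit();
  }
}

Because Iceberg tables are unsorted and this path carries no delete files, folding several checkpoints into one overwrite snapshot does not change query results; it only reduces the number of commits.
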
@@ -304,25 +303,29 @@ private void commitDeltaTxn(
       String newFlinkJobId,
       String operatorId) {
     for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
-      // We don't commit the merged result into a single transaction because for the sequential
-      // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied
-      // to data files from txn1. Committing the merged one will lead to the incorrect delete
-      // semantic.
-      for (WriteResult result : e.getValue()) {
+      long checkpointId = e.getKey();
+      List<WriteResult> writeResults = e.getValue();
+
+      RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
+      for (WriteResult result : writeResults) {
         // Row delta validations are not needed for streaming changes that write equality deletes.
         // Equality deletes are applied to data in all previous sequence numbers, so retries may
         // push deletes further in the future, but do not affect correctness. Position deletes
         // committed to the table in this path are used only to delete rows from data files that are
         // being added in this commit. There is no way for data files added along with the delete
         // files to be concurrently removed, so there is no need to validate the files referenced by
         // the position delete files that are being committed.
-        RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
-
         Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
         Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
-        commitOperation(
-            table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, e.getKey());
       }
+
+      // Every Flink checkpoint contains a set of independent changes which can be committed
+      // together. While it is technically feasible to combine append-only data across checkpoints,
+      // for the sake of simplicity, we do not implement this (premature) optimization. Multiple
+      // pending checkpoints here are very rare to occur, i.e. only with very short checkpoint
+      // intervals or when concurrent checkpointing is enabled.
+      commitOperation(
+          table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId);
     }
   }
 
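The delta path, by contrast, still commits once per checkpoint: the WriteResults of a single checkpoint are merged into one RowDelta, but checkpoints are not merged with each other, so equality deletes written at a later checkpoint keep applying to data files committed for earlier ones. Below is a condensed sketch of that per-checkpoint pattern, again with illustrative names and the same assumed Table, worker pool, and pending-results map; it is not code from this PR.

import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ExecutorService;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.WriteResult;

// Illustrative sketch only; names are not from the PR.
class DeltaSketch {
  // One RowDelta snapshot per checkpoint; checkpoints stay ordered and are committed separately.
  static void commitPerCheckpoint(
      Table table, ExecutorService workerPool, NavigableMap<Long, List<WriteResult>> pending) {
    for (Map.Entry<Long, List<WriteResult>> entry : pending.entrySet()) {
      RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
      for (WriteResult result : entry.getValue()) {
        for (DataFile dataFile : result.dataFiles()) {
          rowDelta.addRows(dataFile);
        }
        for (DeleteFile deleteFile : result.deleteFiles()) {
          rowDelta.addDeletes(deleteFile);
        }
      }
      // Commit the merged changes of this checkpoint as a single snapshot.
      rowDelta.commit();
    }
  }
}

Keeping the checkpoint boundary as the commit boundary preserves the ordering guarantee the old per-result loop relied on, while still cutting the number of snapshots down to one per checkpoint.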