diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 451487ecf138..0c408aece5ce 100644
--- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -121,6 +121,10 @@ public void commit(WriterCommitMessage[] messages) {
   protected void commitOperation(SnapshotUpdate<?> operation, int numFiles, String description) {
     LOG.info("Committing {} with {} files to table {}", description, numFiles, table);
 
+    if (numFiles == 0) {
+      return;
+    }
+
     if (applicationId != null) {
       operation.set("spark.app.id", applicationId);
     }
diff --git a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index c3d1192926f0..c3b614f06e29 100644
--- a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -83,7 +83,7 @@ public void testDeleteFromEmptyTable() {
     sql("DELETE FROM %s WHERE dep = 'hr'", tableName);
 
     Table table = validationCatalog.loadTable(tableIdent);
-    Assert.assertEquals("Should have 2 snapshots", 2, Iterables.size(table.snapshots()));
+    Assert.assertEquals("Should have 0 snapshots", 0, Iterables.size(table.snapshots()));
 
     assertEquals("Should have expected rows",
         ImmutableList.of(),
@@ -150,10 +150,10 @@ public void testDeleteNonExistingRecords() {
     sql("DELETE FROM %s AS t WHERE t.id > 10", tableName);
 
     Table table = validationCatalog.loadTable(tableIdent);
-    Assert.assertEquals("Should have 2 snapshots", 2, Iterables.size(table.snapshots()));
+    Assert.assertEquals("Should have 1 snapshot", 1, Iterables.size(table.snapshots()));
 
     Snapshot currentSnapshot = table.currentSnapshot();
-    validateSnapshot(currentSnapshot, "overwrite", "0", null, null);
+    validateSnapshot(currentSnapshot, "append", "2", null, "3");
 
     assertEquals("Should have expected rows",
         ImmutableList.of(row(1, "hr"), row(2, "hardware"), row(null, "hr")),
diff --git a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
index 6e1452c32272..8e7e87917287 100644
--- a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
+++ b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
@@ -104,7 +104,7 @@ public void testUpdateEmptyTable() {
     sql("UPDATE %s SET id = -1 WHERE dep = 'hr'", tableName);
 
     Table table = validationCatalog.loadTable(tableIdent);
-    Assert.assertEquals("Should have 2 snapshots", 2, Iterables.size(table.snapshots()));
+    Assert.assertEquals("Should have 0 snapshots", 0, Iterables.size(table.snapshots()));
 
     assertEquals("Should have expected rows",
         ImmutableList.of(),
@@ -188,10 +188,10 @@ public void testUpdateNonExistingRecords() {
     sql("UPDATE %s SET id = -1 WHERE id > 10", tableName);
 
     Table table = validationCatalog.loadTable(tableIdent);
-    Assert.assertEquals("Should have 2 snapshots", 2, Iterables.size(table.snapshots()));
+    Assert.assertEquals("Should have 1 snapshot", 1, Iterables.size(table.snapshots()));
 
     Snapshot currentSnapshot = table.currentSnapshot();
-    validateSnapshot(currentSnapshot, "overwrite", "0", null, null);
+    validateSnapshot(currentSnapshot, "append", "2", null, "3");
 
     assertEquals("Should have expected rows",
         ImmutableList.of(row(1, "hr"), row(2, "hardware"), row(null, "hr")),
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 24475899e283..3be4042ce9b1 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -159,8 +159,12 @@ private WriterFactory createWriterFactory() {
     return new WriterFactory(tableBroadcast, format, targetFileSize, writeSchema, dsSchema, partitionedFanoutEnabled);
   }
 
-  private void commitOperation(SnapshotUpdate<?> operation, String description) {
+  private void commitOperation(SnapshotUpdate<?> operation, int numFiles, String description) {
     LOG.info("Committing {} to table {}", description, table);
+    if (numFiles == 0) {
+      return;
+    }
+
     if (applicationId != null) {
       operation.set("spark.app.id", applicationId);
     }
@@ -234,7 +238,7 @@ public void commit(WriterCommitMessage[] messages) {
         append.appendFile(file);
       }
 
-      commitOperation(append, String.format("append with %d new data files", numFiles));
+      commitOperation(append, numFiles, String.format("append with %d new data files", numFiles));
     }
   }
 
@@ -256,7 +260,8 @@ public void commit(WriterCommitMessage[] messages) {
         dynamicOverwrite.addFile(file);
       }
 
-      commitOperation(dynamicOverwrite, String.format("dynamic partition overwrite with %d new data files", numFiles));
+      commitOperation(dynamicOverwrite, numFiles,
+          String.format("dynamic partition overwrite with %d new data files", numFiles));
     }
   }
 
@@ -279,7 +284,7 @@ public void commit(WriterCommitMessage[] messages) {
       }
 
      String commitMsg = String.format("overwrite by filter %s with %d new data files", overwriteExpr, numFiles);
-      commitOperation(overwriteFiles, commitMsg);
+      commitOperation(overwriteFiles, numFiles, commitMsg);
     }
   }
 
@@ -350,7 +355,7 @@ private void commitWithSerializableIsolation(OverwriteFiles overwriteFiles,
      String commitMsg = String.format(
          "overwrite of %d data files with %d new data files, scanSnapshotId: %d, conflictDetectionFilter: %s",
          numOverwrittenFiles, numAddedFiles, scanSnapshotId, conflictDetectionFilter);
-      commitOperation(overwriteFiles, commitMsg);
+      commitOperation(overwriteFiles, numOverwrittenFiles + numAddedFiles, commitMsg);
     }
 
     private void commitWithSnapshotIsolation(OverwriteFiles overwriteFiles,
@@ -368,7 +373,7 @@ private void commitWithSnapshotIsolation(OverwriteFiles overwriteFiles,
      String commitMsg = String.format(
          "overwrite of %d data files with %d new data files",
          numOverwrittenFiles, numAddedFiles);
-      commitOperation(overwriteFiles, commitMsg);
+      commitOperation(overwriteFiles, numOverwrittenFiles + numAddedFiles, commitMsg);
     }
   }
 
@@ -420,10 +425,10 @@ public final void commit(long epochId, WriterCommitMessage[] messages) {
 
     protected abstract void doCommit(long epochId, WriterCommitMessage[] messages);
 
-    protected void commit(SnapshotUpdate<?> snapshotUpdate, long epochId, String description) {
+    protected void commit(SnapshotUpdate<?> snapshotUpdate, long epochId, int numFiles, String description) {
       snapshotUpdate.set(QUERY_ID_PROPERTY, queryId);
       snapshotUpdate.set(EPOCH_ID_PROPERTY, Long.toString(epochId));
-      commitOperation(snapshotUpdate, description);
+      commitOperation(snapshotUpdate, numFiles, description);
     }
 
     private Long findLastCommittedEpochId() {
@@ -467,7 +472,7 @@ protected void doCommit(long epochId, WriterCommitMessage[] messages) {
         append.appendFile(file);
         numFiles++;
       }
-      commit(append, epochId, String.format("streaming append with %d new data files", numFiles));
+      commit(append, epochId, numFiles, String.format("streaming append with %d new data files", numFiles));
     }
   }
 
@@ -486,7 +491,8 @@ public void doCommit(long epochId, WriterCommitMessage[] messages) {
         overwriteFiles.addFile(file);
         numFiles++;
       }
-      commit(overwriteFiles, epochId, String.format("streaming complete overwrite with %d new data files", numFiles));
+      commit(overwriteFiles, epochId, numFiles,
+          String.format("streaming complete overwrite with %d new data files", numFiles));
     }
   }
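
The patch threads the written-file count into every commit path and short-circuits before a snapshot is produced when nothing was written, which is why the DELETE/UPDATE no-op tests now expect the snapshot count to stay unchanged. Below is a minimal standalone sketch of that guard, runnable on its own; the SnapshotUpdate interface, the EmptyCommitSkip class, and the applicationId value are illustrative stand-ins, not Iceberg's actual API.

import java.util.HashMap;
import java.util.Map;

class EmptyCommitSkip {

  // Stand-in for Iceberg's SnapshotUpdate: collects summary properties, then commits.
  interface SnapshotUpdate {
    void set(String key, String value);
    void commit();
  }

  private final String applicationId = "local-1234"; // hypothetical Spark application id

  // Mirrors the patched commitOperation: callers now pass the data-file count,
  // and a zero-file operation returns before any snapshot is created.
  void commitOperation(SnapshotUpdate operation, int numFiles, String description) {
    System.out.printf("Committing %s with %d files%n", description, numFiles);
    if (numFiles == 0) {
      return; // nothing was written, so no new snapshot (matches the updated tests)
    }
    if (applicationId != null) {
      operation.set("spark.app.id", applicationId);
    }
    operation.commit();
  }

  public static void main(String[] args) {
    EmptyCommitSkip writer = new EmptyCommitSkip();
    Map<String, String> props = new HashMap<>();
    SnapshotUpdate update = new SnapshotUpdate() {
      @Override public void set(String key, String value) { props.put(key, value); }
      @Override public void commit() { System.out.println("snapshot created, props=" + props); }
    };
    writer.commitOperation(update, 0, "append"); // skipped: table keeps its snapshot count
    writer.commitOperation(update, 2, "append"); // commits exactly as before
  }
}

Note the design choice in the overwrite paths: they pass numOverwrittenFiles + numAddedFiles, so a commit that only removes files still goes through, since deleting data files changes the table even when no new files are added.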