diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 34caab4ff44d..bc3497401b78 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -33,6 +33,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
@@ -188,6 +189,49 @@ public void testAppend() throws IOException {
     Assert.assertEquals("Result rows should match", expected, actual);
   }
 
+  @Test
+  public void testEmptyOverwrite() throws IOException {
+    File parent = temp.newFolder(format.toString());
+    File location = new File(parent, "test");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    List<SimpleRecord> expected = records;
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+    df.select("id", "data").write()
+        .format("iceberg")
+        .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
+        .mode(SaveMode.Append)
+        .save(location.toString());
+
+    Dataset<Row> empty = spark.createDataFrame(ImmutableList.of(), SimpleRecord.class);
+    empty.select("id", "data").write()
+        .format("iceberg")
+        .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
+        .mode(SaveMode.Overwrite)
+        .option("overwrite-mode", "dynamic")
+        .save(location.toString());
+
+    table.refresh();
+
+    Dataset<Row> result = spark.read()
+        .format("iceberg")
+        .load(location.toString());
+
+    List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+    Assert.assertEquals("Result rows should match", expected, actual);
+  }
+
   @Test
   public void testOverwrite() throws IOException {
     File parent = temp.newFolder(format.toString());
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 4a7e0c24cfa1..8f093210223e 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -184,10 +184,17 @@ private void append(WriterCommitMessage[] messages) {
   }
 
   private void replacePartitions(WriterCommitMessage[] messages) {
+    Iterable<DataFile> files = files(messages);
+
+    if (!files.iterator().hasNext()) {
+      LOG.info("Dynamic overwrite is empty, skipping commit");
+      return;
+    }
+
     ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
 
     int numFiles = 0;
-    for (DataFile file : files(messages)) {
+    for (DataFile file : files) {
       numFiles += 1;
       dynamicOverwrite.addFile(file);
     }
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index a3da366768de..4cc37c8cfd7f 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -262,10 +262,17 @@ public void commit(WriterCommitMessage[] messages) {
   private class DynamicOverwrite extends BaseBatchWrite {
     @Override
     public void commit(WriterCommitMessage[] messages) {
+      Iterable<DataFile> files = files(messages);
+
+      if (!files.iterator().hasNext()) {
+        LOG.info("Dynamic overwrite is empty, skipping commit");
+        return;
+      }
+
       ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
 
       int numFiles = 0;
-      for (DataFile file : files(messages)) {
+      for (DataFile file : files) {
         numFiles += 1;
         dynamicOverwrite.addFile(file);
       }
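A minimal sketch (not part of the patch) of the user-facing scenario the new guard short-circuits, assuming a partitioned Iceberg table already exists at the target path. The paths and filter below are hypothetical; only the "overwrite-mode" option and SaveMode.Overwrite mirror the test above. When the source DataFrame filters down to zero rows, the writers produce no DataFiles, and replacePartitions()/DynamicOverwrite.commit() now returns early instead of committing an empty ReplacePartitions operation:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class EmptyDynamicOverwriteExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().getOrCreate();

        // Hypothetical incremental job: the filter may match no rows at all.
        Dataset<Row> updates = spark.read().parquet("/staging/events")
            .filter("event_date >= '2021-01-01'");

        // With zero input rows, no data files reach the dynamic overwrite
        // commit path, so the guard above skips the commit entirely.
        updates.write()
            .format("iceberg")
            .mode(SaveMode.Overwrite)
            .option("overwrite-mode", "dynamic")
            .save("/warehouse/db/events");
      }
    }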