diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java index c73ea300c7de..20e75159b1e8 100644 --- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java @@ -23,6 +23,7 @@ import org.apache.iceberg.SortOrder; import org.apache.iceberg.StructLike; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.immutables.value.Value; /** @@ -181,6 +182,11 @@ default RewriteDataFiles zOrder(String... columns) { interface Result { List rewriteResults(); + @Value.Default + default List rewriteFailures() { + return ImmutableList.of(); + } + @Value.Default default int addedDataFilesCount() { return rewriteResults().stream().mapToInt(FileGroupRewriteResult::addedDataFilesCount).sum(); @@ -197,6 +203,11 @@ default int rewrittenDataFilesCount() { default long rewrittenBytesCount() { return rewriteResults().stream().mapToLong(FileGroupRewriteResult::rewrittenBytesCount).sum(); } + + @Value.Default + default int failedDataFilesCount() { + return rewriteFailures().stream().mapToInt(FileGroupFailureResult::dataFilesCount).sum(); + } } /** @@ -217,6 +228,14 @@ default long rewrittenBytesCount() { } } + /** For a file group that failed to rewrite. */ + @Value.Immutable + interface FileGroupFailureResult { + FileGroupInfo info(); + + int dataFilesCount(); + } + /** * A description of a file group, when it was processed, and within which partition. For use * tracking rewrite operations and for returning results. diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java index 6cda93f8674e..3c07e676a53b 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java @@ -83,7 +83,7 @@ public void testZOrderSortExpression() { public void testRewriteDataFilesInEmptyTable() { createTable(); List output = sql("CALL %s.system.rewrite_data_files('%s')", catalogName, tableIdent); - assertEquals("Procedure output must match", ImmutableList.of(row(0, 0, 0L)), output); + assertEquals("Procedure output must match", ImmutableList.of(row(0, 0, 0L, 0)), output); } @Test @@ -101,7 +101,7 @@ public void testRewriteDataFilesOnPartitionTable() { row(10, 2), Arrays.copyOf(output.get(0), 2)); // verify rewritten bytes separately - assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)).hasSize(4); assertThat(output.get(0)[2]) .isInstanceOf(Long.class) .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); @@ -125,7 +125,7 @@ public void testRewriteDataFilesOnNonPartitionTable() { row(10, 1), Arrays.copyOf(output.get(0), 2)); // verify rewritten bytes separately - assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)).hasSize(4); assertThat(output.get(0)[2]) .isInstanceOf(Long.class) .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); @@ -149,7 +149,7 @@ public void testRewriteDataFilesWithOptions() { assertEquals( "Action should rewrite 0 data files and add 0 data files", - ImmutableList.of(row(0, 0, 0L)), + ImmutableList.of(row(0, 0, 0L, 0)), output); List actualRecords = currentData(); @@ -175,7 +175,7 @@ public void testRewriteDataFilesWithSortStrategy() { row(10, 1), Arrays.copyOf(output.get(0), 2)); // verify rewritten bytes separately - assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)).hasSize(4); assertThat(output.get(0)[2]) .isInstanceOf(Long.class) .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); @@ -202,7 +202,7 @@ public void testRewriteDataFilesWithZOrder() { row(10, 1), Arrays.copyOf(output.get(0), 2)); // verify rewritten bytes separately - assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)).hasSize(4); assertThat(output.get(0)[2]) .isInstanceOf(Long.class) .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); @@ -244,7 +244,7 @@ public void testRewriteDataFilesWithFilter() { row(5, 1), Arrays.copyOf(output.get(0), 2)); // verify rewritten bytes separately - assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)).hasSize(4); assertThat(output.get(0)[2]) .isInstanceOf(Long.class) .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); @@ -272,7 +272,7 @@ public void testRewriteDataFilesWithFilterOnPartitionTable() { row(5, 1), Arrays.copyOf(output.get(0), 2)); // verify rewritten bytes separately - assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)).hasSize(4); assertThat(output.get(0)[2]) .isInstanceOf(Long.class) .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); @@ -300,7 +300,7 @@ public void testRewriteDataFilesWithInFilterOnPartitionTable() { row(5, 1), Arrays.copyOf(output.get(0), 2)); // verify rewritten bytes separately - assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)).hasSize(4); assertThat(output.get(0)[2]) .isInstanceOf(Long.class) .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); @@ -540,7 +540,7 @@ public void testBinPackTableWithSpecialChars() { row(10, 1), Arrays.copyOf(output.get(0), 2)); // verify rewritten bytes separately - assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)).hasSize(4); assertThat(output.get(0)[2]) .isEqualTo( Long.valueOf(snapshotSummary(identifier).get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); @@ -579,7 +579,7 @@ public void testSortTableWithSpecialChars() { row(10, 1), Arrays.copyOf(output.get(0), 2)); // verify rewritten bytes separately - assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)).hasSize(4); assertThat(output.get(0)[2]) .isInstanceOf(Long.class) .isEqualTo( @@ -619,7 +619,7 @@ public void testZOrderTableWithSpecialChars() { row(10, 1), Arrays.copyOf(output.get(0), 2)); // verify rewritten bytes separately - assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)).hasSize(4); assertThat(output.get(0)[2]) .isInstanceOf(Long.class) .isEqualTo( @@ -655,7 +655,7 @@ public void testDefaultSortOrder() { row(2, 1), Arrays.copyOf(output.get(0), 2)); // verify rewritten bytes separately - assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)).hasSize(4); assertThat(output.get(0)[2]) .isInstanceOf(Long.class) .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java index 5f95ef3ed4c9..658d3a927984 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.math.RoundingMode; import java.util.Arrays; +import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -337,14 +338,21 @@ private Result doExecuteWithPartialProgress( commitManager.service(groupsPerCommit); commitService.start(); + Collection rewriteFailures = new ConcurrentLinkedQueue<>(); // Start rewrite tasks Tasks.foreach(groupStream) .suppressFailureWhenFinished() .executeWith(rewriteService) .noRetry() .onFailure( - (fileGroup, exception) -> - LOG.error("Failure during rewrite group {}", fileGroup.info(), exception)) + (fileGroup, exception) -> { + LOG.error("Failure during rewrite group {}", fileGroup.info(), exception); + rewriteFailures.add( + ImmutableRewriteDataFiles.FileGroupFailureResult.builder() + .info(fileGroup.info()) + .dataFilesCount(fileGroup.numFiles()) + .build()); + }) .run(fileGroup -> commitService.offer(rewriteFiles(ctx, fileGroup))); rewriteService.shutdown(); @@ -362,7 +370,10 @@ private Result doExecuteWithPartialProgress( List rewriteResults = commitResults.stream().map(RewriteFileGroup::asResult).collect(Collectors.toList()); - return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults).build(); + return ImmutableRewriteDataFiles.Result.builder() + .rewriteResults(rewriteResults) + .rewriteFailures(rewriteFailures) + .build(); } Stream toGroupStream( diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java index 1aea61e74785..3929e346c334 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java @@ -68,7 +68,9 @@ class RewriteDataFilesProcedure extends BaseProcedure { "rewritten_data_files_count", DataTypes.IntegerType, false, Metadata.empty()), new StructField( "added_data_files_count", DataTypes.IntegerType, false, Metadata.empty()), - new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty()) + new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty()), + new StructField( + "failed_data_files_count", DataTypes.IntegerType, false, Metadata.empty()) }); public static ProcedureBuilder builder() { @@ -216,8 +218,14 @@ private InternalRow[] toOutputRows(RewriteDataFiles.Result result) { int rewrittenDataFilesCount = result.rewrittenDataFilesCount(); long rewrittenBytesCount = result.rewrittenBytesCount(); int addedDataFilesCount = result.addedDataFilesCount(); + int failedDataFilesCount = result.failedDataFilesCount(); + InternalRow row = - newInternalRow(rewrittenDataFilesCount, addedDataFilesCount, rewrittenBytesCount); + newInternalRow( + rewrittenDataFilesCount, + addedDataFilesCount, + rewrittenBytesCount, + failedDataFilesCount); return new InternalRow[] {row}; } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index a638776033ae..76b8f58d9a20 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -755,7 +755,9 @@ public void testPartialProgressWithRewriteFailure() { RewriteDataFiles.Result result = spyRewrite.execute(); - Assert.assertEquals("Should have 7 fileGroups", result.rewriteResults().size(), 7); + assertThat(result.rewriteResults()).hasSize(7); + assertThat(result.rewriteFailures()).hasSize(3); + assertThat(result.failedDataFilesCount()).isEqualTo(6); assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore); table.refresh(); @@ -796,7 +798,9 @@ public void testParallelPartialProgressWithRewriteFailure() { RewriteDataFiles.Result result = spyRewrite.execute(); - Assert.assertEquals("Should have 7 fileGroups", result.rewriteResults().size(), 7); + assertThat(result.rewriteResults()).hasSize(7); + assertThat(result.rewriteFailures()).hasSize(3); + assertThat(result.failedDataFilesCount()).isEqualTo(6); assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore); table.refresh();