Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.immutables.value.Value;

/**
Expand Down Expand Up @@ -181,6 +182,11 @@ default RewriteDataFiles zOrder(String... columns) {
interface Result {
List<FileGroupRewriteResult> rewriteResults();

/** Returns the file groups that failed to rewrite; defaults to an empty list. */
@Value.Default
default List<FileGroupFailureResult> rewriteFailures() {
return ImmutableList.of();
}

@Value.Default
default int addedDataFilesCount() {
return rewriteResults().stream().mapToInt(FileGroupRewriteResult::addedDataFilesCount).sum();
Expand All @@ -197,6 +203,11 @@ default int rewrittenDataFilesCount() {
default long rewrittenBytesCount() {
  // Total the rewritten bytes reported by every successfully rewritten file group.
  long totalBytes = 0L;
  for (FileGroupRewriteResult groupResult : rewriteResults()) {
    totalBytes += groupResult.rewrittenBytesCount();
  }
  return totalBytes;
}

@Value.Default
Copy link
Member

@szehon-ho szehon-ho Apr 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of re-using the FileGroupRewriteResult, would it make more sense to have a FailedFileGroupRewriteResult, with just failure?

I don't think the bytesCount/fileCount make much sense on failure, and we could also put the exception there. cc @aokolnychyi @RussellSpitzer for thoughts

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is OK to have a FailedFileGroupRewriteResult. What about the following data structure?
FailedFileGroupRewriteResult {
FileGroupInfo info();
boolean isFailedGroup()
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why we would need a "boolean". Wouldn't an instance of this class already mean that the group failed?

Copy link
Member

@RussellSpitzer RussellSpitzer Apr 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should probably have result have two new interfaces, something like

List<FileGroupFailureResult> rewriteFailures();

default long failedGroupCount() {
      return rewriteFailures().size();
}

default long failedDataFilesRewrites() {
    return rewriteFailures().stream().mapToInt(FailedFileGroup::filescount).sum()
}

Then we won't mix failures and successes in the same list

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reply, @RussellSpitzer.
Yes, the "boolean" is redundant. That is great advice; I will fix it according to your suggestions.

default int failedDataFilesCount() {
  // Total the data file counts recorded for every file group that failed to rewrite.
  int failedFiles = 0;
  for (FileGroupFailureResult failure : rewriteFailures()) {
    failedFiles += failure.dataFilesCount();
  }
  return failedFiles;
}
}

/**
Expand All @@ -217,6 +228,14 @@ default long rewrittenBytesCount() {
}
}

/** Describes a file group that failed to rewrite. */
@Value.Immutable
interface FileGroupFailureResult {
/** Returns the description of the file group that failed to rewrite. */
FileGroupInfo info();

/** Returns the number of data files in the file group that failed to rewrite. */
int dataFilesCount();
}

/**
* A description of a file group, when it was processed, and within which partition. For use
* tracking rewrite operations and for returning results.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void testZOrderSortExpression() {
public void testRewriteDataFilesInEmptyTable() {
  // Rewriting an empty table must be a no-op that still returns one summary row.
  createTable();
  List<Object[]> output = sql("CALL %s.system.rewrite_data_files('%s')", catalogName, tableIdent);
  // The row has four columns: rewritten files, added files, rewritten bytes, failed files.
  // The earlier three-column expectation was dropped because it cannot match this output.
  assertEquals("Procedure output must match", ImmutableList.of(row(0, 0, 0L, 0)), output);
}

@Test
Expand All @@ -101,7 +101,7 @@ public void testRewriteDataFilesOnPartitionTable() {
row(10, 2),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
Expand All @@ -125,7 +125,7 @@ public void testRewriteDataFilesOnNonPartitionTable() {
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
Expand All @@ -149,7 +149,7 @@ public void testRewriteDataFilesWithOptions() {

assertEquals(
"Action should rewrite 0 data files and add 0 data files",
ImmutableList.of(row(0, 0, 0L)),
ImmutableList.of(row(0, 0, 0L, 0)),
output);

List<Object[]> actualRecords = currentData();
Expand All @@ -175,7 +175,7 @@ public void testRewriteDataFilesWithSortStrategy() {
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
Expand All @@ -202,7 +202,7 @@ public void testRewriteDataFilesWithZOrder() {
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
Expand Down Expand Up @@ -244,7 +244,7 @@ public void testRewriteDataFilesWithFilter() {
row(5, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
Expand Down Expand Up @@ -272,7 +272,7 @@ public void testRewriteDataFilesWithFilterOnPartitionTable() {
row(5, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
Expand Down Expand Up @@ -300,7 +300,7 @@ public void testRewriteDataFilesWithInFilterOnPartitionTable() {
row(5, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
Expand Down Expand Up @@ -540,7 +540,7 @@ public void testBinPackTableWithSpecialChars() {
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isEqualTo(
Long.valueOf(snapshotSummary(identifier).get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
Expand Down Expand Up @@ -579,7 +579,7 @@ public void testSortTableWithSpecialChars() {
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(
Expand Down Expand Up @@ -619,7 +619,7 @@ public void testZOrderTableWithSpecialChars() {
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(
Expand Down Expand Up @@ -655,7 +655,7 @@ public void testDefaultSortOrder() {
row(2, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -337,14 +338,21 @@ private Result doExecuteWithPartialProgress(
commitManager.service(groupsPerCommit);
commitService.start();

Collection<FileGroupFailureResult> rewriteFailures = new ConcurrentLinkedQueue<>();
// Start rewrite tasks
Tasks.foreach(groupStream)
.suppressFailureWhenFinished()
.executeWith(rewriteService)
.noRetry()
.onFailure(
(fileGroup, exception) ->
LOG.error("Failure during rewrite group {}", fileGroup.info(), exception))
(fileGroup, exception) -> {
LOG.error("Failure during rewrite group {}", fileGroup.info(), exception);
rewriteFailures.add(
ImmutableRewriteDataFiles.FileGroupFailureResult.builder()
.info(fileGroup.info())
.dataFilesCount(fileGroup.numFiles())
.build());
})
.run(fileGroup -> commitService.offer(rewriteFiles(ctx, fileGroup)));
rewriteService.shutdown();

Expand All @@ -362,7 +370,10 @@ private Result doExecuteWithPartialProgress(

List<FileGroupRewriteResult> rewriteResults =
commitResults.stream().map(RewriteFileGroup::asResult).collect(Collectors.toList());
return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults).build();
return ImmutableRewriteDataFiles.Result.builder()
.rewriteResults(rewriteResults)
.rewriteFailures(rewriteFailures)
.build();
}

Stream<RewriteFileGroup> toGroupStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ class RewriteDataFilesProcedure extends BaseProcedure {
"rewritten_data_files_count", DataTypes.IntegerType, false, Metadata.empty()),
new StructField(
"added_data_files_count", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty())
new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty()),
new StructField(
"failed_data_files_count", DataTypes.IntegerType, false, Metadata.empty())
});

public static ProcedureBuilder builder() {
Expand Down Expand Up @@ -216,8 +218,14 @@ private InternalRow[] toOutputRows(RewriteDataFiles.Result result) {
int rewrittenDataFilesCount = result.rewrittenDataFilesCount();
long rewrittenBytesCount = result.rewrittenBytesCount();
int addedDataFilesCount = result.addedDataFilesCount();
int failedDataFilesCount = result.failedDataFilesCount();

InternalRow row =
newInternalRow(rewrittenDataFilesCount, addedDataFilesCount, rewrittenBytesCount);
newInternalRow(
rewrittenDataFilesCount,
addedDataFilesCount,
rewrittenBytesCount,
failedDataFilesCount);
return new InternalRow[] {row};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,9 @@ public void testPartialProgressWithRewriteFailure() {

RewriteDataFiles.Result result = spyRewrite.execute();

Assert.assertEquals("Should have 7 fileGroups", result.rewriteResults().size(), 7);
assertThat(result.rewriteResults()).hasSize(7);
assertThat(result.rewriteFailures()).hasSize(3);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could you also update the line above please?

assertThat(result.failedDataFilesCount()).isEqualTo(6);
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);

table.refresh();
Expand Down Expand Up @@ -796,7 +798,9 @@ public void testParallelPartialProgressWithRewriteFailure() {

RewriteDataFiles.Result result = spyRewrite.execute();

Assert.assertEquals("Should have 7 fileGroups", result.rewriteResults().size(), 7);
assertThat(result.rewriteResults()).hasSize(7);
assertThat(result.rewriteFailures()).hasSize(3);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could you also update the line above please?

assertThat(result.failedDataFilesCount()).isEqualTo(6);
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);

table.refresh();
Expand Down