diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index 5f95ef3ed4c9..a5a69dea959e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.math.RoundingMode;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -83,6 +82,9 @@ public class RewriteDataFilesSparkAction
           USE_STARTING_SEQUENCE_NUMBER,
           REWRITE_JOB_ORDER);
 
+  private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
+      ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
+
   private final Table table;
 
   private Expression filter = Expressions.alwaysTrue();
@@ -146,7 +148,7 @@ public RewriteDataFilesSparkAction filter(Expression expression) {
   @Override
   public RewriteDataFiles.Result execute() {
     if (table.currentSnapshot() == null) {
-      return ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
+      return EMPTY_RESULT;
     }
 
     long startingSnapshotId = table.currentSnapshot().snapshotId();
@@ -158,26 +160,25 @@ public RewriteDataFiles.Result execute() {
 
     validateAndInitOptions();
 
-    Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition =
+    StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
         planFileGroups(startingSnapshotId);
     RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);
 
     if (ctx.totalGroupCount() == 0) {
       LOG.info("Nothing found to rewrite in {}", table.name());
-      return ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
+      return EMPTY_RESULT;
     }
 
     Stream<RewriteFileGroup> groupStream = toGroupStream(ctx, fileGroupsByPartition);
 
-    RewriteDataFilesCommitManager commitManager = commitManager(startingSnapshotId);
     if (partialProgressEnabled) {
-      return doExecuteWithPartialProgress(ctx, groupStream, commitManager);
+      return doExecuteWithPartialProgress(ctx, groupStream, commitManager(startingSnapshotId));
     } else {
-      return doExecute(ctx, groupStream, commitManager);
+      return doExecute(ctx, groupStream, commitManager(startingSnapshotId));
     }
   }
 
-  Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
+  StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
     CloseableIterable<FileScanTask> fileScanTasks =
         table
             .newScan()
@@ -188,43 +189,9 @@ Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId
 
     try {
       StructType partitionType = table.spec().partitionType();
-      StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
-      StructLike emptyStruct = GenericRecord.create(partitionType);
-
-      fileScanTasks.forEach(
-          task -> {
-            // If a task uses an incompatible partition spec the data inside could contain values
-            // which
-            // belong to multiple partitions in the current spec. Treating all such files as
-            // un-partitioned and
-            // grouping them together helps to minimize new files made.
-            StructLike taskPartition =
-                task.file().specId() == table.spec().specId()
-                    ? task.file().partition()
-                    : emptyStruct;
-
-            List<FileScanTask> files = filesByPartition.get(taskPartition);
-            if (files == null) {
-              files = Lists.newArrayList();
-            }
-
-            files.add(task);
-            filesByPartition.put(taskPartition, files);
-          });
-
-      StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
-          StructLikeMap.create(partitionType);
-
-      filesByPartition.forEach(
-          (partition, tasks) -> {
-            Iterable<List<FileScanTask>> plannedFileGroups = rewriter.planFileGroups(tasks);
-            List<List<FileScanTask>> fileGroups = ImmutableList.copyOf(plannedFileGroups);
-            if (fileGroups.size() > 0) {
-              fileGroupsByPartition.put(partition, fileGroups);
-            }
-          });
-
-      return fileGroupsByPartition;
+      StructLikeMap<List<FileScanTask>> filesByPartition =
+          groupByPartition(partitionType, fileScanTasks);
+      return fileGroupsByPartition(filesByPartition);
     } finally {
       try {
         fileScanTasks.close();
@@ -234,6 +201,38 @@ Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId
     }
   }
 
+  private StructLikeMap<List<FileScanTask>> groupByPartition(
+      StructType partitionType, Iterable<FileScanTask> tasks) {
+    StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
+    StructLike emptyStruct = GenericRecord.create(partitionType);
+
+    for (FileScanTask task : tasks) {
+      // If a task uses an incompatible partition spec the data inside could contain values
+      // which belong to multiple partitions in the current spec. Treating all such files as
+      // un-partitioned and grouping them together helps to minimize new files made.
+      StructLike taskPartition =
+          task.file().specId() == table.spec().specId() ? task.file().partition() : emptyStruct;
+
+      List<FileScanTask> files = filesByPartition.get(taskPartition);
+      if (files == null) {
+        files = Lists.newArrayList();
+      }
+
+      files.add(task);
+      filesByPartition.put(taskPartition, files);
+    }
+    return filesByPartition;
+  }
+
+  private StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition(
+      StructLikeMap<List<FileScanTask>> filesByPartition) {
+    return filesByPartition.transformValues(this::planFileGroups);
+  }
+
+  private List<List<FileScanTask>> planFileGroups(List<FileScanTask> tasks) {
+    return ImmutableList.copyOf(rewriter.planFileGroups(tasks));
+  }
+
   @VisibleForTesting
   RewriteFileGroup rewriteFiles(RewriteExecutionContext ctx, RewriteFileGroup fileGroup) {
     String desc = jobDesc(fileGroup, ctx);
@@ -299,7 +298,7 @@ private Result doExecute(
       Tasks.foreach(rewrittenGroups)
           .suppressFailureWhenFinished()
-          .run(group -> commitManager.abortFileGroup(group));
+          .run(commitManager::abortFileGroup);
 
       throw e;
     } finally {
       rewriteService.shutdown();
@@ -331,13 +330,13 @@ private Result doExecuteWithPartialProgress(
       RewriteDataFilesCommitManager commitManager) {
     ExecutorService rewriteService = rewriteService();
 
-    // Start Commit Service
+    // start commit service
    int groupsPerCommit = IntMath.divide(ctx.totalGroupCount(), maxCommits, RoundingMode.CEILING);
     RewriteDataFilesCommitManager.CommitService commitService =
         commitManager.service(groupsPerCommit);
     commitService.start();
 
-    // Start rewrite tasks
+    // start rewrite tasks
     Tasks.foreach(groupStream)
         .suppressFailureWhenFinished()
         .executeWith(rewriteService)
@@ -348,7 +347,7 @@ private Result doExecuteWithPartialProgress(
         .run(fileGroup -> commitService.offer(rewriteFiles(ctx, fileGroup)));
     rewriteService.shutdown();
 
-    // Stop Commit service
+    // stop commit service
     commitService.close();
     List<FileGroupRewriteResult> commitResults = commitService.results();
     if (commitResults.size() == 0) {
@@ -366,45 +365,29 @@
   }
 
   Stream<RewriteFileGroup> toGroupStream(
-      RewriteExecutionContext ctx,
-      Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
-    Stream<RewriteFileGroup> rewriteFileGroupStream =
-        fileGroupsByPartition.entrySet().stream()
-            .flatMap(
-                e -> {
-                  StructLike partition = e.getKey();
-                  List<List<FileScanTask>> fileGroups = e.getValue();
-                  return fileGroups.stream()
-                      .map(
-                          tasks -> {
-                            int globalIndex = ctx.currentGlobalIndex();
-                            int partitionIndex = ctx.currentPartitionIndex(partition);
-                            FileGroupInfo info =
-                                ImmutableRewriteDataFiles.FileGroupInfo.builder()
-                                    .globalIndex(globalIndex)
-                                    .partitionIndex(partitionIndex)
-                                    .partition(partition)
-                                    .build();
-                            return new RewriteFileGroup(info, tasks);
-                          });
-                });
-
-    return rewriteFileGroupStream.sorted(rewriteGroupComparator());
+      RewriteExecutionContext ctx, Map<StructLike, List<List<FileScanTask>>> groupsByPartition) {
+    return groupsByPartition.entrySet().stream()
+        .filter(e -> e.getValue().size() != 0)
+        .flatMap(
+            e -> {
+              StructLike partition = e.getKey();
+              List<List<FileScanTask>> scanGroups = e.getValue();
+              return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
+            })
+        .sorted(RewriteFileGroup.comparator(rewriteJobOrder));
   }
 
-  private Comparator<RewriteFileGroup> rewriteGroupComparator() {
-    switch (rewriteJobOrder) {
-      case BYTES_ASC:
-        return Comparator.comparing(RewriteFileGroup::sizeInBytes);
-      case BYTES_DESC:
-        return Comparator.comparing(RewriteFileGroup::sizeInBytes, Comparator.reverseOrder());
-      case FILES_ASC:
-        return Comparator.comparing(RewriteFileGroup::numFiles);
-      case FILES_DESC:
-        return Comparator.comparing(RewriteFileGroup::numFiles, Comparator.reverseOrder());
-      default:
-        return (fileGroupOne, fileGroupTwo) -> 0;
-    }
+  private RewriteFileGroup newRewriteGroup(
+      RewriteExecutionContext ctx, StructLike partition, List<List<FileScanTask>> tasks) {
+    int globalIndex = ctx.currentGlobalIndex();
+    int partitionIndex = ctx.currentPartitionIndex(partition);
+    FileGroupInfo info =
+        ImmutableRewriteDataFiles.FileGroupInfo.builder()
+            .globalIndex(globalIndex)
+            .partitionIndex(partitionIndex)
+            .partition(partition)
+            .build();
+    return new RewriteFileGroup(info, tasks);
   }
 
   void validateAndInitOptions() {
@@ -484,15 +467,13 @@ private String jobDesc(RewriteFileGroup group, RewriteExecutionContext ctx) {
 
   @VisibleForTesting
   static class RewriteExecutionContext {
-    private final Map<StructLike, Integer> numGroupsByPartition;
+    private final StructLikeMap<Integer> numGroupsByPartition;
     private final int totalGroupCount;
     private final Map<StructLike, Integer> partitionIndexMap;
     private final AtomicInteger groupIndex;
 
-    RewriteExecutionContext(Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
-      this.numGroupsByPartition =
-          fileGroupsByPartition.entrySet().stream()
-              .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
+    RewriteExecutionContext(StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition) {
+      this.numGroupsByPartition = fileGroupsByPartition.transformValues(List::size);
       this.totalGroupCount = numGroupsByPartition.values().stream().reduce(Integer::sum).orElse(0);
       this.partitionIndexMap = Maps.newConcurrentMap();
       this.groupIndex = new AtomicInteger(1);
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 761284bb56ea..4dba479edd69 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -100,6 +100,7 @@
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.StructLikeMap;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.junit.Assert;
@@ -1383,7 +1384,7 @@ public void testRewriteJobOrderFilesDesc() {
 
   private Stream<RewriteFileGroup> toGroupStream(Table table, RewriteDataFilesSparkAction rewrite) {
     rewrite.validateAndInitOptions();
-    Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition =
+    StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
         rewrite.planFileGroups(table.currentSnapshot().snapshotId());
 
     return rewrite.toGroupStream(
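
Note (illustration, not part of the patch): the refactor replaces plain Map<StructLike, ...> with org.apache.iceberg.util.StructLikeMap, which compares partition keys structurally, and leans on its transformValues(...) helper to derive a new map over the same keys — used above both to turn per-partition task lists into planned file groups and to count groups in RewriteExecutionContext. Below is a minimal, self-contained sketch of that pattern under an assumed single-field partition type; the field names and file names are hypothetical.

    import java.util.List;
    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
    import org.apache.iceberg.types.Types;
    import org.apache.iceberg.util.StructLikeMap;

    public class StructLikeMapSketch {
      public static void main(String[] args) {
        // Hypothetical single-field partition type: (category: string).
        Types.StructType partitionType =
            Types.StructType.of(
                Types.NestedField.optional(1, "category", Types.StringType.get()));

        // Keys are compared by field values, so any StructLike with equal values
        // addresses the same entry -- the property groupByPartition(...) relies on.
        StructLikeMap<List<String>> filesByPartition = StructLikeMap.create(partitionType);

        GenericRecord books = GenericRecord.create(partitionType);
        books.set(0, "books");
        filesByPartition.put(books, ImmutableList.of("file-a.parquet", "file-b.parquet"));

        // transformValues(...) rebuilds the map with the same keys but derived
        // values, as in fileGroupsByPartition(...) and RewriteExecutionContext.
        StructLikeMap<Integer> numFilesByPartition =
            filesByPartition.transformValues(List::size);

        System.out.println(numFilesByPartition.get(books)); // prints 2
      }
    }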