diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
index bb62f7971139..f816b5d7a4f6 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
@@ -19,11 +19,13 @@
 package org.apache.iceberg.actions;
 
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.RewriteJobOrder;
 import org.apache.iceberg.actions.RewriteDataFiles.FileGroupInfo;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -92,4 +94,19 @@ public long sizeInBytes() {
   public int numFiles() {
     return fileScanTasks.size();
   }
+
+  public static Comparator<RewriteFileGroup> comparator(RewriteJobOrder rewriteJobOrder) {
+    switch (rewriteJobOrder) {
+      case BYTES_ASC:
+        return Comparator.comparing(RewriteFileGroup::sizeInBytes);
+      case BYTES_DESC:
+        return Comparator.comparing(RewriteFileGroup::sizeInBytes, Comparator.reverseOrder());
+      case FILES_ASC:
+        return Comparator.comparing(RewriteFileGroup::numFiles);
+      case FILES_DESC:
+        return Comparator.comparing(RewriteFileGroup::numFiles, Comparator.reverseOrder());
+      default:
+        return (fileGroupOne, fileGroupTwo) -> 0;
+    }
+  }
 }
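The comparator extracted above replaces a private rewriteGroupComparator() in RewriteDataFilesSparkAction (second file below), making the job-order sorting reusable outside the Spark action. A minimal caller-side sketch, assuming a List<RewriteFileGroup> already produced by planning; the class and method names here are illustrative, not part of the patch:

```java
import java.util.List;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.actions.RewriteFileGroup;

class RewriteOrderingSketch {
  // Sorts planned groups largest-first so the most expensive rewrites start early;
  // RewriteJobOrder.NONE would hit the comparator's no-op default branch and keep
  // the planner's original order.
  static void orderBytesDesc(List<RewriteFileGroup> groups) {
    groups.sort(RewriteFileGroup.comparator(RewriteJobOrder.BYTES_DESC));
  }
}
```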
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index 658d3a927984..6b5628a1f4b5 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -22,7 +22,6 @@
 import java.math.RoundingMode;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -84,6 +83,9 @@ public class RewriteDataFilesSparkAction
           USE_STARTING_SEQUENCE_NUMBER,
           REWRITE_JOB_ORDER);
 
+  private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
+      ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
+
   private final Table table;
 
   private Expression filter = Expressions.alwaysTrue();
@@ -147,7 +149,7 @@ public RewriteDataFilesSparkAction filter(Expression expression) {
   @Override
   public RewriteDataFiles.Result execute() {
     if (table.currentSnapshot() == null) {
-      return ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
+      return EMPTY_RESULT;
     }
 
     long startingSnapshotId = table.currentSnapshot().snapshotId();
@@ -159,26 +161,25 @@ public RewriteDataFiles.Result execute() {
 
     validateAndInitOptions();
 
-    Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition =
+    StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
         planFileGroups(startingSnapshotId);
     RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);
 
     if (ctx.totalGroupCount() == 0) {
       LOG.info("Nothing found to rewrite in {}", table.name());
-      return ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
+      return EMPTY_RESULT;
     }
 
     Stream<RewriteFileGroup> groupStream = toGroupStream(ctx, fileGroupsByPartition);
 
-    RewriteDataFilesCommitManager commitManager = commitManager(startingSnapshotId);
     if (partialProgressEnabled) {
-      return doExecuteWithPartialProgress(ctx, groupStream, commitManager);
+      return doExecuteWithPartialProgress(ctx, groupStream, commitManager(startingSnapshotId));
     } else {
-      return doExecute(ctx, groupStream, commitManager);
+      return doExecute(ctx, groupStream, commitManager(startingSnapshotId));
     }
   }
 
-  Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
+  StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
     CloseableIterable<FileScanTask> fileScanTasks =
         table
             .newScan()
@@ -189,43 +190,9 @@ Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId
 
     try {
       StructType partitionType = table.spec().partitionType();
-      StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
-      StructLike emptyStruct = GenericRecord.create(partitionType);
-
-      fileScanTasks.forEach(
-          task -> {
-            // If a task uses an incompatible partition spec the data inside could contain values
-            // which
-            // belong to multiple partitions in the current spec. Treating all such files as
-            // un-partitioned and
-            // grouping them together helps to minimize new files made.
-            StructLike taskPartition =
-                task.file().specId() == table.spec().specId()
-                    ? task.file().partition()
-                    : emptyStruct;
-
-            List<FileScanTask> files = filesByPartition.get(taskPartition);
-            if (files == null) {
-              files = Lists.newArrayList();
-            }
-
-            files.add(task);
-            filesByPartition.put(taskPartition, files);
-          });
-
-      StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
-          StructLikeMap.create(partitionType);
-
-      filesByPartition.forEach(
-          (partition, tasks) -> {
-            Iterable<List<FileScanTask>> plannedFileGroups = rewriter.planFileGroups(tasks);
-            List<List<FileScanTask>> fileGroups = ImmutableList.copyOf(plannedFileGroups);
-            if (fileGroups.size() > 0) {
-              fileGroupsByPartition.put(partition, fileGroups);
-            }
-          });
-
-      return fileGroupsByPartition;
+      StructLikeMap<List<FileScanTask>> filesByPartition =
+          groupByPartition(partitionType, fileScanTasks);
+      return fileGroupsByPartition(filesByPartition);
     } finally {
       try {
         fileScanTasks.close();
@@ -235,6 +202,38 @@
     }
   }
 
+  private StructLikeMap<List<FileScanTask>> groupByPartition(
+      StructType partitionType, Iterable<FileScanTask> tasks) {
+    StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
+    StructLike emptyStruct = GenericRecord.create(partitionType);
+
+    for (FileScanTask task : tasks) {
+      // If a task uses an incompatible partition spec the data inside could contain values
+      // which belong to multiple partitions in the current spec. Treating all such files as
+      // un-partitioned and grouping them together helps to minimize new files made.
+      StructLike taskPartition =
+          task.file().specId() == table.spec().specId() ? task.file().partition() : emptyStruct;
+
+      List<FileScanTask> files = filesByPartition.get(taskPartition);
+      if (files == null) {
+        files = Lists.newArrayList();
+      }
+
+      files.add(task);
+      filesByPartition.put(taskPartition, files);
+    }
+    return filesByPartition;
+  }
+
+  private StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition(
+      StructLikeMap<List<FileScanTask>> filesByPartition) {
+    return filesByPartition.transformValues(this::planFileGroups);
+  }
+
+  private List<List<FileScanTask>> planFileGroups(List<FileScanTask> tasks) {
+    return ImmutableList.copyOf(rewriter.planFileGroups(tasks));
+  }
+
   @VisibleForTesting
   RewriteFileGroup rewriteFiles(RewriteExecutionContext ctx, RewriteFileGroup fileGroup) {
     String desc = jobDesc(fileGroup, ctx);
@@ -300,7 +299,7 @@ private Result doExecute(
 
       Tasks.foreach(rewrittenGroups)
           .suppressFailureWhenFinished()
-          .run(group -> commitManager.abortFileGroup(group));
+          .run(commitManager::abortFileGroup);
       throw e;
     } finally {
       rewriteService.shutdown();
@@ -332,14 +331,14 @@ private Result doExecuteWithPartialProgress(
       RewriteDataFilesCommitManager commitManager) {
     ExecutorService rewriteService = rewriteService();
 
-    // Start Commit Service
+    // start commit service
     int groupsPerCommit = IntMath.divide(ctx.totalGroupCount(), maxCommits, RoundingMode.CEILING);
     RewriteDataFilesCommitManager.CommitService commitService =
         commitManager.service(groupsPerCommit);
     commitService.start();
 
     Collection<String> rewriteFailures = new ConcurrentLinkedQueue<>();
-    // Start rewrite tasks
+    // start rewrite tasks
     Tasks.foreach(groupStream)
         .suppressFailureWhenFinished()
         .executeWith(rewriteService)
@@ -356,7 +355,7 @@ private Result doExecuteWithPartialProgress(
         .run(fileGroup -> commitService.offer(rewriteFiles(ctx, fileGroup)));
     rewriteService.shutdown();
 
-    // Stop Commit service
+    // stop commit service
     commitService.close();
     List<FileGroupInfo> commitResults = commitService.results();
     if (commitResults.size() == 0) {
@@ -377,45 +376,29 @@ private Result doExecuteWithPartialProgress(
   }
 
   Stream<RewriteFileGroup> toGroupStream(
-      RewriteExecutionContext ctx,
-      Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
-    Stream<RewriteFileGroup> rewriteFileGroupStream =
-        fileGroupsByPartition.entrySet().stream()
-            .flatMap(
-                e -> {
-                  StructLike partition = e.getKey();
-                  List<List<FileScanTask>> fileGroups = e.getValue();
-                  return fileGroups.stream()
-                      .map(
-                          tasks -> {
-                            int globalIndex = ctx.currentGlobalIndex();
-                            int partitionIndex = ctx.currentPartitionIndex(partition);
-                            FileGroupInfo info =
-                                ImmutableRewriteDataFiles.FileGroupInfo.builder()
-                                    .globalIndex(globalIndex)
-                                    .partitionIndex(partitionIndex)
-                                    .partition(partition)
-                                    .build();
-                            return new RewriteFileGroup(info, tasks);
-                          });
-                });
-
-    return rewriteFileGroupStream.sorted(rewriteGroupComparator());
+      RewriteExecutionContext ctx, Map<StructLike, List<List<FileScanTask>>> groupsByPartition) {
+    return groupsByPartition.entrySet().stream()
+        .filter(e -> e.getValue().size() != 0)
+        .flatMap(
+            e -> {
+              StructLike partition = e.getKey();
+              List<List<FileScanTask>> scanGroups = e.getValue();
+              return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
+            })
+        .sorted(RewriteFileGroup.comparator(rewriteJobOrder));
   }
 
-  private Comparator<RewriteFileGroup> rewriteGroupComparator() {
-    switch (rewriteJobOrder) {
-      case BYTES_ASC:
-        return Comparator.comparing(RewriteFileGroup::sizeInBytes);
-      case BYTES_DESC:
-        return Comparator.comparing(RewriteFileGroup::sizeInBytes, Comparator.reverseOrder());
-      case FILES_ASC:
-        return Comparator.comparing(RewriteFileGroup::numFiles);
-      case FILES_DESC:
-        return Comparator.comparing(RewriteFileGroup::numFiles, Comparator.reverseOrder());
-      default:
-        return (fileGroupOne, fileGroupTwo) -> 0;
-    }
+  private RewriteFileGroup newRewriteGroup(
+      RewriteExecutionContext ctx, StructLike partition, List<FileScanTask> tasks) {
+    int globalIndex = ctx.currentGlobalIndex();
+    int partitionIndex = ctx.currentPartitionIndex(partition);
+    FileGroupInfo info =
+        ImmutableRewriteDataFiles.FileGroupInfo.builder()
+            .globalIndex(globalIndex)
+            .partitionIndex(partitionIndex)
+            .partition(partition)
+            .build();
+    return new RewriteFileGroup(info, tasks);
   }
 
   void validateAndInitOptions() {
@@ -495,15 +478,13 @@ private String jobDesc(RewriteFileGroup group, RewriteExecutionContext ctx) {
 
   @VisibleForTesting
   static class RewriteExecutionContext {
-    private final Map<StructLike, Integer> numGroupsByPartition;
+    private final StructLikeMap<Integer> numGroupsByPartition;
     private final int totalGroupCount;
     private final Map<StructLike, Integer> partitionIndexMap;
     private final AtomicInteger groupIndex;
 
-    RewriteExecutionContext(Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
-      this.numGroupsByPartition =
-          fileGroupsByPartition.entrySet().stream()
-              .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
+    RewriteExecutionContext(StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition) {
+      this.numGroupsByPartition = fileGroupsByPartition.transformValues(List::size);
       this.totalGroupCount = numGroupsByPartition.values().stream().reduce(Integer::sum).orElse(0);
       this.partitionIndexMap = Maps.newConcurrentMap();
       this.groupIndex = new AtomicInteger(1);
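Most of the simplification in RewriteDataFilesSparkAction comes from keeping the per-partition maps as StructLikeMap end to end, so value transformations become a single transformValues call instead of an entrySet().stream().collect(...) rebuild that loses the struct-aware keying. A rough sketch of that pattern, relying only on the StructLikeMap#transformValues API the patch itself uses; the wrapper class and method names are illustrative:

```java
import java.util.List;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.util.StructLikeMap;

class TransformValuesSketch {
  // Derives a per-partition count from a per-partition task map in one call,
  // mirroring RewriteExecutionContext's transformValues(List::size) above, while
  // preserving the StructLike key comparison a plain HashMap rebuild would lose.
  static StructLikeMap<Integer> countTasks(StructLikeMap<List<FileScanTask>> tasksByPartition) {
    return tasksByPartition.transformValues(List::size);
  }
}
```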
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 76b8f58d9a20..bf4bef74c3fe 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -100,6 +100,7 @@
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.StructLikeMap;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.internal.SQLConf;
@@ -1390,7 +1391,7 @@ public void testRewriteJobOrderFilesDesc() {
   private Stream<RewriteFileGroup> toGroupStream(Table table, RewriteDataFilesSparkAction rewrite) {
     rewrite.validateAndInitOptions();
 
-    Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition =
+    StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
         rewrite.planFileGroups(table.currentSnapshot().snapshotId());
 
     return rewrite.toGroupStream(