@@ -21,7 +21,6 @@
import java.io.IOException;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -83,6 +82,9 @@ public class RewriteDataFilesSparkAction
USE_STARTING_SEQUENCE_NUMBER,
REWRITE_JOB_ORDER);

private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();

private final Table table;

private Expression filter = Expressions.alwaysTrue();
@@ -146,7 +148,7 @@ public RewriteDataFilesSparkAction filter(Expression expression) {
@Override
public RewriteDataFiles.Result execute() {
if (table.currentSnapshot() == null) {
return ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
return EMPTY_RESULT;
}

long startingSnapshotId = table.currentSnapshot().snapshotId();
@@ -158,26 +160,25 @@ public RewriteDataFiles.Result execute() {

validateAndInitOptions();

Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition =
StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
planFileGroups(startingSnapshotId);
RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);

if (ctx.totalGroupCount() == 0) {
LOG.info("Nothing found to rewrite in {}", table.name());
return ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
return EMPTY_RESULT;
}

Stream<RewriteFileGroup> groupStream = toGroupStream(ctx, fileGroupsByPartition);

RewriteDataFilesCommitManager commitManager = commitManager(startingSnapshotId);
if (partialProgressEnabled) {
return doExecuteWithPartialProgress(ctx, groupStream, commitManager);
return doExecuteWithPartialProgress(ctx, groupStream, commitManager(startingSnapshotId));
} else {
return doExecute(ctx, groupStream, commitManager);
return doExecute(ctx, groupStream, commitManager(startingSnapshotId));
}
}

Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
CloseableIterable<FileScanTask> fileScanTasks =
table
.newScan()
@@ -188,43 +189,9 @@ Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId

try {
StructType partitionType = table.spec().partitionType();
StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
StructLike emptyStruct = GenericRecord.create(partitionType);

fileScanTasks.forEach(
task -> {
// If a task uses an incompatible partition spec the data inside could contain values
// which
// belong to multiple partitions in the current spec. Treating all such files as
// un-partitioned and
// grouping them together helps to minimize new files made.
StructLike taskPartition =
task.file().specId() == table.spec().specId()
? task.file().partition()
: emptyStruct;

List<FileScanTask> files = filesByPartition.get(taskPartition);
if (files == null) {
files = Lists.newArrayList();
}

files.add(task);
filesByPartition.put(taskPartition, files);
});

StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
StructLikeMap.create(partitionType);

filesByPartition.forEach(
(partition, tasks) -> {
Iterable<List<FileScanTask>> plannedFileGroups = rewriter.planFileGroups(tasks);
List<List<FileScanTask>> fileGroups = ImmutableList.copyOf(plannedFileGroups);
if (fileGroups.size() > 0) {
fileGroupsByPartition.put(partition, fileGroups);
}
});

return fileGroupsByPartition;
StructLikeMap<List<FileScanTask>> filesByPartition =
groupByPartition(partitionType, fileScanTasks);
return fileGroupsByPartition(filesByPartition);
} finally {
try {
fileScanTasks.close();
@@ -234,6 +201,38 @@ Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId
}
}

private StructLikeMap<List<FileScanTask>> groupByPartition(
StructType partitionType, Iterable<FileScanTask> tasks) {
StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
StructLike emptyStruct = GenericRecord.create(partitionType);

for (FileScanTask task : tasks) {
// If a task uses an incompatible partition spec the data inside could contain values
// which belong to multiple partitions in the current spec. Treating all such files as
// un-partitioned and grouping them together helps to minimize new files made.
StructLike taskPartition =
task.file().specId() == table.spec().specId() ? task.file().partition() : emptyStruct;

List<FileScanTask> files = filesByPartition.get(taskPartition);
if (files == null) {
files = Lists.newArrayList();
}

files.add(task);
filesByPartition.put(taskPartition, files);
}
return filesByPartition;
}
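
A side note on the accumulation loop above: `StructLikeMap` implements `java.util.Map`, so the get-then-put sequence could be collapsed with `computeIfAbsent`. A sketch of the equivalent loop body, assuming `StructLikeMap` inherits the default `Map.computeIfAbsent`:

```java
for (FileScanTask task : tasks) {
  StructLike taskPartition =
      task.file().specId() == table.spec().specId() ? task.file().partition() : emptyStruct;

  // creates the list on the first access for a partition, reuses it afterwards
  filesByPartition.computeIfAbsent(taskPartition, unused -> Lists.newArrayList()).add(task);
}
```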

private StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition(
StructLikeMap<List<FileScanTask>> filesByPartition) {
return filesByPartition.transformValues(this::planFileGroups);
}

private List<List<FileScanTask>> planFileGroups(List<FileScanTask> tasks) {
return ImmutableList.copyOf(rewriter.planFileGroups(tasks));
}
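
These helpers lean on `StructLikeMap.transformValues`, which derives a new map with the same struct-aware keys and mapped values; `RewriteExecutionContext` below uses it the same way to count groups per partition. A minimal usage sketch with a hypothetical one-field partition type (import paths assume Iceberg's relocated Guava):

```java
import java.util.List;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeMap;

Types.StructType keyType =
    Types.StructType.of(Types.NestedField.required(1, "day", Types.IntegerType.get()));

StructLikeMap<List<String>> filesByDay = StructLikeMap.create(keyType);
GenericRecord day = GenericRecord.create(keyType);
day.set(0, 20230101);
filesByDay.put(day, ImmutableList.of("a.parquet", "b.parquet"));

// One call replaces the old hand-written second loop: same keys, transformed values.
StructLikeMap<Integer> countsByDay = filesByDay.transformValues(List::size); // day -> 2
```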

@VisibleForTesting
RewriteFileGroup rewriteFiles(RewriteExecutionContext ctx, RewriteFileGroup fileGroup) {
String desc = jobDesc(fileGroup, ctx);
@@ -299,7 +298,7 @@ private Result doExecute(

Tasks.foreach(rewrittenGroups)
.suppressFailureWhenFinished()
.run(group -> commitManager.abortFileGroup(group));
.run(commitManager::abortFileGroup);
throw e;
} finally {
rewriteService.shutdown();
@@ -331,13 +330,13 @@ private Result doExecuteWithPartialProgress(
RewriteDataFilesCommitManager commitManager) {
ExecutorService rewriteService = rewriteService();

// Start Commit Service
// start commit service
int groupsPerCommit = IntMath.divide(ctx.totalGroupCount(), maxCommits, RoundingMode.CEILING);
RewriteDataFilesCommitManager.CommitService commitService =
commitManager.service(groupsPerCommit);
commitService.start();

// Start rewrite tasks
// start rewrite tasks
Tasks.foreach(groupStream)
.suppressFailureWhenFinished()
.executeWith(rewriteService)
@@ -348,7 +347,7 @@ private Result doExecuteWithPartialProgress(
.run(fileGroup -> commitService.offer(rewriteFiles(ctx, fileGroup)));
rewriteService.shutdown();

// Stop Commit service
// stop commit service
commitService.close();
List<RewriteFileGroup> commitResults = commitService.results();
if (commitResults.size() == 0) {
@@ -366,45 +365,29 @@
}
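
Stepping back to the partial-progress flow: `groupsPerCommit` is computed with ceiling division so the number of commits never exceeds `maxCommits`. A quick worked example with Guava's `IntMath` (the numbers are made up; the action itself uses Iceberg's relocated Guava copy):

```java
import com.google.common.math.IntMath;
import java.math.RoundingMode;

int totalGroupCount = 10;
int maxCommits = 3;

// ceil(10 / 3) = 4 file groups per commit: the commit service fires after
// groups 4 and 8, and the final close() flushes the remaining 2 (3 commits total).
int groupsPerCommit = IntMath.divide(totalGroupCount, maxCommits, RoundingMode.CEILING);
```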

Stream<RewriteFileGroup> toGroupStream(
RewriteExecutionContext ctx,
Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
Stream<RewriteFileGroup> rewriteFileGroupStream =
fileGroupsByPartition.entrySet().stream()
.flatMap(
e -> {
StructLike partition = e.getKey();
List<List<FileScanTask>> fileGroups = e.getValue();
return fileGroups.stream()
.map(
tasks -> {
int globalIndex = ctx.currentGlobalIndex();
int partitionIndex = ctx.currentPartitionIndex(partition);
FileGroupInfo info =
ImmutableRewriteDataFiles.FileGroupInfo.builder()
.globalIndex(globalIndex)
.partitionIndex(partitionIndex)
.partition(partition)
.build();
return new RewriteFileGroup(info, tasks);
});
});

return rewriteFileGroupStream.sorted(rewriteGroupComparator());
RewriteExecutionContext ctx, Map<StructLike, List<List<FileScanTask>>> groupsByPartition) {
> Review comment (Member): (Not related to this PR.) This is my fault, but I think this should be `StructLikeMap` all across to be safer; I guess we can do it later.

return groupsByPartition.entrySet().stream()
.filter(e -> e.getValue().size() != 0)
.flatMap(
e -> {
StructLike partition = e.getKey();
List<List<FileScanTask>> scanGroups = e.getValue();
return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
})
.sorted(RewriteFileGroup.comparator(rewriteJobOrder));
}
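
On the reviewer's note: the reason `StructLikeMap` is safer than a plain `Map<StructLike, ...>` is that the `StructLike` interface does not require value-based `equals`/`hashCode`, so a `HashMap` keyed by arbitrary partition tuples may fail to match structurally equal keys. A small sketch of the contrast (field name and values are hypothetical):

```java
import java.util.Map;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeMap;

Types.StructType partitionType =
    Types.StructType.of(Types.NestedField.required(1, "day", Types.IntegerType.get()));

// Two distinct StructLike instances carrying the same partition value.
GenericRecord a = GenericRecord.create(partitionType);
a.set(0, 20230101);
GenericRecord b = GenericRecord.create(partitionType);
b.set(0, 20230101);

Map<StructLike, String> plain = Maps.newHashMap();
plain.put(a, "group-1");
plain.get(b); // hit or miss: depends on the concrete StructLike's equals/hashCode

StructLikeMap<String> safe = StructLikeMap.create(partitionType);
safe.put(a, "group-1");
safe.get(b); // "group-1": keys are compared field by field against the struct type
```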

private Comparator<RewriteFileGroup> rewriteGroupComparator() {
switch (rewriteJobOrder) {
case BYTES_ASC:
return Comparator.comparing(RewriteFileGroup::sizeInBytes);
case BYTES_DESC:
return Comparator.comparing(RewriteFileGroup::sizeInBytes, Comparator.reverseOrder());
case FILES_ASC:
return Comparator.comparing(RewriteFileGroup::numFiles);
case FILES_DESC:
return Comparator.comparing(RewriteFileGroup::numFiles, Comparator.reverseOrder());
default:
return (fileGroupOne, fileGroupTwo) -> 0;
}
private RewriteFileGroup newRewriteGroup(
RewriteExecutionContext ctx, StructLike partition, List<FileScanTask> tasks) {
int globalIndex = ctx.currentGlobalIndex();
int partitionIndex = ctx.currentPartitionIndex(partition);
FileGroupInfo info =
ImmutableRewriteDataFiles.FileGroupInfo.builder()
.globalIndex(globalIndex)
.partitionIndex(partitionIndex)
.partition(partition)
.build();
return new RewriteFileGroup(info, tasks);
}
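
The `rewriteGroupComparator()` switch deleted above now lives behind `RewriteFileGroup.comparator(rewriteJobOrder)`. Judging only from the removed code, the relocated logic is presumably equivalent to this sketch (not the actual Iceberg implementation):

```java
import java.util.Comparator;

public static Comparator<RewriteFileGroup> comparator(RewriteJobOrder rewriteJobOrder) {
  switch (rewriteJobOrder) {
    case BYTES_ASC:
      return Comparator.comparing(RewriteFileGroup::sizeInBytes);
    case BYTES_DESC:
      return Comparator.comparing(RewriteFileGroup::sizeInBytes, Comparator.reverseOrder());
    case FILES_ASC:
      return Comparator.comparing(RewriteFileGroup::numFiles);
    case FILES_DESC:
      return Comparator.comparing(RewriteFileGroup::numFiles, Comparator.reverseOrder());
    default:
      return (first, second) -> 0; // no ordering requested, keep planning order
  }
}
```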

void validateAndInitOptions() {
@@ -484,15 +467,13 @@ private String jobDesc(RewriteFileGroup group, RewriteExecutionContext ctx) {

@VisibleForTesting
static class RewriteExecutionContext {
private final Map<StructLike, Integer> numGroupsByPartition;
private final StructLikeMap<Integer> numGroupsByPartition;
private final int totalGroupCount;
private final Map<StructLike, Integer> partitionIndexMap;
private final AtomicInteger groupIndex;

RewriteExecutionContext(Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
this.numGroupsByPartition =
fileGroupsByPartition.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
RewriteExecutionContext(StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition) {
this.numGroupsByPartition = fileGroupsByPartition.transformValues(List::size);
this.totalGroupCount = numGroupsByPartition.values().stream().reduce(Integer::sum).orElse(0);
this.partitionIndexMap = Maps.newConcurrentMap();
this.groupIndex = new AtomicInteger(1);
(Second file: the action's test class)
@@ -100,6 +100,7 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.Assert;
@@ -1383,7 +1384,7 @@ public void testRewriteJobOrderFilesDesc() {

private Stream<RewriteFileGroup> toGroupStream(Table table, RewriteDataFilesSparkAction rewrite) {
rewrite.validateAndInitOptions();
Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition =
StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
rewrite.planFileGroups(table.currentSnapshot().snapshotId());

return rewrite.toGroupStream(