RewriteFileGroup.java
@@ -19,11 +19,13 @@
package org.apache.iceberg.actions;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.actions.RewriteDataFiles.FileGroupInfo;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -92,4 +94,19 @@ public long sizeInBytes() {
public int numFiles() {
return fileScanTasks.size();
}

public static Comparator<RewriteFileGroup> comparator(RewriteJobOrder rewriteJobOrder) {
switch (rewriteJobOrder) {
case BYTES_ASC:
return Comparator.comparing(RewriteFileGroup::sizeInBytes);
case BYTES_DESC:
return Comparator.comparing(RewriteFileGroup::sizeInBytes, Comparator.reverseOrder());
case FILES_ASC:
return Comparator.comparing(RewriteFileGroup::numFiles);
case FILES_DESC:
return Comparator.comparing(RewriteFileGroup::numFiles, Comparator.reverseOrder());
default:
return (fileGroupOne, fileGroupTwo) -> 0;
}
}
}
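
The new comparator centralizes the ordering logic that RewriteDataFilesSparkAction previously kept in a private rewriteGroupComparator() method (removed later in this diff). A minimal usage sketch, assuming plannedGroups already holds the planned rewrite groups:

    // Sketch (assumed input): order groups so the largest byte-size groups
    // are rewritten first, per RewriteJobOrder.BYTES_DESC.
    List<RewriteFileGroup> ordered =
        plannedGroups.stream()
            .sorted(RewriteFileGroup.comparator(RewriteJobOrder.BYTES_DESC))
            .collect(Collectors.toList());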
RewriteDataFilesSparkAction.java
@@ -22,7 +22,6 @@
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -84,6 +83,9 @@ public class RewriteDataFilesSparkAction
USE_STARTING_SEQUENCE_NUMBER,
REWRITE_JOB_ORDER);

private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();

private final Table table;

private Expression filter = Expressions.alwaysTrue();
@@ -147,7 +149,7 @@ public RewriteDataFilesSparkAction filter(Expression expression) {
@Override
public RewriteDataFiles.Result execute() {
if (table.currentSnapshot() == null) {
return ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
return EMPTY_RESULT;
}

long startingSnapshotId = table.currentSnapshot().snapshotId();
@@ -159,26 +161,25 @@ public RewriteDataFiles.Result execute() {

validateAndInitOptions();

Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition =
@aokolnychyi (Contributor), May 18, 2023:
Can we also add an EMPTY_RESULT static variable, like in the action for position deletes? There are two return statements in this method (not this line).

@ajantha-bhat (Member Author):
OK. Added.

StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
planFileGroups(startingSnapshotId);
RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);

if (ctx.totalGroupCount() == 0) {
LOG.info("Nothing found to rewrite in {}", table.name());
return ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
return EMPTY_RESULT;
}

Stream<RewriteFileGroup> groupStream = toGroupStream(ctx, fileGroupsByPartition);

RewriteDataFilesCommitManager commitManager = commitManager(startingSnapshotId);
if (partialProgressEnabled) {
return doExecuteWithPartialProgress(ctx, groupStream, commitManager);
return doExecuteWithPartialProgress(ctx, groupStream, commitManager(startingSnapshotId));
@szehon-ho (Member), May 18, 2023:
I'd personally put the variable back. In the RewriteDeleteFilesSparkAction case it was just a call without arguments, but here there are arguments, so it is probably clearer to assign it separately. Not a big deal, though.

@ajantha-bhat (Member Author), May 19, 2023:
I thought about it during the change too. Since it is in an if-else, I don't think it makes a huge difference. I will leave it as it is (the style looks similar to RewriteDeleteFilesSparkAction).

} else {
return doExecute(ctx, groupStream, commitManager);
return doExecute(ctx, groupStream, commitManager(startingSnapshotId));
}
}

Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
@szehon-ho (Member), May 19, 2023:
Can't comment on the exact line, but on lines 196-304 below, you had initially done some refactoring. I see it was reverted since @aokolnychyi said in this discussion that we can't coerce the partition without changing behavior: #7630 (comment)

That is correct, but we can still do some refactoring without coercion. I took a brief look and came up with this to maintain the logic.

First, add the methods:

private StructLikeMap<List<FileScanTask>> groupByPartition(
    StructType partitionType, Iterable<FileScanTask> tasks) {
  StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
  StructLike emptyStruct = GenericRecord.create(partitionType);

  for (FileScanTask task : tasks) {
    // If a task uses an incompatible partition spec the data inside could contain values
    // which belong to multiple partitions in the current spec. Treating all such files as
    // un-partitioned and grouping them together helps to minimize new files made.
    StructLike taskPartition =
        task.file().specId() == table.spec().specId()
            ? task.file().partition()
            : emptyStruct;

    List<FileScanTask> files = filesByPartition.get(taskPartition);
    if (files == null) {
      files = Lists.newArrayList();
    }

    files.add(task);
    filesByPartition.put(taskPartition, files);
  }

  return filesByPartition;
}

private StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition(
    StructLikeMap<List<FileScanTask>> filesByPartition) {
  return filesByPartition.transformValues(this::planFileGroups);
}

private List<List<FileScanTask>> planFileGroups(List<FileScanTask> tasks) {
  return ImmutableList.copyOf(rewriter.planFileGroups(tasks));
}

and then we can make this method like:

Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
  CloseableIterable<FileScanTask> fileScanTasks =
      table
          .newScan()
          .useSnapshot(startingSnapshotId)
          .filter(filter)
          .ignoreResiduals()
          .planFiles();

  try {
    StructType partitionType = table.spec().partitionType();
    StructLikeMap<List<FileScanTask>> filesByPartition =
        groupByPartition(partitionType, fileScanTasks);
    return fileGroupsByPartition(filesByPartition);
  } finally {
    try {
      fileScanTasks.close();
    } catch (IOException io) {
      LOG.error("Cannot properly close file iterable while planning for rewrite", io);
    }
  }
}

@ajantha-bhat (Member Author):
I think I tried it before. Have you checked this comment? #7630 (comment)

If I use transformValues, it cannot apply the existing fileGroups.size() > 0 check, due to generics. Are we OK with that? That's why I didn't do it.

@szehon-ho (Member):
> I think I tried it before. Have you checked this comment? #7630 (comment)

Yeah, I saw that; hence my suggestion does not use coercion but rather the table's latest partitionType. It should be exactly the same as the existing code now?

> If I use transformValues, it cannot apply the existing fileGroups.size() > 0 check, due to generics. Are we OK with that?

Yeah, in RewritePositionDeleteFilesSparkAction we've moved that check into toGroupStream, which I think you also copied here.

@ajantha-bhat (Member Author):
> we've moved that check into toGroupStream

I can see this now. SGTM. I have addressed the suggestion.
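
To summarize the resolution with a sketch (assumed context, not verbatim PR code): transformValues maps entries one-to-one, so it cannot drop partitions whose planning produced no groups; the former fileGroups.size() > 0 guard therefore moves downstream into toGroupStream as a stream filter.

    // `filesByPartition` and `rewriter` are assumed to exist, as in the action.
    StructLikeMap<List<List<FileScanTask>>> groupsByPartition =
        filesByPartition.transformValues(
            tasks -> ImmutableList.copyOf(rewriter.planFileGroups(tasks)));

    // transformValues keeps every key, so empty groups survive here and are
    // dropped later by toGroupStream's filter:
    long plannedGroupCount =
        groupsByPartition.entrySet().stream()
            .filter(e -> e.getValue().size() != 0) // the relocated size() > 0 check
            .flatMap(e -> e.getValue().stream())
            .count();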

CloseableIterable<FileScanTask> fileScanTasks =
table
.newScan()
@@ -189,43 +190,9 @@ Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId

try {
StructType partitionType = table.spec().partitionType();
StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
StructLike emptyStruct = GenericRecord.create(partitionType);

fileScanTasks.forEach(
task -> {
// If a task uses an incompatible partition spec the data inside could contain values
// which
// belong to multiple partitions in the current spec. Treating all such files as
// un-partitioned and
// grouping them together helps to minimize new files made.
StructLike taskPartition =
task.file().specId() == table.spec().specId()
? task.file().partition()
: emptyStruct;

List<FileScanTask> files = filesByPartition.get(taskPartition);
if (files == null) {
files = Lists.newArrayList();
}

files.add(task);
filesByPartition.put(taskPartition, files);
});

StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
StructLikeMap.create(partitionType);

filesByPartition.forEach(
(partition, tasks) -> {
Iterable<List<FileScanTask>> plannedFileGroups = rewriter.planFileGroups(tasks);
List<List<FileScanTask>> fileGroups = ImmutableList.copyOf(plannedFileGroups);
if (fileGroups.size() > 0) {
fileGroupsByPartition.put(partition, fileGroups);
}
});

return fileGroupsByPartition;
StructLikeMap<List<FileScanTask>> filesByPartition =
groupByPartition(partitionType, fileScanTasks);
return fileGroupsByPartition(filesByPartition);
} finally {
try {
fileScanTasks.close();
@@ -235,6 +202,38 @@ Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId
}
}

private StructLikeMap<List<FileScanTask>> groupByPartition(
StructType partitionType, Iterable<FileScanTask> tasks) {
StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);

@aokolnychyi (Contributor), May 18, 2023:
This should continue to use table.spec(), and we don't need coercePartition. The old behavior of grouping all files from an old spec should be preserved. The two actions behave differently w.r.t. output partitioning.

@ajantha-bhat (Member Author), May 18, 2023:
That's strange. So we don't have enough test cases to catch this behaviour?

I have reverted it back now. Also, I didn't use transform at the end of planFileGroups because it needs the special fileGroups.size() > 0 check, which is not handled in transform due to generics.

StructLike emptyStruct = GenericRecord.create(partitionType);

for (FileScanTask task : tasks) {
// If a task uses an incompatible partition spec the data inside could contain values
// which belong to multiple partitions in the current spec. Treating all such files as
// un-partitioned and grouping them together helps to minimize new files made.
StructLike taskPartition =
task.file().specId() == table.spec().specId() ? task.file().partition() : emptyStruct;

List<FileScanTask> files = filesByPartition.get(taskPartition);
if (files == null) {
files = Lists.newArrayList();
}

files.add(task);
filesByPartition.put(taskPartition, files);
}
return filesByPartition;
}

private StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition(
StructLikeMap<List<FileScanTask>> filesByPartition) {
return filesByPartition.transformValues(this::planFileGroups);
}

private List<List<FileScanTask>> planFileGroups(List<FileScanTask> tasks) {
return ImmutableList.copyOf(rewriter.planFileGroups(tasks));
}
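
Following up on the spec-coercion discussion above, a hypothetical illustration (assumed table and scan, not PR code) of the behavior groupByPartition preserves:

    // Hypothetical: the partition spec evolves after some files were written.
    table.updateSpec().addField(Expressions.bucket("id", 16)).commit();

    // Tasks over files written before the evolution keep the old specId, so
    // task.file().specId() != table.spec().specId() for them; groupByPartition
    // then maps them all to the shared emptyStruct key, and they are compacted
    // together as un-partitioned rather than coerced into the new spec.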

@VisibleForTesting
RewriteFileGroup rewriteFiles(RewriteExecutionContext ctx, RewriteFileGroup fileGroup) {
String desc = jobDesc(fileGroup, ctx);
@@ -300,7 +299,7 @@ private Result doExecute(

Tasks.foreach(rewrittenGroups)
.suppressFailureWhenFinished()
.run(group -> commitManager.abortFileGroup(group));
.run(commitManager::abortFileGroup);
throw e;
} finally {
rewriteService.shutdown();
@@ -332,14 +331,14 @@ private Result doExecuteWithPartialProgress(
RewriteDataFilesCommitManager commitManager) {
ExecutorService rewriteService = rewriteService();

// Start Commit Service
// start commit service
int groupsPerCommit = IntMath.divide(ctx.totalGroupCount(), maxCommits, RoundingMode.CEILING);
RewriteDataFilesCommitManager.CommitService commitService =
commitManager.service(groupsPerCommit);
commitService.start();

Collection<FileGroupFailureResult> rewriteFailures = new ConcurrentLinkedQueue<>();
// Start rewrite tasks
// start rewrite tasks
Tasks.foreach(groupStream)
.suppressFailureWhenFinished()
.executeWith(rewriteService)
Expand All @@ -356,7 +355,7 @@ private Result doExecuteWithPartialProgress(
.run(fileGroup -> commitService.offer(rewriteFiles(ctx, fileGroup)));
rewriteService.shutdown();

// Stop Commit service
// stop commit service
commitService.close();
List<RewriteFileGroup> commitResults = commitService.results();
if (commitResults.size() == 0) {
@@ -377,45 +376,29 @@ private Result doExecuteWithPartialProgress(
}

Stream<RewriteFileGroup> toGroupStream(
RewriteExecutionContext ctx,
Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
Stream<RewriteFileGroup> rewriteFileGroupStream =
fileGroupsByPartition.entrySet().stream()
.flatMap(
e -> {
StructLike partition = e.getKey();
List<List<FileScanTask>> fileGroups = e.getValue();
return fileGroups.stream()
.map(
tasks -> {
int globalIndex = ctx.currentGlobalIndex();
int partitionIndex = ctx.currentPartitionIndex(partition);
FileGroupInfo info =
ImmutableRewriteDataFiles.FileGroupInfo.builder()
.globalIndex(globalIndex)
.partitionIndex(partitionIndex)
.partition(partition)
.build();
return new RewriteFileGroup(info, tasks);
});
});

return rewriteFileGroupStream.sorted(rewriteGroupComparator());
RewriteExecutionContext ctx, Map<StructLike, List<List<FileScanTask>>> groupsByPartition) {
return groupsByPartition.entrySet().stream()
.filter(e -> e.getValue().size() != 0)
.flatMap(
e -> {
StructLike partition = e.getKey();
List<List<FileScanTask>> scanGroups = e.getValue();
return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
})
.sorted(RewriteFileGroup.comparator(rewriteJobOrder));
}

private Comparator<RewriteFileGroup> rewriteGroupComparator() {
switch (rewriteJobOrder) {
case BYTES_ASC:
return Comparator.comparing(RewriteFileGroup::sizeInBytes);
case BYTES_DESC:
return Comparator.comparing(RewriteFileGroup::sizeInBytes, Comparator.reverseOrder());
case FILES_ASC:
return Comparator.comparing(RewriteFileGroup::numFiles);
case FILES_DESC:
return Comparator.comparing(RewriteFileGroup::numFiles, Comparator.reverseOrder());
default:
return (fileGroupOne, fileGroupTwo) -> 0;
}
private RewriteFileGroup newRewriteGroup(
RewriteExecutionContext ctx, StructLike partition, List<FileScanTask> tasks) {
int globalIndex = ctx.currentGlobalIndex();
int partitionIndex = ctx.currentPartitionIndex(partition);
FileGroupInfo info =
ImmutableRewriteDataFiles.FileGroupInfo.builder()
.globalIndex(globalIndex)
.partitionIndex(partitionIndex)
.partition(partition)
.build();
return new RewriteFileGroup(info, tasks);
}

void validateAndInitOptions() {
Expand Down Expand Up @@ -495,15 +478,13 @@ private String jobDesc(RewriteFileGroup group, RewriteExecutionContext ctx) {

@VisibleForTesting
static class RewriteExecutionContext {
private final Map<StructLike, Integer> numGroupsByPartition;
private final StructLikeMap<Integer> numGroupsByPartition;
private final int totalGroupCount;
private final Map<StructLike, Integer> partitionIndexMap;
private final AtomicInteger groupIndex;

RewriteExecutionContext(Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
this.numGroupsByPartition =
fileGroupsByPartition.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size()));
RewriteExecutionContext(StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition) {
this.numGroupsByPartition = fileGroupsByPartition.transformValues(List::size);
this.totalGroupCount = numGroupsByPartition.values().stream().reduce(Integer::sum).orElse(0);
this.partitionIndexMap = Maps.newConcurrentMap();
this.groupIndex = new AtomicInteger(1);
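The batching arithmetic in doExecuteWithPartialProgress above is worth one worked example with hypothetical numbers (IntMath is Guava's, as already used by the action):

    // ctx.totalGroupCount() = 10, partial-progress.max-commits = 3:
    int groupsPerCommit = IntMath.divide(10, 3, RoundingMode.CEILING); // = 4
    // The commit service then commits after roughly every 4 completed groups,
    // yielding at most 3 commits overall.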
TestRewriteDataFilesAction.java
@@ -100,6 +100,7 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.internal.SQLConf;
@@ -1390,7 +1391,7 @@ public void testRewriteJobOrderFilesDesc() {

private Stream<RewriteFileGroup> toGroupStream(Table table, RewriteDataFilesSparkAction rewrite) {
rewrite.validateAndInitOptions();
Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition =
StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition =
rewrite.planFileGroups(table.currentSnapshot().snapshotId());

return rewrite.toGroupStream(
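
For context, a hedged sketch (not part of this PR's diff) of how a job order reaches the action through the public API; RewriteDataFiles.REWRITE_JOB_ORDER and RewriteJobOrder.orderName() are assumed from the interfaces referenced above:

    // Sort rewrite groups largest-first before execution.
    SparkActions.get()
        .rewriteDataFiles(table)
        .option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.BYTES_DESC.orderName())
        .execute();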