diff --git a/api/src/main/java/org/apache/iceberg/SortOrder.java b/api/src/main/java/org/apache/iceberg/SortOrder.java index b443eb2dca55..2774aaf85698 100644 --- a/api/src/main/java/org/apache/iceberg/SortOrder.java +++ b/api/src/main/java/org/apache/iceberg/SortOrder.java @@ -286,7 +286,7 @@ public SortOrder build() { } } - static void checkCompatibility(SortOrder sortOrder, Schema schema) { + public static void checkCompatibility(SortOrder sortOrder, Schema schema) { for (SortField field : sortOrder.fields) { Type sourceType = schema.findType(field.sourceId()); ValidationException.check( diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java index e95936522a62..70ec194bbaf4 100644 --- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java @@ -20,6 +20,7 @@ package org.apache.iceberg.actions; import java.util.Map; +import org.apache.iceberg.SortOrder; import org.apache.iceberg.StructLike; import org.apache.iceberg.expressions.Expression; @@ -68,7 +69,7 @@ public interface RewriteDataFiles extends SnapshotUpdate resultMap(); + + default int addedDataFilesCount() { + return resultMap().values().stream().mapToInt(FileGroupRewriteResult::addedDataFilesCount).sum(); + } + + default int rewrittenDataFilesCount() { + return resultMap().values().stream().mapToInt(FileGroupRewriteResult::rewrittenDataFilesCount).sum(); + } } /** diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java b/api/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java index 13f1386c0d1e..4503382b5337 100644 --- a/api/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java +++ b/api/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java @@ -27,7 +27,7 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Table; -interface RewriteStrategy extends Serializable { +public interface RewriteStrategy extends Serializable { /** * Returns the name of this rewrite strategy */ @@ -71,8 +71,9 @@ interface RewriteStrategy extends Serializable { * Method which will rewrite files based on this particular RewriteStrategy's algorithm. * This will most likely be Action framework specific (Spark/Presto/Flink ....). * + * @param groupID an identifier for this set of files * @param filesToRewrite a group of files to be rewritten together * @return a list of newly written files */ - List rewriteFiles(List filesToRewrite); + Set rewriteFiles(String groupID, List filesToRewrite); } diff --git a/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java b/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java index b42001cd76d5..b2d675fa0757 100644 --- a/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java +++ b/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java @@ -42,7 +42,7 @@ * more files than {@link MIN_INPUT_FILES} or would produce at least one file of * {@link RewriteDataFiles#TARGET_FILE_SIZE_BYTES}. 
*/ -abstract class BinPackStrategy implements RewriteStrategy { +public abstract class BinPackStrategy implements RewriteStrategy { /** * The minimum number of files that need to be in a file group for it to be considered for @@ -78,6 +78,7 @@ abstract class BinPackStrategy implements RewriteStrategy { private long maxFileSize; private long targetFileSize; private long maxGroupSize; + private long specId; @Override public String name() { @@ -118,6 +119,10 @@ public RewriteStrategy options(Map options) { MIN_INPUT_FILES, MIN_INPUT_FILES_DEFAULT); + specId = PropertyUtil.propertyAsInt(options, + RewriteDataFiles.OUTPUT_PARTITION_SPEC_ID, + table().spec().specId()); + validateOptions(); return this; } @@ -162,4 +167,12 @@ private void validateOptions() { "Cannot set %s is less than 1. All values less than 1 have the same effect as 1. %d < 1", MIN_INPUT_FILES, minInputFiles); } + + protected long targetFileSize() { + return this.targetFileSize; + } + + protected long specId() { + return this.specId; + } } diff --git a/core/src/main/java/org/apache/iceberg/actions/SortStrategy.java b/core/src/main/java/org/apache/iceberg/actions/SortStrategy.java new file mode 100644 index 000000000000..bb1473966cfb --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/actions/SortStrategy.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.actions; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.util.PropertyUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A rewrite strategy for data files which aims to reorder data with data files to optimally lay them out + * in relation to a column. For example, if the Sort strategy is used on a set of files which is ordered + * by column x and original has files File A (x: 0 - 50), File B ( x: 10 - 40) and File C ( x: 30 - 60), + * this Strategy will attempt to rewrite those files into File A' (x: 0-20), File B' (x: 21 - 40), + * File C' (x: 41 - 60). + *
+ * <p>
+ * Currently there is no clustering detection and all files will be rewritten if {@link SortStrategy#REWRITE_ALL} + * is enabled (it defaults to false). If this property is disabled, any files with an incorrect sort order, as well + * as any files that would be selected by {@link BinPackStrategy}, will be rewrite candidates. + * <p>
+ * In the future other algorithms for determining files to rewrite will be provided. + */ +public abstract class SortStrategy extends BinPackStrategy { + private static final Logger LOG = LoggerFactory.getLogger(SortStrategy.class); + + /** + * Rewrites all files, regardless of their size. Defaults to false, rewriting only wrong sort-order and mis-sized + * files; + */ + public static final String REWRITE_ALL = "no-size-filter"; + public static final boolean REWRITE_ALL_DEFAULT = false; + + + private static final Set validOptions = ImmutableSet.of( + REWRITE_ALL + ); + + private boolean rewriteAll; + private SortOrder sortOrder; + private int sortOrderId = -1; + + /** + * Sets the sort order to be used in this strategy when rewriting files + * @param order the order to use + * @return this for method chaining + */ + public SortStrategy sortOrder(SortOrder order) { + this.sortOrder = order; + + // See if this order matches any of our known orders + Optional> knownOrder = table().sortOrders().entrySet().stream() + .filter(entry -> entry.getValue().sameOrder(order)) + .findFirst(); + knownOrder.ifPresent(entry -> sortOrderId = entry.getKey()); + + return this; + } + + protected SortOrder sortOrder() { + return sortOrder; + } + + @Override + public String name() { + return "SORT"; + } + + @Override + public Set validOptions() { + return ImmutableSet.builder() + .addAll(super.validOptions()) + .addAll(validOptions) + .build(); + } + + @Override + public RewriteStrategy options(Map options) { + super.options(options); // Also checks validity of BinPack options + + rewriteAll = PropertyUtil.propertyAsBoolean(options, + REWRITE_ALL, + REWRITE_ALL_DEFAULT); + + if (sortOrder == null) { + sortOrder = table().sortOrder(); + sortOrderId = sortOrder.orderId(); + } + + validateOptions(); + return this; + } + + @Override + public Iterable selectFilesToRewrite(Iterable dataFiles) { + if (rewriteAll) { + LOG.info("Sort Strategy for table {} set to rewrite all data files", table().name()); + return dataFiles; + } else { + FluentIterable filesWithCorrectOrder = + FluentIterable.from(dataFiles).filter(file -> file.file().sortOrderId() == sortOrderId); + + FluentIterable filesWithIncorrectOrder = + FluentIterable.from(dataFiles).filter(file -> file.file().sortOrderId() != sortOrderId); + return filesWithIncorrectOrder.append(super.selectFilesToRewrite(filesWithCorrectOrder)); + } + } + + private void validateOptions() { + Preconditions.checkArgument(!sortOrder.isUnsorted(), + "Can't use %s when there is no sort order, either define table %s's sort order or set sort" + + "order in the action", + name(), table().name()); + + SortOrder.checkCompatibility(sortOrder, table().schema()); + } +} diff --git a/core/src/test/java/org/apache/iceberg/MockFileScanTask.java b/core/src/test/java/org/apache/iceberg/MockFileScanTask.java index f70b23e42b3f..d98594af9958 100644 --- a/core/src/test/java/org/apache/iceberg/MockFileScanTask.java +++ b/core/src/test/java/org/apache/iceberg/MockFileScanTask.java @@ -19,6 +19,8 @@ package org.apache.iceberg; +import org.mockito.Mockito; + public class MockFileScanTask extends BaseFileScanTask { private final long length; @@ -28,6 +30,18 @@ public MockFileScanTask(long length) { this.length = length; } + public MockFileScanTask(DataFile file) { + super(file, null, null, null, null); + this.length = file.fileSizeInBytes(); + } + + public static MockFileScanTask mockTask(long length, int sortOrderId) { + DataFile mockFile = Mockito.mock(DataFile.class); + 
Mockito.when(mockFile.fileSizeInBytes()).thenReturn(length); + Mockito.when(mockFile.sortOrderId()).thenReturn(sortOrderId); + return new MockFileScanTask(mockFile); + } + @Override public long length() { return length; diff --git a/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java b/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java index b42496fc78b7..77b13b3a574b 100644 --- a/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java +++ b/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.DataFile; @@ -59,7 +60,7 @@ public Table table() { } @Override - public List rewriteFiles(List filesToRewrite) { + public Set rewriteFiles(String groupID, List filesToRewrite) { throw new UnsupportedOperationException(); } } diff --git a/core/src/test/java/org/apache/iceberg/actions/TestSortStrategy.java b/core/src/test/java/org/apache/iceberg/actions/TestSortStrategy.java new file mode 100644 index 000000000000..a02a6d36bfdd --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/actions/TestSortStrategy.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.actions; + + +import java.util.Collections; +import java.util.List; +import java.util.stream.IntStream; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MockFileScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableTestBase; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestSortStrategy extends TableTestBase { + + @Parameterized.Parameters(name = "formatVersion = {0}") + public static Object[] parameters() { + return new Object[] {2}; // We don't actually use the format version since everything is mock + } + + @Override + public void setupTable() throws Exception { + super.setupTable(); + table.replaceSortOrder().asc("data").commit(); + } + + private static final long MB = 1024 * 1024; + + public TestSortStrategy(int formatVersion) { + super(formatVersion); + } + + class TestSortStrategyImpl extends SortStrategy { + + @Override + public Table table() { + return table; + } + + @Override + public List rewriteFiles(List filesToRewrite) { + throw new UnsupportedOperationException(); + } + } + + private SortStrategy defaultSort() { + return (SortStrategy) new TestSortStrategyImpl().options(Collections.emptyMap()); + } + + private List tasksForSortOrder(int sortOrderId, int... 
fileSizesMB) { + ImmutableList.Builder files = ImmutableList.builder(); + IntStream.of(fileSizesMB).forEach(length -> files.add(MockFileScanTask.mockTask(length * MB, sortOrderId))); + return files.build(); + } + + @Test + public void testInvalidSortOrder() { + AssertHelpers.assertThrows("Should not allow an unsorted Sort order", IllegalArgumentException.class, + () -> defaultSort().sortOrder(SortOrder.unsorted()).options(Collections.emptyMap())); + + AssertHelpers.assertThrows("Should not allow a Sort order with bad columns", ValidationException.class, + () -> { + Schema badSchema = new Schema( + ImmutableList.of(Types.NestedField.required(0, "nonexistant", Types.IntegerType.get()))); + + defaultSort() + .sortOrder(SortOrder.builderFor(badSchema).asc("nonexistant").build()) + .options(Collections.emptyMap()); + }); + } + + @Test + public void testSelectWrongSortOrder() { + List expected = tasksForSortOrder(-1, 500, 500, 500, 500); + RewriteStrategy strategy = defaultSort().options(Collections.emptyMap()); + List actual = ImmutableList.copyOf(strategy.selectFilesToRewrite(expected)); + + Assert.assertEquals("Should mark all files for rewrite if they have the wrong sort order", + expected, actual); + } + + @Test + public void testSelectCorrectSortOrder() { + List fileScanTasks = tasksForSortOrder(table.sortOrder().orderId(), 500, 500, 500, 500); + RewriteStrategy strategy = defaultSort().options(Collections.emptyMap()); + List actual = ImmutableList.copyOf(strategy.selectFilesToRewrite(fileScanTasks)); + + Assert.assertEquals("Should mark no files for rewrite if they have a good size and the right sort order", + Collections.emptyList(), actual); + } + + @Test + public void testSelectMixedOrderMixedSize() { + List expected = ImmutableList.builder() + .addAll(tasksForSortOrder(-1, 500, 500, 500, 500)) + .addAll(tasksForSortOrder(table.sortOrder().orderId(), 10, 10, 2000, 10)) + .build(); + + List fileScanTasks = ImmutableList.builder() + .addAll(expected) + .addAll(tasksForSortOrder(table.sortOrder().orderId(), 500, 490, 520)) + .build(); + + RewriteStrategy strategy = defaultSort().options(Collections.emptyMap()); + List actual = ImmutableList.copyOf(strategy.selectFilesToRewrite(fileScanTasks)); + + Assert.assertEquals("Should mark files for rewrite with invalid size and bad sort order", + expected, actual); + } + + @Test + public void testSelectAll() { + List invalid = ImmutableList.builder() + .addAll(tasksForSortOrder(-1, 500, 500, 500, 500)) + .addAll(tasksForSortOrder(table.sortOrder().orderId(), 10, 10, 2000, 10)) + .build(); + + List expected = ImmutableList.builder() + .addAll(invalid) + .addAll(tasksForSortOrder(table.sortOrder().orderId(), 500, 490, 520)) + .build(); + + RewriteStrategy strategy = defaultSort().options(ImmutableMap.of(SortStrategy.REWRITE_ALL, "true")); + List actual = ImmutableList.copyOf(strategy.selectFilesToRewrite(expected)); + + Assert.assertEquals("Should mark all files for rewrite", + expected, actual); + } + + @Test + public void testUseSizeOptions() { + List expected = ImmutableList.builder() + .addAll(tasksForSortOrder(table.sortOrder().orderId(), 498, 551)) + .build(); + + List fileScanTasks = ImmutableList.builder() + .addAll(expected) + .addAll(tasksForSortOrder(table.sortOrder().orderId(), 500, 500)) + .build(); + + RewriteStrategy strategy = defaultSort().options(ImmutableMap.of( + SortStrategy.MAX_FILE_SIZE_BYTES, Long.toString(550 * MB), + SortStrategy.MIN_FILE_SIZE_BYTES, Long.toString(499 * MB))); + + List actual = 
ImmutableList.copyOf(strategy.selectFilesToRewrite(fileScanTasks)); + + Assert.assertEquals("Should mark files for rewrite with adjusted min and max size", + expected, actual); + } + + +} + diff --git a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java new file mode 100644 index 000000000000..f00e595679cc --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java @@ -0,0 +1,531 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.actions; + +import java.io.IOException; +import java.math.RoundingMode; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.BinPackStrategy; +import org.apache.iceberg.actions.RewriteDataFiles; +import org.apache.iceberg.actions.RewriteStrategy; +import org.apache.iceberg.actions.SortStrategy; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.relocated.com.google.common.math.IntMath; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.Tasks; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +abstract class BaseRewriteDataFilesSparkAction + extends 
BaseSnapshotUpdateSparkAction implements RewriteDataFiles { + + private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDataFilesSparkAction.class); + private static final Set VALID_OPTIONS = ImmutableSet.of( + MAX_CONCURRENT_FILE_GROUP_REWRITES, + MAX_FILE_GROUP_SIZE_BYTES, + PARTIAL_PROGRESS_ENABLED, + PARTIAL_PROGRESS_MAX_COMMITS, + TARGET_FILE_SIZE_BYTES + ); + + private final Table table; + + private Expression filter = Expressions.alwaysTrue(); + private int maxConcurrentFileGroupRewrites; + private int maxCommits; + private boolean partialProgressEnabled; + private RewriteStrategy strategy; + + protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table) { + super(spark); + this.table = table; + this.strategy = binPackStrategy(); + } + + protected Table table() { + return table; + } + + /** + * Perform a commit operation on the table adding and removing files as + * required for this set of file groups + * @param completedGroupIDs fileSets to commit + */ + protected abstract void commitFileGroups(Set completedGroupIDs); + + /** + * Clean up a specified file set by removing any files created for that operation, should + * not throw any exceptions + * @param groupID fileSet to clean + */ + protected abstract void abortFileGroup(String groupID); + + /** + * The framework specific {@link BinPackStrategy} + */ + protected abstract BinPackStrategy binPackStrategy(); + + /** + * The framework specific {@link SortStrategy} + */ + protected abstract SortStrategy sortStrategy(); + + @Override + public RewriteDataFiles sort(SortOrder sortOrder) { + this.strategy = sortStrategy().sortOrder(sortOrder); + return this; + } + + @Override + public RewriteDataFiles sort() { + this.strategy = sortStrategy(); + return this; + } + + @Override + public RewriteDataFiles binPack() { + this.strategy = binPackStrategy(); + return this; + } + + @Override + public RewriteDataFiles filter(Expression expression) { + filter = Expressions.and(filter, expression); + return this; + } + + @Override + public Result execute() { + validateOptions(); + strategy = strategy.options(options()); + + Map>> fileGroupsByPartition = planFileGroups(); + RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition); + Stream groupStream = toGroupStream(ctx, fileGroupsByPartition); + + if (ctx.totalGroupCount() == 0) { + LOG.info("Nothing found to rewrite in {}", table.name()); + return new Result(Collections.emptyMap()); + } + + if (partialProgressEnabled) { + return doExecuteWithPartialProgress(ctx, groupStream); + } else { + return doExecute(ctx, groupStream); + } + } + + private Map>> planFileGroups() { + CloseableIterable fileScanTasks = table.newScan() + .filter(filter) + .ignoreResiduals() + .planFiles(); + + try { + Map> filesByPartition = Streams.stream(fileScanTasks) + .collect(Collectors.groupingBy(task -> task.file().partition())); + + Map>> fileGroupsByPartition = Maps.newHashMap(); + + filesByPartition.forEach((partition, tasks) -> { + List> fileGroups = toFileGroups(tasks); + if (fileGroups.size() > 0) { + fileGroupsByPartition.put(partition, fileGroups); + } + }); + + return fileGroupsByPartition; + } finally { + try { + fileScanTasks.close(); + } catch (IOException io) { + LOG.error("Cannot properly close file iterable while planning for rewrite", io); + } + } + } + + private List> toFileGroups(List tasks) { + Iterable filtered = strategy.selectFilesToRewrite(tasks); + Iterable> groupedTasks = strategy.planFileGroups(filtered); + return 
ImmutableList.copyOf(groupedTasks); + } + + @VisibleForTesting + void rewriteFiles(RewriteExecutionContext ctx, FileGroupForRewrite fileGroupForRewrite, + ConcurrentLinkedQueue completedRewrite, + ConcurrentHashMap> results) { + + String groupID = fileGroupForRewrite.groupID(); + int filesInGroup = fileGroupForRewrite.numFiles(); + + String desc = jobDesc(fileGroupForRewrite, ctx); + + Set addedFiles = + withJobGroupInfo(newJobGroupInfo("REWRITE-DATA-FILES", desc), + () -> strategy.rewriteFiles(groupID, fileGroupForRewrite.files())); + + completedRewrite.offer(groupID); + FileGroupRewriteResult fileGroupResult = new FileGroupRewriteResult(addedFiles.size(), filesInGroup); + + results.put(groupID, Pair.of(fileGroupForRewrite.info(), fileGroupResult)); + } + + private void commitOrClean(Set completedGroupIDs) { + try { + commitFileGroups(completedGroupIDs); + } catch (Exception e) { + LOG.error("Cannot commit groups {}, attempting to clean up written files", completedGroupIDs, e); + Tasks.foreach(completedGroupIDs) + .suppressFailureWhenFinished() + .run(this::abortFileGroup); + throw e; + } + } + + private Result doExecute(RewriteExecutionContext ctx, Stream groupStream) { + + ExecutorService rewriteService = Executors.newFixedThreadPool(maxConcurrentFileGroupRewrites, + new ThreadFactoryBuilder().setNameFormat("Rewrite-Service-%d").build()); + + ConcurrentLinkedQueue completedRewrite = new ConcurrentLinkedQueue<>(); + ConcurrentHashMap> results = new ConcurrentHashMap<>(); + + Tasks.Builder rewriteTaskBuilder = Tasks.foreach(groupStream.iterator()) + .executeWith(rewriteService) + .stopOnFailure() + .noRetry() + .onFailure((fileGroupForRewrite, exception) -> { + LOG.error("Failure during rewrite process for group {}", fileGroupForRewrite.info, exception); + }); + + try { + rewriteTaskBuilder + .run(fileGroupForRewrite -> rewriteFiles(ctx, fileGroupForRewrite, completedRewrite, results)); + } catch (Exception e) { + // At least one rewrite group failed, clean up all completed rewrites + LOG.error("Cannot complete rewrite, partial progress is not enabled and one of the file set groups failed to " + + "be rewritten. 
Cleaning up {} groups which finished being written.", completedRewrite.size(), e); + Tasks.foreach(completedRewrite) + .suppressFailureWhenFinished() + .run(this::abortFileGroup); + throw e; + } + + commitOrClean(ImmutableSet.copyOf(completedRewrite)); + + return new Result(results.values().stream().collect(Collectors.toMap(Pair::first, Pair::second))); + } + + private Result doExecuteWithPartialProgress(RewriteExecutionContext ctx, Stream groupStream) { + + ExecutorService rewriteService = Executors.newFixedThreadPool(maxConcurrentFileGroupRewrites, + new ThreadFactoryBuilder().setNameFormat("Rewrite-Service-%d").build()); + + ExecutorService committerService = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat("Committer-Service").build()); + + int groupsPerCommit = IntMath.divide(ctx.totalGroupCount(), maxCommits, RoundingMode.CEILING); + + AtomicBoolean stillRewriting = new AtomicBoolean(true); + ConcurrentHashMap> results = new ConcurrentHashMap<>(); + ConcurrentLinkedQueue completedRewriteIds = new ConcurrentLinkedQueue<>(); + + // Partial progress commit service + committerService.execute(() -> { + while (stillRewriting.get() || completedRewriteIds.size() > 0) { + Thread.yield(); + // Either we have a full commit group, or we have completed writing and need to commit what is left over + if (completedRewriteIds.size() >= groupsPerCommit || + (!stillRewriting.get() && completedRewriteIds.size() > 0)) { + + Set batch = Sets.newHashSetWithExpectedSize(groupsPerCommit); + for (int i = 0; i < groupsPerCommit && !completedRewriteIds.isEmpty(); i++) { + batch.add(completedRewriteIds.poll()); + } + + try { + commitOrClean(batch); + } catch (Exception e) { + batch.forEach(results::remove); + LOG.error("Failure during rewrite commit process, partial progress enabled. 
Ignoring", e); + } + } + } + }); + + // Start rewrite tasks + Tasks.foreach(groupStream.iterator()) + .suppressFailureWhenFinished() + .executeWith(rewriteService) + .noRetry() + .onFailure((fileGroupForRewrite, exception) -> { + LOG.error("Failure during rewrite process for group {}", fileGroupForRewrite.info, exception); + abortFileGroup(fileGroupForRewrite.groupID()); }) + .run(infoListPair -> rewriteFiles(ctx, infoListPair, completedRewriteIds, results)); + + stillRewriting.set(false); + committerService.shutdown(); + + try { + committerService.awaitTermination(10, TimeUnit.MINUTES); + } catch (InterruptedException e) { + throw new RuntimeException("Cannot complete commit for rewrite, commit service interrupted", e); + } + + return new Result(results.values().stream().collect(Collectors.toMap(Pair::first, Pair::second))); + } + + private Stream toGroupStream(RewriteExecutionContext ctx, + Map>> fileGroupsByPartition) { + + // Todo Add intelligence to the order in which we do rewrites instead of just using partition order + return fileGroupsByPartition.entrySet().stream() + .flatMap( + e -> e.getValue().stream().map(tasks -> { + int myJobIndex = ctx.currentGlobalIndex(); + int myPartIndex = ctx.currentPartitionIndex(e.getKey()); + String groupID = UUID.randomUUID().toString(); + return new FileGroupForRewrite(new FileGroupInfo(groupID, myJobIndex, myPartIndex, e.getKey()), tasks); + })); + } + + private void validateOptions() { + Set validOptions = Sets.newHashSet(strategy.validOptions()); + validOptions.addAll(VALID_OPTIONS); + + Set invalidKeys = Sets.newHashSet(options().keySet()); + invalidKeys.removeAll(validOptions); + + Preconditions.checkArgument(invalidKeys.isEmpty(), + "Cannot use options %s, they are not supported by RewriteDatafiles or the strategy %s", + invalidKeys, strategy.name()); + + maxConcurrentFileGroupRewrites = PropertyUtil.propertyAsInt(options(), + MAX_CONCURRENT_FILE_GROUP_REWRITES, + MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT); + + maxCommits = PropertyUtil.propertyAsInt(options(), + PARTIAL_PROGRESS_MAX_COMMITS, + PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT); + + partialProgressEnabled = PropertyUtil.propertyAsBoolean(options(), + PARTIAL_PROGRESS_ENABLED, + PARTIAL_PROGRESS_ENABLED_DEFAULT); + + Preconditions.checkArgument(maxConcurrentFileGroupRewrites >= 1, + "Cannot set %s to %s, the value must be positive.", + MAX_CONCURRENT_FILE_GROUP_REWRITES, maxConcurrentFileGroupRewrites); + + Preconditions.checkArgument(!partialProgressEnabled || partialProgressEnabled && maxCommits > 0, + "Cannot set %s to %s, the value must be positive when %s is true", + PARTIAL_PROGRESS_MAX_COMMITS, maxCommits, PARTIAL_PROGRESS_ENABLED); + } + + private String jobDesc(FileGroupForRewrite group, RewriteExecutionContext ctx) { + StructLike partition = group.partition(); + + return String.format("Rewrite %s, FileGroup %d/%d : Partition %s:%d/%d : Rewriting %d files : Table %s", + strategy.name(), group.globalIndex(), ctx.totalGroupCount(), partition, group.partitionIndex(), + ctx.groupsInPartition(partition), group.numFiles(), table.name()); + } + + class Result implements RewriteDataFiles.Result { + private final Map resultMap; + + Result( + Map resultMap) { + this.resultMap = resultMap; + } + + @Override + public Map resultMap() { + return resultMap; + } + } + + class FileGroupInfo implements RewriteDataFiles.FileGroupInfo { + + private final String groupID; + private final int globalIndex; + private final int partitionIndex; + private final StructLike partition; + + 
FileGroupInfo(String groupID, int globalIndex, int partitionIndex, StructLike partition) { + this.groupID = groupID; + this.globalIndex = globalIndex; + this.partitionIndex = partitionIndex; + this.partition = partition; + } + + @Override + public int globalIndex() { + return globalIndex; + } + + @Override + public int partitionIndex() { + return partitionIndex; + } + + @Override + public StructLike partition() { + return partition; + } + + @Override + public String toString() { + return "FileGroupInfo{" + + "groupID=" + groupID + + ", globalIndex=" + globalIndex + + ", partitionIndex=" + partitionIndex + + ", partition=" + partition + + '}'; + } + + public String groupID() { + return groupID; + } + } + + @VisibleForTesting + class FileGroupRewriteResult implements RewriteDataFiles.FileGroupRewriteResult { + private final int addedDataFilesCount; + private final int rewrittenDataFilesCount; + + FileGroupRewriteResult(int addedDataFilesCount, int rewrittenDataFilesCount) { + this.addedDataFilesCount = addedDataFilesCount; + this.rewrittenDataFilesCount = rewrittenDataFilesCount; + } + + @Override + public int addedDataFilesCount() { + return this.addedDataFilesCount; + } + + @Override + public int rewrittenDataFilesCount() { + return this.rewrittenDataFilesCount; + } + } + + static class FileGroupForRewrite { + private final FileGroupInfo info; + private final List files; + private final int numFiles; + + FileGroupForRewrite(FileGroupInfo info, List files) { + this.info = info; + this.files = files; + this.numFiles = files.size(); + } + + public int numFiles() { + return numFiles; + } + + public StructLike partition() { + return info.partition(); + } + + public String groupID() { + return info.groupID(); + } + + public Integer globalIndex() { + return info.globalIndex(); + } + + public Integer partitionIndex() { + return info.partitionIndex(); + } + + public FileGroupInfo info() { + return info; + } + + public List files() { + return files; + } + } + + @VisibleForTesting + static class RewriteExecutionContext { + private final Map numGroupsByPartition; + private final int totalGroupCount; + private final Map partitionIndexMap; + private final AtomicInteger groupIndex; + + RewriteExecutionContext(Map>> fileGroupsByPartition) { + this.numGroupsByPartition = fileGroupsByPartition.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().size())); + this.totalGroupCount = numGroupsByPartition.values().stream() + .reduce(Integer::sum) + .orElse(0); + this.partitionIndexMap = Maps.newConcurrentMap(); + this.groupIndex = new AtomicInteger(1); + } + + public int currentGlobalIndex() { + return groupIndex.getAndIncrement(); + } + + public int currentPartitionIndex(StructLike partition) { + return partitionIndexMap.merge(partition, 1, Integer::sum); + } + + public int groupsInPartition(StructLike partition) { + return numGroupsByPartition.get(partition); + } + + public int totalGroupCount() { + return totalGroupCount; + } + } +} diff --git a/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java b/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java index a0685a68434e..95a9e98b0625 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java +++ b/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java @@ -97,6 +97,10 @@ protected List sql(String query, Object... 
args) { return ImmutableList.of(); } + return rowsToJava(rows); + } + + protected List rowsToJava(List rows) { return rows.stream().map(this::toJava).collect(Collectors.toList()); } diff --git a/spark/src/test/java/org/apache/iceberg/spark/actions/TestNewRewriteDataFilesAction.java b/spark/src/test/java/org/apache/iceberg/spark/actions/TestNewRewriteDataFilesAction.java new file mode 100644 index 000000000000..9ac946318697 --- /dev/null +++ b/spark/src/test/java/org/apache/iceberg/spark/actions/TestNewRewriteDataFilesAction.java @@ -0,0 +1,769 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.actions; + +import java.io.File; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.IntStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.actions.ActionsProvider; +import org.apache.iceberg.actions.BinPackStrategy; +import org.apache.iceberg.actions.RewriteDataFiles; +import org.apache.iceberg.actions.RewriteDataFiles.Result; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.spark.SparkTestBase; +import org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction.FileGroupForRewrite; +import org.apache.iceberg.spark.source.ThreeColumnRecord; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.AnalysisException; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static 
org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doThrow; + +public abstract class TestNewRewriteDataFilesAction extends SparkTestBase { + + protected abstract ActionsProvider actions(); + protected abstract Set cacheContents(Table table); + + private static final HadoopTables TABLES = new HadoopTables(new Configuration()); + private static final Schema SCHEMA = new Schema( + optional(1, "c1", Types.IntegerType.get()), + optional(2, "c2", Types.StringType.get()), + optional(3, "c3", Types.StringType.get()) + ); + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private String tableLocation = null; + + @Before + public void setupTableLocation() throws Exception { + File tableDir = temp.newFolder(); + this.tableLocation = tableDir.toURI().toString(); + } + + private RewriteDataFiles basicRewrite(Table table) { + // Always compact regardless of input files + return actions().rewriteDataFiles(table).option(BinPackStrategy.MIN_INPUT_FILES, "1"); + } + + @Test + public void testRewriteDataFilesEmptyTable() { + PartitionSpec spec = PartitionSpec.unpartitioned(); + Map options = Maps.newHashMap(); + Table table = TABLES.create(SCHEMA, spec, options, tableLocation); + + Assert.assertNull("Table must be empty", table.currentSnapshot()); + + basicRewrite(table); + + Assert.assertNull("Table must stay empty", table.currentSnapshot()); + } + + @Test + public void testRewriteDataFilesUnpartitionedTable() { + PartitionSpec spec = PartitionSpec.unpartitioned(); + Map options = Maps.newHashMap(); + Table table = TABLES.create(SCHEMA, spec, options, tableLocation); + + List records1 = Lists.newArrayList( + new ThreeColumnRecord(1, null, "AAAA"), + new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB") + ); + writeRecords(records1); + + List records2 = Lists.newArrayList( + new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"), + new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD") + ); + writeRecords(records2); + + table.refresh(); + + CloseableIterable tasks = table.newScan().planFiles(); + List dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file)); + Assert.assertEquals("Should have 4 data files before rewrite", 4, dataFiles.size()); + + Result result = basicRewrite(table).execute(); + Assert.assertEquals("Action should rewrite 4 data files", 4, result.rewrittenDataFilesCount()); + Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFilesCount()); + + table.refresh(); + + CloseableIterable tasks1 = table.newScan().planFiles(); + List dataFiles1 = Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file)); + Assert.assertEquals("Should have 1 data files before rewrite", 1, dataFiles1.size()); + + List expectedRecords = Lists.newArrayList(); + expectedRecords.addAll(records1); + expectedRecords.addAll(records2); + + Dataset resultDF = spark.read().format("iceberg").load(tableLocation); + List actualRecords = resultDF.sort("c1", "c2") + .as(Encoders.bean(ThreeColumnRecord.class)) + .collectAsList(); + + Assert.assertEquals("Rows must match", expectedRecords, actualRecords); + } + + @Test + public void testRewriteDataFilesPartitionedTable() { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA) + .identity("c1") + .truncate("c2", 2) + .build(); + Map options = Maps.newHashMap(); + Table table = TABLES.create(SCHEMA, spec, options, tableLocation); + + List records1 = Lists.newArrayList( + new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"), + new ThreeColumnRecord(1, "AAAAAAAAAA", "CCCC") + ); + writeRecords(records1); + + 
List records2 = Lists.newArrayList( + new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"), + new ThreeColumnRecord(1, "BBBBBBBBBB", "DDDD") + ); + writeRecords(records2); + + List records3 = Lists.newArrayList( + new ThreeColumnRecord(2, "AAAAAAAAAA", "EEEE"), + new ThreeColumnRecord(2, "AAAAAAAAAA", "GGGG") + ); + writeRecords(records3); + + List records4 = Lists.newArrayList( + new ThreeColumnRecord(2, "BBBBBBBBBB", "FFFF"), + new ThreeColumnRecord(2, "BBBBBBBBBB", "HHHH") + ); + writeRecords(records4); + + table.refresh(); + + CloseableIterable tasks = table.newScan().planFiles(); + List dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file)); + Assert.assertEquals("Should have 8 data files before rewrite", 8, dataFiles.size()); + + Result result = basicRewrite(table).execute(); + Assert.assertEquals("Action should rewrite 8 data files", 8, result.rewrittenDataFilesCount()); + Assert.assertEquals("Action should add 4 data file", 4, result.addedDataFilesCount()); + + table.refresh(); + + CloseableIterable tasks1 = table.newScan().planFiles(); + List dataFiles1 = Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file)); + Assert.assertEquals("Should have 4 data files before rewrite", 4, dataFiles1.size()); + + List expectedRecords = Lists.newArrayList(); + expectedRecords.addAll(records1); + expectedRecords.addAll(records2); + expectedRecords.addAll(records3); + expectedRecords.addAll(records4); + + Dataset resultDF = spark.read().format("iceberg").load(tableLocation); + List actualRecords = resultDF.sort("c1", "c2", "c3") + .as(Encoders.bean(ThreeColumnRecord.class)) + .collectAsList(); + + Assert.assertEquals("Rows must match", expectedRecords, actualRecords); + } + + @Test + public void testRewriteDataFilesWithFilter() { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA) + .identity("c1") + .truncate("c2", 2) + .build(); + Map options = Maps.newHashMap(); + Table table = TABLES.create(SCHEMA, spec, options, tableLocation); + + List records1 = Lists.newArrayList( + new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"), + new ThreeColumnRecord(1, "AAAAAAAAAA", "CCCC") + ); + writeRecords(records1); + + List records2 = Lists.newArrayList( + new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"), + new ThreeColumnRecord(1, "BBBBBBBBBB", "DDDD") + ); + writeRecords(records2); + + List records3 = Lists.newArrayList( + new ThreeColumnRecord(2, "AAAAAAAAAA", "EEEE"), + new ThreeColumnRecord(2, "AAAAAAAAAA", "GGGG") + ); + writeRecords(records3); + + List records4 = Lists.newArrayList( + new ThreeColumnRecord(2, "BBBBBBBBBB", "FFFF"), + new ThreeColumnRecord(2, "BBBBBBBBBB", "HHHH") + ); + writeRecords(records4); + + table.refresh(); + + CloseableIterable tasks = table.newScan().planFiles(); + List dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file)); + Assert.assertEquals("Should have 8 data files before rewrite", 8, dataFiles.size()); + + Result result = basicRewrite(table) + .filter(Expressions.equal("c1", 1)) + .filter(Expressions.startsWith("c2", "AA")) + .execute(); + Assert.assertEquals("Action should rewrite 2 data files", 2, result.rewrittenDataFilesCount()); + Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFilesCount()); + + table.refresh(); + + CloseableIterable tasks1 = table.newScan().planFiles(); + List dataFiles1 = Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file)); + Assert.assertEquals("Should have 7 data files before rewrite", 7, dataFiles1.size()); + + List 
expectedRecords = Lists.newArrayList(); + expectedRecords.addAll(records1); + expectedRecords.addAll(records2); + expectedRecords.addAll(records3); + expectedRecords.addAll(records4); + + Dataset resultDF = spark.read().format("iceberg").load(tableLocation); + List actualRecords = resultDF.sort("c1", "c2", "c3") + .as(Encoders.bean(ThreeColumnRecord.class)) + .collectAsList(); + + Assert.assertEquals("Rows must match", expectedRecords, actualRecords); + } + + @Test + public void testRewriteLargeTableHasResiduals() { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).build(); + Map options = Maps.newHashMap(); + options.put(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "100"); + Table table = TABLES.create(SCHEMA, spec, options, tableLocation); + + // all records belong to the same partition + List records = Lists.newArrayList(); + for (int i = 0; i < 100; i++) { + records.add(new ThreeColumnRecord(i, String.valueOf(i), String.valueOf(i % 4))); + } + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class); + writeDF(df); + + table.refresh(); + + CloseableIterable tasks = table.newScan() + .ignoreResiduals() + .filter(Expressions.equal("c3", "0")) + .planFiles(); + for (FileScanTask task : tasks) { + Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), task.residual()); + } + List dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file)); + Assert.assertEquals("Should have 2 data files before rewrite", 2, dataFiles.size()); + + Result result = basicRewrite(table) + .filter(Expressions.equal("c3", "0")) + .execute(); + Assert.assertEquals("Action should rewrite 2 data files", 2, result.rewrittenDataFilesCount()); + Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFilesCount()); + + table.refresh(); + + Dataset resultDF = spark.read().format("iceberg").load(tableLocation); + List actualRecords = resultDF.sort("c1") + .as(Encoders.bean(ThreeColumnRecord.class)) + .collectAsList(); + + Assert.assertEquals("Rows must match", records, actualRecords); + } + + @Test + public void testRewriteDataFilesForLargeFile() throws AnalysisException { + PartitionSpec spec = PartitionSpec.unpartitioned(); + Map options = Maps.newHashMap(); + Table table = TABLES.create(SCHEMA, spec, options, tableLocation); + Assert.assertNull("Table must be empty", table.currentSnapshot()); + + List records1 = Lists.newArrayList(); + + IntStream.range(0, 2000).forEach(i -> records1.add(new ThreeColumnRecord(i, "foo" + i, "bar" + i))); + Dataset df = spark.createDataFrame(records1, ThreeColumnRecord.class).repartition(1); + writeDF(df); + + List records2 = Lists.newArrayList( + new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"), + new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD") + ); + writeRecords(records2); + + table.refresh(); + + CloseableIterable tasks = table.newScan().planFiles(); + List dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file)); + DataFile maxSizeFile = Collections.max(dataFiles, Comparator.comparingLong(DataFile::fileSizeInBytes)); + Assert.assertEquals("Should have 3 files before rewrite", 3, dataFiles.size()); + + spark.read().format("iceberg").load(tableLocation).createTempView("origin"); + long originalNumRecords = spark.read().format("iceberg").load(tableLocation).count(); + List originalRecords = sql("SELECT * from origin sort by c2"); + + long targetSizeInBytes = maxSizeFile.fileSizeInBytes() - 10; + Result result = basicRewrite(table) + 
.option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(targetSizeInBytes)) + .option(BinPackStrategy.MIN_FILE_SIZE_BYTES, Long.toString(targetSizeInBytes - 1)) + .option(BinPackStrategy.MAX_FILE_SIZE_BYTES, Long.toString(targetSizeInBytes + 1)) + .execute(); + + Assert.assertEquals("Action should delete 3 data files", 3, result.rewrittenDataFilesCount()); + Assert.assertEquals("Action should add 2 data files", 2, result.addedDataFilesCount()); + + spark.read().format("iceberg").load(tableLocation).createTempView("postRewrite"); + long postRewriteNumRecords = spark.read().format("iceberg").load(tableLocation).count(); + List rewrittenRecords = sql("SELECT * from postRewrite sort by c2"); + + Assert.assertEquals(originalNumRecords, postRewriteNumRecords); + assertEquals("Rows should be unchanged", originalRecords, rewrittenRecords); + } + + @Test + public void testPartialProgressEnabled() { + Table table = createTable(20); + int fileSize = averageFileSize(table); + + List originalData = currentData(); + + // Perform a rewrite but only allow 2 files to be compacted at a time + RewriteDataFiles.Result result = + basicRewrite(table) + .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true") + .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 100)) + .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "10") + .execute(); + + Assert.assertEquals("Should have 10 fileGroups", result.resultMap().keySet().size(), 10); + + table.refresh(); + + shouldHaveSnapshots(table, 11); + shouldHaveACleanCache(table); + + List postRewriteData = currentData(); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + } + + @Test + public void testMultipleGroups() { + Table table = createTable(20); + int fileSize = averageFileSize(table); + + List originalData = currentData(); + + // Perform a rewrite but only allow 2 files to be compacted at a time + RewriteDataFiles.Result result = + basicRewrite(table) + .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 100)) + .option(BinPackStrategy.MIN_INPUT_FILES, "1") + .execute(); + + Assert.assertEquals("Should have 10 fileGroups", result.resultMap().keySet().size(), 10); + + table.refresh(); + + List postRewriteData = currentData(); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + + shouldHaveSnapshots(table, 2); + shouldHaveACleanCache(table); + } + + @Test + public void testPartialProgressMaxCommits() { + Table table = createTable(20); + int fileSize = averageFileSize(table); + + List originalData = currentData(); + + // Perform a rewrite but only allow 2 files to be compacted at a time + RewriteDataFiles.Result result = + basicRewrite(table) + .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 100)) + .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true") + .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "3") + .execute(); + + Assert.assertEquals("Should have 10 fileGroups", result.resultMap().keySet().size(), 10); + + table.refresh(); + + List postRewriteData = currentData(); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + + shouldHaveSnapshots(table, 4); + shouldHaveACleanCache(table); + } + + @Test + public void testSingleCommitWithRewriteFailure() { + Table table = createTable(20); + int fileSize = averageFileSize(table); + + List originalData = currentData(); + + BaseRewriteDataFilesSparkAction realRewrite = + 
(org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction) + basicRewrite(table) + .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 100)); + + BaseRewriteDataFilesSparkAction spyRewrite = Mockito.spy(realRewrite); + + // Fail groups 1, 3, and 7 during rewrite + GroupInfoMatcher failGroup = new GroupInfoMatcher(1, 3, 7); + doThrow(new RuntimeException("Rewrite Failed")) + .when(spyRewrite) + .rewriteFiles(any(), argThat(failGroup), any(), any()); + + AssertHelpers.assertThrows("Should fail entire rewrite if part fails", RuntimeException.class, + () -> spyRewrite.execute()); + + table.refresh(); + + List postRewriteData = currentData(); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + + shouldHaveSnapshots(table, 1); + shouldHaveNoOrphans(table); + shouldHaveACleanCache(table); + } + + @Test + public void testSingleCommitWithCommitFailure() { + Table table = createTable(20); + int fileSize = averageFileSize(table); + + List originalData = currentData(); + + BaseRewriteDataFilesSparkAction realRewrite = + (org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction) + basicRewrite(table) + .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 100)); + + BaseRewriteDataFilesSparkAction spyRewrite = Mockito.spy(realRewrite); + + // Fail to commit + doThrow(new RuntimeException("Commit Failure")) + .when(spyRewrite) + .commitFileGroups(any()); + + AssertHelpers.assertThrows("Should fail entire rewrite if commit fails", RuntimeException.class, + () -> spyRewrite.execute()); + + table.refresh(); + + List postRewriteData = currentData(); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + + shouldHaveSnapshots(table, 1); + shouldHaveNoOrphans(table); + shouldHaveACleanCache(table); + } + + @Test + public void testParallelSingleCommitWithRewriteFailure() { + Table table = createTable(20); + int fileSize = averageFileSize(table); + + List originalData = currentData(); + + BaseRewriteDataFilesSparkAction realRewrite = + (org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction) + basicRewrite(table) + .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 100)) + .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "3"); + + BaseRewriteDataFilesSparkAction spyRewrite = Mockito.spy(realRewrite); + + // Fail groups 1, 3, and 7 during rewrite + GroupInfoMatcher failGroup = new GroupInfoMatcher(1, 3, 7); + doThrow(new RuntimeException("Rewrite Failed")) + .when(spyRewrite) + .rewriteFiles(any(), argThat(failGroup), any(), any()); + + AssertHelpers.assertThrows("Should fail entire rewrite if part fails", RuntimeException.class, + () -> spyRewrite.execute()); + + table.refresh(); + + List postRewriteData = currentData(); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + + shouldHaveSnapshots(table, 1); + shouldHaveNoOrphans(table); + shouldHaveACleanCache(table); + } + + @Test + public void testPartialProgressWithRewriteFailure() { + Table table = createTable(20); + int fileSize = averageFileSize(table); + + List originalData = currentData(); + + BaseRewriteDataFilesSparkAction realRewrite = + (org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction) + basicRewrite(table) + .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 100)) + .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true") + 
.option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "3"); + + BaseRewriteDataFilesSparkAction spyRewrite = Mockito.spy(realRewrite); + + // Fail groups 1, 3, and 7 during rewrite + GroupInfoMatcher failGroup = new GroupInfoMatcher(1, 3, 7); + doThrow(new RuntimeException("Rewrite Failed")) + .when(spyRewrite) + .rewriteFiles(any(), argThat(failGroup), any(), any()); + + RewriteDataFiles.Result result = spyRewrite.execute(); + + Assert.assertEquals("Should have 7 fileGroups", result.resultMap().keySet().size(), 7); + + table.refresh(); + + List postRewriteData = currentData(); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + + // With 10 original groups and Max Commits of 3, we should have commits with 4, 4, and 2. + // removing 3 groups leaves us with only 2 new commits, 4 and 3 + shouldHaveSnapshots(table, 3); + shouldHaveNoOrphans(table); + shouldHaveACleanCache(table); + } + + @Test + public void testParallelPartialProgressWithRewriteFailure() { + Table table = createTable(20); + int fileSize = averageFileSize(table); + + List originalData = currentData(); + + BaseRewriteDataFilesSparkAction realRewrite = + (org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction) + basicRewrite(table) + .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 100)) + .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "3") + .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true") + .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "3"); + + BaseRewriteDataFilesSparkAction spyRewrite = Mockito.spy(realRewrite); + + // Fail groups 1, 3, and 7 during rewrite + GroupInfoMatcher failGroup = new GroupInfoMatcher(1, 3, 7); + doThrow(new RuntimeException("Rewrite Failed")) + .when(spyRewrite) + .rewriteFiles(any(), argThat(failGroup), any(), any()); + + RewriteDataFiles.Result result = spyRewrite.execute(); + + Assert.assertEquals("Should have 7 fileGroups", result.resultMap().keySet().size(), 7); + + table.refresh(); + + List postRewriteData = currentData(); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + + // With 10 original groups and Max Commits of 3, we should have commits with 4, 4, and 2. 
+ // Removing the 3 failed groups leaves 7, so we expect only 2 new commits, of 4 and 3 groups. + shouldHaveSnapshots(table, 3); + shouldHaveNoOrphans(table); + shouldHaveACleanCache(table); + } + + @Test + public void testParallelPartialProgressWithCommitFailure() { + Table table = createTable(20); + int fileSize = averageFileSize(table); + + List originalData = currentData(); + + BaseRewriteDataFilesSparkAction realRewrite = + (org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction) + basicRewrite(table) + .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 100)) + .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "3") + .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true") + .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "3"); + + BaseRewriteDataFilesSparkAction spyRewrite = Mockito.spy(realRewrite); + + // First and third commits succeed, the second does not + doCallRealMethod() + .doThrow(new RuntimeException("Commit Failed")) + .doCallRealMethod() + .when(spyRewrite) + .commitFileGroups(any()); + + RewriteDataFiles.Result result = spyRewrite.execute(); + + // Commit 1: 4 groups, Commit 2 (failed): 4 groups, Commit 3: 2 groups + Assert.assertEquals("Should have 6 fileGroups", 6, result.resultMap().keySet().size()); + + table.refresh(); + + List postRewriteData = currentData(); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + + // Only 2 new commits because we broke one + shouldHaveSnapshots(table, 3); + shouldHaveNoOrphans(table); + shouldHaveACleanCache(table); + } + + @Test + public void testInvalidOptions() { + Table table = createTable(20); + + AssertHelpers.assertThrows("No negative values for partial progress max commits", + IllegalArgumentException.class, + () -> basicRewrite(table) + .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true") + .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "-5") + .execute()); + + AssertHelpers.assertThrows("No negative values for max concurrent groups", + IllegalArgumentException.class, + () -> basicRewrite(table) + .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "-5") + .execute()); + + AssertHelpers.assertThrows("No unknown options allowed", + IllegalArgumentException.class, + () -> basicRewrite(table) + .option("foobarity", "-5") + .execute()); + } + + private List currentData() { + return rowsToJava(spark.read().format("iceberg").load(tableLocation).sort("c1").collectAsList()); + } + + private void shouldHaveSnapshots(Table table, int expectedSnapshots) { + int actualSnapshots = Iterables.size(table.snapshots()); + Assert.assertEquals("Table did not have the expected number of snapshots", + expectedSnapshots, actualSnapshots); + } + + private void shouldHaveNoOrphans(Table table) { + Assert.assertEquals("Should not have found any orphan files", ImmutableList.of(), + actions().removeOrphanFiles(table).execute().orphanFileLocations()); + } + + private void shouldHaveACleanCache(Table table) { + Assert.assertEquals("Should not have any entries in cache", ImmutableSet.of(), + cacheContents(table)); + } + + /** + * Create an unpartitioned table with a certain number of data files + * @param files number of files to create + * @return the newly created table + */ + private Table createTable(int files) { + PartitionSpec spec = PartitionSpec.unpartitioned(); + Map options = Maps.newHashMap(); + Table table = TABLES.create(SCHEMA, spec, options, tableLocation); + Assert.assertNull("Table must be empty", table.currentSnapshot()); + + List records1 = Lists.newArrayList(); + + 
IntStream.range(0, 2000).forEach(i -> records1.add(new ThreeColumnRecord(i, "foo" + i, "bar" + i))); + Dataset df = spark.createDataFrame(records1, ThreeColumnRecord.class).repartition(files); + writeDF(df); + table.refresh(); + + return table; + } + + private int averageFileSize(Table table) { + return (int) Streams.stream(table.currentSnapshot().addedFiles().iterator()) + .mapToLong(DataFile::fileSizeInBytes) + .average() + .getAsDouble(); + } + + private void writeRecords(List records) { + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class); + writeDF(df); + } + + private void writeDF(Dataset df) { + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + } + + class GroupInfoMatcher implements ArgumentMatcher { + private final Set groupIDs; + + GroupInfoMatcher(Integer... globalIndex) { + this.groupIDs = ImmutableSet.copyOf(globalIndex); + } + + @Override + public boolean matches(FileGroupForRewrite argument) { + return groupIDs.contains(argument.globalIndex()); + } + } +} diff --git a/spark3/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java b/spark3/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java index e35b533edb56..204f3bd353d9 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java @@ -93,7 +93,7 @@ private Set fetchRewrittenDataFiles(Table table, Set fileSetID return Collections.unmodifiableSet(rewrittenDataFiles); } - private Set fetchNewDataFiles(Table table, Set fileSetIDs) { + public Set fetchNewDataFiles(Table table, Set fileSetIDs) { List> results = Lists.newArrayList(); for (String fileSetID : fileSetIDs) { diff --git a/spark3/src/main/java/org/apache/iceberg/spark/FileScanTaskSetManager.java b/spark3/src/main/java/org/apache/iceberg/spark/FileScanTaskSetManager.java index b43f1c6bab3e..0487389fd8e1 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/FileScanTaskSetManager.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/FileScanTaskSetManager.java @@ -21,6 +21,8 @@ import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.Table; @@ -58,9 +60,19 @@ public List removeTasks(Table table, String setID) { return tasksMap.remove(id); } - private Pair toID(Table table, String setID) { + public Set fetchSets(Table table) { + return tasksMap.keySet().stream() + .filter(e -> e.first().equals(toID(table))) + .map(Pair::second) + .collect(Collectors.toSet()); + } + + private String toID(Table table) { TableOperations ops = ((HasTableOperations) table).operations(); - String tableUUID = ops.current().uuid(); - return Pair.of(tableUUID, setID); + return ops.current().uuid(); + } + + private Pair toID(Table table, String setID) { + return Pair.of(toID(table), setID); } } diff --git a/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSpark3Action.java b/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSpark3Action.java new file mode 100644 index 000000000000..d26f292f35a7 --- /dev/null +++ b/spark3/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSpark3Action.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.actions; + +import java.util.Set; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.BinPackStrategy; +import org.apache.iceberg.actions.RewriteDataFiles; +import org.apache.iceberg.actions.SortStrategy; +import org.apache.iceberg.spark.FileRewriteCoordinator; +import org.apache.iceberg.spark.FileScanTaskSetManager; +import org.apache.iceberg.spark.actions.rewrite.Spark3BinPackStrategy; +import org.apache.iceberg.spark.actions.rewrite.Spark3SortStrategy; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BaseRewriteDataFilesSpark3Action extends BaseRewriteDataFilesSparkAction { + private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDataFilesSpark3Action.class); + private final FileRewriteCoordinator coordinator = FileRewriteCoordinator.get(); + private final FileScanTaskSetManager manager = FileScanTaskSetManager.get(); + + protected BaseRewriteDataFilesSpark3Action(SparkSession spark, Table table) { + super(spark, table); + } + + @Override + protected BinPackStrategy binPackStrategy() { + return new Spark3BinPackStrategy(table(), spark()); + } + + @Override + protected SortStrategy sortStrategy() { + return new Spark3SortStrategy(table(), spark()); + } + + @Override + protected void commitFileGroups(Set completedGroupIDs) { + coordinator.commitRewrite(table(), completedGroupIDs); + completedGroupIDs.forEach(groupID -> manager.removeTasks(table(), groupID)); + } + + @Override + protected void abortFileGroup(String groupID) { + try { + coordinator.abortRewrite(table(), groupID); + manager.removeTasks(table(), groupID); + } catch (Exception e) { + LOG.error("Unable to cleanup rewrite file group {} for table {}", groupID, table().name(), e); + } + } + + @Override + protected RewriteDataFiles self() { + return this; + } +} diff --git a/spark3/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java b/spark3/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java index 149784575edb..3230728261c9 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java @@ -19,8 +19,10 @@ package org.apache.iceberg.spark.actions; +import org.apache.iceberg.Table; import org.apache.iceberg.actions.ActionsProvider; import org.apache.iceberg.actions.MigrateTable; +import org.apache.iceberg.actions.RewriteDataFiles; import org.apache.iceberg.actions.SnapshotTable; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier; @@ -62,4 +64,9 @@ public MigrateTable migrateTable(String tableIdent) { CatalogAndIdentifier catalogAndIdent = Spark3Util.catalogAndIdentifier(ctx, spark(), tableIdent, defaultCatalog); return new 
BaseMigrateTableSparkAction(spark(), catalogAndIdent.catalog(), catalogAndIdent.identifier()); } + + @Override + public RewriteDataFiles rewriteDataFiles(Table table) { + return new BaseRewriteDataFilesSpark3Action(spark(), table); + } } diff --git a/spark3/src/main/java/org/apache/iceberg/spark/actions/rewrite/Spark3BinPackStrategy.java b/spark3/src/main/java/org/apache/iceberg/spark/actions/rewrite/Spark3BinPackStrategy.java new file mode 100644 index 000000000000..cbe160b52b59 --- /dev/null +++ b/spark3/src/main/java/org/apache/iceberg/spark/actions/rewrite/Spark3BinPackStrategy.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.actions.rewrite; + +import java.util.List; +import java.util.Set; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.BinPackStrategy; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.spark.FileRewriteCoordinator; +import org.apache.iceberg.spark.FileScanTaskSetManager; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.internal.SQLConf; + +public class Spark3BinPackStrategy extends BinPackStrategy { + private final Table table; + private final SparkSession spark; + private final FileScanTaskSetManager manager = FileScanTaskSetManager.get(); + + public Spark3BinPackStrategy(Table table, SparkSession spark) { + this.table = table; + this.spark = spark; + } + + @Override + public Table table() { + return table; + } + + @Override + public Set rewriteFiles(String groupID, List filesToRewrite) { + manager.stageTasks(table, groupID, filesToRewrite); + + // Disable Adaptive Query Execution as this may change the output partitioning of our write + SparkSession cloneSession = spark.cloneSession(); + cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false); + + Dataset scanDF = cloneSession.read().format("iceberg") + .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID) + .option(SparkReadOptions.SPLIT_SIZE, Long.toString(targetFileSize())) + .option(SparkReadOptions.FILE_OPEN_COST, "0") + .load(table.name()); + + // write the packed data into new files where each split becomes a new file + FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get(); + try { + scanDF.write() + .format("iceberg") + .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID) + .mode("append") + .save(table.name()); + } catch (Exception e) { + try { + rewriteCoordinator.abortRewrite(table, groupID); 
+ manager.removeTasks(table, groupID); + } finally { + throw new RuntimeException("Cannot complete rewrite, an exception was thrown during the write operation", e); + } + } + + // Actual commit is performed with the groupID + return rewriteCoordinator.fetchNewDataFiles(table, ImmutableSet.of(groupID)); + } +} diff --git a/spark3/src/main/java/org/apache/iceberg/spark/actions/rewrite/Spark3SortStrategy.java b/spark3/src/main/java/org/apache/iceberg/spark/actions/rewrite/Spark3SortStrategy.java new file mode 100644 index 000000000000..4f9db72d8c9d --- /dev/null +++ b/spark3/src/main/java/org/apache/iceberg/spark/actions/rewrite/Spark3SortStrategy.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.actions.rewrite; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.RewriteStrategy; +import org.apache.iceberg.actions.SortStrategy; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.spark.FileRewriteCoordinator; +import org.apache.iceberg.spark.FileScanTaskSetManager; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.plans.logical.Repartition; +import org.apache.spark.sql.catalyst.plans.logical.Sort; +import org.apache.spark.sql.connector.iceberg.distributions.Distribution; +import org.apache.spark.sql.connector.iceberg.expressions.SortOrder; +import org.apache.spark.sql.internal.SQLConf; +import scala.collection.JavaConverters; +import scala.collection.Seq; +import scala.collection.Seq$; +import scala.collection.mutable.Buffer; + +public class Spark3SortStrategy extends SortStrategy { + + public static final String SHUFFLE_PARTITIONS = "shuffle-partitions"; + + private final FileScanTaskSetManager manager = FileScanTaskSetManager.get(); + private final Table table; + private final SparkSession spark; + + private int shufflePartitions; + + public Spark3SortStrategy(Table table, SparkSession spark) { + this.table = table; + this.spark = spark; + } + + @Override + public Table table() { + return table; + } + + @Override + public Set validOptions() { + return 
ImmutableSet.builder() + .addAll(super.validOptions()) + .add(SHUFFLE_PARTITIONS) + .build(); + } + + @Override + public RewriteStrategy options(Map options) { + shufflePartitions = PropertyUtil.propertyAsInt(options, + SHUFFLE_PARTITIONS, + spark.sessionState().conf().numShufflePartitions()); + + Preconditions.checkArgument(shufflePartitions > 0, + "Cannot use Spark3SortStrategy without %s being positive, found %s", + SHUFFLE_PARTITIONS, shufflePartitions); + + return super.options(options); + } + + @Override + public Set rewriteFiles(String groupID, List filesToRewrite) { + Distribution distribution = Spark3Util.buildRequiredDistribution(table); + // TODO: derive the Catalyst sort ordering from sortOrder(); it is left unset in this revision + Buffer ordering = null; + + Spark3Util.describe(sortOrder()); + + manager.stageTasks(table, groupID, filesToRewrite); + + // Disable Adaptive Query Execution as this may change the output partitioning of our write + SparkSession cloneSession = spark.cloneSession(); + cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false); + + Dataset scanDF = cloneSession.read().format("iceberg") + .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID) + .option(SparkReadOptions.SPLIT_SIZE, Long.toString(targetFileSize())) + .option(SparkReadOptions.FILE_OPEN_COST, "0") + .load(table.name()); + + // repartition and sort the data, then write it into new files where each split becomes a new file + FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get(); + try { + Repartition repartition = new Repartition(shufflePartitions, true, scanDF.logicalPlan()); + Sort sort = new Sort(ordering, true, repartition); + Dataset sortedDf = new Dataset<>(cloneSession, sort, scanDF.encoder()); + sortedDf.write() + .format("iceberg") + .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID) + .mode("append") + .save(table.name()); + } catch (Exception e) { + try { + rewriteCoordinator.abortRewrite(table, groupID); + manager.removeTasks(table, groupID); + } finally { + throw new RuntimeException("Cannot complete rewrite, an exception was thrown during the write operation", e); + } + } + + // Actual commit is performed with the groupID + return rewriteCoordinator.fetchNewDataFiles(table, ImmutableSet.of(groupID)); + } +} diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java index 20d2269d1107..9e0f09f81cf6 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java @@ -27,6 +27,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.FileScanTaskSetManager; @@ -62,6 +63,9 @@ class SparkFilesScan extends SparkBatchScan { long tableSplitSize = PropertyUtil.propertyAsLong(props, SPLIT_SIZE, SPLIT_SIZE_DEFAULT); this.splitSize = Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, tableSplitSize); + Preconditions.checkArgument(splitSize > 0, + "Cannot create a plan with a split size that is not positive, splitSize = %s", splitSize); + + int tableSplitLookback = PropertyUtil.propertyAsInt(props, SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT); this.splitLookback = Spark3Util.propertyAsInt(options, 
SparkReadOptions.LOOKBACK, tableSplitLookback); diff --git a/spark3/src/test/java/org/apache/iceberg/actions/TestNewRewriteDataFilesAction3.java b/spark3/src/test/java/org/apache/iceberg/actions/TestNewRewriteDataFilesAction3.java new file mode 100644 index 000000000000..0c302a3b865e --- /dev/null +++ b/spark3/src/test/java/org/apache/iceberg/actions/TestNewRewriteDataFilesAction3.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.actions; + +import java.util.Set; +import org.apache.iceberg.Table; +import org.apache.iceberg.spark.FileRewriteCoordinator; +import org.apache.iceberg.spark.FileScanTaskSetManager; +import org.apache.iceberg.spark.actions.SparkActions; +import org.apache.iceberg.spark.actions.TestNewRewriteDataFilesAction; + +public class TestNewRewriteDataFilesAction3 extends TestNewRewriteDataFilesAction { + private final FileRewriteCoordinator coordinator = FileRewriteCoordinator.get(); + private final FileScanTaskSetManager manager = FileScanTaskSetManager.get(); + + @Override + protected ActionsProvider actions() { + return SparkActions.get(); + } + + @Override + protected Set cacheContents(Table table) { + return manager.fetchSets(table); + } +}
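Usage note (not part of the diff): a minimal sketch of how the action surface added above would be invoked from Spark 3, assuming an existing Iceberg Table named "table"; the option values are illustrative only.

    // Rewrite data files with partial progress enabled, mirroring the options exercised in the tests above
    RewriteDataFiles.Result result = SparkActions.get()
        .rewriteDataFiles(table)
        .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, "134217728")   // cap each rewrite group at ~128 MB
        .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "3")  // rewrite up to 3 groups in parallel
        .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true")         // commit successful groups even if others fail
        .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "3")        // spread committed groups over at most 3 commits
        .execute();

    // Result.resultMap() carries one entry per successfully rewritten file group
    int rewrittenGroups = result.resultMap().size();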